釘釘API文檔:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld
釘釘有回調事件流程,有哪些回調?比如:通訊錄回調、審批回調等等,拿通訊錄回調來說,就是當你公司組織架構發生變動時,會自動觸發你自己注冊的回調地址,然后根據回調信息做一些自定義的處理,不得不說,釘釘真的是解決了協同辦公的很多問題,非常nice,但就回調事件來說,每個企業只能注冊一個回調地址,即使你要監聽的是不同的事件、即使還有其他業務線需要用到回調,也只能不多于一個回調,當然這都是出于人家服務多方面考慮的設計,廢話不多說,
好,流程:
一個注冊端向釘釘發送注冊請求,釘釘callback你自己的服務url,必須是公網訪問的地址,并且該地址返回釘釘json數據來告訴釘釘回調成功
注冊成功之后這個地址就可以接收釘釘的回調post請求,做一些自定義處理,記得返回json
好,代碼
1、注冊端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
mport requests, json class DingDingCallBack( object ): def __init__( self ): self .appsecret = '' self .appkey = '' self .api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s" % ( self .appkey, self .appsecret) self .aes_key = "" self .callbackUrl = "回調url" self .headers = { 'Content-Type' : 'application/json' , 'User-Agent' : 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36' , 'username' : 'zhangsan' , 'password' : '123456' } def get_token( self ): res = requests.get( self .api_url) if res.status_code = = 200 : str_res = res.text token = (json.loads(str_res)).get( 'access_token' ) return token def regist_call_back( self ): url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}' . format (ACCESS_TOKEN = self .get_token()) data = { "call_back_tag" : [ "bpms_task_change" , "bpms_instance_change" ], # 審批回調 "token" : "qxN3cm" , "aes_key" : self .aes_key, "url" : self .callbackUrl, } data1 = json.dumps(data) response = requests.post(url, headers = self .headers, data = data1) print (response) print (response.text) def query_callback( self ): url = 'https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}' . format (ACCESS_TOKEN = self .get_token()) response = requests.get(url, headers = self .headers, ) print (response) print (response.text) def update_call_back( self ): url = 'https://oapi.dingtalk.com/call_back/update_call_back?access_token={}' . format ( self .get_token()) data = { "call_back_tag" : [ "bpms_task_change" , "bpms_instance_change" , "user_add_org" , "user_leave_org" , "org_dept_create" , "org_dept_modify" , "org_dept_remove" ], "token" : "自定義字符串" , "aes_key" : self .aes_key, "url" : self .callbackUrl } data1 = json.dumps(data) response = requests.post(url, headers = self .headers, data = data1) print (response) print (response.text) def get_fail( self ): url = 'https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}' . format ( self .get_token()) response = requests.get(url, headers = self .headers, ) print (response.text) # todo 刪除回調 def delete_callback( self ): url = 'https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}' . format (ACCESS_TOKEN = self .get_token()) result = requests.get(url) print (result) print (result.text) # if __name__ = = '__main__' : dingOBJ = DingDingCallBack() # todo 獲取釘釘token # # dingOBJ.get_token() # todo 獲取回調失敗 # dingOBJ.get_fail() # todo 注冊回調 # dingOBJ.regist_call_back() # todo 查詢回調事件 dingOBJ.query_callback() # todo 修改回調事件 # dingOBJ.update_call_back() # todo 刪除回調 # dingOBJ.delete_callback() |
2、回調端:以下示例代碼為python2,django 框架
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# -*- coding:utf-8 -*- from django.shortcuts import render from django.http import JsonResponse,HttpResponse from django.views.generic import View from django.views.decorators.csrf import csrf_exempt from DingCrypto import DingCrypto import random,string,time,json,requests,simplejson # Create your views here.from ast import literal_eval encodingAesKey = '' key = '' token = '自定義字符串' appsecret = '' appkey = '' api_url = "https://oapi.dingtalk.com/gettoken?appkey=%s&appsecret=%s" % (appkey,appsecret) def randam_string(n): ran_str = ''.join(random.sample(string.ascii_letters + string.digits, n)) return ran_str # 注冊回調 @csrf_exempt def workOrderCallback(request): if request.method = = 'GET' : return JsonResponse({ 'code' : 200 , 'msg' : 'ok' }) if request.method = = 'POST' : print request.GET dingCrypto = DingCrypto(encodingAesKey,key) nonce = randam_string( 8 ) timestamp = str ( int ( round (time.time()))) encrpyt = dingCrypto.encrypt( 'success' ) # print nonce,timestamp,token,encrpyt signature = dingCrypto.generateSignature(nonce = nonce,timestamp = timestamp,token = token,msg_encrypt = encrpyt) new_data = { 'data' : { 'msg_signature' : signature, 'timeStamp' : timestamp, 'nonce' : nonce, 'encrypt' : encrpyt }, } encrpyt11 = dingCrypto.decrypt(encrpyt) return JsonResponse(new_data.get( 'data' )) # 響應回調 class CallBack(View): def get( self ,request, * args, * * kwargs): return JsonResponse({ 'code' : 200 , 'msg' : 'ok' }) def get_token( self ): res = requests.get(api_url) if res.status_code = = 200 : str_res = res.text token = (json.loads(str_res)).get( 'access_token' ) return token def post( self ,request, * args, * * kwargs): # data = request.GET dingCrypto = DingCrypto(encodingAesKey, key) nonce = randam_string( 8 ) timestamp = str ( int ( round (time.time()))) encrpyt = dingCrypto.encrypt( 'success' ) # signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt) callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get( 'encrypt' ))) if callback[ 'EventType' ] = = 'bpms_instance_change' : # 審批實例開始,結束 url = 'https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}' . format (ACCESS_TOKEN = self .get_token()) instace_ = { "process_instance_id" : callback[ 'processInstanceId' ] } data2 = json.dumps(instace_) req = requests.post(url,data = data2) data = literal_eval( str (req.text)).get( 'process_instance' ) excute_workorder(callback[ 'processInstanceId' ],data) elif callback[ 'EventType' ] = = 'bpms_task_change' : # 審批任務開始,結束,轉交 print 'bpms_task_change' elif callback[ 'EventType' ] = = 'user_add_org' : print '用戶增加' elif callback[ 'EventType' ] = = 'user_leave_org' : print '用戶離職' elif callback[ 'EventType' ] = = 'org_dept_create' : print '組織架構添加' elif callback[ 'EventType' ] = = 'org_dept_modify' : print '組織架構變更' elif callback[ 'EventType' ] = = 'org_dept_remove' : print '組織架構刪除' return HttpResponse(encrpyt) |
到此這篇關于python注冊釘釘回調事件的實現的文章就介紹到這了,更多相關python注冊釘釘回調事件內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://www.cnblogs.com/lutt/p/13579711.html