Python实现钉钉发送报警消息的方法

作者:steanxy 时间:2022-11-18 06:06:44 

钉钉开放平台传送门:https://open.dingtalk.com

我司使用钉钉作为内部通讯工具,基本上大家在电脑和手机上都开着,消息可以第一时间查看,报警消息的即时性要求比较高,所以适合用钉钉通知。

下面介绍如何用Python实现钉钉发送报警消息。

获取access token

要使用钉钉发送消息,首先需要获取access token,代码如下:


def get_access_token():
url = 'https://oapi.dingtalk.com/gettoken?corpid=%s&corpsecret=%s' % (corp_id, corp_secret)
request = urllib2.Request(url)
response = urllib2.urlopen(request)
response_str = response.read()
response_dict = json.loads(response_str)
error_code_key = "errcode"
access_token_key = "access_token"
if response_dict.has_key(error_code_key) and response_dict[error_code_key] == 0 and response_dict.has_key(access_token_key):
 return response_dict[access_token_key]
else:
 return ''

access token在2小时内有效,有效期内重复获取返回相同结果,有效期会自动延长。corp_id和corp_secret是企业的id和secret,在钉钉的管理后台可以找到。另外,上面用到了urllib2和json,需要import:


import urllib2
import json

消息类型

钉钉的消息类型分为:text,image,voice,file,link和OA,具体消息格式参见:https://open-doc.dingtalk.com/docs/doc.htm?treeId=172&articleId=104972&docType=1 。

下面以发送文本,链接和文件消息为例进行说明。

给用户发送消息

发送文本


def send_text_to_users(access_token, users, text):
msg_type, msg = _gen_text_msg(text)
return _send_msg_to_users(access_token, users, msg_type, msg)

def _gen_text_msg(text):
msg_type = 'text'
msg = { "content": text }
return msg_type, msg

def _send_msg_to_users(access_token, users, msg_type, msg):
to_users = '|'.join(users)
body_dict = {
 "touser": to_users,
 "agentid": agent_id,
 "msgtype": msg_type
}
body_dict[msg_type] = msg
body = json.dumps(body_dict)
return _send_msg("https://oapi.dingtalk.com/message/send?access_token=", access_token, body)

其中agent_id是一个钉钉应用的id,以钉钉应用的名义给用户发送消息。users是用户id列表,每个用户id是一个字符串。

发送链接


def send_link_to_users(access_token, users, url, title, text):
msg_type, msg = _gen_link_msg(url, title, text)
return _send_msg_to_users(access_token, users, msg_type, msg)

def _gen_link_msg(url, title, text):
msg_type = 'link'
msg = {
 "messageUrl": url,
 "picUrl": "https://gw.alicdn.com/tps/TB1FN16LFXXXXXJXpXXXXXXXXXX-256-130.png",
 "title": title,
 "text": text
}
return msg_type, msg

其中_send_msg_to_users方法参见前面的代码,picUrl字段设置的是钉钉官方的图片,这里用于测试。

发送文件


def send_file_to_users(access_token, users, file_name):
media_id = upload_file(access_token, file_name)
if media_id == '':
 return
msg_type, msg = _gen_file_msg(media_id)
return _send_msg_to_users(access_token, users, msg_type, msg)

def upload_file(access_token, file_name):
register_openers()
datagen, headers = multipart_encode({'media': open(file_name, 'rb')})
requst_url = 'https://oapi.dingtalk.com/media/upload?access_token=' + access_token + '&type=file'
request = urllib2.Request(requst_url, datagen, headers)
response = urllib2.urlopen(request)
response_str = response.read()
response_dict = json.loads(response_str)
media_id_key = 'media_id'
error_code_key = 'errcode'
if response_dict.has_key(error_code_key) and response_dict[error_code_key] == 0 and response_dict.has_key(media_id_key):
 return response_dict[media_id_key]
else:
 return ''

需要先上传文件获得media_id,然后使用media_id将文件发送给用户。另外,这里用到了poster,可使用pip安装:


pip install poster

之后引入multipart_encode和register_openers函数:


from poster.encode import multipart_encode
from poster.streaminghttp import register_openers

给群会话发送消息

与给用户发送信息类似,区别是需要群会话id,而不是用户列表,以发送文本消息为例,代码如下:


def send_text_to_chat(access_token, chat_id, text):
msg_type, msg = _gen_text_msg(text)
return _send_msg_to_chat(access_token, chat_id, msg_type, msg)

def _send_msg_to_chat(access_token, chat_id, msg_type, msg):
body_dict = {
 "chatid": chat_id,
 "msgtype": msg_type
}
body_dict[msg_type] = msg
body = json.dumps(body_dict)
return _send_msg("https://oapi.dingtalk.com/chat/send?access_token=", access_token, body)

其中_gen_text_msg方法参见前面的代码。

群会话可以自行创建,参见https://open-doc.dingtalk.com/docs/doc.htm?treeId=172&articleId=104977&docType=1 。

来源:https://blog.csdn.net/anxiaoyuthss/article/details/71908522

标签:Python,钉钉,报警
0
投稿

猜你喜欢

  • python爬虫的数据库连接问题【推荐】

    2024-01-19 18:26:41
  • Python绘制圣诞树+落叶+雪花+背景音乐+浪漫弹窗 五合一版圣诞树

    2022-11-20 01:32:46
  • jmeter正则表达式的使用

    2022-08-10 01:10:42
  • Python处理字符串的常用函数实例总结

    2022-10-19 21:09:39
  • 详解如何利用Python绘制迷宫小游戏

    2021-02-28 05:31:32
  • 详解Golang利用反射reflect动态调用方法

    2024-05-02 16:23:47
  • 使用django和vue进行数据交互的方法步骤

    2021-12-20 03:29:07
  • XML:OpenSearch 浏览器指定搜索应用

    2010-05-04 19:37:00
  • 用非动态SQL Server SQL语句来对动态查询进行执行

    2024-01-19 08:55:05
  • 详解django中Template语言

    2022-12-01 21:20:59
  • 如何基于Python获取图片的物理尺寸

    2023-03-25 00:17:19
  • Pygame的程序开始示例代码

    2021-12-20 19:45:01
  • MYSQL中文乱码问题的解决方案

    2024-01-18 00:49:48
  • 如何基于Python爬取隐秘的角落评论

    2022-02-17 05:31:43
  • TensorFlow 滑动平均的示例代码

    2023-10-25 15:41:28
  • Python 文件管理实例详解

    2022-08-22 16:29:54
  • php中pcntl_fork创建子进程的方法实例

    2023-11-15 02:28:21
  • 如何实现文本的卷屏浏览?

    2010-05-24 18:36:00
  • python求质数的3种方法

    2023-02-12 04:07:54
  • python操作日期和时间的方法

    2021-08-29 18:32:59
  • asp之家 网络编程 m.aspxhome.com