1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| import logging import requests import json import time
class CompanyWeixin:
    """Small client for the HTTP API served at ``qyapi.weixin.qq.com``.

    Typical flow: call :meth:`get_access_token` once, then
    :meth:`upload_file` to obtain a ``media_id`` and :meth:`send_img` to
    deliver it.

    All HTTP helpers are best-effort: on any failure they log a warning and
    return an empty dict instead of raising, so callers must check the
    returned payload for the keys they need.

    NOTE(review): the access token and corp secret travel in the query
    string, and the logged exception text may include the request URL --
    confirm the log destination is acceptable for that.
    """

    # Credentials; the values below look like placeholders and must be
    # replaced with real ones before use.
    __corp_id = 'xxxxxx'
    __agent_id = 100001
    __corp_secret = 'xxxxxxxxxxxx'
    # Filled in by get_access_token().
    __access_token = ''
    __expires_in = 0

    # Seconds passed as ``timeout=`` to every request so a stalled server
    # cannot block the caller forever.
    _request_timeout = 30

    # Headers sent with every POST issued through send_post().
    # NOTE(review): Content-Type announces a form-encoded body while
    # send_post() sends JSON text -- kept as-is, confirm the server accepts it.
    headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate, br',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
        'Pragma': 'no-cache',
        'User-Agent': 'self-defind-user-agent',
        'Cookie': 'name=self-define-cookies-in header',
        'X-Requested-With': 'XMLHttpRequest'
    }

    @staticmethod
    def send_post(url: str, json_data: dict):
        """POST ``json_data`` (serialized as JSON) to ``url``.

        Returns the decoded JSON response, or ``{}`` if the request or the
        decoding fails for any reason.
        """
        try:
            res = requests.post(
                url=url,
                data=json.dumps(json_data),
                headers=CompanyWeixin.headers,
                timeout=CompanyWeixin._request_timeout,
            )
            # json.loads accepts the raw bytes directly; no decode/encode
            # round-trip is needed.
            return json.loads(res.content)
        except Exception as e:
            logging.warning('[send_post]Failed to json.load, %s', e)
            return {}

    @staticmethod
    def send_get(url: str):
        """GET ``url`` and return the decoded JSON response.

        Returns ``{}`` if the request or the decoding fails for any reason.
        """
        try:
            res = requests.get(url=url, timeout=CompanyWeixin._request_timeout)
            return json.loads(res.content)
        except Exception as e:
            logging.warning('[send_get]Failed to json.load, %s', e)
            return {}

    def get_access_token(self):
        """Request an access token and remember it on this instance.

        Returns the raw response payload. When the payload carries no
        ``access_token`` (request failed, or the API answered with an
        error), a warning is logged, the previously stored token is left
        untouched and the payload is still returned for inspection.
        """
        url = (
            'https://qyapi.weixin.qq.com/cgi-bin/gettoken'
            '?corpid={0}&corpsecret={1}'
        ).format(CompanyWeixin.__corp_id, CompanyWeixin.__corp_secret)
        result = CompanyWeixin.send_get(url)

        token = result.get('access_token') if isinstance(result, dict) else None
        if token:
            self.__access_token = token
            # Keep the validity value exactly as reported by the API.
            self.__expires_in = result.get('expires_in', 0)
        else:
            logging.warning(
                '[get_access_token]No access_token in response: %s', result)
        return result

    def upload_file(self, file_name):
        """Upload the file at ``file_name`` as an image media item.

        Returns the decoded JSON response, or ``{}`` if the file cannot be
        opened or the request/decoding fails.
        """
        try:
            url = (
                'https://qyapi.weixin.qq.com/cgi-bin/media/upload'
                '?access_token={0}&type=image'
            ).format(self.__access_token)
            # The context manager guarantees the handle is closed even when
            # the request raises.
            with open(file_name, "rb") as image_file:
                # NOTE(review): field name "pic" and MIME "images/jpg" are
                # kept from the original; verify them against the API docs.
                files = {"pic": (file_name, image_file, "images/jpg")}
                return requests.post(
                    url=url,
                    files=files,
                    timeout=CompanyWeixin._request_timeout,
                ).json()
        except Exception as e:
            logging.warning('[upload_file]Failed to json.load, %s', e)
            return {}

    def send_img(self, media_id, touser: str = '@all'):
        """Send an image message referencing ``media_id`` to ``touser``.

        ``touser`` defaults to ``'@all'``. Returns whatever
        :meth:`send_post` returns (``{}`` on failure).
        """
        url = (
            'https://qyapi.weixin.qq.com/cgi-bin/message/send'
            '?access_token={0}'
        ).format(self.__access_token)
        data = {
            "touser": touser,
            "msgtype": "image",
            "agentid": CompanyWeixin.__agent_id,
            "image": {
                "media_id": media_id
            },
            "enable_id_trans": 0,
            "enable_duplicate_check": 0,
            "duplicate_check_interval": 1800
        }
        return CompanyWeixin.send_post(url, data)
def download_img(new_file_name):
    """Download the current image and save it to ``new_file_name``.

    The image URL is read from the ``tp1`` field of the JSON document
    served at ``http://bjb.yunwj.top/php/tp/lj.php``; the bytes behind that
    URL are then written to ``new_file_name`` (overwriting any existing
    file).

    Raises whatever ``requests`` raises on connection failure, timeout or
    a non-success HTTP status, and ``KeyError`` if the JSON document has no
    ``tp1`` field.
    """
    index_response = requests.get('http://bjb.yunwj.top/php/tp/lj.php', timeout=30)
    index_response.raise_for_status()
    tp1 = index_response.json()['tp1']

    image_response = requests.get(url=tp1, timeout=30)
    # Fail loudly instead of saving an HTTP error page as an image file.
    image_response.raise_for_status()

    with open(new_file_name, 'wb') as tmp:
        tmp.write(image_response.content)
# Script entry: fetch today's image, upload it and send it to everyone.

# strftime() formats the current local time when no time tuple is given.
today_str = time.strftime("%Y-%m-%d")
file_name = '{0}.jpg'.format(today_str)
download_img(file_name)

weixin = CompanyWeixin()
weixin.get_access_token()

# The upload response carries the media_id that the image message refers to.
img_msg = weixin.upload_file(file_name)
img_msg_media_id = img_msg['media_id']
weixin.send_img(img_msg_media_id)
|