使用python tornado实现微信公众号实例
下面的例子只实现了几个必需的功能,距离实际可用还有不小的差距:
main.py:
#!/usr/bin/python
# -*- encoding:utf-8 -*-
'''
Minimal WeChat Official Account server built on Tornado (Python 2).

It answers the developer-interface verification request (GET /) and replies
to a handful of pushed messages/events (POST /), optionally AES-encrypted.

Prerequisites:
    $ sudo yum install python-devel
    $ tar -zxvf pycrypto-2.6.1.tar.gz
    $ cd pycrypto-2.6.1
    $ sudo python setup.py build
    $ sudo python setup.py install
    $ sudo yum install libxml2-devel
    $ sudo yum install libxml2-python
    $ sudo yum install libxml++-devel
    $ sudo yum install libxslt-devel
    $ sudo pip install lxml
    $ sudo pip install requests

Run in the background:
    $ sudo bash -c 'nohup python ./main.py &'
'''
import os.path
import platform
import hashlib
import time
import sys
import importlib
import urllib2
import requests
import mimetypes
import io
import json
# from xml.etree import cElementTree as ETree
import lxml.etree as ETree
# import torndb
import tornado.httpserver
import tornado.web
import tornado.ioloop
import tornado.options
import tornado.template
from tornado.options import define, options

import global_define
import access_token
from encrypt_WeiXin.WXBizMsgCrypt import WXBizMsgCrypt

SYS_PLATFORM = platform.system()

define("port", default=80, help="run port", type=int)  # WeChat only supports port 80
'''
define("mysql_host", default="127.0.0.1:3306", help="db host")
define("mysql_database", default="todo", help="db name")
define("mysql_user", default="root", help="db user")
define("mysql_password", default="", help="db password")
'''

TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), "templates")
STATIC_PATH = os.path.join(os.path.dirname(__file__), "static")

# Content type used for every XML reply sent back to WeChat.
XML_CONTENT_TYPE = 'text/xml; charset=utf-8'


def _log(level, message):
    """Print *message* using the "Thread(Main), Time: ..., LEVEL: ..." layout."""
    print("Thread(Main), Time: %s, %s: %s" % (time.ctime(), level, message))


def _parse_xml(text):
    """Parse an incoming XML payload and return its root element.

    The request body is untrusted, so entity resolution and network access
    are disabled on the parser to rule out XXE-style attacks.

    Raises:
        lxml.etree.XMLSyntaxError: if *text* is not well-formed XML.
    """
    parser = ETree.XMLParser(resolve_entities=False, no_network=True)
    return ETree.fromstring(text, parser=parser)


def _new_crypt():
    """Return a WXBizMsgCrypt configured from global_define."""
    return WXBizMsgCrypt(global_define.TOKEN,
                         global_define.EncodingAESKey,
                         global_define.APPID)


class Application(tornado.web.Application):
    """Tornado application wiring the URL routes to their handlers."""

    def __init__(self):
        handlers = [
            (r"/", IndexHandler),
            (r"/todo/new", NewHandler),
            (r"/todo/(\d+)/edit", EditHandler),
        ]
        settings = dict(
            template_path=TEMPLATE_PATH,
            static_path=STATIC_PATH,
        )
        if SYS_PLATFORM == "Windows":
            # debug/autoreload cannot be combined with multi-process mode,
            # so it is only switched on where main() runs a single process.
            settings["debug"] = True
        tornado.web.Application.__init__(self, handlers, **settings)
        '''
        self.db = torndb.Connection(
            host = options.mysql_host, database = options.mysql_database,
            user = options.mysql_user, password = options.mysql_password
        )
        '''


class IndexHandler(tornado.web.RequestHandler):
    """Endpoint WeChat calls for interface verification and message pushes."""

    def _set_server_headers(self):
        """Set the identifying response headers shared by GET and POST."""
        self.set_header("Server", "Car Ocean")
        self.set_header("X-Powered-By", "Car Ocean")

    def _first_argument(self, name, default=""):
        """Return the first value of request argument *name*, or *default*."""
        values = self.get_arguments(name)
        return values[0] if values else default

    def get(self):
        """Answer the developer-interface verification request.

        WeChat sends signature/timestamp/nonce/echostr; echostr is echoed
        back only when sha1(sorted(token, timestamp, nonce)) matches the
        signature. See
        http://mp.weixin.qq.com/wiki/17/2d4265491f12608cd170a95559800f2d.html
        """
        self._set_server_headers()
        token = global_define.TOKEN
        signature = self._first_argument("signature", None)
        if signature is None:
            print("INFO: GET operation, need 'signature' parameter")
            return
        # Missing parameters default to "" so a malformed probe yields a
        # signature mismatch instead of an IndexError / HTTP 500.
        timestamp = self._first_argument("timestamp")
        nonce = self._first_argument("nonce")
        echostr = self._first_argument("echostr")

        sortlist = sorted([token, timestamp, nonce])
        # Hash bytes explicitly: request arguments arrive as unicode.
        calculate_signature = hashlib.sha1(
            "".join(sortlist).encode("utf-8")).hexdigest()

        # NOTE: these debug prints include the secret token; drop them
        # outside of a demo environment.
        print("token: %s" % token)
        print("timestamp: %s" % timestamp)
        print("nonce: %s" % nonce)
        print("echostr: %s" % echostr)
        print("getted signature: %s" % signature)
        print("calculate signature: %s" % calculate_signature)

        if calculate_signature == signature:
            self.write(echostr)
        else:
            todos = ['ERROR', 'bbb']
            self.render("index.html", todos=todos)

    def _read_message(self, body):
        """Decode the pushed message into (encrypt_type, xml_root).

        xml_root is None when the encryption type is unsupported, the
        decryption fails, or the payload is not well-formed XML.
        """
        encrypt_type = self._first_argument("encrypt_type", "raw")
        payload = None
        if encrypt_type == "raw":
            _log("INFO", "is not crypt message")
            payload = body
        elif encrypt_type == "aes":
            _log("INFO", "aes crypt message")
            msg_signature = self._first_argument("msg_signature")
            timestamp = self._first_argument("timestamp")
            nonce = self._first_argument("nonce")
            ret, decryptBody = _new_crypt().DecryptMsg(
                body, msg_signature, timestamp, nonce)
            _log("DEBUG", "decrypt body: %s" % decryptBody)
            _log("DEBUG", "msg_signature: %s" % msg_signature)
            _log("DEBUG", "timestamp: %s" % timestamp)
            _log("DEBUG", "nonce: %s" % nonce)
            if ret != 0:
                # A failed decrypt returns no body; do not try to parse it.
                _log("ERROR", "DecryptMsg error : %d" % ret)
            else:
                payload = decryptBody
        else:
            _log("ERROR", "not support encrypt_type: %s" % encrypt_type)

        if payload is None:
            return encrypt_type, None
        try:
            return encrypt_type, _parse_xml(payload)
        except ETree.XMLSyntaxError as err:
            _log("ERROR", "invalid xml body: %s" % err)
            return encrypt_type, None

    def post(self):
        """Handle a message or event pushed by WeChat and send the reply.

        An empty body is returned for anything that is not handled, which
        tells WeChat there is nothing to deliver.
        """
        self._set_server_headers()
        body = self.request.body
        _log("DEBUG", "body: \n%s" % body)

        encrypt_type, xmlData = self._read_message(body)
        if xmlData is None:
            self.write("")
            return

        toUserName = xmlData.findtext("ToUserName")
        _log("DEBUG", "ToUserName : %s" % toUserName)
        fromUserName = xmlData.findtext("FromUserName")
        _log("DEBUG", "FromUserName : %s" % fromUserName)
        createTime = xmlData.findtext("CreateTime")
        _log("DEBUG", "CreateTime : %s" % createTime)
        msgType = xmlData.findtext("MsgType")
        _log("DEBUG", "MsgType : %s" % msgType)
        if None in (toUserName, fromUserName, createTime, msgType):
            _log("ERROR", "message is missing a required element")
            self.write("")
            return

        if msgType == "event":
            event = xmlData.findtext("Event")
            _log("DEBUG", "Event : %s" % event)
            if event == "subscribe":
                self.Subscribe(encrypt_type, toUserName, fromUserName,
                               createTime)
            elif event == "unsubscribe":
                self.Unsubscribe()
            elif event == "CLICK":
                eventKey = xmlData.findtext("EventKey", "")
                _log("DEBUG", "EventKey : %s" % eventKey)
                self.MenuClick(encrypt_type, toUserName, fromUserName,
                               createTime, eventKey)
            else:
                self.write("")
        elif msgType == "text":
            Content = xmlData.findtext("Content")
            _log("DEBUG", "Content : %s" % Content)
            MsgId = xmlData.findtext("MsgId")
            _log("DEBUG", "MsgId : %s" % MsgId)
            if Content == "1":
                self.RecvTextMsg1(encrypt_type, toUserName, fromUserName,
                                  createTime, Content, MsgId)
            elif Content == "2":
                self.RecvTextMsg2(encrypt_type, toUserName, fromUserName,
                                  createTime, Content, MsgId)
            else:
                self.write("")
        else:
            self.write("")

    def _new_reply(self, msgToUserName, msgFromUserName, msgCreateTime,
                   replyType):
        """Build the common part of a reply and return its <xml> root.

        The sender and receiver of the incoming message are swapped so the
        reply goes back to the user who sent it.
        """
        xmlRoot = ETree.Element("xml")
        ETree.SubElement(xmlRoot, 'ToUserName').text = ETree.CDATA(msgFromUserName)
        ETree.SubElement(xmlRoot, 'FromUserName').text = ETree.CDATA(msgToUserName)
        ETree.SubElement(xmlRoot, 'CreateTime').text = msgCreateTime
        ETree.SubElement(xmlRoot, 'MsgType').text = ETree.CDATA(replyType)
        return xmlRoot

    def _new_text_reply(self, msgToUserName, msgFromUserName, msgCreateTime,
                        text):
        """Build a complete text reply carrying the unicode string *text*."""
        xmlRoot = self._new_reply(msgToUserName, msgFromUserName,
                                  msgCreateTime, "text")
        ETree.SubElement(xmlRoot, 'Content').text = ETree.CDATA(text)
        return xmlRoot

    def _send_reply(self, msgEncrypt_type, xmlRoot):
        """Serialize *xmlRoot*, encrypt it unless the mode is "raw", and write it."""
        xmlData = ETree.tostring(xmlRoot, encoding="utf-8")
        _log("DEBUG", "Return original message : \n%s" % xmlData)
        if msgEncrypt_type != "raw":
            nonce = str(int(time.time()))
            ret, encryptBody = _new_crypt().EncryptMsg(xmlData, nonce)
            if ret != 0:
                _log("ERROR", "EncryptMsg error : %d" % ret)
                # Stop here: encryptBody is not usable after a failure.
                self.write("")
                return
            xmlData = encryptBody
            _log("DEBUG", "Return encrypted message : \n%s" % xmlData)
        self.set_header('Content-Type', XML_CONTENT_TYPE)
        self.write(xmlData)

    def MenuClick(self, msgEncrypt_type, msgToUserName, msgFromUserName,
                  msgCreateTime, msgEventKey):
        """Reply to a custom-menu CLICK event with the key that was clicked."""
        # Non-ASCII text placed in the XML must be a unicode string.
        xmlRoot = self._new_text_reply(msgToUserName, msgFromUserName,
                                       msgCreateTime,
                                       u"点击了菜单:" + msgEventKey)
        self._send_reply(msgEncrypt_type, xmlRoot)

    def RecvTextMsg1(self, msgEncrypt_type, msgToUserName, msgFromUserName,
                     msgCreateTime, msgContent, msgMsgId):
        """Reply to the text message "1" with a fixed text message."""
        xmlRoot = self._new_text_reply(msgToUserName, msgFromUserName,
                                       msgCreateTime, u"您订阅成功!")
        self._send_reply(msgEncrypt_type, xmlRoot)

    def RecvTextMsg2(self, msgEncrypt_type, msgToUserName, msgFromUserName,
                     msgCreateTime, msgContent, msgMsgId):
        """Reply to the text message "2" with a single-article news message."""
        xmlRoot = self._new_reply(msgToUserName, msgFromUserName,
                                  msgCreateTime, "news")
        ETree.SubElement(xmlRoot, 'ArticleCount').text = "1"
        articles = ETree.SubElement(xmlRoot, 'Articles')
        item = ETree.SubElement(articles, 'item')
        ETree.SubElement(item, 'Title').text = ETree.CDATA(u"车可讯")
        ETree.SubElement(item, 'Description').text = ETree.CDATA(u"欢迎光临车可讯")
        ETree.SubElement(item, 'PicUrl').text = ETree.CDATA("http://mmbiz.qpic.cn/mmbiz_png/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA/0")
        ETree.SubElement(item, 'Url').text = ETree.CDATA("http://cloud.car-ocean.com:8080/")
        self._send_reply(msgEncrypt_type, xmlRoot)

    def Subscribe(self, msgEncrypt_type, msgToUserName, msgFromUserName,
                  msgCreateTime):
        """Reply to a subscribe event with a welcome text message."""
        xmlRoot = self._new_text_reply(msgToUserName, msgFromUserName,
                                       msgCreateTime,
                                       u"您订阅成功!欢迎您的到来!")
        self._send_reply(msgEncrypt_type, xmlRoot)

    def Unsubscribe(self):
        """Acknowledge an unsubscribe event with an empty body."""
        self.write("")


class NewHandler(tornado.web.RequestHandler):
    """Placeholder handler for creating a todo item."""

    def post(self):
        title = self.get_argument("title")
        if not title:
            return None
        self.redirect("/")


class EditHandler(tornado.web.RequestHandler):
    """Placeholder handler for editing a todo item."""

    def get(self, id):
        todo = 1
        return self.render("edit.html", todo=todo)

    def post(self, id):
        self.redirect("/")


class JSONObject:
    """Expose the keys of a decoded JSON object as attributes.

    Intended as the object_hook argument of json.loads.
    """

    def __init__(self, d):
        self.__dict__ = d


def main():
    """Parse the command line, start the HTTP server and run the IO loop."""
    tornado.options.parse_command_line()
    app = tornado.httpserver.HTTPServer(Application())
    if SYS_PLATFORM == "Windows":
        app.listen(options.port)
    else:
        app.bind(options.port)
        # Multi-process mode: 0 starts one process per CPU. It requires
        # debug mode to be off and is not available on Windows.
        # NOTE(review): threads started before this call (the access-token
        # refresher) presumably do not exist in the forked workers - confirm.
        app.start(num_processes=0)
    try:
        tornado.ioloop.IOLoop.instance().start()
    except KeyboardInterrupt:
        tornado.ioloop.IOLoop.instance().stop()


if __name__ == "__main__":
    global_define.accessToken = access_token.AccessToken(1)
    if not SYS_PLATFORM == "Windows":
        global_define.accessToken.start()
    # NOTE: from here on the name access_token holds the token string and
    # shadows the imported access_token module.
    access_token = global_define.accessToken.GetAccessToken()  # first fetch
    access_token = global_define.accessToken.access_token  # how to read it afterwards

    # Upload an image as permanent material
    """
    fileName = '../car_ocean_logo.png'
    if(os.path.isfile(fileName)):
        fileData = io.BytesIO(open(fileName).read())
        fileType = mimetypes.guess_type(fileName) or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.png', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token="+access_token+"&type=image"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    '''
    返回值:
    {"media_id": "EXXGrEsCjc4xXltwe1bKnwygZ1EjuWxZxjmHANE2nGE", "url": "http:\/\/mmbiz.qpic.cn\/mmbiz_png\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA\/0?wx_fmt=png"}
    '''

    # Upload a thumbnail as permanent material
    """
    fileName = '../car_ocean_logo.jpg'
    if(os.path.isfile(fileName)):
        fileData = io.BytesIO(open(fileName).read())
        fileType = mimetypes.guess_type(fileName) or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.jpg', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/material/add_material?access_token="+access_token+"&type=thumb"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    '''
    返回值:
    {"media_id":"EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U", "url":"http:\/\/mmbiz.qpic.cn\/mmbiz_jpg\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN1300ZIOiasdJdqTerKJ3ootb3b1SzOW0yxtWnqQDDSVkicvSuScicE4og\/0?wx_fmt=jpeg"}
    '''

    # Upload an image as temporary material
    """
    fileName = '../car_ocean_logo.png'
    if(os.path.isfile(fileName)):
        fileData = io.BytesIO(open(fileName).read())
        fileType = mimetypes.guess_type(fileName) or 'application/octet-stream'
        fields = {}
        files = {'media': ('car_ocean_logo.png', fileData, fileType)}
        url = "https://api.weixin.qq.com/cgi-bin/media/uploadimg?access_token="+access_token+"&type=image"
        print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
        request = requests.post(url, verify=False, data=fields, files=files)
        print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), request.content))
    else:
        print("Thread(Main), Time: %s, ERROR: file is not exist: %s" % (time.ctime(), fileName))
    """
    '''
    返回值:
    {"url":"http:\/\/mmbiz.qpic.cn\/mmbiz_png\/SAydTFbl7nv4ibI88uPrLxFIoKRUiawGwN7Z3p7x1N002uBiaHsYKTksMNrSUT7icshGNia1xtAYAbCTkiaia4yLqtwWA\/0"}
    '''

    # Upload news articles as permanent material
    """
    content1 = "欢迎光临车可讯" #注意在json素材中的中文需要使用默认utf-8方式
    fileName = 'templates/test.html'
    if(os.path.isfile(fileName)):
        content1 = open(fileName).read()
    content2 = "测试用"
    data ={
        "articles":[
            {"title": "车可讯",
             "thumb_media_id": "EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
             "author": "Car Ocean",
             "digest": "",
             "show_cover_pic": 0,
             "content": content1,
             "content_source_url": "http://www.car-ocean.com/"
            },
            {"title": "测试1",
             "thumb_media_id": "EXXGrEsCjc4xXltwe1bKn9doo_oPZSwcCrlHVL8h2_U",
             "author": "Car Ocean",
             "digest": "",
             "show_cover_pic": 0,
             "content": content2,
             "content_source_url": "https://cloud.car-ocean.com:8443/admin/"
            },
        ]
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    url = "https://api.weixin.qq.com/cgi-bin/material/add_news?access_token="+access_token
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json.dumps(data, ensure_ascii=False)).read() #注意ensure_ascii必须为False
    dataRtn = json.loads(response, object_hook=JSONObject)
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    if(hasattr(dataRtn, "media_id")):
        print("Thread(Main), Time: %s, DEBUG: OUTPUT JSONObject: %s" % (time.ctime(), dataRtn.media_id))
    else:
        dataRtn.media_id = ""
    """
    '''
    返回值:
    {"media_id":"EXXGrEsCjc4xXltwe1bKn8uwkLzodNgZ-zdZ3DaL6kc"}
    删除:
    curl -d '{"media_id":"EXXGrEsCjc4xXltwe1bKn8uwkLzodNgZ-zdZ3DaL6kc"}' https://api.weixin.qq.com/cgi-bin/material/del_material?access_token=hSNOuGcaMdtV-WpgnvplF1ZIOmC8shJKPbeeXUN6L1wyFcBnuzZ71fCxd85dpNYguhgp5V96kuWvOWQTFJjqnSzkp-oydPbuC7lPvSzkEWob8g9_ULpgvsx_AV3xogYfIYGdABAZPX
    '''

    # Mass-send a message: at most once a day, requires a verified account
    """
    data ={
        "filter":{
            "is_to_all":True
        },
        "mpnews":{
            "media_id":dataRtn.media_id
        },
        "msgtype":"mpnews"
    }
    data ={
        "touser":"oJ-k1wLaKRynhjPWoRaa4_vAH36o",
        "mpnews":{
            "media_id":dataRtn.media_id
        },
        "msgtype":"mpnews"
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    # url = "https://api.weixin.qq.com/cgi-bin/message/mass/sendall?access_token="+access_token
    url = "https://api.weixin.qq.com/cgi-bin/message/mass/preview?access_token="+access_token # 仅预览测试
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json.dumps(data, ensure_ascii=False)).read()
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    dataRtn = json.loads(response, object_hook=JSONObject)
    """
    '''
    返回值:
    '''

    # Create the custom menu
    """
    data ={
        "button":[
            {
                "type":"click",
                "name":"点击菜单",
                "key":"MENU_1_NONE"
            },
            {
                "name":"含子菜单",
                "sub_button":[
                    {
                        "type":"view",
                        "name":"车可讯官网",
                        "url":"http://www.car-ocean.com/"
                    },
                    {
                        "type":"view",
                        "name":"车可讯下载",
                        "url":"http://cloud.car-ocean.com:8080/"
                    }
                ]
            }
        ]
    }
    json_str = json.dumps(data, ensure_ascii=False)
    print("Thread(Main), Time: %s, DEBUG: REQUEST JSON: %s" % (time.ctime(), json_str))
    url = "https://api.weixin.qq.com/cgi-bin/menu/create?access_token="+access_token
    print("Thread(Main), Time: %s, DEBUG: REQUEST URL: %s" % (time.ctime(), url))
    request = urllib2.Request(url)
    request.add_header("Content-Type", "application/json")
    response = urllib2.urlopen(request, json.dumps(data, ensure_ascii=False)).read()
    dataRtn = json.loads(response, object_hook=JSONObject)
    print("Thread(Main), Time: %s, DEBUG: OUTPUT: %s" % (time.ctime(), response))
    if(hasattr(dataRtn, "errmsg")):
        print("Thread(Main), Time: %s, DEBUG: OUTPUT JSONObject: %s" % (time.ctime(), dataRtn.errmsg))
    """
    '''
    返回值:
    删除:
    '''

    main()
access_token.py:
#!/usr/bin/python
# -*- encoding:utf-8 -*-
"""Background thread that fetches and periodically refreshes the WeChat access token."""
import contextlib
import time
import threading
import urllib2
import json
import traceback
import signal
import platform

import global_define

# Seconds to wait before trying again after a failed refresh.
RETRY_SECONDS = 60

# Set by the signal handler and polled by AccessToken.run() to leave its loop.
# Defined before the handlers are installed so the name always exists.
is_sigint_up = False


def sigint_handler(signum, frame):
    """Handle an interrupt/termination signal: flag the worker and exit."""
    global is_sigint_up
    is_sigint_up = True
    print('CATCHED INTERRUPT SIGNAL!')
    raise SystemExit('Ctrl+C')


signal.signal(signal.SIGINT, sigint_handler)
if not platform.system() == "Windows":
    signal.signal(signal.SIGHUP, sigint_handler)  # SIGHUP does not exist on Windows
signal.signal(signal.SIGTERM, sigint_handler)


def _read_url(url):
    """Fetch *url* and return (body, headers), always closing the connection."""
    with contextlib.closing(urllib2.urlopen(url)) as httpStream:
        return httpStream.read(), httpStream.info()


class AccessToken(threading.Thread):
    """Thread that keeps a valid access token in self.access_token.

    Attributes:
        number: identifier used only in log output.
        access_token: the current token, "" until the first successful fetch.
        expires_in: token lifetime in seconds, as reported by the server.
        weiXinIpList: WeChat callback server IPs from the last refresh.
    """

    def __init__(self, number):
        # The base-class initializer must run when __init__ is overridden.
        threading.Thread.__init__(self)
        # Daemon thread: a crash in the main thread must not leave the
        # process hanging on this endless refresh loop.
        self.daemon = True
        self.number = number
        self.access_token = ""
        self.expires_in = 7200
        self.weiXinIpList = []

    def run(self):
        """Refresh the token shortly before it expires until a signal arrives."""
        count = 0
        while True:
            if count >= self.expires_in - 2:
                try:
                    self.runMain()
                    count = 0
                except Exception:
                    # Keep the thread alive on network/API errors and retry
                    # soon instead of waiting a whole token lifetime.
                    traceback.print_exc()
                    count = max(0, self.expires_in - 2 - RETRY_SECONDS)
            if is_sigint_up:
                break
            count += 1
            time.sleep(1)

    def runMain(self):
        """Fetch a fresh access token and the WeChat callback IP list.

        Raises:
            RuntimeError: if the token endpoint answers without an
                access_token (for example an errcode/errmsg payload).
            urllib2.URLError: if a request cannot be completed.
        """
        print("Thread(%d), Time: %s, DEBUG: INPUT: %s" % (self.number, time.ctime(), global_define.ACCESS_TOKEN_URL))
        httpContent, httpHeader = _read_url(global_define.ACCESS_TOKEN_URL)
        print("Thread(%d), Time: %s, DEBUG: OUTPUT: %s" % (self.number, time.ctime(), httpContent))
        print("Thread(%d), Time: %s, DEBUG: OUTPUT HEADER: %s" % (self.number, time.ctime(), httpHeader))
        httpHeaderContenttype = httpHeader.getheader('Content-Type')
        print("Thread(%d), Time: %s, DEBUG: OUTPUT HEADER Content-Type: %s" % (self.number, time.ctime(), httpHeaderContenttype))

        hjson = json.loads(httpContent)
        print("Thread(%d), Time: %s, DEBUG: JSON: %s" % (self.number, time.ctime(), json.dumps(hjson, sort_keys=True, indent=4)))
        if 'access_token' not in hjson:
            raise RuntimeError("access_token request failed: %s" % httpContent)
        self.access_token = hjson['access_token']
        # Keep the previous lifetime if the server omits expires_in.
        self.expires_in = hjson.get('expires_in', self.expires_in)
        print('Thread(%d), Time: %s, DEBUG: access_token: %s' % (self.number, time.ctime(), self.access_token))
        print('Thread(%d), Time: %s, DEBUG: expires_in: %d' % (self.number, time.ctime(), self.expires_in))

        url = "https://api.weixin.qq.com/cgi-bin/getcallbackip?access_token="+self.access_token
        httpContent, _ = _read_url(url)
        hjson = json.loads(httpContent)
        # An error payload has no ip_list; keep the token and use an empty list.
        self.weiXinIpList = hjson.get('ip_list', [])
        print('Thread(%d), Time: %s, DEBUG: WeiXin server ip list: %s' % (self.number, time.ctime(), self.weiXinIpList))

    def GetAccessToken(self):
        """Return the current token, fetching it first if none is held yet."""
        if self.access_token == "":
            print("Thread(%d), Time: %s, INFO: not yet get access_token" % (self.number, time.ctime()))
            self.runMain()
        print("Thread(%d), Time: %s, DEBUG: access_token: %s" % (self.number, time.ctime(), self.access_token))
        return self.access_token
global_define.py(标为 xxx 的地方需要替换成你自己公众号的配置):
#!/usr/bin/python
# -*- encoding:utf-8 -*-
"""Shared configuration and process-wide state for the WeChat demo server.

Each credential can be supplied through an environment variable; the "xxx"
placeholders are used otherwise and must then be replaced by real values.
"""
import os

# Process-wide AccessToken instance; None until the startup script assigns it.
# (A module-level "global" statement would leave this name undefined.)
accessToken = None

# WeChat test account: gh_589de042e9ab
TOKEN = os.environ.get("WEIXIN_TOKEN", "xxxxxxxxx")  # 3-32 characters
EncodingAESKey = os.environ.get("WEIXIN_ENCODING_AES_KEY", "xxxxxxxxxxxxxx")  # 43 characters
APPID = os.environ.get("WEIXIN_APPID", "xxxxxxxxxxxxxxxxx")
APPSECRET = os.environ.get("WEIXIN_APPSECRET", "xxxxxxxxxxxxxxxxxx")

# Endpoint that exchanges APPID/APPSECRET for an access token.
ACCESS_TOKEN_URL = (
    "https://api.weixin.qq.com/cgi-bin/token"
    "?grant_type=client_credential&appid=" + APPID + "&secret=" + APPSECRET
)