from __future__ import unicode_literals import time import json import requests import redis import string import random import hashlib __all__ = ("wx_login", "WeixinMP", ) try: unicode = unicode except NameError: # python 3 basestring = (str, bytes) else: # python 2 bytes = str class WeixinError(Exception): def __init__(self, msg): super(WeixinError, self).__init__(msg) class Map(dict): """ 提供字典的dot访问模式 Example: m = Map({'first_name': 'Eduardo'}, last_name='Pool', age=24, sports=['Soccer']) """ def __init__(self, *args, **kwargs): super(Map, self).__init__(*args, **kwargs) for arg in args: if isinstance(arg, dict): for k, v in arg.items(): if isinstance(v, dict): v = Map(v) self[k] = v if kwargs: for k, v in kwargs.items(): if isinstance(v, dict): v = Map(v) self[k] = v def __getattr__(self, attr): return self[attr] def __setattr__(self, key, value): self.__setitem__(key, value) def __getitem__(self, key): if key not in self.__dict__: super(Map, self).__setitem__(key, {}) self.__dict__.update({key: Map()}) return self.__dict__[key] def __setitem__(self, key, value): super(Map, self).__setitem__(key, value) self.__dict__.update({key: value}) def __delattr__(self, item): self.__delitem__(item) def __delitem__(self, key): super(Map, self).__delitem__(key) del self.__dict__[key] class WeixinLoginError(WeixinError): def __init__(self, msg): super(WeixinLoginError, self).__init__(msg) class WeixinMP(object): api_uri = "https://api.weixin.qq.com/cgi-bin" def __init__(self, app_id=None, app_secret=None, redis_url=None): self.app_id = app_id self.app_secret = app_secret self.session = requests.Session() if redis_url is not None: self.redis_store = redis.StrictRedis.from_url( redis_url, decode_responses=True) else: self.redis_store = None def init_app(self, app): """ flask config """ app.config.setdefault("WXMP_REDIS_URL", None) app.config.setdefault("WXMP_APP_ID", None) app.config.setdefault("WXMP_APP_SECRET", None) self.app_id = app.config.get("WXMP_APP_ID") self.app_secret = app.config.get("WXMP_APP_SECRET") redis_url = app.config.get("WXMP_REDIS_URL") assert self.app_id is not None, "APP_ID IS NULL" assert self.app_secret is not None, "APP_SECRET IS NULL" # assert redis_url is not None, "WXMP_REDIS_URL IS NULL" if redis_url: self.redis_store = redis.StrictRedis.from_url( redis_url, decode_responses=True) def fetch(self, method, url, params=None, data=None, headers=None): req = requests.Request(method, url, params=params, data=data, headers=headers) prepped = req.prepare() resp = self.session.send(prepped, timeout=20) data = Map(resp.json()) # if data.errcode: # msg = "%(errcode)d %(errmsg)s" % data # raise WeixinError(msg) return data def get(self, path, params=None, token=True): url = "{0}{1}".format(self.api_uri, path) params = {} if not params else params token and params.setdefault("access_token", self.access_token) return self.fetch("GET", url, params) def post(self, path, params={}, isjson=True): url = "{0}{1}".format(self.api_uri, path) if isjson: resp = requests.post(url, data=json.dumps(params)) else: resp = requests.post(url, data=params) data = Map(resp.json()) return data def gen_token(self): params = dict() params.setdefault("grant_type", "client_credential") params.setdefault("appid", self.app_id) params.setdefault("secret", self.app_secret) data = self.get("/token", params, False) print("-" * 20, data, params) return data.access_token def get_user_info(self, access_token, user_id="hehe", lang="zh_CN"): """ 获取用户基本信息。 :param user_id: 用户 ID 。 就是你收到的 `Message` 的 source :param lang: 返回国家地区语言版本,zh_CN 简体,zh_TW 繁体,en 英语 :return: 返回的 JSON 数据包 """ params = dict() params.setdefault("openid", user_id) params.setdefault("access_token", access_token) params.setdefault("lang", lang) data = self.get("/user/info", params, False) print(data, "get_user_info") return data def check_token(self, access_token): data = self.get_user_info(access_token) if data.errcode in (40001, 42001): return False return True @property def access_token(self): """ 获取服务端凭证 """ if self.redis_store: ac_key = "access_token:%s" % self.app_id access_token = self.redis_store.get(ac_key) if not access_token: access_token = self.gen_token() self.redis_store.setex(ac_key, 2 * 60 * 60, access_token) return access_token else: if self.check_token(access_token): return access_token return self.gen_token() def gen_ticket(self): params = dict() params.setdefault("type", "jsapi") data = self.get("/ticket/getticket", params, True) return data.ticket @property def jsapi_ticket(self): """ 获取jsapi ticket """ if self.redis_store: ticket_key = "jsapi_ticket:%s" % self.app_id ticket = self.redis_store.get(ticket_key) if not ticket: ticket = self.gen_ticket() self.redis_store.setex(ticket_key, 2 * 60 * 60, ticket) return ticket else: return self.gen_ticket() @property def nonce_str(self): char = string.ascii_letters + string.digits return "".join(random.choice(char) for _ in range(32)) def jsapi_sign(self, **kwargs): """ 生成签名给js使用 """ timestamp = str(int(time.time())) nonce_str = self.nonce_str kwargs.setdefault("jsapi_ticket", self.jsapi_ticket) kwargs.setdefault("timestamp", timestamp) kwargs.setdefault("noncestr", nonce_str) raw = [(str(k), str(kwargs[k])) for k in sorted(kwargs.keys())] s = "&".join("=".join(kv) for kv in raw if kv[1]) print(s) sign = hashlib.sha1(s.encode("utf-8")).hexdigest().lower() return Map(sign=sign, timestamp=timestamp, noncestr=nonce_str) def send_sp_template_msg(self, openid, temp_id, data, page="", miniprogram_state=""): """ 发送小程序模板消息 """ url_path = "/message/subscribe/send?access_token={}".format( self.access_token) _dict = dict(touser=openid, template_id=temp_id, page=page, miniprogram_state=miniprogram_state, data=data) print(url_path, _dict) return self.post(url_path, _dict) class WeixinLogin(object): def __init__(self, app=None, prefix=""): if app: self.app_id = app.config.get(prefix + "WXMP_APP_ID") # print(self.app_id) self.app_secret = app.config.get(prefix + "WXMP_APP_SECRET") else: self.app_id = "" self.app_secret = "" def init_app(self, app, prefix=""): self.app_id = app.config.get(prefix + "WXMP_APP_ID") self.app_secret = app.config.get(prefix + "WXMP_APP_SECRET") def _get(self, url, params): resp = requests.get(url, params=params) data = Map(json.loads(resp.content.decode("utf-8"))) # if data.errcode: # msg = "%(errcode)d %(errmsg)s" % data # raise WeixinLoginError(msg) return data def authorize(self, redirect_uri, scope="snsapi_base", state=None): """ 生成微信认证地址并且跳转 :param redirect_uri: 跳转地址 :param scope: 微信认证方式,有`snsapi_base`跟`snsapi_userinfo`两种 :param state: 认证成功后会原样带上此字段 """ assert scope in ["snsapi_base", "snsapi_userinfo", "snsapi_login"] if scope == "snsapi_login": url = "https://open.weixin.qq.com/connect/qrconnect" else: url = "https://open.weixin.qq.com/connect/oauth2/authorize" data = dict() data.setdefault("appid", self.app_id) data.setdefault("redirect_uri", redirect_uri) data.setdefault("response_type", "code") data.setdefault("scope", scope) if state: data.setdefault("state", state) print(data, "data") data = [(k, data[k]) for k in sorted(data.keys()) if data[k]] s = "&".join("=".join(kv) for kv in data if kv[1]) print(s) return "{0}?{1}#wechat_redirect".format(url, s) def access_token(self, code): """ 获取令牌 """ url = "https://api.weixin.qq.com/sns/oauth2/access_token" args = dict() args.setdefault("appid", self.app_id) args.setdefault("secret", self.app_secret) args.setdefault("code", code) args.setdefault("grant_type", "authorization_code") return self._get(url, args) def auth(self, access_token, openid): """ 检验授权凭证 :param access_token: 授权凭证 :param openid: 唯一id """ url = "https://api.weixin.qq.com/sns/auth" args = dict() args.setdefault("access_token", access_token) args.setdefault("openid", openid) return self._get(url, args) def refresh_token(self, refresh_token): """ 重新获取access_token :param refresh_token: 刷新令牌 """ url = "https://api.weixin.qq.com/sns/oauth2/refresh_token" args = dict() args.setdefault("appid", self.app_id) args.setdefault("grant_type", "refresh_token") args.setdefault("refresh_token", refresh_token) return self._get(url, args) def userinfo(self, access_token, openid): """ 获取用户信息 :param access_token: 令牌 :param openid: 用户id,每个应用内唯一 """ url = "https://api.weixin.qq.com/sns/userinfo" args = dict() args.setdefault("access_token", access_token) args.setdefault("openid", openid) args.setdefault("lang", "zh_CN") return self._get(url, args) def user_info(self, access_token, openid): """ 获取用户信息 兼容老版本0.3.0,与WeixinMP的user_info冲突 """ return self.userinfo(access_token, openid) def get_sp_unionid(self, code): url = "https://api.weixin.qq.com/sns/jscode2session" args = dict(appid=self.app_id, secret=self.app_secret, js_code=code, grant_type="authorization_code") return self._get(url, args) wx_login = WeixinLogin()