|
- from __future__ import unicode_literals
- import time
- import json
- import requests
- import redis
- import string
- import random
- import hashlib
# Names exported by ``from <module> import *``.
__all__ = ("wx_login", "WeixinMP", )
# Python 2/3 compatibility shim.  Looking up ``unicode`` raises NameError on
# Python 3, which selects the ``except`` branch; on Python 2 the lookup
# succeeds and the ``else`` branch runs.
try:
    unicode = unicode
except NameError:
    # python 3: provide a ``basestring`` stand-in usable with isinstance()
    basestring = (str, bytes)
else:
    # python 2: make ``bytes`` an alias of the native ``str`` type
    bytes = str
class WeixinError(Exception):
    """Base exception type of this module, carrying a single message."""

    def __init__(self, msg):
        # Hand the message straight to Exception so str(err) yields it.
        Exception.__init__(self, msg)
class Map(dict):
    """
    Dictionary whose items can also be read, written and deleted as attributes.

    Nested plain dicts passed to the constructor are converted to ``Map`` so
    that chained dot access works.  Reading a key that does not exist (by
    attribute or by subscript) creates an empty ``Map`` under that key and
    returns it; callers in this module rely on that, e.g. ``data.errcode`` on
    a response without an ``errcode`` field.

    Items live only in the dict itself (nothing is mirrored into the instance
    ``__dict__``), so a data key such as ``"items"`` cannot hide the dict
    method of the same name; use ``m["items"]`` to reach such a value.

    Example:
    m = Map({'first_name': 'Eduardo'}, last_name='Pool', age=24, sports=['Soccer'])
    m.first_name   # 'Eduardo'
    m.missing      # Map() -- created on first access
    """

    def __init__(self, *args, **kwargs):
        # Let dict parse every initialiser form it supports (mapping, iterable
        # of pairs, keyword arguments) ...
        super(Map, self).__init__(*args, **kwargs)
        # ... then upgrade nested plain dicts so dot access works recursively.
        for key, value in list(dict.items(self)):
            if isinstance(value, dict) and not isinstance(value, Map):
                dict.__setitem__(self, key, Map(value))

    def __missing__(self, key):
        # Called by dict.__getitem__ for absent keys: store and return ONE
        # shared child so that ``m.a.b = 1`` is visible through ``m["a"]``.
        child = Map()
        dict.__setitem__(self, key, child)
        return child

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails.  Dunder probes made
        # by copy/pickle and similar protocols must fail with AttributeError
        # instead of auto-creating an item, or those protocols break.
        if attr[:2] == "__" and attr[-2:] == "__":
            raise AttributeError(attr)
        return self[attr]

    def __setattr__(self, key, value):
        self[key] = value

    def __delattr__(self, item):
        # Raises KeyError when the item does not exist.
        del self[item]
class WeixinLoginError(WeixinError):
    """Error type for the login helpers; a WeixinError carrying one message."""

    def __init__(self, msg):
        # Delegate directly to the parent initialiser with the message.
        WeixinError.__init__(self, msg)
class WeixinMP(object):
    """
    Client for the WeChat ``cgi-bin`` server API.

    Handles the server-side access token and the JS-API ticket (optionally
    cached in redis), JS-SDK signing and sending subscribe messages.
    Responses are returned as ``Map`` objects; API error payloads are NOT
    turned into exceptions, so callers must inspect ``errcode`` themselves.
    """

    api_uri = "https://api.weixin.qq.com/cgi-bin"

    # Lifetime, in seconds, of cached tokens/tickets in redis.
    _CACHE_TTL = 2 * 60 * 60
    # Timeout, in seconds, applied to every HTTP request.
    _TIMEOUT = 20

    def __init__(self, app_id=None, app_secret=None, redis_url=None):
        """
        :param app_id: WeChat app id
        :param app_secret: WeChat app secret
        :param redis_url: redis URL used to cache tokens; ``None`` disables
            caching
        """
        self.app_id = app_id
        self.app_secret = app_secret
        self.session = requests.Session()
        if redis_url is not None:
            self.redis_store = redis.StrictRedis.from_url(
                redis_url, decode_responses=True)
        else:
            self.redis_store = None

    def init_app(self, app):
        """
        Load settings from a Flask-style ``app.config``.

        Reads ``WXMP_APP_ID``, ``WXMP_APP_SECRET`` and ``WXMP_REDIS_URL``.

        :raises AssertionError: when the app id or app secret is missing
        """
        config = app.config
        config.setdefault("WXMP_REDIS_URL", None)
        config.setdefault("WXMP_APP_ID", None)
        config.setdefault("WXMP_APP_SECRET", None)
        self.app_id = config.get("WXMP_APP_ID")
        self.app_secret = config.get("WXMP_APP_SECRET")
        # Raised explicitly (rather than via ``assert``) so the validation
        # also runs under ``python -O``; the exception type is unchanged.
        if self.app_id is None:
            raise AssertionError("APP_ID IS NULL")
        if self.app_secret is None:
            raise AssertionError("APP_SECRET IS NULL")
        redis_url = config.get("WXMP_REDIS_URL")
        if redis_url:
            self.redis_store = redis.StrictRedis.from_url(
                redis_url, decode_responses=True)

    def fetch(self, method, url, params=None, data=None, headers=None):
        """
        Send one HTTP request through the shared session.

        :return: the decoded JSON body wrapped in a ``Map``
        """
        req = requests.Request(method, url, params=params,
                               data=data, headers=headers)
        resp = self.session.send(req.prepare(), timeout=self._TIMEOUT)
        return Map(resp.json())

    def get(self, path, params=None, token=True):
        """
        GET ``api_uri + path``.

        :param params: query parameters; the caller's dict is not modified
        :param token: when true, add ``access_token`` unless already present
        """
        url = "{0}{1}".format(self.api_uri, path)
        # Work on a copy so the caller's dict never gains an access_token key.
        query = dict(params) if params else {}
        # Only resolve the token when needed: the property may hit redis and
        # the network.
        if token and "access_token" not in query:
            query["access_token"] = self.access_token
        return self.fetch("GET", url, query)

    def post(self, path, params=None, isjson=True):
        """
        POST to ``api_uri + path``.

        :param params: payload; defaults to an empty dict
        :param isjson: send the payload JSON-encoded (default) or form-encoded
        :return: the decoded JSON body wrapped in a ``Map``
        """
        url = "{0}{1}".format(self.api_uri, path)
        payload = {} if params is None else params
        body = json.dumps(payload) if isjson else payload
        # Use the shared session and the same timeout as fetch() so a stalled
        # connection cannot block the caller forever.
        resp = self.session.post(url, data=body, timeout=self._TIMEOUT)
        return Map(resp.json())

    def gen_token(self):
        """
        Request a new server access token from the ``/token`` endpoint.

        :return: the ``access_token`` field of the response
        """
        params = {
            "grant_type": "client_credential",
            "appid": self.app_id,
            "secret": self.app_secret,
        }
        # NOTE: request/response are deliberately not printed; they contain
        # the app secret and the token.
        data = self.get("/token", params, False)
        return data.access_token

    def get_user_info(self, access_token, user_id="hehe", lang="zh_CN"):
        """
        Fetch a user's basic profile.

        :param access_token: server access token to use for the call
        :param user_id: the user's openid (the ``source`` of a received
            message)
        :param lang: language of country/region names: zh_CN simplified
            Chinese, zh_TW traditional Chinese, en English
        :return: the JSON response as a ``Map``
        """
        params = {
            "openid": user_id,
            "access_token": access_token,
            "lang": lang,
        }
        return self.get("/user/info", params, False)

    def check_token(self, access_token):
        """
        Probe whether ``access_token`` is still accepted by the API.

        Issues a user-info request with the placeholder openid and returns
        ``False`` only when the reply's errcode is 40001 or 42001.
        """
        data = self.get_user_info(access_token)
        return data.errcode not in (40001, 42001)

    @property
    def access_token(self):
        """
        Server access token.

        Without a redis store a fresh token is requested on every access.
        With one, the cached token is returned when it still passes
        ``check_token``; otherwise a new token is requested and cached.
        """
        store = self.redis_store
        if not store:
            return self.gen_token()
        cache_key = "access_token:%s" % self.app_id
        cached = store.get(cache_key)
        if cached and self.check_token(cached):
            return cached
        fresh = self.gen_token()
        # Never cache the empty value produced by a failed token request.
        if fresh:
            store.setex(cache_key, self._CACHE_TTL, fresh)
        return fresh

    def gen_ticket(self):
        """
        Request a new JS-API ticket.

        :return: the ``ticket`` field of the response
        """
        data = self.get("/ticket/getticket", {"type": "jsapi"}, True)
        return data.ticket

    @property
    def jsapi_ticket(self):
        """
        JS-API ticket, cached in redis when a store is configured.
        """
        store = self.redis_store
        if not store:
            return self.gen_ticket()
        cache_key = "jsapi_ticket:%s" % self.app_id
        ticket = store.get(cache_key)
        if not ticket:
            ticket = self.gen_ticket()
            # Never cache the empty value produced by a failed request.
            if ticket:
                store.setex(cache_key, self._CACHE_TTL, ticket)
        return ticket

    @property
    def nonce_str(self):
        """Random 32-character string of ASCII letters and digits."""
        alphabet = string.ascii_letters + string.digits
        # OS-backed randomness instead of the predictable default generator.
        rng = random.SystemRandom()
        return "".join(rng.choice(alphabet) for _ in range(32))

    def jsapi_sign(self, **kwargs):
        """
        Build the signature used by the JS-SDK in the browser.

        ``jsapi_ticket``, ``timestamp`` and ``noncestr`` are filled in unless
        supplied by the caller.  The signature is the SHA-1 hex digest of the
        ``key=value`` pairs sorted by key and joined with ``&``; pairs whose
        value is empty are skipped.

        :return: ``Map`` with ``sign`` plus the ``timestamp`` and ``noncestr``
            that were actually signed
        """
        if "jsapi_ticket" not in kwargs:
            kwargs["jsapi_ticket"] = self.jsapi_ticket
        kwargs.setdefault("timestamp", str(int(time.time())))
        kwargs.setdefault("noncestr", self.nonce_str)
        pairs = [(str(k), str(kwargs[k])) for k in sorted(kwargs)]
        raw = "&".join("=".join(kv) for kv in pairs if kv[1])
        sign = hashlib.sha1(raw.encode("utf-8")).hexdigest()
        # Report the values that went into the signature; a caller-supplied
        # timestamp/noncestr must match what the client later sends.
        return Map(sign=sign, timestamp=kwargs["timestamp"],
                   noncestr=kwargs["noncestr"])

    def send_sp_template_msg(self, openid, temp_id, data,
                             page="", miniprogram_state=""):
        """
        Send a mini-program subscribe (template) message.

        :param openid: recipient openid
        :param temp_id: template id
        :param data: template content
        :param page: page to open when the message is tapped
        :param miniprogram_state: value sent as ``miniprogram_state``
        :return: the JSON response as a ``Map``
        """
        url_path = "/message/subscribe/send?access_token={}".format(
            self.access_token)
        payload = dict(touser=openid, template_id=temp_id,
                       page=page, miniprogram_state=miniprogram_state,
                       data=data)
        return self.post(url_path, payload)
class WeixinLogin(object):
    """
    Helper for the WeChat OAuth2 web login and mini-program login endpoints.

    Responses are returned as ``Map`` objects; API error payloads are NOT
    turned into exceptions, so callers must inspect ``errcode`` themselves.
    """

    # Timeout, in seconds, applied to every HTTP request.
    _TIMEOUT = 20

    def __init__(self, app=None, prefix=""):
        """
        :param app: optional Flask-style app whose ``config`` holds the
            credentials; without it both credentials start as empty strings
        :param prefix: string prepended to the config key names
        """
        self.app_id = ""
        self.app_secret = ""
        if app:
            self.init_app(app, prefix)

    def init_app(self, app, prefix=""):
        """
        Load ``<prefix>WXMP_APP_ID`` and ``<prefix>WXMP_APP_SECRET`` from
        ``app.config``.
        """
        self.app_id = app.config.get(prefix + "WXMP_APP_ID")
        self.app_secret = app.config.get(prefix + "WXMP_APP_SECRET")

    def _get(self, url, params):
        """GET ``url`` and return the UTF-8 JSON body wrapped in a ``Map``."""
        # A timeout keeps a stalled connection from blocking the caller
        # indefinitely.
        resp = requests.get(url, params=params, timeout=self._TIMEOUT)
        return Map(json.loads(resp.content.decode("utf-8")))

    def authorize(self, redirect_uri, scope="snsapi_base", state=None):
        """
        Build the WeChat authorization URL the user must be redirected to.

        :param redirect_uri: callback address; inserted into the URL verbatim
            (no URL-encoding is applied here)
        :param scope: authorization mode: ``snsapi_base``,
            ``snsapi_userinfo`` or ``snsapi_login`` (the latter uses the
            QR-code ``qrconnect`` endpoint)
        :param state: echoed back unchanged after a successful authorization;
            omitted from the URL when falsy
        :return: the authorization URL
        :raises AssertionError: when ``scope`` is not one of the allowed values
        """
        # Raised explicitly (rather than via ``assert``) so the validation
        # also runs under ``python -O``; the exception type is unchanged.
        if scope not in ("snsapi_base", "snsapi_userinfo", "snsapi_login"):
            raise AssertionError("invalid scope: {0}".format(scope))
        if scope == "snsapi_login":
            url = "https://open.weixin.qq.com/connect/qrconnect"
        else:
            url = "https://open.weixin.qq.com/connect/oauth2/authorize"
        data = {
            "appid": self.app_id,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": scope,
        }
        if state:
            data["state"] = state
        # Sorted by key, empty values skipped; format() (not str.join) so a
        # non-string value such as an integer state does not raise TypeError.
        query = "&".join(
            "{0}={1}".format(key, data[key])
            for key in sorted(data) if data[key])
        return "{0}?{1}#wechat_redirect".format(url, query)

    def access_token(self, code):
        """
        Exchange an authorization ``code`` for an access token.
        """
        url = "https://api.weixin.qq.com/sns/oauth2/access_token"
        args = {
            "appid": self.app_id,
            "secret": self.app_secret,
            "code": code,
            "grant_type": "authorization_code",
        }
        return self._get(url, args)

    def auth(self, access_token, openid):
        """
        Verify an authorization credential.

        :param access_token: the authorization credential
        :param openid: the user's unique id
        """
        url = "https://api.weixin.qq.com/sns/auth"
        args = {"access_token": access_token, "openid": openid}
        return self._get(url, args)

    def refresh_token(self, refresh_token):
        """
        Obtain a new access token.

        :param refresh_token: the refresh token
        """
        url = "https://api.weixin.qq.com/sns/oauth2/refresh_token"
        args = {
            "appid": self.app_id,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        return self._get(url, args)

    def userinfo(self, access_token, openid):
        """
        Fetch the user's profile.

        :param access_token: the token
        :param openid: the user id, unique within each application
        """
        url = "https://api.weixin.qq.com/sns/userinfo"
        args = {
            "access_token": access_token,
            "openid": openid,
            "lang": "zh_CN",
        }
        return self._get(url, args)

    def user_info(self, access_token, openid):
        """
        Fetch the user's profile.

        Alias of ``userinfo`` kept for compatibility with the old 0.3.0
        release; the name clashes with WeixinMP's ``user_info``.
        """
        return self.userinfo(access_token, openid)

    def get_sp_unionid(self, code):
        """
        Call the ``jscode2session`` endpoint with a mini-program login code.

        :param code: value sent as ``js_code``
        :return: the JSON response as a ``Map``
        """
        url = "https://api.weixin.qq.com/sns/jscode2session"
        args = dict(appid=self.app_id, secret=self.app_secret,
                    js_code=code, grant_type="authorization_code")
        return self._get(url, args)
# Shared module-level WeixinLogin instance.  It is created without an app, so
# its app_id/app_secret start as empty strings; call wx_login.init_app(app)
# to load them from the application config.
wx_login = WeixinLogin()
|