Commit d45a546f authored by l2m2's avatar l2m2

upload.

parent 00f90f92
qrcode.png

973 Bytes

var httpfunc = require('topsin.httpfunc');
var REQ = httpfunc.argv().request;
var RES = httpfunc.argv().response;
var SinEror = require('topsin.error');
var result = new (require('topsin.responsedata'))();
var DB = require('topsin.database');
var DBNAME = REQ.pathCapture('DBNAME');
var Crypto = require('topsin.crypto');
var _ = require('lodash');
var fs = require("fs");
try {
var cookie_path = "{0}/CI_DONT_REMOVE_ME/dingdingcookie";
cookie_path = _.format(cookie_path, APP.getConfigValue('httpserver.file.storage'));
if (!fs.fileExists(cookie_path)) {
throw "The cookie file does not exist.";
}
var content = fs.readFile(cookie_path);
result.setData(content);
RES.body(result.toJson());
} catch (err) {
result.setErrText(CONFIG.tr(_.toString(err)));
RES.body(result.toJson());
}
\ No newline at end of file
var httpfunc = require('topsin.httpfunc');
var REQ = httpfunc.argv().request;
var RES = httpfunc.argv().response;
var SinEror = require('topsin.error');
var result = new (require('topsin.responsedata'))();
var DB = require('topsin.database');
var DBNAME = REQ.pathCapture('DBNAME');
var Crypto = require('topsin.crypto');
var _ = require('lodash');
var fs = require("fs");
try {
if (REQ.method() != "POST") throw "only support POST method!";
var body = JSON.parse(REQ.body());
var cookie = _.get(body, 'cookie');
var cookie_path = "{0}/CI_DONT_REMOVE_ME/dingdingcookie";
cookie_path = _.format(cookie_path, APP.getConfigValue('httpserver.file.storage'));
fs.writeFile(cookie_path, cookie);
result.setData(true);
RES.body(result.toJson());
} catch (err) {
result.setErrText(CONFIG.tr(_.toString(err)));
RES.body(result.toJson());
}
from dingdinghelper import DingDingHelper
import requests
import ctypes
def Mbox(title, text, style):
return ctypes.windll.user32.MessageBoxW(0, text, title, style)
if __name__ == "__main__":
ding = DingDingHelper()
ding.renew_cookie()
r = requests.post("http://139.196.104.13:9181/api/_/dingding/setCookie", json={ "cookie": ding.cookie })
if r.status_code == 200:
Mbox('提示', '上传成功!', 0)
else:
Mbox('提示', '上传失败!', 0)
\ No newline at end of file
'''
@File: __init__.py
@Author: leon.li(l2m2lq@gmail.com)
@Date: 2018-12-27 05:29:50
'''
from .dingdinghelper import DingDingHelper
\ No newline at end of file
'''
@File: dingdinghelper.py
@Author: leon.li(l2m2lq@gmail.com)
@Date: 2018-12-27 17:30:23
'''
import os, json, math, time, sys
from urllib import request, parse
from filechunkio import FileChunkIO
from pathlib import Path
import requests
from .ws import get_cookie, Message
class DingDingHelper:
"""
DingDing Helper
"""
def __init__(self):
self._cookie = None
self._username = 'Unknown'
self._password = 'Unknown'
self._msgurl = 'Unknown'
self._corpid = 'Unknown'
self._corpsecret = 'Unknown'
self._cookiepath = str(Path.home()) + os.sep + ".dingding_cookie"
@property
def cookie(self):
return self._cookie
@cookie.setter
def cookie(self, value):
self._cookie = value
@property
def username(self):
return self._username
@username.setter
def username(self, value):
self._username = value
@property
def password(self):
return self._password
@password.setter
def password(self, value):
self._password = value
@property
def msgurl(self):
return self._msgurl
@msgurl.setter
def msgurl(self, value):
self._msgurl = value
@property
def corpid(self):
return self._corpid
@corpid.setter
def corpid(self, value):
self._corpid = value
@property
def corpsecret(self):
return self._corpsecret
@property
def cookiepath(self):
return self._cookiepath
@corpsecret.setter
def corpsecret(self, value):
self._corpsecret = value
def get_access_token(self):
self._access_token = ""
params = parse.urlencode({'corpid': self.corpid, 'corpsecret': self.corpsecret})
url = 'https://oapi.dingtalk.com/gettoken?%s' % params
with request.urlopen(url) as f:
res = json.loads(f.read().decode('utf-8'))
if res.get("errmsg") == "ok":
self._access_token = res.get("access_token")
return self._access_token
def send_msg(self, msg):
data = {
"msgtype": "text",
"text": { "content": msg },
"at": { "isAtAll": False }
}
data = json.dumps(data).encode(encoding='utf-8')
req = request.Request(url=self.msgurl, data=data, headers={
"Content-Type": "application/json", "charset": "utf-8"
})
res = request.urlopen(req)
res = res.read()
if not (json.loads(res).get('errmsg') == 'ok'):
self.send_msg(msg)
def _get_uploadid(self, access_token, size):
uploadid = ''
params = parse.urlencode({'access_token': access_token, 'size': size})
url = 'https://oapi.dingtalk.com/file/upload/create?%s' % params
with request.urlopen(url) as f:
res = json.loads(f.read().decode('utf-8'))
if res.get('code') == '0':
uploadid = res.get('uploadid')
else:
print('Error: get uploadid failed.')
return uploadid
def _upload(self, access_token, uploadid, file_path, file_size, chunk_size):
mediaid = ''
params = parse.urlencode({'access_token': access_token, 'uploadid': uploadid})
url = 'https://oapi.dingtalk.com/file/upload?%s' % params
chunk_cnt = int(math.ceil(file_size * 1.0 / chunk_size))
for i in range(0, chunk_cnt):
offset = i * chunk_size
lens = min(chunk_size, file_size - offset)
chunk = FileChunkIO(file_path, 'r', offset=offset, bytes=lens)
ndpartition = "bytes={s}-{e}".format(s = chunk_size * i, e = chunk_size * (i + 1) - 1)
if i == chunk_cnt - 1:
ndpartition = "bytes={s}-{e}".format(s = chunk_size * i, e = file_size - 1)
headers = { "NDPartition": ndpartition }
files = { 'file': ('blob', chunk, "application/octet-stream") }
print("uploading {i}/{t}.".format(i = i+1, t = chunk_cnt))
res = requests.post(url, files=files, headers=headers).json()
if res.get('code') == '0':
print("upload {i}/{t} successfully.".format(i = i+1, t = chunk_cnt))
if i == chunk_cnt - 1:
mediaid = res.get("filepath", "")
else:
print("upload {i}/{t} failed.".format(i = i+1, t = chunk_cnt))
return mediaid
def _add_file_to_space(self, access_token, mediaid, space_id, space_path):
params = parse.urlencode({'access_token': access_token})
url = "https://im.dingtalk.com/v1/space/file/add?%s" % params
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Connection": "keep-alive",
"Host": "im.dingtalk.com",
"Cookie": self._cookie,
"Origin": "https://im.dingtalk.com",
"Referer": "https://im.dingtalk.com/?spm=a3140.8736650.2231772.1.7eb3e3dwxRnir&source=2202&lwfrom=2017120202092064209309201",
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.108 Safari/537.36"
}
data = {
"autoRename": True,
"fromIm": False,
"notification": False,
"path": space_path,
"spaceId": space_id,
"tempUrl": mediaid
}
res = requests.post(url, json=data, headers=headers)
if res.json().get("success"):
print("Add file to space successfully.")
else:
print("Add file to space failed.")
def generate_cookie(self):
tmp = None
try:
with open(self.cookiepath, 'r') as fd:
tmp = fd.read()
except Exception:
self.renew_cookie()
return
data = json.loads(tmp)
# self._cookie = data["cookie"]
# check if cookie valid
now = math.ceil(time.time())
old = int(data["expiration"])
if now - old > int(3600 * 24 * 6.5):
self.renew_cookie()
def renew_cookie(self):
self._cookie = get_cookie()
expiration_time = math.ceil(time.time())
try:
fd = open(self.cookiepath, 'w')
data = {"expiration": expiration_time, "cookie": self._cookie}
print(json.dumps(data))
fd.write(json.dumps(data))
fd.close()
except Exception as e:
print("Error: {err}".format(err = e.args))
sys.exit(1)
def upload_file(self, file_path, space_id, space_path):
"""
API: https://g.alicdn.com/dingding/opendoc/docs/_server/tab10-50.html#%E4%B8%8A%E4%BC%A0%E6%96%87%E4%BB%B6
"""
print("file_path = ", file_path)
# get access_token
access_token = self.get_access_token()
print("access_token = ", access_token)
# calc size
size = os.path.getsize(file_path)
print("size = ", size)
# get uploadid
uploadid = self._get_uploadid(access_token, size)
print("uploadid = ", uploadid)
if uploadid == '':
return False
# upload file chunk
chunk_size = 1024 * 1024
mediaid = self._upload(access_token, uploadid, file_path, size, chunk_size)
print("mediaid = ", mediaid)
if mediaid == '':
return False
# add file to space
self._add_file_to_space(access_token, mediaid, space_id, space_path + "/" + os.path.basename(file_path))
return True
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
from ._abnf import *
from ._app import WebSocketApp
from ._core import *
from ._exceptions import *
from ._logging import *
from ._socket import *
__version__ = "0.47.0"
This diff is collapsed.
This diff is collapsed.
try:
import Cookie
except:
import http.cookies as Cookie
class SimpleCookieJar(object):
def __init__(self):
self.jar = dict()
def add(self, set_cookie):
if set_cookie:
try:
simpleCookie = Cookie.SimpleCookie(set_cookie)
except:
simpleCookie = Cookie.SimpleCookie(set_cookie.encode('ascii', 'ignore'))
for k, v in simpleCookie.items():
domain = v.get("domain")
if domain:
if not domain.startswith("."):
domain = "." + domain
cookie = self.jar.get(domain) if self.jar.get(domain) else Cookie.SimpleCookie()
cookie.update(simpleCookie)
self.jar[domain.lower()] = cookie
def set(self, set_cookie):
if set_cookie:
try:
simpleCookie = Cookie.SimpleCookie(set_cookie)
except:
simpleCookie = Cookie.SimpleCookie(set_cookie.encode('ascii', 'ignore'))
for k, v in simpleCookie.items():
domain = v.get("domain")
if domain:
if not domain.startswith("."):
domain = "." + domain
self.jar[domain.lower()] = simpleCookie
def get(self, host):
if not host:
return ""
cookies = []
for domain, simpleCookie in self.jar.items():
host = host.lower()
if host.endswith(domain) or host == domain[1:]:
cookies.append(self.jar.get(domain))
return "; ".join(filter(None, ["%s=%s" % (k, v.value) for cookie in filter(None, sorted(cookies)) for k, v in
sorted(cookie.items())]))
This diff is collapsed.
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
"""
define websocket exceptions
"""
class WebSocketException(Exception):
"""
websocket exception class.
"""
pass
class WebSocketProtocolException(WebSocketException):
"""
If the websocket protocol is invalid, this exception will be raised.
"""
pass
class WebSocketPayloadException(WebSocketException):
"""
If the websocket payload is invalid, this exception will be raised.
"""
pass
class WebSocketConnectionClosedException(WebSocketException):
"""
If remote host closed the connection or some network error happened,
this exception will be raised.
"""
pass
class WebSocketTimeoutException(WebSocketException):
"""
WebSocketTimeoutException will be raised at socket timeout during read/write data.
"""
pass
class WebSocketProxyException(WebSocketException):
"""
WebSocketProxyException will be raised when proxy error occurred.
"""
pass
class WebSocketBadStatusException(WebSocketException):
"""
WebSocketBadStatusException will be raised when we get bad handshake status code.
"""
def __init__(self, message, status_code, status_message=None):
msg = message % (status_code, status_message) if status_message is not None \
else message % status_code
super(WebSocketBadStatusException, self).__init__(msg)
self.status_code = status_code
class WebSocketAddressException(WebSocketException):
"""
If the websocket address info cannot be found, this exception will be raised.
"""
pass
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
import hashlib
import hmac
import os
import six
from ._cookiejar import SimpleCookieJar
from ._exceptions import *
from ._http import *
from ._logging import *
from ._socket import *
if six.PY3:
from base64 import encodebytes as base64encode
else:
from base64 import encodestring as base64encode
__all__ = ["handshake_response", "handshake"]
if hasattr(hmac, "compare_digest"):
compare_digest = hmac.compare_digest
else:
def compare_digest(s1, s2):
return s1 == s2
# websocket supported version.
VERSION = 13
CookieJar = SimpleCookieJar()
class handshake_response(object):
def __init__(self, status, headers, subprotocol):
self.status = status
self.headers = headers
self.subprotocol = subprotocol
CookieJar.add(headers.get("set-cookie"))
def handshake(sock, hostname, port, resource, **options):
headers, key = _get_handshake_headers(resource, hostname, port, options)
header_str = "\r\n".join(headers)
send(sock, header_str)
dump("request header", header_str)
status, resp = _get_resp_headers(sock)
success, subproto = _validate(resp, key, options.get("subprotocols"))
if not success:
raise WebSocketException("Invalid WebSocket Header")
return handshake_response(status, resp, subproto)
def _pack_hostname(hostname):
# IPv6 address
if ':' in hostname:
return '[' + hostname + ']'
return hostname
def _get_handshake_headers(resource, host, port, options):
headers = [
"GET %s HTTP/1.1" % resource,
"Upgrade: websocket",
"Connection: Upgrade"
]
if port == 80 or port == 443:
hostport = _pack_hostname(host)
else:
hostport = "%s:%d" % (_pack_hostname(host), port)
if "host" in options and options["host"] is not None:
headers.append("Host: %s" % options["host"])
else:
headers.append("Host: %s" % hostport)
if "origin" in options and options["origin"] is not None:
headers.append("Origin: %s" % options["origin"])
else:
headers.append("Origin: http://%s" % hostport)
key = _create_sec_websocket_key()
headers.append("Sec-WebSocket-Key: %s" % key)
headers.append("Sec-WebSocket-Version: %s" % VERSION)
subprotocols = options.get("subprotocols")
if subprotocols:
headers.append("Sec-WebSocket-Protocol: %s" % ",".join(subprotocols))
if "header" in options:
header = options["header"]
if isinstance(header, dict):
header = map(": ".join, header.items())
headers.extend(header)
server_cookie = CookieJar.get(host)
client_cookie = options.get("cookie", None)
cookie = "; ".join(filter(None, [server_cookie, client_cookie]))
if cookie:
headers.append("Cookie: %s" % cookie)
headers.append("")
headers.append("")
return headers, key
def _get_resp_headers(sock, success_status=101):
status, resp_headers, status_message = read_headers(sock)
if status != success_status:
raise WebSocketBadStatusException("Handshake status %d %s", status, status_message)
return status, resp_headers
_HEADERS_TO_CHECK = {
"upgrade": "websocket",
"connection": "upgrade",
}
def _validate(headers, key, subprotocols):
subproto = None
for k, v in _HEADERS_TO_CHECK.items():
r = headers.get(k, None)
if not r:
return False, None
r = r.lower()
if v != r:
return False, None
if subprotocols:
subproto = headers.get("sec-websocket-protocol", None).lower()
if not subproto or subproto not in [s.lower() for s in subprotocols]:
error("Invalid subprotocol: " + str(subprotocols))
return False, None
result = headers.get("sec-websocket-accept", None)
if not result:
return False, None
result = result.lower()
if isinstance(result, six.text_type):
result = result.encode('utf-8')
value = (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode('utf-8')
hashed = base64encode(hashlib.sha1(value).digest()).strip().lower()
success = compare_digest(hashed, result)
if success:
return True, subproto
else:
return False, None
def _create_sec_websocket_key():
randomness = os.urandom(16)
return base64encode(randomness).decode('utf-8').strip()
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
import errno
import os
import socket
import sys
import six
from ._exceptions import *
from ._logging import *
from ._socket import*
from ._ssl_compat import *
from ._url import *
if six.PY3:
from base64 import encodebytes as base64encode
else:
from base64 import encodestring as base64encode
__all__ = ["proxy_info", "connect", "read_headers"]
class proxy_info(object):
def __init__(self, **options):
self.host = options.get("http_proxy_host", None)
if self.host:
self.port = options.get("http_proxy_port", 0)
self.auth = options.get("http_proxy_auth", None)
else:
self.port = 0
self.auth = None
self.no_proxy = options.get("http_no_proxy", None)
def connect(url, options, proxy, socket):
hostname, port, resource, is_secure = parse_url(url)
if socket:
return socket, (hostname, port, resource)
addrinfo_list, need_tunnel, auth = _get_addrinfo_list(
hostname, port, is_secure, proxy)
if not addrinfo_list:
raise WebSocketException(
"Host not found.: " + hostname + ":" + str(port))
sock = None
try:
sock = _open_socket(addrinfo_list, options.sockopt, options.timeout)
if need_tunnel:
sock = _tunnel(sock, hostname, port, auth)
if is_secure:
if HAVE_SSL:
sock = _ssl_socket(sock, options.sslopt, hostname)
else:
raise WebSocketException("SSL not available.")
return sock, (hostname, port, resource)
except:
if sock:
sock.close()
raise
def _get_addrinfo_list(hostname, port, is_secure, proxy):
phost, pport, pauth = get_proxy_info(
hostname, is_secure, proxy.host, proxy.port, proxy.auth, proxy.no_proxy)
try:
if not phost:
addrinfo_list = socket.getaddrinfo(
hostname, port, 0, 0, socket.SOL_TCP)
return addrinfo_list, False, None
else:
pport = pport and pport or 80
addrinfo_list = socket.getaddrinfo(phost, pport, 0, 0, socket.SOL_TCP)
return addrinfo_list, True, pauth
except socket.gaierror as e:
raise WebSocketAddressException(e)
def _open_socket(addrinfo_list, sockopt, timeout):
err = None
for addrinfo in addrinfo_list:
family, socktype, proto = addrinfo[:3]
sock = socket.socket(family, socktype, proto)
sock.settimeout(timeout)
# for opts in DEFAULT_SOCKET_OPTION:
# sock.setsockopt(*opts)
# for opts in sockopt:
# sock.setsockopt(*opts)
address = addrinfo[4]
try:
sock.connect(address)
except socket.error as error:
error.remote_ip = str(address[0])
try:
eConnRefused = (errno.ECONNREFUSED, errno.WSAECONNREFUSED)
except:
eConnRefused = (errno.ECONNREFUSED, )
if error.errno in eConnRefused:
err = error
continue
else:
raise
else:
break
else:
raise err
return sock
def _can_use_sni():
return six.PY2 and sys.version_info >= (2, 7, 9) or sys.version_info >= (3, 2)
def _wrap_sni_socket(sock, sslopt, hostname, check_hostname):
context = ssl.SSLContext(sslopt.get('ssl_version', ssl.PROTOCOL_SSLv23))
if sslopt.get('cert_reqs', ssl.CERT_NONE) != ssl.CERT_NONE:
context.load_verify_locations(cafile=sslopt.get('ca_certs', None), capath=sslopt.get('ca_cert_path', None))
if sslopt.get('certfile', None):
context.load_cert_chain(
sslopt['certfile'],
sslopt.get('keyfile', None),
sslopt.get('password', None),
)
# see
# https://github.com/liris/websocket-client/commit/b96a2e8fa765753e82eea531adb19716b52ca3ca#commitcomment-10803153
context.verify_mode = sslopt['cert_reqs']
if HAVE_CONTEXT_CHECK_HOSTNAME:
context.check_hostname = check_hostname
if 'ciphers' in sslopt:
context.set_ciphers(sslopt['ciphers'])
if 'cert_chain' in sslopt:
certfile, keyfile, password = sslopt['cert_chain']
context.load_cert_chain(certfile, keyfile, password)
if 'ecdh_curve' in sslopt:
context.set_ecdh_curve(sslopt['ecdh_curve'])
return context.wrap_socket(
sock,
do_handshake_on_connect=sslopt.get('do_handshake_on_connect', True),
suppress_ragged_eofs=sslopt.get('suppress_ragged_eofs', True),
server_hostname=hostname,
)
def _ssl_socket(sock, user_sslopt, hostname):
sslopt = dict(cert_reqs=ssl.CERT_REQUIRED)
sslopt.update(user_sslopt)
if os.environ.get('WEBSOCKET_CLIENT_CA_BUNDLE'):
certPath = os.environ.get('WEBSOCKET_CLIENT_CA_BUNDLE')
else:
certPath = os.path.join(
os.path.dirname(__file__), "cacert.pem")
if os.path.isfile(certPath) and user_sslopt.get('ca_certs', None) is None \
and user_sslopt.get('ca_cert', None) is None:
sslopt['ca_certs'] = certPath
elif os.path.isdir(certPath) and user_sslopt.get('ca_cert_path', None) is None:
sslopt['ca_cert_path'] = certPath
check_hostname = sslopt["cert_reqs"] != ssl.CERT_NONE and sslopt.pop(
'check_hostname', True)
if _can_use_sni():
sock = _wrap_sni_socket(sock, sslopt, hostname, check_hostname)
else:
sslopt.pop('check_hostname', True)
sock = ssl.wrap_socket(sock, **sslopt)
if not HAVE_CONTEXT_CHECK_HOSTNAME and check_hostname:
match_hostname(sock.getpeercert(), hostname)
return sock
def _tunnel(sock, host, port, auth):
debug("Connecting proxy...")
connect_header = "CONNECT %s:%d HTTP/1.0\r\n" % (host, port)
# TODO: support digest auth.
if auth and auth[0]:
auth_str = auth[0]
if auth[1]:
auth_str += ":" + auth[1]
encoded_str = base64encode(auth_str.encode()).strip().decode()
connect_header += "Proxy-Authorization: Basic %s\r\n" % encoded_str
connect_header += "\r\n"
dump("request header", connect_header)
send(sock, connect_header)
try:
status, resp_headers, status_message = read_headers(sock)
except Exception as e:
raise WebSocketProxyException(str(e))
if status != 200:
raise WebSocketProxyException(
"failed CONNECT via proxy status: %r" % status)
return sock
def read_headers(sock):
status = None
status_message = None
headers = {}
trace("--- response header ---")
while True:
line = recv_line(sock)
line = line.decode('utf-8').strip()
if not line:
break
trace(line)
if not status:
status_info = line.split(" ", 2)
status = int(status_info[1])
status_message = status_info[2]
else:
kv = line.split(":", 1)
if len(kv) == 2:
key, value = kv
headers[key.lower()] = value.strip()
else:
raise WebSocketException("Invalid header")
trace("-----------------------")
return status, headers, status_message
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
import logging
_logger = logging.getLogger('websocket')
_traceEnabled = False
__all__ = ["enableTrace", "dump", "error", "warning", "debug", "trace",
"isEnabledForError", "isEnabledForDebug"]
def enableTrace(traceable):
"""
turn on/off the traceability.
traceable: boolean value. if set True, traceability is enabled.
"""
global _traceEnabled
_traceEnabled = traceable
if traceable:
if not _logger.handlers:
_logger.addHandler(logging.StreamHandler())
_logger.setLevel(logging.DEBUG)
def dump(title, message):
if _traceEnabled:
_logger.debug("--- " + title + " ---")
_logger.debug(message)
_logger.debug("-----------------------")
def error(msg):
_logger.error(msg)
def warning(msg):
_logger.warning(msg)
def debug(msg):
_logger.debug(msg)
def trace(msg):
if _traceEnabled:
_logger.debug(msg)
def isEnabledForError():
return _logger.isEnabledFor(logging.ERROR)
def isEnabledForDebug():
return _logger.isEnabledFor(logging.DEBUG)
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
import socket
import six
import sys
from ._exceptions import *
from ._ssl_compat import *
from ._utils import *
DEFAULT_SOCKET_OPTION = [(socket.SOL_TCP, socket.TCP_NODELAY, 1)]
if hasattr(socket, "SO_KEEPALIVE"):
DEFAULT_SOCKET_OPTION.append((socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1))
if hasattr(socket, "TCP_KEEPIDLE"):
DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPIDLE, 30))
if hasattr(socket, "TCP_KEEPINTVL"):
DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPINTVL, 10))
if hasattr(socket, "TCP_KEEPCNT"):
DEFAULT_SOCKET_OPTION.append((socket.SOL_TCP, socket.TCP_KEEPCNT, 3))
_default_timeout = None
__all__ = ["DEFAULT_SOCKET_OPTION", "sock_opt", "setdefaulttimeout", "getdefaulttimeout",
"recv", "recv_line", "send"]
class sock_opt(object):
def __init__(self, sockopt, sslopt):
if sockopt is None:
sockopt = []
if sslopt is None:
sslopt = {}
self.sockopt = sockopt
self.sslopt = sslopt
self.timeout = None
def setdefaulttimeout(timeout):
"""
Set the global timeout setting to connect.
timeout: default socket timeout time. This value is second.
"""
global _default_timeout
_default_timeout = timeout
def getdefaulttimeout():
"""
Return the global timeout setting(second) to connect.
"""
return _default_timeout
def recv(sock, bufsize):
if not sock:
raise WebSocketConnectionClosedException("socket is already closed.")
try:
bytes_ = sock.recv(bufsize)
except socket.timeout as e:
message = extract_err_message(e)
raise WebSocketTimeoutException(message)
except SSLError as e:
message = extract_err_message(e)
if isinstance(message, str) and 'timed out' in message:
raise WebSocketTimeoutException(message)
else:
raise
if not bytes_:
raise WebSocketConnectionClosedException(
"Connection is already closed.")
return bytes_
def recv_line(sock):
line = []
while True:
c = recv(sock, 1)
line.append(c)
if c == six.b("\n"):
break
return six.b("").join(line)
def send(sock, data):
if isinstance(data, six.text_type):
data = data.encode('utf-8')
if not sock:
raise WebSocketConnectionClosedException("socket is already closed.")
try:
return sock.send(data)
except socket.timeout as e:
message = extract_err_message(e)
raise WebSocketTimeoutException(message)
except Exception as e:
message = extract_err_message(e)
if isinstance(message, str) and "timed out" in message:
raise WebSocketTimeoutException(message)
else:
raise
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
__all__ = ["HAVE_SSL", "ssl", "SSLError"]
try:
import ssl
from ssl import SSLError
if hasattr(ssl, 'SSLContext') and hasattr(ssl.SSLContext, 'check_hostname'):
HAVE_CONTEXT_CHECK_HOSTNAME = True
else:
HAVE_CONTEXT_CHECK_HOSTNAME = False
if hasattr(ssl, "match_hostname"):
from ssl import match_hostname
else:
from backports.ssl_match_hostname import match_hostname
__all__.append("match_hostname")
__all__.append("HAVE_CONTEXT_CHECK_HOSTNAME")
HAVE_SSL = True
except ImportError:
# dummy class of SSLError for ssl none-support environment.
class SSLError(Exception):
pass
HAVE_SSL = False
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
import os
import socket
import struct
from six.moves.urllib.parse import urlparse
__all__ = ["parse_url", "get_proxy_info"]
def parse_url(url):
"""
parse url and the result is tuple of
(hostname, port, resource path and the flag of secure mode)
url: url string.
"""
if ":" not in url:
raise ValueError("url is invalid")
scheme, url = url.split(":", 1)
parsed = urlparse(url, scheme="ws")
if parsed.hostname:
hostname = parsed.hostname
else:
raise ValueError("hostname is invalid")
port = 0
if parsed.port:
port = parsed.port
is_secure = False
if scheme == "ws":
if not port:
port = 80
elif scheme == "wss":
is_secure = True
if not port:
port = 443
else:
raise ValueError("scheme %s is invalid" % scheme)
if parsed.path:
resource = parsed.path
else:
resource = "/"
if parsed.query:
resource += "?" + parsed.query
return hostname, port, resource, is_secure
DEFAULT_NO_PROXY_HOST = ["localhost", "127.0.0.1"]
def _is_ip_address(addr):
try:
socket.inet_aton(addr)
except socket.error:
return False
else:
return True
def _is_subnet_address(hostname):
try:
addr, netmask = hostname.split("/")
return _is_ip_address(addr) and 0 <= int(netmask) < 32
except ValueError:
return False
def _is_address_in_network(ip, net):
ipaddr = struct.unpack('I', socket.inet_aton(ip))[0]
netaddr, bits = net.split('/')
netmask = struct.unpack('I', socket.inet_aton(netaddr))[0] & ((2 << int(bits) - 1) - 1)
return ipaddr & netmask == netmask
def _is_no_proxy_host(hostname, no_proxy):
if not no_proxy:
v = os.environ.get("no_proxy", "").replace(" ", "")
no_proxy = v.split(",")
if not no_proxy:
no_proxy = DEFAULT_NO_PROXY_HOST
if hostname in no_proxy:
return True
elif _is_ip_address(hostname):
return any([_is_address_in_network(hostname, subnet) for subnet in no_proxy if _is_subnet_address(subnet)])
return False
def get_proxy_info(
hostname, is_secure, proxy_host=None, proxy_port=0, proxy_auth=None,
no_proxy=None):
"""
try to retrieve proxy host and port from environment
if not provided in options.
result is (proxy_host, proxy_port, proxy_auth).
proxy_auth is tuple of username and password
of proxy authentication information.
hostname: websocket server name.
is_secure: is the connection secure? (wss)
looks for "https_proxy" in env
before falling back to "http_proxy"
options: "http_proxy_host" - http proxy host name.
"http_proxy_port" - http proxy port.
"http_no_proxy" - host names, which doesn't use proxy.
"http_proxy_auth" - http proxy auth information.
tuple of username and password.
default is None
"""
if _is_no_proxy_host(hostname, no_proxy):
return None, 0, None
if proxy_host:
port = proxy_port
auth = proxy_auth
return proxy_host, port, auth
env_keys = ["http_proxy"]
if is_secure:
env_keys.insert(0, "https_proxy")
for key in env_keys:
value = os.environ.get(key, None)
if value:
proxy = urlparse(value)
auth = (proxy.username, proxy.password) if proxy.username else None
return proxy.hostname, proxy.port, auth
return None, 0, None
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor,
Boston, MA 02110-1335 USA
"""
import six
__all__ = ["NoLock", "validate_utf8", "extract_err_message"]
class NoLock(object):
def __enter__(self):
pass
def __exit__(self, exc_type, exc_value, traceback):
pass
try:
# If wsaccel is available we use compiled routines to validate UTF-8
# strings.
from wsaccel.utf8validator import Utf8Validator
def _validate_utf8(utfbytes):
return Utf8Validator().validate(utfbytes)[0]
except ImportError:
# UTF-8 validator
# python implementation of http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
_UTF8_ACCEPT = 0
_UTF8_REJECT = 12
_UTF8D = [
# The first part of the table maps bytes to character classes that
# to reduce the size of the transition table and create bitmasks.
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
1,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1, 9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,9,
7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7, 7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,7,
8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
10,3,3,3,3,3,3,3,3,3,3,3,3,4,3,3, 11,6,6,6,5,8,8,8,8,8,8,8,8,8,8,8,
# The second part is a transition table that maps a combination
# of a state of the automaton and a character class to a state.
0,12,24,36,60,96,84,12,12,12,48,72, 12,12,12,12,12,12,12,12,12,12,12,12,
12, 0,12,12,12,12,12, 0,12, 0,12,12, 12,24,12,12,12,12,12,24,12,24,12,12,
12,12,12,12,12,12,12,24,12,12,12,12, 12,24,12,12,12,12,12,12,12,24,12,12,
12,12,12,12,12,12,12,36,12,36,12,12, 12,36,12,12,12,12,12,36,12,36,12,12,
12,36,12,12,12,12,12,12,12,12,12,12, ]
def _decode(state, codep, ch):
tp = _UTF8D[ch]
codep = (ch & 0x3f) | (codep << 6) if (
state != _UTF8_ACCEPT) else (0xff >> tp) & ch
state = _UTF8D[256 + state + tp]
return state, codep
def _validate_utf8(utfbytes):
state = _UTF8_ACCEPT
codep = 0
for i in utfbytes:
if six.PY2:
i = ord(i)
state, codep = _decode(state, codep, i)
if state == _UTF8_REJECT:
return False
return True
def validate_utf8(utfbytes):
"""
validate utf8 byte string.
utfbytes: utf byte string to check.
return value: if valid utf8 string, return true. Otherwise, return false.
"""
return _validate_utf8(utfbytes)
def extract_err_message(exception):
if exception.args:
return exception.args[0]
else:
return None
This diff is collapsed.
#!/usr/bin/env python3
# coding=utf-8
# => Author: Abby Cin
# => Mail: abbytsing@gmail.com
# => Created Time: Sat 31 Mar 2018 12:45:38 PM CST
from .websocket import create_connection
import json
import time
import math
import random
import requests
import sys
import re
import qrcode
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
from multiprocessing import Process
schema_list = ["wss://webalfa-cm10.dingtalk.com/long",
"wss://webalfa.dingtalk.com/long", "wss://webalfa-cm3.dingtalk.com/long"]
host_list = ["webalfa-cm10.dingtalk.com",
"webalfa.dingtalk.com", "webalfa-cm3.dingtalk.com"]
ua = "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36 OS(windows/6.1) Browser(chrome/74.0.3729.169) DingWeb/3.6.3 LANG/zh_CN"
class Message():
def __init__(self, phone, did):
self.phone = phone
self.token = None
self.app_key = "85A09F60A599F5E1867EAB915A8BB07F"
self.device_id = did
self.reg = {
"lwp": "/reg", "headers": {
"cache-header": "token app-key did ua vhost wv",
"vhost": "WK",
"ua": ua,
"app-key": self.app_key,
"wv": "im:3,au:3,sy:4",
"mid": "f61f0002 0",
"did": self.device_id
},
"body": None
}
self.keep_alive = {
"lwp": "/!",
"headers": {
"mid": "fb420007 0"
},
"body": None
}
self.check_license = {
"lwp": "/r/Adaptor/LoginI/checkLicense",
"headers": {
"mid": "9113001a 0"
},
"body": [self.phone, 0]
}
self.subscribe = {
"lwp": "/subscribe",
"headers": {
"token": "access token of login response",
"sync": "0,0;0;0;",
"set-ver": "0",
"mid": "b91b000d 0"
}
}
self.switch_status = {"lwp": "/r/Sync/getSwitchStatus",
"headers": {"mid": "0b75000e 0"}, "body": []}
self.confirm_info = {"lwp": "/r/Adaptor/IDLDing/getConfirmStatusInfo",
"headers": {"mid": "98d9000f 0"}, "body": []}
self.accept_license = {"lwp": "/r/Adaptor/LoginI/acceptLicense",
"headers": {"mid": "caae001f 0"}, "body": [self.phone, 0]}
self.create_session = {
"lwp": "/r/Adaptor/LoginI/createTempSessionInfo",
"headers": {
"mid": "f6970023 0"
},
"body": []
}
def get_mid(self):
curr = int(time.time())
return hex(curr)[2:] + " 0"
def get_random(self):
return str(math.ceil(time.time() * 1e3)) + str(math.ceil(random.random() * 1e3))
def get_reg_msg(self):
reg = self.reg
reg["mid"] = self.get_mid()
return reg
def get_keepalive_msg(self):
res = self.keep_alive
res["mid"] = self.get_mid()
return res
def get_subscribe_msg(self, access_token):
res = self.subscribe
res["headers"]["token"] = access_token
res["headers"]["mid"] = self.get_mid()
# res['headers']['set-ver'] = "154001657378" # fixed version, get from websocket /subscribe
return res
def get_switch_status_msg(self):
res = self.switch_status
res["headers"]["mid"] = self.get_mid()
return res
def get_confirm_msg(self):
res = self.confirm_info
res["headers"]["mid"] = self.get_mid()
def get_create_session_msg(self):
res = self.create_session
res["headers"]["mid"] = self.get_mid()
return res
def get_header():
header = {
"Host": "login.dingtalk.com",
"Connection": "keep-alive",
"User-Agent": ua,
"Accept": "*/*",
"Referer": "https://im.dingtalk.com/",
"Accept-Encoding": "gzip, deflate, br"
}
return header
def get_qrcode():
url = "https://login.dingtalk.com/user/qrcode/generate.jsonp"
r = requests.get(url)
matches = re.search(r'.*\((.+?)\);$', r.text)
j = json.loads(matches.group(1))
guid = j['result']
return guid
def show_qrcode(guid):
print('show_qrcode: {}'.format(guid))
url = 'http://qr.dingtalk.com/action/login?code={}'.format(guid)
png = 'qrcode.png'
qrcode.make(url).save(png)
img = mpimg.imread(png)
plt.imshow(img)
plt.show()
def is_logged_in(uuid):
print('is_logged_in: {}'.format(uuid))
base = "https://login.dingtalk.com/user/qrcode/is_logged.jsonp"
params = {
"qrcode": uuid,
"pdmToken": "TAB2ADF2E0706AAC01FE45DAE1F90726276B4742885DC5C13D9CE34F3F5", # non necessary
"pdmTitle": "Windows 7",
"pdmModel": "Windows 7 Web",
"appKey": "85A09F60A599F5E1867EAB915A8BB07F",
"callback": "angular.callbacks._2q" # non necessary
}
count = 0
j = {}
while True and count < 10:
count += 1
r = requests.get(base, params = params, headers = get_header())
m = re.search(r'.*\(({.+?})\);$', r.text)
j = json.loads(m.group(1))
if j['success']:
break
time.sleep(1)
if count >= 10:
print("can't login, exit")
sys.exit(1)
# set cookie
params = {
"code": j['result']['tmpCode'],
"appkey": j['result']['appKey'],
"isSession": True,
"callback": "__jp0"
}
h = get_header()
h['Host'] = 'webalfa-cm10.dingtalk.com'
h['Connection'] = 'close'
device_id = ""
idx = 0
for i in range(len(host_list)):
r = requests.get("https://{}/setCookie".format(host_list[i]), params=params, headers=h)
if r.text.find('200'):
idx = i
cookie = r.headers['set-cookie']
device_id = cookie.split(';')[0].split('=')[1]
break
return (j, device_id, idx)
def connect(device_id, schema_idx):
ws = None
ws = create_connection(schema_list[schema_idx], host=host_list[schema_idx], origin='https://im.dingtalk.com', cookie='{}; deviceid_exist=true'.format(device_id))
if ws == None or ws.connected == False:
print("can't connect to these hosts: {h}".format(h=schema_list[schema_idx]))
sys.exit(1)
return ws
def heartbeat(ws, msg):
ws.send(msg)
ws.recv()
def get_cookie():
guid = get_qrcode()
handle = Process(target = show_qrcode, args = (guid,))
handle.start()
r, device_id, schema_idx = is_logged_in(guid)
res = r['result']
handle.terminate()
phone = int(res['userProfileExtensionModel']['userProfileModel']['mobile'])
msg = Message(phone, device_id)
ws = connect(msg.device_id, schema_idx)
ws.send(json.dumps(msg.get_reg_msg()))
ws.recv()
token = res['accessToken']
ws.send(json.dumps(msg.get_subscribe_msg(access_token=token)))
res = json.loads(ws.recv())
code = res['code']
echo = json.dumps(msg.get_keepalive_msg())
count = 0
while code != 200 and count < 3:
count += 1
heartbeat(ws, echo)
time.sleep(1)
ws.send(json.dumps(msg.get_subscribe_msg(access_token=token)))
res = json.loads(ws.recv())
print(res['body']['reason'])
code = res['code']
ws.send(json.dumps(msg.get_switch_status_msg()))
res = json.loads(ws.recv())
ws.send(json.dumps(msg.get_confirm_msg()))
ws.recv()
ws.recv() # why response twice ??
ws.send(json.dumps(msg.get_create_session_msg()))
res = json.loads(ws.recv())
if res["code"] == 400:
print(res["body"]["reason"])
sid = res["headers"]["sid"]
# create tmp session
header = get_header()
header["Cookie"] = "deciveid={did}; deviceid_exist=true; dd_sid={session_id}".format(did=msg.device_id, session_id=sid)
tmp = str(math.ceil(time.time() * 1e3))
payload = {
"callback": "jQuery19104339311785622433_{t}".format(t=tmp),
"sessionId": res["body"],
"_": "{t}".format(t=tmp)
}
resp = requests.get("https://login.dingtalk.com/login/createSessionInfoByTemp.jsonp", params=payload, headers=header)
dt_s = resp.headers.get("Set-Cookie").split(';')[0]
# build cookie
cookie_factory = "deviceid={did}; deviceid_exist=true; up_ab=y; preview_ab=y; dd_sid={session_id}; {dts}"
cookie = cookie_factory.format(did=msg.device_id, session_id=sid, dts=dt_s)
return cookie
def renew_cookie(cookie_filepath):
cookie = get_cookie()
expiration_time = math.ceil(time.time())
try:
fd = open(cookie_filepath, 'w')
data = {"expiration": expiration_time, "cookie": cookie}
fd.write(json.dumps(data))
fd.close()
except Exception as e:
print("Error: {err}".format(err=e.args))
sys.exit(1)
return cookie
def generate_cookie(cookie_filepath):
week_sec = 60 * 60 * 24 * 6
tmp = None
try:
with open(cookie_filepath, 'r') as fd:
tmp = fd.read()
fd.close()
except Exception:
return renew_cookie(cookie_filepath)
data = json.loads(tmp)
cookie = data["cookie"]
# check if cookie valid
now = math.ceil(time.time())
old = int(data["expiration"])
if now - old > week_sec:
cookie = renew_cookie(cookie_filepath)
return cookie
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment