⬆️ 升级(dailycheckin):版本号从23.10.18更新到24.01.07

♻️ 重构(dailycheckin/bilibili):移除未使用的requests.utils导入
 功能(dailycheckin/bilibili):添加不进行硬币兑换的日志消息
This commit is contained in:
shitao 2024-01-07 21:40:14 +08:00
parent 463bd0cb74
commit df742a681a
33 changed files with 2551 additions and 1 deletions

15
.flake8 Normal file
View File

@ -0,0 +1,15 @@
[flake8]
max-line-length = 120
max-complexity = 24
ignore = F401, W503, E203, E501, F841, E722, C901
exclude =
.git,
__pycache__,
scripts,
logs,
upload,
build,
dist,
docs,
migrations,
.scrapy

38
.pre-commit-config.yaml Normal file
View File

@ -0,0 +1,38 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: check-added-large-files
args: [--maxkb=10000]
- id: check-json
exclude: .vscode
- id: check-case-conflict
- id: detect-private-key
- id: mixed-line-ending
- id: trailing-whitespace
- id: fix-encoding-pragma
- id: requirements-txt-fixer
- id: trailing-whitespace
- repo: https://github.com/PyCQA/autoflake
rev: v2.2.1
hooks:
- id: autoflake
- repo: https://github.com/pycqa/flake8
rev: 6.1.0
hooks:
- id: flake8
exclude: ^migrations/|^uploads/|^scripts/|^logs/|^docs/|^dist/|^build/
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 23.9.1
hooks:
- id: black
language_version: python3
exclude: ^migrations/|^uploads/|^scripts/|^logs/|^docs/|^dist/|^build/

View File

@ -1,4 +1,16 @@
.PHONY: mkdocs
.PHONY: clean sdist upload pre-commit mkdocs
sdist: clean
python3 setup.py sdist bdist_wheel --universa
upload: clean
python3 setup.py upload
clean:
rm -rf build dailycheckin.egg-info dist
pre-commit:
pre-commit run --all-files
mkdocs:
mkdocs gh-deploy --force

12
dailycheckin/__init__.py Executable file
View File

@ -0,0 +1,12 @@
# -*- coding: utf-8 -*-
import pkgutil as _pkgutil
class CheckIn(object):
name = "Base"
__path__ = _pkgutil.extend_path(__path__, __name__)
for _, _modname, _ in _pkgutil.walk_packages(path=__path__, prefix=__name__ + "."):
if _modname not in ["dailycheckin.main", "dailycheckin.configs"]:
__import__(_modname)

2
dailycheckin/__version__.py Executable file
View File

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
__version__ = "24.01.07"

0
dailycheckin/acfun/__init__.py Executable file
View File

185
dailycheckin/acfun/main.py Executable file
View File

@ -0,0 +1,185 @@
# -*- coding: utf-8 -*-
import json
import os
import re
import requests
import urllib3
from dailycheckin import CheckIn
urllib3.disable_warnings()
class AcFun(CheckIn):
name = "AcFun"
def __init__(self, check_item: dict):
self.check_item = check_item
self.contentid = "27259341"
self.st = None
@staticmethod
def login(phone, password, session):
url = "https://id.app.acfun.cn/rest/web/login/signin"
body = f"username={phone}&password={password}&key=&captcha="
res = session.post(url=url, data=body).json()
return (True, res) if res.get("result") == 0 else (False, res.get("err_msg"))
@staticmethod
def get_cookies(session, phone, password):
url = "https://id.app.acfun.cn/rest/app/login/signin"
headers = {
"Host": "id.app.acfun.cn",
"user-agent": "AcFun/6.39.0 (iPhone; iOS 14.3; Scale/2.00)",
"devicetype": "0",
"accept-language": "zh-Hans-CN;q=1, en-CN;q=0.9, ja-CN;q=0.8, zh-Hant-HK;q=0.7, io-Latn-CN;q=0.6",
"accept": "application/json",
"content-type": "application/x-www-form-urlencoded",
}
data = f"password={password}&username={phone}"
response = session.post(url=url, data=data, headers=headers, verify=False)
acpasstoken = response.json().get("acPassToken")
auth_key = str(response.json().get("auth_key"))
if acpasstoken and auth_key:
cookies = {"acPasstoken": acpasstoken, "auth_key": auth_key}
return cookies
else:
return False
def get_token(self, session):
url = "https://id.app.acfun.cn/rest/web/token/get?sid=acfun.midground.api"
res = session.post(url=url).json()
self.st = res.get("acfun.midground.api_st") if res.get("result") == 0 else ""
return self.st
def get_video(self, session):
url = "https://www.acfun.cn/rest/pc-direct/rank/channel"
res = session.get(url=url).json()
self.contentid = res.get("rankList")[0].get("contentId")
return self.contentid
@staticmethod
def sign(session):
url = "https://www.acfun.cn/rest/pc-direct/user/signIn"
response = session.post(url=url)
return {"name": "签到信息", "value": response.json().get("msg")}
def danmu(self, session):
url = "https://www.acfun.cn/rest/pc-direct/new-danmaku/add"
data = {
"mode": "1",
"color": "16777215",
"size": "25",
"body": "123321",
"videoId": "26113662",
"position": "2719",
"type": "douga",
"id": "31224739",
"subChannelId": "1",
"subChannelName": "动画",
}
response = session.get(url=f"https://www.acfun.cn/v/ac{self.contentid}")
videoId = re.findall(r'"currentVideoId":(\d+),', response.text)
subChannel = re.findall(
r'{subChannelId:(\d+),subChannelName:"([\u4e00-\u9fa5]+)"}', response.text
)
if videoId:
data["videoId"] = videoId[0]
data["subChannelId"] = subChannel[0][0]
data["subChannelName"] = subChannel[0][1]
res = session.post(url=url, data=data).json()
msg = "弹幕成功" if res.get("result") == 0 else "弹幕失败"
return {"name": "弹幕任务", "value": msg}
def throwbanana(self, session):
url = "https://www.acfun.cn/rest/pc-direct/banana/throwBanana"
data = {"resourceId": self.contentid, "count": "1", "resourceType": "2"}
res = session.post(url=url, data=data).json()
msg = "投🍌成功" if res.get("result") == 0 else "投🍌失败"
return {"name": "香蕉任务", "value": msg}
def like(self, session):
like_url = "https://kuaishouzt.com/rest/zt/interact/add"
unlike_url = "https://kuaishouzt.com/rest/zt/interact/delete"
body = (
f"kpn=ACFUN_APP&kpf=PC_WEB&subBiz=mainApp&interactType=1&"
f"objectType=2&objectId={self.contentid}&acfun.midground.api_st={self.st}&"
f"extParams%5BisPlaying%5D=false&extParams%5BshowCount%5D=1&extParams%5B"
f"otherBtnClickedCount%5D=10&extParams%5BplayBtnClickedCount%5D=0"
)
res = session.post(url=like_url, data=body).json()
session.post(url=unlike_url, data=body)
msg = "点赞成功" if res.get("result") == 1 else "点赞失败"
return {"name": "点赞任务", "value": msg}
def share(self, session, cookies):
url = "https://api-ipv6.acfunchina.com/rest/app/task/reportTaskAction?taskType=1&market=tencent&product=ACFUN_APP&appMode=0"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
}
response = session.get(url=url, headers=headers, cookies=cookies, verify=False)
if response.json().get("result") == 0:
msg = "分享成功"
else:
msg = "分享失败"
return {"name": "分享任务", "value": msg}
@staticmethod
def get_info(session):
url = "https://www.acfun.cn/rest/pc-direct/user/personalInfo"
res = session.get(url=url).json()
if res.get("result") != 0:
return [{"name": "当前等级", "value": "查询失败"}]
info = res.get("info")
return [
{"name": "当前等级", "value": info.get("level")},
{"name": "持有香蕉", "value": info.get("banana")},
]
def main(self):
phone = self.check_item.get("phone")
password = self.check_item.get("password")
session = requests.session()
session.headers.update(
{
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"content-type": "application/x-www-form-urlencoded; charset=UTF-8",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.70",
"Referer": "https://www.acfun.cn/",
}
)
flag, res = self.login(phone, password, session)
if flag is True:
self.get_video(session=session)
self.get_token(session=session)
sign_msg = self.sign(session=session)
like_msg = self.like(session=session)
danmu_msg = self.danmu(session=session)
throwbanana_msg = self.throwbanana(session=session)
info_msg = self.get_info(session=session)
msg = [
{"name": "帐号信息", "value": phone},
sign_msg,
like_msg,
danmu_msg,
throwbanana_msg,
] + info_msg
else:
msg = [{"name": "帐号信息", "value": phone}, {"name": "错误信息", "value": res}]
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("ACFUN", [])[0]
print(AcFun(check_item=_check_item).main())

0
dailycheckin/baidu/__init__.py Executable file
View File

64
dailycheckin/baidu/main.py Executable file
View File

@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
import json
import os
from urllib import parse
import requests
from dailycheckin import CheckIn
class Baidu(CheckIn):
name = "百度站点提交"
def __init__(self, check_item: dict):
self.check_item = check_item
@staticmethod
def url_submit(data_url: str, submit_url: str, times: int = 100) -> str:
site = parse.parse_qs(parse.urlsplit(submit_url).query).get("site")[0]
urls_data = requests.get(url=data_url)
remian = 100000
success_count = 0
error_count = 0
for one in range(times):
try:
response = requests.post(url=submit_url, data=urls_data)
if response.json().get("success"):
remian = response.json().get("remain")
success_count += response.json().get("success")
else:
error_count += 1
except Exception as e:
print(e)
error_count += 1
msg = [
{"name": "站点地址", "value": site},
{"name": "剩余条数", "value": remian},
{"name": "成功条数", "value": success_count},
{"name": "成功次数", "value": times - error_count},
{"name": "失败次数", "value": error_count},
]
return msg
def main(self):
data_url = self.check_item.get("data_url")
submit_url = self.check_item.get("submit_url")
times = int(self.check_item.get("times", 100))
if data_url and submit_url:
msg = self.url_submit(data_url=data_url, submit_url=submit_url, times=times)
else:
msg = {"name": "站点配置", "value": "配置错误"}
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("BAIDU", [])[0]
print(Baidu(check_item=_check_item).main())

View File

429
dailycheckin/bilibili/main.py Executable file
View File

@ -0,0 +1,429 @@
# -*- coding: utf-8 -*-
import json
import os
import time
import requests
from dailycheckin import CheckIn
class BiliBili(CheckIn):
name = "Bilibili"
def __init__(self, check_item: dict):
self.check_item = check_item
@staticmethod
def get_nav(session):
url = "https://api.bilibili.com/x/web-interface/nav"
ret = session.get(url=url).json()
uname = ret.get("data", {}).get("uname")
uid = ret.get("data", {}).get("mid")
is_login = ret.get("data", {}).get("isLogin")
coin = ret.get("data", {}).get("money")
vip_type = ret.get("data", {}).get("vipType")
current_exp = ret.get("data", {}).get("level_info", {}).get("current_exp")
return uname, uid, is_login, coin, vip_type, current_exp
@staticmethod
def get_today_exp(session: requests.Session) -> list:
"""GET 获取今日经验信息
:param requests.Session session:
:return list: 今日经验信息列表
"""
url = "https://api.bilibili.com/x/member/web/exp/log?jsonp=jsonp"
today = time.strftime("%Y-%m-%d", time.localtime())
return list(
filter(
lambda x: x["time"].split()[0] == today,
session.get(url=url).json().get("data").get("list"),
)
)
@staticmethod
def reward(session) -> dict:
"""取B站经验信息"""
url = "https://api.bilibili.com/x/member/web/exp/log?jsonp=jsonp"
today = time.strftime("%Y-%m-%d", time.localtime())
return list(
filter(
lambda x: x["time"].split()[0] == today,
session.get(url=url).json().get("data").get("list"),
)
)
@staticmethod
def live_sign(session) -> dict:
"""B站直播签到"""
try:
url = "https://api.live.bilibili.com/xlive/web-ucenter/v1/sign/DoSign"
ret = session.get(url=url).json()
if ret["code"] == 0:
msg = f'签到成功,{ret["data"]["text"]},特别信息:{ret["data"]["specialText"]},本月已签到{ret["data"]["hadSignDays"]}'
elif ret["code"] == 1011040:
msg = "今日已签到过,无法重复签到"
else:
msg = f'签到失败,信息为: {ret["message"]}'
except Exception as e:
msg = f"签到异常,原因为{str(e)}"
print(msg)
return msg
@staticmethod
def manga_sign(session, platform="android") -> dict:
"""
模拟B站漫画客户端签到
"""
try:
url = "https://manga.bilibili.com/twirp/activity.v1.Activity/ClockIn"
post_data = {"platform": platform}
ret = session.post(url=url, data=post_data).json()
if ret["code"] == 0:
msg = "签到成功"
elif ret["msg"] == "clockin clockin is duplicate":
msg = "今天已经签到过了"
else:
msg = f'签到失败,信息为({ret["msg"]})'
print(msg)
except Exception as e:
msg = f"签到异常,原因为: {str(e)}"
print(msg)
return msg
@staticmethod
def vip_privilege_receive(session, bili_jct, receive_type: int = 1) -> dict:
"""
领取B站大会员权益
receive_type int 权益类型1为B币劵2为优惠券
"""
url = "https://api.bilibili.com/x/vip/privilege/receive"
post_data = {"type": receive_type, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def vip_manga_reward(session) -> dict:
"""获取漫画大会员福利"""
url = "https://manga.bilibili.com/twirp/user.v1.User/GetVipReward"
ret = session.post(url=url, json={"reason_id": 1}).json()
return ret
@staticmethod
def report_task(session, bili_jct, aid: int, cid: int, progres: int = 300) -> dict:
"""
B站上报视频观看进度
aid int 视频av号
cid int 视频cid号
progres int 观看秒数
"""
url = "http://api.bilibili.com/x/v2/history/report"
post_data = {"aid": aid, "cid": cid, "progres": progres, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def share_task(session, bili_jct, aid) -> dict:
"""
分享指定av号视频
aid int 视频av号
"""
url = "https://api.bilibili.com/x/web-interface/share/add"
post_data = {"aid": aid, "csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def get_followings(
session,
uid: int,
pn: int = 1,
ps: int = 50,
order: str = "desc",
order_type: str = "attention",
) -> dict:
"""
获取指定用户关注的up主
uid int 账户uid默认为本账户非登录账户只能获取20个*5
pn int 页码默认第一页
ps int 每页数量默认50
order str 排序方式默认desc
order_type 排序类型默认attention
"""
params = {
"vmid": uid,
"pn": pn,
"ps": ps,
"order": order,
"order_type": order_type,
}
url = "https://api.bilibili.com/x/relation/followings"
ret = session.get(url=url, params=params).json()
return ret
@staticmethod
def space_arc_search(
session,
uid: int,
pn: int = 1,
ps: int = 30,
tid: int = 0,
order: str = "pubdate",
keyword: str = "",
) -> dict:
"""
获取指定up主空间视频投稿信息
uid int 账户uid默认为本账户
pn int 页码默认第一页
ps int 每页数量默认50
tid int 分区 默认为0(所有分区)
order str 排序方式默认pubdate
keyword str 关键字默认为空
"""
params = {
"mid": uid,
"pn": pn,
"Ps": ps,
"tid": tid,
"order": order,
"keyword": keyword,
}
url = "https://api.bilibili.com/x/space/arc/search"
ret = session.get(url=url, params=params).json()
count = 2
data_list = [
{
"aid": one.get("aid"),
"cid": 0,
"title": one.get("title"),
"owner": one.get("author"),
}
for one in ret.get("data", {}).get("list", {}).get("vlist", [])[:count]
]
return data_list, count
@staticmethod
def elec_pay(session, bili_jct, uid: int, num: int = 50) -> dict:
"""
用B币给up主充电
uid int up主uid
num int 充电电池数量
"""
url = "https://api.bilibili.com/x/ugcpay/trade/elec/pay/quick"
post_data = {
"elec_num": num,
"up_mid": uid,
"otype": "up",
"oid": uid,
"csrf": bili_jct,
}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def coin_add(
session, bili_jct, aid: int, num: int = 1, select_like: int = 1
) -> dict:
"""
给指定 av 号视频投币
aid int 视频av号
num int 投币数量
select_like int 是否点赞
"""
url = "https://api.bilibili.com/x/web-interface/coin/add"
post_data = {
"aid": aid,
"multiply": num,
"select_like": select_like,
"cross_domain": "true",
"csrf": bili_jct,
}
ret = session.post(url=url, data=post_data).json()
return ret
@staticmethod
def live_status(session) -> dict:
"""B站直播获取金银瓜子状态"""
url = "https://api.live.bilibili.com/pay/v1/Exchange/getStatus"
ret = session.get(url=url).json()
data = ret.get("data")
silver = data.get("silver", 0)
gold = data.get("gold", 0)
coin = data.get("coin", 0)
msg = [
{"name": "硬币数量", "value": coin},
{"name": "金瓜子数", "value": gold},
{"name": "银瓜子数", "value": silver},
]
return msg
@staticmethod
def get_region(session, rid=1, num=6) -> dict:
"""
获取 B站分区视频信息
rid int 分区号
num int 获取视频数量
"""
url = (
"https://api.bilibili.com/x/web-interface/dynamic/region?ps="
+ str(num)
+ "&rid="
+ str(rid)
)
ret = session.get(url=url).json()
data_list = [
{
"aid": one.get("aid"),
"cid": one.get("cid"),
"title": one.get("title"),
"owner": one.get("owner", {}).get("name"),
}
for one in ret.get("data", {}).get("archives", [])
]
return data_list
@staticmethod
def silver2coin(session, bili_jct) -> dict:
"""B站银瓜子换硬币"""
url = "https://api.live.bilibili.com/xlive/revenue/v1/wallet/silver2coin"
post_data = {"csrf": bili_jct}
ret = session.post(url=url, data=post_data).json()
return ret
def main(self):
bilibili_cookie = {
item.split("=")[0]: item.split("=")[1]
for item in self.check_item.get("cookie").split("; ")
}
bili_jct = bilibili_cookie.get("bili_jct")
coin_num = self.check_item.get("coin_num", 0)
coin_type = self.check_item.get("coin_type", 1)
silver2coin = self.check_item.get("silver2coin", False)
session = requests.session()
requests.utils.add_dict_to_cookiejar(session.cookies, bilibili_cookie)
session.headers.update(
{
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36 Edg/91.0.864.64",
"Referer": "https://www.bilibili.com/",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Connection": "keep-alive",
}
)
success_count = 0
uname, uid, is_login, coin, vip_type, current_exp = self.get_nav(
session=session
)
if is_login:
manhua_msg = self.manga_sign(session=session)
live_msg = self.live_sign(session=session)
aid_list = self.get_region(session=session) if coin_type == 0 else []
coins_av_count = len(
list(
filter(
lambda x: x["reason"] == "视频投币奖励",
self.get_today_exp(session=session),
)
)
)
coin_num = coin_num - coins_av_count
coin_num = coin_num if coin_num < coin else coin
if coin_type == 1:
following_list = self.get_followings(session=session, uid=uid)
count = 0
for following in following_list.get("data", {}).get("list"):
mid = following.get("mid")
if mid:
tmplist, tmpcount = self.space_arc_search(
session=session, uid=mid
)
aid_list += tmplist
count += tmpcount
if count > coin_num:
print("已获取足够关注用户的视频")
break
else:
aid_list += self.get_region(session=session)
for one in aid_list[::-1]:
print(one)
if coin_num > 0:
for aid in aid_list[::-1]:
ret = self.coin_add(
session=session, aid=aid.get("aid"), bili_jct=bili_jct
)
if ret["code"] == 0:
coin_num -= 1
print(f'成功给{aid.get("title")}投一个币')
success_count += 1
elif ret["code"] == 34005:
print(f'投币{aid.get("title")}失败,原因为{ret["message"]}')
continue
# -104 硬币不够了 -111 csrf 失败 34005 投币达到上限
else:
print(f'投币{aid.get("title")}失败,原因为{ret["message"]},跳过投币')
break
if coin_num <= 0:
break
coin_msg = f"今日成功投币{success_count + coins_av_count}/{self.check_item.get('coin_num', 5)}"
else:
coin_msg = (
f"今日成功投币{coins_av_count}/{self.check_item.get('coin_num', 5)}"
)
aid = aid_list[0].get("aid")
cid = aid_list[0].get("cid")
title = aid_list[0].get("title")
report_ret = self.report_task(
session=session, bili_jct=bili_jct, aid=aid, cid=cid
)
if report_ret.get("code") == 0:
report_msg = f"观看《{title}》300秒"
else:
report_msg = "任务失败"
share_ret = self.share_task(session=session, bili_jct=bili_jct, aid=aid)
if share_ret.get("code") == 0:
share_msg = f"分享《{title}》成功"
else:
share_msg = "分享失败"
print(share_msg)
s2c_msg = "不兑换硬币"
if silver2coin:
silver2coin_ret = self.silver2coin(session=session, bili_jct=bili_jct)
s2c_msg = silver2coin_ret["message"]
if silver2coin_ret["code"] != 0:
print(s2c_msg)
else:
s2c_msg = ""
live_stats = self.live_status(session=session)
uname, uid, is_login, new_coin, vip_type, new_current_exp = self.get_nav(
session=session
)
today_exp = sum(
map(lambda x: x["delta"], self.get_today_exp(session=session))
)
update_data = (28800 - new_current_exp) // (today_exp if today_exp else 1)
msg = [
{"name": "帐号信息", "value": uname},
{"name": "漫画签到", "value": manhua_msg},
{"name": "直播签到", "value": live_msg},
{"name": "登陆任务", "value": "今日已登陆"},
{"name": "观看视频", "value": report_msg},
{"name": "分享任务", "value": share_msg},
{"name": "瓜子兑换", "value": s2c_msg},
{"name": "投币任务", "value": coin_msg},
{"name": "今日经验", "value": today_exp},
{"name": "当前经验", "value": new_current_exp},
{"name": "升级还需", "value": f"{update_data}"},
] + live_stats
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("BILIBILI", [])[0]
print(BiliBili(check_item=_check_item).main())

95
dailycheckin/configs.py Executable file
View File

@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
import json
import os
from dailycheckin import CheckIn
def checkin_map():
result = {}
for cls in CheckIn.__subclasses__():
check_name = cls.__name__.upper()
if check_name:
result[check_name] = (cls.name, cls)
return result
checkin_map = checkin_map()
notice_map = {
"BARK_URL": "",
"COOLPUSHEMAIL": "",
"COOLPUSHQQ": "",
"COOLPUSHSKEY": "",
"COOLPUSHWX": "",
"DINGTALK_ACCESS_TOKEN": "",
"DINGTALK_SECRET": "",
"FSKEY": "",
"PUSHPLUS_TOKEN": "",
"PUSHPLUS_TOPIC": "",
"QMSG_KEY": "",
"QMSG_TYPE": "",
"QYWX_AGENTID": "",
"QYWX_CORPID": "",
"QYWX_CORPSECRET": "",
"QYWX_KEY": "",
"QYWX_TOUSER": "",
"QYWX_MEDIA_ID": "",
"SCKEY": "",
"SENDKEY": "",
"TG_API_HOST": "",
"TG_BOT_TOKEN": "",
"TG_PROXY": "",
"TG_USER_ID": "",
"MERGE_PUSH": "",
}
def env2list(key):
try:
value = json.loads(os.getenv(key, []).strip()) if os.getenv(key) else []
if isinstance(value, list):
value = value
else:
value = []
except Exception as e:
print(e)
value = []
return value
def env2str(key):
try:
value = os.getenv(key, "") if os.getenv(key) else ""
if isinstance(value, str):
value = value.strip()
elif isinstance(value, bool):
value = value
else:
value = None
except Exception as e:
print(e)
value = None
return value
def get_checkin_info(data):
result = {}
if isinstance(data, dict):
for one in checkin_map.keys():
result[one.lower()] = data.get(one, [])
else:
for one in checkin_map.keys():
result[one.lower()] = env2list(one)
return result
def get_notice_info(data):
result = {}
if isinstance(data, dict):
for one in notice_map.keys():
result[one.lower()] = data.get(one, None)
else:
for one in notice_map.keys():
result[one.lower()] = env2str(one)
return result

0
dailycheckin/fmapp/__init__.py Executable file
View File

111
dailycheckin/fmapp/main.py Executable file
View File

@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
import json
import os
import requests
from dailycheckin import CheckIn
class FMAPP(CheckIn):
name = "Fa米家"
def __init__(self, check_item):
self.check_item = check_item
@staticmethod
def sign(headers):
try:
url = (
"https://fmapp.chinafamilymart.com.cn/api/app/market/member/signin/sign"
)
response = requests.post(url=url, headers=headers).json()
code = response.get("code")
if code == "200":
data = response.get("data", {})
msg = (
f"在坚持{data.get('nextDay')}天即可获得{data.get('nextNumber')}个发米粒, "
f"签到{data.get('lastDay')}天可获得{data.get('lastNumber')}个发米粒"
)
else:
msg = response.get("message")
except Exception as e:
print("错误信息", str(e))
msg = "未知错误,检查日志"
msg = {"name": "签到信息", "value": msg}
return msg
@staticmethod
def user_info(headers):
try:
url = "https://fmapp.chinafamilymart.com.cn/api/app/member/info"
response = requests.post(url=url, headers=headers).json()
code = response.get("code")
if code == "200":
data = response.get("data", {})
msg = data.get("nickName")
else:
msg = response.get("message")
except Exception as e:
print("错误信息", str(e))
msg = "未知错误,检查日志"
msg = {"name": "帐号信息", "value": msg}
return msg
@staticmethod
def mili_count(headers):
try:
url = "https://fmapp.chinafamilymart.com.cn/api/app/member/v1/mili/service/detail"
response = requests.post(
url=url, headers=headers, data=json.dumps({"pageSize": 10, "pageNo": 1})
).json()
code = response.get("code")
if code == "200":
data = response.get("data", {})
msg = data.get("miliNum")
else:
msg = response.get("message")
except Exception as e:
print("错误信息", str(e))
msg = "未知错误,检查日志"
msg = {"name": "米粒数量", "value": msg}
return msg
def main(self):
token = self.check_item.get("token")
blackbox = self.check_item.get("blackbox")
device_id = self.check_item.get("device_id")
fmversion = self.check_item.get("fmversion", "2.2.3")
fm_os = self.check_item.get("os", "ios")
useragent = self.check_item.get("useragent", "Fa")
headers = {
"Accept": "*/*",
"Accept-Language": "zh-Hans;q=1.0",
"Accept-Encoding": "br;q=1.0, gzip;q=0.9, deflate;q=0.8",
"Host": "fmapp.chinafamilymart.com.cn",
"Content-Type": "application/json",
"loginChannel": "app",
"token": token,
"fmVersion": fmversion,
"deviceId": device_id,
"User-Agent": useragent,
"os": fm_os,
"blackBox": blackbox,
}
sign_msg = self.sign(headers=headers)
name_msg = self.user_info(headers=headers)
mili_msg = self.mili_count(headers=headers)
msg = [name_msg, sign_msg, mili_msg]
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("FMAPP", [])[0]
print(FMAPP(check_item=_check_item).main())

0
dailycheckin/iqiyi/__init__.py Executable file
View File

254
dailycheckin/iqiyi/main.py Executable file
View File

@ -0,0 +1,254 @@
# -*- coding: utf-8 -*-
import json
import os
import re
import time
from urllib.parse import unquote
import requests
from dailycheckin import CheckIn
class IQIYI(CheckIn):
name = "爱奇艺"
def __init__(self, check_item):
self.check_item = check_item
@staticmethod
def parse_cookie(cookie):
p00001 = (
re.findall(r"P00001=(.*?);", cookie)[0]
if re.findall(r"P00001=(.*?);", cookie)
else ""
)
p00002 = (
re.findall(r"P00002=(.*?);", cookie)[0]
if re.findall(r"P00002=(.*?);", cookie)
else ""
)
p00003 = (
re.findall(r"P00003=(.*?);", cookie)[0]
if re.findall(r"P00003=(.*?);", cookie)
else ""
)
return p00001, p00002, p00003
@staticmethod
def user_information(p00001):
"""
账号信息查询
"""
time.sleep(3)
url = "http://serv.vip.iqiyi.com/vipgrowth/query.action"
params = {"P00001": p00001}
res = requests.get(url=url, params=params).json()
if res["code"] == "A00000":
try:
res_data = res.get("data", {})
level = res_data.get("level", 0) # VIP 等级
growthvalue = res_data.get("growthvalue", 0) # 当前 VIP 成长值
distance = res_data.get("distance", 0) # 升级需要成长值
deadline = res_data.get("deadline", "非 VIP 用户") # VIP 到期时间
today_growth_value = res_data.get("todayGrowthValue", 0) # 今日成长值
msg = [
{"name": "VIP 等级", "value": level},
{"name": "当前成长", "value": growthvalue},
{"name": "今日成长", "value": today_growth_value},
{"name": "升级还需", "value": distance},
{"name": "VIP 到期", "value": deadline},
]
except Exception as e:
msg = [
{"name": "账号信息", "value": str(e)},
]
print(msg)
else:
msg = [
{"name": "账号信息", "value": res.get("msg")},
]
return msg
@staticmethod
def sign(p00001):
"""
VIP 签到
"""
url = "https://tc.vip.iqiyi.com/taskCenter/task/queryUserTask"
params = {"P00001": p00001, "autoSign": "yes"}
res = requests.get(url=url, params=params).json()
if res["code"] == "A00000":
try:
cumulate_sign_days_sum = res["data"]["monthlyGrowthReward"]
msg = [
{"name": "当月成长", "value": f"{cumulate_sign_days_sum}成长值"},
]
except Exception as e:
print(e)
msg = [{"name": "当月成长", "value": str(e)}]
else:
msg = [{"name": "当月成长", "value": res.get("msg")}]
return msg
@staticmethod
def query_user_task(p00001):
"""
获取 VIP 日常任务 taskCode(任务状态)
"""
url = "https://tc.vip.iqiyi.com/taskCenter/task/queryUserTask"
params = {"P00001": p00001}
task_list = []
res = requests.get(url=url, params=params).json()
if res["code"] == "A00000":
for item in res["data"]["tasks"]["daily"]:
task_list.append(
{
"name": item["name"],
"taskCode": item["taskCode"],
"status": item["status"],
"taskReward": item["taskReward"]["task_reward_growth"],
}
)
return task_list
@staticmethod
def join_task(p00001, task_list):
"""
遍历完成任务
"""
url = "https://tc.vip.iqiyi.com/taskCenter/task/joinTask"
params = {
"P00001": p00001,
"taskCode": "",
"platform": "bb136ff4276771f3",
"lang": "zh_CN",
}
for item in task_list:
if item["status"] == 2:
params["taskCode"] = item["taskCode"]
requests.get(url=url, params=params)
@staticmethod
def get_task_rewards(p00001, task_list):
"""
获取任务奖励
:return: 返回信息
"""
url = "https://tc.vip.iqiyi.com/taskCenter/task/getTaskRewards"
params = {
"P00001": p00001,
"taskCode": "",
"platform": "bb136ff4276771f3",
"lang": "zh_CN",
}
growth_task = 0
for item in task_list:
if item["status"] == 0:
params["taskCode"] = item.get("taskCode")
requests.get(url=url, params=params)
elif item["status"] == 4:
requests.get(
url="https://tc.vip.iqiyi.com/taskCenter/task/notify", params=params
)
params["taskCode"] = item.get("taskCode")
requests.get(url=url, params=params)
elif item["status"] == 1:
growth_task += item["taskReward"]
msg = {"name": "任务奖励", "value": f"+{growth_task}成长值"}
return msg
@staticmethod
def draw(draw_type, p00001, p00003):
"""
查询抽奖次数(),抽奖
:param draw_type: 类型0 查询次数1 抽奖
:param p00001: 关键参数
:param p00003: 关键参数
:return: {status, msg, chance}
"""
url = "https://iface2.iqiyi.com/aggregate/3.0/lottery_activity"
params = {
"lottery_chance": 1,
"app_k": "b398b8ccbaeacca840073a7ee9b7e7e6",
"app_v": "11.6.5",
"platform_id": 10,
"dev_os": "8.0.0",
"dev_ua": "FRD-AL10",
"net_sts": 1,
"qyid": "2655b332a116d2247fac3dd66a5285011102",
"psp_uid": p00003,
"psp_cki": p00001,
"psp_status": 3,
"secure_v": 1,
"secure_p": "GPhone",
"req_sn": round(time.time() * 1000),
}
if draw_type == 1:
del params["lottery_chance"]
res = requests.get(url=url, params=params).json()
if not res.get("code"):
chance = int(res.get("daysurpluschance"))
msg = res.get("awardName")
return {"status": True, "msg": msg, "chance": chance}
else:
try:
msg = res.get("kv", {}).get("msg")
except Exception as e:
print(e)
msg = res["errorReason"]
return {"status": False, "msg": msg, "chance": 0}
def main(self):
p00001, p00002, p00003 = self.parse_cookie(self.check_item.get("cookie"))
sign_msg = self.sign(p00001=p00001)
chance = self.draw(0, p00001=p00001, p00003=p00003)["chance"]
if chance:
draw_msg = ""
for i in range(chance):
ret = self.draw(1, p00001=p00001, p00003=p00003)
draw_msg += ret["msg"] + ";" if ret["status"] else ""
else:
draw_msg = "抽奖机会不足"
task_msg = ""
for one in range(6):
task_list = self.query_user_task(p00001=p00001)
self.join_task(p00001=p00001, task_list=task_list)
time.sleep(10)
task_msg = self.get_task_rewards(p00001=p00001, task_list=task_list)
try:
user_info = json.loads(unquote(p00002, encoding="utf-8"))
user_name = user_info.get("user_name")
user_name = user_name.replace(user_name[3:7], "****")
nickname = user_info.get("nickname")
except Exception as e:
print(f"获取账号信息失败,错误信息: {e}")
nickname = "未获取到,请检查 Cookie 中 P00002 字段"
user_name = "未获取到,请检查 Cookie 中 P00002 字段"
user_msg = self.user_information(p00001=p00001)
msg = (
[
{"name": "用户账号", "value": user_name},
{"name": "用户昵称", "value": nickname},
]
+ user_msg
+ sign_msg
+ [
task_msg,
{"name": "抽奖奖励", "value": draw_msg},
]
)
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("IQIYI", [])[0]
print(IQIYI(check_item=_check_item).main())

0
dailycheckin/kgqq/__init__.py Executable file
View File

166
dailycheckin/kgqq/main.py Executable file
View File

@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
import json
import os
import requests
from dailycheckin import CheckIn
class KGQQ(CheckIn):
name = "全民K歌"
def __init__(self, check_item):
self.check_item = check_item
@staticmethod
def sign(kgqq_cookie):
headers = {"Cookie": kgqq_cookie}
uid = kgqq_cookie.split("; ")
t_uuid = ""
for i in uid:
if i.find("uid=") >= 0:
t_uuid = i.split("=")[1]
proto_profile_url = "https://node.kg.qq.com/webapp/proxy?ns=proto_profile&cmd=profile.getProfile&mapExt=JTdCJTIyZmlsZSUyMiUzQSUyMnByb2ZpbGVfd2ViYXBwSmNlJTIyJTJDJTIyY21kTmFtZSUyMiUzQSUyMlByb2ZpbGVHZXQlMjIlMkMlMjJhcHBpZCUyMiUzQTEwMDA2MjYlMkMlMjJkY2FwaSUyMiUzQSU3QiUyMmludGVyZmFjZUlkJTIyJTNBMjA1MzU5NTk3JTdEJTJDJTIybDVhcGklMjIlM0ElN0IlMjJtb2RpZCUyMiUzQTI5NDAxNyUyQyUyMmNtZCUyMiUzQTI2MjE0NCU3RCUyQyUyMmlwJTIyJTNBJTIyMTAwLjExMy4xNjIuMTc4JTIyJTJDJTIycG9ydCUyMiUzQSUyMjEyNDA2JTIyJTdE&t_uUid={0}".format(
t_uuid
)
url_list = (
[
"https://node.kg.qq.com/webapp/proxy?ns=KG_TASK&cmd=task.getLottery&ns_inbuf=&mapExt=JTdCJTIyZmlsZSUyMiUzQSUyMnRhc2tKY2UlMjIlMkMlMjJjbWROYW1lJTIyJTNBJTIyTG90dGVyeVJlcSUyMiUyQyUyMnduc0NvbmZpZyUyMiUzQSU3QiUyMmFwcGlkJTIyJTNBMTAwMDU1NyU3RCUyQyUyMmw1YXBpJTIyJTNBJTdCJTIybW9kaWQlMjIlM0E1MDM5MzclMkMlMjJjbWQlMjIlM0E1ODk4MjQlN0QlN0Q%3D&t_uid={0}&t_iShowEntry=1&t_type={1}".format(
t_uuid, one
)
for one in ["1", "2"]
]
+ [
"https://node.kg.qq.com/webapp/proxy?ns=KG_TASK&cmd=task.signinGetAward&mapExt=JTdCJTIyZmlsZSUyMiUzQSUyMnRhc2tKY2UlMjIlMkMlMjJjbWROYW1lJTIyJTNBJTIyR2V0U2lnbkluQXdhcmRSZXElMjIlMkMlMjJ3bnNDb25maWclMjIlM0ElN0IlMjJhcHBpZCUyMiUzQTEwMDA2MjYlN0QlMkMlMjJsNWFwaSUyMiUzQSU3QiUyMm1vZGlkJTIyJTNBNTAzOTM3JTJDJTIyY21kJTIyJTNBNTg5ODI0JTdEJTdE&t_uid={0}&t_iShowEntry={1}".format(
t_uuid, one
)
for one in ["1", "2", "4", "16", "128", "512"]
]
+ [
"https://node.kg.qq.com/webapp/proxy?ns=KG_TASK&cmd=task.getLottery&mapExt=JTdCJTIyZmlsZSUyMiUzQSUyMnRhc2tKY2UlMjIlMkMlMjJjbWROYW1lJTIyJTNBJTIyTG90dGVyeVJlcSUyMiUyQyUyMnduc0NvbmZpZyUyMiUzQSU3QiUyMmFwcGlkJTIyJTNBMTAwMDU1NyU3RCUyQyUyMmw1YXBpJTIyJTNBJTdCJTIybW9kaWQlMjIlM0E1MDM5MzclMkMlMjJjbWQlMjIlM0E1ODk4MjQlN0QlN0Q&t_uid={0}&t_iShowEntry=4&t_type=104".format(
t_uuid
),
"https://node.kg.qq.com/webapp/proxy?ns=KG_TASK&cmd=task.getLottery&mapExt=JTdCJTIyZmlsZSUyMiUzQSUyMnRhc2tKY2UlMjIlMkMlMjJjbWROYW1lJTIyJTNBJTIyTG90dGVyeVJlcSUyMiUyQyUyMmw1YXBpJTIyJTNBJTdCJTIybW9kaWQlMjIlM0E1MDM5MzclMkMlMjJjbWQlMjIlM0E1ODk4MjQlN0QlMkMlMjJsNWFwaV9leHAxJTIyJTNBJTdCJTIybW9kaWQlMjIlM0E4MTcwODklMkMlMjJjbWQlMjIlM0EzODAxMDg4JTdEJTdE&t_uid={0}&t_type=103".format(
t_uuid
),
]
)
proto_music_station_url = "https://node.kg.qq.com/webapp/proxy?ns=proto_music_station&cmd=message.batch_get_music_cards&mapExt=JTdCJTIyY21kTmFtZSUyMiUzQSUyMkdldEJhdGNoTXVzaWNDYXJkc1JlcSUyMiUyQyUyMmZpbGUlMjIlM0ElMjJwcm90b19tdXNpY19zdGF0aW9uSmNlJTIyJTJDJTIyd25zRGlzcGF0Y2hlciUyMiUzQXRydWUlN0Q&t_uUid={0}&g_tk_openkey=".format(
t_uuid
)
url_10 = "https://node.kg.qq.com/webapp/proxy?t_stReward%3Aobject=%7B%22uInteractiveType%22%3A1%2C%22uRewardType%22%3A0%2C%22uFlowerNum%22%3A15%7D&ns=proto_music_station&cmd=message.get_reward&mapExt=JTdCJTIyY21kTmFtZSUyMiUzQSUyMkdldFJld2FyZFJlcSUyMiUyQyUyMmZpbGUlMjIlM0ElMjJwcm90b19tdXNpY19zdGF0aW9uSmNlJTIyJTJDJTIyd25zRGlzcGF0Y2hlciUyMiUzQXRydWUlN0Q&t_uUid={0}&t_strUgcId=".format(
t_uuid
)
url_15 = "https://node.kg.qq.com/webapp/proxy?t_stReward%3Aobject=%7B%22uInteractiveType%22%3A0%2C%22uRewardType%22%3A0%2C%22uFlowerNum%22%3A10%7D&ns=proto_music_station&cmd=message.get_reward&mapExt=JTdCJTIyY21kTmFtZSUyMiUzQSUyMkdldFJld2FyZFJlcSUyMiUyQyUyMmZpbGUlMjIlM0ElMjJwcm90b19tdXNpY19zdGF0aW9uSmNlJTIyJTJDJTIyd25zRGlzcGF0Y2hlciUyMiUzQXRydWUlN0Q&t_uUid={0}&t_strUgcId=".format(
t_uuid
)
try:
old_proto_profile_response = requests.get(
url=proto_profile_url, headers=headers
)
old_num = old_proto_profile_response.json()["data"]["profile.getProfile"][
"uFlowerNum"
]
nickname = old_proto_profile_response.json()["data"]["profile.getProfile"][
"stPersonInfo"
]["sKgNick"]
for url in url_list:
try:
requests.get(url=url, headers=headers)
except Exception as e:
print(e)
for g_tk_openkey in range(16):
try:
proto_music_station_resp = requests.get(
url=proto_music_station_url + str(g_tk_openkey), headers=headers
)
if proto_music_station_resp.json().get("code") in [1000]:
return proto_music_station_resp.json().get("msg")
vct_music_cards = proto_music_station_resp.json()["data"][
"message.batch_get_music_cards"
]["vctMusicCards"]
vct_music_cards_list = sorted(
vct_music_cards,
key=lambda x: x["stReward"]["uFlowerNum"],
reverse=True,
)[0]
str_ugc_id = vct_music_cards_list["strUgcId"]
str_key = vct_music_cards_list["strKey"]
url = str_ugc_id + "&t_strKey=" + str_key
u_flower_num = vct_music_cards_list["stReward"]["uFlowerNum"]
if u_flower_num > 10:
requests.get(url=url_10 + url, headers=headers)
elif 1 < u_flower_num < 10:
requests.get(url=url_15 + url, headers=headers)
except Exception as e:
print(e)
# VIP 签到
try:
getinfourl = (
"https://node.kg.qq.com/webapp/proxy?ns=proto_vip_webapp&cmd=vip.get_vip_info&t_uUid="
+ t_uuid
+ "&t_uWebReq=1&t_uGetDataFromC4B=1"
)
inforequest = requests.get(url=getinfourl, headers=headers)
vip_status = inforequest.json()["data"]["vip.get_vip_info"][
"stVipCoreInfo"
]["uStatus"]
if vip_status == 1:
vipurl = (
"https://node.kg.qq.com/webapp/proxy?t_uUid="
+ t_uuid
+ "&ns=proto_vip_webapp&cmd=vip.get_vip_day_reward&ns_inbuf=&nocache=1613719349184&mapExt=JTdCJTIyY21kTmFtZSUyMiUzQSUyMkdldFZpcERheVJld2FyZCUyMiU3RA%3D%3D&g_tk_openkey=642424811"
)
viprequest = requests.get(url=vipurl, headers=headers)
str_tips = viprequest.json()["data"]["vip.get_vip_day_reward"][
"strTips"
]
u_cur_reward_num = viprequest.json()["data"][
"vip.get_vip_day_reward"
]["uCurRewardNum"]
vip_message = f"{str_tips} 获取VIP福利道具{u_cur_reward_num}"
else:
vip_message = "非 VIP 用户"
except Exception as e:
print(e)
vip_message = "VIP 签到失败"
new_proto_profile_response = requests.get(
url=proto_profile_url, headers=headers
)
new_num = new_proto_profile_response.json()["data"]["profile.getProfile"][
"uFlowerNum"
]
get_num = int(new_num) - int(old_num)
msg = [
{"name": "帐号信息", "value": nickname},
{"name": "获取鲜花", "value": get_num},
{"name": "当前鲜花", "value": new_num},
{"name": "VIP 签到", "value": vip_message},
]
except Exception as e:
msg = [
{"name": "帐号信息", "value": str(e)},
]
return msg
def main(self):
kgqq_cookie = self.check_item.get("cookie")
msg = self.sign(kgqq_cookie=kgqq_cookie)
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("KGQQ", [])[0]
print(KGQQ(check_item=_check_item).main())

147
dailycheckin/main.py Executable file
View File

@ -0,0 +1,147 @@
# -*- coding: utf-8 -*-
import argparse
import json
import os
import time
from datetime import datetime, timedelta
import requests
from dailycheckin.__version__ import __version__
from dailycheckin.configs import checkin_map, get_checkin_info, get_notice_info
from dailycheckin.utils.format_config import format_data
from dailycheckin.utils.message import push_message
def parse_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("--include", nargs="+", help="任务执行包含的任务列表")
parser.add_argument("--exclude", nargs="+", help="任务执行排除的任务列表")
return parser.parse_args()
def check_config(task_list):
config_path = None
config_path_list = []
for one_path in [
"/ql/scripts/config.json",
"config.json",
"../config.json",
"./config/config.json",
"../config/config.json",
"/config.json",
]:
_config_path = os.path.join(os.getcwd(), one_path)
if os.path.exists(_config_path):
config_path = os.path.normpath(_config_path)
break
config_path_list.append(os.path.normpath(os.path.dirname(_config_path)))
if config_path:
print("使用配置文件路径:", config_path)
with open(config_path, "r", encoding="utf-8") as f:
try:
data = json.load(f)
flag, _data = format_data(data)
if flag:
print(
f"""\n\n当前版本大于 0.2.0 需要对「config.json」配置格式已更新请按照如下更新步骤进行更新:
1. 更新 Pypi 到最新版本>= v0.2.0 版本如何更新看教程文档
2. 手动执行一次签到请复制本次运行输出的配置覆盖到config.json文件后再重新运行记得去 https://json.cn 校验一下
3. 更新 新版config.json配置后再次手动运行一次检测签到是否正常即可
4. 如果失败请确定 pypi 版本大于 0.2.0 并根据配置文档: https://sitoi.github.io/dailycheckin/settings/ 手动更新配置\n
下方内容即是新版配置内容全部复制即可记得去 https://json.cn 校验一下
{'-' * 100}
{json.dumps(_data, ensure_ascii=False)}
{'-' * 100}\n\n"""
)
return False, False
except Exception as e:
print("Json 格式错误,请务必到 http://www.json.cn 网站检查 config.json 文件格式是否正确!")
return False, False
try:
notice_info = get_notice_info(data=data)
_check_info = get_checkin_info(data=data)
check_info = {}
for one_check, _ in checkin_map.items():
if one_check in task_list:
if _check_info.get(one_check.lower()):
for _, check_item in enumerate(
_check_info.get(one_check.lower(), [])
):
if "xxxxxx" not in str(check_item) and "多账号" not in str(
check_item
):
if one_check.lower() not in check_info.keys():
check_info[one_check.lower()] = []
check_info[one_check.lower()].append(check_item)
return notice_info, check_info
except Exception as e:
print(e)
return False, False
else:
print(
"未找到 config.json 配置文件\n请在下方任意目录中添加「config.json」文件:\n"
+ "\n".join(config_path_list)
)
return False, False
def checkin():
start_time = time.time()
utc_time = (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
print(f"当前时间: {utc_time}\n当前版本: {__version__}")
args = parse_arguments()
include = args.include
exclude = args.exclude
if not include:
include = list(checkin_map.keys())
else:
include = [one for one in include if one in checkin_map.keys()]
if not exclude:
exclude = []
else:
exclude = [one for one in exclude if one in checkin_map.keys()]
task_list = list(set(include) - set(exclude))
notice_info, check_info = check_config(task_list)
if check_info:
task_name_str = "\n".join(
[
f"{checkin_map.get(one.upper())[0]}」账号数 : {len(value)}"
for one, value in check_info.items()
]
)
print(f"\n---------- 本次执行签到任务如下 ----------\n\n{task_name_str}\n\n")
content_list = []
for one_check, check_list in check_info.items():
check_name, check_func = checkin_map.get(one_check.upper())
print(f"----------开始执行「{check_name}」签到----------")
for index, check_item in enumerate(check_list):
try:
msg = check_func(check_item).main()
content_list.append(f"{check_name}\n{msg}")
print(f"{index + 1} 个账号: ✅✅✅✅✅")
except Exception as e:
content_list.append(f"{check_name}\n{e}")
print(f"{index + 1} 个账号: ❌❌❌❌❌\n{e}")
print("\n\n")
try:
url = "https://pypi.python.org/pypi/dailycheckin/json"
data = list(requests.get(url=url, timeout=30).json()["releases"].keys())
data.sort()
latest_version = data[-1]
except:
print("获取最新版本失败")
latest_version = "0.0.0"
content_list.append(
f"开始时间: {utc_time}\n"
f"任务用时: {int(time.time() - start_time)}\n"
f"当前版本: {__version__}\n"
f"最新版本: {latest_version}\n"
f"项目地址: https://github.com/Sitoi/dailycheckin"
)
push_message(content_list=content_list, notice_info=notice_info)
return
if __name__ == "__main__":
checkin()

View File

File diff suppressed because one or more lines are too long

0
dailycheckin/tieba/__init__.py Executable file
View File

130
dailycheckin/tieba/main.py Executable file
View File

@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
import hashlib
import json
import os
import re
import requests
from requests import utils
from dailycheckin import CheckIn
class Tieba(CheckIn):
name = "百度贴吧"
def __init__(self, check_item):
self.check_item = check_item
@staticmethod
def login_info(session):
return session.get(url="https://zhidao.baidu.com/api/loginInfo").json()
def valid(self, session):
try:
content = session.get(url="https://tieba.baidu.com/dc/common/tbs")
except Exception as e:
return False, f"登录验证异常,错误信息: {e}"
data = json.loads(content.text)
if data["is_login"] == 0:
return False, "登录失败,cookie 异常"
tbs = data["tbs"]
user_name = self.login_info(session=session)["userName"]
return tbs, user_name
@staticmethod
def tieba_list_more(session):
content = session.get(
url="https://tieba.baidu.com/f/like/mylike?&pn=1",
timeout=(5, 20),
allow_redirects=False,
)
try:
pn = int(
re.match(
r".*/f/like/mylike\?&pn=(.*?)\">尾页.*", content.text, re.S | re.I
).group(1)
)
except Exception:
pn = 1
next_page = 1
pattern = re.compile(r".*?<a href=\"/f\?kw=.*?title=\"(.*?)\">")
while next_page <= pn:
tbname = pattern.findall(content.text)
for x in tbname:
yield x
next_page += 1
content = session.get(
url=f"https://tieba.baidu.com/f/like/mylike?&pn={next_page}",
timeout=(5, 20),
allow_redirects=False,
)
def get_tieba_list(self, session):
tieba_list = list(self.tieba_list_more(session=session))
return tieba_list
@staticmethod
def sign(session, tb_name_list, tbs):
success_count, error_count, exist_count, shield_count = 0, 0, 0, 0
for tb_name in tb_name_list:
md5 = hashlib.md5(
f"kw={tb_name}tbs={tbs}tiebaclient!!!".encode("utf-8")
).hexdigest()
data = {"kw": tb_name, "tbs": tbs, "sign": md5}
try:
response = session.post(
url="https://c.tieba.baidu.com/c/c/forum/sign",
data=data,
verify=False,
).json()
if response["error_code"] == "0":
success_count += 1
elif response["error_code"] == "160002":
exist_count += 1
elif response["error_code"] == "340006":
shield_count += 1
else:
error_count += 1
except Exception as e:
print(f"贴吧 {tb_name} 签到异常,原因{str(e)}")
msg = [
{"name": "贴吧总数", "value": len(tb_name_list)},
{"name": "签到成功", "value": success_count},
{"name": "已经签到", "value": exist_count},
{"name": "被屏蔽的", "value": shield_count},
{"name": "签到失败", "value": error_count},
]
return msg
def main(self):
tieba_cookie = {
item.split("=")[0]: item.split("=")[1]
for item in self.check_item.get("cookie").split("; ")
}
session = requests.session()
requests.utils.add_dict_to_cookiejar(session.cookies, tieba_cookie)
session.headers.update({"Referer": "https://www.baidu.com/"})
tbs, user_name = self.valid(session=session)
if tbs:
tb_name_list = self.get_tieba_list(session=session)
msg = self.sign(session=session, tb_name_list=tb_name_list, tbs=tbs)
msg = [{"name": "帐号信息", "value": user_name}] + msg
else:
msg = [
{"name": "帐号信息", "value": user_name},
{"name": "签到信息", "value": "Cookie 可能过期"},
]
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("TIEBA", [])[0]
print(Tieba(check_item=_check_item).main())

0
dailycheckin/utils/__init__.py Executable file
View File

View File

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
name_map = {
"IQIYI_COOKIE_LIST": "IQIYI",
"VQQ_COOKIE_LIST": "VQQ",
"MGTV_PARAMS_LIST": "MGTV",
"KGQQ_COOKIE_LIST": "KGQQ",
"MUSIC163_ACCOUNT_LIST": "MUSIC163",
"BILIBILI_COOKIE_LIST": "BILIBILI",
"YOUDAO_COOKIE_LIST": "YOUDAO",
"FMAPP_ACCOUNT_LIST": "FMAPP",
"BAIDU_URL_SUBMIT_LIST": "BAIDU",
"ONEPLUSBBS_COOKIE_LIST": "ONEPLUSBBS",
"SMZDM_COOKIE_LIST": "SMZDM",
"TIEBA_COOKIE_LIST": "TIEBA",
"V2EX_COOKIE_LIST": "V2EX",
"WWW2NZZ_COOKIE_LIST": "WWW2NZZ",
"ACFUN_ACCOUNT_LIST": "ACFUN",
"MIMOTION_ACCOUNT_LIST": "MIMOTION",
"CLOUD189_ACCOUNT_LIST": "CLOUD189",
"POJIE_COOKIE_LIST": "POJIE",
"MEIZU_COOKIE_LIST": "MEIZU",
"PICACOMIC_ACCOUNT_LIST": "PICACOMIC",
"ZHIYOO_COOKIE_LIST": "ZHIYOO",
"WEIBO_COOKIE_LIST": "WEIBO",
"DUOKAN_COOKIE_LIST": "DUOKAN",
"CSDN_COOKIE_LIST": "CSDN",
"WZYD_DATA_LIST": "WZYD",
"WOMAIL_URL_LIST": "WOMAIL",
}
change_key_map = {
"ACFUN_ACCOUNT_LIST": {"acfun_password": "password", "acfun_phone": "phone"},
"BAIDU_URL_SUBMIT_LIST": {
"data_url": "data_url",
"submit_url": "submit_url",
"times": "times",
},
"BILIBILI_COOKIE_LIST": {
"bilibili_cookie": "cookie",
"coin_num": "coin_num",
"coin_type": "coin_type",
"silver2coin": "silver2coin",
},
"CLOUD189_ACCOUNT_LIST": {
"cloud189_password": "password",
"cloud189_phone": "phone",
},
"CSDN_COOKIE_LIST": {"csdn_cookie": "cookie"},
"DUOKAN_COOKIE_LIST": {"duokan_cookie": "cookie"},
"FMAPP_ACCOUNT_LIST": {
"fmapp_blackbox": "blackbox",
"fmapp_cookie": "cookie",
"fmapp_device_id": "device_id",
"fmapp_fmversion": "fmversion",
"fmapp_os": "os",
"fmapp_token": "token",
"fmapp_useragent": "useragent",
},
"HEYTAP": {"cookie": "cookie", "useragent": "useragent"},
"IQIYI_COOKIE_LIST": {"iqiyi_cookie": "cookie"},
"KGQQ_COOKIE_LIST": {"kgqq_cookie": "cookie"},
"MEIZU_COOKIE_LIST": {"draw_count": "draw_count", "meizu_cookie": "cookie"},
"MGTV_PARAMS_LIST": {"mgtv_params": "params"},
"MIMOTION_ACCOUNT_LIST": {
"mimotion_max_step": "max_step",
"mimotion_min_step": "min_step",
"mimotion_password": "password",
"mimotion_phone": "phone",
},
"MUSIC163_ACCOUNT_LIST": {
"music163_password": "password",
"music163_phone": "phone",
},
"ONEPLUSBBS_COOKIE_LIST": {"oneplusbbs_cookie": "cookie"},
"PICACOMIC_ACCOUNT_LIST": {
"picacomic_email": "email",
"picacomic_password": "password",
},
"POJIE_COOKIE_LIST": {"pojie_cookie": "cookie"},
"SMZDM_COOKIE_LIST": {"smzdm_cookie": "cookie"},
"TIEBA_COOKIE_LIST": {"tieba_cookie": "cookie"},
"UNICOM": {"app_id": "app_id", "mobile": "mobile", "password": "password"},
"V2EX_COOKIE_LIST": {"v2ex_cookie": "cookie", "v2ex_proxy": "proxy"},
"VQQ_COOKIE_LIST": {"auth_refresh": "auth_refresh", "vqq_cookie": "cookie"},
"WEIBO_COOKIE_LIST": {"weibo_show_url": "url"},
"WOMAIL_URL_LIST": {"womail_url": "url"},
"WWW2NZZ_COOKIE_LIST": {"www2nzz_cookie": "cookie"},
"WZYD_DATA_LIST": {"wzyd_data": "data"},
"YOUDAO_COOKIE_LIST": {"youdao_cookie": "cookie"},
"ZHIYOO_COOKIE_LIST": {"zhiyoo_cookie": "cookie"},
}
def format_data(data):
flag = False
new_data = {}
for key, value in data.items():
if name_map.get(key):
flag = True
if isinstance(value, list):
for one in value:
if isinstance(one, dict):
if name_map.get(key) not in new_data.keys():
new_data[name_map.get(key)] = []
for k2, v2 in change_key_map[key].items():
try:
one[v2] = one.pop(k2)
except Exception as e:
print(e)
new_data[name_map.get(key)].append(one)
if not new_data.get(name_map.get(key)):
new_data[name_map.get(key)] = value
else:
new_data[key] = value
return flag, new_data

335
dailycheckin/utils/message.py Executable file
View File

@ -0,0 +1,335 @@
# -*- coding: utf-8 -*-
import base64
import hashlib
import hmac
import json
import time
import urllib.parse
import requests
def message2server(sckey, content):
print("server 酱推送开始")
data = {"text": "每日签到", "desp": content.replace("\n", "\n\n")}
requests.post(url=f"https://sc.ftqq.com/{sckey}.send", data=data)
return
def message2server_turbo(sendkey, content):
print("server 酱 Turbo 推送开始")
data = {"text": "每日签到", "desp": content.replace("\n", "\n\n")}
requests.post(url=f"https://sctapi.ftqq.com/{sendkey}.send", data=data)
return
def message2coolpush(
coolpushskey,
content,
coolpushqq: bool = True,
coolpushwx: bool = False,
coolpushemail: bool = False,
):
print("Cool Push 推送开始")
params = {"c": content, "t": "每日签到"}
if coolpushqq:
requests.post(url=f"https://push.xuthus.cc/send/{coolpushskey}", params=params)
if coolpushwx:
requests.post(url=f"https://push.xuthus.cc/wx/{coolpushskey}", params=params)
if coolpushemail:
requests.post(url=f"https://push.xuthus.cc/email/{coolpushskey}", params=params)
return
def message2qmsg(qmsg_key, qmsg_type, content):
print("qmsg 酱推送开始")
params = {"msg": content}
if qmsg_type == "group":
requests.get(url=f"https://qmsg.zendee.cn/group/{qmsg_key}", params=params)
else:
requests.get(url=f"https://qmsg.zendee.cn/send/{qmsg_key}", params=params)
return
def message2telegram(tg_api_host, tg_proxy, tg_bot_token, tg_user_id, content):
print("Telegram 推送开始")
send_data = {
"chat_id": tg_user_id,
"text": content,
"disable_web_page_preview": "true",
}
if tg_api_host:
url = f"https://{tg_api_host}/bot{tg_bot_token}/sendMessage"
else:
url = f"https://api.telegram.org/bot{tg_bot_token}/sendMessage"
if tg_proxy:
proxies = {
"http": tg_proxy,
"https": tg_proxy,
}
else:
proxies = None
requests.post(url=url, data=send_data, proxies=proxies)
return
def message2feishu(fskey, content):
print("飞书 推送开始")
data = {"msg_type": "text", "content": {"text": content}}
requests.post(
url=f"https://open.feishu.cn/open-apis/bot/v2/hook/{fskey}", json=data
)
return
def message2dingtalk(dingtalk_secret, dingtalk_access_token, content):
print("Dingtalk 推送开始")
timestamp = str(round(time.time() * 1000))
secret_enc = dingtalk_secret.encode("utf-8")
string_to_sign = "{}\n{}".format(timestamp, dingtalk_secret)
string_to_sign_enc = string_to_sign.encode("utf-8")
hmac_code = hmac.new(
secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
send_data = {"msgtype": "text", "text": {"content": content}}
requests.post(
url="https://oapi.dingtalk.com/robot/send?access_token={0}&timestamp={1}&sign={2}".format(
dingtalk_access_token, timestamp, sign
),
headers={"Content-Type": "application/json", "Charset": "UTF-8"},
data=json.dumps(send_data),
)
return
def message2bark(bark_url: str, content):
print("Bark 推送开始")
if not bark_url.endswith("/"):
bark_url += "/"
url = f"{bark_url}{content}"
headers = {"Content-type": "application/x-www-form-urlencoded"}
requests.get(url=url, headers=headers)
return
def message2qywxrobot(qywx_key, content):
print("企业微信群机器人推送开始")
requests.post(
url=f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={qywx_key}",
data=json.dumps({"msgtype": "text", "text": {"content": content}}),
)
return
def message2qywxapp(
qywx_corpid, qywx_agentid, qywx_corpsecret, qywx_touser, qywx_media_id, content
):
print("企业微信应用消息推送开始")
res = requests.get(
f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={qywx_corpid}&corpsecret={qywx_corpsecret}"
)
token = res.json().get("access_token", False)
if qywx_media_id:
data = {
"touser": qywx_touser,
"msgtype": "mpnews",
"agentid": int(qywx_agentid),
"mpnews": {
"articles": [
{
"title": "Dailycheckin 签到通知",
"thumb_media_id": qywx_media_id,
"author": "Sitoi",
"content_source_url": "https://github.com/Sitoi/dailycheckin",
"content": content.replace("\n", "<br>"),
"digest": content,
}
]
},
}
else:
data = {
"touser": qywx_touser,
"agentid": int(qywx_agentid),
"msgtype": "textcard",
"textcard": {
"title": "Dailycheckin 签到通知",
"description": content,
"url": "https://github.com/Sitoi/dailycheckin",
"btntxt": "开源项目",
},
}
requests.post(
url=f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={token}",
data=json.dumps(data),
)
return
def message2pushplus(pushplus_token, content, pushplus_topic=None):
print("Pushplus 推送开始")
data = {
"token": pushplus_token,
"title": "签到通知",
"content": content.replace("\n", "<br>"),
"template": "json",
}
if pushplus_topic:
data["topic"] = pushplus_topic
requests.post(url="http://www.pushplus.plus/send", data=json.dumps(data))
return
def important_notice():
datas = requests.get(
url="https://api.github.com/repos/Sitoi/dailycheckin/issues?state=open&labels=通知"
).json()
if datas:
data = datas[0]
title = data.get("title")
body = data.get("body")
url = data.get("html_url")
notice = f"{title}\n{body}\n详细地址: {url}"
else:
notice = None
return notice
def push_message(content_list: list, notice_info: dict):
dingtalk_secret = notice_info.get("dingtalk_secret")
dingtalk_access_token = notice_info.get("dingtalk_access_token")
fskey = notice_info.get("fskey")
bark_url = notice_info.get("bark_url")
sckey = notice_info.get("sckey")
sendkey = notice_info.get("sendkey")
qmsg_key = notice_info.get("qmsg_key")
qmsg_type = notice_info.get("qmsg_type")
tg_bot_token = notice_info.get("tg_bot_token")
tg_user_id = notice_info.get("tg_user_id")
tg_api_host = notice_info.get("tg_api_host")
tg_proxy = notice_info.get("tg_proxy")
coolpushskey = notice_info.get("coolpushskey")
coolpushqq = notice_info.get("coolpushqq")
coolpushwx = notice_info.get("coolpushwx")
coolpushemail = notice_info.get("coolpushemail")
qywx_key = notice_info.get("qywx_key")
qywx_corpid = notice_info.get("qywx_corpid")
qywx_agentid = notice_info.get("qywx_agentid")
qywx_corpsecret = notice_info.get("qywx_corpsecret")
qywx_touser = notice_info.get("qywx_touser")
qywx_media_id = notice_info.get("qywx_media_id")
pushplus_token = notice_info.get("pushplus_token")
pushplus_topic = notice_info.get("pushplus_topic")
merge_push = notice_info.get("merge_push")
content_str = "\n————————————\n\n".join(content_list)
message_list = [content_str]
try:
notice = important_notice()
if notice:
message_list.append(notice)
content_list.append(notice)
except Exception as e:
print("获取重要通知失败:", e)
if merge_push is None:
if (
qmsg_key
or coolpushskey
or qywx_touser
or qywx_corpsecret
or qywx_agentid
or bark_url
or pushplus_token
):
merge_push = False
else:
merge_push = True
if not merge_push:
message_list = content_list
for message in message_list:
if qmsg_key:
try:
message2qmsg(qmsg_key=qmsg_key, qmsg_type=qmsg_type, content=message)
except Exception as e:
print("qmsg 推送失败", e)
if coolpushskey:
try:
message2coolpush(
coolpushskey=coolpushskey,
coolpushqq=coolpushqq,
coolpushwx=coolpushwx,
coolpushemail=coolpushemail,
content=message,
)
except Exception as e:
print("coolpush 推送失败", e)
if qywx_touser and qywx_corpid and qywx_corpsecret and qywx_agentid:
try:
message2qywxapp(
qywx_corpid=qywx_corpid,
qywx_agentid=qywx_agentid,
qywx_corpsecret=qywx_corpsecret,
qywx_touser=qywx_touser,
qywx_media_id=qywx_media_id,
content=message,
)
except Exception as e:
print("企业微信应用消息推送失败", e)
if bark_url:
try:
message2bark(bark_url=bark_url, content=message)
except Exception as e:
print("Bark 推送失败", e)
if dingtalk_access_token and dingtalk_secret:
try:
message2dingtalk(
dingtalk_secret=dingtalk_secret,
dingtalk_access_token=dingtalk_access_token,
content=message,
)
except Exception as e:
print("钉钉推送失败", e)
if fskey:
try:
message2feishu(fskey=fskey, content=message)
except Exception as e:
print("飞书推送失败", e)
if sckey:
try:
message2server(sckey=sckey, content=message)
except Exception as e:
print("Server 推送失败", e)
if sendkey:
try:
message2server_turbo(sendkey=sendkey, content=message)
except Exception as e:
print("Server Turbo 推送失败", e)
if qywx_key:
try:
message2qywxrobot(qywx_key=qywx_key, content=message)
except Exception as e:
print("企业微信群机器人推送失败", e)
if pushplus_token:
try:
message2pushplus(
pushplus_token=pushplus_token,
content=message,
pushplus_topic=pushplus_topic,
)
except Exception as e:
print("Pushplus 推送失败", e)
if tg_user_id and tg_bot_token:
try:
message2telegram(
tg_api_host=tg_api_host,
tg_proxy=tg_proxy,
tg_user_id=tg_user_id,
tg_bot_token=tg_bot_token,
content=message,
)
except Exception as e:
print("Telegram 推送失败", e)
if __name__ == "__main__":
print(important_notice())

0
dailycheckin/v2ex/__init__.py Executable file
View File

106
dailycheckin/v2ex/main.py Executable file
View File

@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
import json
import os
import re
import requests
import urllib3
from requests import utils
from dailycheckin import CheckIn
urllib3.disable_warnings()
class V2ex(CheckIn):
name = "V2EX 论坛"
def __init__(self, check_item):
self.check_item = check_item
@staticmethod
def sign(session):
msg = []
response = session.get(url="https://www.v2ex.com/mission/daily", verify=False)
pattern = (
r"<input type=\"button\" class=\"super normal button\""
r" value=\".*?\" onclick=\"location\.href = \'(.*?)\';\" />"
)
urls = re.findall(pattern=pattern, string=response.text)
url = urls[0] if urls else None
if url is None:
return "cookie 可能过期"
elif url != "/balance":
headers = {"Referer": "https://www.v2ex.com/mission/daily"}
data = {"once": url.split("=")[-1]}
_ = session.get(
url="https://www.v2ex.com" + url,
verify=False,
headers=headers,
params=data,
)
response = session.get(url="https://www.v2ex.com/balance", verify=False)
total = re.findall(
pattern=r"<td class=\"d\" style=\"text-align: right;\">(\d+\.\d+)</td>",
string=response.text,
)
total = total[0] if total else "签到失败"
today = re.findall(
pattern=r'<td class="d"><span class="gray">(.*?)</span></td>',
string=response.text,
)
today = today[0] if today else "签到失败"
username = re.findall(
pattern=r"<a href=\"/member/.*?\" class=\"top\">(.*?)</a>",
string=response.text,
)
username = username[0] if username else "用户名获取失败"
msg += [
{"name": "帐号信息", "value": username},
{"name": "今日签到", "value": today},
{"name": "帐号余额", "value": total},
]
response = session.get(url="https://www.v2ex.com/mission/daily", verify=False)
data = re.findall(
pattern=r"<div class=\"cell\">(.*?)天</div>", string=response.text
)
data = data[0] + "" if data else "获取连续签到天数失败"
msg += [
{"name": "签到天数", "value": data},
]
return msg
def main(self):
cookie = {
item.split("=")[0]: item.split("=")[1]
for item in self.check_item.get("cookie").split("; ")
}
session = requests.session()
if self.check_item.get("proxy", ""):
proxies = {
"http": self.check_item.get("proxy", ""),
"https": self.check_item.get("proxy", ""),
}
session.proxies.update(proxies)
requests.utils.add_dict_to_cookiejar(session.cookies, cookie)
session.headers.update(
{
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36 Edg/87.0.664.66",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
"accept-language": "zh-CN,zh;q=0.9,en;q=0.8",
}
)
msg = self.sign(session=session)
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("V2EX", [])[0]
print(V2ex(check_item=_check_item).main())

View File

75
dailycheckin/youdao/main.py Executable file
View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
import json
import os
import requests
from dailycheckin import CheckIn
class YouDao(CheckIn):
name = "有道云笔记"
def __init__(self, check_item):
self.check_item = check_item
@staticmethod
def sign(cookies):
ad_space = 0
refresh_cookies_res = requests.get(
"http://note.youdao.com/login/acc/pe/getsess?product=YNOTE", cookies=cookies
)
cookies = dict(refresh_cookies_res.cookies)
url = "https://note.youdao.com/yws/api/daupromotion?method=sync"
res = requests.post(url=url, cookies=cookies)
if "error" not in res.text:
checkin_response = requests.post(
url="https://note.youdao.com/yws/mapi/user?method=checkin",
cookies=cookies,
)
for i in range(3):
ad_response = requests.post(
url="https://note.youdao.com/yws/mapi/user?method=adRandomPrompt",
cookies=cookies,
)
ad_space += ad_response.json().get("space", 0) // 1048576
if "reward" in res.text:
sync_space = res.json().get("rewardSpace", 0) // 1048576
checkin_space = checkin_response.json().get("space", 0) // 1048576
space = sync_space + checkin_space + ad_space
youdao_message = "+{0}M".format(space)
else:
youdao_message = "获取失败"
else:
youdao_message = "Cookie 可能过期"
return youdao_message
def main(self):
youdao_cookie = {
item.split("=")[0]: item.split("=")[1]
for item in self.check_item.get("cookie").split("; ")
}
try:
ynote_pers = youdao_cookie.get("YNOTE_PERS", "")
uid = ynote_pers.split("||")[-2]
except Exception as e:
print(f"获取账号信息失败: {e}")
uid = "未获取到账号信息"
msg = self.sign(cookies=youdao_cookie)
msg = [
{"name": "帐号信息", "value": uid},
{"name": "获取空间", "value": msg},
]
msg = "\n".join([f"{one.get('name')}: {one.get('value')}" for one in msg])
return msg
if __name__ == "__main__":
with open(
os.path.join(os.path.dirname(os.path.dirname(__file__)), "config.json"),
"r",
encoding="utf-8",
) as f:
datas = json.loads(f.read())
_check_item = datas.get("YOUDAO", [])[0]
print(YouDao(check_item=_check_item).main())

10
pyproject.tomal Normal file
View File

@ -0,0 +1,10 @@
[tool.black]
line-length = 88
skip-string-normalization = true
[tool.autoflake]
check = true
verbose = true
[tool.isort]
profile = "black"

3
requirements.txt Executable file
View File

@ -0,0 +1,3 @@
requests~=2.25.1
rsa~=4.0
urllib3~=1.26.2

121
setup.py Executable file
View File

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
import io
import os
import sys
from os import walk
from os.path import isfile, join
from shutil import rmtree
from setuptools import Command, find_packages, setup
NAME = "dailycheckin"
FOLDER = "dailycheckin"
DESCRIPTION = "dailycheckin"
EMAIL = "133397418@qq.com"
AUTHOR = "Sitoi"
REQUIRES_PYTHON = ">=3.6.0"
VERSION = None
def read_file(filename):
with open(filename) as fp:
return fp.read().strip()
def read_requirements(filename):
return [
line.strip()
for line in read_file(filename).splitlines()
if not line.startswith("#")
]
REQUIRED = read_requirements("requirements.txt")
here = os.path.abspath(os.path.dirname(__file__))
try:
with io.open(os.path.join(here, "README.md"), encoding="utf-8") as f:
long_description = "\n" + f.read()
except FileNotFoundError:
long_description = DESCRIPTION
about = {}
if not VERSION:
with open(os.path.join(here, FOLDER, "__version__.py")) as f:
exec(f.read(), about)
else:
about["__version__"] = VERSION
def package_files(directories):
paths = []
for item in directories:
if isfile(item):
paths.append(join("..", item))
continue
for path, directories, filenames in walk(item):
for filename in filenames:
paths.append(join("..", path, filename))
return paths
class UploadCommand(Command):
description = "Build and publish the package."
user_options = []
@staticmethod
def status(s):
"""Prints things in bold."""
print("\033[1m{0}\033[0m".format(s))
def initialize_options(self):
pass
def finalize_options(self):
pass
def run(self):
try:
self.status("Removing previous builds…")
rmtree(os.path.join(here, "dist"))
except OSError:
pass
self.status("Building Source and Wheel (universal) distribution…")
os.system("{0} setup.py sdist bdist_wheel --universal".format(sys.executable))
self.status("Uploading the package to PyPI via Twine…")
os.system("twine upload dist/*")
sys.exit()
setup(
name=NAME,
version=about["__version__"],
description=DESCRIPTION,
long_description=long_description,
long_description_content_type="text/markdown",
author=AUTHOR,
author_email=EMAIL,
python_requires=REQUIRES_PYTHON,
url="https://sitoi.cn",
project_urls={"Documentation": "https://sitoi.github.io/dailycheckin/"},
packages=find_packages(exclude=("config",)),
install_requires=REQUIRED,
include_package_data=True,
license="MIT",
zip_safe=False,
entry_points={"console_scripts": ["dailycheckin = dailycheckin.main:checkin"]},
classifiers=[
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
],
cmdclass={"upload": UploadCommand},
)