diff --git a/pyproject.toml b/pyproject.toml index 292c1195..eb3fa7e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "ELF_RSS" -version = "2.6.17" +version = "2.6.18" description = "QQ机器人 RSS订阅 插件,订阅源建议选择 RSSHub" authors = ["Quan666 "] license = "GPL-3.0-only" @@ -16,6 +16,7 @@ keywords = ["nonebot", "nonebot2", "rss" ,"elf" ,"rsshub"] python = "^3.8.3" aiohttp = {extras = ["speedups"], version = "^3.8.4"} arrow = "^1.2.3" +async-timeout = "^4.0.2" bbcode = "^1.1.0" cachetools = "^5.3.0" emoji = "^2.2.0" @@ -34,7 +35,6 @@ pyquery = "^2.0.0" python-qbittorrent = "^0.4.3" tenacity = "^8.2.2" tinydb = "^4.7.1" -typing-extensions = "^4.5.0" yarl = "^1.9.1" [tool.poetry.dev-dependencies] diff --git a/requirements.txt b/requirements.txt index a7bef7a3..198611dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ aiohttp[speedups]~=3.8.4 arrow~=1.2.3 +async-timeout~=4.0.2 bbcode~=1.1.0 cachetools~=5.3.0 emoji~=2.2.0 @@ -19,5 +20,4 @@ pyquery~=2.0.0 python-qbittorrent~=0.4.3 tenacity~=8.2.2 tinydb~=4.7.1 -typing-extensions~=4.5.0 yarl~=1.9.1 \ No newline at end of file diff --git a/src/plugins/ELF_RSS2/__init__.py b/src/plugins/ELF_RSS2/__init__.py index b91d2a4a..1aaf157b 100644 --- a/src/plugins/ELF_RSS2/__init__.py +++ b/src/plugins/ELF_RSS2/__init__.py @@ -14,7 +14,7 @@ require("nonebot_plugin_apscheduler") from nonebot_plugin_apscheduler import scheduler # noqa -VERSION = "2.6.17" +VERSION = "2.6.18" __plugin_meta__ = PluginMetadata( name="ELF_RSS", diff --git a/src/plugins/ELF_RSS2/command/add_dy.py b/src/plugins/ELF_RSS2/command/add_dy.py index 41221319..7a294b73 100644 --- a/src/plugins/ELF_RSS2/command/add_dy.py +++ b/src/plugins/ELF_RSS2/command/add_dy.py @@ -57,10 +57,13 @@ async def handle_rss_add( event: MessageEvent, name_and_url: str = ArgPlainText("RSS_ADD") ) -> None: try: - name, url = name_and_url.split(" ") + name, url = name_and_url.strip().split(" ") except ValueError: await 
RSS_ADD.reject(prompt) return + if not name or not url: + await RSS_ADD.reject(prompt) + return if _ := Rss.get_one_by_name(name): await RSS_ADD.finish(f"已存在订阅名为 {name} 的订阅") diff --git a/src/plugins/ELF_RSS2/command/rsshub_add.py b/src/plugins/ELF_RSS2/command/rsshub_add.py index 5db5b414..f296ddde 100644 --- a/src/plugins/ELF_RSS2/command/rsshub_add.py +++ b/src/plugins/ELF_RSS2/command/rsshub_add.py @@ -37,12 +37,16 @@ @RSSHUB_ADD.handle() async def handle_first_receive(matcher: Matcher, args: Message = CommandArg()) -> None: - if args.extract_plain_text(): + if args.extract_plain_text().strip(): matcher.set_arg("route", args) @RSSHUB_ADD.got("name", prompt="请输入要订阅的订阅名") async def handle_feed_name(name: str = ArgPlainText("name")) -> None: + if not name.strip(): + await RSSHUB_ADD.reject("订阅名不能为空,请重新输入") + return + if _ := Rss.get_one_by_name(name=name): await RSSHUB_ADD.reject(f"已存在名为 {name} 的订阅,请重新输入") @@ -51,6 +55,10 @@ async def handle_feed_name(name: str = ArgPlainText("name")) -> None: async def handle_rsshub_routes( state: T_State, route: str = ArgPlainText("route") ) -> None: + if not route.strip(): + await RSSHUB_ADD.reject("路由名不能为空,请重新输入") + return + rsshub_url = URL(config.rsshub) # 对本机部署的 RSSHub 不使用代理 local_host = [ @@ -71,7 +79,7 @@ async def handle_rsshub_routes( rsshub_routes = await resp.json() if route not in rsshub_routes["data"]: - await RSSHUB_ADD.reject("没有这个路由,请重新输入") + await RSSHUB_ADD.reject(f"未找到名为 {route} 的 RSSHub 路由,请重新输入") else: route_list = state["route_list"] = rsshub_routes["data"][route]["routes"] if len(route_list) > 1: diff --git a/src/plugins/ELF_RSS2/config.py b/src/plugins/ELF_RSS2/config.py index 4a824fc0..f652d6de 100644 --- a/src/plugins/ELF_RSS2/config.py +++ b/src/plugins/ELF_RSS2/config.py @@ -21,6 +21,7 @@ class Config: limit: int = 200 max_length: int = 1024 # 正文长度限制,防止消息太长刷屏,以及消息过长发送失败的情况 enable_boot_message: bool = True # 是否启用启动时的提示消息推送 + debug: bool = False # 是否开启 debug 模式,开启后会打印更多的日志信息,同时检查更新时不会使用缓存,便于调试 zip_size: 
int = 2 * 1024 gif_zip_size: int = 6 * 1024 diff --git a/src/plugins/ELF_RSS2/my_trigger.py b/src/plugins/ELF_RSS2/my_trigger.py index 5b832126..ace02b8e 100644 --- a/src/plugins/ELF_RSS2/my_trigger.py +++ b/src/plugins/ELF_RSS2/my_trigger.py @@ -4,6 +4,7 @@ from apscheduler.executors.pool import ProcessPoolExecutor, ThreadPoolExecutor from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger +from async_timeout import timeout from nonebot import require from nonebot.log import logger @@ -13,14 +14,14 @@ require("nonebot_plugin_apscheduler") from nonebot_plugin_apscheduler import scheduler # noqa -wait_for = 5 * 60 - # 检测某个rss更新 async def check_update(rss: Rss) -> None: logger.info(f"{rss.name} 检查更新") try: - await asyncio.wait_for(rss_parsing.start(rss), timeout=wait_for) + wait_for = 5 * 60 if re.search(r"[_*/,-]", rss.time) else int(rss.time) * 60 + async with timeout(wait_for): + await rss_parsing.start(rss) except asyncio.TimeoutError: logger.error(f"{rss.name} 检查更新超时,结束此次任务!") diff --git a/src/plugins/ELF_RSS2/parsing/__init__.py b/src/plugins/ELF_RSS2/parsing/__init__.py index e63aafc7..4627e4fd 100644 --- a/src/plugins/ELF_RSS2/parsing/__init__.py +++ b/src/plugins/ELF_RSS2/parsing/__init__.py @@ -267,18 +267,42 @@ async def handle_date(item: Dict[str, Any]) -> str: return f"日期:{date.format('YYYY年MM月DD日 HH:mm:ss')}" +# 发送消息 +@ParsingBase.append_handler(parsing_type="after") +async def handle_message( + rss: Rss, + state: Dict[str, Any], + item: Dict[str, Any], + item_msg: str, +) -> str: + if rss.send_forward_msg: + return "" + + # 发送消息并写入文件 + await handle_send_msgs(rss=rss, messages=[item_msg], items=[item], state=state) + return "" + + @ParsingBase.append_after_handler() async def after_handler(rss: Rss, state: Dict[str, Any]) -> Dict[str, Any]: - # 发送消息并写入文件 - await handle_send_msgs( - rss=rss, messages=state["messages"], items=state["items"], state=state - ) + if rss.send_forward_msg: + # 发送消息并写入文件 + 
await handle_send_msgs( + rss=rss, messages=state["messages"], items=state["items"], state=state + ) - message_count = len(state["messages"]) db = state["tinydb"] + new_data_length = len(state["new_data"]) + cache_json_manage(db, new_data_length) + + message_count = len(state["change_data"]) + success_count = message_count - state["error_count"] - if state["success_count"] > 0: - logger.info(f"{rss.name} 新消息推送完毕,共计:{message_count}") + if message_count > 10 and len(state["messages"]) == 10: + return {} + + if success_count > 0: + logger.info(f"{rss.name} 新消息推送完毕,共计:{success_count}/{message_count}") elif message_count > 0: logger.error(f"{rss.name} 新消息推送失败,共计:{message_count}") else: @@ -287,8 +311,6 @@ async def after_handler(rss: Rss, state: Dict[str, Any]) -> Dict[str, Any]: if conn := state["conn"]: conn.close() - new_data_length = len(state["new_data"]) - cache_json_manage(db, new_data_length) db.close() return {} diff --git a/src/plugins/ELF_RSS2/parsing/handle_images.py b/src/plugins/ELF_RSS2/parsing/handle_images.py index 76362e39..b0a9431a 100644 --- a/src/plugins/ELF_RSS2/parsing/handle_images.py +++ b/src/plugins/ELF_RSS2/parsing/handle_images.py @@ -206,27 +206,25 @@ async def handle_img_combo(url: str, img_proxy: bool, rss: Optional[Rss] = None) 返回当前图片的CQ码,以base64格式编码发送 如获取图片失败将会提示图片走丢了 """ - content = await download_image(url, img_proxy) - if content: + if content := await download_image(url, img_proxy): if rss is not None and rss.download_pic: _url = URL(url) logger.debug(f"正在保存图片: {url}") try: save_image(content=content, file_url=_url, rss=rss) except Exception as e: - logger.warning(e) - logger.warning("在保存图片到本地时出现错误") - resize_content = await zip_pic(url, content) - if img_base64 := get_pic_base64(resize_content): - return f"[CQ:image,file=base64://{img_base64}]" - return f"\n图片走丢啦: {url}\n" + logger.warning(f"在保存图片到本地时出现错误\nE:{repr(e)}") + if resize_content := await zip_pic(url, content): + if img_base64 := get_pic_base64(resize_content): + return 
f"[CQ:image,file=base64://{img_base64}]" + return f"\n图片走丢啦 链接:[{url}]\n" -async def handle_img_combo_with_content(gif_url: str, content: bytes) -> str: - resize_content = await zip_pic(gif_url, content) - if img_base64 := get_pic_base64(resize_content): - return f"[CQ:image,file=base64://{img_base64}]" - return "\n图片走丢啦\n" +async def handle_img_combo_with_content(url: str, content: bytes) -> str: + if resize_content := await zip_pic(url, content): + if img_base64 := get_pic_base64(resize_content): + return f"[CQ:image,file=base64://{img_base64}]" + return f"\n图片走丢啦 链接:[{url}]\n" if url else "\n图片走丢啦\n" # 处理图片、视频 diff --git a/src/plugins/ELF_RSS2/parsing/parsing_rss.py b/src/plugins/ELF_RSS2/parsing/parsing_rss.py index 63df4aa6..bac9fc4d 100644 --- a/src/plugins/ELF_RSS2/parsing/parsing_rss.py +++ b/src/plugins/ELF_RSS2/parsing/parsing_rss.py @@ -3,11 +3,10 @@ from typing import Any, Callable, Dict, List, Optional, Tuple from tinydb import TinyDB -from tinydb.middlewares import CachingMiddleware -from tinydb.storages import JSONStorage from ..config import DATA_PATH from ..rss_class import Rss +from ..utils import partition_list # 订阅器启动的时候将解析器注册到rss实例类?,避免每次推送时再匹配 @@ -55,6 +54,7 @@ class ParsingBase: "source": [], "date": [], "torrent": [], + "after": [], # item的最后处理,此处调用消息截取、发送 } """ @@ -188,7 +188,6 @@ async def start(self, rss_name: str, new_rss: Dict[str, Any]) -> None: _file = DATA_PATH / f"{Rss.handle_name(rss_name)}.json" db = TinyDB( _file, - storage=CachingMiddleware(JSONStorage), # type: ignore encoding="utf-8", sort_keys=True, indent=4, @@ -201,6 +200,7 @@ async def start(self, rss_name: str, new_rss: Dict[str, Any]) -> None: "change_data": [], # 更新的消息列表 "conn": None, # 数据库连接 "tinydb": db, # 缓存 json + "error_count": 0, # 消息发送失败计数 } ) self.state, _ = await _run_handlers(self.before_handler, self.rss, self.state) @@ -213,26 +213,30 @@ async def start(self, rss_name: str, new_rss: Dict[str, Any]) -> None: "items": [], } ) - for item in 
self.state["change_data"]: - item_msg = "" - for handler_list in self.handler.values(): - # 用于保存上一次处理结果 - tmp = "" - tmp_state = {"continue": True} # 是否继续执行后续处理 - - # 某一个内容的处理如正文,传入原文与上一次处理结果,此次处理完后覆盖 - _, tmp = await _run_handlers( - handler_list, - self.rss, - self.state, - item=item, - item_msg=item_msg, - tmp=tmp, - tmp_state=tmp_state, - ) - item_msg += tmp - self.state["messages"].append(item_msg) - self.state["items"].append(item) - - # 最后处理 - self.state, _ = await _run_handlers(self.after_handler, self.rss, self.state) + if change_data := self.state["change_data"]: + for parted_item_list in partition_list(change_data, 10): + for item in parted_item_list: + item_msg = "" + for handler_list in self.handler.values(): + # 用于保存上一次处理结果 + tmp = "" + tmp_state = {"continue": True} # 是否继续执行后续处理 + + # 某一个内容的处理如正文,传入原文与上一次处理结果,此次处理完后覆盖 + _, tmp = await _run_handlers( + handler_list, + self.rss, + self.state, + item=item, + item_msg=item_msg, + tmp=tmp, + tmp_state=tmp_state, + ) + item_msg += tmp + self.state["messages"].append(item_msg) + self.state["items"].append(item) + + _, _ = await _run_handlers(self.after_handler, self.rss, self.state) + self.state["messages"], self.state["items"] = [], [] # two distinct lists: a chained assignment would alias one list under both keys and interleave messages with items in the next batch + else: + _, _ = await _run_handlers(self.after_handler, self.rss, self.state) diff --git a/src/plugins/ELF_RSS2/parsing/send_message.py b/src/plugins/ELF_RSS2/parsing/send_message.py index 84f91d98..d532f334 100644 --- a/src/plugins/ELF_RSS2/parsing/send_message.py +++ b/src/plugins/ELF_RSS2/parsing/send_message.py @@ -5,7 +5,8 @@ import arrow from nonebot import get_bot -from nonebot.adapters.onebot.v11 import Bot +from nonebot.adapters.onebot.v11 import Bot, Message, MessageSegment +from nonebot.adapters.onebot.v11.exception import NetworkError from nonebot.log import logger from ..config import config @@ -32,7 +33,14 @@ async def send_msg( flag = any( await asyncio.gather( *[ - send_private_msg(bot, messages, int(user_id), items, header_message) + send_private_msg( + bot, + 
messages, + int(user_id), + items, + header_message, + rss.send_forward_msg, + ) for user_id in rss.user_id ] ) @@ -43,7 +51,12 @@ async def send_msg( await asyncio.gather( *[ send_group_msg( - bot, messages, int(group_id), items, header_message + bot, + messages, + int(group_id), + items, + header_message, + rss.send_forward_msg, ) for group_id in rss.group_id ] @@ -75,6 +88,7 @@ async def send_private_msg( user_id: int, items: List[Dict[str, Any]], header_message: str, + send_forward_msg: bool, ) -> bool: return await send_msgs_with_lock( bot=bot, @@ -86,6 +100,7 @@ async def send_private_msg( send_func=lambda user_id, message: bot.send_private_msg( user_id=user_id, message=message # type: ignore ), + send_forward_msg=send_forward_msg, ) @@ -96,6 +111,7 @@ async def send_group_msg( group_id: int, items: List[Dict[str, Any]], header_message: str, + send_forward_msg: bool, ) -> bool: return await send_msgs_with_lock( bot=bot, @@ -107,6 +123,7 @@ async def send_group_msg( send_func=lambda group_id, message: bot.send_group_msg( group_id=group_id, message=message # type: ignore ), + send_forward_msg=send_forward_msg, ) @@ -164,8 +181,9 @@ async def send_multiple_msgs( ) -> bool: flag = False for message, item in zip(messages, items): - flag |= await send_single_msg( - message, target_id, item, header_message, send_func + flag = ( + await send_single_msg(message, target_id, item, header_message, send_func) + or flag ) return flag @@ -178,6 +196,7 @@ async def send_msgs_with_lock( items: List[Dict[str, Any]], header_message: str, send_func: Callable[[Union[int, str], str], Coroutine[Any, Any, Dict[str, Any]]], + send_forward_msg: bool = False, ) -> bool: start_time = arrow.now() async with sending_lock[(target_id, target_type)]: @@ -185,20 +204,10 @@ async def send_msgs_with_lock( flag = await send_single_msg( messages[0], target_id, items[0], header_message, send_func ) - elif target_type != "guild_channel": - forward_messages = handle_forward_message(bot, 
[header_message] + messages) - try: - await bot.send_forward_msg( - user_id=target_id if target_type == "private" else 0, - group_id=target_id if target_type == "group" else 0, - messages=forward_messages, - ) - flag = True - except Exception as e: - logger.warning(f"E: {repr(e)}\n合并消息发送失败!将尝试逐条发送!") - flag = await send_multiple_msgs( - messages, target_id, items, header_message, send_func - ) + elif send_forward_msg and target_type != "guild_channel": + flag = await try_sending_forward_msg( + bot, messages, target_id, target_type, items, header_message, send_func + ) else: flag = await send_multiple_msgs( messages, target_id, items, header_message, send_func @@ -207,18 +216,49 @@ async def send_msgs_with_lock( return flag -def handle_forward_message(bot: Bot, messages: List[str]) -> List[Dict[str, Any]]: - return [ - { - "type": "node", - "data": { - "name": list(config.nickname)[0] if config.nickname else "\u200b", - "uin": bot.self_id, - "content": message, - }, - } - for message in messages - ] +async def try_sending_forward_msg( + bot: Bot, + messages: List[str], + target_id: Union[int, str], + target_type: str, + items: List[Dict[str, Any]], + header_message: str, + send_func: Callable[[Union[int, str], str], Coroutine[Any, Any, Dict[str, Any]]], +) -> bool: + forward_messages = handle_forward_message(bot, [header_message] + messages) + try: + if target_type == "private": + await bot.send_private_forward_msg( + user_id=target_id, messages=forward_messages + ) + elif target_type == "group": + await bot.send_group_forward_msg( + group_id=target_id, messages=forward_messages + ) + flag = True + except NetworkError: + # 如果图片体积过大或数量过多,很可能触发这个错误,但实际上发送成功,不过高概率吞图,只警告不处理 + logger.warning("图片过大或数量过多,可能发送失败!") + flag = True + except Exception as e: + logger.warning(f"E: {repr(e)}\n合并消息发送失败!将尝试逐条发送!") + flag = await send_multiple_msgs( + messages, target_id, items, header_message, send_func + ) + return flag + + +def handle_forward_message(bot: Bot, messages: List[str]) 
-> Message: + return Message( + [ + MessageSegment.node_custom( + user_id=int(bot.self_id), + nickname=list(config.nickname)[0] if config.nickname else "\u200b", + content=message, + ) + for message in messages + ] + ) # 发送消息并写入文件 @@ -239,13 +279,11 @@ async def handle_send_msgs( if item.get("to_send"): item.pop("to_send") - state["success_count"] = len(state["messages"]) - else: for item in items: item["to_send"] = True - state["success_count"] = 0 + state["error_count"] += len(messages) for item in items: write_item(db, item) diff --git a/src/plugins/ELF_RSS2/rss_parsing.py b/src/plugins/ELF_RSS2/rss_parsing.py index a2e18de3..cc19e531 100644 --- a/src/plugins/ELF_RSS2/rss_parsing.py +++ b/src/plugins/ELF_RSS2/rss_parsing.py @@ -6,8 +6,6 @@ from nonebot.adapters.onebot.v11 import Bot from nonebot.log import logger from tinydb import TinyDB -from tinydb.middlewares import CachingMiddleware -from tinydb.storages import JSONStorage from yarl import URL from . import my_trigger as tr @@ -58,7 +56,6 @@ async def save_first_time_fetch(rss: Rss, new_rss: Dict[str, Any]) -> None: with TinyDB( _file, - storage=CachingMiddleware(JSONStorage), # type: ignore encoding="utf-8", sort_keys=True, indent=4, diff --git a/src/plugins/ELF_RSS2/utils.py b/src/plugins/ELF_RSS2/utils.py index 93fd96dc..f6badc34 100644 --- a/src/plugins/ELF_RSS2/utils.py +++ b/src/plugins/ELF_RSS2/utils.py @@ -3,7 +3,7 @@ import math import re from contextlib import suppress -from typing import Any, Dict, List, Mapping, Optional +from typing import Any, Dict, Generator, List, Mapping, Optional from cachetools import TTLCache from cachetools.keys import hashkey @@ -194,3 +194,10 @@ async def filter_valid_guild_channel_id_list( continue valid_guild_channel_id_list.append(guild_channel_id) return valid_guild_channel_id_list + + +def partition_list( + input_list: List[Any], partition_size: int +) -> Generator[List[Any], None, None]: + for i in range(0, len(input_list), partition_size): + yield input_list[i 
: i + partition_size]