去年10月,我写了一篇利用python搭建TG消息推送的文章,当时买了一台几十块钱一年的小鸡来挂这个脚本,虽然隔三差五机器嗝屁,但还是持续使用到了今年8月直到服务器到期,到期后一时没有合适机器来放脚本,于是又花几十块续费了一年,但是今年极其不稳定,每天都嗝屁,甚至有时候嗝好几天,截止本文发布时依旧处于嗝屁状态已连续一周。
今天在抖音刷到视频有人在GitHub放脚本,我突然冒出把推送脚本也放到GitHub的想法,在和豆包简单了解这个想法可行之后,马上开始了实践。
准备工作
- 创建 Telegram Bot
联系 Telegram 官方机器人 @BotFather,发送/newbot
,按提示设置名称和用户名,最终会获得一个Bot Token(格式:123456:ABC-DEF1234ghIkl-zyx57W2v1u123ew11),保存备用。
- 获取群组 ID
将你的 Bot 加入群组,然后在群组内发送任意消息,再向 @getidsbot 发送该消息的转发,会返回群组 ID(格式:-1001234567890),保存备用。
- 注册或登录GitHub。
开始搭建
- 配置 GitHub 仓库
新建一个 GitHub 仓库(示例命名rss-telegram-pusher),设置公开状态。
仓库根目录创建文件:rss_telegram.py
:
import feedparser
import logging
import asyncio
import json
import os
from telegram import Bot
from telegram.error import TelegramError
# 从环境变量读取配置
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
CHAT_ID = os.getenv("CHAT_ID")
RSS_URL = os.getenv("RSS_URL")
# 存储已发送ID的本地文件
POSTS_FILE = "sent_posts.json"
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
# 读取已发送的post_id
def load_sent_posts():
try:
if os.path.exists(POSTS_FILE):
with open(POSTS_FILE, "r", encoding="utf-8") as f:
content = f.read().strip()
return json.loads(content) if content else []
logging.info("首次运行,创建空ID列表")
return []
except Exception as e:
logging.error(f"读取已发送ID失败:{str(e)}")
return []
# 保存已发送的post_id
def save_sent_posts(post_ids):
try:
with open(POSTS_FILE, "w", encoding="utf-8") as f:
json.dump(post_ids, f, ensure_ascii=False, indent=2)
logging.info(f"已保存ID列表(共{len(post_ids)}条):{post_ids}")
except Exception as e:
logging.error(f"保存已发送ID失败:{str(e)}")
# 获取RSS更新
def fetch_updates():
try:
logging.info(f"获取RSS源:{RSS_URL}")
feed = feedparser.parse(RSS_URL)
if feed.bozo:
logging.error(f"RSS解析错误:{feed.bozo_exception}")
return None
logging.info(f"成功获取{len(feed.entries)}条RSS条目")
return feed
except Exception as e:
logging.error(f"获取RSS失败:{str(e)}")
return None
# 转义Markdown特殊字符
def escape_markdown(text):
special_chars = r"_*~`>#+-.!()"
for char in special_chars:
text = text.replace(char, f"\{char}")
return text
# 发送单条消息到Telegram(带间隔)
async def send_message(bot, title, link, delay=3):
try:
# 发送前等待指定秒数(避免频率限制)
await asyncio.sleep(delay)
escaped_title = escape_markdown(title)
escaped_link = escape_markdown(link)
message = f"`{escaped_title}`\n{escaped_link}"
logging.info(f"发送消息:{message[:100]}")
await bot.send_message(
chat_id=CHAT_ID,
text=message,
parse_mode="MarkdownV2"
)
logging.info("消息发送成功")
return True
except TelegramError as e:
logging.error(f"Telegram发送失败:{str(e)}")
return False
# 检查更新并推送所有新帖子
async def check_for_updates(sent_post_ids):
updates = fetch_updates()
if not updates:
return
new_posts = []
for entry in updates.entries:
try:
# 提取帖子ID(适配URL格式)
guid_parts = entry.guid.split("-")
if len(guid_parts) < 2:
logging.warning(f"无效GUID格式:{entry.guid},跳过")
continue
post_id = guid_parts[-1].split(".")[0]
if not post_id.isdigit():
logging.warning(f"提取的ID非数字:{post_id},跳过")
continue
logging.info(f"解析到有效ID:{post_id},标题:{entry.title[:20]}...")
if post_id not in sent_post_ids:
new_posts.append((post_id, entry.title, entry.link))
except Exception as e:
logging.error(f"解析条目失败(GUID:{entry.guid}):{str(e)}")
continue
if new_posts:
# 按ID升序排序(从旧到新推送),若想从新到旧则用reverse=True
new_posts.sort(key=lambda x: int(x[0])) # 从小到大:旧→新
# new_posts.sort(key=lambda x: int(x[0]), reverse=True) # 从大到小:新→旧
logging.info(f"发现{len(new_posts)}条新帖子,准备依次推送(间隔3秒)")
async with Bot(token=TELEGRAM_TOKEN) as bot:
# 逐条推送,每条间隔3秒
for i, (post_id, title, link) in enumerate(new_posts):
# 第一条消息延迟0秒,后续每条延迟3秒
success = await send_message(bot, title, link, delay=3 if i > 0 else 0)
if success:
sent_post_ids.append(post_id) # 仅成功推送的ID才记录
# 保存所有成功推送的ID
save_sent_posts(sent_post_ids)
else:
logging.info("无新帖子需要推送")
# 主函数
async def main():
logging.info("===== 脚本开始运行 =====")
sent_post_ids = load_sent_posts()
try:
await check_for_updates(sent_post_ids)
except Exception as e:
logging.error(f"主逻辑执行失败:{str(e)}")
logging.info("===== 脚本运行结束 =====")
if __name__ == "__main__":
asyncio.run(main())
- 仓库根目录创建
.github/workflows/rss.yml
(定时任务配置):
name: RSS to Telegram
on:
schedule:
- cron: "*/2 * * * *" # 每2分钟运行一次
workflow_dispatch: # 允许手动触发
jobs:
run:
runs-on: ubuntu-latest
steps:
# 拉取仓库代码(带令牌认证,后续步骤自动继承权限)
- name: 拉取仓库代码
uses: actions/checkout@v4
with:
token: ${{ secrets.MY_GITHUB_TOKEN }} # 用令牌拉取,确保推送权限
fetch-depth: 0 # 获取完整历史,避免分支冲突
# 设置Python环境
- name: 设置Python环境
uses: actions/setup-python@v5
with:
python-version: "3.10"
# 安装依赖库
- name: 安装依赖
run: pip install python-telegram-bot feedparser
# 运行脚本(检测新内容并推送)
- name: 运行脚本
env:
TELEGRAM_TOKEN: ${{ secrets.TELEGRAM_TOKEN }}
CHAT_ID: ${{ secrets.CHAT_ID }}
RSS_URL: ${{ secrets.RSS_URL }}
run: python rss_pusher.py
# 提交更新后的ID文件(无repo_token,消除警告)
- name: 提交更新后的ID文件
uses: stefanzweifel/git-auto-commit-action@v4
with:
commit_message: "更新已发送的帖子ID"
file_pattern: "sent_posts.json" # 仅提交ID文件
branch: main # 仓库主分支(确保与你的分支名称一致)
commit_user_name: "GitHub Actions"
commit_user_email: "actions@github.com"
- 仓库根目录创建文件
sent_posts.json
:
[] # 留空即可
- 创建访问令牌
创建地址,创建个人访问令牌(经典),填写自定义令牌名称,有效期建议90天,令牌范围必须勾选repo
(写入仓库),完成创建后只会显示一次令牌token,复制保存。
设置仓库密钥
步骤: Settings(设置) → Secrets and variables(和变量) → Actions(行动) → Repository → secrets(存储库机密) → New repository secret(创建存储库机密)
添加以下密钥:
TELEGRAM_TOKEN:你的 Telegram Bot Token
CHAT_ID:目标群组 ID
RSS_URL:你的 RSS 源地址
MY_GITHUB_TOKEN:GitHub 令牌token
测试与验证
进入仓库 → 点击顶部 “Actions” → 左侧选择 “RSS to Telegram” → 点击 “Run workflow” → “Run workflow”,手动触发一次运行,查看群内是否有信息,如果没有,在仓库页点击顶部导航栏的Actions标签,进入工作流运行记录页面,可查看报错原因。
注:此脚本用于本论坛使用,适配本论坛帖子url,其它网站需稍作调整才可正常使用。
已有 0 条评论