版本比较

标识

  • 该行被添加。
  • 该行被删除。
  • 格式已经改变。

提问:

代码块
给我写一个监控程序,用于监控推特列表中用户,如果有用户更新了推文,及时微信通知我,并帮我优化一下推特接口速率,避免API接口被封

回答:

为了实现一个监控Twitter列表中用户更新推文并通过微信通知的程序,我们可以使用Python编写一个脚本。这个脚本会定期检查Twitter列表中的用户是否有新的推文,并通过企业微信API发送通知。
以下是实现步骤:


1. 安装必要的库

代码块
pip3 install tweepy requests


2. 配置API凭证

确保你有以下API凭证:

  • Twitter API凭证(API Key, API Secret Key, Access Token, Access Token Secret)
  • 企业微信API凭证(CorpID, Secret, AgentID)


3. 编写监控程序

代码块
import tweepy
import requests
import time
from datetime import datetime, timedelta

# Twitter API凭证
API_KEY = 'your_twitter_api_key'
API_SECRET_KEY = 'your_twitter_api_secret_key'
ACCESS_TOKEN = 'your_twitter_access_token'
ACCESS_TOKEN_SECRET = 'your_twitter_access_token_secret'

# 企业微信API凭证
CORP_ID = 'your_corp_id'
SECRET = 'your_app_secret'
AGENT_ID = 1000002  # 应用的AgentID

# 监控的Twitter列表ID
LIST_ID = 'your_twitter_list_id'

# 初始化Twitter API
auth = tweepy.OAuth1UserHandler(API_KEY, API_SECRET_KEY, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)

# 获取企业微信Access Token
def get_wechat_access_token():
    token_url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={CORP_ID}&corpsecret={SECRET}'
    response = requests.get(token_url)
    token_data = response.json()
    if token_data['errcode'] == 0:
        return token_data['access_token']
    else:
        print(f'Error getting WeChat access token: {token_data["errmsg"]}')
        return None

# 发送微信通知
def send_wechat_notification(access_token, content):
    send_msg_url = f'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}'
    message_data = {
        "touser": "@all",  # 接收消息的成员ID列表
        "msgtype": "text",
        "agentid": AGENT_ID,
        "text": {
            "content": content
        },
        "safe": 0
    }
    response = requests.post(send_msg_url, json=message_data)
    msg_data = response.json()
    if msg_data['errcode'] == 0:
        print('Message sent successfully!')
    else:
        print(f'Error sending message: {msg_data["errmsg"]}')

# 监控Twitter列表
def monitor_twitter_list():
    last_check_time = datetime.utcnow() - timedelta(minutes=15)  # 初始检查时间
    while True:
        current_time = datetime.utcnow()
        tweets = api.list_timeline(list_id=LIST_ID, since_id=last_check_time)
        for tweet in tweets:
            tweet_time = tweet.created_at
            if tweet_time > last_check_time:
                content = f'New tweet from @{tweet.user.screen_name}: {tweet.text}'
                access_token = get_wechat_access_token()
                if access_token:
                    send_wechat_notification(access_token, content)
        last_check_time = current_time
        time.sleep(60)  # 每分钟检查一次,调整速率以避免API被封

if __name__ == "__main__":
    monitor_twitter_list()


4. 运行程序
将上述代码保存为一个Python脚本(例如twitter_monitor.py),并运行该脚本:

代码块
python twitter_monitor.py


优化速率限制

  • 检查频率:在代码中,我们使用了time.sleep(60)来每分钟检查一次推文更新。你可以根据Twitter API的速率限制和你的需求调整检查频率。
  • 错误处理:在获取和发送请求时,添加错误处理机制,避免因网络问题或其他原因导致程序崩溃。
  • 速率限制处理:Twitter API有速率限制,确保在多次请求后进行适当的延迟,避免API被封。


注意事项

  • API凭证的安全性:确保你的API凭证保密,不要在公开的地方(如GitHub仓库)泄露。
  • 速率限制:根据Twitter API的速率限制,合理调整检查频率,避免API被封。

如果你在实现过程中遇到任何问题,随时告诉我,我会尽力帮你解决。