🫠 Revert to the original source code first; the feature changes will be reviewed and pushed incrementally #28

柳神 2024-11-21 00:19:05 +08:00
parent 83c2a8b3ef
commit 55d88561b2


@@ -4,12 +4,9 @@ from dateutil import parser
 import requests
 import feedparser
 from concurrent.futures import ThreadPoolExecutor, as_completed
-from urllib.parse import urlparse, urlunparse, urljoin
-import ipaddress
-import socket
 
 # 设置日志配置
-logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+logging.basicConfig(level=logging.INFO, format='😋%(levelname)s: %(message)s')
 
 # 标准化的请求头
 headers = {
@@ -28,19 +25,35 @@ def format_published_time(time_str):
     返回:
     str: 格式化后的时间字符串若解析失败返回空字符串
     """
-    # 尝试自动解析输入时间字符串
     try:
         parsed_time = parser.parse(time_str, fuzzy=True)
-        # 如果没有时区信息,则将其视为 UTC
-        if parsed_time.tzinfo is None:
-            parsed_time = parsed_time.replace(tzinfo=timezone.utc)
-        # 转换为上海时区(UTC+8)
-        shanghai_time = parsed_time.astimezone(timezone(timedelta(hours=8)))
-        return shanghai_time.strftime('%Y-%m-%d %H:%M')
     except (ValueError, parser.ParserError):
-        logging.warning(f"无法解析时间字符串:{time_str}")
-        return ''
+        # 定义支持的时间格式
+        time_formats = [
+            '%a, %d %b %Y %H:%M:%S %z', # Mon, 11 Mar 2024 14:08:32 +0000
+            '%a, %d %b %Y %H:%M:%S GMT', # Wed, 19 Jun 2024 09:43:53 GMT
+            '%Y-%m-%dT%H:%M:%S%z', # 2024-03-11T14:08:32+00:00
+            '%Y-%m-%dT%H:%M:%SZ', # 2024-03-11T14:08:32Z
+            '%Y-%m-%d %H:%M:%S', # 2024-03-11 14:08:32
+            '%Y-%m-%d' # 2024-03-11
+        ]
+        for fmt in time_formats:
+            try:
+                parsed_time = datetime.strptime(time_str, fmt)
+                break
+            except ValueError:
+                continue
+        else:
+            logging.warning(f"无法解析时间字符串:{time_str}")
+            return ''
+
+    # 处理时区转换
+    if parsed_time.tzinfo is None:
+        parsed_time = parsed_time.replace(tzinfo=timezone.utc)
+    shanghai_time = parsed_time.astimezone(timezone(timedelta(hours=8)))
+    return shanghai_time.strftime('%Y-%m-%d %H:%M')
 
 def check_feed(blog_url, session):
     """
@@ -59,6 +72,7 @@ def check_feed(blog_url, session):
     如果 feed 链接可访问则返回 ['feed', feed_url]
     如果都不可访问则返回 ['none', blog_url]
     """
     possible_feeds = [
         ('atom', '/atom.xml'),
         ('rss', '/rss.xml'), # 2024-07-26 添加 /rss.xml内容的支持
@@ -71,109 +85,16 @@ def check_feed(blog_url, session):
     for feed_type, path in possible_feeds:
         feed_url = blog_url.rstrip('/') + path
-        # 确保 feed_url 使用 https 协议
-        feed_url = ensure_https(feed_url)
         try:
             response = session.get(feed_url, headers=headers, timeout=timeout)
             if response.status_code == 200:
                 return [feed_type, feed_url]
         except requests.RequestException:
             continue
-    logging.warning(f"无法找到 {blog_url} 的订阅链接")
     return ['none', blog_url]
 
-def is_bad_link(link):
-    """
-    判断链接是否是IP地址+端口localhost+端口或缺少域名的链接
-
-    参数:
-    link (str): 要检查的链接
-
-    返回:
-    bool: 如果是IP地址+端口localhost+端口或缺少域名返回True否则返回False
-    """
-    try:
-        parsed_url = urlparse(link)
-        netloc = parsed_url.netloc
-        if not netloc:
-            return True # 缺少主机部分
-        # 分割出主机和端口
-        if ':' in netloc:
-            host, _ = netloc.split(':', 1)
-        else:
-            host = netloc
-        # 检查是否是localhost或环回地址127.0.0.1包括IPv6的 ::1
-        if host in ['localhost', '::1', '127.0.0.1']:
-            return True
-        # 检查是否是IP地址
-        try:
-            ip = ipaddress.ip_address(host)
-            if socket.inet_aton(host) or ip.is_private or ip.is_loopback:
-                return True
-            return False
-        except ValueError:
-            return False
-    except Exception:
-        return False
-
-def ensure_https(url):
-    """
-    确保链接使用 https 协议
-
-    参数:
-    url (str): 原始链接
-
-    返回:
-    str: 使用 https 协议的链接
-    """
-    parsed_url = urlparse(url)
-    if parsed_url.scheme != 'https':
-        parsed_url = parsed_url._replace(scheme='https')
-        return urlunparse(parsed_url)
-    return url
-
-def fix_link(link, blog_url):
-    """
-    修复链接将IP地址localhost或缺少域名的链接替换为blog_url的域名并确保使用HTTPS
-
-    参数:
-    link (str): 原始链接
-    blog_url (str): 博客的URL
-
-    返回:
-    str: 修复后的链接
-    """
-    if not link or not blog_url:
-        return link
-
-    parsed_blog_url = urlparse(blog_url)
-    # 如果链接是相对路径,或者缺少协议,则使用 urljoin
-    if not urlparse(link).netloc:
-        link = urljoin(blog_url, link)
-
-    parsed_link = urlparse(link)
-    # 强制使用 https 协议
-    if parsed_link.scheme != 'https':
-        parsed_link = parsed_link._replace(scheme='https')
-
-    if is_bad_link(link):
-        fixed_link = urlunparse(parsed_link._replace(netloc=parsed_blog_url.netloc))
-        return fixed_link
-    else:
-        # 确保链接使用 https 协议
-        fixed_link = urlunparse(parsed_link)
-        if parsed_link.scheme != 'https':
-            logging.info(f"将链接协议从 {link} 强制改为 HTTPS: {fixed_link}")
-        return fixed_link
-
-def parse_feed(url, session, count=5, blog_url=None):
+def parse_feed(url, session, count=5):
     """
     解析 Atom RSS2 feed 并返回包含网站名称作者原链接和每篇文章详细内容的字典
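For context on what the deleted block did: is_bad_link, ensure_https and fix_link rewrote article links whose host was an IP address, localhost or missing entirely, pointing them at the blog's own domain and forcing the https scheme. A rough condensed sketch of that idea (rewrite_link is a hypothetical name; this is not the exact deleted code):

from urllib.parse import urlparse, urlunparse, urljoin
import ipaddress

def rewrite_link(link, blog_url):
    # Hypothetical condensed version of the removed helpers
    if not urlparse(link).netloc:                     # relative link: resolve against the blog URL
        link = urljoin(blog_url, link)
    parsed = urlparse(link)._replace(scheme='https')  # force https
    host = parsed.netloc.split(':', 1)[0]
    try:
        bad = host == 'localhost' or ipaddress.ip_address(host).is_private
    except ValueError:                                # not an IP literal, keep the host
        bad = False
    if bad:                                           # swap in the blog's own domain
        parsed = parsed._replace(netloc=urlparse(blog_url).netloc)
    return urlunparse(parsed)

print(rewrite_link('http://127.0.0.1:8080/post/1', 'https://example.com'))  # -> https://example.com/post/1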
@@ -184,7 +105,6 @@ def parse_feed(url, session, count=5, blog_url=None):
     url (str): Atom RSS2 feed URL
     session (requests.Session): 用于请求的会话对象
     count (int): 获取文章数的最大数如果小于则全部获取如果文章数大于则只取前 count 篇文章
-    blog_url (str): 目标博客的 URL用于修复文章链接
 
     返回:
     dict: 包含网站名称作者原链接和每篇文章详细内容的字典
@@ -200,23 +120,22 @@ def parse_feed(url, session, count=5, blog_url=None):
             'link': feed.feed.link if 'link' in feed.feed else '',
             'articles': []
         }
 
-        for entry in feed.entries:
+        for i, entry in enumerate(feed.entries):
             if 'published' in entry:
                 published = format_published_time(entry.published)
             elif 'updated' in entry:
                 published = format_published_time(entry.updated)
+                # 输出警告信息
                 logging.warning(f"文章 {entry.title} 未包含发布时间,已使用更新时间 {published}")
             else:
                 published = ''
                 logging.warning(f"文章 {entry.title} 未包含任何时间信息")
-            entry_link = entry.link if 'link' in entry else ''
-            fixed_link = fix_link(entry_link, blog_url)
 
             article = {
                 'title': entry.title if 'title' in entry else '',
                 'author': result['author'],
-                'link': fixed_link,
+                'link': entry.link if 'link' in entry else '',
                 'published': published,
                 'summary': entry.summary if 'summary' in entry else '',
                 'content': entry.content[0].value if 'content' in entry and entry.content else entry.description if 'description' in entry else ''
@@ -224,15 +143,13 @@ def parse_feed(url, session, count=5, blog_url=None):
             result['articles'].append(article)
 
         # 对文章按时间排序,并只取前 count 篇文章
-        result['articles'] = sorted(
-            result['articles'],
-            key=lambda x: datetime.strptime(x['published'], '%Y-%m-%d %H:%M') if x['published'] else datetime.min,
-            reverse=True
-        )[:count]
+        result['articles'] = sorted(result['articles'], key=lambda x: datetime.strptime(x['published'], '%Y-%m-%d %H:%M'), reverse=True)
+        if count < len(result['articles']):
+            result['articles'] = result['articles'][:count]
 
         return result
     except Exception as e:
-        logging.error(f"无法解析FEED地址{url} : {e}", exc_info=True)
+        logging.error(f"无法解析FEED地址{url} ,请自行排查原因!", exc_info=True)
         return {
             'website_name': '',
             'author': '',
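One behavioral difference in the hunk above: the restored sort key calls datetime.strptime(x['published'], ...) unconditionally, so an entry whose published field is an empty string raises ValueError, while the removed version fell back to datetime.min. A tiny sketch of such a guarded key (published_key is a hypothetical helper, not part of this commit):

from datetime import datetime

def published_key(article):
    # Hypothetical guard: empty timestamps sort last instead of raising ValueError
    value = article.get('published', '')
    return datetime.strptime(value, '%Y-%m-%d %H:%M') if value else datetime.min

articles = [{'published': ''}, {'published': '2024-03-11 22:08'}]
articles.sort(key=published_key, reverse=True)
print([a['published'] for a in articles])  # -> ['2024-03-11 22:08', '']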
@@ -254,10 +171,7 @@ def process_friend(friend, session, count, specific_RSS=[]):
     dict: 包含朋友博客信息的字典
     """
     name, blog_url, avatar = friend
-    # 确保博客 URL 使用 https 协议
-    blog_url = ensure_https(blog_url)
 
     # 如果 specific_RSS 中有对应的 name则直接返回 feed_url
     if specific_RSS is None:
         specific_RSS = []
@@ -271,7 +185,7 @@ def process_friend(friend, session, count, specific_RSS=[]):
     logging.info(f"“{name}”的博客“{blog_url}”的feed类型为“{feed_type}”")
 
     if feed_type != 'none':
-        feed_info = parse_feed(feed_url, session, count, blog_url=blog_url)
+        feed_info = parse_feed(feed_url, session, count)
         articles = [
             {
                 'title': article['title'],
@@ -285,7 +199,7 @@ def process_friend(friend, session, count, specific_RSS=[]):
         for article in articles:
             logging.info(f"{name} 发布了新文章:{article['title']},时间:{article['created']},链接:{article['link']}")
 
         return {
             'name': name,
             'status': 'active',
@@ -309,22 +223,16 @@ def fetch_and_process_data(json_url, specific_RSS=[], count=5):
     specific_RSS (list): 包含特定 RSS 源的字典列表 [{name, url}]
 
     返回:
-    tuple: (处理后的数据字典, 错误的朋友信息列表)
+    dict: 包含统计数据和文章信息的字典
     """
     session = requests.Session()
-    retries = requests.packages.urllib3.util.retry.Retry(
-        total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504]
-    )
-    adapter = requests.adapters.HTTPAdapter(max_retries=retries)
-    session.mount('http://', adapter)
-    session.mount('https://', adapter)
 
     try:
         response = session.get(json_url, headers=headers, timeout=timeout)
         friends_data = response.json()
     except Exception as e:
-        logging.error(f"无法获取链接:{json_url},出现的问题为{e}", exc_info=True)
-        return None, []
+        logging.error(f"无法获取链接:{json_url} {e}", exc_info=True)
+        return None
 
     total_friends = len(friends_data['friends'])
     active_friends = 0
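The deleted lines above mounted an HTTPAdapter configured with a urllib3 Retry onto the session, so transient 5xx responses were retried with backoff; the restored code uses a bare requests.Session(). A minimal sketch of that removed setup, importing Retry from urllib3 directly rather than through the requests.packages alias used in the deleted code:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retries = Retry(total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)    # retry plain-HTTP requests
session.mount('https://', adapter)   # retry HTTPS requests

# Any request made through this session is now retried up to 3 times on 500/502/503/504.
response = session.get('https://example.com', timeout=10)
print(response.status_code)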
@@ -365,7 +273,7 @@ def fetch_and_process_data(json_url, specific_RSS=[], count=5):
         },
         'article_data': article_data
     }
 
     logging.info(f"数据处理完成,总共有 {total_friends} 位朋友,其中 {active_friends} 位博客可访问,{error_friends} 位博客无法访问")
     return result, error_friends_info
@@ -381,15 +289,16 @@ def sort_articles_by_time(data):
     dict: 按时间排序后的文章信息字典
     """
     # 先确保每个元素存在时间
-    for article in data.get('article_data', []):
-        if not article.get('created'):
+    for article in data['article_data']:
+        if article['created'] == '' or article['created'] == None:
             article['created'] = '2024-01-01 00:00'
+            # 输出警告信息
             logging.warning(f"文章 {article['title']} 未包含时间信息,已设置为默认时间 2024-01-01 00:00")
 
     if 'article_data' in data:
         sorted_articles = sorted(
             data['article_data'],
-            key=lambda x: datetime.strptime(x['created'], '%Y-%m-%d %H:%M') if x['created'] else datetime.min,
+            key=lambda x: datetime.strptime(x['created'], '%Y-%m-%d %H:%M'),
             reverse=True
         )
         data['article_data'] = sorted_articles
@@ -414,15 +323,13 @@ def marge_data_from_json_url(data, marge_json_url):
         return data
 
     if 'article_data' in marge_data:
-        logging.info(f"开始合并数据,原数据共有 {len(data['article_data'])} 篇文章,境外数据共有 {len(marge_data['article_data'])} 篇文章")
-
-        existing_links = set(article['link'] for article in data['article_data'])
-        new_articles = [article for article in marge_data['article_data'] if article['link'] not in existing_links]
-        data['article_data'].extend(new_articles)
+        logging.info(f"开始合并数据,原数据共有 {len(data['article_data'])} 篇文章,第三方数据共有 {len(marge_data['article_data'])} 篇文章")
+        data['article_data'].extend(marge_data['article_data'])
+        data['article_data'] = list({v['link']:v for v in data['article_data']}.values())
         logging.info(f"合并数据完成,现在共有 {len(data['article_data'])} 篇文章")
 
     return data
 
+import requests
 def marge_errors_from_json_url(errors, marge_json_url):
     """
@@ -443,13 +350,14 @@ def marge_errors_from_json_url(errors, marge_json_url):
         logging.error(f"无法获取链接:{marge_json_url},出现的问题为:{e}", exc_info=True)
         return errors
 
-    # 合并错误信息列表并去重
-    errors_set = set(tuple(error) for error in errors)
-    marge_errors_set = set(tuple(error) for error in marge_errors)
-    combined_errors = list(errors_set.union(marge_errors_set))
-
-    logging.info(f"合并错误信息完成,合并后共有 {len(combined_errors)} 位朋友")
-    return combined_errors
+    # 提取 marge_errors 中的 URL
+    marge_urls = {item[1] for item in marge_errors}
+
+    # 使用过滤器保留 errors 中在 marge_errors 中出现的 URL
+    filtered_errors = [error for error in errors if error[1] in marge_urls]
+
+    logging.info(f"合并错误信息完成,合并后共有 {len(filtered_errors)} 位朋友")
+    return filtered_errors
 
 def deal_with_large_data(result):
     """
@@ -463,7 +371,7 @@ def deal_with_large_data(result):
     """
     result = sort_articles_by_time(result)
     article_data = result.get("article_data", [])
 
     # 检查文章数量是否大于 150
     max_articles = 150
     if len(article_data) > max_articles:
@@ -483,4 +391,4 @@ def deal_with_large_data(result):
         result["statistical_data"]["article_num"] = len(filtered_articles)
         logging.info(f"数据处理完成,保留 {len(filtered_articles)} 篇文章")
 
     return result
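For reference, the cap in deal_with_large_data only kicks in above 150 articles and rewrites article_num to match what is kept. A simplified stand-in that just slices the already-sorted list (cap_articles is a hypothetical name, and the real function builds filtered_articles with its own selection logic that this sketch does not reproduce):

def cap_articles(result, max_articles=150):
    # Assumes result['article_data'] is already sorted newest-first
    articles = result.get('article_data', [])
    if len(articles) > max_articles:
        articles = articles[:max_articles]
        result['article_data'] = articles
        result['statistical_data']['article_num'] = len(articles)
    return result

sample = {'article_data': [{'title': f'post {i}'} for i in range(200)],
          'statistical_data': {'article_num': 200}}
print(cap_articles(sample)['statistical_data']['article_num'])  # -> 150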