🎈 perf: 强制https&link域名转换

xxfer 2024-11-18 15:58:23 +08:00
parent 4301500ee3
commit 08f3b38715


@@ -1,8 +1,15 @@
+import logging
 from datetime import datetime, timedelta, timezone
 from dateutil import parser
 import requests
 import feedparser
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from urllib.parse import urlparse, urlunparse, urljoin
+import ipaddress
+import socket
+
+# 设置日志配置
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

 # 标准化的请求头
 headers = {
@@ -21,9 +28,8 @@ def format_published_time(time_str):
     返回:
     str: 格式化后的时间字符串,若解析失败返回空字符串
     """
-    # 尝试自动解析输入时间字符串
     try:
-        parsed_time = parser.parse(time_str)
+        parsed_time = parser.parse(time_str, fuzzy=True)
         # 如果没有时区信息,则将其视为 UTC
         if parsed_time.tzinfo is None:
             parsed_time = parsed_time.replace(tzinfo=timezone.utc)
@@ -33,34 +39,9 @@ def format_published_time(time_str):
         return shanghai_time.strftime('%Y-%m-%d %H:%M')
     except (ValueError, parser.ParserError):
-        pass
-
-    # 定义支持的时间格式
-    time_formats = [
-        '%a, %d %b %Y %H:%M:%S %z',   # Mon, 11 Mar 2024 14:08:32 +0000
-        '%a, %d %b %Y %H:%M:%S GMT',  # Wed, 19 Jun 2024 09:43:53 GMT
-        '%Y-%m-%dT%H:%M:%S%z',        # 2024-03-11T14:08:32+00:00
-        '%Y-%m-%dT%H:%M:%SZ',         # 2024-03-11T14:08:32Z
-        '%Y-%m-%d %H:%M:%S',          # 2024-03-11 14:08:32
-        '%Y-%m-%d'                    # 2024-03-11
-    ]
-
-    # 遍历所有支持的时间格式进行解析
-    for fmt in time_formats:
-        try:
-            parsed_time = datetime.strptime(time_str, fmt)
-            # 将解析的时间视为 UTC
-            parsed_time = parsed_time.replace(tzinfo=timezone.utc)
-            # 转换为上海时区
-            shanghai_time = parsed_time.astimezone(timezone(timedelta(hours=8)))
-            return shanghai_time.strftime('%Y-%m-%d %H:%M')
-        except ValueError:
-            continue
-
-    # 如果所有格式都无法匹配,返回空字符串
+        logging.warning(f"无法解析时间字符串:{time_str}")
     return ''


 def check_feed(blog_url, session):
     """
     检查博客的 RSS 或 Atom 订阅链接
@@ -78,67 +59,121 @@ def check_feed(blog_url, session):
     如果 feed 链接可访问,则返回 ['feed', feed_url],
     如果都不可访问,则返回 ['none', blog_url]
     """
-    atom_url = blog_url.rstrip('/') + '/atom.xml'
-    rss_url = blog_url.rstrip('/') + '/rss.xml'     # 2024-07-26 添加 /rss.xml内容的支持
-    rss2_url = blog_url.rstrip('/') + '/rss2.xml'
-    feed_url = blog_url.rstrip('/') + '/feed'
-    feed2_url = blog_url.rstrip('/') + '/feed.xml'  # 2024-07-26 添加 /feed.xml内容的支持
-    feed3_url = blog_url.rstrip('/') + '/feed/'     # 2024-07-26 添加 /feed/内容的支持
-    index_url = blog_url.rstrip('/') + '/index.xml' # 2024-07-25 添加 /index.xml内容的支持
-
-    try:
-        atom_response = session.get(atom_url, headers=headers, timeout=timeout)
-        if atom_response.status_code == 200:
-            return ['atom', atom_url]
-    except requests.RequestException:
-        pass
-    try:
-        rss_response = session.get(rss_url, headers=headers, timeout=timeout)
-        if rss_response.status_code == 200:
-            return ['rss', rss_url]
-    except requests.RequestException:
-        pass
-    try:
-        rss_response = session.get(rss2_url, headers=headers, timeout=timeout)
-        if rss_response.status_code == 200:
-            return ['rss2', rss2_url]
-    except requests.RequestException:
-        pass
-    try:
-        feed_response = session.get(feed_url, headers=headers, timeout=timeout)
-        if feed_response.status_code == 200:
-            return ['feed', feed_url]
-    except requests.RequestException:
-        pass
-    try:
-        feed_response = session.get(feed2_url, headers=headers, timeout=timeout)
-        if feed_response.status_code == 200:
-            return ['feed2', feed2_url]
-    except requests.RequestException:
-        pass
-    try:
-        feed_response = session.get(index_url, headers=headers, timeout=timeout)
-        if feed_response.status_code == 200:
-            return ['index', index_url]
-    except requests.RequestException:
-        pass
-    try:
-        feed_response = session.get(feed3_url, headers=headers, timeout=timeout)
-        if feed_response.status_code == 200:
-            return ['feed3', feed3_url]
-    except requests.RequestException:
-        pass
+    possible_feeds = [
+        ('atom', '/atom.xml'),
+        ('rss', '/rss.xml'),    # 2024-07-26 添加 /rss.xml内容的支持
+        ('rss2', '/rss2.xml'),
+        ('feed', '/feed'),
+        ('feed2', '/feed.xml'), # 2024-07-26 添加 /feed.xml内容的支持
+        ('feed3', '/feed/'),
+        ('index', '/index.xml') # 2024-07-25 添加 /index.xml内容的支持
+    ]
+
+    for feed_type, path in possible_feeds:
+        feed_url = blog_url.rstrip('/') + path
+        # 确保 feed_url 使用 https 协议
+        feed_url = ensure_https(feed_url)
+        try:
+            response = session.get(feed_url, headers=headers, timeout=timeout)
+            if response.status_code == 200:
+                return [feed_type, feed_url]
+        except requests.RequestException:
+            continue
     return ['none', blog_url]


-def parse_feed(url, session, count=5):
+def is_bad_link(link):
+    """
+    判断链接是否是IP地址+端口、localhost+端口或缺少域名的链接
+
+    参数:
+    link (str): 要检查的链接
+
+    返回:
+    bool: 如果是IP地址+端口、localhost+端口或缺少域名返回True,否则返回False
+    """
+    try:
+        parsed_url = urlparse(link)
+        netloc = parsed_url.netloc
+        if not netloc:
+            return True  # 缺少主机部分
+        # 分割出主机和端口
+        if ':' in netloc:
+            host, _ = netloc.split(':', 1)
+        else:
+            host = netloc
+        # 检查是否是localhost或环回地址127.0.0.1(包括IPv6的 ::1)
+        if host in ['localhost', '::1', '127.0.0.1']:
+            return True
+        # 检查是否是IP地址
+        try:
+            ip = ipaddress.ip_address(host)
+            if socket.inet_aton(host) or ip.is_private or ip.is_loopback:
+                return True
+            return False
+        except ValueError:
+            return False
+    except Exception:
+        return False
+
+
+def ensure_https(url):
+    """
+    确保链接使用 https 协议
+
+    参数:
+    url (str): 原始链接
+
+    返回:
+    str: 使用 https 协议的链接
+    """
+    parsed_url = urlparse(url)
+    if parsed_url.scheme != 'https':
+        parsed_url = parsed_url._replace(scheme='https')
+        return urlunparse(parsed_url)
+    return url
+
+
+def fix_link(link, blog_url):
+    """
+    修复链接:将IP地址、localhost或缺少域名的链接替换为blog_url的域名,并确保使用HTTPS
+
+    参数:
+    link (str): 原始链接
+    blog_url (str): 博客的URL
+
+    返回:
+    str: 修复后的链接
+    """
+    if not link or not blog_url:
+        return link
+    parsed_blog_url = urlparse(blog_url)
+    # 如果链接是相对路径,或者缺少协议,则使用 urljoin
+    if not urlparse(link).netloc:
+        link = urljoin(blog_url, link)
+    parsed_link = urlparse(link)
+    # 强制使用 https 协议
+    if parsed_link.scheme != 'https':
+        parsed_link = parsed_link._replace(scheme='https')
+    if is_bad_link(link):
+        fixed_link = urlunparse(parsed_link._replace(netloc=parsed_blog_url.netloc))
+        return fixed_link
+    else:
+        # 确保链接使用 https 协议
+        fixed_link = urlunparse(parsed_link)
+        if parsed_link.scheme != 'https':
+            logging.info(f"将链接协议从 {link} 强制改为 HTTPS: {fixed_link}")
+        return fixed_link
+
+
+def parse_feed(url, session, count=5, blog_url=None):
     """
     解析 Atom 或 RSS2 feed 并返回包含网站名称、作者、原链接和每篇文章详细内容的字典
@@ -149,6 +184,7 @@ def parse_feed(url, session, count=5):
     url (str): Atom 或 RSS2 feed 的 URL
     session (requests.Session): 用于请求的会话对象
     count (int): 获取文章数的最大数,如果小于则全部获取,如果文章数大于则只取前 count 篇文章
+    blog_url (str): 目标博客的 URL,用于修复文章链接

     返回:
     dict: 包含网站名称、作者、原链接和每篇文章详细内容的字典
@@ -165,21 +201,22 @@ def parse_feed(url, session, count=5):
             'articles': []
         }

-        for i, entry in enumerate(feed.entries):
+        for entry in feed.entries:
             if 'published' in entry:
                 published = format_published_time(entry.published)
             elif 'updated' in entry:
                 published = format_published_time(entry.updated)
-                # 输出警告信息
-                print(f"警告:文章 {entry.title} 未包含发布时间,请尽快联系站长处理,暂时已设置为更新时间 {published}")
+                logging.warning(f"文章 {entry.title} 未包含发布时间,已使用更新时间 {published}")
             else:
                 published = ''
-                print(f"警告:文章 {entry.title} 未包含任何时间信息,请尽快联系站长处理")
+                logging.warning(f"文章 {entry.title} 未包含任何时间信息")
+            entry_link = entry.link if 'link' in entry else ''
+            fixed_link = fix_link(entry_link, blog_url)
             article = {
                 'title': entry.title if 'title' in entry else '',
                 'author': result['author'],
-                'link': entry.link if 'link' in entry else '',
+                'link': fixed_link,
                 'published': published,
                 'summary': entry.summary if 'summary' in entry else '',
                 'content': entry.content[0].value if 'content' in entry and entry.content else entry.description if 'description' in entry else ''
@@ -187,13 +224,15 @@ def parse_feed(url, session, count=5):
             result['articles'].append(article)

         # 对文章按时间排序,并只取前 count 篇文章
-        result['articles'] = sorted(result['articles'], key=lambda x: datetime.strptime(x['published'], '%Y-%m-%d %H:%M'), reverse=True)
-        if count < len(result['articles']):
-            result['articles'] = result['articles'][:count]
+        result['articles'] = sorted(
+            result['articles'],
+            key=lambda x: datetime.strptime(x['published'], '%Y-%m-%d %H:%M') if x['published'] else datetime.min,
+            reverse=True
+        )[:count]

         return result
     except Exception as e:
-        print(f"不可链接的FEED地址:{url} : {e}")
+        logging.error(f"无法解析FEED地址:{url} : {e}", exc_info=True)
         return {
             'website_name': '',
             'author': '',
@@ -216,6 +255,9 @@ def process_friend(friend, session, count, specific_RSS=[]):
     """
     name, blog_url, avatar = friend

+    # 确保博客 URL 使用 https 协议
+    blog_url = ensure_https(blog_url)
+
     # 如果 specific_RSS 中有对应的 name,则直接返回 feed_url
     if specific_RSS is None:
         specific_RSS = []
@@ -223,13 +265,13 @@ def process_friend(friend, session, count, specific_RSS=[]):
     if rss_feed:
         feed_url = rss_feed
         feed_type = 'specific'
-        print(f"========“{name}”的博客“{blog_url} ”为特定RSS源“{feed_url}”========")
+        logging.info(f"“{name}”的博客“{blog_url}”为特定RSS源“{feed_url}”")
     else:
         feed_type, feed_url = check_feed(blog_url, session)
-        print(f"========“{name}”的博客“{blog_url} ”的feed类型为“{feed_type}”========")
+        logging.info(f"“{name}”的博客“{blog_url}”的feed类型为“{feed_type}”")

     if feed_type != 'none':
-        feed_info = parse_feed(feed_url, session, count)
+        feed_info = parse_feed(feed_url, session, count, blog_url=blog_url)
         articles = [
             {
                 'title': article['title'],
@@ -242,7 +284,7 @@ def process_friend(friend, session, count, specific_RSS=[]):
         ]

         for article in articles:
-            print(f"{name} 发布了新文章:{article['title']}, 时间:{article['created']}")
+            logging.info(f"{name} 发布了新文章:{article['title']},时间:{article['created']},链接:{article['link']}")

         return {
             'name': name,
@@ -250,7 +292,7 @@ def process_friend(friend, session, count, specific_RSS=[]):
             'articles': articles
         }
     else:
-        print(f"{name} 的博客 {blog_url} 无法访问")
+        logging.warning(f"{name} 的博客 {blog_url} 无法访问")
         return {
             'name': name,
             'status': 'error',
@@ -267,16 +309,22 @@ def fetch_and_process_data(json_url, specific_RSS=[], count=5):
     specific_RSS (list): 包含特定 RSS 源的字典列表 [{name, url}]

     返回:
-    dict: 包含统计数据和文章信息的字典
+    tuple: (处理后的数据字典, 错误的朋友信息列表)
     """
     session = requests.Session()
+    retries = requests.packages.urllib3.util.retry.Retry(
+        total=3, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504]
+    )
+    adapter = requests.adapters.HTTPAdapter(max_retries=retries)
+    session.mount('http://', adapter)
+    session.mount('https://', adapter)

     try:
         response = session.get(json_url, headers=headers, timeout=timeout)
         friends_data = response.json()
     except Exception as e:
-        print(f"无法获取该链接:{json_url} , 出现的问题为:{e}")
-        return None
+        logging.error(f"无法获取链接:{json_url},出现的问题为:{e}", exc_info=True)
+        return None, []

     total_friends = len(friends_data['friends'])
     active_friends = 0
@@ -303,7 +351,7 @@ def fetch_and_process_data(json_url, specific_RSS=[], count=5):
                     error_friends += 1
                     error_friends_info.append(friend)
             except Exception as e:
-                print(f"处理 {friend} 时发生错误: {e}")
+                logging.error(f"处理 {friend} 时发生错误: {e}", exc_info=True)
                 error_friends += 1
                 error_friends_info.append(friend)
@@ -318,8 +366,7 @@ def fetch_and_process_data(json_url, specific_RSS=[], count=5):
         'article_data': article_data
     }

-    print("数据处理完成")
-    print("总共有 %d 位朋友,其中 %d 位博客可访问,%d 位博客无法访问" % (total_friends, active_friends, error_friends))
+    logging.info(f"数据处理完成,总共有 {total_friends} 位朋友,其中 {active_friends} 位博客可访问,{error_friends} 位博客无法访问")

     return result, error_friends_info
@@ -334,16 +381,15 @@ def sort_articles_by_time(data):
     dict: 按时间排序后的文章信息字典
     """
     # 先确保每个元素存在时间
-    for article in data['article_data']:
-        if article['created'] == '' or article['created'] == None:
+    for article in data.get('article_data', []):
+        if not article.get('created'):
             article['created'] = '2024-01-01 00:00'
-            # 输出警告信息
-            print(f"警告:文章 {article['title']} 未包含任何可提取的时间信息,已设置为默认时间 2024-01-01 00:00")
+            logging.warning(f"文章 {article['title']} 未包含时间信息,已设置为默认时间 2024-01-01 00:00")

     if 'article_data' in data:
         sorted_articles = sorted(
             data['article_data'],
-            key=lambda x: datetime.strptime(x['created'], '%Y-%m-%d %H:%M'),
+            key=lambda x: datetime.strptime(x['created'], '%Y-%m-%d %H:%M') if x['created'] else datetime.min,
             reverse=True
         )
         data['article_data'] = sorted_articles
@@ -364,17 +410,19 @@ def marge_data_from_json_url(data, marge_json_url):
         response = requests.get(marge_json_url, headers=headers, timeout=timeout)
         marge_data = response.json()
     except Exception as e:
-        print(f"无法获取链接:{marge_json_url} , 出现的问题为:{e}")
+        logging.error(f"无法获取链接:{marge_json_url},出现的问题为:{e}", exc_info=True)
         return data

     if 'article_data' in marge_data:
-        print("开始合并数据,原数据共有 %d 篇文章,境外数据共有 %d 篇文章" % (len(data['article_data']), len(marge_data['article_data'])))
-        data['article_data'].extend(marge_data['article_data'])
-        data['article_data'] = list({v['link']:v for v in data['article_data']}.values())
-        print("合并数据完成,现在共有 %d 篇文章" % len(data['article_data']))
+        logging.info(f"开始合并数据,原数据共有 {len(data['article_data'])} 篇文章,境外数据共有 {len(marge_data['article_data'])} 篇文章")
+        existing_links = set(article['link'] for article in data['article_data'])
+        new_articles = [article for article in marge_data['article_data'] if article['link'] not in existing_links]
+        data['article_data'].extend(new_articles)
+        logging.info(f"合并数据完成,现在共有 {len(data['article_data'])} 篇文章")

     return data

+import requests

 def marge_errors_from_json_url(errors, marge_json_url):
     """
@@ -392,17 +440,16 @@ def marge_errors_from_json_url(errors, marge_json_url):
         response = requests.get(marge_json_url, timeout=10)  # 设置请求超时时间
         marge_errors = response.json()
     except Exception as e:
-        print(f"无法获取链接:{marge_json_url},出现的问题为:{e}")
+        logging.error(f"无法获取链接:{marge_json_url},出现的问题为:{e}", exc_info=True)
         return errors

-    # 提取 marge_errors 中的 URL
-    marge_urls = {item[1] for item in marge_errors}
-    # 使用过滤器保留 errors 中在 marge_errors 中出现的 URL
-    filtered_errors = [error for error in errors if error[1] in marge_urls]
-    print("合并错误信息完成,保留了 %d 位朋友" % len(filtered_errors))
-
-    return filtered_errors
+    # 合并错误信息列表并去重
+    errors_set = set(tuple(error) for error in errors)
+    marge_errors_set = set(tuple(error) for error in marge_errors)
+    combined_errors = list(errors_set.union(marge_errors_set))
+    logging.info(f"合并错误信息完成,合并后共有 {len(combined_errors)} 位朋友")
+    return combined_errors


 def deal_with_large_data(result):
     """
@@ -418,21 +465,22 @@ def deal_with_large_data(result):
     article_data = result.get("article_data", [])

     # 检查文章数量是否大于 150
-    if len(article_data) > 150:
-        print("数据量较大,开始进行处理···")
-        # 获取前 150 篇文章的作者集合
-        first_200_authors = {article["author"] for article in article_data[:150]}
+    max_articles = 150
+    if len(article_data) > max_articles:
+        logging.info("数据量较大,开始进行处理...")
+        # 获取前 max_articles 篇文章的作者集合
+        top_authors = {article["author"] for article in article_data[:max_articles]}

-        # 从第151篇开始过滤,只保留前150篇出现过的作者的文章
-        filtered_articles = article_data[:150] + [
-            article for article in article_data[150:]
-            if article["author"] in first_200_authors
+        # 从第 {max_articles + 1} 篇开始过滤,只保留前 max_articles 篇出现过的作者的文章
+        filtered_articles = article_data[:max_articles] + [
+            article for article in article_data[max_articles:]
+            if article["author"] in top_authors
         ]

         # 更新结果中的 article_data
         result["article_data"] = filtered_articles

         # 更新结果中的统计数据
         result["statistical_data"]["article_num"] = len(filtered_articles)
-        print("数据处理完成,保留 %d 篇文章" % len(filtered_articles))
+        logging.info(f"数据处理完成,保留 {len(filtered_articles)} 篇文章")

     return result
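
A minimal usage sketch of the link-normalization helpers this commit introduces (ensure_https, is_bad_link, fix_link). It assumes the changed file is importable as a module named get_info; that module name and the sample URLs are illustrative assumptions, not part of the commit.

# Sketch only: exercises the helpers added in this commit.
# The module name `get_info` and the example URLs are assumptions.
from get_info import ensure_https, is_bad_link, fix_link

blog_url = "http://example-blog.com"  # hypothetical friend-link blog URL

# ensure_https rewrites only the scheme; host and path stay unchanged.
print(ensure_https("http://example-blog.com/atom.xml"))
# -> https://example-blog.com/atom.xml

# is_bad_link flags localhost, loopback/private IPs, and links with no host.
print(is_bad_link("http://127.0.0.1:4000/post/1"))      # True
print(is_bad_link("/post/1"))                           # True (no netloc)
print(is_bad_link("https://example-blog.com/post/1"))   # False

# fix_link joins relative links onto the blog URL, swaps bad hosts for the
# blog's domain, and forces https on the result.
print(fix_link("http://127.0.0.1:4000/post/1", blog_url))  # -> https://example-blog.com/post/1
print(fix_link("/post/2", blog_url))                       # -> https://example-blog.com/post/2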