Pixel-Chat-App/server-client/simple-verson.py

311 lines
13 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import threading
from socket import *
from time import ctime, time
class PyChattingServer:
__socket = socket(AF_INET, SOCK_STREAM, 0)
__address = ('', 12231)
__buf = 1024
def __init__(self):
self.__socket.bind(self.__address)
self.__socket.listen(20)
self.__msg_handler = ChattingHandler()
def start_session(self):
print('已经上线用户可通过客户端输入IP进入输入.help可以调出命令列表\r\n')
input_thread_handler = threading.Thread(target=self.input_thread)
input_thread_handler.daemon = True
input_thread_handler.start()
try:
while True:
cs, caddr = self.__socket.accept()
# 利用handler来管理线程,实现线程之间的socket的相互通信
self.__msg_handler.start_thread(cs, caddr)
except socket.error:
pass
def input_thread(self):
while True:
command = input("")
self.__msg_handler.add_to_blacklist_manual(command.strip())
class ChattingThread(threading.Thread):
__buf = 32767
def __init__(self, cs, caddr, msg_handler):
super(ChattingThread, self).__init__()
self.__cs = cs
self.__caddr = caddr
self.__msg_handler = msg_handler
self.__last_msg_time = time()
self.__msg_count = 0
def run(self):
try:
print('-> 连接来自于:', self.__caddr)
if self.__msg_handler.is_blacklisted(self.__caddr[0]):
self.__handle_blacklisted()
return
data = "欢迎你来到由PixelChat驱动的聊天室请遵守以下规则\r\n"\
"1. 不要刷屏,否则将会被踢出\r\n"\
"2. 不要骂人,可以吐槽,但要适度\r\n"\
"3. 如有问题请联系服务器管理员\r\n"\
"4. 不要发布违反国家法律的信息\r\n"\
"如果你同意以上内容,请输入昵称加入服务器~"
self.__cs.sendall(bytes(data, 'utf-8'))
while True:
if self.__msg_handler.is_blacklisted(self.__caddr[0]):
self.__handle_blacklisted()
return
data = self.__cs.recv(self.__buf).decode('utf-8')
if not data:
break
if len(data) > 15000:
self.__msg_handler.add_to_blacklist(self.__caddr[0])
self.__handle_blacklisted()
return
current_time = time()
if current_time - self.__last_msg_time < 12: # 12秒内发送多条消息
self.__msg_count += 1
if self.__msg_count > 12:
self.__msg_handler.add_to_blacklist(self.__caddr[0])
self.__handle_blacklisted()
return
else:
self.__msg_count = 0
self.__last_msg_time = current_time
self.__msg_handler.handle_msg(data, self.__cs)
print(data)
except Exception as e:
print(f"Error in thread: {e}")
finally:
self.__msg_handler.close_conn(self.__cs)
self.__cs.close()
def __handle_blacklisted(self):
print('...黑塔安全拦截的用户:', self.__caddr)
data = '拒绝访问!请联系管理员处理'
self.__cs.sendall(bytes(data, 'utf-8'))
self.__cs.close()
class ChattingHandler:
__help_str = "[ 系统消息 ]\r\n" \
"输入/checkol,即可获得所有登陆用户信息\r\n" \
"输入/help,即可获得帮助\r\n" \
"输入/exit,即可退出\r\n" \
"输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \
"输入/i,即可屏蔽群聊信息\r\n" \
"再次输入/i,即可取消屏蔽\r\n" \
"所有首字符为/的信息都不会发送出去"
__buf = 32767
__socket_list = []
__user_name_to_socket = {}
__socket_to_user_name = {}
__user_name_to_broadcast_state = {}
__blacklist = set()
def start_thread(self, cs, caddr):
self.__socket_list.append(cs)
chat_thread = ChattingThread(cs, caddr, self)
chat_thread.start()
def close_conn(self, cs):
if cs not in self.__socket_list:
return
nickname = "SOMEONE"
if cs in self.__socket_list:
self.__socket_list.remove(cs)
if cs in self.__socket_to_user_name:
nickname = self.__socket_to_user_name[cs]
self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
self.__socket_to_user_name.pop(cs)
self.__user_name_to_broadcast_state.pop(nickname)
nickname += " 离开了本聊天室"
if nickname != "SOMEONE": # 说明是正常退出,一个防输出卡死机制,来自 boom hack 0x3299f除非你有更好的替代方案请勿修改此项
self.broadcast_system_msg(nickname)
def handle_msg(self, msg, cs):
js = json.loads(msg)
if js['type'] == "login":
if js['msg'] not in self.__user_name_to_socket:
if ' ' in js['msg']:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '账号不能够带有空格'
}), cs)
else:
self.__user_name_to_socket[js['msg']] = cs
self.__socket_to_user_name[cs] = js['msg']
self.__user_name_to_broadcast_state[js['msg']] = True
self.send_to(json.dumps({
'type': 'login',
'success': True,
'msg': '昵称建立成功,输入/checkol可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
}), cs)
self.broadcast_system_msg(js['msg'] + "加入了聊天")
else:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '账号已存在'
}), cs)
elif js['type'] == "broadcast":
if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
self.broadcast(js['msg'], cs)
else:
self.send_to(json.dumps({
'type': 'broadcast',
'msg': '屏蔽模式下无法发送群聊信息,输入/i解除屏蔽'
}), cs)
elif js['type'] == "ls":
self.send_to(json.dumps({
'type': 'ls',
'msg': self.get_all_login_user_info()
}), cs)
elif js['type'] == "help":
self.send_to(json.dumps({
'type': 'help',
'msg': self.__help_str
}), cs)
elif js['type'] == "sendto":
self.single_chatting(cs, js['nickname'], js['msg'])
elif js['type'] == "ignore":
self.exchange_ignore_state(cs)
elif js['type'] == "exit": # 添加处理退出消息
self.close_conn(cs)
def exchange_ignore_state(self, cs):
if cs in self.__socket_to_user_name:
state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
state = not state
self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
msg = "通常模式" if state else "屏蔽模式"
self.send_to(json.dumps({
'type': 'ignore',
'success': True,
'msg': '[ %s ]\r\n[ 系统消息 ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)
}), cs)
else:
self.send_to({
'type': 'ignore',
'success': False,
'msg': '切换失败'
}, cs)
def single_chatting(self, cs, nickname, msg):
if nickname in self.__user_name_to_socket:
msg = '[ %s ]\r\n[ %s 发送给 %s ] : %s\r\n' % (
ctime(), self.__socket_to_user_name[cs], nickname, msg)
self.send_to_list(json.dumps({
'type': 'single',
'msg': msg
}), self.__user_name_to_socket[nickname], cs)
else:
self.send_to(json.dumps({
'type': 'single',
'msg': '该用户不存在'
}), cs)
print(nickname)
def send_to_list(self, msg, *cs):
for i in range(len(cs)):
self.send_to(msg, cs[i])
def get_all_login_user_info(self):
login_list = "[ 系统消息 ] 在线用户 : "
for key in self.__socket_to_user_name:
login_list += self.__socket_to_user_name[key] + " | "
return login_list
def send_to(self, msg, cs):
if cs not in self.__socket_list:
self.__socket_list.append(cs)
cs.sendall(bytes(msg, 'utf-8'))
def broadcast_system_msg(self, msg):
data = '[ %s ]\r\n[ 系统消息 ] : %s' % (ctime(), msg)
js = json.dumps({
'type': '系统消息_msg',
'msg': data
})
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def broadcast(self, msg, cs):
data = '[ %s ]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
if '' in data: # 屏蔽卡死服务器的字符,所有 Contributors 请不要更改这条判断,否则 PR 将直接拒绝合并,本安全措施来自 boom hack 0x3657f
data = '[ %s ]\r\n[ 系统警告 - %s ] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], '{用户发送的内容可能包含卡死服务器的内容,已经被屏蔽显示}')
js = json.dumps({
'type': 'broadcast',
'msg': data
})
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name \
and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def is_blacklisted(self, ip):
return ip in self.__blacklist
def add_to_blacklist(self, ip):
self.__blacklist.add(ip)
def add_to_blacklist_manual(self, ip):
if ip == '.ban':
ip = input("请输入需要封禁的ip地址")
if not self.is_blacklisted(ip):
self.__blacklist.add(ip)
print(f"IP {ip} 已被手动加入黑名单")
else:
print(f"IP {ip} 已经在黑名单中")
elif ip == '.unban':
ip = input("请输入需要解除封禁的ip地址")
if not self.is_blacklisted(ip):
print(f"IP {ip} 未在黑名单中")
else:
self.__blacklist.remove(ip)
print(f"IP {ip} 已经被手动移除")
elif ip == '.banlist':
print(self.__blacklist)
elif ip == '.an':
user = input("请输入要发布的内容:")
self.broadcast_system_msg(user)
print("发布成功")
elif ip == '.online':
login_list = "[ 输出 ] 在线用户 : "
for key in self.__socket_to_user_name:
login_list += self.__socket_to_user_name[key] + ' | '
print(login_list)
elif ip == '.setvisit': # 一个防输出卡死的屏蔽功能,目前仅支持手动添加,来自 boom hack 0x3299f
ip = input("请输入限制访问信息的ip地址")
if not self.is_alisted(ip):
self.__alist.add(ip)
print(f"IP {ip} 已经被手动更改访问")
else:
self.__alist.remove(ip)
print(f"IP {ip} 已经被手动更改访问")
elif ip == '.help':
print("BAN: 封禁某个IP\r\n"\
"UNBAN: 解除封禁某个IP\r\n"\
"BANLIST: 查看封禁IP列表\r\n"\
"KICK: 踢出某个用户\r\n"\
"AN: 以系统身份发布消息\r\n"\
"ONELINE: 查看在线用户\r\n"\
"SETVISIT: 对用户访问进行操作\r\n"\
"HELP: 查看操作帮助")
else:
print("不存在的命令!")
def main():
server = PyChattingServer()
server.start_session()
if __name__ == "__main__":
main()