mirror of
https://github.com/AlignPixel-Dev/Pixel-Chat-App.git
synced 2025-04-20 12:25:25 +08:00
287 lines
11 KiB
Python
Executable File
287 lines
11 KiB
Python
Executable File
import json
|
||
import threading
|
||
from socket import *
|
||
from time import ctime, time
|
||
|
||
class PyChattingServer:
|
||
__socket = socket(AF_INET, SOCK_STREAM, 0)
|
||
__address = ('', 12231)
|
||
__buf = 1024
|
||
|
||
def __init__(self):
|
||
self.__socket.bind(self.__address)
|
||
self.__socket.listen(20)
|
||
self.__msg_handler = ChattingHandler()
|
||
|
||
def start_session(self):
|
||
print('已经上线,用户可通过IP进入\r\n')
|
||
input_thread_handler = threading.Thread(target=self.input_thread)
|
||
input_thread_handler.daemon = True
|
||
input_thread_handler.start()
|
||
try:
|
||
while True:
|
||
cs, caddr = self.__socket.accept()
|
||
# 利用handler来管理线程,实现线程之间的socket的相互通信
|
||
self.__msg_handler.start_thread(cs, caddr)
|
||
except socket.error:
|
||
pass
|
||
|
||
def input_thread(self):
|
||
while True:
|
||
command = input("")
|
||
self.__msg_handler.add_to_blacklist_manual(command.strip())
|
||
|
||
class ChattingThread(threading.Thread):
|
||
__buf = 32767
|
||
|
||
def __init__(self, cs, caddr, msg_handler):
|
||
super(ChattingThread, self).__init__()
|
||
self.__cs = cs
|
||
self.__caddr = caddr
|
||
self.__msg_handler = msg_handler
|
||
self.__last_msg_time = time()
|
||
self.__msg_count = 0
|
||
|
||
def run(self):
|
||
try:
|
||
print('连接来自于:', self.__caddr)
|
||
if self.__msg_handler.is_blacklisted(self.__caddr[0]):
|
||
self.__handle_blacklisted()
|
||
return
|
||
data = "欢迎你来到由PixelChat驱动的聊天室!请遵守以下规则:\r\n"\
|
||
"1. 不要刷屏,否则将会被踢出\r\n"\
|
||
"2. 不要骂人,可以吐槽,但要适度\r\n"\
|
||
"3. 如有问题请联系服务器管理员\r\n"\
|
||
"4. 不要发布违反国家法律的信息\r\n"\
|
||
"如果你同意以上内容,请输入昵称加入服务器~"
|
||
self.__cs.sendall(bytes(data, 'utf-8'))
|
||
while True:
|
||
if self.__msg_handler.is_blacklisted(self.__caddr[0]):
|
||
self.__handle_blacklisted()
|
||
return
|
||
data = self.__cs.recv(self.__buf).decode('utf-8')
|
||
if not data:
|
||
break
|
||
if len(data) > 15000:
|
||
self.__msg_handler.add_to_blacklist(self.__caddr[0])
|
||
self.__handle_blacklisted()
|
||
return
|
||
current_time = time()
|
||
if current_time - self.__last_msg_time < 12: # 12秒内发送多条消息
|
||
self.__msg_count += 1
|
||
if self.__msg_count > 12:
|
||
self.__msg_handler.add_to_blacklist(self.__caddr[0])
|
||
self.__handle_blacklisted()
|
||
return
|
||
else:
|
||
self.__msg_count = 0
|
||
self.__last_msg_time = current_time
|
||
self.__msg_handler.handle_msg(data, self.__cs)
|
||
print(data)
|
||
except Exception as e:
|
||
print(f"Error in thread: {e}")
|
||
finally:
|
||
self.__msg_handler.close_conn(self.__cs)
|
||
self.__cs.close()
|
||
|
||
def __handle_blacklisted(self):
|
||
print('...黑塔安全拦截的用户:', self.__caddr)
|
||
data = '拒绝访问!请联系管理员处理'
|
||
self.__cs.sendall(bytes(data, 'utf-8'))
|
||
self.__cs.close()
|
||
|
||
class ChattingHandler:
|
||
__help_str = "[ 系统消息 ]\r\n" \
|
||
"输入/checkol,即可获得所有登陆用户信息\r\n" \
|
||
"输入/help,即可获得帮助\r\n" \
|
||
"输入/exit,即可退出\r\n" \
|
||
"输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \
|
||
"输入/i,即可屏蔽群聊信息\r\n" \
|
||
"再次输入/i,即可取消屏蔽\r\n" \
|
||
"所有首字符为/的信息都不会发送出去"
|
||
|
||
__buf = 32767
|
||
__socket_list = []
|
||
__user_name_to_socket = {}
|
||
__socket_to_user_name = {}
|
||
__user_name_to_broadcast_state = {}
|
||
__blacklist = set()
|
||
|
||
def start_thread(self, cs, caddr):
|
||
self.__socket_list.append(cs)
|
||
chat_thread = ChattingThread(cs, caddr, self)
|
||
chat_thread.start()
|
||
|
||
def close_conn(self, cs):
|
||
if cs not in self.__socket_list:
|
||
return
|
||
nickname = "SOMEONE"
|
||
if cs in self.__socket_list:
|
||
self.__socket_list.remove(cs)
|
||
if cs in self.__socket_to_user_name:
|
||
nickname = self.__socket_to_user_name[cs]
|
||
self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
|
||
self.__socket_to_user_name.pop(cs)
|
||
self.__user_name_to_broadcast_state.pop(nickname)
|
||
nickname += " "
|
||
self.broadcast_系统消息_msg(nickname + "离开了本聊天室")
|
||
|
||
def handle_msg(self, msg, cs):
|
||
js = json.loads(msg)
|
||
if js['type'] == "login":
|
||
if js['msg'] not in self.__user_name_to_socket:
|
||
if ' ' in js['msg']:
|
||
self.send_to(json.dumps({
|
||
'type': 'login',
|
||
'success': False,
|
||
'msg': '账号不能够带有空格'
|
||
}), cs)
|
||
else:
|
||
self.__user_name_to_socket[js['msg']] = cs
|
||
self.__socket_to_user_name[cs] = js['msg']
|
||
self.__user_name_to_broadcast_state[js['msg']] = True
|
||
self.send_to(json.dumps({
|
||
'type': 'login',
|
||
'success': True,
|
||
'msg': '昵称建立成功,输入/checkol可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
|
||
}), cs)
|
||
self.broadcast_系统消息_msg(js['msg'] + "加入了聊天")
|
||
else:
|
||
self.send_to(json.dumps({
|
||
'type': 'login',
|
||
'success': False,
|
||
'msg': '账号已存在'
|
||
}), cs)
|
||
elif js['type'] == "broadcast":
|
||
if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
|
||
self.broadcast(js['msg'], cs)
|
||
else:
|
||
self.send_to(json.dumps({
|
||
'type': 'broadcast',
|
||
'msg': '屏蔽模式下无法发送群聊信息,输入/i解除屏蔽'
|
||
}), cs)
|
||
elif js['type'] == "ls":
|
||
self.send_to(json.dumps({
|
||
'type': 'ls',
|
||
'msg': self.get_all_login_user_info()
|
||
}), cs)
|
||
elif js['type'] == "help":
|
||
self.send_to(json.dumps({
|
||
'type': 'help',
|
||
'msg': self.__help_str
|
||
}), cs)
|
||
elif js['type'] == "sendto":
|
||
self.single_chatting(cs, js['nickname'], js['msg'])
|
||
elif js['type'] == "ignore":
|
||
self.exchange_ignore_state(cs)
|
||
elif js['type'] == "exit": # 添加处理退出消息
|
||
self.close_conn(cs)
|
||
|
||
def exchange_ignore_state(self, cs):
|
||
if cs in self.__socket_to_user_name:
|
||
state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
|
||
state = not state
|
||
self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
|
||
msg = "通常模式" if state else "屏蔽模式"
|
||
self.send_to(json.dumps({
|
||
'type': 'ignore',
|
||
'success': True,
|
||
'msg': '[ %s ]\r\n[ 系统消息 ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)
|
||
}), cs)
|
||
else:
|
||
self.send_to({
|
||
'type': 'ignore',
|
||
'success': False,
|
||
'msg': '切换失败'
|
||
}, cs)
|
||
|
||
def single_chatting(self, cs, nickname, msg):
|
||
if nickname in self.__user_name_to_socket:
|
||
msg = '[ %s ]\r\n[ %s 发送给 %s ] : %s\r\n' % (
|
||
ctime(), self.__socket_to_user_name[cs], nickname, msg)
|
||
self.send_to_list(json.dumps({
|
||
'type': 'single',
|
||
'msg': msg
|
||
}), self.__user_name_to_socket[nickname], cs)
|
||
else:
|
||
self.send_to(json.dumps({
|
||
'type': 'single',
|
||
'msg': '该用户不存在'
|
||
}), cs)
|
||
print(nickname)
|
||
|
||
def send_to_list(self, msg, *cs):
|
||
for i in range(len(cs)):
|
||
self.send_to(msg, cs[i])
|
||
|
||
def get_all_login_user_info(self):
|
||
login_list = "[ 系统消息 ] 在线用户 : "
|
||
for key in self.__socket_to_user_name:
|
||
login_list += self.__socket_to_user_name[key] + " | "
|
||
return login_list
|
||
|
||
def send_to(self, msg, cs):
|
||
if cs not in self.__socket_list:
|
||
self.__socket_list.append(cs)
|
||
cs.sendall(bytes(msg, 'utf-8'))
|
||
|
||
def broadcast_系统消息_msg(self, msg):
|
||
data = '[ %s ]\r\n[ 系统消息 ] : %s' % (ctime(), msg)
|
||
js = json.dumps({
|
||
'type': '系统消息_msg',
|
||
'msg': data
|
||
})
|
||
for i in range(len(self.__socket_list)):
|
||
if self.__socket_list[i] in self.__socket_to_user_name:
|
||
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
|
||
|
||
def broadcast(self, msg, cs):
|
||
data = '[ %s ]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
|
||
js = json.dumps({
|
||
'type': 'broadcast',
|
||
'msg': data
|
||
})
|
||
for i in range(len(self.__socket_list)):
|
||
if self.__socket_list[i] in self.__socket_to_user_name \
|
||
and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
|
||
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
|
||
|
||
def is_blacklisted(self, ip):
|
||
return ip in self.__blacklist
|
||
|
||
def add_to_blacklist(self, ip):
|
||
self.__blacklist.add(ip)
|
||
|
||
def add_to_blacklist_manual(self, ip):
|
||
if ip == '.ban':
|
||
ip = input("请输入需要封禁的ip地址:")
|
||
if not self.is_blacklisted(ip):
|
||
self.__blacklist.add(ip)
|
||
print(f"IP {ip} 已被手动加入黑名单")
|
||
else:
|
||
print(f"IP {ip} 已经在黑名单中")
|
||
elif ip == '.unban':
|
||
ip = input("请输入需要解除封禁的ip地址:")
|
||
if not self.is_blacklisted(ip):
|
||
print(f"IP {ip} 未在黑名单中")
|
||
else:
|
||
self.__blacklist.remove(ip)
|
||
print(f"IP {ip} 已经被手动移除")
|
||
elif ip == '.banlist':
|
||
print(self.__blacklist)
|
||
elif ip == '.help':
|
||
print("BAN: 封禁某个IP\r\n"\
|
||
"UNBAN: 解除封禁某个IP\r\n"\
|
||
"BANLIST: 查看封禁IP列表\r\n"\
|
||
"HELP: 查看操作帮助")
|
||
else:
|
||
print("不存在的命令!")
|
||
|
||
|
||
def main():
|
||
server = PyChattingServer()
|
||
server.start_session()
|
||
|
||
if __name__ == "__main__":
|
||
main()
|