287 lines
11 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import threading
from socket import *
from time import ctime, time
class PyChattingServer:
__socket = socket(AF_INET, SOCK_STREAM, 0)
__address = ('', 12231)
__buf = 1024
def __init__(self):
self.__socket.bind(self.__address)
self.__socket.listen(20)
self.__msg_handler = ChattingHandler()
def start_session(self):
print('已经上线用户可通过IP进入\r\n')
input_thread_handler = threading.Thread(target=self.input_thread)
input_thread_handler.daemon = True
input_thread_handler.start()
try:
while True:
cs, caddr = self.__socket.accept()
# 利用handler来管理线程,实现线程之间的socket的相互通信
self.__msg_handler.start_thread(cs, caddr)
except socket.error:
pass
def input_thread(self):
while True:
command = input("")
self.__msg_handler.add_to_blacklist_manual(command.strip())
class ChattingThread(threading.Thread):
__buf = 32767
def __init__(self, cs, caddr, msg_handler):
super(ChattingThread, self).__init__()
self.__cs = cs
self.__caddr = caddr
self.__msg_handler = msg_handler
self.__last_msg_time = time()
self.__msg_count = 0
def run(self):
try:
print('-> 连接来自于:', self.__caddr)
if self.__msg_handler.is_blacklisted(self.__caddr[0]):
self.__handle_blacklisted()
return
data = "欢迎你到来暑期聊天室!请遵守以下规则:\r\n"\
"1. 不要刷屏,否则将会被踢出\r\n"\
"2. 不要骂人,可以吐槽,但要适度\r\n"\
"3. 如有问题请联系服务器管理员\r\n"\
"4. 不要发布违反国家法律的信息\r\n"\
"如果你同意以上内容,请输入昵称加入服务器~"
self.__cs.sendall(bytes(data, 'utf-8'))
while True:
if self.__msg_handler.is_blacklisted(self.__caddr[0]):
self.__handle_blacklisted()
return
data = self.__cs.recv(self.__buf).decode('utf-8')
if not data:
break
if len(data) > 15000:
self.__msg_handler.add_to_blacklist(self.__caddr[0])
self.__handle_blacklisted()
return
current_time = time()
if current_time - self.__last_msg_time < 12: # 12秒内发送多条消息
self.__msg_count += 1
if self.__msg_count > 12:
self.__msg_handler.add_to_blacklist(self.__caddr[0])
self.__handle_blacklisted()
return
else:
self.__msg_count = 0
self.__last_msg_time = current_time
self.__msg_handler.handle_msg(data, self.__cs)
print(data)
except Exception as e:
print(f"Error in thread: {e}")
finally:
self.__msg_handler.close_conn(self.__cs)
self.__cs.close()
def __handle_blacklisted(self):
print('...黑塔安全拦截的用户:', self.__caddr)
data = '拒绝访问请联系seventeen@ohdragonboi.cn'
self.__cs.sendall(bytes(data, 'utf-8'))
self.__cs.close()
class ChattingHandler:
__help_str = "[ 系统消息 ]\r\n" \
"输入/checkol,即可获得所有登陆用户信息\r\n" \
"输入/help,即可获得帮助\r\n" \
"输入/exit,即可退出\r\n" \
"输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊\r\n" \
"输入/i,即可屏蔽群聊信息\r\n" \
"再次输入/i,即可取消屏蔽\r\n" \
"所有首字符为/的信息都不会发送出去"
__buf = 32767
__socket_list = []
__user_name_to_socket = {}
__socket_to_user_name = {}
__user_name_to_broadcast_state = {}
__blacklist = set()
def start_thread(self, cs, caddr):
self.__socket_list.append(cs)
chat_thread = ChattingThread(cs, caddr, self)
chat_thread.start()
def close_conn(self, cs):
if cs not in self.__socket_list:
return
nickname = "SOMEONE"
if cs in self.__socket_list:
self.__socket_list.remove(cs)
if cs in self.__socket_to_user_name:
nickname = self.__socket_to_user_name[cs]
self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
self.__socket_to_user_name.pop(cs)
self.__user_name_to_broadcast_state.pop(nickname)
nickname += " "
self.broadcast_系统消息_msg(nickname + "离开了本聊天室")
def handle_msg(self, msg, cs):
js = json.loads(msg)
if js['type'] == "login":
if js['msg'] not in self.__user_name_to_socket:
if ' ' in js['msg']:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '账号不能够带有空格'
}), cs)
else:
self.__user_name_to_socket[js['msg']] = cs
self.__socket_to_user_name[cs] = js['msg']
self.__user_name_to_broadcast_state[js['msg']] = True
self.send_to(json.dumps({
'type': 'login',
'success': True,
'msg': '昵称建立成功,输入/checkol可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
}), cs)
self.broadcast_系统消息_msg(js['msg'] + "加入了聊天")
else:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '账号已存在'
}), cs)
elif js['type'] == "broadcast":
if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
self.broadcast(js['msg'], cs)
else:
self.send_to(json.dumps({
'type': 'broadcast',
'msg': '屏蔽模式下无法发送群聊信息,输入/i解除屏蔽'
}), cs)
elif js['type'] == "ls":
self.send_to(json.dumps({
'type': 'ls',
'msg': self.get_all_login_user_info()
}), cs)
elif js['type'] == "help":
self.send_to(json.dumps({
'type': 'help',
'msg': self.__help_str
}), cs)
elif js['type'] == "sendto":
self.single_chatting(cs, js['nickname'], js['msg'])
elif js['type'] == "ignore":
self.exchange_ignore_state(cs)
elif js['type'] == "exit": # 添加处理退出消息
self.close_conn(cs)
def exchange_ignore_state(self, cs):
if cs in self.__socket_to_user_name:
state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
state = not state
self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
msg = "通常模式" if state else "屏蔽模式"
self.send_to(json.dumps({
'type': 'ignore',
'success': True,
'msg': '[ %s ]\r\n[ 系统消息 ] : %s\r\n' % (ctime(), "模式切换成功,现在是" + msg)
}), cs)
else:
self.send_to({
'type': 'ignore',
'success': False,
'msg': '切换失败'
}, cs)
def single_chatting(self, cs, nickname, msg):
if nickname in self.__user_name_to_socket:
msg = '[ %s ]\r\n[ %s 发送给 %s ] : %s\r\n' % (
ctime(), self.__socket_to_user_name[cs], nickname, msg)
self.send_to_list(json.dumps({
'type': 'single',
'msg': msg
}), self.__user_name_to_socket[nickname], cs)
else:
self.send_to(json.dumps({
'type': 'single',
'msg': '该用户不存在'
}), cs)
print(nickname)
def send_to_list(self, msg, *cs):
for i in range(len(cs)):
self.send_to(msg, cs[i])
def get_all_login_user_info(self):
login_list = "[ 系统消息 ] 在线用户 : "
for key in self.__socket_to_user_name:
login_list += self.__socket_to_user_name[key] + " | "
return login_list
def send_to(self, msg, cs):
if cs not in self.__socket_list:
self.__socket_list.append(cs)
cs.sendall(bytes(msg, 'utf-8'))
def broadcast_系统消息_msg(self, msg):
data = '[ %s ]\r\n[ 系统消息 ] : %s' % (ctime(), msg)
js = json.dumps({
'type': '系统消息_msg',
'msg': data
})
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def broadcast(self, msg, cs):
data = '[ %s ]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
js = json.dumps({
'type': 'broadcast',
'msg': data
})
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name \
and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def is_blacklisted(self, ip):
return ip in self.__blacklist
def add_to_blacklist(self, ip):
self.__blacklist.add(ip)
def add_to_blacklist_manual(self, ip):
if ip == '.ban':
ip = input("请输入需要封禁的ip地址")
if not self.is_blacklisted(ip):
self.__blacklist.add(ip)
print(f"IP {ip} 已被手动加入黑名单")
else:
print(f"IP {ip} 已经在黑名单中")
elif ip == '.unban':
ip = input("请输入需要解除封禁的ip地址")
if not self.is_blacklisted(ip):
print(f"IP {ip} 未在黑名单中")
else:
self.__blacklist.remove(ip)
print(f"IP {ip} 已经被手动移除")
elif ip == '.banlist':
print(self.__blacklist)
elif ip == '.help':
print("BAN: 封禁某个IP\r\n"\
"UNBAN: 解除封禁某个IP\r\n"\
"BANLIST: 查看封禁IP列表\r\n"\
"HELP: 查看操作帮助")
else:
print("不存在的命令!")
def main():
server = PyChattingServer()
server.start_session()
if __name__ == "__main__":
main()