概要
這是一個使用python實現(xiàn)一個簡單的聊天室的功能,里面包含群聊,私聊兩種聊天方式.實現(xiàn)的方式是使用套接字編程的一個使用TCP協(xié)議 c/s結(jié)構(gòu)的聊天室
實現(xiàn)思路
x01 服務(wù)端的建立
首先,在服務(wù)端,使用socket進(jìn)行消息的接受,每接受一個socket的請求,就開啟一個新的線程來管理消息的分發(fā)與接受,同時,又存在一個handler來管理所有的線程,從而實現(xiàn)對聊天室的各種功能的處理
x02 客戶端的建立
客戶端的建立就要比服務(wù)端簡單多了,客戶端的作用只是對消息的發(fā)送以及接受,以及按照特定的規(guī)則去輸入特定的字符從而實現(xiàn)不同的功能的使用,因此,在客戶端這里,只需要去使用兩個線程,一個是專門用于接受消息,一個是專門用于發(fā)送消息的
至于為什么不用一個呢,那是因為,只用一個的話,當(dāng)接受了消息,在發(fā)送之前接受消息的處于阻塞狀態(tài),同理,發(fā)送消息也是,那么要是將這兩個功能放在一個地方實現(xiàn),就會導(dǎo)致沒有辦法連續(xù)發(fā)送或者接受消息了
實現(xiàn)方式
服務(wù)端實現(xiàn)
import json
import threading
from socket import *
from time import ctime
class PyChattingServer:
__socket = socket(AF_INET, SOCK_STREAM, 0)
__address = ('', 12231)
__buf = 1024
def __init__(self):
self.__socket.bind(self.__address)
self.__socket.listen(20)
self.__msg_handler = ChattingHandler()
def start_session(self):
print('等待客戶連接...\r\n')
try:
while True:
cs, caddr = self.__socket.accept()
# 利用handler來管理線程,實現(xiàn)線程之間的socket的相互通信
self.__msg_handler.start_thread(cs, caddr)
except socket.error:
pass
class ChattingThread(threading.Thread):
__buf = 1024
def __init__(self, cs, caddr, msg_handler):
super(ChattingThread, self).__init__()
self.__cs = cs
self.__caddr = caddr
self.__msg_handler = msg_handler
# 使用多線程管理會話
def run(self):
try:
print('...連接來自于:', self.__caddr)
data = '歡迎你到來PY_CHATTING!請輸入你的很cooooool的昵稱(不能帶有空格喲`)\r\n'
self.__cs.sendall(bytes(data, 'utf-8'))
while True:
data = self.__cs.recv(self.__buf).decode('utf-8')
if not data:
break
self.__msg_handler.handle_msg(data, self.__cs)
print(data)
except socket.error as e:
print(e.args)
pass
finally:
self.__msg_handler.close_conn(self.__cs)
self.__cs.close()
class ChattingHandler:
__help_str = "[ SYSTEM ]\r\n" \
"輸入/ls,即可獲得所有登陸用戶信息\r\n" \
"輸入/h,即可獲得幫助\r\n" \
"輸入@用戶名 (注意用戶名后面的空格)+消息,即可發(fā)動單聊\r\n" \
"輸入/i,即可屏蔽群聊信息\r\n" \
"再次輸入/i,即可取消屏蔽\r\n" \
"所有首字符為/的信息都不會發(fā)送出去"
__buf = 1024
__socket_list = []
__user_name_to_socket = {}
__socket_to_user_name = {}
__user_name_to_broadcast_state = {}
def start_thread(self, cs, caddr):
self.__socket_list.append(cs)
chat_thread = ChattingThread(cs, caddr, self)
chat_thread.start()
def close_conn(self, cs):
if cs not in self.__socket_list:
return
# 去除socket的記錄
nickname = "SOMEONE"
if cs in self.__socket_list:
self.__socket_list.remove(cs)
# 去除socket與username之間的映射關(guān)系
if cs in self.__socket_to_user_name:
nickname = self.__socket_to_user_name[cs]
self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
self.__socket_to_user_name.pop(cs)
self.__user_name_to_broadcast_state.pop(nickname)
nickname += " "
# 廣播某玩家退出聊天室
self.broadcast_system_msg(nickname + "離開了PY_CHATTING")
# 管理用戶輸入的信息
def handle_msg(self, msg, cs):
js = json.loads(msg)
if js['type'] == "login":
if js['msg'] not in self.__user_name_to_socket:
if ' ' in js['msg']:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '賬號不能夠帶有空格'
}), cs)
else:
self.__user_name_to_socket[js['msg']] = cs
self.__socket_to_user_name[cs] = js['msg']
self.__user_name_to_broadcast_state[js['msg']] = True
self.send_to(json.dumps({
'type': 'login',
'success': True,
'msg': '昵稱建立成功,輸入/ls可查看所有在線的人,輸入/help可以查看幫助(所有首字符為/的消息都不會發(fā)送)'
}), cs)
# 廣播其他人,他已經(jīng)進(jìn)入聊天室
self.broadcast_system_msg(js['msg'] + "已經(jīng)進(jìn)入了聊天室")
else:
self.send_to(json.dumps({
'type': 'login',
'success': False,
'msg': '賬號已存在'
}), cs)
# 若玩家處于屏蔽模式,則無法發(fā)送群聊消息
elif js['type'] == "broadcast":
if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
self.broadcast(js['msg'], cs)
else:
self.send_to(json.dumps({
'type': 'broadcast',
'msg': '屏蔽模式下無法發(fā)送群聊信息'
}), cs)
elif js['type'] == "ls":
self.send_to(json.dumps({
'type': 'ls',
'msg': self.get_all_login_user_info()
}), cs)
elif js['type'] == "help":
self.send_to(json.dumps({
'type': 'help',
'msg': self.__help_str
}), cs)
elif js['type'] == "sendto":
self.single_chatting(cs, js['nickname'], js['msg'])
elif js['type'] == "ignore":
self.exchange_ignore_state(cs)
def exchange_ignore_state(self, cs):
if cs in self.__socket_to_user_name:
state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
if state:
state = False
else:
state = True
self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])
self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
msg = "通常模式"
else:
msg = "屏蔽模式"
self.send_to(json.dumps({
'type': 'ignore',
'success': True,
'msg': '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), "模式切換成功,現(xiàn)在是" + msg)
}), cs)
else:
self.send_to({
'type': 'ignore',
'success': False,
'msg': '切換失敗'
}, cs)
def single_chatting(self, cs, nickname, msg):
if nickname in self.__user_name_to_socket:
msg = '[TIME : %s]\r\n[ %s CHATTING TO %s ] : %s\r\n' % (
ctime(), self.__socket_to_user_name[cs], nickname, msg)
self.send_to_list(json.dumps({
'type': 'single',
'msg': msg
}), self.__user_name_to_socket[nickname], cs)
else:
self.send_to(json.dumps({
'type': 'single',
'msg': '該用戶不存在'
}), cs)
print(nickname)
def send_to_list(self, msg, *cs):
for i in range(len(cs)):
self.send_to(msg, cs[i])
def get_all_login_user_info(self):
login_list = "[ SYSTEM ] ALIVE USER : \r\n"
for key in self.__socket_to_user_name:
login_list += self.__socket_to_user_name[key] + ",\r\n"
return login_list
def send_to(self, msg, cs):
if cs not in self.__socket_list:
self.__socket_list.append(cs)
cs.sendall(bytes(msg, 'utf-8'))
def broadcast_system_msg(self, msg):
data = '[TIME : %s]\r\n[ SYSTEM ] : %s\r\n' % (ctime(), msg)
js = json.dumps({
'type': 'system_msg',
'msg': data
})
# 屏蔽了群聊的玩家也可以獲得系統(tǒng)的群發(fā)信息
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def broadcast(self, msg, cs):
data = '[TIME : %s]\r\n[%s] : %s\r\n' % (ctime(), self.__socket_to_user_name[cs], msg)
js = json.dumps({
'type': 'broadcast',
'msg': data
})
# 沒有的登陸的玩家無法得知消息,屏蔽了群聊的玩家也沒辦法獲取信息
for i in range(len(self.__socket_list)):
if self.__socket_list[i] in self.__socket_to_user_name \
and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
self.__socket_list[i].sendall(bytes(js, 'utf-8'))
def main():
server = PyChattingServer()
server.start_session()
main()
客戶端的實現(xiàn)
import json
import threading
from socket import *
is_login = False
is_broadcast = True
class ClientReceiveThread(threading.Thread):
__buf = 1024
def __init__(self, cs):
super(ClientReceiveThread, self).__init__()
self.__cs = cs
def run(self):
self.receive_msg()
def receive_msg(self):
while True:
msg = self.__cs.recv(self.__buf).decode('utf-8')
if not msg:
break
js = json.loads(msg)
if js['type'] == "login":
if js['success']:
global is_login
is_login = True
print(js['msg'])
elif js['type'] == "ignore":
if js['success']:
global is_broadcast
if is_broadcast:
is_broadcast = False
else:
is_broadcast = True
print(js['msg'])
else:
if not is_broadcast:
print("[現(xiàn)在處于屏蔽模式]")
print(js['msg'])
class ClientSendMsgThread(threading.Thread):
def __init__(self, cs):
super(ClientSendMsgThread, self).__init__()
self.__cs = cs
def run(self):
self.send_msg()
# 根據(jù)不同的輸入格式來進(jìn)行不同的聊天方式
def send_msg(self):
while True:
js = None
msg = input()
if not is_login:
js = json.dumps({
'type': 'login',
'msg': msg
})
elif msg[0] == "@":
data = msg.split(' ')
if not data:
print("請重新輸入")
break
nickname = data[0]
nickname = nickname.strip("@")
if len(data) == 1:
data.append(" ")
js = json.dumps({
'type': 'sendto',
'nickname': nickname,
'msg': data[1]
})
elif msg == "/help":
js = json.dumps({
'type': 'help',
'msg': None
})
elif msg == "/ls":
js = json.dumps({
'type': 'ls',
'msg': None
})
elif msg == "/i":
js = json.dumps({
'type': 'ignore',
'msg': None
})
else:
if msg[0] != '/':
js = json.dumps({
'type': 'broadcast',
'msg': msg
})
if js is not None:
self.__cs.sendall(bytes(js, 'utf-8'))
def main():
buf = 1024
# 改變這個的地址,變成服務(wù)器的地址,那么只要部署到服務(wù)器上就可以全網(wǎng)使用了
address = ("127.0.0.1", 12231)
cs = socket(AF_INET, SOCK_STREAM, 0)
cs.connect(address)
data = cs.recv(buf).decode("utf-8")
if data:
print(data)
receive_thread = ClientReceiveThread(cs)
receive_thread.start()
send_thread = ClientSendMsgThread(cs)
send_thread.start()
while True:
pass
main()
這樣一個簡單的聊天室就建立了。
總結(jié)
在這個實現(xiàn)聊天室當(dāng)中,我使用的是json格式的字符串信息來編寫的協(xié)議,或許,也可以使用一些更加簡單的方式去實現(xiàn)
其實這個聊天室也就是一個最基本的socket編程的實現(xiàn)方案,也是一些屬于網(wǎng)絡(luò)方面的比較簡單的編寫吧
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
您可能感興趣的文章:- 利用Python實現(xiàn)定時程序的方法
- 基于Python+Pyqt5開發(fā)一個應(yīng)用程序
- Python如何實現(xiàn)的簡單購物車程序
- 結(jié)合Python網(wǎng)絡(luò)爬蟲做一個今日新聞小程序