olivebot/core/wsa_server.py
guo zebin 4cfad5ae0f 年翻更新
- 全新ui
- 全面优化websocket逻辑,提高数字人和ui连接的稳定性及资源开销
- 全面优化唤醒逻辑,提供稳定的普通唤醒模式和前置词唤醒模式
- 优化拾音质量,支持多声道麦克风拾音
- 优化自动播放服务器的对接机制,提供稳定和兼容旧版ue工程的对接模式
- 数字人接口输出机器人表情,以适应新fay ui及单片机的数字人表情输出
- 使用更高级的音频时长计算方式,可以更精准控制音频播放完成后的逻辑
- 修复点击关闭按钮会导致程序退出的bug
- 修复没有麦克风的设备开启麦克风会出错的问题
- 为服务器主机地址提供配置项,以方便服务器部署
2024-10-26 11:34:55 +08:00

289 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from asyncio import AbstractEventLoop
import websockets
import asyncio
import json
from abc import abstractmethod
from websockets.legacy.server import Serve
from utils import util
from scheduler.thread_manager import MyThread
class MyServer:
    """WebSocket server base class driven by its own asyncio event loop.

    ``start_server`` hands ``__connect`` to a ``MyThread``; ``__connect``
    creates a fresh event loop and serves on ``host:port``.  Other code
    queues outgoing payloads with ``add_cmd``.  A queued JSON object that
    carries a ``"Username"`` key is sent only to clients registered under
    that name; every other payload is sent to all clients.  Subclasses
    customise behaviour by overriding the ``on_*_handler`` hooks.

    The hooks are decorated with ``@abstractmethod`` but the class does not
    use ``ABCMeta``, so the decorators are informational only and the class
    itself remains instantiable.

    NOTE(review): the indentation of this file was lost before review; the
    nesting below was reconstructed from the syntax -- confirm against the
    upstream file.
    """

    def __init__(self, host='0.0.0.0', port=10000):
        """Record the bind address; nothing is opened until ``start_server``."""
        # Re-created inside __connect so the lock belongs to the server loop
        # (before Python 3.10 a Lock captured the loop of the creating thread).
        self.lock = asyncio.Lock()
        self.__host = host  # bind address
        self.__port = port  # TCP port
        self.__listCmd = []  # queued outgoing JSON strings, oldest first
        self.__clients = list()  # dicts with "id", "websocket", "username"
        self.__server: Serve = None  # awaitable returned by websockets.serve()
        self.__ws_server = None  # started server obtained by awaiting __server
        self.__event_loop: AbstractEventLoop = None
        self.__running = True
        self.__pending = None
        self.isConnect = False
        self.TIMEOUT = 3  # seconds allowed for one send
        self.__tasks = {}  # task -> start time; not used inside this class

    @staticmethod
    def __parse_object(message):
        """Return *message* decoded as a JSON object, or None if it is not one."""
        try:
            payload = json.loads(message)
        except (TypeError, ValueError):
            # Malformed JSON (JSONDecodeError is a ValueError) or undecodable bytes.
            return None
        return payload if isinstance(payload, dict) else None

    # Receive side
    async def __consumer_handler(self, websocket, path=None):
        """Read messages from one connection until it closes.

        Every message is passed to ``on_revice_handler``.  When a message is
        a JSON object with a truthy ``"Username"``, the client entry for this
        connection is relabelled with that name.
        """
        username = None
        try:
            async for message in websocket:
                await asyncio.sleep(0.01)
                payload = self.__parse_object(message)
                if payload is not None:
                    username = payload.get("Username")
                # Anything that is not a JSON object keeps the previous
                # username instead of crashing the receive loop.
                if username:
                    remote_address = websocket.remote_address
                    unique_id = f"{remote_address[0]}:{remote_address[1]}"
                    async with self.lock:
                        for client in self.__clients:
                            if client["id"] == unique_id:
                                client["username"] = username
                await self.__consumer(message)
        except websockets.exceptions.ConnectionClosedError as e:
            # Drop the broken connection from the client list.
            await self.remove_client(websocket)
            util.printInfo(1, "User" if username is None else username, f"WebSocket 连接关闭: {e}")

    # Send side
    async def __producer_handler(self, websocket, path=None):
        """Poll the outgoing queue and deliver each payload to its recipients.

        One instance of this loop runs per connection, but each of them pops
        from the shared queue and fans the payload out to the matching
        clients, so a payload is delivered once.
        """
        while self.__running:
            await asyncio.sleep(0.01)
            if not self.__listCmd:
                continue
            message = await self.__producer()
            if not message:
                continue
            payload = self.__parse_object(message)
            username = payload.get("Username") if payload is not None else None
            # Snapshot the recipients under the lock, send outside it:
            # a failed send calls remove_client, which needs the same lock.
            async with self.lock:
                if username is None:
                    # Broadcast.
                    recipients = [c["websocket"] for c in self.__clients]
                else:
                    # Only the clients registered under this username.
                    recipients = [c["websocket"] for c in self.__clients if c.get("username") == username]
            sends = [
                self.send_message_with_timeout(client, message, username, timeout=self.TIMEOUT)
                for client in recipients
            ]
            await asyncio.gather(*sends)

    # Send with a deadline
    async def send_message_with_timeout(self, client, message, username, timeout=3):
        """Send *message* to *client*, giving up after *timeout* seconds."""
        try:
            await asyncio.wait_for(self.send_message(client, message, username), timeout=timeout)
        except asyncio.TimeoutError:
            util.printInfo(1, "User" if username is None else username, f"发送消息超时: 用户名 {username}")
        except websockets.exceptions.ConnectionClosed as e:
            # Drop the closed connection from the client list.
            await self.remove_client(client)
            util.printInfo(1, "User" if username is None else username, f"WebSocket 连接关闭: {e}")

    # Plain send
    async def send_message(self, client, message, username):
        """Send *message* to *client*; a closed connection is removed and logged."""
        try:
            await client.send(message)
        except websockets.exceptions.ConnectionClosed as e:
            # Drop the closed connection from the client list.
            await self.remove_client(client)
            util.printInfo(1, "User" if username is None else username, f"WebSocket 连接关闭: {e}")

    async def __handler(self, websocket, path=None):
        """Per-connection entry point passed to ``websockets.serve``.

        ``path`` is optional so the handler can be invoked with either the
        two-argument or the single-argument calling convention.
        """
        self.isConnect = True
        util.log(1,"websocket连接上:{}".format(self.__port))
        self.on_connect_handler()
        remote_address = websocket.remote_address
        unique_id = f"{remote_address[0]}:{remote_address[1]}"
        async with self.lock:
            self.__clients.append({"id" : unique_id, "websocket" : websocket, "username" : "User"})
        consumer_task = asyncio.create_task(self.__consumer_handler(websocket, path))  # receive
        producer_task = asyncio.create_task(self.__producer_handler(websocket, path))  # send
        # Whichever side finishes first ends the connection; cancel the other.
        _, pending = await asyncio.wait([consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        # Drop the finished connection from the client list.
        await self.remove_client(websocket)
        util.log(1, "websocket连接断开:{}".format(unique_id))

    async def __consumer(self, message):
        """Forward one received message to the subclass hook."""
        self.on_revice_handler(message)

    async def __producer(self):
        """Pop the oldest queued payload and run it through ``on_send_handler``.

        Returns None when the queue is empty.
        """
        if self.__listCmd:
            return self.on_send_handler(self.__listCmd.pop(0))
        return None

    async def remove_client(self, websocket):
        """Forget *websocket*, clear ``isConnect`` if it was the last client,
        then invoke ``on_close_handler``."""
        async with self.lock:
            self.__clients = [c for c in self.__clients if c["websocket"] != websocket]
            if not self.__clients:
                self.isConnect = False
        # NOTE(review): assumed to run on every removal and outside the lock;
        # the original nesting of this call could not be recovered -- confirm.
        self.on_close_handler()

    def is_connected(self, username):
        """Return True if a client is registered under *username*.

        ``None`` is treated as ``"User"``, the label given to a connection
        before it announces a username.
        """
        if username is None:
            username = "User"
        return any(c["username"] == username for c in self.__clients)

    # Edit by xszyou on 20230113: override in a subclass to process received messages
    @abstractmethod
    def on_revice_handler(self, message):
        pass

    # Edit by xszyou on 20230114: override in a subclass to react to a new connection
    @abstractmethod
    def on_connect_handler(self):
        pass

    # Edit by xszyou on 20230804: override in a subclass to transform a payload before it is sent
    @abstractmethod
    def on_send_handler(self, message):
        return message

    # Edit by xszyou on 20230816: override in a subclass to react to a disconnect
    @abstractmethod
    def on_close_handler(self):
        pass

    # Create the server
    def __connect(self):
        """Thread target: create an event loop, start serving and run the loop.

        Returns only after ``stop_server`` has stopped the loop.  If binding
        fails the exception propagates and the instance is left startable.
        """
        self.__isExecute = True  # kept for compatibility; never read in this class
        if self.__server:
            util.log(1, 'server already exist')
            return
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        self.__event_loop = loop
        # Fresh lock owned by this loop (see __init__).
        self.lock = asyncio.Lock()
        self.__server = websockets.serve(self.__handler, self.__host, self.__port)
        try:
            # Awaiting the serve() object starts listening and yields the
            # server object that can actually be closed later.
            self.__ws_server = loop.run_until_complete(self.__server)
        except Exception:
            # Leave the instance in a state where start_server can be retried.
            self.__server = None
            loop.close()
            raise
        try:
            loop.run_forever()
        finally:
            loop.close()

    # Append a command to the outgoing queue
    def add_cmd(self, content):
        """Queue *content* (serialised with ``json.dumps``) for sending.

        Silently ignored once the server has been stopped.
        """
        if not self.__running:
            return
        jsonStr = json.dumps(content)
        self.__listCmd.append(jsonStr)
        # util.log('命令 {}'.format(content))

    # Start the service
    def start_server(self):
        """Start serving on a ``MyThread``."""
        # Re-arm after a previous stop_server so producers and add_cmd work again.
        self.__running = True
        MyThread(target=self.__connect).start()

    async def __shutdown(self, ws_server):
        """Close *ws_server*, wait for it to finish, then stop the running loop."""
        ws_server.close()
        await ws_server.wait_closed()
        asyncio.get_running_loop().stop()

    # Stop the service
    def stop_server(self):
        """Stop accepting work and shut the server down.

        Safe to call from a thread other than the one running the loop: the
        close is scheduled onto the server loop instead of being invoked
        directly.  Does nothing beyond clearing flags if no server was started.
        """
        self.__running = False
        self.isConnect = False
        if self.__server is None:
            return
        ws_server = self.__ws_server
        loop = self.__event_loop
        self.__server = None
        self.__ws_server = None
        self.__clients = []
        # The object returned by websockets.serve() is only an awaitable; the
        # started server obtained from awaiting it is what exposes close().
        if ws_server is not None and loop is not None and loop.is_running():
            asyncio.run_coroutine_threadsafe(self.__shutdown(ws_server), loop)
        util.log(1, "WebSocket server stopped.")
# Server for the UI side
class WebServer(MyServer):
    """UI-facing server: queues a usage hint whenever a client connects."""

    def __init__(self, host='0.0.0.0', port=10003):
        super().__init__(host=host, port=port)

    def on_revice_handler(self, message):
        """Inbound messages are ignored."""
        return None

    def on_connect_handler(self):
        """Queue the usage-hint panel message."""
        hint = {"panelMsg": "使用提示Fay可以独立使用启动数字人将自动对接。"}
        self.add_cmd(hint)

    def on_send_handler(self, message):
        """Outgoing payloads are sent unchanged."""
        return message

    def on_close_handler(self):
        """Nothing to do on disconnect."""
        return None
# Server for the digital-human side
class HumanServer(MyServer):
    """Digital-human-facing server that mirrors its connection state to the UI server."""

    def __init__(self, host='0.0.0.0', port=10002):
        super().__init__(host, port)

    def __report_state_to_web(self):
        """Queue the current ``isConnect`` flag on the UI server, if one exists."""
        web_server_instance = get_web_instance()
        if web_server_instance is None:
            # get_web_instance() returns None until new_web_instance() has
            # been called; there is nobody to notify in that case.
            return
        web_server_instance.add_cmd({"is_connect": self.isConnect})

    def on_revice_handler(self, message):
        """Inbound messages are ignored."""
        pass

    def on_connect_handler(self):
        """Tell the UI server that a digital-human client connected."""
        self.__report_state_to_web()

    def on_send_handler(self, message):
        """Return *message* unchanged, or None while no client is connected."""
        # util.log(1, '向human发送 {}'.format(message))
        if not self.isConnect:
            return None
        return message

    def on_close_handler(self):
        """Tell the UI server about the connection state after a disconnect."""
        self.__report_state_to_web()
# Test server
class TestServer(MyServer):
    """Minimal server for manual testing: prints received messages and connects."""

    def __init__(self, host='0.0.0.0', port=10000):
        super().__init__(host=host, port=port)

    def on_revice_handler(self, message):
        """Print each received message."""
        print(message)

    def on_connect_handler(self):
        """Print a notice when a client connects."""
        print("连接上了")

    def on_send_handler(self, message):
        """Outgoing payloads are sent unchanged."""
        return message

    def on_close_handler(self):
        """Nothing to do on disconnect."""
        return None
# Singletons
# Both slots start as None and are filled on first use by new_instance()
# and new_web_instance() respectively.
__instance: MyServer = None  # the HumanServer singleton
__web_instance: MyServer = None  # the WebServer singleton
def new_instance(host='0.0.0.0', port=10002) -> MyServer:
    """Return the shared HumanServer, creating it on the first call.

    ``host`` and ``port`` are only used when the instance is created; later
    calls return the existing instance regardless of the arguments.
    """
    global __instance
    if __instance is not None:
        return __instance
    __instance = HumanServer(host, port)
    return __instance
def new_web_instance(host='0.0.0.0', port=10003) -> MyServer:
    """Return the shared WebServer, creating it on the first call.

    ``host`` and ``port`` are only used when the instance is created; later
    calls return the existing instance regardless of the arguments.
    """
    global __web_instance
    if __web_instance is not None:
        return __web_instance
    __web_instance = WebServer(host, port)
    return __web_instance
def get_instance() -> MyServer:
    """Return the HumanServer singleton, or None if new_instance() was never called."""
    current = __instance
    return current
def get_web_instance() -> MyServer:
    """Return the WebServer singleton, or None if new_web_instance() was never called."""
    current = __web_instance
    return current
if __name__ == '__main__':
    # Manual smoke test: start a TestServer on all interfaces, port 10000.
    testServer = TestServer('0.0.0.0', 10000)
    testServer.start_server()