olivebot/fay_booter.py

354 lines
13 KiB
Python
Raw Normal View History

#核心启动模块
import time
import re
import pyaudio
import socket
import sys
import asyncio
import requests
from core.interact import Interact
from core.recorder import Recorder
from core import fay_core
from scheduler.thread_manager import MyThread
from utils import util, config_util, stream_util
from core.wsa_server import MyServer
from core import wsa_server
from core import socket_bridge_service
feiFei: fay_core.FeiFei = None
recorderListener: Recorder = None
__running = False
deviceSocketServer = None
DeviceInputListenerDict = {}
ngrok = None
socket_service_instance = None
#启动状态
def is_running():
return __running
#录制麦克风音频输入并传给aliyun
class RecorderListener(Recorder):
def __init__(self, device, fei):
self.__device = device
self.__RATE = 16000
self.__FORMAT = pyaudio.paInt16
self.__running = False
self.username = 'User'
self.channels = 1
self.sample_rate = 16000
super().__init__(fei)
def on_speaking(self, text):
if len(text) > 1:
interact = Interact("mic", 1, {'user': 'User', 'msg': text})
util.printInfo(3, "语音", '{}'.format(interact.data["msg"]), time.time())
feiFei.on_interact(interact)
def get_stream(self):
try:
#是否录音的控制是放在recorder.py的这里的作用是防止没有麦克风的设备出错
while True:
record = config_util.config['source']['record']
if record['enabled']:
break
time.sleep(0.1)
self.paudio = pyaudio.PyAudio()
device_id = 0 # 或者根据需要选择其他设备
# 获取设备信息
device_info = self.paudio.get_device_info_by_index(device_id)
self.channels = device_info.get('maxInputChannels', 1) #很多麦克风只支持单声道录音
# self.sample_rate = int(device_info.get('defaultSampleRate', self.__RATE))
# 设置格式这里以16位深度为例
format = pyaudio.paInt16
# 打开音频流,使用设备的最大声道数和默认采样率
self.stream = self.paudio.open(
input_device_index=device_id,
rate=self.sample_rate,
format=format,
channels=self.channels,
input=True,
frames_per_buffer=4096
)
self.__running = True
MyThread(target=self.__pyaudio_clear).start()
except Exception as e:
print(f"Error: {e}")
time.sleep(10)
return self.stream
def __pyaudio_clear(self):
while self.__running:
time.sleep(30)
def stop(self):
super().stop()
self.__running = False
try:
while self.is_reading:
time.sleep(0.1)
self.stream.stop_stream()
self.stream.close()
self.paudio.terminate()
except Exception as e:
print(e)
util.log(1, "请检查设备是否有误,再重新启动!")
def is_remote(self):
return False
#Edit by xszyou on 20230113:录制远程设备音频输入并传给aliyun
class DeviceInputListener(Recorder):
def __init__(self, deviceConnector, fei):
super().__init__(fei)
self.__running = True
self.streamCache = None
self.thread = MyThread(target=self.run)
self.thread.start() #启动远程音频输入设备监听线程
self.username = 'User'
self.isOutput = True
self.deviceConnector = deviceConnector
def run(self):
#启动ngork
self.streamCache = stream_util.StreamCache(1024*1024*20)
addr = None
while self.__running:
try:
data = b""
while self.deviceConnector:
data = self.deviceConnector.recv(2048)
if b"<username>" in data:
data_str = data.decode("utf-8")
match = re.search(r"<username>(.*?)</username>", data_str)
if match:
self.username = match.group(1)
else:
self.streamCache.write(data)
if b"<output>" in data:
data_str = data.decode("utf-8")
match = re.search(r"<output>(.*?)<output>", data_str)
if match:
self.isOutput = (match.group(1) == "True")
else:
self.streamCache.write(data)
if not b"<username>" in data and not b"<output>" in data:
self.streamCache.write(data)
time.sleep(0.005)
self.streamCache.clear()
except Exception as err:
pass
time.sleep(1)
def on_speaking(self, text):
global feiFei
if len(text) > 1:
interact = Interact("socket", 1, {"user": self.username, "msg": text, "socket": self.deviceConnector})
util.printInfo(3, "(" + self.username + ")远程音频输入", '{}'.format(interact.data["msg"]), time.time())
feiFei.on_interact(interact)
#recorder会等待stream不为空才开始录音
def get_stream(self):
while not self.deviceConnector:
time.sleep(1)
pass
return self.streamCache
def stop(self):
super().stop()
self.__running = False
def is_remote(self):
return True
#检查远程音频连接状态
def device_socket_keep_alive():
global DeviceInputListenerDict
while __running:
delkey = None
for key, value in DeviceInputListenerDict.items():
try:
value.deviceConnector.send(b'\xf0\xf1\xf2\xf3\xf4\xf5\xf6\xf7\xf8')#发送心跳包
if wsa_server.get_web_instance().is_connected(value.username):
wsa_server.get_web_instance().add_cmd({"remote_audio_connect": True, "Username" : value.username})
except Exception as serr:
util.printInfo(3, value.username, "远程音频输入输出设备已经断开:{}".format(key))
value.stop()
delkey = key
break
if delkey:
value = DeviceInputListenerDict.pop(delkey)
if wsa_server.get_web_instance().is_connected(value.username):
wsa_server.get_web_instance().add_cmd({"remote_audio_connect": False, "Username" : value.username})
time.sleep(1)
#远程音频连接
def accept_audio_device_output_connect():
global deviceSocketServer
global __running
global DeviceInputListenerDict
deviceSocketServer = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
deviceSocketServer.bind(("0.0.0.0",10001))
deviceSocketServer.listen(1)
MyThread(target = device_socket_keep_alive).start() # 开启心跳包检测
addr = None
while __running:
try:
deviceConnector,addr = deviceSocketServer.accept() #接受TCP连接并返回新的套接字与IP地址
deviceInputListener = DeviceInputListener(deviceConnector, feiFei) # 设备音频输入输出麦克风
deviceInputListener.start()
#把DeviceInputListenner对象记录下来
peername = str(deviceConnector.getpeername()[0]) + ":" + str(deviceConnector.getpeername()[1])
DeviceInputListenerDict[peername] = deviceInputListener
util.log(1,"远程音频输入输出设备连接上:{}".format(addr))
except Exception as e:
pass
#数字人端请求获取最新的自动播放消息,若自动播放服务关闭会自动退出自动播放
def start_auto_play_service(): #TODO 评估一下有无优化的空间
url = f"{config_util.config['source']['automatic_player_url']}/get_auto_play_item"
user = "User" #TODO 临时固死了
is_auto_server_error = False
while __running:
if config_util.config['source']['wake_word_enabled'] and config_util.config['source']['wake_word_type'] == 'common' and recorderListener.wakeup_matched == True:
time.sleep(0.01)
continue
if is_auto_server_error:
util.printInfo(1, user, '60s后重连自动播放服务器')
time.sleep(60)
# 请求自动播放服务器
with fay_core.auto_play_lock:
if config_util.config['source']['automatic_player_status'] and config_util.config['source']['automatic_player_url'] is not None and fay_core.can_auto_play == True and (config_util.config["interact"]["playSound"] or wsa_server.get_instance().is_connected(user)):
fay_core.can_auto_play = False
post_data = {"user": user}
try:
response = requests.post(url, json=post_data, timeout=5)
if response.status_code == 200:
is_auto_server_error = False
data = response.json()
audio_url = data.get('audio')
if not audio_url or audio_url.strip()[0:4] != "http":
audio_url = None
response_text = data.get('text')
timestamp = data.get('timestamp')
interact = Interact("auto_play", 2, {'user': user, 'text': response_text, 'audio': audio_url})
util.printInfo(1, user, '自动播放:{}{}'.format(response_text, audio_url), time.time())
feiFei.on_interact(interact)
else:
is_auto_server_error = True
fay_core.can_auto_play = True
util.printInfo(1, user, '请求自动播放服务器失败,错误代码是:{}'.format(response.status_code))
except requests.exceptions.RequestException as e:
is_auto_server_error = True
fay_core.can_auto_play = True
util.printInfo(1, user, '请求自动播放服务器失败,错误信息是:{}'.format(e))
time.sleep(0.01)
#停止服务
def stop():
global feiFei
global recorderListener
global __running
global DeviceInputListenerDict
global ngrok
global socket_service_instance
global deviceSocketServer
util.log(1, '正在关闭服务...')
__running = False
if recorderListener is not None:
util.log(1, '正在关闭录音服务...')
recorderListener.stop()
time.sleep(0.1)
util.log(1, '正在关闭远程音频输入输出服务...')
try:
if len(DeviceInputListenerDict) > 0:
for key in list(DeviceInputListenerDict.keys()):
value = DeviceInputListenerDict.pop(key)
value.stop()
deviceSocketServer.close()
if socket_service_instance is not None:
socket_service_instance.stop_server()
socket_service_instance = None
except:
pass
util.log(1, '正在关闭核心服务...')
feiFei.stop()
util.log(1, '服务已关闭!')
#开启服务
def start():
global feiFei
global recorderListener
global __running
global socket_service_instance
util.log(1, '开启服务...')
__running = True
#读取配置
util.log(1, '读取配置...')
config_util.load_config()
#开启核心服务
util.log(1, '开启核心服务...')
feiFei = fay_core.FeiFei()
feiFei.start()
#加载本地知识库
if config_util.key_chat_module == 'langchain':
from llm import nlp_langchain
nlp_langchain.save_all()
if config_util.key_chat_module == 'privategpt':
from llm import nlp_privategpt
nlp_privategpt.save_all()
#开启录音服务
record = config_util.config['source']['record']
if record['enabled']:
util.log(1, '开启录音服务...')
recorderListener = RecorderListener(record['device'], feiFei) # 监听麦克风
recorderListener.start()
#启动声音沟通接口服务
util.log(1,'启动声音沟通接口服务...')
deviceSocketThread = MyThread(target=accept_audio_device_output_connect)
deviceSocketThread.start()
socket_service_instance = socket_bridge_service.new_instance()
socket_bridge_service_Thread = MyThread(target=socket_service_instance.start_service)
socket_bridge_service_Thread.start()
#启动自动播放服务
util.log(1,'启动自动播放服务...')
MyThread(target=start_auto_play_service).start()
util.log(1, '服务启动完成!')
util.log(1, '使用 \'help\' 获取帮助.')
if __name__ == '__main__':
ws_server: MyServer = None
feiFei: fay_core.FeiFei = None
recorderListener: Recorder = None
start()