diff --git a/asr/ali_nls.py b/asr/ali_nls.py index a41b8f4..df072ca 100644 --- a/asr/ali_nls.py +++ b/asr/ali_nls.py @@ -58,6 +58,7 @@ class ALiNls: self.__URL = 'wss://nls-gateway-cn-shenzhen.aliyuncs.com/ws/v1' self.__ws = None self.__frames = [] + self.started = False self.__closing = False self.__task_id = '' self.done = False @@ -86,6 +87,8 @@ class ALiNls: data = json.loads(message) header = data['header'] name = header['name'] + if name == 'TranscriptionStarted': + self.started = True if name == 'SentenceEnd': self.done = True self.finalResults = data['payload']['result'] diff --git a/asr/funasr.py b/asr/funasr.py index 7f81d38..6e69732 100644 --- a/asr/funasr.py +++ b/asr/funasr.py @@ -27,6 +27,7 @@ class FunASR: self.__reconnect_delay = 1 self.__reconnecting = False self.username = username + self.started = True # 收到websocket消息的处理 diff --git a/core/content_db.py b/core/content_db.py index 313d2a6..9e18323 100644 --- a/core/content_db.py +++ b/core/content_db.py @@ -3,12 +3,13 @@ import time import threading import functools from utils import util + def synchronized(func): - @functools.wraps(func) - def wrapper(self, *args, **kwargs): - with self.lock: - return func(self, *args, **kwargs) - return wrapper + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + with self.lock: + return func(self, *args, **kwargs) + return wrapper __content_tb = None def new_instance(): @@ -18,73 +19,122 @@ def new_instance(): __content_tb.init_db() return __content_tb - class Content_Db: def __init__(self) -> None: self.lock = threading.Lock() - - - #初始化 + # 初始化数据库 def init_db(self): conn = sqlite3.connect('fay.db') c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS T_Msg - (id INTEGER PRIMARY KEY autoincrement, - type char(10), - way char(10), - content TEXT NOT NULL, - createtime Int, - username TEXT DEFAULT 'User', - uid Int);''') + (id INTEGER PRIMARY KEY AUTOINCREMENT, + type CHAR(10), + way CHAR(10), + content TEXT NOT NULL, + createtime INT, + username TEXT DEFAULT 'User', + uid INT);''') + # 对话采纳记录表 + c.execute('''CREATE TABLE IF NOT EXISTS T_Adopted + (id INTEGER PRIMARY KEY AUTOINCREMENT, + msg_id INTEGER UNIQUE, + adopted_time INT, + FOREIGN KEY(msg_id) REFERENCES T_Msg(id));''') conn.commit() conn.close() - - - - #添加对话 + # 添加对话 @synchronized - def add_content(self,type,way,content,username='User',uid=0): + def add_content(self, type, way, content, username='User', uid=0): conn = sqlite3.connect("fay.db") cur = conn.cursor() try: - cur.execute("insert into T_Msg (type,way,content,createtime,username,uid) values (?,?,?,?,?,?)",(type, way, content, time.time(), username, uid)) + cur.execute("INSERT INTO T_Msg (type, way, content, createtime, username, uid) VALUES (?, ?, ?, ?, ?, ?)", + (type, way, content, int(time.time()), username, uid)) conn.commit() - except: - util.log(1, "请检查参数是否有误") - conn.close() - return 0 + last_id = cur.lastrowid + except Exception as e: + util.log(1, "请检查参数是否有误: {}".format(e)) + conn.close() + return 0 conn.close() - return cur.lastrowid - - + return last_id - #获取对话内容 + # 根据ID查询对话记录 @synchronized - def get_list(self,way,order,limit,uid=0): + def get_content_by_id(self, msg_id): + conn = sqlite3.connect("fay.db") + cur = conn.cursor() + cur.execute("SELECT * FROM T_Msg WHERE id = ?", (msg_id,)) + record = cur.fetchone() + conn.close() + return record + + # 添加对话采纳记录 + @synchronized + def adopted_message(self, msg_id): + conn = sqlite3.connect('fay.db') + cur = conn.cursor() + # 检查消息ID是否存在 + cur.execute("SELECT 1 FROM T_Msg WHERE id = ?", (msg_id,)) + if cur.fetchone() is None: + util.log(1, "消息ID不存在") + conn.close() + return False + try: + cur.execute("INSERT INTO T_Adopted (msg_id, adopted_time) VALUES (?, ?)", (msg_id, int(time.time()))) + conn.commit() + except sqlite3.IntegrityError: + util.log(1, "该消息已被采纳") + conn.close() + return False + conn.close() + return True + + # 获取对话内容 + @synchronized + def get_list(self, way, order, limit, uid=0): conn = sqlite3.connect("fay.db") cur = conn.cursor() where_uid = "" if int(uid) != 0: - where_uid = f" and uid = {uid} " - if(way == 'all'): - cur.execute("select type,way,content,createtime,datetime(createtime, 'unixepoch', 'localtime') as timetext,username from T_Msg where 1 "+where_uid+" order by id "+order+" limit ?",(limit,)) - elif(way == 'notappended'): - cur.execute("select type,way,content,createtime,datetime(createtime, 'unixepoch', 'localtime') as timetext,username from T_Msg where way != 'appended' "+where_uid+" order by id "+order+" limit ?",(limit,)) + where_uid = f" AND T_Msg.uid = {uid} " + base_query = f""" + SELECT T_Msg.type, T_Msg.way, T_Msg.content, T_Msg.createtime, + datetime(T_Msg.createtime, 'unixepoch', 'localtime') AS timetext, + T_Msg.username,T_Msg.id, + CASE WHEN T_Adopted.msg_id IS NOT NULL THEN 1 ELSE 0 END AS is_adopted + FROM T_Msg + LEFT JOIN T_Adopted ON T_Msg.id = T_Adopted.msg_id + WHERE 1 {where_uid} + """ + if way == 'all': + query = base_query + f" ORDER BY T_Msg.id {order} LIMIT ?" + cur.execute(query, (limit,)) + elif way == 'notappended': + query = base_query + f" AND T_Msg.way != 'appended' ORDER BY T_Msg.id {order} LIMIT ?" + cur.execute(query, (limit,)) else: - cur.execute("select type,way,content,createtime,datetime(createtime, 'unixepoch', 'localtime') as timetext,username from T_Msg where way = ? "+where_uid+" order by id "+order+" limit ?",(way,limit,)) - + query = base_query + f" AND T_Msg.way = ? ORDER BY T_Msg.id {order} LIMIT ?" + cur.execute(query, (way, limit)) list = cur.fetchall() conn.close() return list + - - - - - - - - + @synchronized + def get_previous_user_message(self, msg_id): + conn = sqlite3.connect("fay.db") + cur = conn.cursor() + cur.execute(""" + SELECT id, type, way, content, createtime, datetime(createtime, 'unixepoch', 'localtime') AS timetext, username + FROM T_Msg + WHERE id < ? AND type != 'fay' + ORDER BY id DESC + LIMIT 1 + """, (msg_id,)) + record = cur.fetchone() + conn.close() + return record diff --git a/core/recorder.py b/core/recorder.py index 82d0b0e..d7c0346 100644 --- a/core/recorder.py +++ b/core/recorder.py @@ -202,7 +202,7 @@ class Recorder: while self.__running: try: record = cfg.config['source']['record'] - if not record['enabled']: + if not record['enabled'] and not self.is_remote: time.sleep(0.1) continue self.is_reading = True @@ -215,7 +215,6 @@ class Recorder: self.__running = False if not data: continue - #是否可以拾音,不可以就掉弃录音 can_listen = True #没有开唤醒,但面板或数字人正在播音时不能拾音 @@ -229,7 +228,7 @@ class Recorder: if can_listen == False:#掉弃录音 data = None continue - + #计算音量是否满足激活拾音 level = audioop.rms(data, 2) if len(self.__history_data) >= 10:#保存激活前的音频,以免信息掉失 @@ -245,9 +244,11 @@ class Recorder: elif history_percentage < self.__dynamic_threshold: self.__dynamic_threshold += (history_percentage - self.__dynamic_threshold) * 1 + #激活拾音 if percentage > self.__dynamic_threshold: last_speaking_time = time.time() + if not self.__processing and not isSpeaking and time.time() - last_mute_time > _ATTACK: isSpeaking = True #用户正在说话 util.printInfo(1, self.username,"聆听中...") @@ -259,7 +260,9 @@ class Recorder: concatenated_audio.clear() self.__aLiNls = self.asrclient() try: - self.__aLiNls.start() + task_id = self.__aLiNls.start() + while not self.__aLiNls.started: + time.sleep(0.01) except Exception as e: print(e) util.printInfo(1, self.username, "aliyun asr 连接受限") diff --git a/core/socket_bridge_service.py b/core/socket_bridge_service.py new file mode 100644 index 0000000..a4bb65c --- /dev/null +++ b/core/socket_bridge_service.py @@ -0,0 +1,117 @@ +import asyncio +import websockets +import socket +import threading +import time + +__wss = None + +def new_instance(): + global __wss + if __wss is None: + __wss = SocketBridgeService() + return __wss + +class SocketBridgeService: + def __init__(self): + self.websockets = {} + self.sockets = {} + self.message_queue = asyncio.Queue() + self.running = True + self.server = None + self.event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.event_loop) + + async def handler(self, websocket, path): + ws_id = id(websocket) + self.websockets[ws_id] = websocket + try: + if ws_id not in self.sockets: + self.sockets[ws_id] = await self.create_socket_client() + asyncio.create_task(self.receive_from_socket(ws_id)) + async for message in websocket: + await self.send_to_socket(ws_id, message) + except websockets.ConnectionClosed: + pass + finally: + self.close_socket_client(ws_id) + + async def create_socket_client(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('127.0.0.1', 10001)) + return sock + + async def send_to_socket(self, ws_id, message): + sock = self.sockets.get(ws_id) + if sock: + asyncio.create_task(self.socket_send(sock, message)) + + async def socket_send(self, sock, message): + await asyncio.to_thread(sock.sendall, message) + + async def receive_from_socket(self, ws_id): + sock = self.sockets.get(ws_id) + while True: + data = await asyncio.to_thread(sock.recv, 1024) + if data: + await self.message_queue.put((ws_id, data)) + + async def process_message_queue(self): + while True: + if not self.running: + break + ws_id, data = await self.message_queue.get() + websocket = self.websockets.get(ws_id) + if websocket.open: + await websocket.send(data) + self.message_queue.task_done() + + def close_socket_client(self, ws_id): + sock = self.sockets.pop(ws_id, None) + if sock: + sock.close() + + async def start(self, host='0.0.0.0', port=9001): + self.server = await websockets.serve(self.handler, host, port, loop=self.event_loop) + asyncio.create_task(self.process_message_queue()) + await asyncio.Future() + + async def shutdown(self): + self.running = False + if self.server: + for ws in self.websockets.values(): + await ws.close() + if hasattr(self.server, 'close'): + self.server.close() + await asyncio.gather(*[w.wait_closed() for w in self.websockets.values()]) + for sock in self.sockets.values(): + sock.close() + if self.server: + await self.server.wait_closed() + + def stop_server(self): + self.event_loop.call_soon_threadsafe(self.shutdown) + self.event_loop.run_until_complete(self.shutdown()) + self.event_loop.close() + + def start_service(self): + self.event_loop.run_until_complete(self.start(host='0.0.0.0', port=9001)) + try: + self.event_loop.run_forever() + except KeyboardInterrupt: + pass + finally: + self.stop_server() + +if __name__ == '__main__': + service = new_instance() + service_thread = threading.Thread(target=service.start_service) + service_thread.start() + + # 等待一些时间或者直到收到停止信号 + try: + while service.running: + time.sleep(1) + except KeyboardInterrupt: + service.stop_server() + service_thread.join() \ No newline at end of file diff --git a/fay_booter.py b/fay_booter.py index b443b90..a63bf00 100644 --- a/fay_booter.py +++ b/fay_booter.py @@ -5,6 +5,7 @@ import pyaudio import socket import psutil import sys +import asyncio import requests from core.interact import Interact from core.recorder import Recorder @@ -14,6 +15,7 @@ from utils import util, config_util, stream_util from core.wsa_server import MyServer from scheduler.thread_manager import MyThread from core import wsa_server +from core import socket_bridge_service feiFei: fay_core.FeiFei = None recorderListener: Recorder = None @@ -21,6 +23,8 @@ __running = False deviceSocketServer = None DeviceInputListenerDict = {} ngrok = None +socket_service_instance = None + #启动状态 def is_running(): @@ -324,6 +328,7 @@ def stop(): global __running global DeviceInputListenerDict global ngrok + global socket_service_instance util.log(1, '正在关闭服务...') __running = False @@ -337,6 +342,9 @@ def stop(): value = DeviceInputListenerDict.pop(key) value.stop() deviceSocketServer.close() + if socket_service_instance is not None: + future = asyncio.run_coroutine_threadsafe(socket_service_instance.shutdown(), socket_service_instance.loop) + future.result() util.log(1, '正在关闭核心服务...') feiFei.stop() util.log(1, '服务已关闭!') @@ -347,6 +355,7 @@ def start(): global feiFei global recorderListener global __running + global socket_service_instance util.log(1, '开启服务...') __running = True @@ -379,6 +388,10 @@ def start(): deviceSocketThread = MyThread(target=accept_audio_device_output_connect) deviceSocketThread.start() + socket_service_instance = socket_bridge_service.new_instance() + socket_bridge_service_Thread = MyThread(target=socket_service_instance.start_service) + socket_bridge_service_Thread.start() + #启动自动播放服务 util.log(1,'启动自动播放服务...') MyThread(target=start_auto_play_service).start() diff --git a/gui/flask_server.py b/gui/flask_server.py index f122f24..d72698b 100644 --- a/gui/flask_server.py +++ b/gui/flask_server.py @@ -22,7 +22,7 @@ from core.interact import Interact from core import member_db import fay_booter from flask_httpauth import HTTPBasicAuth - +from core import qa_service __app = Flask(__name__) auth = HTTPBasicAuth() @@ -233,7 +233,7 @@ def api_get_Msg(): i = len(list)-1 while i >= 0: timetext = datetime.datetime.fromtimestamp(list[i][3]).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] - relist.append(dict(type=list[i][0], way=list[i][1], content=list[i][2], createtime=list[i][3], timetext=timetext, username=list[i][5])) + relist.append(dict(type=list[i][0], way=list[i][1], content=list[i][2], createtime=list[i][3], timetext=timetext, username=list[i][5], id=list[i][6], is_adopted=list[i][7])) i -= 1 if fay_booter.is_running(): wsa_server.get_web_instance().add_cmd({"liveState": 1}) @@ -278,6 +278,32 @@ def api_get_run_status(): return json.dumps({'status': status}) +@__app.route('/api/adopt_msg', methods=['POST']) +def adopt_msg(): + data = request.get_json() + if not data: + return jsonify({'status':'error', 'msg': '未提供数据'}) + + id = data.get('id') + + if not id: + return jsonify({'status':'error', 'msg': 'id不能为空'}) + + info = content_db.new_instance().get_content_by_id(id) + content = info[3] + if info is not None: + previous_info = content_db.new_instance().get_previous_user_message(id) + previous_content = previous_info[3] + result = content_db.new_instance().adopted_message(id) + if result: + qa_service.QAService().record_qapair(previous_content, content) + return jsonify({'status': 'success', 'msg': '采纳成功'}) + else: + return jsonify({'status':'error', 'msg': '采纳失败'}) + else: + return jsonify({'status':'error', 'msg': '采纳失败'}) + + def stream_response(text): def generate(): for chunk in text_chunks(text): diff --git a/gui/static/css/index.css b/gui/static/css/index.css index fb9aba7..aac541c 100644 --- a/gui/static/css/index.css +++ b/gui/static/css/index.css @@ -204,19 +204,23 @@ html { .Userchange{ background-color: #FFFFFF; - height: 40px; - font-size: 12px; + height: 40px; + font-size: 12px; + border-top: 1px solid #bed1fc; + width: 1358px; /* 设置菜单容器的宽度,可根据实际情况调整 */ + overflow: hidden; /* 隐藏超出容器的内容 */ + position: relative; } .inputmessage{ - margin-left:15% ; + margin-left:290px ; width: 760px; background: #f9fbff; border-radius: 70px; height: 73px; box-shadow: -10px 0 15px rgba(0, 0, 0, 0.1); position: absolute; - top: 70%; + top: 675px; z-index: 2; } @@ -276,4 +280,87 @@ html { .tag.selected { background-color: #f4f7ff; color: #0064fb; - } \ No newline at end of file + } + #prevButton{background-color: #FFFFFF; border: none; + z-index: 1; + position: absolute; + top: 50%; + transform: translateY(-50%); + } + + #nextButton {background-color: #FFFFFF; border: none; + position: absolute; + top: 50%; + transform: translateY(-50%); + } + + #prevButton { + left: 0; + } + + #nextButton { + right: 0; + } + + .menu-container { + width: 800px; /* 设置菜单容器的宽度,可根据实际情况调整 */ + overflow: hidden; /* 隐藏超出容器的内容 */ + position: relative; + } + + .menu li { + margin-right: 20px; /* 菜单项之间的间距,可调整 */ + } + + .menu li a { + text-decoration: none; + color: black; + } + + .menu { background-color: #FFFFFF; + /* display: flex; */ + white-space: nowrap; + display: flex; + transition: transform 0.3s ease; /* 添加过渡效果,使滑动更平滑 */ + list-style: none; + padding: 0 50px 0 50px; + margin: 0; + display: flex; + transition: transform 0.3s ease; /* 添加过渡效果,使滑动更平滑 */ + } + + .adopt{border: none;background: none;} + + .what-time{vertical-align:top;line-height: 25px;} + + .answer-container { + border: 1px solid #ccc; + padding: 10px; + margin: 10px; + background-color: #f9f9f9; + } + + .adopt-button { + display: inline-block; + cursor: pointer; + position: relative; + } + + .adopt-button img { + width: 21px; + height: 21px; + display: block; + } + + .adopt-button:hover::after { + content: "采纳"; + position: absolute; + top: -30px; + left: 0; + background-color: #000; + color: #fff; + padding: 5px 10px; + border-radius: 4px; + font-size: 12px; + white-space: nowrap; + } \ No newline at end of file diff --git a/gui/static/images/adopt.png b/gui/static/images/adopt.png new file mode 100644 index 0000000..33ef734 Binary files /dev/null and b/gui/static/images/adopt.png differ diff --git a/gui/static/images/adopted.png b/gui/static/images/adopted.png new file mode 100644 index 0000000..8f8c5e8 Binary files /dev/null and b/gui/static/images/adopted.png differ diff --git a/gui/static/js/index.js b/gui/static/js/index.js index eb9f7f9..70a5a8a 100644 --- a/gui/static/js/index.js +++ b/gui/static/js/index.js @@ -186,7 +186,8 @@ class FayInterface { username: data.panelReply.username, content: data.panelReply.content, type: data.panelReply.type, - timetext: this.getTime() + timetext: this.getTime(), + is_adopted:0 }); vueInstance.$nextTick(() => { const chatContainer = vueInstance.$el.querySelector('.chatmessage'); @@ -308,7 +309,7 @@ class FayInterface { selectUser(user) { this.selectedUser = user; this.fayService.websocket.send(JSON.stringify({ "Username": user[1] })); - this.loadMessageHistory(user[1]); + this.loadMessageHistory(user[1], 'common'); }, startLive() { this.liveState = 2 @@ -323,11 +324,11 @@ class FayInterface { }); }, - loadMessageHistory(username) { + loadMessageHistory(username, type) { this.fayService.getMessageHistory(username).then((response) => { if (response) { this.messages = response; - console.log(this.messages); + if(type == 'common'){ this.$nextTick(() => { const chatContainer = this.$el.querySelector('.chatmessage'); if (chatContainer) { @@ -335,6 +336,7 @@ class FayInterface { } }); } + } }); }, sendSuccessMsg(message) { @@ -344,7 +346,37 @@ class FayInterface { type: 'success', }); -} +} , +adoptText(id) { + // 调用采纳接口 + this.fayService.fetchData(`${this.base_url}/api/adopt_msg`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ id }) // 发送采纳请求 + }) + .then((response) => { + if (response && response.status === 'success') { + // 处理成功的响应 + this.$notify({ + title: '成功', + message: response.msg, // 显示成功消息 + type: 'success', + }); + + this.loadMessageHistory(this.selectedUser[1], 'adopt'); + } else { + // 处理失败的响应 + this.$notify({ + title: '失败', + message: response ? response.msg : '请求失败', + type: 'error', + }); + } + }) + +}, } }); \ No newline at end of file diff --git a/gui/templates/index.html b/gui/templates/index.html index 13a95aa..35b4687 100644 --- a/gui/templates/index.html +++ b/gui/templates/index.html @@ -40,7 +40,15 @@ 接收者头像
[[item.content]]
-
[[item.timetext]]
+
[[item.timetext]] +
+ 采纳图标 +
+
+ 采纳图标 +
+ +