olivebot/core/recorder.py
# Audio recording. For Aliyun ASR, speech-to-text runs while recording; for other
# backends, audio is first saved to a file and then pushed to the ASR model.
# Subclasses manage the audio stream source (implementations live in fay_booter.py).
import audioop
import math
import time
import threading
from abc import abstractmethod
from asr.ali_nls import ALiNls
from asr.funasr import FunASR
from core import wsa_server
from scheduler.thread_manager import MyThread
from utils import util
from utils import config_util as cfg
import numpy as np
import tempfile
import wave
from core import fay_core
from core import interact

# ===== Added: tolerance for noise at the start of a sentence before a leading wake word =====
import re
import unicodedata

# Attack time (seconds)
_ATTACK = 0.08  # Lowered so pickup starts earlier and the first half of the wake word is not swallowed
# Release time (seconds)
_RELEASE = 0.55  # Slightly shortened so a single sentence is not split into two segments
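# Illustration (an added note, assuming the 16 kHz / 1024-frame reads used in
# __record below, i.e. roughly 64 ms per chunk): _ATTACK = 0.08 means about two
# consecutive above-threshold chunks are needed to open a segment, and
# _RELEASE = 0.55 means about nine consecutive quiet chunks close it.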

# ===== Added: head-of-sentence normalization and matching for the leading wake word =====
_PUNCS = ",。!?!?,.、:;“”\"'()[]【】<>《》-—…"  # common Chinese punctuation
_FILLER_PREFIX = ("嗯", "啊", "呃", "哦", "哎", "欸", "那个", "就是", "然后")  # common sentence-initial fillers (ASR often inserts these)


def _norm_head(s: str) -> str:
    """Head-of-sentence tolerance only: strip invisible characters, whitespace,
    leading punctuation and leading filler words without touching the body."""
    if not s:
        return ""
    s = unicodedata.normalize("NFKC", s).strip()
    # Strip leading whitespace
    s = re.sub(r"^\s+", "", s)
    # Strip leading punctuation (may repeat)
    s = re.sub(r"^[{}]+".format(re.escape(_PUNCS)), "", s)
    # Strip common sentence-initial fillers (they can be stacked)
    changed = True
    while changed:
        changed = False
        for fp in _FILLER_PREFIX:
            if fp and s.startswith(fp):
                s = s[len(fp):]
                s = re.sub(r"^\s+", "", s)
                s = re.sub(r"^[{}]+".format(re.escape(_PUNCS)), "", s)
                changed = True
                break
    return s
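
# Usage sketch for _norm_head (the input string is a hypothetical example):
#   _norm_head("嗯,那个小橄榄你好")  ->  "小橄榄你好"
# The leading filler "嗯", the comma, and the stacked filler "那个" are stripped,
# while the remaining text is left untouched.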


def _front_wake_match(text: str, wake_words):
    """
    Leading wake-word match (strictly front-anchored):
    - the wake word must sit at the very start of the normalized text
    - waking up mid-sentence is not allowed
    """
    t = _norm_head(text)
    for w in wake_words:
        w = w.strip()
        if not w:
            continue
        # The wake word may be followed by whitespace / punctuation / a modal particle,
        # e.g. "小橄榄,帮我..." or "小橄榄啊 帮我..."
        if t.startswith(w):
            rest = t[len(w):]  # drop the wake word to obtain the actual question
            # Strip the punctuation / whitespace / modal particle right after it
            rest = rest.lstrip(" \t\r\n" + _PUNCS)
            rest = re.sub(r"^(啊|呀|呢|吧|哈|哎|诶|欸)\s*", "", rest)
            rest = rest.lstrip(" \t\r\n" + _PUNCS)
            return True, w, rest
    return False, None, ""
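
# Usage sketch for _front_wake_match, assuming "小橄榄" is one of the configured
# wake words (hypothetical inputs):
#   _front_wake_match("小橄榄,帮我查天气", ["小橄榄"])  ->  (True, "小橄榄", "帮我查天气")
#   _front_wake_match("请问小橄榄在吗", ["小橄榄"])      ->  (False, None, "")  # mid-sentence, rejected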


class Recorder:

    def __init__(self, fay):
        self.__fay = fay
        self.__running = True
        self.__processing = False
        self.__history_level = []
        self.__history_data = []
        self.__dynamic_threshold = 0.5  # volume threshold for voice activation
        self.__MAX_LEVEL = 25000
        self.__MAX_BLOCK = 100

        # Edit by xszyou in 20230516: add local ASR
        self.ASRMode = cfg.ASR_mode
        self.__aLiNls = None
        self.is_awake = False
        self.wakeup_matched = False
        if cfg.config['source']['wake_word_enabled']:
            self.timer = threading.Timer(60, self.reset_wakeup_status)  # run reset_wakeup_status after 60 seconds
        self.username = 'User'  # default user; subclasses override this
        self.channels = 1
        self.sample_rate = 16000
        self.is_reading = False
        self.stream = None

    def asrclient(self):
        if self.ASRMode == "ali":
            asrcli = ALiNls(self.username)
        elif self.ASRMode == "funasr" or self.ASRMode == "sensevoice":
            asrcli = FunASR(self.username)
        return asrcli

    def save_buffer_to_file(self, buffer):
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".wav", dir="cache_data")
        wf = wave.open(temp_file.name, 'wb')
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(16000)
        wf.writeframes(buffer)
        wf.close()
        return temp_file.name

    def __get_history_average(self, number):
        total = 0
        num = 0
        for i in range(len(self.__history_level) - 1, -1, -1):
            level = self.__history_level[i]
            total += level
            num += 1
            if num >= number:
                break
        return total / num

    def __get_history_percentage(self, number):
        return (self.__get_history_average(number) / self.__MAX_LEVEL) * 1.05 + 0.02
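
    # Worked example (added note): with a recent average RMS level of 5000,
    # __get_history_percentage returns 5000 / 25000 * 1.05 + 0.02 = 0.23, i.e.
    # the noise-floor estimate scaled up by 5% plus a small fixed margin.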

    def reset_wakeup_status(self):
        self.wakeup_matched = False
        with fay_core.auto_play_lock:
            fay_core.can_auto_play = True

    def __waitingResult(self, iat, audio_data):
        self.__processing = True
        t = time.time()
        tm = time.time()
        if self.ASRMode == "funasr" or self.ASRMode == "sensevoice":
            file_url = self.save_buffer_to_file(audio_data)
            self.__aLiNls.send_url(file_url)
            # return
        # Wait for the result
        while not iat.done and time.time() - t < 1:
            time.sleep(0.01)
        text = iat.finalResults
        util.printInfo(1, self.username, "语音处理完成! 耗时: {} ms".format(math.floor((time.time() - tm) * 1000)))
        if len(text) > 0:
            if cfg.config['source']['wake_word_enabled']:
                # Common wake-up mode
                if cfg.config['source']['wake_word_type'] == 'common':
                    if not self.wakeup_matched:
                        # Check for the wake word
                        wake_word = cfg.config['source']['wake_word']
                        wake_word_list = wake_word.split(',')
                        wake_up = False
                        for word in wake_word_list:
                            if word in text:
                                wake_up = True
                        if wake_up:
                            util.printInfo(1, self.username, "唤醒成功!")
                            if wsa_server.get_web_instance().is_connected(self.username):
                                wsa_server.get_web_instance().add_cmd({"panelMsg": "唤醒成功!", "Username": self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Listening.jpg'})
                            if wsa_server.get_instance().is_connected(self.username):
                                content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': "唤醒成功!"}, 'Username': self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Listening.jpg'}
                                wsa_server.get_instance().add_cmd(content)
                            self.wakeup_matched = True  # wake-up succeeded
                            with fay_core.auto_play_lock:
                                fay_core.can_auto_play = False
                            # self.on_speaking(text)
                            intt = interact.Interact("auto_play", 2, {'user': self.username, 'text': "在呢,你说?"})
                            self.__fay.on_interact(intt)
                            self.__processing = False
                            self.timer.cancel()  # cancel the previous timer task
                        else:
                            util.printInfo(1, self.username, "[!] 待唤醒!")
                            if wsa_server.get_web_instance().is_connected(self.username):
                                wsa_server.get_web_instance().add_cmd({"panelMsg": "[!] 待唤醒!", "Username": self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Normal.jpg'})
                            if wsa_server.get_instance().is_connected(self.username):
                                content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': "[!] 待唤醒!"}, 'Username': self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Normal.jpg'}
                                wsa_server.get_instance().add_cmd(content)
                    else:
                        self.on_speaking(text)
                        self.__processing = False
                        self.timer.cancel()  # cancel the previous timer task
                        self.timer = threading.Timer(60, self.reset_wakeup_status)  # re-arm the 60-second timer
                        self.timer.start()
                # Leading wake-word mode (strictly front-anchored, with head-of-sentence tolerance)
                elif cfg.config['source']['wake_word_type'] == 'front':
                    # Read the configured wake words (multiple are supported)
                    wake_word = cfg.config['source']['wake_word']
                    wake_word_list = [w.strip() for w in wake_word.split(',') if w.strip()]
                    matched, wake_up_word, question = _front_wake_match(text, wake_word_list)
                    if matched:
                        util.printInfo(1, self.username, "唤醒成功!")
                        if wsa_server.get_web_instance().is_connected(self.username):
                            wsa_server.get_web_instance().add_cmd({"panelMsg": "唤醒成功!", "Username": self.username,
                                                                   'robot': f'http://{cfg.fay_url}:5000/robot/Listening.jpg'})
                        if wsa_server.get_instance().is_connected(self.username):
                            content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': "唤醒成功!"},
                                       'Username': self.username,
                                       'robot': f'http://{cfg.fay_url}:5000/robot/Listening.jpg'}
                            wsa_server.get_instance().add_cmd(content)
                        # After a leading wake word is recognized, send the question with the wake word removed
                        if question:
                            self.on_speaking(question)
                        else:
                            intt = interact.Interact("auto_play", 2, {'user': self.username, 'text': "在呢,你说?"})
                            self.__fay.on_interact(intt)
                        self.__processing = False
                    else:
                        util.printInfo(1, self.username, "[!] 待唤醒!")
                        if wsa_server.get_web_instance().is_connected(self.username):
                            wsa_server.get_web_instance().add_cmd({"panelMsg": "[!] 待唤醒!", "Username": self.username,
                                                                   'robot': f'http://{cfg.fay_url}:5000/robot/Normal.jpg'})
                        if wsa_server.get_instance().is_connected(self.username):
                            content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': "[!] 待唤醒!"},
                                       'Username': self.username,
                                       'robot': f'http://{cfg.fay_url}:5000/robot/Normal.jpg'}
                            wsa_server.get_instance().add_cmd(content)
                        self.__processing = False
            # Non-wake-up mode
            else:
                self.on_speaking(text)
                self.__processing = False
        else:
            # TODO why is this set to False?
            # if self.wakeup_matched:
            #     self.wakeup_matched = False
            self.__processing = False
            util.printInfo(1, self.username, "[!] 语音未检测到内容!")
            self.__dynamic_threshold = self.__get_history_percentage(30)
            if wsa_server.get_web_instance().is_connected(self.username):
                wsa_server.get_web_instance().add_cmd({"panelMsg": "", 'Username': self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Normal.jpg'})
            if wsa_server.get_instance().is_connected(self.username):
                content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': ""}, 'Username': self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Normal.jpg'}
                wsa_server.get_instance().add_cmd(content)

    def __record(self):
        try:
            stream = self.get_stream()  # this call blocks until the stream is available
        except Exception as e:
            print(e)
            util.printInfo(1, self.username, "请检查设备是否有误,再重新启动!")
            return
        isSpeaking = False
        last_mute_time = time.time()
        last_speaking_time = time.time()
        data = None
        concatenated_audio = bytearray()
        audio_data_list = []
        while self.__running:
            try:
                cfg.load_config()
                record = cfg.config['source']['record']
                if not record['enabled'] and not self.is_remote():
                    time.sleep(1)
                    continue
                self.is_reading = True
                data = stream.read(1024, exception_on_overflow=False)
                self.is_reading = False
            except Exception as e:
                data = None
                print(e)
                util.log(1, "请检查录音设备是否有误,再重新启动!")
                self.__running = False
            if not data:
                continue
            # Decide whether pickup is allowed; if not, discard the audio
            can_listen = True
            if self.__fay.speaking == True:
                # While the avatar/panel is playing TTS, disable pickup so its own voice
                # is not recognized as user input
                can_listen = False
            if can_listen == False:  # discard the audio
                data = None
                continue
            # Check whether the volume is high enough to activate pickup
            level = audioop.rms(data, 2)
            if len(self.__history_data) >= 20:  # keep pre-activation audio so no information is lost
                self.__history_data.pop(0)
            if len(self.__history_level) >= 500:
                self.__history_level.pop(0)
            self.__history_data.append(data)
            self.__history_level.append(level)
            percentage = level / self.__MAX_LEVEL
            history_percentage = self.__get_history_percentage(30)

            # ===== Improvement: change the threshold smoothly so sentence breaks don't truncate the wake word =====
            up_alpha = 0.01    # environment getting louder: rise slowly
            down_alpha = 0.05  # environment getting quieter: don't drop instantly either
            if history_percentage > self.__dynamic_threshold:
                self.__dynamic_threshold += (history_percentage - self.__dynamic_threshold) * up_alpha
            else:
                self.__dynamic_threshold += (history_percentage - self.__dynamic_threshold) * down_alpha
            # Give the threshold a floor to avoid excessive sensitivity
            self.__dynamic_threshold = max(self.__dynamic_threshold, 0.02)
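            # Worked example (added note): with the threshold at 0.50 and the ambient
            # estimate at 0.10, each chunk moves the threshold by
            # (0.10 - 0.50) * 0.05 = -0.02, decaying roughly 0.50 -> 0.48 -> 0.46 ...;
            # a rising environment uses the slower up_alpha instead.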

            # Activate pickup
            if percentage > self.__dynamic_threshold:
                last_speaking_time = time.time()
                if not self.__processing and not isSpeaking and time.time() - last_mute_time > _ATTACK:
                    isSpeaking = True  # the user is speaking
                    util.printInfo(1, self.username, "聆听中...")
                    if wsa_server.get_web_instance().is_connected(self.username):
                        wsa_server.get_web_instance().add_cmd({"panelMsg": "聆听中...", 'Username': self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Listening.jpg'})
                    if wsa_server.get_instance().is_connected(self.username):
                        content = {'Topic': 'Unreal', 'Data': {'Key': 'log', 'Value': "聆听中..."}, 'Username': self.username, 'robot': f'http://{cfg.fay_url}:5000/robot/Listening.jpg'}
                        wsa_server.get_instance().add_cmd(content)
                    concatenated_audio.clear()
                    self.__aLiNls = self.asrclient()
                    try:
                        task_id = self.__aLiNls.start()
                        while not self.__aLiNls.started:
                            time.sleep(0.01)
                    except Exception as e:
                        print(e)
                        util.printInfo(1, self.username, "aliyun asr 连接受限")
                    # The current chunk is sent below; here we send the pre-activation
                    # audio so nothing is missed
                    for i in range(len(self.__history_data) - 1):
                        buf = self.__history_data[i]
                        audio_data_list.append(self.__process_audio_data(buf, self.channels))
                        if self.ASRMode == "ali":
                            self.__aLiNls.send(self.__process_audio_data(buf, self.channels).tobytes())
                        else:
                            concatenated_audio.extend(self.__process_audio_data(buf, self.channels).tobytes())
                    self.__history_data.clear()
            else:  # end pickup
                last_mute_time = time.time()
                if isSpeaking:
                    if time.time() - last_speaking_time > _RELEASE:  # TODO a proper VAD would be more reliable
                        isSpeaking = False
                        self.__aLiNls.end()
                        util.printInfo(1, self.username, "语音处理中...")
                        self.__waitingResult(self.__aLiNls, concatenated_audio)
                        mono_data = self.__concatenate_audio_data(audio_data_list)
                        self.__save_audio_to_wav(mono_data, self.sample_rate, "cache_data/input.wav")
                        audio_data_list = []

            # While picking up, forward every chunk
            if isSpeaking:
                audio_data_list.append(self.__process_audio_data(data, self.channels))
                if self.ASRMode == "ali":
                    self.__aLiNls.send(self.__process_audio_data(data, self.channels).tobytes())
                else:
                    concatenated_audio.extend(self.__process_audio_data(data, self.channels).tobytes())

    def __save_audio_to_wav(self, data, sample_rate, filename):
        # Make sure the dtype is int16
        if data.dtype != np.int16:
            data = data.astype(np.int16)
        # Open the WAV file
        with wave.open(filename, 'wb') as wf:
            # Set the audio parameters
            n_channels = 1  # mono
            sampwidth = 2   # 16-bit audio, 2 bytes per sample
            wf.setnchannels(n_channels)
            wf.setsampwidth(sampwidth)
            wf.setframerate(sample_rate)
            wf.writeframes(data.tobytes())

    def __concatenate_audio_data(self, audio_data_list):
        # Join the accumulated audio chunks
        data = np.concatenate(audio_data_list)
        return data

    # Convert to mono np.int16
    def __process_audio_data(self, data, channels):
        data = bytearray(data)
        # Convert the byte data into a numpy array
        data = np.frombuffer(data, dtype=np.int16)
        # Reshape the array to separate the channels
        data = np.reshape(data, (-1, channels))
        # Average across channels to produce mono audio
        mono_data = np.mean(data, axis=1).astype(np.int16)
        return mono_data
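
    # Worked example (added note): with channels=2, the interleaved int16 samples
    # [100, 200, 300, 400] are reshaped into frames [[100, 200], [300, 400]] and
    # average to the mono signal [150, 350].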

    def set_processing(self, processing):
        self.__processing = processing

    def start(self):
        MyThread(target=self.__record).start()

    def stop(self):
        self.__running = False

    @abstractmethod
    def on_speaking(self, text):
        pass

    # TODO Edit by xszyou on 20230113: stream acquisition is factored out so that
    # subclasses can record from a microphone, a network stream, and other sources
    @abstractmethod
    def get_stream(self):
        pass

    @abstractmethod
    def is_remote(self):
        pass
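

# A minimal sketch (an illustrative addition, not from the original file) of a
# concrete Recorder that reads from a local microphone; the real subclasses live
# in fay_booter.py. The PyAudio parameters mirror what __record expects.
class ExampleMicRecorder(Recorder):

    def on_speaking(self, text):
        # Hand the recognized text to the interaction layer; print for the sketch
        print(text)

    def get_stream(self):
        import pyaudio  # local import so the sketch adds no hard dependency
        self.stream = pyaudio.PyAudio().open(format=pyaudio.paInt16,
                                             channels=1,   # mono, as save_buffer_to_file assumes
                                             rate=16000,   # 16 kHz, as assumed throughout
                                             input=True,
                                             frames_per_buffer=1024)
        return self.stream

    def is_remote(self):
        return False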