olivebot/core/fay_core.py
xszyou bc2cbba0d5 更新微信群二维码
1、更新微信群二维码
2、人设提问加入关键字“你是谁”
2022-10-21 10:43:36 +08:00

505 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import difflib
import math
import os
import random
import time
import wave
import eyed3
from openpyxl import load_workbook
# 适应模型使用
import numpy as np
# import tensorflow as tf
from ai_module import xf_aiui
from ai_module import xf_ltp
from ai_module.ms_tts_sdk import Speech
from core import wsa_server, tts_voice
from core.tts_voice import EnumVoice
from scheduler.thread_manager import MyThread
from utils import util, storer, config_util
import pygame
class FeiFei:
    """Core controller holding dialogue state, keyword tables, mood and the speech client."""

    def __init__(self):
        """Set up default dialogue state, the keyword tables and the TTS connection."""
        pygame.init()

        # Latest question received and latest answer produced.
        self.q_msg = '你叫什么名字?'
        self.a_msg = 'hi,我叫菲菲英文名是fay'

        self.mood = 0.0  # mood value
        self.item_index = 0

        # Adaptive-model matrices: X is reshaped to one row, W to one column.
        self.X = np.array([1, 0, 0, 0, 0, 0, 0, 0]).reshape(1, -1)
        self.W = np.array([0.0, 0.6, 0.1, 0.7, 0.3, 0.0, 0.0, 0.0]).reshape(-1, 1)

        # Persona questions: [list of trigger phrases, key into config["attribute"]].
        self.attribute_keyword = [
            [
                ['你叫什么名字', '你的名字是什么', '你是谁'],
                'name',
            ],
            [
                [
                    '你是男的还是女的', '你是男生还是女生', '你的性别是什么',
                    '你是男生吗', '你是女生吗', '你是男的吗', '你是女的吗',
                    '你是男孩子吗', '你是女孩子吗',
                ],
                'gender',
            ],
            [
                [
                    '你今年多大了', '你多大了', '你今年多少岁', '你几岁了',
                    '你今年几岁了', '你今年几岁了', '你什么时候出生',
                    '你的生日是什么', '你的年龄',
                ],
                'age',
            ],
            [
                [
                    '你的家乡在哪', '你的家乡是什么', '你家在哪', '你住在哪',
                    '你出生在哪', '你的出生地在哪', '你的出生地是什么',
                ],
                'birth',
            ],
            [
                ['你的生肖是什么', '你属什么'],
                'zodiac',
            ],
            [
                ['你是什么座', '你是什么星座', '你的星座是什么'],
                'constellation',
            ],
            [
                [
                    '你是做什么的', '你的职业是什么', '你是干什么的',
                    '你的职位是什么', '你的工作是什么', '你是做什么工作的',
                ],
                'job',
            ],
            [
                ['你的爱好是什么', '你有爱好吗', '你喜欢什么', '你喜欢做什么'],
                'hobby',
            ],
            [
                ['联系方式', '联系你们', '怎么联系客服', '有没有客服'],
                'contact',
            ],
        ]

        # Product questions: [list of trigger phrases, key into item["explain"]].
        self.explain_keyword = [
            [['是什么'], 'intro'],
            [['怎么用', '使用场景', '有什么作用'], 'usage'],
            [['怎么卖', '多少钱', '售价'], 'price'],
            [['便宜点', '优惠', '折扣', '促销'], 'discount'],
            [['质量', '保证', '担保'], 'promise'],
            [['特点', '优点'], 'character'],
        ]

        self.wsParam = None
        self.wss = None

        # Runtime flags and bookkeeping.
        self.speaking = False
        self.sleep = False
        self.__running = True
        self.interactive = []
        self.last_speak_data = ''
        self.last_interact_time = time.time()

        # Speech client; connect eagerly (pre-connect).
        self.sp = Speech()
        self.sp.connect()
        self.last_quest_time = time.time()
def __string_similar(self, s1, s2):
return difflib.SequenceMatcher(None, s1, s2).quick_ratio()
def __read_qna(self, filename) -> list:
qna = []
try:
wb = load_workbook(filename)
sheets = wb.worksheets # 获取当前所有的sheet
sheet = sheets[0]
for row in sheet.rows:
if len(row) >= 2:
qna.append([row[0].value.split(";"), row[1].value])
except BaseException as e:
print("无法读取Q&A文件 {} -> ".format(filename) + str(e))
return qna
def __get_keyword(self, keyword_dict, text):
last_similar = 0
last_answer = ''
for qa in keyword_dict:
for quest in qa[0]:
similar = self.__string_similar(text, quest)
if quest in text:
similar += 0.3
if similar > last_similar:
last_similar = similar
last_answer = qa[1]
if last_similar >= 0.6:
return last_answer
return None
def __get_answer(self, text):
    """Look up a canned answer for ``text``.

    Sources are tried in this order:
      1. persona questions (``self.attribute_keyword`` -> ``config["attribute"]``),
      2. the global Q&A workbook (``config["interact"]["QnA"]``),
      3. the product explanation of the current (or explicitly named) item,
      4. that item's own Q&A workbook.

    Returns:
        The answer, or ``None`` when no source matches.
    """
    # Persona Q&A
    keyword = self.__get_keyword(self.attribute_keyword, text)
    if keyword is not None:
        return config_util.config["attribute"][keyword]

    # Global Q&A
    answer = self.__get_keyword(self.__read_qna(config_util.config['interact']['QnA']), text)
    if answer is not None:
        return answer

    items = self.__get_item_list()
    if not items:
        return None

    # item_index is advanced by the script loop and the enabled-item list can
    # shrink at runtime, so guard against a stale index instead of raising.
    current = self.item_index if 0 <= self.item_index < len(items) else 0
    item = items[current]

    # Cross-item matching: switch to another item when the viewer names it.
    for candidate in items:
        name = candidate["name"]
        if name != item["name"]:
            if name in text or self.__string_similar(text, name) > 0.6:
                item = candidate
                break

    # Product explanation Q&A
    keyword = self.__get_keyword(self.explain_keyword, text)
    if keyword is not None:
        try:
            return item["explain"][keyword]
        except Exception as e:
            # Missing explanation entry: report it and fall through to the item Q&A.
            print(e)

    # Item-specific Q&A
    answer = self.__get_keyword(self.__read_qna(item["QnA"]), text)
    if answer is not None:
        return answer
    return None
def __get_list_answer(self, answers, text):
last_similar = 0
last_answer = ''
for mlist in answers:
for quest in mlist[0]:
similar = self.__string_similar(text, quest)
if quest in text:
similar += 0.3
if similar > last_similar:
last_similar = similar
answer_list = mlist[1]
last_answer = answer_list[random.randint(0, len(answer_list) - 1)]
# print("相似度: {}, 回答: {}".format(last_similar, last_answer))
if last_similar >= 0.6:
return last_answer
return None
def __auto_speak(self):
    """Worker loop: answer pending viewer interactions, otherwise read the sales script.

    Every 0.8 s, unless already speaking or asleep, the loop either

    * replies to one of the most recent entries of ``self.interactive``
      (walking backwards from the newest, at most four per round), or
    * once those are exhausted, clears the queue and speaks the next script
      line of the current enabled item from ``config["items"]``.

    The chosen text is stored in ``self.a_msg`` and spoken by ``__say`` on a
    separate thread. Interaction types handled: 1 chat, 2 join, 3 gift,
    4 follow. Any exception inside one iteration is printed and the loop
    continues.
    """
    i = 0  # number of pending interactions already answered in this round
    script_index = 0  # position in the current item's explanation script
    while self.__running:
        time.sleep(0.8)
        if self.speaking or self.sleep:
            continue

        try:
            # Simplified logic: run the sales script by default; if viewers
            # interacted meanwhile, answer the latest interactions after the
            # current script line, then go back to the script.
            if i <= 3 and len(self.interactive) > i:
                i += 1
                interact = self.interactive[0 - i]
                if interact[0] == 1:
                    self.q_msg = interact[2]
                index = interact[0]
                user_name = interact[1]

                if index == 1:
                    answer = self.__get_answer(self.q_msg)
                    text = ''
                    if answer is None:
                        # No canned answer: fall back to the NLP service.
                        try:
                            wsa_server.get_web_instance().add_cmd({"panelMsg": "思考中..."})
                            util.log(1, '自然语言处理...')
                            tm = time.time()
                            text = xf_aiui.question(self.q_msg)
                            util.log(1, '自然语言处理完成. 耗时: {} ms'.format(math.floor((time.time() - tm) * 1000)))
                            if text == '哎呀,你这么说我也不懂,详细点呗' or text == '':
                                util.log(1, '[!] 自然语言无语了!')
                                wsa_server.get_web_instance().add_cmd({"panelMsg": ""})
                                continue
                        except BaseException as e:
                            print(e)
                            util.log(1, '自然语言处理错误!')
                            wsa_server.get_web_instance().add_cmd({"panelMsg": ""})
                            continue
                    else:
                        text = answer
                    if len(user_name) == 0:
                        self.a_msg = text
                    else:
                        self.a_msg = user_name + '' + text
                elif index == 2:
                    # Fixed typo in the third greeting: "欢印" -> "欢迎" (welcome).
                    self.a_msg = ['我们的直播间越来越多人咯', '感谢{}的到来'.format(user_name), '欢迎{}来到我们的直播间'.format(user_name)][
                        random.randint(0, 2)]
                elif index == 3:
                    msg = ""
                    # Gift data comes in groups of four fields starting at
                    # position 1; fields +0, +2 and +3 of each group are used.
                    for offset in range(1, len(interact), 4):
                        try:
                            gift = interact[offset + 2]
                            gift_name = '礼物'
                            if gift[0] != -1:
                                gift_name = gift[1]
                            msg = msg + "{}送给我的{}{}".format(interact[offset], interact[offset + 3], gift_name)
                        except BaseException as e:
                            print("[System] 礼物处理错误!")
                            print(e)
                    self.a_msg = '感谢感谢,感谢' + msg
                elif index == 4:
                    self.a_msg = '感谢关注'

                self.last_speak_data = self.a_msg
                self.speaking = True
                MyThread(target=self.__say, args=['interact']).start()
            else:
                # Nothing (more) to answer: reset the round and continue the script.
                i = 0
                self.interactive.clear()
                config_items = config_util.config["items"]
                items = []
                for item in config_items:
                    if item["enabled"]:
                        items.append(item)
                if len(items) > 0:
                    if self.item_index >= len(items):
                        self.item_index = 0
                        script_index = 0
                    item = items[self.item_index]
                    script_index = script_index + 1
                    explain_key = self.__get_explain_from_index(script_index)
                    if explain_key is None:
                        # Script exhausted: advance to the next item for the
                        # following rounds. `item` still refers to the item
                        # selected above, whose index-0 line is spoken now.
                        self.item_index = self.item_index + 1
                        script_index = 0
                        if self.item_index >= len(items):
                            self.item_index = 0
                        explain_key = self.__get_explain_from_index(script_index)
                    explain = item["explain"][explain_key]
                    if len(explain) > 0:
                        self.a_msg = explain
                        self.last_speak_data = self.a_msg
                        self.speaking = True
                        MyThread(target=self.__say, args=['script']).start()
        except BaseException as e:
            print(e)
def __get_item_list(self) -> list:
    """Return the entries of ``config["items"]`` whose ``"enabled"`` value is truthy."""
    return [entry for entry in config_util.config["items"] if entry["enabled"]]
def __get_explain_from_index(self, index: int):
if index == 0:
return "character"
if index == 1:
return "discount"
if index == 2:
return "intro"
if index == 3:
return "price"
if index == 4:
return "promise"
if index == 5:
return "usage"
return None
def on_interact(self, interact):
    """Queue a viewer interaction, merging it with a pending one of the same kind.

    ``interact[0]`` is the interaction type: 2 = join, 3 = gift, 4 = follow;
    any other type is queued as is. After queuing, the mood update and the
    persistence of the interaction are started on separate threads.

    Args:
        interact: Indexable event record; ``interact[1]`` is the user name.
    """
    kind = interact[0]

    # Join: fold the new user name into the pending join entry.
    if kind == 2:
        pending = self.__get_interactive(2)
        if pending is None:
            self.interactive.append(interact)
        else:
            merged = (2, pending[1] + ', ' + interact[1], pending[2])
            self.interactive.remove(pending)
            self.interactive.append(merged)
    # Gift: append the NEW gift's fields to the pending gift entry.
    elif kind == 3:
        pending = self.__get_interactive(3)
        if pending is None:
            self.interactive.append(interact)
        else:
            # Previously the pending entry's own fields were appended again,
            # so the incoming gift was lost. The reader in __auto_speak walks
            # the tuple in groups of four starting at index 1, hence the
            # incoming fields 1..4 are appended as one complete group.
            merged = tuple(pending) + tuple(interact[1:5])
            self.interactive.remove(pending)
            self.interactive.append(merged)
    # Follow
    elif kind == 4:
        # NOTE(review): this checks for a pending *join* (type 2), not a
        # pending follow (type 4) - confirm which one is intended.
        if self.__get_interactive(2) is None:
            self.interactive.append(interact)
    else:
        self.interactive.append(interact)

    MyThread(target=self.__update_mood, args=[interact[0]]).start()
    MyThread(target=storer.storage_live_interact, args=[interact]).start()
def __get_interactive(self, interactType):
for interact in self.interactive:
if interact[0] == interactType:
return interact
return None
# 适应模型计算
def __fay(self, index):
if 0 < index < 8:
self.X[0][index] += 1
# PRED = 1 /(1 + tf.exp(-tf.matmul(tf.constant(self.X,tf.float32), tf.constant(self.W,tf.float32))))
PRED = np.sum(self.X.reshape(-1) * self.W.reshape(-1))
if 0 < index < 8:
print('***PRED:{0}***'.format(PRED))
print(self.X.reshape(-1) * self.W.reshape(-1))
return PRED
# 发送情绪
def __send_mood(self):
    """Every 3 seconds, while running and not asleep, queue the current mood on ``wsa_server``."""
    while self.__running:
        time.sleep(3)
        if self.sleep:
            continue
        payload = {'Key': 'mood', 'Value': self.mood}
        wsa_server.get_instance().add_cmd({'Topic': 'Unreal', 'Data': payload})
# 更新情绪
def __update_mood(self, typeIndex):
    """Adjust ``self.mood`` for one interaction and clamp it to [-1, 1].

    Type 1 (chat) uses the sentiment of ``self.q_msg``: result 2 raises the
    mood by ``chat / 200``, result 0 lowers it by ``chat / 100``. Types 2, 3
    and 4 raise it by the ``join``, ``gift`` and ``follow`` perception values
    divided by 100. Weights come from ``config["interact"]["perception"]``.
    """
    perception = config_util.config["interact"]["perception"]

    if typeIndex == 1:
        try:
            sentiment = xf_ltp.get_sentiment(self.q_msg)
            chat_weight = perception["chat"]
            if sentiment == 2:
                self.mood = self.mood + (chat_weight / 200.0)
            elif sentiment == 0:
                self.mood = self.mood - (chat_weight / 100.0)
        except BaseException as e:
            print("[System] 情绪更新错误!")
            print(e)
    else:
        weight_key = {2: "join", 3: "gift", 4: "follow"}.get(typeIndex)
        if weight_key is not None:
            self.mood = self.mood + (perception[weight_key] / 100.0)

    # Clamp to the valid range.
    if self.mood >= 1:
        self.mood = 1
    elif self.mood <= -1:
        self.mood = -1
def __get_mood(self):
    """Return the speaking style of the configured voice that matches ``self.mood``.

    The voice comes from ``config["attribute"]["voice"]`` and falls back to
    ``EnumVoice.XIAO_XIAO``. The "calm" style is the default, also used when
    the mood lies outside [-1, 1].
    """
    voice = tts_voice.get_voice_of(config_util.config["attribute"]["voice"])
    if voice is None:
        voice = EnumVoice.XIAO_XIAO
    styles = voice.value["styleList"]

    chosen = styles["calm"]
    # Half-open mood bands [low, high) and the style each one selects.
    bands = (
        (-1, -0.5, "angry"),
        (-0.5, -0.1, "lyrical"),
        (-0.1, 0.1, "calm"),
        (0.1, 0.5, "assistant"),
    )
    for low, high, style_key in bands:
        if low <= self.mood < high:
            chosen = styles[style_key]
    # The top band includes its upper bound.
    if 0.5 <= self.mood <= 1:
        chosen = styles["cheerful"]
    return chosen
# 合成声音加上type代表是脚本还是互动
def __say(self, styleType):
    """Synthesise ``self.a_msg`` and hand the audio to ``__send_audio`` on a new thread.

    Args:
        styleType: Passed through to ``__send_audio``; the callers in this
            class use 'script' or 'interact'.

    Returns:
        The synthesis result, or ``None`` when the message is empty, synthesis
        yields nothing or an error occurs. In those cases ``self.speaking`` is
        reset to ``False``.
    """
    try:
        if len(self.a_msg) >= 1:
            util.printInfo(1, '菲菲', '({}) {}'.format(self.__get_mood(), self.a_msg))
            record = (0, '菲菲', self.a_msg)
            MyThread(target=storer.storage_live_interact, args=[record]).start()

            util.log(1, '合成音频...')
            started = time.time()
            result = self.sp.to_sample(self.a_msg, self.__get_mood())
            elapsed_ms = math.floor((time.time() - started) * 1000)
            util.log(1, '合成音频完成. 耗时: {} ms'.format(elapsed_ms))

            if result is not None:
                MyThread(target=self.__send_audio, args=[result, styleType]).start()
                return result
        else:
            self.speaking = False
    except BaseException as e:
        print(e)
    self.speaking = False
    return None
def __play_sound(self, file_url):
    """Log the time elapsed since ``last_quest_time`` and play ``file_url`` with pygame."""
    util.log(1, '播放音频...')
    elapsed_ms = math.floor((time.time() - self.last_quest_time) * 1000)
    util.log(1, '问答处理总时长:{} ms'.format(elapsed_ms))
    player = pygame.mixer.music
    player.load(file_url)
    player.play()
def __send_audio(self, file_url, say_type):
try:
audio_length = eyed3.load(file_url).info.time_secs
if audio_length <= config_util.config["interact"]["maxInteractTime"] or say_type == "script":
if config_util.config["interact"]["playSound"]:
self.__play_sound(file_url)
else:
content = {'Topic': 'Unreal', 'Data': {'Key': 'audio', 'Value': os.path.abspath(file_url), 'Time': audio_length, 'Type': say_type}}
wsa_server.get_instance().add_cmd(content)
wsa_server.get_web_instance().add_cmd({"panelMsg": self.a_msg})
time.sleep(audio_length + 0.5)
wsa_server.get_web_instance().add_cmd({"panelMsg": ""})
if config_util.config["interact"]["playSound"]:
util.log(1, '结束播放!')
self.speaking = False
except Exception as e:
print(e)
# def __send_audio(self, file_url, say_type):
# try:
# # time.sleep(0.25)
# with wave.open(file_url, 'rb') as wav_file:
# wav_length = wav_file.getnframes() / float(wav_file.getframerate())
# print(wav_length)
# if wav_length <= config_util.config["interact"]["maxInteractTime"] or say_type == "script":
# if config_util.config["interact"]["playSound"]:
# self.__play_sound(file_url)
# else:
# content = {'Topic': 'Unreal', 'Data': {'Key': 'audio', 'Value': os.path.abspath(file_url), 'Time': wav_length, 'Type': say_type}}
# wsa_server.get_instance().add_cmd(content)
# time.sleep(wav_length + 0.5)
# self.speaking = False
# except Exception as e:
# print(e)
def __waiting_speaking(self, file_url):
    """Wait for a WAV file's playing time, release the speaking flag, then delete the file.

    Sequence: wait 5 s, sleep for the WAV duration, record the interaction
    time and clear ``self.speaking``, wait another 30 s and remove the file.
    On any error the interaction time and speaking flag are still reset; the
    error is now printed instead of being silently discarded by a bare
    ``except``.

    Args:
        file_url: Path of the WAV file.
    """
    try:
        time.sleep(5)
        print('[' + str(int(time.time())) + '][菲菲] [S] [开始发言]')
        with wave.open(file_url, 'rb') as wav_file:
            wav_length = wav_file.getnframes() / float(wav_file.getframerate())
        time.sleep(wav_length)
        self.last_interact_time = time.time()
        self.speaking = False
        print('[' + str(int(time.time())) + '][菲菲] [E] [结束发言]')
        time.sleep(30)
        os.remove(file_url)
    except Exception as e:
        print(e)
        self.last_interact_time = time.time()
        self.speaking = False
# 冷场情绪更新
def __update_mood_runnable(self):
    """Every 10 seconds, while no interaction is queued, move the mood one step towards 0.

    The step is ``config["interact"]["perception"]["indifferent"] / 100``; a
    mood closer to zero than one step is set to exactly 0.
    """
    while self.__running:
        time.sleep(10)
        step = config_util.config["interact"]["perception"]["indifferent"] / 100
        if len(self.interactive) >= 1:
            continue
        if self.mood > 0:
            self.mood = self.mood - step if self.mood > step else 0
        elif self.mood < 0:
            self.mood = self.mood + step if self.mood < -step else 0
def set_sleep(self, sleep):
    """Store the sleep flag; the worker loops skip their work while it is truthy."""
    setattr(self, "sleep", sleep)
def start(self):
    """Launch the mood-broadcast, auto-speak and mood-decay workers, in that order."""
    workers = (
        self.__send_mood,
        self.__auto_speak,
        self.__update_mood_runnable,
    )
    for worker in workers:
        MyThread(target=worker).start()
def stop(self):
    """Clear the running flag so the worker loops exit, then close the speech client."""
    self.__running = False
    speech_client = self.sp
    speech_client.close()