olivebot/core/member_db.py
guo zebin 4cfad5ae0f 年翻更新
- 全新ui
- 全面优化websocket逻辑,提高数字人和ui连接的稳定性及资源开销
- 全面优化唤醒逻辑,提供稳定的普通唤醒模式和前置词唤醒模式
- 优化拾音质量,支持多声道麦克风拾音
- 优化自动播放服务器的对接机制,提供稳定和兼容旧版ue工程的对接模式
- 数字人接口输出机器人表情,以适应新fay ui及单片机的数字人表情输出
- 使用更高级的音频时长计算方式,可以更精准控制音频播放完成后的逻辑
- 修复点击关闭按钮会导致程序退出的bug
- 修复没有麦克风的设备开启麦克风会出错的问题
- 为服务器主机地址提供配置项,以方便服务器部署
2024-10-26 11:34:55 +08:00

130 lines
3.4 KiB
Python

import sqlite3
import time
import threading
import functools
def synchronized(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self.lock:
return func(self, *args, **kwargs)
return wrapper
__member_db = None
def new_instance():
global __member_db
if __member_db is None:
__member_db = Member_Db()
__member_db.init_db()
return __member_db
class Member_Db:
def __init__(self) -> None:
self.lock = threading.Lock()
#初始化
def init_db(self):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS T_Member
(id INTEGER PRIMARY KEY autoincrement,
username TEXT NOT NULL UNIQUE);''')
conn.commit()
conn.close()
# 添加新用户
@synchronized
def add_user(self, username):
if self.is_username_exist(username) == "notexists":
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('INSERT INTO T_Member (username) VALUES (?)', (username,))
conn.commit()
conn.close()
return "success"
else:
return f"Username '{username}' already exists."
# 修改用户名
@synchronized
def update_user(self, username, new_username):
if self.is_username_exist(new_username) == "notexists":
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('UPDATE T_Member SET username = ? WHERE username = ?', (new_username, username))
conn.commit()
conn.close()
return "success"
else:
return f"Username '{new_username}' already exists."
# 删除用户
@synchronized
def delete_user(self, username):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('DELETE FROM T_Member WHERE username = ?', (username,))
conn.commit()
conn.close()
return "success"
# 检查用户名是否已存在
def is_username_exist(self, username):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('SELECT COUNT(*) FROM T_Member WHERE username = ?', (username,))
result = c.fetchone()[0]
conn.close()
if result > 0:
return "exists"
else:
return "notexists"
def find_user(self, username):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('SELECT * FROM T_Member WHERE username = ?', (username,))
result = c.fetchone()
conn.close()
if result is None:
return 0
else:
return result[0]
@synchronized
def query(self, sql):
try:
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute(sql)
results = c.fetchall()
conn.commit()
conn.close()
return results
except Exception as e:
return f"执行时发生错误:{str(e)}"
# 获取所有用户
@synchronized
def get_all_users(self):
conn = sqlite3.connect('user_profiles.db')
c = conn.cursor()
c.execute('SELECT * FROM T_Member')
results = c.fetchall()
conn.close()
return results