olivebot/llm/agent/agent_service.py

134 lines
4.5 KiB
Python
Raw Normal View History

import sqlite3
import threading
import datetime
import time
import os
from scheduler.thread_manager import MyThread
from core import member_db
from core.interact import Interact
from utils import util
import fay_booter
scheduled_tasks = {}
agent_running = False
# 数据库初始化
def init_db():
conn = sqlite3.connect('timer.db')
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS timer (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time TEXT NOT NULL,
repeat_rule TEXT NOT NULL,
content TEXT NOT NULL,
uid INTEGER
)
''')
conn.commit()
conn.close()
# 插入测试数据
def insert_test_data():
conn = sqlite3.connect('timer.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO timer (time, repeat_rule, content) VALUES (?, ?, ?)", ('16:20', '1010001', 'Meeting Reminder'))
conn.commit()
conn.close()
# 解析重复规则返回待执行时间None代表不在今天的待执行计划
def parse_repeat_rule(rule, task_time):
today = datetime.datetime.now()
if rule == '0000000': # 不重复
task_datetime = datetime.datetime.combine(today.date(), task_time)
if task_datetime > today:
return task_datetime
else:
return None
for i, day in enumerate(rule):
if day == '1' and today.weekday() == i:
task_datetime = datetime.datetime.combine(today.date(), task_time)
if task_datetime > today:
return task_datetime
return None
# 执行任务
def execute_task(task_time, id, content, uid):
username = member_db.new_instance().find_username_by_uid(uid=uid)
if not username:
username = "User"
interact = Interact("text", 1, {'user': username, 'msg': "执行任务->\n" + content, 'observation': ""})
util.printInfo(3, "系统", '执行任务:{}'.format(interact.data["msg"]), time.time())
text = fay_booter.feiFei.on_interact(interact)
if text is not None and id in scheduled_tasks:
del scheduled_tasks[id]
# 如果不重复,执行后删除记录
conn = sqlite3.connect('timer.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM timer WHERE repeat_rule = '0000000' AND id = ?", (id,))
conn.commit()
conn.close()
# 30秒扫描一次数据库当扫描到今天的不存在于定时任务列表的记录则添加到定时任务列表。执行完的记录从定时任务列表中清除。
def check_and_execute():
while agent_running:
conn = sqlite3.connect('timer.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM timer")
rows = cursor.fetchall()
for row in rows:
id, task_time_str, repeat_rule, content, uid = row
task_time = datetime.datetime.strptime(task_time_str, '%H:%M').time()
next_execution = parse_repeat_rule(repeat_rule, task_time)
if next_execution and id not in scheduled_tasks:
timer_thread = threading.Timer((next_execution - datetime.datetime.now()).total_seconds(), execute_task, [next_execution, id, content, uid])
timer_thread.start()
scheduled_tasks[id] = timer_thread
conn.close()
time.sleep(30) # 30秒扫描一次
# agent启动
def agent_start():
global agent_running
agent_running = True
#初始计划
if not os.path.exists("./timer.db"):
init_db()
content ="""执行任务->
你是一个数字人你的责任是陪伴主人生活工作
1在每天早上8点提醒主人起床;
2每天12:00及18:30提醒主人吃饭;
3每天21:00陪主人聊聊天;
4每天23:00提醒主人睡觉
"""
interact = Interact("text", 1, {'user': 'User', 'msg': content, 'observation': ""})
util.printInfo(3, "系统", '执行任务:{}'.format(interact.data["msg"]), time.time())
text = fay_booter.feiFei.on_interact(interact)
if text is None:
util.printInfo(3, "系统", '任务执行失败', time.time())
check_and_execute_thread = MyThread(target=check_and_execute)
check_and_execute_thread.start()
def agent_stop():
global agent_running
global scheduled_tasks
# 取消所有定时任务
for task in scheduled_tasks.values():
task.cancel()
agent_running = False
scheduled_tasks = {}
if __name__ == "__main__":
agent_start()