olivebot/python_connector_demo/thread_manager.py
xszyou 55fb0896b8 2023.01
Fay2.0:
1、控制器pc内网穿透,音频输入输出设备远程直连;
2、提供android 音频输入输出工程示例代码;
3、提供python音频输入输出工程示例代码(远程PC、树莓派等可用);
4、补传1.0语音指令音乐播放模块(暂不支持远程播放);
5、重构及补充若干工具模块:websocket、多线程、缓冲器、音频流录制器等;
6、修复1.x版本的多个bug。
2023-01-31 12:40:36 +08:00

44 lines
1.2 KiB
Python

import ctypes
import threading
from threading import Thread
class MyThread(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon)
add_thread(self)
def get_id(self):
# returns id of the respective thread
if hasattr(self, '_thread_id'):
return self._thread_id
for id, thread in threading._active.items():
if thread is self:
return id
def raise_exception(self):
thread_id = self.get_id()
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit))
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
print('Exception raise failure')
__thread_list = []
def add_thread(thread: MyThread):
if thread not in __thread_list:
__thread_list.append(thread)
def remove_thread(thread: MyThread):
if thread in __thread_list:
__thread_list.remove(thread)
def stopAll():
for thread in __thread_list:
thread.raise_exception()
thread.join()