# olivebot/core/viewer.py
from abc import abstractmethod
import json
import random
import time
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support.expected_conditions import presence_of_element_located
from scheduler.thread_manager import MyThread
from utils import config_util, util
# Base URL of a Douyin user profile page; the room owner's sec_uid is appended to it.
USER_URL = 'https://www.douyin.com/user/'
class Viewer:
    """Watch a Douyin live room through headless Chrome and report viewer activity.

    Two Chrome drivers are used: ``live_driver`` stays on the live-room page and
    ``user_driver`` repeatedly loads the room owner's profile page to read the
    follower count. Background threads poll the pages and call the two hooks:

    * ``on_interact(interact, event_time)`` with one of these tuples:
        - ``(1, user, text)``                      chat message
        - ``(2, user, '来了')``                     a viewer joined
        - ``(3, user, text, gift, amount)``        gift, ``gift`` is a GIFT_TYPES entry
        - ``(4, 'None', '粉丝关注')``               follower count went up
    * ``on_change_state(is_live_started)`` when the live state flips.

    Both hooks are marked ``@abstractmethod``; the class has no ABC metaclass,
    so overriding them is a convention rather than something Python enforces.
    """

    # Character between the user name and the message in a chat row (full-width colon).
    # NOTE(review): inferred from the one-character offset used when splitting rows;
    # confirm against the text the live page actually renders.
    _CHAT_SEPARATOR = '：'
    # Seconds a polling thread idles per pass while there is nothing to scrape.
    _IDLE_INTERVAL = 0.05

    def __init__(self, url):
        """Store the live-room URL and reset all scraping state; no browser is started.

        Args:
            url: address of the live-room page to watch.
        """
        self.url = url
        # Image-URL fragment -> (number, gift name, number).
        # NOTE(review): the meaning of the first and third numbers is not visible
        # in this file (presumably an id and a value tier) - confirm with consumers.
        self.GIFT_TYPES = {
            '0ea40b8376ef8157791b928a339ed9c9': (1, '小星星', 1),
            'a29d6cdc0abb7286fdd403915196eaa7': (2, '玫瑰', 1),
            '802a21ae29f9fae5abe3693de9f874bd': (3, '抖音', 1),
            'a24b3cc863742fd4bc3de0f53dac4487': (4, '大啤酒', 2),
            '4960c39f645d524beda5d50dc372510e': (5, '你最好看', 2),
            'e9b7db267d0501b8963d8000c091e123': (6, '人气票', 1),
            '698373dfdac86a90b54facdc38698cbc': (7, '粉丝团灯牌', 1)
        }
        self.__running = True
        self.live_driver = None
        self.user_driver = None
        self.user_sec_uid = None
        self.last_join_data = ''
        # IDs of chat rows that were already looked at (bounded in __get_interact_data).
        self.last_interact_datas = []
        self.live_started = False
        # Index of the last chat row consumed while scanning oldest-first.
        self.last_chat_item_index = 0

    @staticmethod
    def __quit_quietly(driver):
        """Quit ``driver`` if there is one, ignoring any error it raises."""
        if driver is None:
            return
        try:
            driver.quit()
        except Exception:
            # Best effort: the browser may already be gone.
            pass

    @staticmethod
    def __driver_is_dead(driver):
        """Return True when a trivial script can no longer be executed on ``driver``."""
        try:
            driver.execute_script('javascript:void(0);')
        except Exception:
            return True
        return False

    def __start(self):
        """Create both drivers, wait for the stream to be live, then start the pollers."""
        # The watchdog only touches drivers that are not None, so it is safe to
        # start it before the drivers and chrome_options exist.
        MyThread(target=self.__driver_alive_runnable).start()
        self.chrome_options = Options()
        self.chrome_options.add_argument('--headless')
        self.chrome_options.add_argument('--blink-settings=imagesEnabled=false')
        self.live_driver = webdriver.Chrome(config_util.system_chrome_driver, options=self.chrome_options)
        self.live_driver.get(self.url)
        self.user_driver = webdriver.Chrome(config_util.system_chrome_driver, options=self.chrome_options)
        self.__wait_live_start()
        render_data = self.__get_render_data(self.live_driver)
        self.user_sec_uid = render_data['app']['initialState']['roomStore']['roomInfo']['room']['owner']['sec_uid']
        MyThread(target=self.__live_state_runnable).start()
        MyThread(target=self.__join_runnable).start()
        MyThread(target=self.__interact_runnable).start()
        MyThread(target=self.__follower_runnable).start()

    def start(self):
        """Start the viewer on a background thread and return immediately."""
        MyThread(target=self.__start).start()

    def is_live_started(self):
        """Return the live state last recorded by the live-state poller."""
        return self.live_started

    def __wait_live_start(self):
        """Block until the page reports a running stream or the viewer is stopped."""
        if self.__is_live():
            return
        util.log(1, '等待直播开始...')
        time.sleep(30)
        while self.__running and not self.__is_live():
            try:
                # Reload so the page picks up a stream that has started meanwhile.
                self.live_driver.get(self.url)
            except Exception:
                # Best effort: the watchdog thread replaces a dead driver.
                pass
            time.sleep(30)

    def __is_live(self):
        """Return True when the status element exists and its text lacks '结束'.

        Any failure (driver missing, element not found) is printed and treated
        as "not live".
        """
        try:
            xpath = '//*[@id="_douyin_live_scroll_container_"]/div/div[2]/div/div[2]/div/div[2]/div'
            element = self.live_driver.find_element(By.XPATH, xpath)
            return '结束' not in element.text
        except Exception as e:
            print(e)
            return False

    def __driver_alive_runnable(self):
        """Watchdog loop: replace a driver that no longer answers a trivial script."""
        while self.__running:
            time.sleep(0.1)
            try:
                if self.live_driver is not None and self.__driver_is_dead(self.live_driver) and self.__running:
                    # Release the old chromedriver before replacing it.
                    self.__quit_quietly(self.live_driver)
                    self.live_driver = webdriver.Chrome(config_util.system_chrome_driver, options=self.chrome_options)
                    self.live_driver.get(self.url)
                if self.user_driver is not None and self.__driver_is_dead(self.user_driver) and self.__running:
                    self.__quit_quietly(self.user_driver)
                    self.user_driver = webdriver.Chrome(config_util.system_chrome_driver, options=self.chrome_options)
            except Exception:
                # Creating a replacement failed; try again on the next pass.
                pass

    def __live_state_runnable(self):
        """Poll the live state every 30 seconds and report changes via on_change_state."""
        while self.__running:
            is_live = self.__is_live()
            if is_live != self.live_started:
                # Store the value that is reported, so the flag and the callback agree.
                self.live_started = is_live
                self.on_change_state(is_live)
                if not is_live:
                    util.log(1, '直播直播已结束,等待下场直播开始...')
            if not is_live:
                try:
                    # Reload while offline so the next stream is noticed.
                    self.live_driver.get(self.url)
                except Exception:
                    pass
            time.sleep(30)

    def __get_render_data(self, driver):
        """Return the decoded JSON held in the page's ``RENDER_DATA`` element.

        Waits up to 10 seconds for the element; the element text is URL-unquoted
        before being parsed as JSON.
        """
        wait = WebDriverWait(driver, 10)
        first_result = wait.until(presence_of_element_located((By.ID, "RENDER_DATA")))
        return json.loads(requests.utils.unquote(first_result.get_attribute("textContent")))

    def __get_interact_type(self, text):
        """Classify a chat row: 3 for a gift row ('送出了 <gift> <count>'), otherwise 1."""
        ary = text.split(self._CHAT_SEPARATOR)
        if len(ary) >= 2:
            content_ary = ary[1].split(' ')
            if len(content_ary) == 3 and content_ary[0] == '送出了':
                return 3
        return 1

    def __get_gift_type(self, url):
        """Return the GIFT_TYPES entry whose key occurs in ``url``.

        Falls back to ``(-1, '其他礼物', 0)`` for unknown gifts or a missing URL.
        """
        if url:
            for gift_id, gift in self.GIFT_TYPES.items():
                if gift_id in url:
                    return gift
        return -1, '其他礼物', 0

    def __get_join_data(self):
        """Return ``(2, user, '来了')`` for a new join notice, else None.

        Only the last line of the join banner is considered, and only when it
        differs from the previously reported one. The user name is the line
        without its last three characters.
        """
        try:
            xpath = '//*[@id="_douyin_live_scroll_container_"]/div/div[2]/div/div[2]/div/div[1]/div/div/div/div[1]/div/div[2]'
            element = self.live_driver.find_element(By.XPATH, xpath)
            text = element.text.split('\n')[-1]
            if len(text) > 0 and self.last_join_data != text:
                self.last_join_data = text
                user = text[0:len(text) - 3]
                return 2, user, '来了'
        except Exception:
            return None
        return None

    def __get_interact_data(self):
        """Collect chat and gift rows that were not reported before.

        Rows 1..100 of the chat list are inspected. Until row 100 has been seen,
        rows are read oldest-first, continuing after ``last_chat_item_index``.
        After that, rows are read newest-first from row 100 and the scan stops at
        the first row whose ID is already known.

        Returns:
            A list of ``(1, user, text)`` and
            ``(3, user, text, gift, amount)`` tuples, oldest first. On an
            unexpected error the entries gathered so far are returned.
        """
        chatroom_xpath = '//*[@id="_douyin_live_scroll_container_"]/div/div[2]/div/div[2]/div/div[1]/div/div/div/div[1]/div/div[1]'
        interact_data = []
        ascending = self.last_chat_item_index < 100
        try:
            chatroom_element = self.live_driver.find_element(By.XPATH, chatroom_xpath)
            if ascending:
                index_range = range(max(self.last_chat_item_index + 1, 1), 101)
            else:
                index_range = range(100, 0, -1)
            for index in index_range:
                item_xpath = chatroom_xpath + '/div[' + str(index) + ']'
                try:
                    chatroom_item = chatroom_element.find_element(By.XPATH, item_xpath)
                except Exception:
                    chatroom_item = None
                if chatroom_item is None:
                    if ascending:
                        # Reached the end of the rows present so far.
                        self.last_chat_item_index = index - 1
                        break
                    continue
                if ascending and index >= 100:
                    # Row 100 exists: later calls scan newest-first.
                    self.last_chat_item_index = index
                item_id = chatroom_item.id
                if item_id in self.last_interact_datas:
                    if ascending:
                        # Seen during an earlier, aborted pass: skip it but keep going.
                        continue
                    # Newest-first: everything older was handled already.
                    break
                # Remember the row in both scan modes so that switching to the
                # newest-first scan does not report old rows again.
                if len(self.last_interact_datas) > 200:
                    self.last_interact_datas.pop(0)
                self.last_interact_datas.append(item_id)
                ary = chatroom_item.text.replace('\r', '').split('\n')
                text = ary[-1]
                if len(text) < 1 and len(ary) > 1:
                    text = ary[-2]
                speak = self.__get_speak(text)
                if speak is None:
                    continue
                if self.__get_interact_type(text) == 3:
                    try:
                        item_msg = chatroom_element.find_element(
                            By.XPATH, item_xpath + '/div/span[3]/span/span/img')
                    except Exception:
                        continue
                    gift = self.__get_gift_type(item_msg.get_attribute('src'))
                    try:
                        amount = int(speak[1].split(' ')[-1])  # number of gifts
                    except ValueError:
                        # Unparsable count: drop this row, keep scanning the rest.
                        continue
                    interact_data.append((3, speak[0], '送出了 {0} X {1}'.format(gift[1], amount), gift, amount))
                else:
                    interact_data.append((1, speak[0], speak[1]))
        except Exception:
            # Deliberately silent: this runs in a tight polling loop and the page
            # is frequently mid-update; whatever was gathered is still returned.
            pass
        if not ascending:
            # Newest-first scan: flip to chronological order.
            interact_data.reverse()
        return interact_data

    def __get_speak(self, text):
        """Split a chat row into ``(user, message)`` at the first separator.

        Returns None when there is no separator or either part is empty.
        """
        user, sep, speak = text.partition(self._CHAT_SEPARATOR)
        if not sep:
            return None
        if user and speak:
            return user, speak
        return None

    def __join_runnable(self):
        """Poll for join notices while live and forward them to on_interact."""
        while self.__running:
            if self.live_started:
                join_data = self.__get_join_data()
                if join_data is not None:
                    self.on_interact(join_data, time.time())
            # Sleep on every pass, also while offline, so the loop never spins.
            time.sleep(self._IDLE_INTERVAL)

    def __interact_runnable(self):
        """Poll for chat/gift rows while live; each one is dispatched on its own thread."""
        while self.__running:
            if not self.live_started:
                # Avoid a busy loop while the stream is offline.
                time.sleep(self._IDLE_INTERVAL)
                continue
            for interact in self.__get_interact_data():
                MyThread(target=self.on_interact, args=[interact, time.time()]).start()

    def __follower_runnable(self):
        """Poll the owner's profile page and report when the follower count rises."""
        followers = -1
        while self.__running:
            try:
                # Randomised delay between profile page loads.
                time.sleep(1.0 + random.random())
                self.user_driver.get(USER_URL + self.user_sec_uid)
                time.sleep(0.2)
                render_data = self.__get_render_data(self.user_driver)
                fs = -1
                # The follower count sits under a numeric string key; probe 100..0.
                for i in range(100, -1, -1):
                    key = str(i)
                    if key in render_data and 'user' in render_data[key] \
                            and 'user' in render_data[key]['user'] \
                            and 'followerCount' in render_data[key]['user']['user']:
                        fs = int(render_data[key]['user']['user']['followerCount'])
                        break
                if fs >= 0:
                    if self.live_started and 0 < followers < fs:
                        self.on_interact((4, 'None', '粉丝关注'), time.time())
                    followers = fs
                else:
                    util.log(1, '粉丝数获取异常')
            except Exception as e:
                util.log(1, e)
                util.log(1, '粉丝数获取异常')

    def stop(self):
        """Stop all polling loops and close both browsers."""
        self.__running = False
        # Quit each driver independently so one failure cannot leak the other.
        self.__quit_quietly(self.live_driver)
        self.__quit_quietly(self.user_driver)

    @abstractmethod
    def on_interact(self, interact, event_time):
        """Hook called for every detected interaction.

        Args:
            interact: interaction tuple; its first element is the type (1-4).
            event_time: ``time.time()`` value taken when the interaction was read.
        """
        pass

    @abstractmethod
    def on_change_state(self, is_live_started):
        """Hook called when the live state changes; receives the new state."""
        pass