Add files via upload

allow user to load embedding & rerank models from cache
This commit is contained in:
zealot52099 2024-03-22 20:15:37 +08:00 committed by GitHub
parent 382d338ab3
commit 0aa58372bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -1,305 +1,329 @@
import json import json
import pickle import pickle
import faiss import faiss
import pickle import pickle
import os import os
from loguru import logger from loguru import logger
from sentence_transformers import SentenceTransformer from sentence_transformers import SentenceTransformer
from langchain_community.vectorstores import FAISS from langchain_community.vectorstores import FAISS
from config.config import embedding_path, doc_dir, qa_dir, knowledge_pkl_path, data_dir, base_dir, vector_db_dir from config.config import embedding_path, doc_dir, qa_dir, knowledge_pkl_path, data_dir, vector_db_dir, rerank_path
from langchain.embeddings import HuggingFaceBgeEmbeddings from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain_community.document_loaders import DirectoryLoader, TextLoader, JSONLoader from langchain_community.document_loaders import DirectoryLoader, TextLoader, JSONLoader
from langchain_text_splitters import CharacterTextSplitter, RecursiveCharacterTextSplitter, RecursiveJsonSplitter from langchain_text_splitters import CharacterTextSplitter, RecursiveCharacterTextSplitter, RecursiveJsonSplitter
from BCEmbedding import EmbeddingModel, RerankerModel from BCEmbedding import EmbeddingModel, RerankerModel
# from util.pipeline import EmoLLMRAG # from util.pipeline import EmoLLMRAG
from transformers import AutoTokenizer, AutoModelForCausalLM from transformers import AutoTokenizer, AutoModelForCausalLM
from langchain.document_loaders.pdf import PyPDFDirectoryLoader from langchain.document_loaders.pdf import PyPDFDirectoryLoader
from langchain.document_loaders import UnstructuredFileLoader,DirectoryLoader from langchain.document_loaders import UnstructuredFileLoader,DirectoryLoader
from langchain_community.llms import Cohere from langchain_community.llms import Cohere
from langchain.retrievers import ContextualCompressionRetriever from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import FlashrankRerank from langchain.retrievers.document_compressors import FlashrankRerank
from langchain_core.documents.base import Document from langchain_core.documents.base import Document
from FlagEmbedding import FlagReranker from FlagEmbedding import FlagReranker
class Data_process(): class Data_process():
def __init__(self): def __init__(self):
self.vector_db_dir = vector_db_dir self.chunk_size: int=1000
self.doc_dir = doc_dir self.chunk_overlap: int=100
self.qa_dir = qa_dir
self.knowledge_pkl_path = knowledge_pkl_path def load_embedding_model(self, model_name='BAAI/bge-small-zh-v1.5', device='cpu', normalize_embeddings=True):
self.chunk_size: int=1000 """
self.chunk_overlap: int=100 加载嵌入模型
def load_embedding_model(self, model_name="BAAI/bge-small-zh-v1.5", device='cpu', normalize_embeddings=True): 参数:
""" - model_name: 模型名称字符串类型默认为"BAAI/bge-small-zh-v1.5"
加载嵌入模型 - device: 指定模型加载的设备'cpu' 'cuda'默认为'cpu'
- normalize_embeddings: 是否标准化嵌入向量布尔类型默认为 True
参数: """
- model_name: 模型名称字符串类型默认为"BAAI/bge-small-zh-v1.5" if not os.path.exists(embedding_path):
- device: 指定模型加载的设备'cpu' 'cuda'默认为'cpu' os.makedirs(embedding_path, exist_ok=True)
- normalize_embeddings: 是否标准化嵌入向量布尔类型默认为 True embedding_model_path = os.path.join(embedding_path,model_name.split('/')[1] + '.pkl')
""" logger.info('Loading embedding model...')
logger.info('Loading embedding model...') if os.path.exists(embedding_model_path):
try: try:
embeddings = HuggingFaceBgeEmbeddings( with open(embedding_model_path , 'rb') as f:
model_name=model_name, embeddings = pickle.load(f)
model_kwargs={'device': device}, logger.info('Embedding model loaded.')
encode_kwargs={'normalize_embeddings': normalize_embeddings} return embeddings
) except Exception as e:
except Exception as e: logger.error(f'Failed to load embedding model from {embedding_model_path}')
logger.error(f'Failed to load embedding model: {e}') try:
return None embeddings = HuggingFaceBgeEmbeddings(
model_name=model_name,
logger.info('Embedding model loaded.') model_kwargs={'device': device},
return embeddings encode_kwargs={'normalize_embeddings': normalize_embeddings})
logger.info('Embedding model loaded.')
def load_rerank_model(self, model_name='BAAI/bge-reranker-large'): with open(embedding_model_path, 'wb') as file:
""" pickle.dump(embeddings, file)
加载重排名模型 except Exception as e:
logger.error(f'Failed to load embedding model: {e}')
参数: return None
- model_name (str): 模型的名称默认为 'BAAI/bge-reranker-large' return embeddings
返回: def load_rerank_model(self, model_name='BAAI/bge-reranker-large'):
- FlagReranker 实例 """
加载重排名模型
异常:
- ValueError: 如果模型名称不在批准的模型列表中 参数:
- Exception: 如果模型加载过程中发生任何其他错误 - model_name (str): 模型的名称默认为 'BAAI/bge-reranker-large'
"""
try: 返回:
reranker_model = FlagReranker(model_name, use_fp16=True) - FlagReranker 实例
except Exception as e:
logger.error(f'Failed to load rerank model: {e}') 异常:
raise - ValueError: 如果模型名称不在批准的模型列表中
- Exception: 如果模型加载过程中发生任何其他错误
return reranker_model
"""
if not os.path.exists(rerank_path):
def extract_text_from_json(self, obj, content=None): os.makedirs(rerank_path, exist_ok=True)
""" rerank_model_path = os.path.join(rerank_path, model_name.split('/')[1] + '.pkl')
抽取json中的文本用于向量库构建 logger.info('Loading rerank model...')
if os.path.exists(rerank_model_path):
参数: try:
- obj: dict,list,str with open(rerank_model_path , 'rb') as f:
- content: str reranker_model = pickle.load(f)
logger.info('Rerank model loaded.')
返回: return reranker_model
- content: str except Exception as e:
""" logger.error(f'Failed to load embedding model from {rerank_model_path}')
if isinstance(obj, dict): try:
for key, value in obj.items(): reranker_model = FlagReranker(model_name, use_fp16=True)
try: logger.info('Rerank model loaded.')
content = self.extract_text_from_json(value, content) with open(rerank_model_path, 'wb') as file:
except Exception as e: pickle.dump(reranker_model, file)
print(f"Error processing value: {e}") except Exception as e:
elif isinstance(obj, list): logger.error(f'Failed to load rerank model: {e}')
for index, item in enumerate(obj): raise
try:
content = self.extract_text_from_json(item, content) return reranker_model
except Exception as e:
print(f"Error processing item: {e}")
elif isinstance(obj, str): def extract_text_from_json(self, obj, content=None):
content += obj """
return content 抽取json中的文本用于向量库构建
参数:
def split_document(self, data_path, chunk_size=500, chunk_overlap=100): - obj: dict,list,str
""" - content: str
切分data_path文件夹下的所有txt文件
返回:
参数: - content: str
- data_path: str """
- chunk_size: int if isinstance(obj, dict):
- chunk_overlap: int for key, value in obj.items():
try:
返回 content = self.extract_text_from_json(value, content)
- split_docs: list except Exception as e:
""" print(f"Error processing value: {e}")
elif isinstance(obj, list):
for index, item in enumerate(obj):
# text_spliter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) try:
text_spliter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) content = self.extract_text_from_json(item, content)
split_docs = [] except Exception as e:
logger.info(f'Loading txt files from {data_path}') print(f"Error processing item: {e}")
if os.path.isdir(data_path): elif isinstance(obj, str):
loader = DirectoryLoader(data_path, glob="**/*.txt",show_progress=True) content += obj
docs = loader.load() return content
split_docs = text_spliter.split_documents(docs)
elif data_path.endswith('.txt'):
file_path = data_path def split_document(self, data_path, chunk_size=500, chunk_overlap=100):
logger.info(f'splitting file {file_path}') """
text_loader = TextLoader(file_path, encoding='utf-8') 切分data_path文件夹下的所有txt文件
text = text_loader.load()
splits = text_spliter.split_documents(text) 参数:
split_docs = splits - data_path: str
logger.info(f'split_docs size {len(split_docs)}') - chunk_size: int
return split_docs - chunk_overlap: int
返回
def split_conversation(self, path): - split_docs: list
""" """
按conversation块切分path文件夹下的所有json文件
##TODO 限制序列长度
""" # text_spliter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
# json_spliter = RecursiveJsonSplitter(max_chunk_size=500) text_spliter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
logger.info(f'Loading json files from {path}') split_docs = []
split_qa = [] logger.info(f'Loading txt files from {data_path}')
if os.path.isdir(path): if os.path.isdir(data_path):
# loader = DirectoryLoader(path, glob="**/*.json",show_progress=True) loader = DirectoryLoader(data_path, glob="**/*.txt",show_progress=True)
# jsons = loader.load() docs = loader.load()
split_docs = text_spliter.split_documents(docs)
for root, dirs, files in os.walk(path): elif data_path.endswith('.txt'):
for file in files: file_path = data_path
if file.endswith('.json'): logger.info(f'splitting file {file_path}')
file_path = os.path.join(root, file) text_loader = TextLoader(file_path, encoding='utf-8')
logger.info(f'splitting file {file_path}') text = text_loader.load()
with open(file_path, 'r', encoding='utf-8') as f: splits = text_spliter.split_documents(text)
data = json.load(f) split_docs = splits
# print(data) logger.info(f'split_docs size {len(split_docs)}')
for conversation in data: return split_docs
# for dialog in conversation['conversation']:
##按qa对切分,将每一轮qa转换为langchain_core.documents.base.Document
# content = self.extract_text_from_json(dialog,'') def split_conversation(self, path):
# split_qa.append(Document(page_content = content)) """
#按conversation块切分 按conversation块切分path文件夹下的所有json文件
content = self.extract_text_from_json(conversation['conversation'], '') ##TODO 限制序列长度
logger.info(f'content====={content}') """
split_qa.append(Document(page_content = content)) # json_spliter = RecursiveJsonSplitter(max_chunk_size=500)
# logger.info(f'split_qa size====={len(split_qa)}') logger.info(f'Loading json files from {path}')
return split_qa split_qa = []
if os.path.isdir(path):
# loader = DirectoryLoader(path, glob="**/*.json",show_progress=True)
def load_knowledge(self, knowledge_pkl_path): # jsons = loader.load()
'''
读取或创建知识.pkl for root, dirs, files in os.walk(path):
''' for file in files:
if not os.path.exists(knowledge_pkl_path): if file.endswith('.json'):
split_doc = self.split_document(doc_dir) file_path = os.path.join(root, file)
split_qa = self.split_conversation(qa_dir) logger.info(f'splitting file {file_path}')
knowledge_chunks = split_doc + split_qa with open(file_path, 'r', encoding='utf-8') as f:
with open(knowledge_pkl_path, 'wb') as file: data = json.load(f)
pickle.dump(knowledge_chunks, file) # print(data)
else: for conversation in data:
with open(knowledge_pkl_path , 'rb') as f: # for dialog in conversation['conversation']:
knowledge_chunks = pickle.load(f) ##按qa对切分,将每一轮qa转换为langchain_core.documents.base.Document
return knowledge_chunks # content = self.extract_text_from_json(dialog,'')
# split_qa.append(Document(page_content = content))
#按conversation块切分
def create_vector_db(self, emb_model): content = self.extract_text_from_json(conversation['conversation'], '')
''' logger.info(f'content====={content}')
创建并保存向量库 split_qa.append(Document(page_content = content))
''' # logger.info(f'split_qa size====={len(split_qa)}')
logger.info(f'Creating index...') return split_qa
split_doc = self.split_document(self.doc_dir)
split_qa = self.split_conversation(self.qa_dir)
# logger.info(f'split_doc == {len(split_doc)}') def load_knowledge(self, knowledge_pkl_path):
# logger.info(f'split_qa == {len(split_qa)}') '''
# logger.info(f'split_doc type == {type(split_doc[0])}') 读取或创建知识.pkl
# logger.info(f'split_qa type== {type(split_qa[0])}') '''
db = FAISS.from_documents(split_doc + split_qa, emb_model) if not os.path.exists(knowledge_pkl_path):
db.save_local(vector_db_dir) split_doc = self.split_document(doc_dir)
return db split_qa = self.split_conversation(qa_dir)
knowledge_chunks = split_doc + split_qa
with open(knowledge_pkl_path, 'wb') as file:
def load_vector_db(self, knowledge_pkl_path=knowledge_pkl_path, doc_dir=doc_dir, qa_dir=qa_dir): pickle.dump(knowledge_chunks, file)
''' else:
读取向量库 with open(knowledge_pkl_path , 'rb') as f:
''' knowledge_chunks = pickle.load(f)
# current_os = platform.system() return knowledge_chunks
emb_model = self.load_embedding_model()
if not os.path.exists(vector_db_dir) or not os.listdir(vector_db_dir):
db = self.create_vector_db(emb_model) def create_vector_db(self, emb_model):
else: '''
db = FAISS.load_local(vector_db_dir, emb_model, allow_dangerous_deserialization=True) 创建并保存向量库
return db '''
logger.info(f'Creating index...')
split_doc = self.split_document(doc_dir)
def retrieve(self, query, vector_db, k=5): split_qa = self.split_conversation(qa_dir)
''' # logger.info(f'split_doc == {len(split_doc)}')
基于query对向量库进行检索 # logger.info(f'split_qa == {len(split_qa)}')
''' # logger.info(f'split_doc type == {type(split_doc[0])}')
retriever = vector_db.as_retriever(search_kwargs={"k": k}) # logger.info(f'split_qa type== {type(split_qa[0])}')
docs = retriever.invoke(query) db = FAISS.from_documents(split_doc + split_qa, emb_model)
return docs, retriever db.save_local(vector_db_dir)
return db
##FlashrankRerank效果一般
# def rerank(self, query, retriever):
# compressor = FlashrankRerank() def load_vector_db(self, knowledge_pkl_path=knowledge_pkl_path, doc_dir=doc_dir, qa_dir=qa_dir):
# compression_retriever = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=retriever) '''
# compressed_docs = compression_retriever.get_relevant_documents(query) 读取向量库
# return compressed_docs '''
# current_os = platform.system()
def rerank(self, query, docs): emb_model = self.load_embedding_model()
reranker = self.load_rerank_model() if not os.path.exists(vector_db_dir) or not os.listdir(vector_db_dir):
passages = [] db = self.create_vector_db(emb_model)
for doc in docs: else:
passages.append(str(doc.page_content)) db = FAISS.load_local(vector_db_dir, emb_model, allow_dangerous_deserialization=True)
scores = reranker.compute_score([[query, passage] for passage in passages]) return db
sorted_pairs = sorted(zip(passages, scores), key=lambda x: x[1], reverse=True)
sorted_passages, sorted_scores = zip(*sorted_pairs)
return sorted_passages, sorted_scores def retrieve(self, query, vector_db, k=5):
'''
基于query对向量库进行检索
# def create_prompt(question, context): '''
# from langchain.prompts import PromptTemplate retriever = vector_db.as_retriever(search_kwargs={"k": k})
# prompt_template = f"""请基于以下内容回答问题: docs = retriever.invoke(query)
return docs, retriever
# {context}
##FlashrankRerank效果一般
# 问题: {question} # def rerank(self, query, retriever):
# 回答:""" # compressor = FlashrankRerank()
# prompt = PromptTemplate( # compression_retriever = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=retriever)
# template=prompt_template, input_variables=["context", "question"] # compressed_docs = compression_retriever.get_relevant_documents(query)
# ) # return compressed_docs
# logger.info(f'Prompt: {prompt}')
# return prompt def rerank(self, query, docs):
reranker = self.load_rerank_model()
def create_prompt(question, context): passages = []
prompt = f"""请基于以下内容: {context} 给出问题答案。问题如下: {question}。回答:""" for doc in docs:
logger.info(f'Prompt: {prompt}') passages.append(str(doc.page_content))
return prompt scores = reranker.compute_score([[query, passage] for passage in passages])
sorted_pairs = sorted(zip(passages, scores), key=lambda x: x[1], reverse=True)
def test_zhipu(prompt): sorted_passages, sorted_scores = zip(*sorted_pairs)
from zhipuai import ZhipuAI return sorted_passages, sorted_scores
api_key = "" # 填写您自己的APIKey
if api_key == "":
raise ValueError("请填写api_key") # def create_prompt(question, context):
client = ZhipuAI(api_key=api_key) # from langchain.prompts import PromptTemplate
response = client.chat.completions.create( # prompt_template = f"""请基于以下内容回答问题:
model="glm-4", # 填写需要调用的模型名称
messages=[ # {context}
{"role": "user", "content": prompt[:100]}
], # 问题: {question}
) # 回答:"""
print(response.choices[0].message) # prompt = PromptTemplate(
# template=prompt_template, input_variables=["context", "question"]
if __name__ == "__main__": # )
logger.info(data_dir) # logger.info(f'Prompt: {prompt}')
if not os.path.exists(data_dir): # return prompt
os.mkdir(data_dir)
dp = Data_process() def create_prompt(question, context):
# faiss_index, knowledge_chunks = dp.load_index_and_knowledge(knowledge_pkl_path='') prompt = f"""请基于以下内容: {context} 给出问题答案。问题如下: {question}。回答:"""
vector_db = dp.load_vector_db() logger.info(f'Prompt: {prompt}')
# 按照query进行查询 return prompt
# query = "儿童心理学说明-内容提要-目录 《儿童心理学》1993年修订版说明 《儿童心理学》是1961年初全国高等学校文科教材会议指定朱智贤教授编 写的。1962年初版1979年再版。"
# query = "我现在处于高三阶段,感到非常迷茫和害怕。我觉得自己从出生以来就是多余的,没有必要存在于这个世界。无论是在家庭、学校、朋友还是老师面前,我都感到被否定。我非常难过,对高考充满期望但成绩却不理想,我现在感到非常孤独、累和迷茫。您能给我提供一些建议吗?" def test_zhipu(prompt):
# query = "这在一定程度上限制了其思维能力,特别是辩证 逻辑思维能力的发展。随着年龄的增长,初中三年级学生逐步克服了依赖性" from zhipuai import ZhipuAI
# query = "我现在处于高三阶段,感到非常迷茫和害怕。我觉得自己从出生以来就是多余的,没有必要存在于这个世界。无论是在家庭、学校、朋友还是老师面前,我都感到被否定。我非常难过,对高考充满期望但成绩却不理想" api_key = "" # 填写您自己的APIKey
query = "我现在心情非常差,有什么解决办法吗?" if api_key == "":
docs, retriever = dp.retrieve(query, vector_db, k=10) raise ValueError("请填写api_key")
logger.info(f'Query: {query}') client = ZhipuAI(api_key=api_key)
logger.info("Retrieve results:") response = client.chat.completions.create(
for i, doc in enumerate(docs): model="glm-4", # 填写需要调用的模型名称
logger.info(str(i) + '\n') messages=[
logger.info(doc) {"role": "user", "content": prompt[:100]}
# print(f'get num of docs:{len(docs)}') ],
# print(docs) )
passages,scores = dp.rerank(query, docs) print(response.choices[0].message)
logger.info("After reranking...")
for i in range(len(scores)): if __name__ == "__main__":
logger.info(str(scores[i]) + '\n') logger.info(data_dir)
logger.info(passages[i]) if not os.path.exists(data_dir):
prompt = create_prompt(query, passages[0]) os.mkdir(data_dir)
dp = Data_process()
# faiss_index, knowledge_chunks = dp.load_index_and_knowledge(knowledge_pkl_path='')
vector_db = dp.load_vector_db()
# 按照query进行查询
# query = "儿童心理学说明-内容提要-目录 《儿童心理学》1993年修订版说明 《儿童心理学》是1961年初全国高等学校文科教材会议指定朱智贤教授编 写的。1962年初版1979年再版。"
# query = "我现在处于高三阶段,感到非常迷茫和害怕。我觉得自己从出生以来就是多余的,没有必要存在于这个世界。无论是在家庭、学校、朋友还是老师面前,我都感到被否定。我非常难过,对高考充满期望但成绩却不理想,我现在感到非常孤独、累和迷茫。您能给我提供一些建议吗?"
# query = "这在一定程度上限制了其思维能力,特别是辩证 逻辑思维能力的发展。随着年龄的增长,初中三年级学生逐步克服了依赖性"
# query = "我现在处于高三阶段,感到非常迷茫和害怕。我觉得自己从出生以来就是多余的,没有必要存在于这个世界。无论是在家庭、学校、朋友还是老师面前,我都感到被否定。我非常难过,对高考充满期望但成绩却不理想"
# query = "我现在心情非常差,有什么解决办法吗?"
query = "我最近总感觉胸口很闷,但医生检查过说身体没问题。可我就是觉得喘不过气来,尤其是看到那些旧照片,想起过去的日子"
docs, retriever = dp.retrieve(query, vector_db, k=10)
logger.info(f'Query: {query}')
logger.info("Retrieve results:")
for i, doc in enumerate(docs):
logger.info(str(i) + '\n')
logger.info(doc)
# print(f'get num of docs:{len(docs)}')
# print(docs)
passages,scores = dp.rerank(query, docs)
logger.info("After reranking...")
for i in range(len(scores)):
logger.info(str(scores[i]) + '\n')
logger.info(passages[i])
prompt = create_prompt(query, passages[0])
test_zhipu(prompt) ## 如果显示'Server disconnected without sending a response.'可能是由于上下文窗口限制 test_zhipu(prompt) ## 如果显示'Server disconnected without sending a response.'可能是由于上下文窗口限制