diff --git a/datasets/README.md b/datasets/README.md
index fc02200..cf04b96 100644
--- a/datasets/README.md
+++ b/datasets/README.md
@@ -40,3 +40,21 @@
 * 数据集 aiwei 来自本项目
 * 数据集 tiangou 来自本项目
 * 数据集 SoulStar 来源 [SoulStar](https://github.com/Nobody-ML/SoulStar)
+
+## 数据集去重
+结合绝对匹配与模糊匹配(Simhash)算法对数据集进行去重,以提升微调模型的效果;同时在保证数据集高质量的前提下,通过调整相似度阈值,降低因误匹配而丢失重要数据的风险。
+
+### Simhash 算法
+Simhash(相似性哈希)是一种用于在大规模数据中检测相似或重复条目的算法。它将文本映射为一组数值指纹,相似的文本会得到高度相似的指纹,因此特别适合大规模文本数据的去重。
+
+实现步骤:
+1. 文本预处理:将文本转换为适合 Simhash 处理的格式,通常包括分词、去除停用词、词干提取等。
+
+2. 生成 Simhash 指纹:对预处理后的文本应用 Simhash 算法,为每条记录生成一个数值指纹。
+
+3. 比较指纹:通过计算指纹之间的相似度(通常以汉明距离衡量)来识别重复或相似的记录;即使文本存在少量差异,Simhash 生成的指纹仍然高度相似。
+
+4. 确定阈值:设置一个相似度阈值,只有当两个指纹的相似度超过该阈值时,才认为它们代表相似或重复的记录。
+
+5. 处理相似记录:对被标记为相似的记录进行人工审查或自动合并,以消除重复。
+
diff --git a/datasets/README_EN.md b/datasets/README_EN.md
index 19f9bf2..835de61 100644
--- a/datasets/README_EN.md
+++ b/datasets/README_EN.md
@@ -41,3 +41,8 @@
 * dataset `aiwei` from this repo
 * dataset `tiangou` from this repo
 * dataset `SoulStar` from [SoulStar](https://github.com/Nobody-ML/SoulStar)
+
+**Dataset Deduplication**:
+Exact matching is combined with fuzzy matching (Simhash) to deduplicate the dataset and improve the fine-tuned model. While keeping the dataset high quality, the similarity threshold can be tuned to reduce the risk of dropping important data because of false matches.
+
+Reference: https://algonotes.readthedocs.io/en/latest/Simhash.html
\ No newline at end of file
diff --git a/rag/src/data_processing.py b/rag/src/data_processing.py
index 45ff3f0..334ce13 100644
--- a/rag/src/data_processing.py
+++ b/rag/src/data_processing.py
@@ -1,262 +1,270 @@
 import json
 import pickle
+import faiss
+import pickle
+import os
+
 from loguru import logger
 from sentence_transformers import SentenceTransformer
-
+from langchain_community.vectorstores import FAISS
 from config.config import embedding_path, doc_dir, qa_dir, knowledge_pkl_path, data_dir, base_dir, vector_db_dir
-import os
-import faiss
-import platform
+from langchain.embeddings import HuggingFaceBgeEmbeddings
 from langchain_community.document_loaders import DirectoryLoader, TextLoader, JSONLoader
-from langchain_text_splitters import CharacterTextSplitter, RecursiveCharacterTextSplitter
+from langchain_text_splitters import CharacterTextSplitter, RecursiveCharacterTextSplitter, RecursiveJsonSplitter
 from BCEmbedding import EmbeddingModel, RerankerModel
 from util.pipeline import EmoLLMRAG
-import pickle
 from transformers import AutoTokenizer, AutoModelForCausalLM
-import torch
-import streamlit as st
-from openxlab.model import download
-
-
-'''
-1)根据QA对/TXT 文本生成 embedding
-2)调用 langchain FAISS 接口构建 vector DB
-3)存储到 openxlab.dataset 中,方便后续调用
-4)提供 embedding 的接口函数,方便后续调用
-5)提供 rerank 的接口函数,方便后续调用
-'''
-
-"""
-加载向量模型
-"""
-def load_embedding_model():
-    logger.info('Loading embedding model...')
-    # model = EmbeddingModel(model_name_or_path="huggingface/bce-embedding-base_v1")
-    model = EmbeddingModel(model_name_or_path="maidalun1020/bce-embedding-base_v1")
-    logger.info('Embedding model loaded.')
-    return model
-
-def load_rerank_model():
-    logger.info('Loading rerank_model...')
-    model = RerankerModel(model_name_or_path="maidalun1020/bce-reranker-base_v1")
-    # model = RerankerModel(model_name_or_path="huggingface/bce-reranker-base_v1")
-    logger.info('Rerank model loaded.')
-    return model
-
-
-def split_document(data_path, chunk_size=1000, chunk_overlap=100):
-    # text_spliter = CharacterTextSplitter(chunk_size=chunk_size, 
chunk_overlap=chunk_overlap) - text_spliter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) - split_docs = [] - logger.info(f'Loading txt files from {data_path}') - if os.path.isdir(data_path): - # 如果是文件夹,则遍历读取 - for root, dirs, files in os.walk(data_path): - for file in files: - if file.endswith('.txt'): - file_path = os.path.join(root, file) - # logger.info(f'splitting file {file_path}') - text_loader = TextLoader(file_path, encoding='utf-8') - text = text_loader.load() - - splits = text_spliter.split_documents(text) - # logger.info(f"splits type {type(splits[0])}") - # logger.info(f'splits size {len(splits)}') - split_docs += splits - elif data_path.endswith('.txt'): - file_path = os.path.join(root, data_path) - # logger.info(f'splitting file {file_path}') - text_loader = TextLoader(file_path, encoding='utf-8') - text = text_loader.load() - splits = text_spliter.split_documents(text) - # logger.info(f"splits type {type(splits[0])}") - # logger.info(f'splits size {len(splits)}') - split_docs = splits - logger.info(f'split_docs size {len(split_docs)}') - return split_docs - - -##TODO 1、读取system prompt 2、限制序列长度 -def split_conversation(path): - ''' - data format: - [ - { - "conversation": [ - { - "input": Q1 - "output": A1 - }, - { - "input": Q2 - "output": A2 - }, - ] - }, - ] - ''' - qa_pairs = [] - logger.info(f'Loading json files from {path}') - if os.path.isfile(path): - with open(path, 'r', encoding='utf-8') as file: - data = json.load(file) - for conversation in data: - for dialog in conversation['conversation']: - # input_text = dialog['input'] - # output_text = dialog['output'] - # if len(input_text) > max_length or len(output_text) > max_length: - # continue - qa_pairs.append(dialog) - elif os.path.isdir(path): - # 如果是文件夹,则遍历读取 - for root, dirs, files in os.walk(path): - for file in files: - if file.endswith('.json'): - file_path = os.path.join(root, file) - logger.info(f'splitting file {file_path}') - with open(file_path, 'r', encoding='utf-8') as f: - data = json.load(f) - for conversation in data: - for dialog in conversation['conversation']: - qa_pairs.append(dialog) - return qa_pairs - +from langchain.document_loaders.pdf import PyPDFDirectoryLoader +from langchain.document_loaders import UnstructuredFileLoader,DirectoryLoader +from langchain_community.llms import Cohere +from langchain.retrievers import ContextualCompressionRetriever +from langchain.retrievers.document_compressors import FlashrankRerank +from langchain_core.documents.base import Document +from FlagEmbedding import FlagReranker +class Data_process(): + def __init__(self): + self.vector_db_dir = vector_db_dir + self.doc_dir = doc_dir + self.qa_dir = qa_dir + self.knowledge_pkl_path = knowledge_pkl_path + self.chunk_size: int=1000 + self.chunk_overlap: int=100 -# 加载本地索引 -def load_index_and_knowledge(): - current_os = platform.system() - split_doc = [] - split_qa = [] - #读取知识库 - if not os.path.exists(knowledge_pkl_path): - split_doc = split_document(doc_dir) - split_qa = split_conversation(qa_dir) - # logger.info(f'split_qa size:{len(split_qa)}') - # logger.info(f'type of split_qa:{type(split_qa[0])}') - # logger.info(f'split_doc size:{len(split_doc)}') - # logger.info(f'type of doc:{type(split_doc[0])}') - knowledge_chunks = split_doc + split_qa - with open(knowledge_pkl_path, 'wb') as file: - pickle.dump(knowledge_chunks, file) - else: - with open(knowledge_pkl_path , 'rb') as f: - knowledge_chunks = pickle.load(f) + def load_embedding_model(self, 
model_name="BAAI/bge-small-zh-v1.5", device='cpu', normalize_embeddings=True):
+        """
+        加载嵌入模型。
-    #读取vector DB 
-    if not os.path.exists(vector_db_dir): 
+        参数:
+        - model_name: 模型名称,字符串类型,默认为"BAAI/bge-small-zh-v1.5"。
+        - device: 指定模型加载的设备,'cpu' 或 'cuda',默认为'cpu'。
+        - normalize_embeddings: 是否标准化嵌入向量,布尔类型,默认为 True。
+        """
+        logger.info('Loading embedding model...')
+        try:
+            embeddings = HuggingFaceBgeEmbeddings(
+                model_name=model_name,
+                model_kwargs={'device': device},
+                encode_kwargs={'normalize_embeddings': normalize_embeddings}
+            )
+        except Exception as e:
+            logger.error(f'Failed to load embedding model: {e}')
+            return None
+
+        logger.info('Embedding model loaded.')
+        return embeddings
+
+    def load_rerank_model(self, model_name='BAAI/bge-reranker-large'):
+        """
+        加载重排名模型。
+
+        参数:
+        - model_name (str): 模型的名称。默认为 'BAAI/bge-reranker-large'。
+
+        返回:
+        - FlagReranker 实例。
+
+        异常:
+        - Exception: 如果模型加载失败,将记录错误日志并重新抛出。
+        (说明:use_fp16=True 可加速推理,精度损失可以忽略。)
+        """
+        try:
+            reranker_model = FlagReranker(model_name, use_fp16=True)
+        except Exception as e:
+            logger.error(f'Failed to load rerank model: {e}')
+            raise
+
+        return reranker_model
+
+
+    def extract_text_from_json(self, obj, content=''):
+        """
+        抽取json中的文本,用于向量库构建
+
+        参数:
+        - obj: dict,list,str
+        - content: str
+
+        返回:
+        - content: str
+        """
+        if isinstance(obj, dict):
+            for key, value in obj.items():
+                try:
+                    content = self.extract_text_from_json(value, content)
+                except Exception as e:
+                    logger.error(f"Error processing value: {e}")
+        elif isinstance(obj, list):
+            for index, item in enumerate(obj):
+                try:
+                    content = self.extract_text_from_json(item, content)
+                except Exception as e:
+                    logger.error(f"Error processing item: {e}")
+        elif isinstance(obj, str):
+            content += obj
+        return content
+
+
+    def split_document(self, data_path, chunk_size=500, chunk_overlap=100):
+        """
+        切分data_path文件夹下的所有txt文件
+
+        参数:
+        - data_path: str
+        - chunk_size: int
+        - chunk_overlap: int
+
+        返回:
+        - split_docs: list
+        """
+
+
+        # text_spliter = CharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+        text_spliter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
+        split_docs = []
+        logger.info(f'Loading txt files from {data_path}')
+        if os.path.isdir(data_path):
+            loader = DirectoryLoader(data_path, glob="**/*.txt",show_progress=True)
+            docs = loader.load()
+            split_docs = text_spliter.split_documents(docs)
+        elif data_path.endswith('.txt'):
+            file_path = data_path
+            logger.info(f'splitting file {file_path}')
+            text_loader = TextLoader(file_path, encoding='utf-8')
+            text = text_loader.load()
+            splits = text_spliter.split_documents(text)
+            split_docs = splits
+        logger.info(f'split_docs size {len(split_docs)}')
+        return split_docs
+
+
+    def split_conversation(self, path):
+        """
+        按conversation块切分path文件夹下的所有json文件
+        ##TODO 限制序列长度
+        """
+        # json_spliter = RecursiveJsonSplitter(max_chunk_size=500)
+        logger.info(f'Loading json files from {path}')
+        split_qa = []
+        if os.path.isdir(path):
+            # loader = DirectoryLoader(path, glob="**/*.json",show_progress=True)
+            # jsons = loader.load()
+
+            for root, dirs, files in os.walk(path):
+                for file in files:
+                    if file.endswith('.json'):
+                        file_path = os.path.join(root, file)
+                        logger.info(f'splitting file {file_path}')
+                        with open(file_path, 'r', encoding='utf-8') as f:
+                            data = json.load(f)
+                            # print(data)
+                        for conversation in data:
+                            # for dialog in conversation['conversation']:
+                            ##按qa对切分,将每一轮qa转换为langchain_core.documents.base.Document
+                            # content = self.extract_text_from_json(dialog,'')
+                            # 
split_qa.append(Document(page_content = content)) + #按conversation块切分 + content = self.extract_text_from_json(conversation['conversation'], '') + split_qa.append(Document(page_content = content)) + # logger.info(f'split_qa size====={len(split_qa)}') + return split_qa + + + def load_knowledge(self, knowledge_pkl_path): + ''' + 读取或创建知识.pkl + ''' + if not os.path.exists(knowledge_pkl_path): + split_doc = self.split_document(doc_dir) + split_qa = self.split_conversation(qa_dir) + knowledge_chunks = split_doc + split_qa + with open(knowledge_pkl_path, 'wb') as file: + pickle.dump(knowledge_chunks, file) + else: + with open(knowledge_pkl_path , 'rb') as f: + knowledge_chunks = pickle.load(f) + return knowledge_chunks + + + def create_vector_db(self, emb_model): + ''' + 创建并保存向量库 + ''' logger.info(f'Creating index...') - emb_model = load_embedding_model() - if not split_doc: - split_doc = split_document(doc_dir) - if not split_qa: - split_qa = split_conversation(qa_dir) - # 创建索引,windows不支持faiss-gpu - if current_os == 'Linux': - index = create_index_gpu(split_doc, split_qa, emb_model, vector_db_dir) - else: - index = create_index_cpu(split_doc, split_qa, emb_model, vector_db_dir) - else: - if current_os == 'Linux': - res = faiss.StandardGpuResources() - index = faiss.index_cpu_to_gpu(res, 0, index, vector_db_dir) - else: - index = faiss.read_index(vector_db_dir) - - return index, knowledge_chunks - - -def create_index_cpu(split_doc, split_qa, emb_model, knowledge_pkl_path, dimension = 768, question_only=False): - # 假设BCE嵌入的维度是768,根据你选择的模型可能不同 - faiss_index_cpu = faiss.IndexFlatIP(dimension) # 创建一个使用内积的FAISS索引 - # 将问答对转换为向量并添加到FAISS索引中 - for doc in split_doc: - # type_of_docs = type(split_doc) - text = f"{doc.page_content}" - vector = emb_model.encode([text]) - faiss_index_cpu.add(vector) - for qa in split_qa: - #仅对Q对进行编码 - text = f"{qa['input']}" - vector = emb_model.encode([text]) - faiss_index_cpu.add(vector) - faiss.write_index(faiss_index_cpu, knowledge_pkl_path) - return faiss_index_cpu - -def create_index_gpu(split_doc, split_qa, emb_model, knowledge_pkl_path, dimension = 768, question_only=False): - res = faiss.StandardGpuResources() - index = faiss.IndexFlatIP(dimension) - faiss_index_gpu = faiss.index_cpu_to_gpu(res, 0, index) - for doc in split_doc: - # type_of_docs = type(split_doc) - text = f"{doc.page_content}" - vector = emb_model.encode([text]) - faiss_index_gpu.add(vector) - for qa in split_qa: - #仅对Q对进行编码 - text = f"{qa['input']}" - vector = emb_model.encode([text]) - faiss_index_gpu.add(vector) - faiss.write_index(faiss_index_gpu, knowledge_pkl_path) - return faiss_index_gpu - - - -# 根据query搜索相似文本 -def find_top_k(query, faiss_index, k=5): - emb_model = load_embedding_model() - emb_query = emb_model.encode([query]) - distances, indices = faiss_index.search(emb_query, k) - return distances, indices - -def rerank(query, indices, knowledge_chunks): - passages = [] - for index in indices[0]: - content = knowledge_chunks[index] + split_doc = self.split_document(self.doc_dir) + split_qa = self.split_conversation(self.qa_dir) + # logger.info(f'split_doc == {len(split_doc)}') + # logger.info(f'split_qa == {len(split_qa)}') + # logger.info(f'split_doc type == {type(split_doc[0])}') + # logger.info(f'split_qa type== {type(split_qa[0])}') + db = FAISS.from_documents(split_doc + split_qa, emb_model) + db.save_local(vector_db_dir) + return db + + + def load_vector_db(self, knowledge_pkl_path=knowledge_pkl_path, doc_dir=doc_dir, qa_dir=qa_dir): ''' - txt: 'langchain_core.documents.base.Document' 
- json: dict + 读取向量库 ''' - # logger.info(f'retrieved content:{content}') - # logger.info(f'type of content:{type(content)}') - if type(content) == dict: - content = content["input"] + '\n' + content["output"] + # current_os = platform.system() + emb_model = self.load_embedding_model() + if not os.path.exists(vector_db_dir) or not os.listdir(vector_db_dir): + db = self.create_vector_db(emb_model) else: - content = content.page_content - passages.append(content) + db = FAISS.load_local(vector_db_dir, emb_model, allow_dangerous_deserialization=True) + return db + + + def retrieve(self, query, vector_db, k=5): + ''' + 基于query对向量库进行检索 + ''' + retriever = vector_db.as_retriever(search_kwargs={"k": k}) + docs = retriever.invoke(query) + return docs, retriever + + ##FlashrankRerank效果一般 + # def rerank(self, query, retriever): + # compressor = FlashrankRerank() + # compression_retriever = ContextualCompressionRetriever(base_compressor=compressor, base_retriever=retriever) + # compressed_docs = compression_retriever.get_relevant_documents(query) + # return compressed_docs - model = load_rerank_model() - rerank_results = model.rerank(query, passages) - return rerank_results - -@st.cache_resource -def load_model(): - model = ( - AutoModelForCausalLM.from_pretrained("model", trust_remote_code=True) - .to(torch.bfloat16) - .cuda() - ) - tokenizer = AutoTokenizer.from_pretrained("model", trust_remote_code=True) - return model, tokenizer + def rerank(self, query, docs): + reranker = self.load_rerank_model() + passages = [] + for doc in docs: + passages.append(str(doc.page_content)) + scores = reranker.compute_score([[query, passage] for passage in passages]) + sorted_pairs = sorted(zip(passages, scores), key=lambda x: x[1], reverse=True) + sorted_passages, sorted_scores = zip(*sorted_pairs) + return sorted_passages, sorted_scores + + + if __name__ == "__main__": logger.info(data_dir) if not os.path.exists(data_dir): - os.mkdir(data_dir) - faiss_index, knowledge_chunks = load_index_and_knowledge() + os.mkdir(data_dir) + dp = Data_process() + # faiss_index, knowledge_chunks = dp.load_index_and_knowledge(knowledge_pkl_path='') + vector_db = dp.load_vector_db() # 按照query进行查询 - # query = "她要阻挠姐姐的婚姻,即使她自己的尸体在房门跟前" - # query = "肯定的。我最近睡眠很差,总是做噩梦。而且我吃得也不好,体重一直在下降" - # query = "序言 (一) 变态心理学是心理学本科生的必修课程之一,教材更新的问题一直在困扰着我们。" - query = "心理咨询师,我觉得我的胸闷症状越来越严重了,这让我很害怕" - distances, indices = find_top_k(query, faiss_index, 5) - logger.info(f'distances==={distances}') - logger.info(f'indices==={indices}') - - - # rerank无法返回id,先实现按整个问答对排序 - rerank_results = rerank(query, indices, knowledge_chunks) - for passage, score in zip(rerank_results['rerank_passages'], rerank_results['rerank_scores']): - print(str(score)+'\n') - print(passage+'\n') - \ No newline at end of file + # query = "儿童心理学说明-内容提要-目录 《儿童心理学》1993年修订版说明 《儿童心理学》是1961年初全国高等学校文科教材会议指定朱智贤教授编 写的。1962年初版,1979年再版。" + # query = "我现在处于高三阶段,感到非常迷茫和害怕。我觉得自己从出生以来就是多余的,没有必要存在于这个世界。无论是在家庭、学校、朋友还是老师面前,我都感到被否定。我非常难过,对高考充满期望但成绩却不理想,我现在感到非常孤独、累和迷茫。您能给我提供一些建议吗?" 
+ # query = "这在一定程度上限制了其思维能力,特别是辩证 逻辑思维能力的发展。随着年龄的增长,初中三年级学生逐步克服了依赖性" + query = "我现在处于高三阶段,感到非常迷茫和害怕。我觉得自己从出生以来就是多余的,没有必要存在于这个世界。无论是在家庭、学校、朋友还是老师面前,我都感到被否定。我非常难过,对高考充满期望但成绩却不理想" + docs, retriever = dp.retrieve(query, vector_db, k=10) + logger.info(f'Query: {query}') + logger.info("Retrieve results:") + for i, doc in enumerate(docs): + logger.info(str(i) + '\n') + logger.info(doc) + # print(f'get num of docs:{len(docs)}') + # print(docs) + passages,scores = dp.rerank(query, docs) + logger.info("After reranking...") + for i in range(len(scores)): + logger.info(str(scores[i]) + '\n') + logger.info(passages[i]) \ No newline at end of file diff --git a/rag/src/main.py b/rag/src/main.py index 7dd7639..86a2f04 100644 --- a/rag/src/main.py +++ b/rag/src/main.py @@ -13,9 +13,8 @@ from transformers import AutoTokenizer, AutoModelForCausalLM import torch import streamlit as st from openxlab.model import download -from data_processing import load_index_and_knowledge, create_index_cpu, create_index_gpu, find_top_k, rerank from config.config import embedding_path, doc_dir, qa_dir, knowledge_pkl_path, data_dir - +from data_processing import Data_process ''' 1)构建完整的 RAG pipeline。输入为用户 query,输出为 answer 2)调用 embedding 提供的接口对 query 向量化 @@ -42,30 +41,23 @@ def load_model(): tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True) return model, tokenizer -def get_prompt(): - pass - -def get_prompt_template(): - pass - -def main(query, system_prompt): - model, tokenizer = load_model() - model = model.eval() +def main(query, system_prompt=''): + logger.info(data_dir) if not os.path.exists(data_dir): - os.mkdir(data_dir) - # 下载基于 FAISS 预构建的 vector DB 以及原始知识库 - faiss_index, knowledge_chunks = load_index_and_knowledge() - distances, indices = find_top_k(query, faiss_index, 5) - rerank_results = rerank(query, indices, knowledge_chunks) - messages = [(system_prompt, rerank_results['rerank_passages'][0])] - logger.info(f'messages:{messages}') - response, history = model.chat(tokenizer, query, history=messages) - messages.append((query, response)) - print(f"robot >>> {response}") - -if __name__ == '__main__': - # query = '你好' - query = "心理咨询师,我觉得我的胸闷症状越来越严重了,这让我很害怕" - #TODO system_prompt = get_prompt() - system_prompt = "你是一个由aJupyter、Farewell、jujimeizuo、Smiling&Weeping研发(排名按字母顺序排序,不分先后)、散步提供技术支持、上海人工智能实验室提供支持开发的心理健康大模型。现在你是一个心理专家,我有一些心理问题,请你用专业的知识帮我解决。" - main(query, system_prompt) \ No newline at end of file + os.mkdir(data_dir) + dp = Data_process() + vector_db = dp.load_vector_db() + docs, retriever = dp.retrieve(query, vector_db, k=10) + logger.info(f'Query: {query}') + logger.info("Retrieve results===============================") + for i, doc in enumerate(docs): + logger.info(doc) + passages,scores = dp.rerank(query, docs) + logger.info("After reranking===============================") + for i in range(len(scores)): + logger.info(passages[i]) + logger.info(f'score: {str(scores[i])}') + +if __name__ == "__main__": + query = "我现在处于高三阶段,感到非常迷茫和害怕。我觉得自己从出生以来就是多余的,没有必要存在于这个世界。无论是在家庭、学校、朋友还是老师面前,我都感到被否定。我非常难过,对高考充满期望但成绩却不理想" + main(query) \ No newline at end of file
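
Note: the `datasets/README.md` hunk above describes the deduplication procedure (exact matching plus Simhash-based fuzzy matching), but the patch itself does not include the script. The following is a minimal, self-contained sketch of that procedure, not part of the patch: the helper names, the character-bigram features, and the Hamming-distance threshold of 3 are illustrative assumptions; a real pipeline would typically use proper tokenization (e.g. jieba plus stop-word removal) and, for large corpora, an index over fingerprint segments instead of the pairwise scan used here.

```python
import hashlib
import re


def features(text: str, n: int = 2) -> list:
    # 简化的预处理:去掉标点/空白后取字符 n-gram;实际可替换为 jieba 分词 + 去停用词
    text = re.sub(r"[^\w]", "", text.lower())
    return [text[i:i + n] for i in range(max(len(text) - n + 1, 1))]


def simhash(text: str, bits: int = 64) -> int:
    # 对每个特征取 64 位哈希,按位投票,得到整条文本的指纹
    vector = [0] * bits
    for feat in features(text):
        h = int(hashlib.md5(feat.encode("utf-8")).hexdigest(), 16) & ((1 << bits) - 1)
        for i in range(bits):
            vector[i] += 1 if (h >> i) & 1 else -1
    fingerprint = 0
    for i in range(bits):
        if vector[i] > 0:
            fingerprint |= 1 << i
    return fingerprint


def hamming_distance(a: int, b: int) -> int:
    return bin(a ^ b).count("1")


def deduplicate(samples: list, threshold: int = 3) -> list:
    # 先做绝对匹配去重,再用 Simhash + 汉明距离阈值做模糊去重
    seen_exact = set()
    kept, fingerprints = [], []
    for text in samples:
        key = text.strip()
        if key in seen_exact:          # 绝对匹配:完全相同的样本直接丢弃
            continue
        seen_exact.add(key)
        fp = simhash(key)
        if any(hamming_distance(fp, other) <= threshold for other in fingerprints):
            continue                   # 模糊匹配:与已保留样本近似重复,丢弃或转人工审查
        fingerprints.append(fp)
        kept.append(text)
    return kept


if __name__ == "__main__":
    data = [
        "今天天气很好,我们一起去公园散步吧。",
        "今天天气很好,我们一起去公园散步吧!",  # 与上一条近似重复,应被去掉
        "最近睡眠很差,总是做噩梦,体重也在下降。",
    ]
    print(deduplicate(data, threshold=3))
```

In this sketch a larger Hamming-distance threshold removes more near-duplicates but raises the risk of discarding samples that are genuinely different, which is the trade-off the README refers to when it mentions tuning the threshold to avoid losing important data.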