from langchain_community.llms import Ollama from langchain.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import RunnablePassthrough from langchain_community.vectorstores import Chroma from langchain_community.embeddings import OllamaEmbeddings from langchain.text_splitter import CharacterTextSplitter from langchain.schema import Document from langchain.chains.combine_documents import create_stuff_documents_chain from langchain.chains import create_retrieval_chain from sqlalchemy import create_engine, MetaData, Table from sqlalchemy.orm import sessionmaker from dotenv import load_dotenv, find_dotenv import os MODEL = "mistral" class Ident: llm = None prompt = None retriever = None output_parser = None vectorstore = None chain = None engine = None def __init__(self): load_dotenv(find_dotenv()) self.llm = Ollama(model=MODEL, temperature=1) self.prompt = ChatPromptTemplate.from_template( """ You are a world class expert vehicle appraiser When answer to user: - If you don't know, just say that you don't know. - If you don't know when you are not sure, ask for clarification. Avoid mentioning that you obtained the information from the context. And answer according to the language of the user's question. Given the context information, answer the query. Query: {input} """ ) self.output_parser = StrOutputParser() self.chain = self.prompt | self.llm | self.output_parser db_host = os.getenv("DB_PRECIOS_HOST") db_user = os.getenv("DB_PRECIOS_USER") db_password = os.getenv("DB_PRECIOS_PASSWORD") db_schema = os.getenv("DB_PRECIOS_SCHEMA") self.engine = create_engine(f"mysql://{db_user}:{db_password}@{db_host}/{db_schema}") def ask(self, query: str): identified_fields = self.chain.invoke({ "input": """From the following text: '[query]' Try get: - year - make/brand - model - transmission (mt, manual, at, automatic, cvt) - variant - engine size (in CC or LT. Ex. 1.2 or 2.0) - fuel (bencina, diesel, hybrid) - power train (4x2, 4x4, AWD, FWD) - doors no. - price Answer only with the format and nothing more: 'year: {year}, brand: {brand}, model: {model}, transmission: {transmission}, variant: {variant}, submodel: {submodel}, engine size: {engine}, fuel: {fuel}, power train: {traction}, doors: {doors}, price: {price}' If one field is not to be found assing 'N/A' as default value """.replace("[query]", query) }) print(f"Response (1): {identified_fields}") documents = self.get_db_documents(identified_fields) identified_fields = identified_fields.replace(", ", "\n") print(f"New query: {identified_fields}") text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=100) docs = text_splitter.split_documents(documents) # print(documents) self.vectorstore = Chroma.from_documents(docs, embedding=OllamaEmbeddings(model="nomic-embed-text")) # print(f"Response (2): {self.vectorstore.similarity_search_with_score(identified_fields)}") self.retriever = self.vectorstore.as_retriever() self.prompt = ChatPromptTemplate.from_template( """ You are a world class expert vehicle appraiser Consider the following vehicle versions: {context} Utiliza los siguientes criterios de comparación (ordenandos por relevancia): - Year - Brand - Model - Transmission - Variant - Engine size - Fuel - Power train - Body type - Doors - Price (range 10%) Si un criterio no puede ser evaluado continuar con el siguiente Compara la siguiente descripción: '{search}' Omite características como ubicación y kilometraje Responde con las características del mejor match """ ) # document_chain = create_stuff_documents_chain(self.llm, self.prompt) # retrieval_chain = create_retrieval_chain(self.retriever, document_chain) # response = retrieval_chain.invoke({"search": query, "input": query}) """ self.chain = ( {"context": self.retriever | self.format_docs, "search": RunnablePassthrough()} | self.prompt | self.llm | StrOutputParser() ) response = self.chain.invoke(identified_fields) """ # print(f"Response (3): {response}") relevant_documents = self.retriever.get_relevant_documents(identified_fields) print(f"Retriever: {relevant_documents}") # return response return relevant_documents[0].page_content + "\nmodel_id: " + str(relevant_documents[0].metadata['modelo_id']) def get_db_documents(self, fields: str): fields = fields.upper().split(",") key_values = {} for field in fields: try: key, value = field.split(":") except Exception as e: print(f"Error: {e}") key = '' value = '' key_values[key.strip()] = value.strip() year = key_values.get("YEAR", 0) brand = key_values.get("BRAND", "") model = key_values.get("MODEL", "") print(f"Searching: '{year} {brand} {model}'") Session = sessionmaker(bind=self.engine) session = Session() connection = self.engine.connect() metadata = MetaData() # Carga la tabla desde la base de datos bm = Table('20171229_bm', metadata, autoload_with=self.engine) result = (session.query(bm) .where(bm.c.ano_auto == year) .where(bm.c.marca.like(f"%{brand}%")) .where(bm.c.modelo_comp.like(f"{model}%")) .where(bm.c.eliminado == 0) .all()) connection.close() documents = [] for row in result: if row.combustible == "BENC": fuel = "BENCINA" elif row.combustible == "DIES": fuel = "DIESEL" elif row.combustible == "HIB": fuel = "HIBRIDO" elif row.combustible == "ELEC": fuel = "ELECTRICO" page_content = f""" year: {row.ano_auto} brand: {row.marca} model: {row.modelo} transmission: {row.transmision} variant: {row.version_m} engine size: {row.motor} | {row.cilindrada} fuel: {fuel} power train: {row.traccion} body type: {row.tipo_carroceria} doors: {row.puertas} price: {row.tasacion} """ documents.append(Document(page_content=page_content, metadata=dict(modelo_id=row.modelo_id))) return documents def format_docs(self, docs): return "\n".join(doc.page_content for doc in docs) def clear(self): self.vectorstore = None self.retriever = None self.chain = None