文章
· 四月 10 阅读大约需 6 分钟

如何构建代理人工智能 RAG 应用程序: 分步指南

社区朋友们好,

传统的基于关键词的搜索方式在处理具有细微差别的领域特定查询时往往力不从心。而向量搜索则通过语义理解能力,使AI智能体能够根据上下文(而非仅凭关键词)来检索信息并生成响应。

本文将通过逐步指导,带您创建一个具备代理能力的AI RAG(检索增强生成)应用程序。

实现步骤:

  1. 添加文档摄取功能
    • 自动获取并建立文档索引(例如《InterSystems IRIS 2025.1版本说明》)
    • 实现向量搜索功能
  2. 构建向量搜索智能体
  3. 移交至主智能体(分流处理)
  4. 运行智能体

1. Create Agent Tools 添加文档摄取功能

Implement Document Ingestion: Automated ingestion and indexing of documents 


1.1 - 以下是实现文档摄取工具的代码:

    def ingestDoc(self):
        #Check if document is defined, by selecting from table
        #If not defined then INGEST document, Otherwise back
        embeddings = OpenAIEmbeddings()	
        #Load the document based on the fle type
        loader = TextLoader("/irisdev/app/docs/IRIS2025-1-Release-Notes.txt", encoding='utf-8')      
        
        documents = loader.load()        
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=0)
        
        texts = text_splitter.split_documents(documents)
                       
        #COLLECTION_NAME = "rag_document"
        db = IRISVector.from_documents(
            embedding=embeddings,
            documents=texts,
            collection_name = self.COLLECTION_NAME,
            connection_string=self.CONNECTION_STRING,
        )

        db = IRISVector.from_documents(embedding=embeddings,documents=texts, collection_name = self.COLLECTION_NAME, connection_string=self.CONNECTION_STRING,)

向量搜索智能体(Vector Search Agent)能够自动完成文档的摄取(ingest)索引构建(index), 该新功能在InterSystems IRIS 2025.1的数据资源文件夹里) 至 IRIS 向量存储, 只有当数据尚未存在时,才执行该操作。



运行以下查询以从向量存储中获取所需数据:

SELECT
id, embedding, document, metadata
FROM SQLUser.AgenticAIRAG



1.2 - 实现向量搜索功能


以下代码为智能体提供了搜索能力:

 def ragSearch(self,prompt):
        #Check if collections are defined or ingested done.
        # if not then call ingest method
        embeddings = OpenAIEmbeddings()	
        db2 = IRISVector (
            embedding_function=embeddings,    
            collection_name=self.COLLECTION_NAME,
            connection_string=self.CONNECTION_STRING,
        )
        docs_with_score = db2.similarity_search_with_score(prompt)
        relevant_docs = ["".join(str(doc.page_content)) + " " for doc, _ in docs_with_score]
        
        #Generate Template
        template = f"""
        Prompt: {prompt}
        Relevant Docuemnts: {relevant_docs}
        """
        return template


分流代理处理传入的用户查询,并将其委托给矢量搜索代理,后者执行语义搜索操作,以检索最相关的信息。


2 - 创建矢量存储代理

以下代码实现了 矢量存储代理vector_search_agent :

  • 为智能体协同自定义交接描述规范 handoff_descriptions 
  • 明确的操作说明instructions
  • IRIS向量检索工具iris_RAG_search (使用irisRAG.py 用于文件输入和矢量搜索操作
@function_tool
    @cl.step(name = "Vector Search Agent (RAG)", type="tool", show_input = False)
    async def iris_RAG_search():
            """Provide IRIS Release Notes details,IRIS 2025.1 Release Notes, IRIS Latest Release Notes, Release Notes"""
            if not ragOprRef.check_VS_Table():
                 #Ingest the document first
                 msg = cl.user_session.get("ragclmsg")
                 msg.content = "Ingesting Vector Data..."
                 await msg.update()
                 ragOprRef.ingestDoc()
            
            if ragOprRef.check_VS_Table():
                 msg = cl.user_session.get("ragclmsg")
                 msg.content = "Searching Vector Data..."
                 await msg.update()                 
                 return ragOprRef.ragSearch(cl.user_session.get("ragmsg"))   
            else:
                 return "Error while getting RAG data"
    vector_search_agent = Agent(
            name="RAGAgent",
            handoff_description="Specialist agent for Release Notes",
            instructions="You provide assistance with Release Notes. Explain important events and context clearly.",
            tools=[iris_RAG_search]
    )


3 - 移交分流 (主要代理)


以下代码实现了将处理过的查询路由到分流代理(主协调器)的切换协议:

 triage_agent = Agent(
        name="Triage agent",
        instructions=(
            "Handoff to appropriate agent based on user query."
            "if they ask about Release Notes, handoff to the vector_search_agent."
            "If they ask about production, handoff to the production agent."
            "If they ask about dashboard, handoff to the dashboard agent."
            "If they ask about process, handoff to the processes agent."
            "use the WebSearchAgent tool to find information related to the user's query and do not use this agent is query is about Release Notes."
            "If they ask about order, handoff to the order_agent."
        ),
        handoffs=[vector_search_agent,production_agent,dashboard_agent,processes_agent,order_agent,web_search_agent]
    )


4 - 运行代理

以下代码:

  1. 接受用户输入
  2. 唤醒试运行代理triage_agent
  3. 将查询路由到矢量搜索处理Vector_Search_Agent 进行处理
@cl.on_message
async def main(message: cl.Message):
    """Process incoming messages and generate responses."""
    # Send a thinking message
    msg = cl.Message(content="Thinking...")
    await msg.send()

    agent: Agent = cast(Agent, cl.user_session.get("agent"))
    config: RunConfig = cast(RunConfig, cl.user_session.get("config"))

    # Retrieve the chat history from the session.
    history = cl.user_session.get("chat_history") or []
    
    # Append the user's message to the history.
    history.append({"role": "user", "content": message.content})
    
    # Used by RAG agent
    cl.user_session.set("ragmsg", message.content)
    cl.user_session.set("ragclmsg", msg)

    try:
        print("\n[CALLING_AGENT_WITH_CONTEXT]\n", history, "\n")
        result = Runner.run_sync(agent, history, run_config=config)
               
        response_content = result.final_output
        
        # Update the thinking message with the actual response
        msg.content = response_content
        await msg.update()

        # Append the assistant's response to the history.
        history.append({"role": "developer", "content": response_content})
        # NOTE: Here we are appending the response to the history as a developer message.
        # This is a BUG in the agents library.
        # The expected behavior is to append the response to the history as an assistant message.
    
        # Update the session with the new history.
        cl.user_session.set("chat_history", history)
        
        # Optional: Log the interaction
        print(f"User: {message.content}")
        print(f"Assistant: {response_content}")
        
    except Exception as e:
        msg.content = f"Error: {str(e)}"
        await msg.update()
        print(f"Error: {str(e)}")


在行动中观看:

更多细节,请访问 iris-AgenticAI open exchange 页面。

谢谢

讨论 (0)1
登录或注册以继续