作者: du

  • Amazon Bedrock 的多智能体协作(Multi Agent Collaboration):实现高级智能体联动

    Amazon Bedrock 的多智能体协作(Multi Agent Collaboration):实现高级智能体联动

    前言

    今天我将结合实际运行的示例代码,为大家讲解 2024 年 12 月发布的 Amazon Bedrock 新功能 ——“多智能体协作(Multi Agent Collaboration)”。

    什么是多智能体协作(Multi Agent Collaboration)

    1. 概述

    多智能体协作(Multi Agent Collaboration)是 Amazon Bedrock 推出的新功能,支持多个 AI 智能体(Agent)协同完成任务。AWS 官方博客中以社交媒体营销活动为例,介绍了其应用模式:

    1. 由管理智能体(Supervisor Agent)接收用户的请求;
    2. 委托专门负责内容创作的智能体撰写帖子;
    3. 委托专门负责互动量预测的智能体确定发帖时机;
    4. 将各智能体返回的结果整合后,向用户反馈最终响应。

    通过管理智能体为各专业智能体分配相应任务,实现了多智能体协同生成输出的能力。

    2. 与其他智能体服务的对比

    在涉及多智能体管理的服务中,与多智能体协作(Multi Agent Collaboration)功能相似的是多智能体编排器(Multi Agent Orchestrator)。下表总结了普通智能体、多智能体协作(Multi Agent Collaboration)及多智能体编排器(Multi Agent Orchestrator)各自的特点与适用范围。

    特征普通的智能体(通常Agent)多智能体协作(Multi Agent Collaboration)多智能体编排器(Multi Agent Orchestrator)
    目的由单个智能体负责所有处理,完成简单任务。多个智能体联动分工,高效处理复杂任务。集中管理多个智能体,将任务分配给最优智能体。
    结构单个智能体接收用户请求,独立完成所有处理流程。管理智能体拆分任务、分配给各智能体,并整合结果。管理角色分析请求,选择合适的智能体执行任务。
    主要功能1. 简单任务处理2. 一致性响应1. 多智能体高效分工2. 发挥各智能体专业性开展处理1. 依据请求灵活选择智能体2. 动态分配任务
    优点1. 结构简单,易于实现2. 适用于小规模任务1. 可高效处理复杂任务2. 最大限度发挥各智能体的专业能力1. 设计简洁,便于新增和修改2. 可灵活适配各类任务
    缺点1. 单个智能体承担所有处理,可扩展性低2. 难以应对复杂任务1. 需实现智能体间联动,管理难度大2. 需进行复杂的系统设计1. 智能体间协作性弱,不适用于复杂任务2. 处理过程高度依赖中央管理

    github.com

    相较于使用普通智能体或多智能体编排器,借助多智能体协作(Multi Agent Collaboration)似乎能够应对更为复杂的任务。

    运行示例场景

    接下来,让我们实际操作使用多智能体协作(Multi Agent Collaboration)功能。本次将基于 AWS 官方提供的资源,搭建一个股票分析助手。

    github.com

    1. 股票分析助手概述

    我们将要搭建的股票分析助手如下所示:

    通过运行示例代码,将创建以下 4 个智能体:

    • Stock Analysis(管理智能体 / Supervisor Agent)接收用户的股票相关请求,分析股价数据与新闻数据;将处理工作委托给各专业智能体,最终生成汇总报告。
    • Stock Data Researcher(股价数据研究智能体)获取股价历史数据,分析价格波动;通过 Lambda 函数获取实时市场数据。
    • News Researcher(新闻研究智能体)获取指定股票代码(Ticker)的相关新闻,分析市场情绪与趋势;通过 Lambda 函数调用网络搜索 API,收集最新新闻。
    • Financial Analyst(金融分析师智能体)整合股价分析与新闻分析的结果,提供投资判断建议;生成最终推荐内容及面向用户的报告。

    流程上,用户输入股票相关信息后,上述 2-4 号智能体将围绕请求的股票信息开展收集与解析工作,再由 1 号 Stock Analysis 智能体对结果进行整合。

    2. 搭建方法

    在 Bedrock Agent 中搭建多智能体协作(Multi Agent Collaboration)非常简便,仅需完成以下设置:

    1. 创建需要参与多智能体协作的各智能体;
    2. 按照以下配置创建用于统筹的管理智能体(Supervisor Agent):
      • 创建管理智能体时,开启 “Enable multi-agent collaboration”(启用多智能体协作)选项;
      • 按以下形式设置由 Supervisor Agent(主管代理)进行统括的 Collaborator Agent(协作代理)。

    本次将通过运行以下示例代码,实现基于 boto3 的多智能体系统搭建。

    github.com

    我们将按照 README 文档的说明逐步搭建系统。

    1. 执行以下命令搭建环境

    git clone https://github.com/awslabs/amazon-bedrock-agent-samples
    
    cd amazon-bedrock-agent-samples
    
    python3 -m venv .venv
    
    source .venv/bin/activate
    
    pip3 install -r src/requirements.txt
    

    2. 执行以下命令创建示例智能体

    python3 examples/multi_agent_collaboration/portfolio_assistant_agent/main.py --recreate_agents "true"
    

    3. 生成的智能体列表

    执行上述命令后,将创建以下智能体:

    生成的智能体概要

    系统将 portfolio_assistant 智能体设置为管理智能体(Supervisor Agent),并将其他智能体注册为协作智能体(Collaborator Agent)。

    Stock Analysis(管理智能体 / Supervisor Agent)

    通过执行以下代码创建管理智能体:

    portfolio_assistant = SupervisorAgent.direct_create(
        "portfolio_assistant",
        role="Portfolio Assistant",
        goal="分析特定的潜在股票投资标的,提供包含一系列投资考量因素的报告",
        collaboration_type="SUPERVISOR",
        instructions="""
                 请以分析特定股票潜在投资价值的资深专家身份开展工作。
                 为掌握最新股价走势及相关新闻动态,需进行调研分析。
                 提交内容详实、论证充分且兼顾潜在投资者需求的报告。
                 借助分析师协作智能体完成最终分析,并将新闻与股价数据作为输入传递给分析师。
                 对协作智能体的调用需按顺序进行,不可并行调用。
                 最终输出内容需全部以日语呈现。""",
        collaborator_agents=[
            {
                "agent": "news_agent",
                "instructions": """
                 如需查找特定股票的相关新闻,请调用此协作智能体。""",
            },
            {
                "agent": "stock_data_agent",
                "instructions": """
                 如需查询特定股票的价格历史,请调用此协作智能体。""",
            },
            {
                "agent": "analyst_agent",
                "instructions": """
                 如需获取原始调研数据,并生成详细报告及投资考量建议,请调用此协作智能体。""",
            },
        ],
        collaborator_objects=[news_agent, stock_data_agent, analyst_agent],
        guardrail=no_bitcoin_guardrail,
        llm="us.anthropic.claude-3-5-sonnet-20241022-v2:0",
        verbose=False,
    )
    

    github.com

    上述代码中的核心配置项说明如下:

    • goal:设置智能体整体预期输出目标。
    • instructions:定义对管理智能体的操作指引,包括如何与协作智能体协同工作。
    • collaborator_agents:定义各协作智能体的配置,确保管理智能体可正常调用协作智能体。

    在 AWS 控制台中也可确认:portfolio_assistant 已被设为管理智能体,其余 3 个智能体均已注册为协作智能体。

    此外,通过以下提示词指令,要求管理智能体与其他协作智能体协同完成股票分析任务。(原始提示词为英文,为实现日语输出已进行修正)。本次通过提示词指定智能体按顺序执行任务,但协作智能体也支持并行执行模式。

    请以分析特定股票潜在投资价值的资深专家身份开展工作。为掌握最新股价走势及相关新闻动态,需进行调研分析。

    提交内容详实、论证充分且兼顾潜在投资者需求的报告。借助分析师协作智能体完成最终分析,并将新闻与股价数据作为输入传递给分析师。

    对协作智能体的调用需按顺序进行,不可并行调用。最终输出内容需全部以中文呈现。

    stock_data_agent(股价数据研究智能体)

    如图所示,该智能体将用于获取股票价格信息的 Lambda 函数配置为行动组。通过此行动组获取股票价格信息,是该智能体的核心职责。

    以下代码定义了该智能体的功能:通过从管理智能体传递的提示词中提取 tool_defs 里定义的参数,实现股价信息的获取。

    # Define Stock Data Agent
    stock_data_agent = Agent.direct_create(
        name="stock_data_agent",
        role="财务数据采集员",
        goal="获取特定股票代码(Ticker)的准确股价趋势",
        instructions="实时财务数据提取专家",
        tool_code=f"arn:aws:lambda:{region}:{account_id}:function:stock_data_lookup",
        tool_defs=[
            {  # lambda_layers: yfinance_layer.zip, numpy_layer.zip
                "name": "stock_data_lookup",
                "description": "获取指定股票代码的1个月股价历史,返回格式为JSON",
                "parameters": {
                    "ticker": {
                        "description": "The ticker to retrieve price history for",
                        "type": "string",
                        "required": True,
                    }
                },
            }
        ],
    )
    

    github.com

    news_agent(新闻研究智能体)

    如图所示,该智能体将用于搜索和获取股票相关新闻的 Lambda 函数配置为行动组。通过此行动组获取股票相关新闻,是该智能体的核心职责。

    analyst_agent(金融分析师智能体)

    通过以下提示词配置,该智能体可根据输入的股价信息与新闻内容开展分析工作:

    角色:财务分析师

    目标:通过分析股价趋势与市场新闻获取洞察

    操作说明:作为资深分析师提供战略性建议。接收新闻摘要与股价摘要作为输入。无可用工具,仅可依赖自身知识开展工作。

    3. 运行示例场景

    接下来,我们立即运行已搭建的智能体系统。本次将通过 AWS 控制台进行操作。

    我们向作为管理智能体创建的 portfolio_assistant 智能体发送以下请求,查询亚马逊股票相关信息:

    请求

    ticker:Amazon

    回答

    在本次案例中,大约 30 秒左右就收到了回复。尽管执行时间会因所运行的智能体(Agent)而异,但如果要执行多个像搜索这类耗时的操作,整体花费的时间也会相应增加。

    此外,我们能够通过以下形式查看协作智能体(Collaborator Agent)是如何开展处理工作的。可以看到,news_agent、stock_data_agent、analyst_agent 这几个智能体相互协作,成功回答了问题。

    接下来,让我们看看管理智能体(Supervisor Agent)是如何向协作智能体下达指令的。通过跟踪步骤,能够查看各个智能体的运行情况,具体如下:

    从以下内容可以看出,协作智能体能够从管理智能体传递的提示词中提取出参数。

    管理智能体向协作智能体(stock_data_agent)发送的提示词

    [{text = 请提供股票代码为 ‘Amazon’ 的股票价格历史。}]

    协作智能体执行行动组的输入

    "invocationInput": [
    {
    "actionGroupInvocationInput": {
    "actionGroupName": "actions_stock_data_agent",
    "executionType": "LAMBDA",
    "function": "stock_data_lookup",
    "parameters": [
    {
    "name": "ticker",
    "type": "string",
    "value": "Amazon"
    }
    ]
    },
    "invocationType": "ACTION_GROUP",
    "traceId": "1f373c5c-12b9-4d07-8a24-1317d66f5115-0"
    }
    ]
    

    对于管理智能体接收到的请求,我们可以轻松构建出向各个协作智能体分配任务的流程。在将任务拆分给多个专业智能体来解决时,多智能体协作(Multi Agent Collaboration)的优势有望得到充分发挥。

    总结

    本次我们对 2024 年 12 月发布的 Amazon Bedrock 多智能体协作(Multi Agent Collaboration)功能进行了讲解。特别令人欣喜的是,仅通过 Bedrock Agent 就能构建出专业智能体以及对其进行统筹的管理智能体。鉴于该功能在各类应用场景中都有潜在的使用可能,我打算今后继续进行更多尝试。

  • 借助 Azure MCP Server 与 GitHub Copilot,尽可能简化 Azure 应用搭建流程

    借助 Azure MCP Server 与 GitHub Copilot,尽可能简化 Azure 应用搭建流程

    Model Context Protocol(MCP)问世已有一段时间,在此期间,支持 MCP 的服务不断增多,其作为大语言模型(LLM)工具的应用范围也在逐步扩大。无论是工具提供方还是客户端的服务数量都在增加,预计今后支持 MCP 的场景还将进一步拓展。

    在这样的背景下,微软推出了 Azure MCP Server—— 一款可通过 MCP 操作 Azure 资源的 MCP 服务器。(相关链接:github.com

    目前,该服务器可操作的资源虽有限,但已能实现 CosmosDB、Blob Storage 等数据的查询功能。本文将先简要介绍 MCP,随后讲解如何从 GitHub Copilot 调用 Azure MCP Server,从而轻松搭建使用 Azure 资源的 Web 应用。

    MCP 是什么

    MCP 是 Model Context Protocol 的缩写,顾名思义,它指的是一种协议(即标准规范)。具体而言,这是一种针对 “使用 LLM 模型的应用在调用工具时,如何获取上下文并调用工具” 这一流程所制定的标准规范。

    提到工具调用,大家可能会想到 Function Calling(函数调用),但 MCP 与它属于不同层面的概念,具体区别如下:

    • Function Calling(函数调用):输入用户提示词与各工具的信息后,选择并调用对应工具的功能。
    • MCP:规定了 LLM 应用应如何选择工具、工具方应如何提供工具的标准规范。

    在 Model Context Protocol 中,定义了 “MCP Server”(MCP 服务器)与 “MCP Client”(MCP 客户端)两种角色:

    • 宿主(Host):指使用 MCP 的 LLM 应用。
    • MCP Server(MCP 服务器):提供工具,负责提供工具信息并执行工具调用。
    • MCP Client(MCP 客户端):从 MCP Server 获取工具信息,决定调用哪个工具并执行调用操作。
    https://modelcontextprotocol.io/docs/concepts/architecture

    严格来说,MCP Server 还可提供提示词模板等其他功能,本文在此暂不展开说明。

    例如,Claude Desktop、GitHub Copilot 等属于工具调用方服务,对应上述的 “宿主”,其内部包含了 MCP 客户端。

    本文将按照以下架构开展操作:通过由 NodeJS 启动的 MCP Server 来操作 Azure 资源。

    关于 Azure MCP Server

    这是微软发布的 MCP 服务器,可通过工具提供 Azure 资源的操作能力,支持标准输入与 HTTP 两种通信方式。

    可操作的内容示例如下:

    • 获取 CosmosDB 容器列表
    • 向 CosmosDB 容器执行 SQL 查询
    • 获取 Blob Storage 容器列表

    搭建使用 Azure 资源的 Web 应用

    本次我们将搭建一个简易 Web 应用,实现文件的上传、下载与删除功能,并记录 “谁在何时上传了文件”。后端将采用 Blob Storage 作为文件存储,CosmosDB 用于管理上传历史。

    开发环境如下:

    • VS Code Insiders
    • GitHub Copilot
    • Python 3.10.11

    启动 Azure MCP Server

    启动 Azure MCP Server 十分简单。

    https://github.com/Azure/azure-mcp

    执行以下命令启动服务器

    bashnpx -y @azure/mcp@latest server start --transport sse

    若出现如下输出,则表示启动成功

    plaintextinfo: Microsoft.Hosting.Lifetime[14]

    Now listening on: http://localhost:5008

    配置 VS Code 中的 GitHub Copilot,使其适配 Azure MCP Server

    仅启动 MCP Server 无法让客户端识别,需完成注册操作以确保 LLM 应用能正常识别。由于本次使用 GitHub Copilot,具体操作可参考链接code.visualstudio.com

    核心步骤为在 .vscode/mcp.json 文件中写入以下内容即可:

    json

    {
        "servers": {
            "azure-mcp-dev": {
                "url": "http://localhost:5008"
            }
        }
    }
    

    通过 GitHub Copilot 搭建应用

    首先,需预先在 Azure 中手动创建以下资源组及资源:

    • 资源组:mcp-dev
    • CosmosDB:ssk-mcp-cosmos-dev
    • 存储账户:sskmcpblobdev

    理论上,我们希望让 Azure MCP Server 完成资源创建的全过程,但将资源创建操作完全交给 LLM 存在一定风险,因此本次仅手动完成这部分工作。

    完成上述准备后,向已关联 Azure MCP Server 的 GitHub Copilot 输入以下提示词:

    “我想在名为 mcp-dev 的资源组中,使用 Blob Storage 和 CosmosDB 搭建一个文件上传管理系统。

    • 支持用户各自上传、下载、删除文件。
    • 通过 CosmosDB 记录每个用户何时更新了哪个文件。
    • 使用 Python 的 FastAPI 进行开发。

    请基于上述条件完成以下操作:

    • 修改 Azure 资源配置。
    • 输出展示数据库与存储配置的文档。”

    此时,Copilot 会先检查现有资源状态,如下所示:

    其中显示 “Ran azmcp-group-list” 的部分,即为正在执行 Azure MCP Server 操作的过程,能看出它正在确认资源组、存储及 CosmosDB 的状态。

    Copilot 在首次回复中会告知所需的设计方案,之后我们可进一步指令其输出代码。

    需要注意的是,Copilot 会生成源代码,但不会主动创建容器。由于若不明确指示,它通常不会执行修改资源的操作,因此需补充相关指令。

    补充指令后,资源所需的各个容器便会分别创建完成。

    至此,源代码与后端资源的搭建就全部完成了。

    实际使用体验

    下面我们来运行一下这个应用。界面是通过向 GitHub Copilot 输入提示词生成的。

    由于没有对外观做任何指定,所以界面处于最基础的状态。这里展示的是已上传 schema.json 文件后的状态,Azure 资源中也同步了上传的文件及相关操作历史。

    像这样,借助 Azure MCP Server 与 GitHub Copilot,从 Azure 资源配置到应用创建的全过程,都可以通过提示词一次性完成。在 Azure 上搭建简单应用时,这一组合应该会非常实用。

    总结

    本文借助 Azure MCP Server 与 GitHub Copilot,轻松验证了 Azure 资源配置与应用开发的全流程。

    除了 GitHub Copilot 原本就具备的应用创建能力外,现在连适配应用的资源配置都能通过提示词一站式完成,非常便捷。

    未来,MCP Server 的工具扩展功能还将进一步完善,想必能为我们带来更高效的开发体验。

    那么,我们下次再见。

  • 使用 Aurora Serverless v2 作为 Amazon Bedrock Knowledge Bases 的向量数据库

    使用 Aurora Serverless v2 作为 Amazon Bedrock Knowledge Bases 的向量数据库

    今天我尝试了使用 Amazon Bedrock Knowledge Bases,并将 Amazon Aurora PostgreSQL 用作向量数据库。

    去年 12 月,Amazon Bedrock Knowledge Bases 新增了可快速创建 Aurora PostgreSQL 作为向量数据库的功能,大幅简化了设置流程。

    本次我也将利用这一快速创建功能进行设置。

    aws.amazon.com

    1. 将 Aurora PostgreSQL 配置为向量存储

    事前准备

    本次将 S3 用作 RAG(检索增强生成)的外部数据源。之后,我们会确认 LLM(大语言模型)的回答是否参考了存储在 S3 中的资料。

    Knowledge Bases 的创建

    在 AWS 管理控制台中,进入 Bedrock 页面,仅通过 GUI 操作即可轻松创建 Knowledge Bases。

    点击 “Knowledge Base with vector store”(带向量存储的知识库),即可跳转至 Knowledge Bases 创建页面。

    在 “步骤 2 配置数据源” 中,指定事前准备好的 S3 的 URI。而 “步骤 3 选择嵌入模型并配置向量数据库” 则是本次的核心内容。

    向量数据库的选项中新增了 “Amazon Aurora PostgreSQL Serverless” 这一项目,请选择此项。

    ※向量数据库的可选范围因区域而异,本文中测试使用的是东京区域。

    之后点击 “下一步”,确认创建内容后即可完成设置。仅需通过 GUI 操作即可完成,直观又简单!

    在 RDS 控制台中可以确认已创建的数据库。

    数据库表的创建情况如下所示。

    Bedrock_Knowledge_Base_Cluster=> \d bedrock_knowledge_base
            Table "bedrock_integration.bedrock_knowledge_base"
      Column   |        Type         | Collation | Nullable | Default
    -----------+---------------------+-----------+----------+---------
     id        | uuid                |           | not null |
     embedding | public.vector(1536) |           |          |
     chunks    | text                |           |          |
     metadata  | jsonb               |           |          |
    Indexes:
        "bedrock_knowledge_base_pkey" PRIMARY KEY, btree (id)
        "bedrock_knowledge_base_embedding_idx" hnsw (embedding public.vector_l2_ops)
    

    Knowledge Bases 的测试

    选择已创建的 Knowledge Bases 后,会出现 “测试知识库”页面。在此处向 LLM 提问,测试是否能返回预期的回答。

    本次我提出了 “敏捷开发的优势是什么?“ 这一问题。结果如预期般返回了参考了事前准备并存储在 S3 中的资料的回答,看起来运行正常。

    2. 与 OpenSearch Serverless 的对比

    OpenSearch Serverless 是被广泛用作向量数据库的代表性服务。此处将整理其与 Aurora Serverless v2 在实际使用中的差异。

    功能

    当使用 Aurora PostgreSQL 作为向量数据库时,仅支持语义搜索这一种检索类型。

    而使用 OpenSearch Serverless 时,则可在混合搜索与语义搜索之间进行选择。

    • 语义搜索:并非简单的关键词匹配,而是检索语义上相关的信息。
    • 混合搜索:将关键词检索与语义搜索相结合进行信息检索。

    从检索功能性来看,OpenSearch Serverless 更具优势。若需融入关键词检索功能,建议选择 OpenSearch Serverless。

    成本

    OpenSearch Serverless 的计算费用采用计时收费模式,即便处于未使用状态,仍会产生每小时的费用。以美国东部(弗吉尼亚北部)区域为例,每个单位的费用为 0.24 美元 / 小时。根据文档说明,至少会按 2 个单位计费,因此每月的费用约为 0.24 美元 / 小时 × 720 小时 × 2 = 345 美元。

    相比之下,Aurora Serverless v2 不仅单价低廉(0.12 美元 / 单位 / 小时),还支持缩容至 0 个单位。因此,能够有效控制未使用状态下的成本。

    aws.amazon.com

    查看此前通过快速创建功能搭建的 Aurora PostgreSQL 实例配置,确认其确实支持缩容至 0 个单位,与预期一致。

    在 CloudWatch 中查看单位使用率(ACU),可以确认实例在未使用时会自动缩容至 0 个单位。

    3. 检索速度确认

    最后,我们将验证文档数量增加时的检索速度及 ACU(Aurora 计算单位)变化情况。数据源采用 Kaggle 上的 “BBC News Summary” 数据集,将约 9000 条数据存储至 S3 中。

    参照 “1. 将 Aurora PostgreSQL 配置为 Bedrock Knowledge Bases 的向量数据库” 中的方法,向 LLM 发起提问。结果显示,与文档数量较少时相同,回答在数十毫秒内即可返回。对于本次使用的数据集规模而言,检索速度似乎不存在明显问题。

    查看 ACU 数据可知:文档导入时的 ACU 使用率约为 30%(16(最大扩容单位数)× 0.3 = 5 个单位),LLM 生成回答时的 ACU 使用率约为 15%(16 × 0.15 = 2.5 个单位)。

    向量数据库的 ReadLatency(读取延迟)控制在 0.01 秒以内,使用体验较为流畅。

    4. 总结

    本次尝试了在 Bedrock Knowledge Bases 中使用 Aurora Serverless v2 作为向量数据库。

    借助快速创建功能,仅需几次 GUI 点击操作,即可轻松完成向量数据库的搭建。对于 “控制未使用状态下成本” 这一需求,也能够轻松实现。

    最后提醒

    仅删除 Bedrock Knowledge Bases,并不能移除通过快速创建功能生成的向量数据库等其他关联资源。若不再需要这些资源,请务必手动删除,避免遗漏。

  • 借助 Amazon Nova 模型与 Bedrock Knowledge Base 实现视频检索

    借助 Amazon Nova 模型与 Bedrock Knowledge Base 实现视频检索

    不知您是否有过想要检索视频的经历?

    比如,只依稀记得视频里提到过某些内容,但这些内容并未体现在视频标题中,导致无论如何都找不到对应的视频。

    或许如果能像使用 Google 搜索那样检索就好了,但对于公司内部的视频或工作中使用的视频而言,事情往往没那么简单。

    为了解决这类困扰,我们尝试利用 Amazon Nova 模型与 Amazon Bedrock Knowledge Base 开发了一款工具。

    通过使用 Bedrock Knowledge Base,无需自行开发文档导入与检索功能,只需将视频的摘要结果存入 S3,即可轻松实现联动。

    也就是说,能够在最大限度降低开发成本的同时,开发出高性能的应用程序。

    构成图

    1. 视频检索想要实现的目标

    如果视频标题中包含目标语句,那么通过标题就能进行检索,但对于仅在视频部分内容中提及的信息等,很难通过字符串进行检索。

    此外,有时用户并非对某一特定视频感兴趣,而是希望广泛检索主题相同的视频。

    在本次视频检索项目中,为实现此类模糊检索,我们考虑通过 Bedrock Knowledge Base 进行向量检索。

    2. 实现方法

    大致步骤如下:

    1. 使 Amazon Nova 模型可对视频进行摘要处理
    2. 使摘要结果可导入 Bedrock Knowledge Base
    3. 实现视频检索处理
    4. 制作检索界面

    下面分别进行详细说明。

    2.1. 使 Amazon Nova 模型可对视频进行摘要处理

    通过模型生成摘要

    Amazon Nova 模型除了可接收文本、图像输入外,还能接收视频输入。

    我们将视频及其文字稿与下述提示词一同输入模型,让模型生成摘要文本。

    本次我们从 YouTube 频道视频中,选取了时长 1 分钟以内的视频,并上传至 S3。

    视频上传至 S3 后,Lambda 会接收事件触发,调用 Amazon Nova Lite 为这些视频生成说明文本,并将文本文件上传至 S3。

    该文本文件随后将成为 Knowledge Base 的导入对象。

    system_prompt = [
        {
            "text": dedent("""\
                您的任务是分析给定的视频,并说明视频中呈现的内容。
                视频的文字稿结果已记载在「文字稿:xxx」部分,请将其作为说明的依据。
                您的回复必须严格仅由视频的说明文本构成。
                请尽可能详细地进行说明。
                摘要请以中文呈现。
            """)
        }
    ]
    use_messages = [
        {
            "role": "user",
            "content": [
                {"text": f"视频名: {filename}"},
                {
                    "video": {
                        "format": "mp4",
                        "source": {"bytes": b64_content},
                    },
                },
                {"text": f"文字稿: {transcript}"},
                {"text": "请用中文说明此视频。"},
            ],
        }
    ]
    config = {"temperature": 0}
    body = {
        "schemaVersion": "messages-v1",
        "system": system_prompt,
        "messages": use_messages,
        "inferenceConfig": config,
    }
    
    response = bedrock_agent.invoke_model(
        modelId=MODEL_ID,
        body=json.dumps(body),
        contentType="application/json",
    )
    model_response = json.loads(response["body"].read())
    content = model_response["output"]["message"]["content"][0]["text"]
    

    模型会返回如下响应:

    此外,由于本次仅针对短时长视频,因此采用了直接向 API 提交视频的方式,但如果文件体积较大,则需要采取诸如读取上传至 S3 的文件、将视频分段传输后再合并等方法。

    2.2. 使摘要结果可导入 Bedrock Knowledge Base

    导入 Knowledge Base

    接下来需要通过 Knowledge Base 进行同步,以导入上述说明文本。但如果直接导入,无法实现原始视频与上传的文本文件之间的关联。

    为解决这一问题,我们使用了 Knowledge Base 的 metadata.json 功能。

    在此 JSON 文件中记载的内容,会在 Knowledge Base 同步时作为元数据保存到 OpenSearch Serverless 中,可供检索时获取。

    本次我们在元数据中指定了视频的 URL,用于检索后的界面显示。

    {
      "metadataAttributes": {
        "original_path": "s3://bucket/path/to/video.mp4"
      }
    }
    

    当视频的说明文本与对应的 metadata.json 配置完成后,即可对 Knowledge Base 进行同步。

    2.3. 实现视频检索处理

    执行检索

    检索时使用了 Knowledge Base 的 Retrieve API。

    此外,通过在检索后设置分数下限作为阈值,可确保经重排序后被判定为相关性较低的视频不纳入检索结果。

    const input = {
      knowledgeBaseId,
      retrievalQuery: {
        text: query.trim(),
      }
      retrievalConfiguration: {
        vectorSearchConfiguration: {
          numberOfResults: 20,
          overrideSearchType: "HYBRID",
          rerankingConfiguration: {
            bedrockRerankingConfiguration: {
              modelConfiguration: {
                modelArn: AMAZON_RERANK_MODEL,
              },
              numberOfRerankedResults: 10,
            },
            type: "BEDROCK_RERANKING_MODEL",
          },
        },
      },
    };
    const command = new RetrieveCommand(input);
    return client.send(command);
    

    可按如下方式获取视频的概要及视频文件路径:

    {
      "retrievalResults": [{
        "content": {
          "text": "该视频介绍了远程办公中的 IT 工程师想吃的豆沙包排名……",
          "type": "TEXT"
        },
        "location": {
          "s3Location": {
            "uri": "s3://bucket/path/to/summary.txt"
          },
          "type": "S3"
        },
        "metadata": {
          "original_path": "s3://bucket/path/to/video.mp4"
        }
      }]
    }
    

    2.4. 制作检索界面

    本次使用 bolt.new 制作了执行上述检索的界面。

    虽然我不太擅长前端开发,但只需用中文下达指令,就能制作出非常不错的应用,这一点让我很惊喜。

    制作的检索应用
    检索结果

    3. 结果

    3.1. 尝试用视频中包含的语句进行检索

    首先,我们尝试使用既包含在视频标题中、也包含在生成式 AI 生成的摘要文本中的关键词进行检索。

    经确认,目标视频会显示在检索结果的第一位。

    当遇到 “想再看那个视频,但用传统检索方式搜不到” 的情况时,使用这款应用就能立即找到想看的视频。

    “美味的豆沙包”

    3.2. 尝试用视频中不包含的语句进行检索

    接下来,我们尝试用未出现在标题和摘要文本中,但凭借向量检索可能捕捉到相关内容的语句进行了检索。

    此次检索也成功命中了一段视频,内容是 2018 年参加在美国旧金山举办的 “Elastic {ON}” 大会时的场景。

    虽然本次知识库中仅导入了 Elastic {ON} 相关的视频,但如果预先导入其他海外大会的视频,就能实现 “并非想找某一特定视频,而是希望广泛获取同类视频” 的需求。

    海外大会的场景

    总结

    目前,Bedrock Knowledge Base 尚无法直接输入视频,但通过使用 Amazon Nova 生成视频说明文本,我们间接实现了视频检索功能。

    虽然通过自行对视频进行嵌入处理也能实现相同功能,但 Bedrock Knowledge Base 的优势在于可轻松集成内容导入与检索能力。

    内容导入仅需将文件放入 S3 并执行 “同步” 操作即可,检索也只需调用 Retrieve API。借助 Retrieve API,还能通过重排序功能轻松优化检索结果。

  • 使用 Azure Content Understanding 从视频生成结构化数据

    使用 Azure Content Understanding 从视频生成结构化数据

    最近我沉浸在 LLM(大语言模型)的微调工作中。

    在去年的 Microsoft Ignite 2024 大会上,微软发布了名为 Azure Content Understanding 的服务。该服务能以 Office 文档、图像、音频、视频等文件为基础,利用生成式 AI 等技术,将数据导入用户自定义的结构中,以便在 RAG(检索增强生成)系统等场景中轻松使用。

    以往处理这类数据时,音频需要用到 Speech Service(语音服务),图像需要用到 AI Vision(人工智能视觉)等不同服务分别进行数据处理,而现在只需通过这一项服务定义好结构,就能完成所有处理流程。本次我们就将使用 Azure Content Understanding,从视频中创建便于导入搜索引擎等系统的 JSON 格式结构化数据。

    Azure Content Understanding 是什么

    正如文章开头所介绍的,Azure Content Understanding 能够通过单一服务从多种格式的文件中创建结构化数据。

    在创建结构化数据的过程中,可实现如图所示的文件内文字提取、转录文本生成等内容提取操作。此外,通过生成式 AI 从提取的内容中生成用户定义的项目,还能将文件转换为结构化数据。

    以文档转换为检索数据为例,原本需要通过 OCR(光学字符识别)提取字符串→使用 LLM 提取数据→转换为 JSON 这样的流水线式实现,而借助该功能,只需创建相应定义即可完成,非常便捷。而且单一服务就能支持多种文件类型的处理。

    目前该服务处于预览阶段,暂可免费使用,价格详情将在后续公布。

    使用 Azure Content Understanding 的准备工作

    要使用 Azure Content Understanding,需在 West US(美国西部)、Sweden Central(瑞典中部)、Australia East(澳大利亚东部)这三个区域中的任意一个创建 Azure AI Services 资源。请注意,其他区域暂不支持该服务。

    使用 Azure Content Understanding 解析视频文件

    接下来,我们实际使用 Azure Content Understanding 从视频文件中提取摘要和类别信息。本次将借助以下链接提供的示例,通过 Python 来执行操作。

    示例链接:github.com

    具体实施步骤如下:

    1. 创建数据分析定义
    2. 创建分析器
    3. 使用分析器对文件进行分析

    创建数据分析定义

    首先,定义目标文件格式以及需要提取的数据内容。

    创建如下 JSON 文件:

    {
        "description": "从视频中提取文件的示例",
        "scenario": "videoShot", 
        "config": {
            "returnDetails": true,
            "locales": [
                "ja-JP"
            ],
            "enableFace": false
        },
        "fieldSchema": {
            "fields": {
                "summary": {
                    "type": "string",
                    "method": "generate",
                    "description": "概括内容的一句话"
                },
                "category": {
                    "type": "string",
                    "method": "classify",
                    "description": "视频文件的类别",
                    "enum": [
                        "IT",
                        "Economy",
                        "Nature"
                    ]
                }
            }
        }
    }
    

    该定义的核心要点如下:

    • 因目标为视频,需在scenario(场景)中指定videoShot
    • locales(区域设置)中因需处理日语,仅配置ja-JP
    • fieldSchema(字段 schema)中定义以下两个需提取的项目:
      • summary:概括视频内容的文本
      • category:从 IT、Economy(经济)、Nature(自然)中选择的视频类别

    fieldSchema的设置内容中,生成式 AI 会根据description(描述)的内容提取字段信息,因此description是重要的调优项。不过本次仅为简单的功能验证,故使用简洁的描述进行测试。

    创建分析器

    完成定义后,需创建用于读取文件并进行分析的分析器。目前仅能通过 GUI(图形用户界面)或 REST API 创建和运行分析器。但上述 GitHub 链接中提供了对 API 进行封装的工具类,本次将使用该工具类进行开发。

    github.com

    通过以下代码即可创建分析器:

    import uuid
    
    # 生成分析器名称
    ANALYZER_TEMPLATE = "video"
    ANALYZER_ID = "video-sample-" + str(uuid.uuid4())
    
    # 创建ContentUnderstanding客户端
    client = AzureContentUnderstandingClient(
        endpoint=AZURE_AI_ENDPOINT,
        api_version=AZURE_AI_API_VERSION,
        subscription_key=AZURE_AI_SUBSCRIPTION_KEY
    )
    # 读取数据分析定义并创建分析器
    response = client.begin_create_analyzer(ANALYZER_ID, analyzer_template_path="./content_video.json")
    result = client.poll_result(response)
    

    执行到这里,分析器就创建完成了,接下来只需执行分析操作即可。

    运行分析器

    接下来,我们让分析器加载视频文件并执行分析。

    本次分析的目标文件是视频:www.youtube.com

    该视频讲解了子网掩码的基础知识,因此预期结果是能够提取出相关内容。

    执行过程非常简单,仅需以下 2 行代码即可完成:

    response = client.begin_analyze(ANALYZER_ID, file_location=analyzer_sample_file_path)
    result = client.poll_result(response)
    

    获取分析结果所需的时间因文件而异,本次针对 11 分钟的视频,大约花费了 5 分钟。

    分析结果

    最终会得到如下所示的 JSON 数据,因完整内容过长,此处仅节选部分展示:

    {
      "id": "82eb4975-4232-44a1-a4e6-d018fa45469f",
      "status": "Succeeded",
      "result": {
        "analyzerId": "video-sample-31341919-bf0d-45af-9228-7866b252a70a",
        "apiVersion": "2024-12-01-preview",
        "createdAt": "2025-01-28T05:32:48Z",
        "warnings": [],
        "contents": [
          {
            "markdown": "# 片段 00:00.000 => 00:03.303\n## 转录文本\n```\nWEBVTT\n\n```\n## 关键帧\n- 00:00.825 ![](keyFrame.825.jpg)\n- 00:01.650 ![](keyFrame.1650.jpg)\n- 00:02.475 ![](keyFrame.2475.jpg)",
            "fields": {
              "category": {
                "type": "string",
                "valueString": "IT"
              },
              "summary": {
                "type": "string",
                "valueString": "这是面向新入职年轻工程师的IT基础知识讲解视频,本次将介绍子网掩码与CIDR。"
              }
            },
            "kind": "audioVisual",
            "startTimeMs": 0,
            "endTimeMs": 3303,
            "width": 1280,
            "height": 720,
            "KeyFrameTimesMs": [
              825,
              1650,
              2475
            ],
            "transcriptPhrases": []
          },
          {
            "markdown": "# 片段 00:03.303 => 01:02.106\n## 转录文本\n```\nWEBVTT\n\n00:04.680 --> 00:28.960\n<v 讲解者>接下来我们来介绍子网掩码。刚才我们讲了设置本地网络,并在其中配置和使用IP地址的内容。那么具体如何定义本地网络呢?这里我们采用将IP地址分为网络部分和主机部分的方式来考虑。\n00:29.480 --> 00:59.000\n<v 讲解者>具体怎么做呢?当有一个IP地址时,我们把其中的每个数字用比特来表示,也就是用二进制来书写。这样就会形成这样的形式,此时以某个位置为界限,前半部分称为网络部分,后半部分称为主机部分。可以说,前半部分的网络部分代表了网络本身。\n00:59.600 --> 01:27.680\n<v 讲解者>就是这样。而后面的主机部分则代表了隶属于该网络的各个设备的位置。也就是说,只要前半部分的网络部分一致,就会被视为同一个网络。关于网络的定义方式,“类”的概念从以前就一直在使用。类分为三种,即A类、B类、C类。\n```\n## 关键帧\n- 00:06.072 ![](keyFrame.6072.jpg)\n- 00:08.844 ![](keyFrame.8844.jpg)\n- 00:11.616 ![](keyFrame.11616.jpg)\n- 00:14.388 ![](keyFrame.14388.jpg)\n- 00:17.160 ![](keyFrame.17160.jpg)\n- 00:19.932 ![](keyFrame.19932.jpg)\n- 00:22.704 ![](keyFrame.22704.jpg)\n- 00:25.476 ![](keyFrame.25476.jpg)\n- 00:28.248 ![](keyFrame.28248.jpg)\n- 00:31.020 ![](keyFrame.31020.jpg)\n- 00:33.792 ![](keyFrame.33792.jpg)\n- 00:36.564 ![](keyFrame.36564.jpg)\n- 00:39.336 ![](keyFrame.39336.jpg)\n- 00:42.108 ![](keyFrame.42108.jpg)\n- 00:44.880 ![](keyFrame.44880.jpg)\n- 00:47.652 ![](keyFrame.47652.jpg)\n- 00:50.424 ![](keyFrame.50424.jpg)\n- 00:53.196 ![](keyFrame.53196.jpg)\n- 00:55.968 ![](keyFrame.55968.jpg)\n- 00:58.740 ![](keyFrame.58740.jpg)",
            "fields": {
              "summary": {
                "type": "string",
                "valueString": "讲解者对子网掩码进行了说明,介绍了将IP地址分为网络部分和主机部分的方法,同时也提及了A类、B类、C类的网络定义方式。"
              },
              "category": {
                "type": "string",
                "valueString": "IT"
              }
            },
            "kind": "audioVisual",
            "startTimeMs": 3303,
            "endTimeMs": 62106,
            "width": 1280,
            "height": 720,
            "KeyFrameTimesMs": [
              6072,
              8844,
              11616,
              14388,
              17160,
              19932,
              22704,
              25476,
              28248,
              31020,
              33792,
              36564,
              39336,
              42108,
              44880,
              47652,
              50424,
              53196,
              55968,
              58740
            ],
            "transcriptPhrases": [
              {
                "speaker": "speaker",
                "startTimeMs": 4680,
                "endTimeMs": 28960,
                "text": "接下来我们来介绍子网掩码。刚才我们讲了设置本地网络,并在其中配置和使用IP地址的内容。那么具体如何定义本地网络呢?这里我们采用将IP地址分为网络部分和主机部分的方式来考虑。",
                "confidence": 1,
                "words": [],
                "locale": "en-US"
              },
              {
                "speaker": "speaker",
                "startTimeMs": 29480,
                "endTimeMs": 59000,
                "text": "具体怎么做呢?当有一个IP地址时,我们把其中的每个数字用比特来表示,也就是用二进制来书写。这样就会形成这样的形式,此时以某个位置为界限,前半部分称为网络部分,后半部分称为主机部分。可以说,前半部分的网络部分代表了网络本身。",
                "confidence": 1,
                "words": [],
                "locale": "en-US"
              },
              {
                "speaker": "speaker",
                "startTimeMs": 59600,
                "endTimeMs": 87680,
                "text": "就是这样。而后面的主机部分则代表了隶属于该网络的各个设备的位置。也就是说,只要前半部分的网络部分一致,就会被视为同一个网络。关于网络的定义方式,“类”的概念从以前就一直在使用。类分为三种,即A类、B类、C类。",
                "confidence": 1,
                "words": [],
                "locale": "en-US"
              }
            ]
          },
    

    推测其内部可能使用了类似 Video Indexer 的技术,能够自动将视频按章节分割,并获取每个章节的转录文本和关键帧时间。此外,自定义定义的summary(摘要)和category(类别)也能按章节获取,且内容准确无误。

    只需简单定义并运行分析器,就能将视频结构化到如此详细的程度,非常便捷。同时还附带转录文本,作为检索数据也十分有效。

    总结

    本次我们尝试使用 Azure Content Understanding 对视频数据进行了结构化处理。

    整个过程几乎无需耗费过多精力就能完成视频数据的解析,是一款非常实用的服务。虽然目前仍处于预览阶段,但该服务在 RAG 数据等场景中具有极为广泛的应用前景,未来我还会进一步探索其更多使用方法。

  • 使用 GraphRAG Toolkit 在 Amazon Bedrock 中构建 GraphRAG

    使用 GraphRAG Toolkit 在 Amazon Bedrock 中构建 GraphRAG

    此次,我想借助 AWS 提供的开源工具 GraphRAG Toolkit,入门 GraphRAG 技术。

    使用 GraphRAG Toolkit,能够以低代码方式实现基于 Amazon Neptune 和 Amazon OpenSearch Serverless 的 GraphRAG 系统。

    另外,请注意,它与 2024 年 12 月起可在 Bedrock Knowledge Base 中使用的 GraphRAG 是不同的工具,请勿混淆。

    1. 前言

    1.1. 什么是 GraphRAG?

    传统的 RAG(检索增强生成)采用关键词检索或向量检索来获取相关信息,而 GraphRAG 则利用知识图谱,更深入地理解检索结果的上下文,从而提取相关性更高的信息。

    在传统 RAG 中,只有通过关键词检索或向量检索命中的信息才能用于回答生成;而在 GraphRAG 中,以关键词检索或向量检索得到的结果为起点,对图谱进行遍历,即便未在检索中命中但与上下文相关的信息也能被获取,并用于回答生成。

    也就是说,GraphRAG 利用结构化的实体间关系,实现了精度更高的信息检索与上下文理解。

    1.2. 什么是 GraphRAG Toolkit?

    GraphRAG Toolkit 是 AWS 提供的开源 GraphRAG 库,提供了诸多简化基于 GraphRAG 的系统构建的功能。(参考链接:github.com

    该工具可将 Amazon Neptune Database 用作图数据库,将 Amazon OpenSearch Serverless 用作向量数据库。

    通过使用此工具包,能够从非结构化数据中提取实体及其关系,将其作为知识图谱存储;并通过针对该知识图谱查询用户问题,构建可提供相关性更高信息的应用程序。本次我们将使用该工具包来尝试 GraphRAG 技术。

    GraphRAG Toolkit 中知识图谱的基本结构

    知识图谱是一种将数据构建为实体(实际存在的事物)及其关系(关联性)的结构化数据模型,用于表达数据间的语义关联。

    通过将信息整理为 “实体” 与 “关系” 的网络,能够实现考虑关联性的检索以及逻辑性的推理。

    在 GraphRAG Toolkit 中,知识图谱通过以下术语和结构构建。(自上而下,概念从抽象宏观逐渐过渡到具体微观。)

    序号项目概述示例
    1文档(Source)表示原始文档的属性https://docs.aws.amazon.com/neptune/latest/userguide/intro.html
    2片段(Chunk)文档分割后的小单位Amazon Neptune is a fast, reliable, fully managed graph database service that makes it easy to build and run applications that work with highly connected datasets. The core of Neptune is…
    3主题(Topic)片段中记载的核心主题Amazon Neptune
    4语句(Statement)片段中书写的句子或语句The core of Neptune is a purpose-built, high-performance graph database engine.
    5事实(Fact)从语句中提炼出的事实Amazon Neptune HAS high-performance
    6实体事实的主语、宾语,对应现实世界的 “事物” 或 “概念”Amazon Neptune

    知识图谱示意图(链接:https://github.com/awslabs/graphrag-toolkit/blob/main/images/lexical-graph.png

    1.3. 什么是 Amazon Neptune?

    Amazon Neptune 是 AWS 提供的全托管型图数据库服务,能够高效管理和操作数据间的关系。

    Neptune 支持多种图模型,可实现灵活的数据操作。它不仅可用于构建知识图谱,还在社交网络分析、推荐系统等多种场景中得到应用。

    此外,其一大优势在于能够控制成本 —— 可在不使用的时段 “停止” 服务,在开发环境中还能选用 T3 实例等相对经济的实例类型。

    2. 构建环境

    本次将通过 GraphRAG Toolkit 提供的 CloudFormation 模板来搭建环境。

    架构图
    序号项目概述
    1Amazon Neptune存储知识图谱并提供检索功能
    2Amazon OpenSearch Service配合图检索使用,用于向量检索
    3Amazon Bedrock从片段中提取主题、语句、事实、实体;生成用于存入向量数据库的向量・提取模型:Claude-3-Sonnet・向量化模型:Titan Text Embedding V2
    4Notebook(笔记本)操作 GraphRAG Toolkit,执行文档导入与检索操作

    3. 导入数据

    使用上述 CloudFormation 模板搭建环境后,系统会提供多个示例笔记本。数据导入可选择以下两种笔记本之一:

    • 01-Combined-Extract-and-Build.ipynb:一站式完成从文档中提取实体等信息及构建图谱的操作
    • 02-Separate-Extract-and-Build.ipynb:将实体等信息的提取与图谱构建拆分为独立步骤执行

    文档导入按以下流程进行:

    1. 文档分割(Chunking):将文档拆分为细小的单元
    2. 提取命题(Proposition):将片段传入大语言模型(LLM),提取命题命题是为提取构建知识图谱所需的 6 类要素而生成的预处理结果,会对原始文档进行清洗,例如将复杂句子拆分为简单易懂的句子、展开代词和缩写等。
    3. 提取核心要素:将命题传入 LLM,提取主题、语句、事实、实体
    4. 构建并导入知识图谱:根据提取的实体等信息构建知识图谱,存入图数据库
    5. 向量化并导入向量数据库:将语句传入 LLM 进行嵌入(Embedding)处理,生成向量后存入向量数据库

    使用 GraphRAG Toolkit 时,只需调用 extract 方法,即可一键执行上述所有流程,上手非常简单。

    3.1. 确认待导入的数据

    本次将使用 02-Separate-Extract-and-Build.ipynb 笔记本,确认提取模型如何处理原始文档,使其转化为可导入图数据库的格式。

    执行笔记本中的 “Extract(提取)” 阶段后,LLM 会解析文档,提取实体、属性以及各类关系。提取结果将保存至 extracted/ 目录下。

    通过修改配置,也可将提取结果保存至 S3。这样一来,可将已处理的数据存储在 S3 中,当新增数据需要重新构建知识图谱、或需在其他数据库中构建相同知识图谱时,无需重复执行提取操作,既能规避 LLM 的调用限制,又能节省成本。

    实际输出到 extracted/ 目录的 JSON 文件内容如下(因篇幅有限,已大幅省略)。从中可以看出,前文所述的命题、主题、语句、事实、实体均以嵌套形式存储。

    {
      "id_": "aws:xxxx",
      "metadata": {
        "url": "https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html",
        "aws::graph::propositions": [
          // 命题
          "Neptune Analytics is a memory-optimized graph database engine for analytics",
          "Neptune Analytics allows getting insights and finding trends by processing large amounts of graph data in seconds"
        ],
        "aws::graph::topics": {
          "topics": [
            {
              // 主题
              "value": "Neptune Analytics",
              "entities": [
                {
                  // 实体
                  "value": "Neptune Analytics",
                  "classification": "Software"
                }
              ],
              "statements": [
                {
                  // 语句
                  "value": "Neptune Analytics is a memory-optimized graph database engine for analytics",
                  "facts": [
                    // 事实
                    {
                      "subject": {
                        "value": "Neptune Analytics",
                        "classification": "Software"
                      },
                      "predicate": {
                        "value": "OPTIMIZED FOR"
                      },
                      "complement": "memory"
                    }
                  ]
                }
              ]
            }
          ]
        }
      }
    }
    

    此外,对于中文文档,它也能很好地完成信息提取。

    {
      "id_": "aws:xxxx",
      "metadata": {
        "aws::graph::propositions": [
          // 命题
          "知识表示的方法",
          "知识图谱将数据表现为实体与关系的网络",
          "知识图谱是一种易于捕捉信息语义的强大方法"
        ],
        "aws::graph::topics": {
          "topics": [
            {
              // 主题
              "value": "Knowledge Representation",
              "entities": [
                {
                  // 实体
                  "value": "知识图谱(知识图谱)",
                  "classification": "Concept(概念)"
                }
              ],
              "statements": [
                {
                  // 语句
                  "value": "知识图谱将数据表现为实体与关系的网络",
                  "facts": [
                    // 事实
                    {
                      "subject": {
                        "value": "知识图谱(知识图谱)",
                        "classification": "Concept(概念)"
                      },
                      "predicate": {
                        "value": "REPRESENTS(表示)"
                      },
                      "object": {
                        "value": "数据(数据)",
                        "classification": "Concept(概念)"
                      }
                    }
                  ]
                }
              ]
            }
          ]
        }
      }
    }
    

    3.2. 确认已导入的数据

    通过 CloudFormation 部署 Notebook 时,图探索器(Graph Explorer)会同时完成部署。打开该图探索器,即可直观查看已构建的知识图谱。

    图探索器的打开方式

    下图展示了使用 GraphRAG Toolkit 导入 Amazon Neptune 文档后构建的知识图谱的部分内容。

    如前文所述,可以看到知识图谱分为文档、片段、主题、语句、事实、实体等层级,且各层级之间的关系均有明确标注。

    实际构建的知识图谱(部分)

    4. 检索

    在 GraphRAG 中,首先通过向量检索找到作为起点的节点,随后遍历知识图谱,最终返回检索结果。

    GraphRAG Toolkit 主要提供两种检索方法,具体如下表所示:

    序号检索方法优点缺点用途
    1TraversalBasedRetriever・处理轻量化・结果可重复性高・对未知词汇及专有名词的适配性强・灵活性较低・检索精度偏低按类别整理的 FAQ 等场景
    2SemanticGuidedRetriever・可探索符合问题意图的信息・能考虑复杂关联性・可确保检索结果的多样性・计算成本高,处理负载重需要灵活、高级的信息检索场景(如匹配问题意图、需多样化信息等)

    4.1. TraversalBasedRetriever(基于遍历的检索器)

    TraversalBasedRetriever 是通过按顺序遍历图谱结构来扩展信息的检索方法。由于它会按固定顺序遍历知识图谱,因此处理过程较为轻量化。

    TraversalBasedRetriever 包含两种具体实现方式,可单独使用其中一种,也可组合使用两种。默认设置下,返回的是两种方式组合后的结果。

    1. EntityBasedSearch(基于实体的检索)

    从查询字符串中提取关键词,在图谱中查找匹配的实体(自下而上,Bottom up)。找到匹配的实体后,从该实体出发,向事实、命题、主题等更宏观的概念层级遍历,从而完成检索。

    其特点是,即便面对未知词汇或专有名词,也能依据预先构建的知识图谱收集相关信息。由于检索从实体出发,往往能广泛获取与该实体相关的信息。

    2. ChunkBasedSearch(基于片段的检索)

    利用查询与片段间的向量相似度进行检索(自上而下,Top down)。从检索命中的片段出发,向主题、命题、事实等更微观的概念层级遍历,从而完成检索。

    由于检索从与查询语义相近的片段开始,往往能精准获取关联性更强的信息。

    4.2. SemanticGuidedRetriever(基于语义引导的检索器)

    SemanticGuidedRetriever 对语句(Statement)进行向量化处理后执行检索。针对通过向量检索获取的语句,采用波束搜索(Beam Search)与路径解析技术,从邻近节点开始探索,筛选出关联性强的信息。

    此外,该检索器还集成了多种为回答生成筛选所需信息的机制,例如:通过重排序(Rerank)筛选符合问题意图的检索结果;通过多样性过滤器确保回答信息不仅局限于相似内容,而是具备多样性。

    通过结合向量检索与图谱检索的混合模式,SemanticGuidedRetriever 既能保持较高的信息关联性,又能返回范围更广的检索结果,但相应地会增加处理负载。

    4.3. 确认检索结果

    可使用 GraphRAG Toolkit 中包含的 04-Querying.ipynb 笔记本,测试上述两种检索方法。

    我们针对每个检索器提出了 “Neptune Database 与 Neptune Analytics 有什么区别?” 的问题,并确认了大语言模型(LLM)的回答及回答所使用的文档。

    TraversalBasedRetriever

    Neptune Database 与 Neptune Analytics 呈互补关系。

    Neptune Database 是托管型图数据库服务,提供数据的持久化存储、备份、复制等功能。[https://docs.aws.amazon.com/neptune/latest/userguide/intro.html]

    而 Neptune Analytics 是用于高速分析大规模图数据的内存引擎,可从 Neptune Database 或 S3 读取图数据,通过执行预构建的图算法或自定义查询进行分析。

    [https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html]

    也就是说,Neptune Database 负责图数据的持久化存储与管理,Neptune Analytics 则是实现该数据高速分析处理的服务。通过将数据存储与分析分离,可使两者各自实现专注于自身功能的性能优化。[https://docs.aws.amazon.com/neptune-analytics/latest/userguide/neptune-analytics-features.html]

    ■ 文档

    [
      {
        "source": "https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html",
        "topic": "Neptune Analytics",
        "statements": [
          "Neptune Analytics can load data from a Neptune Database graph",
          "Neptune Analytics complements Amazon Neptune Database, a popular managed graph database",
          "Neptune Analytics can load data from a Neptune Database snapshot",
          "The Neptune Analytics vs Neptune Database page explains when to use Neptune Analytics and when to use Neptune Database",
          "Neptune Analytics can load graph data stored in Amazon S3"
        ]
      },
      {
        "source": "https://docs.aws.amazon.com/neptune/latest/userguide/intro.html",
        "topic": "Amazon Neptune",
        "statements": [
          "Neptune Analytics is an analytics database engine.",
          "Neptune Analytics complements Neptune database.",
          "The Neptune database has continuous backup to Amazon S3",
          "Amazon Neptune automatically detects database crashes and restarts",
          "Amazon Neptune increases database performance and availability"
        ]
      }
    ]
    

    4.3.1. SemanticGuidedRetriever(基于语义引导的检索器)

    Neptune Database 与 Neptune Analytics 是不同的产品,主要区别如下:

    Neptune Database 是托管型图数据库服务,可实现数据的持久化存储与低延迟的查询执行。[https://docs.aws.amazon.com/neptune/latest/userguide/intro.html]

    而 Neptune Analytics 是用于在内存中分析图数据的引擎,可将大规模图数据集加载到内存中,高速执行预先准备的高级分析算法及自定义 Cypher 查询。

    [https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html]

    Neptune Analytics 适用于数据分析、数据科学及探索性工作负载。

    也就是说,Neptune Database 的主要目的是数据的持久化存储与低延迟查询执行,而 Neptune Analytics 的主要目的是在内存中实现大规模图数据的高速分析。此外,Neptune Analytics 也可从 Neptune Database 读取数据并进行分析。[https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html]

    ■ 文档

    <sources>
      <source_1>
        <source_1_metadata>
          <url>https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html</url>
        </source_1_metadata>
        <statement_1.1>Neptune Analytics is a memory-optimized graph database engine for analytics</statement_1.1>
        <statement_1.2>Neptune Analytics is a memory-optimized graph database engine for analytics</statement_1.2>
        <statement_1.3>Neptune Analytics can load data from a Neptune Database graph</statement_1.3>
        <statement_1.4>Neptune Analytics can load data from a Neptune Database snapshot</statement_1.4>
        <statement_1.5>Neptune Analytics is an ideal choice for data-science workloads that require fast iteration for data, analytical and algorithmic processing, or vector search on graph data</statement_1.5>
      </source_1>
      <source_2>
        <source_2_metadata>
          <url>https://docs.aws.amazon.com/neptune/latest/userguide/intro.html</url>
        </source_2_metadata>
        <statement_2.1>Neptune Analytics is an analytics database engine.</statement_2.1>
        <statement_2.2>Neptune Analytics is a solution for quickly analyzing existing graph databases.</statement_2.2>
        <statement_2.3>Neptune Analytics is a solution for quickly analyzing graph datasets stored in a data lake.</statement_2.3>
        <statement_2.4>Amazon Neptune is a fully managed graph database service</statement_2.4>
        <statement_2.5>Amazon Neptune is a fully managed graph database service</statement_2.5>
      </source_2>
    </sources>
    

    直观来看,使用 SemanticGuidedRetriever 得到的回答似乎更准确,但不同检索方法在检索精度上的差异等问题,我们将留作后续的验证课题。

    此外,TraversalBasedRetriever 与 SemanticGuidedRetriever 返回结果中包含的文档格式存在差异,不过据了解,该问题将在后续更新中实现统一。

    总结

    本次我们通过 GraphRAG Toolkit,了解了 GraphRAG 处理数据的具体流程。通过实际运行并确认内部机制,我认为已经理解了 GraphRAG 的特性。

    同时,我们也发现,借助 GraphRAG Toolkit 能够非常轻松地构建 GraphRAG 系统。该工具包封装了将数据导入图数据库前的繁琐处理流程,这一点十分令人满意。

  • Dify v1.0.0 新增代理节点:执行函数调用(Function Calling)

    Dify v1.0.0 新增代理节点:执行函数调用(Function Calling)

    1. 前言

    期待已久的 Dify v1.0.0 终于正式发布了。作为一名从事生成式 AI 相关服务及基础开发的从业者,我感到无比兴奋!

    github.com

    该版本中备受关注的功能是新增的代理节点(Agent Node) 与插件系统(Plugin System) 。

    所谓代理节点,是一种采用预先定义的 “代理策略(Agent Strategy)”,能根据问题自动执行工具调用及多步骤推理的新型节点。

    本次,我将尝试使用 Dify 官方提供的智能体策略(Agentic Strategy)创建聊天流程(Chat Flow)。

    2. 什么是代理节点

    2.1. 基本功能

    代理节点是一种可执行预先定义的多步骤推理及工具调用的节点。

    通过在聊天流程中嵌入该节点,能够根据用户的问题动态选择、调用工具并进行推理,从而实现复杂任务的执行。

    Dify 官方提供的名为 “Dify Agent Strategies” 的插件中,包含两种智能体策略,具体如下表所示:

    策略(Strategy)概述(Overview)
    Function Calling指 AI 能根据用户的问题,从预先定义的外部函数中判断应调用哪个工具,并返回所用函数名及参数的机制。也被称为 “Tool Use”(工具使用)。
    ReAct指 AI 交替执行推理(Reasoning)与行动(Action)的方法。通过反映思考结果及工具调用结果来更新回答,从而实现复杂问题的解决及任务自动化的机制。

    本次将使用 Function Calling 来调用工具。

    2.2. 与传统工作流实现方式的差异

    以往,针对每种想要实现的聊天流程结构,都需要进行整体设计,并探讨工具选择的实现方法等。

    例如,若想在聊天流程中实现 Function Calling,就需要考虑以下问题:

    • 如何让系统选择合适的工具?
    • 在何种条件下对所用工具进行分支处理?
    • 考虑到分支后的处理,调用方需要传递哪些信息?

    这些问题的探讨会导致设计、开发、修改等环节耗费大量成本。

    而通过使用代理节点,这类成本有望得到降低。

    传统聊天流程中 Function Calling 的实现示例

    2.3. 与 Dify 应用中 “代理” 的差异

    传统的代理(Agent)是以单一应用的形式运行,是可独立执行预先定义的特定任务的应用。

    简单来说,它和简单的聊天机器人比较接近。

    另一方面,代理节点(Agent Node)会作为工作流的一部分被整合,能够与其他各类工具联动,从而实现复杂的处理。

    代理应用(Agent App)

    3. 尝试使用代理节点(Agent Node)

    3.1. 流程概述

    本次将创建一个接收饮食相关咨询的聊天流程。

    以下是我们这次要创建的流程。

    本次创建的流程示意图

    创建主聊天流程,并使用该流程中代理节点的 Function Calling 策略来定义工具分配处理逻辑。该结构下,代理节点会根据问题调用相应的工具。

    本次准备了以下 3 个工具:

    • 菜单设计
    • 营养师
    • 一般咨询

    3.2. 构建使用 Function Calling 的流程

    工具准备

    在代理节点之前,需先定义好待调用的工具。代理节点可调用的工具包括以下 3 类:

    • 插件
    • 自定义工具
    • 工作流

    本次将使用工作流作为工具。

    从工作室的 “新建” 功能中选择 “工作流”,构建如下流程:该流程结构简单,接收问题后向大语言模型(LLM)发起查询。

    菜单设计工作流

    为 “菜单设计” 功能的 LLM 定义了如下系统提示词:

    你是专门设计并推荐餐食菜单的聊天机器人。请接收用户的问题与需求,根据内容提供最适合的餐食菜单。注意:仅需推荐单餐的菜单。

    角色与目标:根据用户的问题及条件,推荐具体的菜名与烹饪方法。需兼顾营养均衡、烹饪难度、季节适配性、饮食主题(法餐、西餐、中餐等),设计符合用户需求的菜单。若用户提出具体条件(食材限制、过敏情况、预算、烹饪时间等),需推荐符合这些条件的菜单。

    在结束节点的输出变量中设置 LLM 的回答后,工作流的构建即完成。为将其用作工具,需点击 “发布” 按钮,将工作流设置为公开状态。

    工作流发布

    为将创建好的工作流用作代理节点的工具,需进行名称等相关设置。“工具输入” 中的 “query”(查询)对应工作流开始节点中设置的输入字段。

    此处可配置的 “方法” 项分为两类:

    • LLM 输入:由 LLM 自动填充内容
    • 设置:需自行预先定义填充值

    本次中,“query” 项用于接收用户的问题,为了从 Function Calling 获取结果,将方法设置为 “LLM 输入”。

    作为工具的工作流设置

    同理,“营养师”“一般咨询” 的工作流也通过以下系统提示词创建:

    ■ 营养师

    你作为专业营养师,需针对用户提出的菜单及饮食相关问题,从健康与营养均衡的角度给出评价,并提供恰当的建议。请遵循以下指南进行回答。

    从专业角度提供建议

    根据菜单中包含的营养素、食材特点及烹饪方法给出具体评价。注意结合用户的问题与实际情况提供建议。

    指南

    用用户易于理解的语言讲解食材、烹饪方法及营养均衡相关知识。若菜单存在可改进之处或需补充的营养素,请给出具体建议。结合用户的目的(如维持健康、减肥、补充能量等)提供建议。

    ■ 一般咨询

    你作为优秀的聊天机器人,请回答用户提出的各类问题。

    代理节点设置

    选择 “聊天流程” 新建应用,并按如下方式配置代理节点。

    代理节点

    智能体策略:选择 Function Calling

    模型:使用 AWS(Bedrock)的 “Claude 3 Haiku”。即使是 OpenAI 之外的、官方支持 Function Calling 的模型,也可正常执行。

    工具列表(Tools List):添加此前定义的 3 个工作流。

    最后设置系统提示词与用户提示词:

    ■ 系统提示词

    你是接收饮食相关咨询的代理系统的监控代理。请遵循以下规则与方针,选择合适的工具。

    系统整体概述

    针对用户的饮食相关问题,调用以下 3 类专业代理:

    1. 菜单设计:提供新菜单灵感及菜品推荐。
    2. 营养师:从营养均衡、健康层面、食材搭配等角度提供建议。
    3. 其他一般咨询代理:解答饮食之外的各类问题,以及不属于专业领域的饮食相关问题(如烹饪小知识、烹饪方法、食材挑选技巧等)。

    注意事项

    仅可选择 1 个工具。需基于调用工具返回的响应,充分利用所有信息进行回答。必要时可采用分点列举等方式,确保回答易于阅读。

    最终目标

    你的职责是为各代理创造条件,使其能在专业领域为用户的饮食相关问题提供最优回答,实现系统整体的一致性与高质量信息服务。

    4. 尝试执行 Function Calling

    执行结果

    聊天流程整体搭建完成后,我们在预览模式下进行实际运行测试。

    首先,针对菜单设计相关问题进行提问。

    ■ 问题 1

    “帮忙想一个肉类料理的菜单”

    结果如下:

    查看日志可以发现,tool_name 字段显示,被调用的工具是在 “作为工具的工作流设置” 中配置的 think_name(注:推测为 “菜单设计” 工作流的配置名称,原文未明确说明,暂保留字段名)。这表明系统成功调用了 “菜单设计” 工作流。

    进一步查看 query 部分,其中填入了 “肉类料理的菜单” 这一文本。由此可见,系统能从问题中提取出适合传递给工具的文本,并将其传入工具中。

    菜单设计(工具调用验证)

    我们也针对饮食之外的内容进行了提问。

    ■ 问题 2

    “告诉我富士山的海拔高度”

    从结果来看,系统调用了 “一般咨询” 工具。

    一般咨询(工具调用验证)

    当不存在对应的工具时会发生什么?

    若不存在能匹配问题的合适工具,系统会如何运行呢?

    我们先将代理节点中的 “一般咨询” 工具禁用,然后再次提出关于富士山海拔的问题。

    首先,在代理节点的设置界面中,禁用 “一般咨询” 工具。

    一般咨询工具禁用

    在该状态下,提出以下问题:

    ■ 问题

    “告诉我富士山的海拔高度”

    结果显示,系统选择了本应完全无关的 “菜单设计” 工具。

    这意味着,无论收到何种问题,系统或许都会选择某个工具进行调用。

    考虑到这一点,在设计和准备工具时,需要充分考量工具的覆盖范围与分类逻辑。

    对不存在对应工具情况的验证

    5. 总结

    本次我们尝试了使用代理节点来执行函数调用(Function Calling)。

    随着智能体策略的不断丰富,Dify 的使用场景想必会一下子变得更加多元!

    今后,我也会持续关注 Dify 的更新动态。

  • 用时间序列基础模型 Chronos-Bolt 轻松尝试时间序列预测

    近年来,以 LLM(大语言模型)、VLM(视觉语言模型)为代表的基础模型被大量公开。与图像、自然语言处理等领域不同,基础模型也正逐步应用于时间序列建模领域。

    本文将为大家介绍时间序列基础模型之一 ——“Chronos-Bolt”。

    关于 Chronos-Bolt

    简单来说,Chronos 是一款通过预先学习各类时间序列数据,无需对目标应用数据进行额外学习,即可实现时间序列预测的时间序列基础模型。而 Chronos-Bolt 是 Chronos 的后续迭代模型,与初代 Chronos 相比,其预测精度和运行性能均有大幅提升。

    参考资料 ,Chronos 相关论文链接:arxiv.org

    本文中,我们将借助 AutoGluon 框架,实现基于 Chronos-Bolt 的时间序列预测流程。AutoGluon 是由 AWS 主导开发的开源 AutoML(自动化机器学习)框架,该框架已集成多款时间序列预测模型,Chronos-Bolt 便是其中之一。

    1. 无需训练即可实现高精度推理

    为实现 “无需训练即可预测” 的能力,Chronos/Chronos-Bolt 不仅学习了各类真实时间序列数据,还采用了 TS-Mixup、KernelSynth 等合成数据生成技术,通过大量时间序列数据的训练,确保即使在缺乏目标领域训练数据的情况下,也能保证一定的预测精度。

    通常而言,提升预测精度需要针对目标数据进行训练,但 Chronos-Bolt 在无训练数据的场景下,仍可实现符合预期的预测精度。

    2. 相比 Chronos 模型速度更快(最高达 260 倍)

    初代 Chronos 模型每次仅能预测未来 1 个时间步的结果;而 Chronos-Bolt 通过 “同时对多个时间步(t、t+1、t+2……)进行推理” 的设计,实现了速度优化。根据 AWS 官方博客的基准测试数据,在模型规模与 Chronos 相同的情况下,Chronos-Bolt 的运行性能最高可达到 Chronos 的 260 倍。

    Chronos-Bolt 的推理速度

    3. 相比 Chronos 模型精度更高

    根据官方基准测试结果,Chronos-Bolt 的预测精度优于初代 Chronos 模型。仅从这一点来看,选择 Chronos-Bolt 也具备显著优势。

    Chronos-Bolt 与其他模型的精度对比

    基于 Chronos-Bolt 的时间序列预测

    本文将介绍三种基于 Chronos-Bolt 的预测实现方案,具体如下:

    方案说明
    零样本推理不进行额外训练,直接基于预训练模型进行预测
    执行微调基于目标数据进行训练后,再开展预测
    利用回归模型进行特征外推除预测目标时间序列外,额外引入辅助特征用于预测

    前期准备

    1. 安装所需库

    要使用 Chronos-Bolt,需提前安装 autogluon 库,执行以下命令即可:

    pip install autogluon
    

    2. 数据准备

    本文使用的数据集为 Kaggle 平台公开的 “Store Sales – Time Series Forecasting”数据集,该数据集包含厄瓜多尔零售商 “Corporación Favorita” 的商品销量数据。

    数据集链接:www.kaggle.com

    由于原始数据格式难以直接适用,我们将在后续实现中对数据进行处理。

    import polars as pl
    
    transaction = pl.read_csv("./data/store-sales-time-series-forecasting/transactions.csv")
    holiday = pl.read_csv("./data/store-sales-time-series-forecasting/holidays_events.csv")
    
    transaction = transaction.join(holiday, how="left", on="date")
    transaction = transaction.fill_null("null")
    transaction = transaction.with_columns(
        pl.col("date").str.to_date()
    )
    transaction = transaction.with_columns(
        pl.col("date").dt.month().alias("month"),
        pl.col("date").dt.day().alias("day"),
        pl.col("date").dt.weekday().alias("weekday")
    )
    
    df = transaction
    train_df = df.filter(
        pl.col("date").dt.year() < 2017
    )
    train_df.write_csv("./data/train.csv")
    valid_df = df.filter(
        (pl.col("date").dt.year() >= 2017) & (pl.col("date").dt.month() == 1)
    )
    valid_df = pl.concat([train_df, valid_df], how="diagonal_relaxed")
    valid_df.write_csv("./data/valid.csv")
    

    以下是本分析中使用的主要参数说明:

    项目(列名)说明
    date日期
    store_nbr店铺编号
    transactions交易量(★本次预测目标值)
    type节假日或事件类型
    locale适用区域范围
    locale_name具体区域名称(如城市、省份等)
    month从 date 列中提取的月份
    day从 date 列中提取的日期
    weekday从 date 列中提取的星期几

    零样本推理

    零样本推理的实现非常简单,通过以下代码即可完成:

    其中,id_column指定用于分组的类别列,timestamp_column指定时间列,target指定预测目标列。在TimeSeriesPredictor.fit方法中,通过指定presets="bolt_base",即可调用 Chronos-Bolt 模型。此外,将bolt_base替换为bolt_smallbolt_tiny,还可使用不同规模的 Chronos-Bolt 模型。

    import polars as pl
    from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor
    
    train_data = TimeSeriesDataFrame.from_path( 
         "data/train.csv", 
         id_column="store_nbr", 
         timestamp_column="date", 
    )
    predictor = TimeSeriesPredictor(prediction_length=30, freq="D", target="transactions").fit(train_data, presets="bolt_base")
    predictions = predictor.predict(train_data)
    

    推理结果可视化

    AutoGluon 提供了用于可视化的工具函数,我们将使用该函数展示结果。尽管数据中存在部分临时缺失,但可以看出:即使不进行额外训练,模型也基本能贴合实际数据趋势。

    test_data = TimeSeriesDataFrame.from_path( 
         "data/valid.csv", 
         id_column="store_nbr", 
         timestamp_column="date", 
    )
    predictor.plot(test_data,predictions,max_history_length=365,item_ids=[25])
    

    本图表中,蓝色线条代表实际交易量数据,橙色线条代表 Chronos-Bolt 的预测结果。此外,橙色线条前后的阴影部分表示80% 置信区间

    微调

    接下来,我们通过微调训练模型,并用其尝试时间序列预测。微调可通过以下代码实现:此代码会生成两种模型:零样本模型与微调模型。在hyperparametersChronos键下,通过列表定义了两种模型配置,核心要点是将fine_tune参数设为True

    predictor = TimeSeriesPredictor(prediction_length=31, eval_metric="MASE", freq="D", target="transactions").fit( 
         train_data, 
         hyperparameters={ 
             "Chronos": [ 
                 {"model_path": "bolt_base", "ag_args": {"name_suffix": "ZeroShot"}}, 
                 {"model_path": "bolt_base", "fine_tune": True, "ag_args": {"name_suffix": "FineTuned"}}, 
             ] 
         }, 
         enable_ensemble=False, 
         time_limit=600, 
    )

    训练结束后,可通过以下实现代码获取零样本模型与微调后模型的评估结果。

    predictor.leaderboard(test_data)



    此外,利用上述实现的预测器,还可按以下方式进行预测并可视化结果:

    predictions = predictor.predict(train_data, model=”ChronosZeroShot[bolt_base]”)
    predictor.plot(test_data,predictions,max_history_length=365,item_ids=symbols, max_num_item_ids=len(symbols))

    对比零样本推理结果可见,微调后的模型预测结果更贴合实际数据趋势。

    特征外推的使用

    在部分场景中,除了预测目标时间序列数据外,还可能存在可辅助预测的其他时间序列数据。例如,店铺销量可能与影响到店人数的 “降水量” 相关,这类数据即可作为辅助特征使用。但需注意:外推需使用 “预测当日” 的辅助特征数据,因此在实际验证中,若无法获取当日辅助数据,则难以进行当日预测(例如:若需在预测前日使用次日的准确降水量数据,实际场景中通常无法实现)。

    外推场景下的预测公式

    回归预测值 = Chronos-Bolt 的预测值 + CatBoost(外推回归模型)的预测值

    代码实现

    核心修改点在于:在TimeSeriesPredictor中通过known_covariates_names指定用于外推的辅助特征列。本次模型中,我们输入了日期特征(month、day、weekday)与节假日信息(type、locale、locale_name),具体实现如下:

    predictor = TimeSeriesPredictor(
        prediction_length=31,
        eval_metric="MASE",
        target="transactions",
        known_covariates_names=["type", "locale", "locale_name", "month", "day", "weekday"],
        freq="D"
    ).fit(
        train_data,
        hyperparameters={
            "Chronos": [
                {"model_path": "bolt_base", "ag_args": {"name_suffix": "ZeroShot"}},
                {
                    "model_path": "bolt_base",
                    "covariate_regressor": "CAT",
                    "target_scaler": "standard",
                    "ag_args": {"name_suffix": "WithRegressor"},
                },
            ],
        },
        time_limit=600,
        enable_ensemble=False,
    )
    

    与微调场景类似,可通过以下代码对比模型效果:

    predictor.leaderboard(test_data)

    在本次预测中,需要提供用于外推的特征量。由于此类推理需要 31 天的完整数据才能进行(若数据不完整则难以实现),因此我们筛选出店铺编号为 25 的完整数据集,并用其对推理结果进行可视化。具体实现代码如下。

    filtered_id_df = train_data.to_data_frame().loc[25]
    filtered_id_df['item_id'] = 25
    filtered_id_df['timestamp'] = filtered_id_df.index
    train_data = TimeSeriesDataFrame.from_data_frame(filtered_id_df)
    
    filtered_id_df = test_data.to_data_frame().loc[25]
    filtered_id_df['item_id'] = 25
    filtered_id_df['timestamp'] = filtered_id_df.index
    test_data = TimeSeriesDataFrame.from_data_frame(filtered_id_df)
    
    predictions = predictor.predict(train_data, model="ChronosWithRegressor[bolt_base]", 
                                    known_covariates =test_data
                                   )
    predictor.plot(test_data,predictions,max_history_length=365,
                   item_ids=[25], max_num_item_ids=1)

    对比微调模型可见,加入外推特征后的预测结果更接近实际数据趋势。

    总结

    我们通过 AutoGluon 轻松调用了 Chronos-Bolt 模型,并尝试完成了时间序列预测。将 LLM 等基础模型的思路应用到时间序列模型中,这一方向非常有趣,其未来发展值得期待。

  • Amazon Bedrock Flows 尝试实现交互式流程

    Amazon Bedrock Flows 尝试实现交互式流程

    前言

    Amazon Bedrock Flows 新增支持了多轮对话功能这一新特性。(参考链接:aws.amazon.com

    以往,用户需要在单次提示词中输入处理所需的全部信息,而借助多轮对话形式,AI 能够轻松实现适时追问缺失信息的功能。此次,我们就利用这一多轮对话功能来实现交互式流程。

    1. 概述

    Amazon Bedrock Flows 是什么?

    Amazon Bedrock Flows 是 AWS 提供的基于 GUI 的生成式 AI 框架。借助它,我们可以轻松构建专属的 AI 代理及聊天机器人。

    多轮对话功能是什么?

    此次新增支持的多轮对话功能,能够实现对话式流程,即向用户追问缺失的信息。

    例如,以旅行计划的建议与预订流程为例,单次对话与多轮对话存在如下区别:

    单次对话与多轮对话示例对比

    在以往的单次对话中,若要从输入文本中提取必要信息并进行处理,输入文本必须包含所有必要信息。倘若输入中缺少必要信息,流程会直接进入后续处理环节。因此,可能会导致无法处理,或者 AI 自行推测生成必要信息,从而出现非预期的行为。

    而使用多轮对话功能后,若存在信息缺失,流程不会进入下一步,而是会向用户进行追问。如此一来,借助多轮对话功能,能够轻松实现更自然的对话流程。

    在 Amazon Bedrock Flows 中创建多轮对话流程

    接下来,为大家介绍在 Amazon Bedrock Flows 中使用多轮对话功能的方法。本次将以示例形式,创建一个基于预算、PC 用途、台式机或笔记本电脑等条件推荐 PC 的流程。

    流程概述

    首先,本次创建的流程整体结构如下:

    PC 推荐流程

    用户输入由代理节点接收。提示词节点虽也能以文本形式接收用户输入,但由于多轮对话功能是代理独有的功能,因此此处使用代理。

    随后,代理提取预算、PC 用途、台式机或笔记本电脑这三类信息,并将其传递给后续的 AWS Lambda 函数以进行 PC 推荐。

    Lambda 函数的推荐逻辑较为简单,仅返回与提取的信息组合相对应的 PC 名称。

    创建支持多轮对话的代理

    首先,创建用于接收用户输入并提取 PC 推荐所需信息的代理。本次传递给代理的指令如下:

    请分析用户输入,并提取以下 3 个参数。
    
    # 需提取的信息
    1. 价格区间(price_range)
       - 若用户预算低于 15 万日元,返回 "lower_than_150000yen"
       - 若用户预算高于或等于 15 万日元,返回 "higher_than_150000yen"
    
    2. 用途(use_case)
       - 若需要游戏用 PC,返回 "game"
       - 若用于工作、文档制作、邮件等商务用途,返回 "business"
    
    3. PC 类型(pc_type)
       - 若需要台式机,返回 "desktop"
       - 若需要笔记本电脑,返回 "laptop"
    
    # 输出格式
    提取所需信息后,请以以下 JSON 格式返回结果。
    
    {
      "price_range": "lower_than_150000yen",
      "use_case": "game",
      "pc_type": "laptop"
    }
    

    接下来,进行多轮对话的设置:在 “其他设置” 中将 “用户输入” 设为 “启用”。这样即可实现多轮对话功能。

    (此处对应 “代理的多轮对话功能设置” 相关内容)

    创建 PC 推荐 Lambda 函数

    接下来,创建用于接收代理输出的 JSON 并返回推荐 PC 的 Lambda 函数。本次在函数中简单定义了一份 PC 列表。

    运行

    # PC 推荐映射表
    PC_RECOMMENDATION_MAP = {
        "lower_than_150000yen.game.desktop": "ASUS ROG Strix G15",
        "lower_than_150000yen.game.laptop": "Dell G15 Ryzen Edition",
        "lower_than_150000yen.business.desktop": "HP Pavilion Desktop TP01",
        "lower_than_150000yen.business.laptop": "Lenovo ThinkPad E14",
        "higher_than_150000yen.game.desktop": "Alienware Aurora R15",
        "higher_than_150000yen.game.laptop": "Razer Blade 15 Advanced",
        "higher_than_150000yen.business.desktop": "Apple Mac Studio",
        "higher_than_150000yen.business.laptop": "Apple MacBook Pro 16-inch",
    }
    
    def lambda_handler(event, context):
        # 获取输入参数
        input_str = event.get("node", {}).get("inputs", [])[0].get("value", "")
    
        # 从代理输出的文本中提取 JSON 并解析
        match = re.search(r'\{.*?\}', input_str, re.DOTALL)
        json_str = match.group(0) if match else None
        pc_params = json.loads(json_str)
    
        # 推荐 PC
        recommend_key = f"{pc_params['price_range']}.{pc_params['use_case']}.{pc_params['pc_type']}"
        recommended_pc = PC_RECOMMENDATION_MAP.get(recommend_key, "no_recommended_pc")
    
        if recommended_pc == "no_recommended_pc":
            response = "暂无推荐的 PC。"
        else:
            response = f"推荐您选择 {recommended_pc}。"
    
        return response
    

    尝试进行多轮对话

    下面,我们来与创建好的流程进行对话测试。

    面对模糊问题时,是否会追问具体内容?

    测试面对用户的模糊问题时,系统是否会追问具体内容。

    首先,来看多轮对话功能关闭时的结果,以及此时代理传递给后续环节的输出。

    此处,我们输入模糊的价格表述:“想要一台价格亲民的 PC”。

    多轮对话功能关闭时的结果

    从这一结果来看,即便多轮对话功能处于关闭状态,代理似乎也试图进行追问,但追问的内容并未传递给用户,而是直接发送至后续处理环节,进而导致了错误。

    另一方面,多轮对话功能开启时的结果如下:

    多轮对话功能开启时的结果

    从结果可以看出,代理并未进入下一步处理,而是针对用户的模糊提问追问了具体信息。

    此外,Lambda 中定义的推荐 PC 如下表所示,符合预期的 “Lenovo ThinkPad E14” 被成功推荐。

    价格区间用途PC 类型推荐 PC
    7000元以下游戏台式机ASUS ROG Strix G15
    7000元以下游戏笔记本电脑Dell G15 Ryzen Edition
    7000元以下商务台式机HP Pavilion Desktop TP01
    7000元以下商务笔记本电脑Lenovo ThinkPad E14
    7000元以下游戏台式机Alienware Aurora R15
    7000元以下游戏笔记本电脑Razer Blade 15 Advanced
    7000元以下商务台式机Apple Mac Studio
    7000元以下商务笔记本电脑Apple MacBook Pro 16-inch

    能否基于多次交互而非单次交流给出回答?

    接下来,我们测试当对话发生多次往复时,系统是否能基于此前的全部对话内容给出回答。

    以下测试中,我们将 PC 相关信息分多次提供给系统。

    往复对话的回答结果

    从结果可见,尽管信息被拆分成了 3 次对话传递,但系统依然包含了首个用户消息中 “预算控制在9000 元以内” 这一信息,并成功推荐了符合预期的 PC。

    总结

    通过 Amazon Bedrock Flows 的多轮对话功能,我们发现:即便用户输入模糊,或对话发生多次往复,系统也能准确提取所需信息。借助该功能,我们可以轻松构建自然对话形式的 AI 服务,建议大家务必尝试一下。

  • 在 AWS 上使用 Elasticsearch(Elastic Cloud)的要点 (下篇)

    在 AWS 上使用 Elasticsearch(Elastic Cloud)的要点 (下篇)

    本文作为在 AWS 上使用 Elasticsearch(Elastic Cloud)的要点 下篇,将介绍在 Elastic Cloud 中执行以下操作的步骤。

    1. 版本升级

    1. 审计日志设置

    本文中,将 Elastic Cloud(Elasticsearch Service)的名称统一表述为 “Elastic Cloud”。

    1. 版本升级

    关于 Elastic Stack 的版本升级

    Elastic Stack 有两种版本升级方式,各自特点如下表所示:

    序号版本升级方式特点
    1滚动升级(Rolling Upgrade)无需停止服务即可完成版本升级
    2全集群升级(Full Cluster Upgrade)需一次性停止所有服务

    在 Elastic Cloud 中,采用的是序号 1 的滚动升级方式,因此无需停止服务就能完成版本升级。

    关于升级的详细信息,可参考以下链接:Upgrade versions | Elasticsearch Service Documentation | Elastic

    此外,在 Elastic Cloud 中,只需在图形用户界面(GUI)上点击一下,即可完成版本升级。下面实际介绍操作步骤。

    在 Elastic Cloud 中升级 Elastic Stack

    (1) 根据需要将Deployment设为维护模式

    在 Elastic Cloud 中,若对高负载的Deployment应用设置变更,可能会导致设置变更耗时较长,最坏情况下会出现响应中断、设置变更失败的问题。因此,虽然原则上无需停止服务即可完成版本升级,但当部署处于高负载状态时,也可考虑先将其切换为维护模式,再进行升级。

    访问 Elastic Cloud 的Deployment页面,选择菜单中的 “Edit”。

    点击 “Edit”

    在页面底部的 “Extended maintenance”处勾选复选框,再点击 “Save”按钮,即可将 Deployment设置为维护模式。

    (2) 点击 Elastic Cloud 部署页面右侧的 “Upgrade”按钮

    (3) 选择版本后,点击 “Upgrade”按钮

    以上步骤完成后,版本升级即可执行。版本升级操作简便,能够始终使用最新功能,这是 Elastic Cloud 的一大重要优势。

    2. 审计日志设置

    关于 Elastic Stack 的审计日志

    Elasticsearch 和 Kibana 均支持输出审计日志。通过审计日志,可监控认证失败、连接拒绝等与安全相关的事件。

    关于审计日志的详细信息,可参考以下文档:

    审计日志默认处于未启用状态,因此需按以下步骤启用。

    启用审计日志的步骤

    (1) 在 Elastic Cloud 的Deployment管理页面中,点击 “Edit”

    (2) 点击 Elasticsearch 右侧的链接 “Manage user settings and extensions”

    (3) 为 Elasticsearch 应用审计日志设置

    设置内容如下:xpack.security.audit.enabled: true

    (4) 点击 Kibana 右侧的链接 “Edit user settings”

    (5) 为 Kibana 也应用审计日志设置

    设置内容如下:xpack.security.audit.enabled: true

    (6) 点击页面底部的 “Save”(保存)按钮,应用设置

    (7) 在Monitoring功能的日志Logs中查看审计日志

    此时可看到 Elasticsearch 和 Kibana 的审计日志已输出。

    从上述审计日志中可以看出,“y_nomura” 用户在两次认证失败(红色高亮部分)后,成功完成了认证(黄色高亮部分)。通过输出审计日志,能够确认 “谁在何时登录”“访问了哪里” 等信息。

    总结

    截至目前,使用 Elastic Cloud 的要点讲解完毕,但除此之外,针对不同使用场景还有许多需要考虑的事项。如果有疑问,欢迎评论。