实现持久化执行的Durable Swarm

发布:2024-10-18 10:08 阅读:24 点赞:0

本文将介绍如何通过Durable Swarm增强多代理系统的可靠性与可扩展性。Durable Swarm旨在帮助您构建可靠的多代理工作流,使其在遇到中断或重启时能够自动恢复至上一个完成的步骤。这一能力在当今多代理工作流变得越来越常见、长时间运行和高度交互的背景下尤为重要。

一. Durable Swarm的概述

在多代理工作流中,当代理需要长时间等待用户输入或处理复杂工作流时,确保其能抵抗暂时性故障(如服务器重启)显得尤为重要。然而,实现可靠的多代理编排并不简单,这通常需要复杂的架构重构,例如通过SQS或Kafka路由代理通信。

1.1 Durable Swarm的设计理念

Durable Swarm通过自动持久化您的Swarm工作流执行状态至Postgres数据库,使得在程序中断时能够自动恢复工作流。这种设计在提供可靠性的同时,保留了Swarm框架的易用性。其实现只需少于20行代码,通过将Swarm的主循环声明为持久工作流,并将每次聊天完成或工具调用视为该工作流中的一个步骤。

二. 使Swarm具备持久化能力

要将Durable Swarm添加到您的项目中,您只需创建一个名为 durable_swarm.py 的文件,内容如下:

from swarm import Swarm  # 导入Swarm框架
from dbos import DBOS, DBOSConfiguredInstance  # 导入DBOS模块

# 初始化DBOS
DBOS()

@DBOS.dbos_class()  # 将DurableSwarm类标记为DBOS类
class DurableSwarm(Swarm, DBOSConfiguredInstance):
    def __init__(self, client=None):
        Swarm.__init__(self, client)  # 初始化Swarm
        DBOSConfiguredInstance.__init__(self, "openai_client")  # 初始化DBOS配置实例

    @DBOS.step()  # 将此方法标记为工作流步骤
    def get_chat_completion(self, *args, **kwargs):
        return super().get_chat_completion(*args, **kwargs)  # 调用父类方法

    @DBOS.step()  # 将此方法标记为工作流步骤
    def handle_tool_calls(self, *args, **kwargs):
        return super().handle_tool_calls(*args, **kwargs)  # 调用父类方法

    @DBOS.workflow()  # 将此方法标记为工作流
    def run(self, *args, **kwargs):
        return super().run(*args, **kwargs)  # 调用父类方法

# 启动DBOS
DBOS.launch()

使用 DurableSwarm 替代 Swarm 进行应用开发,提供无缝替代。

三. 快速入门

要开始使用Durable Swarm,您需要安装Swarm和DBOS,并初始化DBOS。Swarm要求Python版本为3.10或更高。

3.1 安装依赖

运行以下命令进行安装:

pip install dbos git+https://github.com/openai/swarm.git  # 安装dbos和Swarm
dbos init --config  # 初始化DBOS

3.2 设置OpenAI API密钥

您需要一个OpenAI API密钥,可以在这里获取。将其设置为环境变量:

export OPENAI_API_KEY=<your-key>  # 设置API密钥

3.3 创建测试程序

创建 durable_swarm.py 文件后,在同一目录下创建 main.py 文件,内容如下:

from swarm import Agent  # 导入Agent类
from durable_swarm import DurableSwarm  # 导入DurableSwarm类

client = DurableSwarm()  # 创建DurableSwarm实例

# 定义转移到代理B的函数
def transfer_to_agent_b():
    return agent_b  # 返回代理B的引用

# 创建代理A
agent_a = Agent(
    name="Agent A",  # 设置代理名称
    instructions="You are a helpful agent.",  # 代理指令
    functions=[transfer_to_agent_b],  # 代理功能
)

# 创建代理B
agent_b = Agent(
    name="Agent B",  # 设置代理名称
    instructions="Only speak in Haikus.",  # 代理指令
)

# 运行代理A,并发送消息
response = client.run(
    agent=agent_a,  # 指定运行的代理
    messages=[{"role""user""content""I want to talk to agent B."}],  # 用户消息
)

# 打印代理B的响应内容
print(response.messages[-1]["content"])  # 输出最后一条消息内容

3.4 Postgres数据库要求

DBOS需要Postgres数据库。如果您已经有Postgres服务器,请修改 dbos-config.yaml 以配置连接信息。否则,我们提供了一个使用Docker启动Postgres的脚本:

export PGPASSWORD=swarm  # 设置Postgres密码
python3 start_postgres_docker.py  # 启动Postgres Docker

3.5 运行代理

最后,您可以通过以下命令运行您的代理:

python3 main.py  # 执行主程序

输出将会是:

Agent B is here,
Ready to help you today,
What do you need, friend?

四. 将现有应用程序转换为DurableSwarm

将任何现有的Swarm应用程序转换为DurableSwarm只需三个简单步骤:

  1. 安装dbos并使用 dbos init --config 初始化。
  2. durable_swarm.py 添加到您的项目中。
  3. 在应用程序中使用 DurableSwarm 替换 Swarm

通过以上步骤,您可以轻松地将现有应用程序升级为具备持久化能力的Durable Swarm,从而提高其可靠性与弹性。