PocketFlow学习指南

#AI

什么是PocketFlow?

PocketFlow是一个超级简单的大语言模型(LLM)应用框架,只有100行核心代码,零依赖,但却能实现复杂的LLM应用功能。就像搭积木一样,你可以轻松构建各种强大的AI应用,比如智能助手、文档问答系统和自动化工具。

PocketFlow的核心理念是:把LLM应用看作一个图+共享存储。这就像是一个小火车在轨道上跑,停靠不同的站点做不同的事情,并且所有站点都可以读写同一个大仓库。

PocketFlow基本概念

核心概念解释

1. 节点(Node)

节点是最小的构建块,就像是一个小工作站,有三个简单的步骤:

  1. 准备数据(prep): 从共享存储读取和准备数据

    • 比如:从文件读取内容、查询数据库
    • 返回预处理后的数据给执行步骤
  2. 执行计算(exec): 处理数据,通常是调用LLM

    • 比如:请求大语言模型生成摘要、回答问题
    • 这一步不应该直接访问共享存储
    • 返回处理结果给后处理步骤
  3. 后处理(post): 将结果写回共享存储,并决定下一步去哪里

    • 比如:保存结果、更新数据库
    • 通过返回"动作"字符串决定接下来要执行哪个节点

想象一个做蛋糕的节点:先准备材料(prep),然后烘焙(exec),最后装盘并决定是直接上桌还是加装饰(post)。

class 总结文件(Node):
    def prep(self, shared):
        # 读取文件内容
        return shared["文件内容"]
 
    def exec(self, 文件内容):
        # 调用LLM总结文件
        摘要 = call_llm(f"请总结这段文字: {文件内容}")
        return 摘要
 
    def post(self, shared, prep_res, 摘要):
        # 保存摘要到共享存储
        shared["摘要"] = 摘要
        # 没有返回值,默认返回"default"动作

2. 流程(Flow)

流程就是将多个节点连接起来的图,像一条火车轨道一样引导执行顺序:

  1. 创建流程: 指定起始节点(Flow(start=首节点))
  2. 定义转换: 使用 节点A >> 节点B节点A - "动作名" >> 节点B
  3. 执行流程: 调用 flow.run(shared) 运行整个流程

你可以创建简单的顺序流程,也可以根据节点返回的动作创建复杂的分支和循环。

例如,一个审批流程可能有三种路径:

审核 - "通过" >> 支付
审核 - "需修改" >> 修改
审核 - "拒绝" >> 结束
 
修改 >> 审核  # 修改后重新审核
支付 >> 结束  # 支付后结束

工作流示例

3. 通信方式

节点和流程之间有两种通信方式:

  1. 共享存储(Shared Store): 所有节点都可以读写的全局字典

    shared = {
        "文件内容": "这是一些文本...",
        "摘要": None  # 初始为空,会由节点填充
    }
  2. 参数(Params): 每个节点的本地参数,主要用于批处理

    节点.set_params({"文件名": "文档1.txt"})

4. 批处理(Batch)

批处理帮助处理大量数据,有两种形式:

  1. 批处理节点(BatchNode): 一次处理多个项目

    • prep() 返回可迭代对象(如列表)
    • exec() 对每个项目分别调用
    • post() 收到所有结果列表
  2. 批处理流程(BatchFlow): 多次运行同一流程

    • 为每次运行提供不同的参数
    • 适合处理多个文件或任务

例如,一个处理多个文件的批处理:

class 处理所有文件(BatchNode):
    def prep(self, shared):
        文件列表 = shared["文件列表"]
        return 文件列表  # 返回文件列表
 
    def exec(self, 单个文件):
        内容 = 读取文件(单个文件)
        return 总结文本(内容)
 
    def post(self, shared, prep_res, 所有摘要):
        shared["所有摘要"] = 所有摘要

5. 异步处理(Async)

异步节点使用prep_async(), exec_async(), post_async()处理I/O密集型任务:

class 异步总结节点(AsyncNode):
    async def prep_async(self, shared):
        文档 = await 异步读取文件(shared["文件路径"])
        return 文档
 
    async def exec_async(self, 文档):
        摘要 = await 异步调用LLM(f"总结: {文档}")
        return 摘要

6. 并行处理(Parallel)

并行节点可以同时处理多个任务,适合I/O密集型操作(如多个LLM调用):

class 并行总结(AsyncParallelBatchNode):
    async def prep_async(self, shared):
        return shared["文本列表"]
 
    async def exec_async(self, 文本):
        return await 异步调用LLM(f"总结: {文本}")

主要设计模式

PocketFlow支持多种设计模式,可以应对不同的应用场景:

1. 代理(Agent)模式

代理可以根据上下文动态决定执行哪些操作,像是一个会思考的助手:

代理模式

例如,一个搜索代理:

  1. 决定是搜索还是回答
  2. 如果搜索,收集信息后回到决策点
  3. 如果回答,基于收集的信息给出答案
class 决策节点(Node):
    def exec(self, 输入):
        决定 = call_llm("应该搜索更多信息还是直接回答?")
        if "搜索" in 决定:
            return "search"
        else:
            return "answer"
 
    def post(self, shared, prep_res, exec_res):
        return exec_res  # 返回"search"或"answer"
 
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide  # 循环回决策点

2. 工作流(Workflow)模式

工作流将复杂任务分解为连续的小步骤:

例如,文章写作流程:

创建大纲 >> 撰写内容 >> 审核修改

这种模式适合有明确顺序的任务,每一步都是独立的LLM调用。

3. 检索增强生成(RAG)模式

RAG结合了检索和生成,分为两个阶段:

RAG模式

  1. 离线索引:

    • 将文档分块
    • 为每块生成向量嵌入
    • 存储到向量数据库
  2. 在线查询:

    • 为问题生成向量嵌入
    • 检索最相关的文档块
    • 结合问题和文档块生成答案
# 离线索引流程
分块节点 >> 嵌入节点 >> 存储节点
 
# 在线查询流程  
问题嵌入 >> 检索文档 >> 生成回答

4. 映射归约(MapReduce)模式

处理大量数据的模式,分为两个阶段:

MapReduce模式

  1. 映射(Map): 将大任务分解为小任务并并行处理
  2. 归约(Reduce): 汇总小任务的结果

例如,总结多个文档:

class 总结所有文件(BatchNode):
    def prep(self, shared):
        return shared["文件列表"]
 
    def exec(self, 文件):
        return call_llm(f"总结这个文件: {文件}")
 
class 合并总结(Node):
    def exec(self, 所有总结):
        return call_llm(f"合并这些总结: {所有总结}")
 
总结所有文件 >> 合并总结

5. 结构化输出(Structured Output)模式

确保LLM生成符合特定格式的输出:

  1. 使用YAML或JSON格式
  2. 在提示中明确要求格式
  3. 验证输出符合要求
def exec(self, 文本):
    提示 = f"""
    总结以下文本,输出格式为YAML:
    {文本}
    
    ```yaml
    摘要:
      - 要点1
      - 要点2
      - 要点3

""" 回复 = call_llm(提示) yaml_字符串 = 提取YAML(回复) 结构化结果 = yaml.safe_load(yaml_字符串)

验证结果

assert "摘要" in 结构化结果 return 结构化结果


## 工具函数

PocketFlow不内置工具函数,而是鼓励用户根据需要实现自己的工具,常见的包括:

### 1. LLM调用

```python
def call_llm(提示):
    from openai import OpenAI
    client = OpenAI(api_key="你的API密钥")
    响应 = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": 提示}]
    )
    return 响应.choices[0].message.content

2. 嵌入生成

def get_embedding(文本):
    from openai import OpenAI
    client = OpenAI(api_key="你的API密钥")
    响应 = client.embeddings.create(
        model="text-embedding-ada-002",
        input=文本
    )
    嵌入向量 = 响应.data[0].embedding
    return 嵌入向量

3. 向量存储

import faiss
import numpy as np
 
def create_index(嵌入列表):
    维度 = len(嵌入列表[0])
    索引 = faiss.IndexFlatL2(维度)
    向量数组 = np.array(嵌入列表).astype('float32')
    索引.add(向量数组)
    return 索引
 
def search_index(索引, 查询向量, top_k=5):
    查询数组 = np.array([查询向量]).astype('float32')
    距离, 索引位置 = 索引.search(查询数组, top_k)
    return 索引位置, 距离

代码案例集锦

下面通过三个完整案例展示PocketFlow的应用:

案例1: 问答系统

这个简单的问答系统接收用户问题并用LLM回答:

from pocketflow import Node, Flow
 
class 获取问题节点(Node):
    def exec(self, _):
        用户问题 = input("请输入您的问题: ")
        return 用户问题
    
    def post(self, shared, prep_res, exec_res):
        shared["问题"] = exec_res
        return "default"
 
class 回答节点(Node):
    def prep(self, shared):
        return shared["问题"]
    
    def exec(self, 问题):
        return call_llm(问题)
    
    def post(self, shared, prep_res, exec_res):
        shared["回答"] = exec_res
        print(f"回答: {exec_res}")
 
# 连接节点
获取问题 = 获取问题节点()
回答 = 回答节点()
获取问题 >> 回答
 
# 创建并运行流程
问答流程 = Flow(start=获取问题)
shared = {}
问答流程.run(shared)

案例2: 文章生成系统

分步骤生成高质量文章:

class 生成大纲(Node):
    def prep(self, shared):
        return shared["主题"]
    
    def exec(self, 主题):
        return call_llm(f"为'{主题}'创建一个详细的文章大纲")
    
    def post(self, shared, prep_res, exec_res):
        shared["大纲"] = exec_res
        return "default"
 
class 撰写内容(Node):
    def prep(self, shared):
        return shared["大纲"]
    
    def exec(self, 大纲):
        return call_llm(f"根据这个大纲撰写内容:\n{大纲}")
    
    def post(self, shared, prep_res, exec_res):
        shared["草稿"] = exec_res
        return "default"
 
class 审核修改(Node):
    def prep(self, shared):
        return shared["草稿"]
    
    def exec(self, 草稿):
        return call_llm(f"审核并改进这篇草稿:\n{草稿}")
    
    def post(self, shared, prep_res, exec_res):
        shared["最终文章"] = exec_res
        print(f"最终文章: {exec_res}")
 
# 连接节点
大纲 = 生成大纲()
撰写 = 撰写内容()
审核 = 审核修改()
 
大纲 >> 撰写 >> 审核
 
# 创建并运行流程
写作流程 = Flow(start=大纲)
shared = {"主题": "人工智能的未来"}
写作流程.run(shared)

案例3: 文档问答系统(RAG)

使用检索增强生成回答文档相关问题:

# 离线索引阶段
class 分块文档(BatchNode):
    def prep(self, shared):
        return shared["文件列表"]
 
    def exec(self, 文件路径):
        with open(文件路径, "r", encoding="utf-8") as f:
            文本 = f.read()
        
        块大小 = 1000
        块列表 = []
        for i in range(0, len(文本), 块大小):
            块列表.append(文本[i:i+块大小])
        return 块列表
    
    def post(self, shared, prep_res, exec_res_list):
        所有块 = []
        for 块列表 in exec_res_list:
            所有块.extend(块列表)
        shared["所有块"] = 所有块
 
class 嵌入文档(BatchNode):
    def prep(self, shared):
        return shared["所有块"]
 
    def exec(self, 块):
        return get_embedding(块)
 
    def post(self, shared, prep_res, exec_res_list):
        shared["所有嵌入"] = exec_res_list
 
class 存储索引(Node):
    def prep(self, shared):
        return shared["所有嵌入"]
 
    def exec(self, 所有嵌入):
        索引 = create_index(所有嵌入)
        return 索引
 
    def post(self, shared, prep_res, 索引):
        shared["索引"] = 索引
 
# 在线查询阶段
class 嵌入问题(Node):
    def prep(self, shared):
        return shared["问题"]
 
    def exec(self, 问题):
        return get_embedding(问题)
 
    def post(self, shared, prep_res, 问题嵌入):
        shared["问题嵌入"] = 问题嵌入
 
class 检索文档(Node):
    def prep(self, shared):
        return shared["问题嵌入"], shared["索引"], shared["所有块"]
 
    def exec(self, 输入):
        问题嵌入, 索引, 所有块 = 输入
        索引位置, 距离 = search_index(索引, 问题嵌入, top_k=1)
        最佳位置 = 索引位置[0][0]
        相关块 = 所有块[最佳位置]
        return 相关块
 
    def post(self, shared, prep_res, 相关块):
        shared["检索块"] = 相关块
 
class 生成答案(Node):
    def prep(self, shared):
        return shared["问题"], shared["检索块"]
 
    def exec(self, 输入):
        问题, 上下文 = 输入
        提示 = f"问题: {问题}\n上下文: {上下文}\n回答:"
        return call_llm(提示)
 
    def post(self, shared, prep_res, 答案):
        shared["答案"] = 答案
        print(f"答案: {答案}")
 
# 连接离线索引流程
分块节点 = 分块文档()
嵌入节点 = 嵌入文档()
存储节点 = 存储索引()
 
分块节点 >> 嵌入节点 >> 存储节点
离线流程 = Flow(start=分块节点)
 
# 连接在线查询流程
问题嵌入节点 = 嵌入问题()
检索节点 = 检索文档()
生成节点 = 生成答案()
 
问题嵌入节点 >> 检索节点 >> 生成节点
在线流程 = Flow(start=问题嵌入节点)
 
# 先执行离线索引
shared = {
    "文件列表": ["文档1.txt", "文档2.txt"]
}
离线流程.run(shared)
 
# 再执行在线查询
shared["问题"] = "人工智能有哪些应用?"
在线流程.run(shared)

图表:PocketFlow核心概念关系

graph TD
    A[PocketFlow框架] --> B[核心抽象]
    A --> C[设计模式]
    A --> D[工具函数]
    
    B --> B1[节点Node]
    B --> B2[流程Flow]
    B --> B3[通信方式]
    B --> B4[批处理Batch]
    B --> B5[异步Async]
    B --> B6[并行Parallel]
    
    C --> C1[代理Agent]
    C --> C2[工作流Workflow]
    C --> C3[检索增强生成RAG]
    C --> C4[映射归约MapReduce]
    C --> C5[结构化输出]
    
    D --> D1[LLM调用]
    D --> D2[嵌入生成]
    D --> D3[向量存储]
    D --> D4[网络搜索]
    D --> D5[文本分块]
    
    B1 --> E1[prep准备]
    B1 --> E2[exec执行]
    B1 --> E3[post后处理]
    
    B2 --> F1[连接节点]
    B2 --> F2[处理动作]
    B2 --> F3[嵌套流程]

小结

PocketFlow是一个简单而强大的框架,只需理解几个核心概念就能构建复杂的LLM应用:

  1. 节点(Node) 处理单个任务,包括准备、执行、后处理三个步骤
  2. 流程(Flow) 将节点连接成图,并通过节点返回的动作决定执行路径
  3. 共享存储 让节点之间共享数据
  4. 批处理、异步和并行 提供处理大量数据和提高性能的能力

通过这些基础构建块,你可以实现各种强大的模式:代理、工作流、RAG、MapReduce等,创建丰富的LLM应用。

记住,PocketFlow的哲学是:保持简单,让你专注于应用逻辑,而不是框架本身。