PocketFlow学习指南
#AI
什么是PocketFlow?
PocketFlow是一个超级简单的大语言模型(LLM)应用框架,只有100行核心代码,零依赖,但却能实现复杂的LLM应用功能。就像搭积木一样,你可以轻松构建各种强大的AI应用,比如智能助手、文档问答系统和自动化工具。
PocketFlow的核心理念是:把LLM应用看作一个图+共享存储。这就像是一个小火车在轨道上跑,停靠不同的站点做不同的事情,并且所有站点都可以读写同一个大仓库。

核心概念解释
1. 节点(Node)
节点是最小的构建块,就像是一个小工作站,有三个简单的步骤:
-
准备数据(prep): 从共享存储读取和准备数据
- 比如:从文件读取内容、查询数据库
- 返回预处理后的数据给执行步骤
-
执行计算(exec): 处理数据,通常是调用LLM
- 比如:请求大语言模型生成摘要、回答问题
- 这一步不应该直接访问共享存储
- 返回处理结果给后处理步骤
-
后处理(post): 将结果写回共享存储,并决定下一步去哪里
- 比如:保存结果、更新数据库
- 通过返回"动作"字符串决定接下来要执行哪个节点
想象一个做蛋糕的节点:先准备材料(prep),然后烘焙(exec),最后装盘并决定是直接上桌还是加装饰(post)。
class 总结文件(Node):
def prep(self, shared):
# 读取文件内容
return shared["文件内容"]
def exec(self, 文件内容):
# 调用LLM总结文件
摘要 = call_llm(f"请总结这段文字: {文件内容}")
return 摘要
def post(self, shared, prep_res, 摘要):
# 保存摘要到共享存储
shared["摘要"] = 摘要
# 没有返回值,默认返回"default"动作2. 流程(Flow)
流程就是将多个节点连接起来的图,像一条火车轨道一样引导执行顺序:
- 创建流程: 指定起始节点(
Flow(start=首节点)) - 定义转换: 使用
节点A >> 节点B或节点A - "动作名" >> 节点B - 执行流程: 调用
flow.run(shared)运行整个流程
你可以创建简单的顺序流程,也可以根据节点返回的动作创建复杂的分支和循环。
例如,一个审批流程可能有三种路径:
审核 - "通过" >> 支付
审核 - "需修改" >> 修改
审核 - "拒绝" >> 结束
修改 >> 审核 # 修改后重新审核
支付 >> 结束 # 支付后结束
3. 通信方式
节点和流程之间有两种通信方式:
-
共享存储(Shared Store): 所有节点都可以读写的全局字典
shared = { "文件内容": "这是一些文本...", "摘要": None # 初始为空,会由节点填充 } -
参数(Params): 每个节点的本地参数,主要用于批处理
节点.set_params({"文件名": "文档1.txt"})
4. 批处理(Batch)
批处理帮助处理大量数据,有两种形式:
-
批处理节点(BatchNode): 一次处理多个项目
prep()返回可迭代对象(如列表)exec()对每个项目分别调用post()收到所有结果列表
-
批处理流程(BatchFlow): 多次运行同一流程
- 为每次运行提供不同的参数
- 适合处理多个文件或任务
例如,一个处理多个文件的批处理:
class 处理所有文件(BatchNode):
def prep(self, shared):
文件列表 = shared["文件列表"]
return 文件列表 # 返回文件列表
def exec(self, 单个文件):
内容 = 读取文件(单个文件)
return 总结文本(内容)
def post(self, shared, prep_res, 所有摘要):
shared["所有摘要"] = 所有摘要5. 异步处理(Async)
异步节点使用prep_async(), exec_async(), post_async()处理I/O密集型任务:
class 异步总结节点(AsyncNode):
async def prep_async(self, shared):
文档 = await 异步读取文件(shared["文件路径"])
return 文档
async def exec_async(self, 文档):
摘要 = await 异步调用LLM(f"总结: {文档}")
return 摘要6. 并行处理(Parallel)
并行节点可以同时处理多个任务,适合I/O密集型操作(如多个LLM调用):
class 并行总结(AsyncParallelBatchNode):
async def prep_async(self, shared):
return shared["文本列表"]
async def exec_async(self, 文本):
return await 异步调用LLM(f"总结: {文本}")主要设计模式
PocketFlow支持多种设计模式,可以应对不同的应用场景:
1. 代理(Agent)模式
代理可以根据上下文动态决定执行哪些操作,像是一个会思考的助手:

例如,一个搜索代理:
- 决定是搜索还是回答
- 如果搜索,收集信息后回到决策点
- 如果回答,基于收集的信息给出答案
class 决策节点(Node):
def exec(self, 输入):
决定 = call_llm("应该搜索更多信息还是直接回答?")
if "搜索" in 决定:
return "search"
else:
return "answer"
def post(self, shared, prep_res, exec_res):
return exec_res # 返回"search"或"answer"
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide # 循环回决策点2. 工作流(Workflow)模式
工作流将复杂任务分解为连续的小步骤:
例如,文章写作流程:
创建大纲 >> 撰写内容 >> 审核修改这种模式适合有明确顺序的任务,每一步都是独立的LLM调用。
3. 检索增强生成(RAG)模式
RAG结合了检索和生成,分为两个阶段:

-
离线索引:
- 将文档分块
- 为每块生成向量嵌入
- 存储到向量数据库
-
在线查询:
- 为问题生成向量嵌入
- 检索最相关的文档块
- 结合问题和文档块生成答案
# 离线索引流程
分块节点 >> 嵌入节点 >> 存储节点
# 在线查询流程
问题嵌入 >> 检索文档 >> 生成回答4. 映射归约(MapReduce)模式
处理大量数据的模式,分为两个阶段:

- 映射(Map): 将大任务分解为小任务并并行处理
- 归约(Reduce): 汇总小任务的结果
例如,总结多个文档:
class 总结所有文件(BatchNode):
def prep(self, shared):
return shared["文件列表"]
def exec(self, 文件):
return call_llm(f"总结这个文件: {文件}")
class 合并总结(Node):
def exec(self, 所有总结):
return call_llm(f"合并这些总结: {所有总结}")
总结所有文件 >> 合并总结5. 结构化输出(Structured Output)模式
确保LLM生成符合特定格式的输出:
- 使用YAML或JSON格式
- 在提示中明确要求格式
- 验证输出符合要求
def exec(self, 文本):
提示 = f"""
总结以下文本,输出格式为YAML:
{文本}
```yaml
摘要:
- 要点1
- 要点2
- 要点3""" 回复 = call_llm(提示) yaml_字符串 = 提取YAML(回复) 结构化结果 = yaml.safe_load(yaml_字符串)
验证结果
assert "摘要" in 结构化结果 return 结构化结果
## 工具函数
PocketFlow不内置工具函数,而是鼓励用户根据需要实现自己的工具,常见的包括:
### 1. LLM调用
```python
def call_llm(提示):
from openai import OpenAI
client = OpenAI(api_key="你的API密钥")
响应 = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": 提示}]
)
return 响应.choices[0].message.content2. 嵌入生成
def get_embedding(文本):
from openai import OpenAI
client = OpenAI(api_key="你的API密钥")
响应 = client.embeddings.create(
model="text-embedding-ada-002",
input=文本
)
嵌入向量 = 响应.data[0].embedding
return 嵌入向量3. 向量存储
import faiss
import numpy as np
def create_index(嵌入列表):
维度 = len(嵌入列表[0])
索引 = faiss.IndexFlatL2(维度)
向量数组 = np.array(嵌入列表).astype('float32')
索引.add(向量数组)
return 索引
def search_index(索引, 查询向量, top_k=5):
查询数组 = np.array([查询向量]).astype('float32')
距离, 索引位置 = 索引.search(查询数组, top_k)
return 索引位置, 距离代码案例集锦
下面通过三个完整案例展示PocketFlow的应用:
案例1: 问答系统
这个简单的问答系统接收用户问题并用LLM回答:
from pocketflow import Node, Flow
class 获取问题节点(Node):
def exec(self, _):
用户问题 = input("请输入您的问题: ")
return 用户问题
def post(self, shared, prep_res, exec_res):
shared["问题"] = exec_res
return "default"
class 回答节点(Node):
def prep(self, shared):
return shared["问题"]
def exec(self, 问题):
return call_llm(问题)
def post(self, shared, prep_res, exec_res):
shared["回答"] = exec_res
print(f"回答: {exec_res}")
# 连接节点
获取问题 = 获取问题节点()
回答 = 回答节点()
获取问题 >> 回答
# 创建并运行流程
问答流程 = Flow(start=获取问题)
shared = {}
问答流程.run(shared)案例2: 文章生成系统
分步骤生成高质量文章:
class 生成大纲(Node):
def prep(self, shared):
return shared["主题"]
def exec(self, 主题):
return call_llm(f"为'{主题}'创建一个详细的文章大纲")
def post(self, shared, prep_res, exec_res):
shared["大纲"] = exec_res
return "default"
class 撰写内容(Node):
def prep(self, shared):
return shared["大纲"]
def exec(self, 大纲):
return call_llm(f"根据这个大纲撰写内容:\n{大纲}")
def post(self, shared, prep_res, exec_res):
shared["草稿"] = exec_res
return "default"
class 审核修改(Node):
def prep(self, shared):
return shared["草稿"]
def exec(self, 草稿):
return call_llm(f"审核并改进这篇草稿:\n{草稿}")
def post(self, shared, prep_res, exec_res):
shared["最终文章"] = exec_res
print(f"最终文章: {exec_res}")
# 连接节点
大纲 = 生成大纲()
撰写 = 撰写内容()
审核 = 审核修改()
大纲 >> 撰写 >> 审核
# 创建并运行流程
写作流程 = Flow(start=大纲)
shared = {"主题": "人工智能的未来"}
写作流程.run(shared)案例3: 文档问答系统(RAG)
使用检索增强生成回答文档相关问题:
# 离线索引阶段
class 分块文档(BatchNode):
def prep(self, shared):
return shared["文件列表"]
def exec(self, 文件路径):
with open(文件路径, "r", encoding="utf-8") as f:
文本 = f.read()
块大小 = 1000
块列表 = []
for i in range(0, len(文本), 块大小):
块列表.append(文本[i:i+块大小])
return 块列表
def post(self, shared, prep_res, exec_res_list):
所有块 = []
for 块列表 in exec_res_list:
所有块.extend(块列表)
shared["所有块"] = 所有块
class 嵌入文档(BatchNode):
def prep(self, shared):
return shared["所有块"]
def exec(self, 块):
return get_embedding(块)
def post(self, shared, prep_res, exec_res_list):
shared["所有嵌入"] = exec_res_list
class 存储索引(Node):
def prep(self, shared):
return shared["所有嵌入"]
def exec(self, 所有嵌入):
索引 = create_index(所有嵌入)
return 索引
def post(self, shared, prep_res, 索引):
shared["索引"] = 索引
# 在线查询阶段
class 嵌入问题(Node):
def prep(self, shared):
return shared["问题"]
def exec(self, 问题):
return get_embedding(问题)
def post(self, shared, prep_res, 问题嵌入):
shared["问题嵌入"] = 问题嵌入
class 检索文档(Node):
def prep(self, shared):
return shared["问题嵌入"], shared["索引"], shared["所有块"]
def exec(self, 输入):
问题嵌入, 索引, 所有块 = 输入
索引位置, 距离 = search_index(索引, 问题嵌入, top_k=1)
最佳位置 = 索引位置[0][0]
相关块 = 所有块[最佳位置]
return 相关块
def post(self, shared, prep_res, 相关块):
shared["检索块"] = 相关块
class 生成答案(Node):
def prep(self, shared):
return shared["问题"], shared["检索块"]
def exec(self, 输入):
问题, 上下文 = 输入
提示 = f"问题: {问题}\n上下文: {上下文}\n回答:"
return call_llm(提示)
def post(self, shared, prep_res, 答案):
shared["答案"] = 答案
print(f"答案: {答案}")
# 连接离线索引流程
分块节点 = 分块文档()
嵌入节点 = 嵌入文档()
存储节点 = 存储索引()
分块节点 >> 嵌入节点 >> 存储节点
离线流程 = Flow(start=分块节点)
# 连接在线查询流程
问题嵌入节点 = 嵌入问题()
检索节点 = 检索文档()
生成节点 = 生成答案()
问题嵌入节点 >> 检索节点 >> 生成节点
在线流程 = Flow(start=问题嵌入节点)
# 先执行离线索引
shared = {
"文件列表": ["文档1.txt", "文档2.txt"]
}
离线流程.run(shared)
# 再执行在线查询
shared["问题"] = "人工智能有哪些应用?"
在线流程.run(shared)图表:PocketFlow核心概念关系
graph TD
A[PocketFlow框架] --> B[核心抽象]
A --> C[设计模式]
A --> D[工具函数]
B --> B1[节点Node]
B --> B2[流程Flow]
B --> B3[通信方式]
B --> B4[批处理Batch]
B --> B5[异步Async]
B --> B6[并行Parallel]
C --> C1[代理Agent]
C --> C2[工作流Workflow]
C --> C3[检索增强生成RAG]
C --> C4[映射归约MapReduce]
C --> C5[结构化输出]
D --> D1[LLM调用]
D --> D2[嵌入生成]
D --> D3[向量存储]
D --> D4[网络搜索]
D --> D5[文本分块]
B1 --> E1[prep准备]
B1 --> E2[exec执行]
B1 --> E3[post后处理]
B2 --> F1[连接节点]
B2 --> F2[处理动作]
B2 --> F3[嵌套流程]小结
PocketFlow是一个简单而强大的框架,只需理解几个核心概念就能构建复杂的LLM应用:
- 节点(Node) 处理单个任务,包括准备、执行、后处理三个步骤
- 流程(Flow) 将节点连接成图,并通过节点返回的动作决定执行路径
- 共享存储 让节点之间共享数据
- 批处理、异步和并行 提供处理大量数据和提高性能的能力
通过这些基础构建块,你可以实现各种强大的模式:代理、工作流、RAG、MapReduce等,创建丰富的LLM应用。
记住,PocketFlow的哲学是:保持简单,让你专注于应用逻辑,而不是框架本身。