本指南将帮助您快速上手 DataFlow Agent 平台的5个核心功能模块。
根据用户的自然语言描述,自动推荐并生成合适的 DataFlow Pipeline,包括算子选择、参数配置和代码生成。
- 快速构建数据处理流程
- 不熟悉具体算子时的智能推荐
- 自动化 Pipeline 生成
-
目标描述 (必需)
- 描述您想要实现的数据处理目标
- 示例:
"给我随意符合逻辑的5个算子,过滤,去重!" - 示例:
"对文本数据进行清洗、去重、分类"
-
输入 JSONL 文件路径 (必需)
- 用于测试 Pipeline 的数据文件
- 格式:每行一个 JSON 对象
- 默认:
{项目根目录}/tests/test.jsonl
-
Session ID
- 会话标识符,用于缓存和追踪
- 默认:
"default"
主要模型配置
- Chat API URL: LLM 服务地址
- 默认:
http://123.129.219.111:3000/v1/
- 默认:
- API Key: 访问密钥
- 模型名称: 如
gpt-4o,qwen-max,llama3等- 默认:
gpt-4o
- 默认:
嵌入模型配置
- Embedding API URL: 嵌入模型服务地址(可选,留空则使用主要 API)
- Embedding 模型名称: 如
text-embedding-3-small
- 启用调试模式: 是否启用自动调试和修复
- 调试模式执行次数: 1-10 次,默认 2 次
# 自动生成的 Python 代码
# 包含完整的 Pipeline 定义和执行逻辑- Pipeline 执行过程的详细日志
- 包含每个算子的执行状态
- 错误信息和调试信息
{
"recommender": {...},
"pipeline_builder": {...},
"operator_executor": {...}
}- 各个 Agent 节点的详细执行结果
- 包含推荐的算子列表、构建过程等
- 在"目标描述"框中输入您的需求
- 配置 API 信息(URL、Key、模型)
- (可选)配置嵌入模型和调试选项
- 点击"生成 Pipeline"按钮
- 查看生成的代码和执行结果
根据用户需求自动生成新的 DataFlow 算子代码,包括算子实现、测试代码和调试。
- 创建自定义数据处理算子
- 扩展 DataFlow 功能
- 快速原型开发
-
目标描述 (必需)
- 描述算子的功能和用途
- 示例:
"创建一个算子,用于对文本进行情感分析" - 示例:
"实现一个数据去重算子,支持多字段组合去重"
-
算子类别
- 算子所属类别,用于匹配相似算子作为参考
- 默认:
"Default" - 可选:
"filter","mapper","aggregator"等
-
测试数据文件路径 (JSONL)
- 用于测试算子的数据文件
- 默认:
{项目根目录}/tests/test.jsonl
- Chat API URL: LLM 服务地址
- API Key: 访问密钥(留空则使用环境变量
DF_API_KEY) - 模型名称: 默认
gpt-4o
- 输出语言:
en(英文) 或zh(中文) - 启用调试模式: 自动执行并修复代码错误
- 最大调试轮次: 1-10 次,默认 3 次
- 输出文件路径: 保存生成代码的位置(可选)
# 完整的算子实现代码
class YourOperator(Operator):
def __init__(self, ...):
...
def run(self, dataset, ...):
...[
{
"op_name": "similar_operator_1",
"similarity": 0.85,
"description": "..."
}
]- 系统匹配到的相似算子列表
- 用作参考和学习
{
"success": true,
"output": {...},
"stderr": "",
"stdout": "..."
}- 算子的执行状态
- 输出数据预览
- 错误信息(如有)
{
"round": 2,
"input_key": "text",
"available_keys": ["text", "label"],
"stdout": "...",
"stderr": "..."
}- 调试过程的详细信息
- 每轮调试的输入输出
- 各个 Agent 节点的执行详情
- 包含匹配、编写、执行、调试等阶段
- 完整的执行过程日志
- 包含所有阶段的详细信息
- 在"目标描述"中详细说明算子功能
- 选择合适的算子类别
- 配置 API 信息
- (可选)启用调试模式以自动修复错误
- 点击"生成算子"按钮
- 查看生成的代码和测试结果
- 如需修改,可调整参数后重新生成
通过可视化界面手动选择和组装算子,构建自定义 Pipeline,支持拖拽排序和参数配置。
- 精确控制 Pipeline 结构
- 复用现有算子
- 快速原型验证
- 学习算子使用方法
- Chat API URL: LLM 服务地址
- API Key: 访问密钥
- 模型名称: 默认
gpt-4o - 输入 JSONL 文件路径: 测试数据文件
步骤 1: 选择算子
- 从"算子分类"下拉框选择类别
- 如:
filter,mapper,deduplicator等
- 如:
- 从"算子"下拉框选择具体算子
- 系统会自动显示该算子的参数说明
步骤 2: 配置参数
-
Prompt Template (可选)
- 如果算子支持 Prompt 模板,会显示下拉选择器
- 选择后自动更新到
__init__()参数中
-
__init__()参数 (JSON 格式){ "param1": "value1", "param2": 123, "prompt_template": "module.PromptClass" }- 算子初始化参数
- 必须是有效的 JSON 对象
-
run()参数 (JSON 格式){ "input_key": "text", "output_key": "processed_text", "batch_size": 32 }- 算子运行时参数
- 必须是有效的 JSON 对象
步骤 3: 添加到 Pipeline
- 点击"➕ 添加算子到 Pipeline"按钮
- 算子会被添加到 Pipeline 序列中
步骤 4: 调整顺序
- 在 Pipeline 可视化区域,拖拽算子卡片调整顺序
- 系统会自动重新编号
步骤 5: 自动链接
- 系统会自动分析算子间的输入输出关系
- 显示链接状态:
- 🔗 已链接: 输出键成功匹配到下一个算子的输入
⚠️ 待处理: 输入为空或未匹配
- 每个算子显示为卡片,包含:
- 步骤编号
- 算子名称
__init__()参数预览run()参数预览- 与上一步的连接状态
[
{
"op_name": "TextCleanerOperator",
"init_params": {...},
"run_params": {...},
"_incoming_links": [
{
"input_key": "text",
"value": "raw_text",
"output_keys": ["output"]
}
]
}
]# 完整的 Pipeline 执行代码
from dataflow import Dataset
from dataflow.operators import *
# 加载数据
dataset = Dataset.load("input.jsonl")
# 执行 Pipeline
dataset = TextCleanerOperator(...).run(dataset, ...)
dataset = DeduplicatorOperator(...).run(dataset, ...)
...
# 保存结果
dataset.save("output.jsonl")[
{"text": "processed text 1", "label": "A"},
{"text": "processed text 2", "label": "B"},
...
]- 处理后数据的保存位置
- 配置 API 信息和输入文件路径
- 选择算子分类和具体算子
- 编辑
__init__()和run()参数(JSON 格式) - 点击"➕ 添加算子到 Pipeline"
- 重复步骤 2-4 添加更多算子
- 拖拽调整算子顺序(可选)
- 检查自动链接状态,确保参数正确
- 点击"🚀 运行 Pipeline"
- 查看生成的代码和执行结果
- 清空 Pipeline: 点击"🗑️ 清空 Pipeline"按钮
- 参数复用: 系统会自动将上一个算子的输出键链接到下一个算子的输入
- 调试: 如果执行失败,检查日志中的错误信息,调整参数后重试
PromptAgent 前端,用于生成和优化算子的 Prompt 模板,支持多轮对话式改写和测试。
- 为算子创建高质量的 Prompt 模板
- 优化现有 Prompt 的效果
- 快速迭代 Prompt 设计
- 生成测试代码和数据
- Chat API Base URL: LLM 服务地址
- 默认:
http://123.129.219.111:3000/v1/
- 默认:
- Chat API Key: 访问密钥
- Model: 模型名称,默认
gpt-4o - Language: 提示词语言,
zh(中文) 或en(英文)
-
任务描述 (必需)
- 详细描述 Prompt 要完成的任务
- 示例:
"对用户输入的文本进行情感分析,判断是正面、负面还是中性" - 示例:
"将产品描述改写为更吸引人的营销文案"
-
算子名称 (op-name) (必需)
- Prompt 类的名称
- 示例:
SentimentAnalysisPrompt - 示例:
MarketingCopywriterPrompt
-
输出格式 (可选)
- 指定 Prompt 输出的格式
- 示例:
{ "sentiment": "positive/negative/neutral", "confidence": 0.95 }
-
参数列表 (可选)
- Prompt 模板需要的参数,用逗号、空格或换行分隔
- 示例:
text, language, style - 示例:
input_text target_audience tone
-
文件输出根路径 (可选)
- 保存生成文件的目录
- 默认:
./pa_cache
-
生成后删除测试文件
- 是否在生成后删除测试文件(保留路径占位)
- 默认:启用
- 生成的 Prompt 模板文件位置
- 示例:
./pa_cache/prompts/SentimentAnalysisPrompt.py
- 自动生成的测试数据文件
- 示例:
./pa_cache/test_data/test_data.jsonl
- 自动生成的测试代码
- 示例:
./pa_cache/tests/test_prompt.py
[
{"text": "这个产品真不错!", "language": "zh"},
{"text": "质量太差了", "language": "zh"},
{"text": "还可以吧", "language": "zh"}
][
{
"input": {"text": "这个产品真不错!"},
"output": {
"sentiment": "positive",
"confidence": 0.92
}
}
]from dataflow_agent.promptstemplates import PromptTemplate
class SentimentAnalysisPrompt(PromptTemplate):
"""情感分析 Prompt 模板"""
def __init__(self):
super().__init__()
self.system_prompt = "你是一个情感分析专家..."
self.user_prompt_template = "请分析以下文本的情感:{text}"
def format(self, text: str, **kwargs) -> str:
return self.user_prompt_template.format(text=text)import json
from your_prompt import SentimentAnalysisPrompt
# 加载测试数据
with open("test_data.jsonl") as f:
test_data = [json.loads(line) for line in f]
# 测试 Prompt
prompt = SentimentAnalysisPrompt()
for item in test_data:
result = prompt.format(**item)
print(result)在右侧对话区域,您可以:
-
查看初次生成结果
- Prompt 代码
- 测试结果
-
提出改进建议
- 在对话输入框中描述您希望如何修改
- 示例:
"增加对讽刺语气的识别""输出格式改为只返回 positive/negative/neutral 字符串""添加置信度阈值,低于 0.7 时返回 uncertain"
-
发送改写指令
- 点击"发送改写指令"按钮
- 系统会根据反馈重新生成 Prompt
-
迭代优化
- 查看更新后的代码和测试结果
- 继续提出改进建议
- 重复直到满意
-
清空会话
- 点击"清空会话"按钮重新开始
- 配置 API 信息(URL、Key、模型)
- 填写任务描述、算子名称
- (可选)指定输出格式和参数列表
- 点击"生成 Prompt 模板"按钮
- 查看生成的 Prompt 代码和测试结果
- 在右侧对话框中输入改进建议
- 点击"发送改写指令"
- 查看更新后的代码和测试结果
- 重复步骤 1-3 直到满意
- 从"Prompt 文件路径"获取文件位置
- 将 Prompt 类导入到您的算子中
- 在算子的
__init__()中指定prompt_template
从网络(HuggingFace、Kaggle 等平台)自动采集数据集,并转换为统一格式,支持智能搜索、下载和数据清洗。
- 快速构建训练数据集
- 收集特定领域的数据
- 数据集格式转换
- 批量下载和处理
-
目标描述 (必需)
- 描述您想要收集的数据类型
- 示例:
"收集 Python 代码示例的数据集" - 示例:
"收集中文对话数据,用于训练聊天机器人" - 示例:
"收集图像分类数据集,包含猫和狗的图片"
-
数据类别
PT: 预训练数据(Pre-Training)SFT: 监督微调数据(Supervised Fine-Tuning)- 默认:
SFT
-
数据集数量上限(每关键词)
- 每个搜索关键词返回的数据集数量
- 范围:1-50
- 默认:5
- 注意:仅用于参考,实际数量可能因搜索结果而异
-
数据集大小范围
- 筛选数据集的大小范围
- 选项:
n<1K: 小于 1000 条1K<n<10K: 1000-10000 条10K<n<100K: 10000-100000 条100K<n<1M: 100000-1000000 条n>1M: 大于 1000000 条
- 默认:
1K<n<10K
-
下载子任务上限
- 限制最终执行的下载任务数量
- 留空表示不限制
- 用于控制下载规模和时间
-
最大数据集大小
- 单个数据集的大小上限
- 输入数值后选择单位(B/KB/MB/GB/TB)
- 留空表示不限制
-
下载目录
- 数据保存的根目录
- 默认:
downloaded_data
-
提示词语言
zh: 中文en: 英文- 默认:
zh
- CHAT_API_URL: LLM 服务地址
- 默认:
http://123.129.219.111:3000/v1/chat/completions
- 默认:
- CHAT_API_KEY: 访问密钥
- CHAT_MODEL: 模型名称
- 默认:
deepseek-chat
- 默认:
- HF_ENDPOINT: HuggingFace 镜像地址
- 默认:
https://hf-mirror.com
- 默认:
- KAGGLE_USERNAME: Kaggle 用户名
- KAGGLE_KEY: Kaggle API 密钥
- TAVILY_API_KEY: Tavily 搜索 API 密钥
- RAG_EBD_MODEL: 嵌入模型名称
- 默认:
text-embedding-3-large
- 默认:
- RAG_API_URL: RAG 服务地址
- RAG_API_KEY: RAG API 密钥
网页采集高级配置
- 下载任务最大循环次数: 1-50,默认 10
- 控制每个下载任务的最大重试次数
- 研究阶段最大循环次数: 1-50,默认 15
- research 阶段的最大循环次数,允许访问更多网站
- 搜索引擎:
tavily/duckduckgo/jina- 默认:
tavily
- 默认:
- 使用 Jina Reader: 是否使用 Jina Reader 提取网页内容
- 默认:启用
- 优点:快速、结构化(Markdown 格式)
- 启用 RAG 增强: 是否使用 RAG 精炼内容
- 默认:启用
- 并行处理页面数: 1-20,默认 5
- 并行处理的页面数量
- 建议:3-10(根据网络和机器性能调整)
- 禁用缓存: 是否禁用 HuggingFace 和 Kaggle 缓存
- 默认:启用
- 启用后使用临时目录,下载后自动清理
- 临时目录: 自定义临时目录路径
- 留空则使用默认临时目录
数据转换高级配置
- 转换模型温度: 0.0-2.0,默认 0.0
- 数据转换时的模型温度参数
- 转换最大 Token 数: 512-8192,默认 4096
- 数据转换时的最大 token 数
- 最大采样长度(字符): 50-1000,默认 200
- 每个字段的最大采样长度
- 采样记录数量: 1-10,默认 3
- 用于分析的采样记录数量
============================================================
开始执行网页采集与转换工作流
============================================================
目标: 收集 Python 代码示例的数据集
类别: SFT
下载目录: downloaded_data
【网页采集配置】
- 搜索引擎: tavily
- 下载子任务上限: 不限制
- 任务最大循环次数: 10
- 研究阶段最大循环次数: 15
- 使用 Jina Reader: 是
- 启用 RAG: 是
- 并行页面数: 5
- 禁用缓存: 是
【数据转换配置】
- 模型温度: 0.0
- 最大 Token 数: 4096
- 最大采样长度: 200
- 采样记录数: 3
数据集大小限制: 不限制
============================================================
2025-01-23 10:00:00 [INFO] 开始搜索数据集...
2025-01-23 10:00:05 [INFO] 找到 15 个候选数据集
2025-01-23 10:00:10 [INFO] 开始下载数据集 1/5...
2025-01-23 10:01:00 [INFO] 数据集 1 下载完成
...
2025-01-23 10:15:00 [INFO] 开始数据转换...
2025-01-23 10:20:00 [INFO] 数据转换完成
流程执行完成!
{
"download_dir": "downloaded_data",
"processed_output": "downloaded_data/processed_output",
"category": "SFT",
"language": "zh",
"chat_model": "deepseek-chat",
"max_download_subtasks": null,
"max_dataset_size_bytes": null,
"max_dataset_size_unit": null,
"max_dataset_size_value": null
}downloaded_data/
├── raw/ # 原始下载的数据
│ ├── dataset_1/
│ │ ├── data.jsonl
│ │ └── metadata.json
│ ├── dataset_2/
│ └── ...
└── processed_output/ # 转换后的统一格式数据
├── combined.jsonl # 合并后的数据
├── train.jsonl # 训练集(如果分割)
├── validation.jsonl # 验证集(如果分割)
└── metadata.json # 元数据信息
- 在"目标描述"中详细说明要收集的数据类型
- 选择数据类别(PT 或 SFT)
- 配置数据集数量和大小限制
- 配置 LLM API 信息
- (可选)配置 Kaggle、Tavily 等服务的密钥
- 点击"开始网页采集与转换"按钮
- 实时查看执行日志
- 等待完成后查看结果摘要
- 在下载目录中查看采集的数据
- 展开"⚙️ 高级配置"区域
- 根据需求调整:
- 搜索引擎选择
- 并行处理数量
- 缓存策略
- 数据转换参数
- 执行采集任务
- 根据日志调整参数优化效果
-
API 密钥
- 确保配置了必要的 API 密钥
- Tavily 用于搜索,Kaggle 用于下载 Kaggle 数据集
-
网络环境
- 如果在国内,建议使用 HuggingFace 镜像
- 调整并行数量以适应网络带宽
-
存储空间
- 确保有足够的磁盘空间
- 大型数据集可能需要数 GB 空间
-
执行时间
- 采集过程可能需要较长时间(几分钟到几小时)
- 可以通过限制下载任务数量来控制时间
-
数据质量
- 启用 RAG 增强可以提高数据质量
- 调整采样参数以平衡质量和速度
- OpenAI/GPT: 访问 OpenAI Platform
- Tavily: 访问 Tavily
- Kaggle: 访问 Kaggle Settings
- 快速原型:
gpt-3.5-turbo,deepseek-chat - 高质量输出:
gpt-4o,claude-3-opus - 中文优化:
qwen-max,deepseek-chat
- 检查执行日志中的错误信息
- 确认输入数据格式正确
- 检查算子参数配置
- 启用调试模式自动修复
- 查看 Agent 结果了解详细错误
- 使用更精确的目标描述
- 启用 RAG 增强
- 调整数据集大小范围
- 增加采样记录数量
- 使用更强大的 LLM 模型
- 管线推荐: 可以直接运行,但建议先在测试数据上验证
- 算子编写: 建议先测试,必要时手动调整
- 手动编排: 生成的代码已经过测试,可以直接使用
- Prompt 模板: 建议多轮优化后再用于生产环境
如有问题或建议,请:
- 提交 GitHub Issue
- 查看 FAQ
- 参考 贡献指南
在很多场景下,我们希望 Agent 不只是返回一段自由文本,而是返回结构化的 JSON 结果,方便后续程序消费。
DataFlow-Agent 在 BaseAgentConfig 中内置了针对 JSON 解析的快捷配置字段:
parser_type: 解析器类型,默认为"json"response_schema: 期望的返回结构定义(轻量级 JSON Schema)response_schema_description: 对该结构的中文/英文文字说明response_example: 一份期望的 JSON 返回示例required_fields: 必须出现的字段列表,用于约束模型输出
这些字段可以通过 create_simple_agent / create_react_agent 等便捷函数传入,也可以自行构造 SimpleConfig / ReactConfig 后再调用 create_agent。
下面的示例中,我们要求 Agent 返回包含 code 和 files 两个字段的 JSON,其中 code 是字符串,files 是字符串列表:
from dataflow_agent.agentroles import create_simple_agent
agent = create_simple_agent(
"coder",
# 解析器类型设为 json(默认即为 json)
parser_type="json",
# 期望的返回结构(轻量 JSON Schema)
response_schema={
"code": "string",
"files": ["string"], # 表示由字符串组成的列表
"summary": "string", # 可选字段,用于描述本次修改
},
# 对 Schema 的自然语言说明,帮助模型理解每个字段含义
response_schema_description=(
"请返回代码生成结果。"
"code 字段包含完整的代码内容;"
"files 是本次修改涉及到的文件路径列表;"
"summary 对主要改动进行简要说明。"
),
# 一份期望的返回示例,模型会参考该结构生成
response_example={
"code": "# 写在这里的是完整的 Python 代码 ...",
"files": ["dataflow_agent/agentroles/__init__.py"],
"summary": "新增了 create_simple_agent 的使用示例。",
},
# 必须出现的字段
required_fields=["code", "files"],
)
# 后续在你的业务逻辑中,一般会以异步方式调用 agent:
# result = await agent.execute({"task": "请为我实现一个计算阶乘的函数"})
# print(result["code"])
# print(result["files"])要点说明:
-
parser_type 建议为
"json"
response_schema*这一组字段是为 JSON 解析场景设计的。当parser_type="json"时,框架会根据这些配置构建 Prompt 与解析逻辑,尽量保证返回满足你指定的结构。 -
response_schema是“轻量级 JSON Schema”- 推荐使用简单的类型标注,如:
"string","number","boolean"等["string"]表示“字符串列表”- 也可以嵌套字典/列表描述更复杂结构
- 不必完全遵循官方 JSON Schema 规范,保持可读性和易理解为主。
- 推荐使用简单的类型标注,如:
-
response_schema_description与response_example用于“教模型”response_schema_description: 用自然语言解释每个字段含义和要求;response_example: 给出一份“长得像最终结果”的 JSON 示例。
在实践中,这两项对提升结构化输出的稳定性非常有帮助。
-
required_fields指定“必填字段”required_fields=["code", "files"]表示:无论如何,这两个 key 必须出现在最终 JSON 中;- 这可以配合 ReAct 模式/验证逻辑,用于在缺字段时触发重试或报错(具体行为取决于所用 Agent 策略和解析实现)。
create_react_agent 同样支持以上字段,使用方式几乎一致,只是多了 ReAct 本身的重试与验证能力:
from dataflow_agent.agentroles import create_react_agent
agent = create_react_agent(
"planner",
max_retries=3,
parser_type="json",
response_schema={
"plan": ["string"], # 分步计划,每一步是一句自然语言描述
"risks": ["string"], # 可能的风险点
},
response_schema_description=(
"请返回本次任务的分步计划,以及可能的风险点列表。"
"plan 字段是按执行顺序排列的步骤列表,risks 列出需要注意的问题。"
),
response_example={
"plan": [
"分析用户提供的数据源格式。",
"根据数据格式选择合适的预处理算子。",
"设计并生成完整的 DataFlow Pipeline 代码。"
],
"risks": [
"数据中可能包含脏数据,需要额外清洗步骤。",
"某些算子依赖的第三方库可能尚未安装。"
]
},
required_fields=["plan"],
)在这种模式下:
- 如果模型第一次返回的 JSON 不满足要求(字段缺失/类型不符等),
- 策略层可以结合验证器(
validators)和重试机制,对结果进行检查并尝试修正, - 从而提高结构化输出的可靠性。
如果你希望显式构造配置对象,也可以直接使用 SimpleConfig / ReactConfig 等,然后调用 create_agent:
from dataflow_agent.agentroles import (
SimpleConfig,
create_agent,
)
config = SimpleConfig(
model_name="gpt-4o",
parser_type="json",
response_schema={
"title": "string",
"outline": ["string"],
},
response_schema_description="生成一篇技术文章的大纲。",
response_example={
"title": "如何使用 DataFlow-Agent 构建智能数据处理管线",
"outline": [
"项目背景介绍",
"核心概念与模块划分",
"实战示例:从原始数据到可复用算子",
],
},
required_fields=["title", "outline"],
)
agent = create_agent("writer", config=config)
# result = await agent.execute({"topic": "DataFlow-Agent 使用实践"})
# print(result["title"])
# print(result["outline"])这种方式在以下场景比较适合:
- 你已经有一套统一的配置管理/注入机制;
- 希望在不同 Agent 之间重用同一份结构化返回配置;
- 或者需要在运行时动态调整配置(模型、温度、schema 等)。