Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 独立工作流 Block #1440

Merged
merged 6 commits into from
Mar 30, 2025
Merged

Conversation

lss233
Copy link
Owner

@lss233 lss233 commented Mar 30, 2025

工作流 Block 只会在 build 时构造,fixes: #1433

好的,这是翻译成中文的 pull request 总结:

Sourcery 总结

重构工作流块构建过程,将 Block 实例化推迟到构建阶段,从而提高工作流的灵活性和性能

新特性:

  • 在工作流构建中增加对延迟 Block 初始化的支持
  • 实现更动态的连接机制

Bug 修复:

  • 修复了 Block 提前实例化的潜在问题
  • 改进了块命名和连接逻辑的处理

增强:

  • 修改工作流构建器,将节点规范与 Block 实例化分离
  • 引入一种更灵活的创建和连接工作流节点的方法
  • 改进工作流序列化和反序列化过程
Original summary in English

好的,这是翻译成中文的 pull request 总结:

Sourcery 总结

重构工作流块的构建方式,将块的初始化推迟到构建阶段,从而提高工作流的灵活性和性能

新特性:

  • 在工作流构建中引入延迟块初始化
  • 为工作流节点实现更动态的连接机制

Bug 修复:

  • 解决过早实例化块的潜在问题
  • 改进块命名和连接逻辑

增强:

  • 将节点规范与块实例化分离
  • 创建一种更灵活的工作流节点创建和连接方法
  • 改进工作流序列化和反序列化过程
Original summary in English

Summary by Sourcery

Refactor workflow block construction to defer Block initialization until the build phase, improving workflow flexibility and performance

New Features:

  • Introduce delayed Block initialization in workflow construction
  • Implement a more dynamic connection mechanism for workflow nodes

Bug Fixes:

  • Resolve potential issues with premature Block instantiation
  • Improve block naming and connection logic

Enhancements:

  • Separate node specification from Block instantiation
  • Create a more flexible method for workflow node creation and connection
  • Improve workflow serialization and deserialization process

lss233 added 2 commits March 30, 2025 04:56
- Updated the duration calculation for simulating typing in the TelegramAdapter to ensure a minimum pause of 2 seconds, improving the user experience during message sending.
- Refactored the workflow builder to replace blocks with nodes, enhancing the structure and clarity of the workflow definition.
- Introduced a new method to store wire specifications, allowing for better management of connections between nodes.
- Updated the API routes to accommodate the new node-based structure, ensuring compatibility with existing functionality.
- Improved error handling and initialization processes for blocks and connections during workflow creation and updates.
Copy link
Contributor

sourcery-ai bot commented Mar 30, 2025

Sourcery 代码审查指南

此 Pull Request 重构了工作流模块的构建过程,将模块实例化延迟到构建阶段,从而提高了工作流的灵活性和性能。它还包括新的聊天工作流、消息组合的改进以及 Telegram 适配器的调整。

Node 和 BlockSpec 的更新类图

classDiagram
    class BlockSpec {
        -block_class: Type[Block]
        -name: Optional[str]
        -kwargs: Dict[str, Any]
        -wire_from: List[str]
        +__post_init__()
    }
    class Node {
        -spec: BlockSpec
        -name: Optional[str]
        -next_nodes: List[Node]
        -merge_point: Node
        -parallel_nodes: List[Node]
        -is_parallel: bool
        -condition: Callable
        -is_conditional: bool
        -is_loop: bool
        -parent: Node
        -position: Optional[Dict[str, int]]
        +__post_init__()
        +ancestors() List[Node]
        +is_ancestor_of(node: Node) bool
    }
    Node -- BlockSpec : has a
Loading

WorkflowBuilder 的更新类图

classDiagram
    class WorkflowBuilder {
        -name: str
        -description: str
        -head: Node
        -current: Node
        -nodes: List[Node]
        -nodes_by_name: Dict[str, Node]
        -wire_specs: List[Tuple[str, str, str, str]]
        +__init__(name: str)
        +_generate_unique_name(base_name: str) str
        +_parse_block_spec(block_spec: Union[Type[Block], tuple]) BlockSpec
        +_get_available_inputs(node: Node) List[str]
        +_find_matching_ports(source_node: Node, target_node: Node, available_inputs: List[str]) List[Tuple[str, str]]
        +_store_wire_spec(source_name: str, target_name: str, source_node: Optional[Node], target_node: Optional[Node])
        +_create_node(spec: BlockSpec, is_parallel: bool = False) Node
        +chain(block_class: Type[Block], name: str = None, **kwargs) WorkflowBuilder
        +parallel(block_specs: List[Union[Type[Block], tuple]]) WorkflowBuilder
        +end_parallel() WorkflowBuilder
        +condition(condition_func: Callable) WorkflowBuilder
        +if_then(condition: Callable[[Dict[str, Any]], bool], name: str = None) WorkflowBuilder
        +end_if() WorkflowBuilder
        +loop(condition: Callable[[Dict[str, Any]], bool], name: str = None, iteration_var: str = "index") WorkflowBuilder
        +end_loop() WorkflowBuilder
        +build(container: DependencyContainer) Workflow
        +force_connect(source_name: str, target_name: str, source_output: str, target_input: str)
        +_find_parallel_nodes(start_node: Node) List[Node]
        +update_position(name: str, position: Tuple[int, int])
        +metadata: Dict[str, Any]
        +save_to_yaml(file_path: str, container: DependencyContainer)
        +load_from_yaml(file_path: str, container: DependencyContainer) WorkflowBuilder
    }
Loading

文件级别变更

变更 详情 文件
重构工作流构建器,将模块实例化延迟到构建阶段,从而提高灵活性和性能。
  • 引入 BlockSpec 数据类来管理模块创建参数。
  • 修改 Node 类以保存 BlockSpec 而不是 Block 实例。
  • 更新 _create_node 以创建节点而不实例化模块。
  • 在构建阶段实现延迟模块初始化。
  • 添加用于管理和匹配连接的输入/输出端口的方法。
  • 存储连接规范而不是立即创建连接。
  • 修改 build 方法以实例化模块并根据存储的规范创建连接。
  • 更新序列化和反序列化以处理新的节点和连接结构。
kirara_ai/workflow/core/workflow/builder.py
更新工作流 API 路由以反映工作流构建器中的更改。
  • 修改工作流列表以包含元数据。
  • 调整工作流检索以使用新的节点结构。
  • 更新工作流创建和更新以与延迟模块实例化对齐。
kirara_ai/web/api/workflow/routes.py
更新默认工作流并添加新的聊天工作流。
  • 将“AI对话-自定义分段”工作流重命名为“聊天 - 自定义分段”。
  • 为 DeepSeek 模型添加“聊天 - 深度思考”工作流。
  • 为多模态模型添加“聊天 - 原生多模态对话”工作流。
  • 更新默认工作流描述。
data/workflows/chat/talk_break.yaml
data/workflows/chat/dsr_thinking.yaml
data/workflows/chat/normal_multimodal.yaml
kirara_ai/workflow/implementations/factories/default_factory.py
通过删除 <think> 标签来改进消息组合。
  • 添加函数 drop_think_part 以从文本中删除 <think> 标签。
  • 修改 DefaultMemoryComposer 以在使用组合消息时使用 drop_think_part
kirara_ai/memory/composes/builtin_composes.py
调整 Telegram 适配器中的消息发送以模拟打字。
  • 修改睡眠持续时间计算以确保最短持续时间为 1 秒。
kirara_ai/plugins/im_telegram_adapter/adapter.py

提示和命令

与 Sourcery 互动

  • 触发新的审查: 在 Pull Request 上评论 @sourcery-ai review
  • 继续讨论: 直接回复 Sourcery 的审查评论。
  • 从审查评论生成 GitHub Issue: 通过回复审查评论,要求 Sourcery 从审查评论创建一个 Issue。您也可以回复审查评论并使用 @sourcery-ai issue 从其创建一个 Issue。
  • 生成 Pull Request 标题: 在 Pull Request 标题中的任意位置写入 @sourcery-ai 以随时生成标题。您也可以在 Pull Request 上评论 @sourcery-ai title 以随时(重新)生成标题。
  • 生成 Pull Request 摘要: 在 Pull Request 正文中的任意位置写入 @sourcery-ai summary 以随时在您想要的位置生成 PR 摘要。您也可以在 Pull Request 上评论 @sourcery-ai summary 以随时(重新)生成摘要。
  • 生成审查指南: 在 Pull Request 上评论 @sourcery-ai guide 以随时(重新)生成审查指南。
  • 解决所有 Sourcery 评论: 在 Pull Request 上评论 @sourcery-ai resolve 以解决所有 Sourcery 评论。如果您已经处理了所有评论并且不想再看到它们,这将非常有用。
  • 驳回所有 Sourcery 审查: 在 Pull Request 上评论 @sourcery-ai dismiss 以驳回所有现有的 Sourcery 审查。如果您想从新的审查开始,这将特别有用 - 不要忘记评论 @sourcery-ai review 以触发新的审查!
  • 为 Issue 生成行动计划: 在 Issue 上评论 @sourcery-ai plan 以为其生成行动计划。

自定义您的体验

访问您的 仪表板 以:

  • 启用或禁用审查功能,例如 Sourcery 生成的 Pull Request 摘要、审查指南等。
  • 更改审查语言。
  • 添加、删除或编辑自定义审查说明。
  • 调整其他审查设置。

获取帮助

Original review guide in English

Reviewer's Guide by Sourcery

This pull request refactors the workflow block construction process to defer Block instantiation until the build phase, improving workflow flexibility and performance. It also includes new chat workflows, improvements to message composition, and adjustments to the Telegram adapter.

Updated class diagram for Node and BlockSpec

classDiagram
    class BlockSpec {
        -block_class: Type[Block]
        -name: Optional[str]
        -kwargs: Dict[str, Any]
        -wire_from: List[str]
        +__post_init__()
    }
    class Node {
        -spec: BlockSpec
        -name: Optional[str]
        -next_nodes: List[Node]
        -merge_point: Node
        -parallel_nodes: List[Node]
        -is_parallel: bool
        -condition: Callable
        -is_conditional: bool
        -is_loop: bool
        -parent: Node
        -position: Optional[Dict[str, int]]
        +__post_init__()
        +ancestors() List[Node]
        +is_ancestor_of(node: Node) bool
    }
    Node -- BlockSpec : has a
Loading

Updated class diagram for WorkflowBuilder

classDiagram
    class WorkflowBuilder {
        -name: str
        -description: str
        -head: Node
        -current: Node
        -nodes: List[Node]
        -nodes_by_name: Dict[str, Node]
        -wire_specs: List[Tuple[str, str, str, str]]
        +__init__(name: str)
        +_generate_unique_name(base_name: str) str
        +_parse_block_spec(block_spec: Union[Type[Block], tuple]) BlockSpec
        +_get_available_inputs(node: Node) List[str]
        +_find_matching_ports(source_node: Node, target_node: Node, available_inputs: List[str]) List[Tuple[str, str]]
        +_store_wire_spec(source_name: str, target_name: str, source_node: Optional[Node], target_node: Optional[Node])
        +_create_node(spec: BlockSpec, is_parallel: bool = False) Node
        +chain(block_class: Type[Block], name: str = None, **kwargs) WorkflowBuilder
        +parallel(block_specs: List[Union[Type[Block], tuple]]) WorkflowBuilder
        +end_parallel() WorkflowBuilder
        +condition(condition_func: Callable) WorkflowBuilder
        +if_then(condition: Callable[[Dict[str, Any]], bool], name: str = None) WorkflowBuilder
        +end_if() WorkflowBuilder
        +loop(condition: Callable[[Dict[str, Any]], bool], name: str = None, iteration_var: str = "index") WorkflowBuilder
        +end_loop() WorkflowBuilder
        +build(container: DependencyContainer) Workflow
        +force_connect(source_name: str, target_name: str, source_output: str, target_input: str)
        +_find_parallel_nodes(start_node: Node) List[Node]
        +update_position(name: str, position: Tuple[int, int])
        +metadata: Dict[str, Any]
        +save_to_yaml(file_path: str, container: DependencyContainer)
        +load_from_yaml(file_path: str, container: DependencyContainer) WorkflowBuilder
    }
Loading

File-Level Changes

Change Details Files
Refactors the workflow builder to defer block instantiation until the build phase, enhancing flexibility and performance.
  • Introduces BlockSpec data class for managing block creation parameters.
  • Modifies Node class to hold BlockSpec instead of a Block instance.
  • Updates _create_node to create nodes without instantiating blocks.
  • Implements lazy block initialization during the build phase.
  • Adds methods for managing and matching input/output ports for connections.
  • Stores wire specifications instead of creating wires immediately.
  • Modifies the build method to instantiate blocks and create wires based on stored specifications.
  • Updates serialization and deserialization to handle the new node and connection structure.
kirara_ai/workflow/core/workflow/builder.py
Updates the workflow API routes to reflect changes in the workflow builder.
  • Modifies the workflow listing to include metadata.
  • Adjusts the workflow retrieval to use the new node structure.
  • Updates workflow creation and updating to align with the deferred block instantiation.
kirara_ai/web/api/workflow/routes.py
Updates default workflows and adds new chat workflows.
  • Renames 'AI对话-自定义分段' workflow to '聊天 - 自定义分段'.
  • Adds '聊天 - 深度思考' workflow for DeepSeek models.
  • Adds '聊天 - 原生多模态对话' workflow for multimodal models.
  • Updates the default workflow description.
data/workflows/chat/talk_break.yaml
data/workflows/chat/dsr_thinking.yaml
data/workflows/chat/normal_multimodal.yaml
kirara_ai/workflow/implementations/factories/default_factory.py
Improves message composition by removing <think> tags.
  • Adds a function drop_think_part to remove <think> tags from text.
  • Modifies DefaultMemoryComposer to use drop_think_part when composing messages.
kirara_ai/memory/composes/builtin_composes.py
Adjusts message sending in the Telegram adapter to simulate typing.
  • Modifies the sleep duration calculation to ensure a minimum duration of 1 second.
kirara_ai/plugins/im_telegram_adapter/adapter.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!
  • Generate a plan of action for an issue: Comment @sourcery-ai plan on
    an issue to generate a plan of action for it.

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

codecov bot commented Mar 30, 2025

Codecov Report

Attention: Patch coverage is 65.48673% with 39 lines in your changes missing coverage. Please review.

Project coverage is 67.15%. Comparing base (86cc532) to head (38b9eda).
Report is 7 commits behind head on master.

✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
kirara_ai/workflow/core/workflow/builder.py 64.15% 38 Missing ⚠️
...kflow/implementations/factories/default_factory.py 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1440      +/-   ##
==========================================
+ Coverage   67.09%   67.15%   +0.05%     
==========================================
  Files         148      148              
  Lines        6632     6661      +29     
==========================================
+ Hits         4450     4473      +23     
- Misses       2182     2188       +6     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

lss233 added 3 commits March 30, 2025 18:30
…tion storage

- Introduced methods to retrieve available inputs and find matching ports between nodes, improving the connection process.
- Updated the _store_wire_spec method to automatically match input and output ports, streamlining the workflow creation.
- Refactored existing connection storage calls to utilize the new matching logic, enhancing clarity and reducing manual specification.
- Introduced a new function to remove <think> tags from message text, enhancing message composition clarity.
- Updated DefaultMemoryComposer to utilize the new function when composing messages, ensuring cleaner output.
- Renamed workflows in YAML files for better understanding: "DeepSeek R系列聊天" to "聊天 - 深度思考" and "默认 - 角色扮演" to "聊天 - 角色扮演".
- Cleared parameters in the msg_sender_lakgf8 block to simplify configuration.
@lss233 lss233 marked this pull request as ready for review March 30, 2025 11:15
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @lss233 - I've reviewed your changes - here's some feedback:

Overall Comments:

  • Consider adding a method to the Workflow class to encapsulate the block instantiation and wiring logic currently in WorkflowBuilder.build().
  • The _store_wire_spec method could be simplified by directly using the source_node and target_node parameters instead of looking them up by name.
Here's what I looked at during the review
  • 🟡 General issues: 2 issues found
  • 🟢 Security: all looks good
  • 🟢 Testing: all looks good
  • 🟢 Complexity: all looks good
  • 🟢 Documentation: all looks good

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

if output.data_type == input.data_type:
matches.append((out_name, in_name))
# 一旦找到匹配就从可用输入中移除
target_inputs.pop(in_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Avoid mutating dictionary during iteration in _find_matching_ports.

Removing an item from target_inputs while iterating over its items can lead to unpredictable behavior. Consider iterating over a copy of the keys or using a different approach to avoid mutation during iteration.

position:
x: 100
y: 138
connected_to:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick (bug_risk): Duplicate connection entries detected in YAML workflow.

There are duplicate connection entries from the 'get_message' block to 'ToggleEditState_81bvwg' with the same mapping. Confirm that the duplicates are intended and do not result in unintended side effects.

Comment on lines 51 to 73
# 构建工作流定义
blocks = []
for block in builder.blocks:
position = builder.nodes_by_name[block.name].position
for node in builder.nodes:
blocks.append(
{
"type_name": block_registry.get_block_type_name(block.__class__),
"name": block.name,
"config": builder.nodes_by_name[block.name].spec.kwargs,
"position": position if position else {"x": 0, "y": 0},
"type_name": block_registry.get_block_type_name(node.spec.block_class),
"name": node.name,
"config": node.spec.kwargs,
"position": node.position if node.position else {"x": 0, "y": 0},
}
)

wires = []
for wire in builder.wires:
for source_name, source_output, target_name, target_input in builder.wire_specs:
wires.append(
{
"source_block": wire.source_block.name,
"source_output": wire.source_output,
"target_block": wire.target_block.name,
"target_input": wire.target_input,
"source_block": source_name,
"source_output": source_output,
"target_block": target_name,
"target_input": target_input,
}
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): Convert for loop into list comprehension [×2] (list-comprehension)

Suggested change
# 构建工作流定义
blocks = []
for block in builder.blocks:
position = builder.nodes_by_name[block.name].position
for node in builder.nodes:
blocks.append(
{
"type_name": block_registry.get_block_type_name(block.__class__),
"name": block.name,
"config": builder.nodes_by_name[block.name].spec.kwargs,
"position": position if position else {"x": 0, "y": 0},
"type_name": block_registry.get_block_type_name(node.spec.block_class),
"name": node.name,
"config": node.spec.kwargs,
"position": node.position if node.position else {"x": 0, "y": 0},
}
)
wires = []
for wire in builder.wires:
for source_name, source_output, target_name, target_input in builder.wire_specs:
wires.append(
{
"source_block": wire.source_block.name,
"source_output": wire.source_output,
"target_block": wire.target_block.name,
"target_input": wire.target_input,
"source_block": source_name,
"source_output": source_output,
"target_block": target_name,
"target_input": target_input,
}
)
blocks = [
{
"type_name": block_registry.get_block_type_name(
node.spec.block_class
),
"name": node.name,
"config": node.spec.kwargs,
"position": node.position if node.position else {"x": 0, "y": 0},
}
for node in builder.nodes
]
wires = [
{
"source_block": source_name,
"source_output": source_output,
"target_block": target_name,
"target_input": target_input,
}
for source_name, source_output, target_name, target_input in builder.wire_specs
]


# 首先实例化所有 Block
for node in self.nodes:
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): Explicitly raise from a previous error (raise-from-previous-error)

Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
@lss233 lss233 merged commit 8898cae into master Mar 30, 2025
6 of 7 checks passed
@lss233 lss233 deleted the feature/indiviual_workflow_instance branch March 30, 2025 11:30
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[BUG] 群聊内同时问答,会出现串台现象
1 participant