Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "plane-conductor"
version = "0.3.4"
version = "0.3.5"
description = "Webhook orchestrator that turns Plane mentions into Claude Code agent runs"
readme = "README.md"
license = { file = "LICENSE" }
Expand Down
58 changes: 43 additions & 15 deletions src/plane_conductor/mcp_tower.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class WorkspaceContext:
member_by_nickname: dict[str, str] = field(default_factory=dict)

def label_uuid(self, label_name: str) -> str:
"""Resolve a label by display name, raising if it is not in the workspace cache."""
uuid = self.label_by_name.get(label_name)
if not uuid:
raise LabelNotFoundError(
Expand All @@ -138,6 +139,7 @@ def label_uuid(self, label_name: str) -> str:
return uuid

def artifact_label_uuid(self, role: str) -> str:
"""Resolve the `artifact:<role>` label UUID for a pipeline role."""
label_name = ARTIFACT_LABEL_BY_ROLE.get(role)
if not label_name:
raise LabelNotFoundError(
Expand Down Expand Up @@ -167,6 +169,7 @@ class TowerRegistry:
by_project_identifier: dict[str, WorkspaceContext] = field(default_factory=dict)

def slugs(self) -> list[str]:
"""Sorted list of workspace slugs known to this tower instance."""
return sorted(self.by_slug)

def resolve(
Expand Down Expand Up @@ -321,6 +324,7 @@ async def build_registry(conductor_dir: Path | str) -> TowerRegistry:


def _initiator_mention(ctx: WorkspaceContext) -> str:
"""Render a Plane `<mention-component>` tag for this workspace's initiator."""
return _MENTION_TEMPLATE.format(uuid=str(ctx.config.initiator_uuid))


Expand Down Expand Up @@ -388,6 +392,7 @@ def _ensure_uuid(value: str | UUID, name: str) -> str:


async def _client_for(ctx: WorkspaceContext) -> PlaneClient:
"""Build a fresh PlaneClient for this workspace context (per-call, not singleton — see PR #18)."""
return PlaneClient(
ctx.config.plane_base_url,
ctx.config.plane_api_key,
Expand Down Expand Up @@ -417,17 +422,22 @@ async def _client_for(ctx: WorkspaceContext) -> PlaneClient:
_log_fh = open(_log_path, "a", buffering=1, encoding="utf-8") # noqa: SIM115 (long-lived handle for process lifetime)

def _emit(**event: Any) -> None:
"""Append a single JSON-line event to the tower call log; swallow IO errors."""
with contextlib.suppress(Exception):
_log_fh.write(json.dumps(event, default=str) + "\n")

_orig_tool = mcp.tool

def _logged_tool(*a: Any, **kw: Any) -> Any:
"""Shim around `mcp.tool` that wraps every registered tool with start/end/error log events."""
register = _orig_tool(*a, **kw)

def wrap(fn: Any) -> Any:
"""Decorator returned by the shimmed `mcp.tool` — wraps `fn` in `inner` then registers it."""

@wraps(fn)
async def inner(*args: Any, **kwargs: Any) -> Any:
"""Timed wrapper around the wrapped tool: emit start, end (with ms+size), or error."""
t0 = time.monotonic()
_emit(event="start", tool=fn.__name__, args=sorted(kwargs))
try:
Expand Down Expand Up @@ -460,6 +470,7 @@ async def inner(*args: Any, **kwargs: Any) -> Any:


def _registry() -> TowerRegistry:
"""Return the process-wide TowerRegistry, raising if `main()` has not run yet."""
global _REGISTRY
if _REGISTRY is None:
raise TowerError("plane-tower registry is not initialised; call main()")
Expand Down Expand Up @@ -556,6 +567,7 @@ async def list_sub_issues(


def _summarize_issue(item: dict[str, Any]) -> dict[str, Any]:
"""Project a Plane issue dict onto the small surface the tower exposes to agents."""
return {
"id": item.get("id"),
"sequence_id": item.get("sequence_id"),
Expand Down Expand Up @@ -654,6 +666,7 @@ async def create_root_issue(


def _create_lock_for(workspace_slug: str, root_uuid: str, role: str) -> asyncio.Lock:
"""Get or create the asyncio.Lock serialising create_sub_issue for this (workspace, root, role)."""
key = (workspace_slug, root_uuid, role)
lock = _create_sub_issue_locks.get(key)
if lock is None:
Expand Down Expand Up @@ -696,31 +709,40 @@ async def create_sub_issue(

lock = _create_lock_for(ctx.config.workspace_slug, root_uuid, role)
async with lock, await _client_for(ctx) as plane:
# Pre-condition: no duplicate
# Resolve root for title + name (also used by the name-fallback check).
root = await plane.get_issue(ctx.config.project_id, root_uuid)
root_name = str(root.get("name") or "").strip()
seq = root.get("sequence_id")
identifier = (
f"{ctx.project_identifier}-{seq}" if ctx.project_identifier and seq else f"{seq}"
)
title = f"{_role_display(role)}: {root_name} ({identifier})"

# Pre-condition: no duplicate. Two-layer check.
# (1) label match — the intended invariant: a sub-issue carrying
# artifact:<role> already exists under this root.
# (2) name match — fallback for label-loss incidents (label dropped
# by Plane after create, or stripped by a later workflow). We
# observed in 2026-05 that two SPEC sub-issues (COINEX-48/50)
# were created under the same root with empty `labels: []` after
# a successful create — the cause was not pinned down, but the
# name pattern always survives, so we use it as a second net.
items = await plane.list_issues(ctx.config.project_id)
existing = [
i
for i in items
if str(i.get("parent") or "") == root_uuid and label_uuid in (i.get("labels") or [])
if str(i.get("parent") or "") == root_uuid
and (label_uuid in (i.get("labels") or []) or str(i.get("name") or "").strip() == title)
]
if existing:
uuids = [str(i.get("id")) for i in existing]
raise DuplicateSubIssueError(
f"workspace {ctx.config.workspace_slug!r}: a sub-issue with "
f"label artifact:{role} already exists under root {root_uuid} "
f"({uuids}). Use re-entry path: read the existing artifact and "
f"update its description, do not create a second one."
f"workspace {ctx.config.workspace_slug!r}: a sub-issue for "
f"role {role!r} already exists under root {root_uuid} "
f"({uuids}). Use re-entry path: read the existing artifact "
f"and update its description, do not create a second one."
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Resolve root for title + name
root = await plane.get_issue(ctx.config.project_id, root_uuid)
root_name = str(root.get("name") or "").strip()
seq = root.get("sequence_id")
identifier = (
f"{ctx.project_identifier}-{seq}" if ctx.project_identifier and seq else f"{seq}"
)
title = f"{_role_display(role)}: {root_name} ({identifier})"

# Resolve assignee (the bot for this role/nickname).
assignees: list[str] = []
if nickname:
Expand Down Expand Up @@ -764,6 +786,7 @@ async def create_sub_issue(


def _role_display(role: str) -> str:
"""Canonical display name for a role (e.g. `frontend-react` → `Frontend (React)`)."""
return _ROLE_DISPLAY.get(role, role.replace("-", " ").title())


Expand Down Expand Up @@ -793,6 +816,7 @@ def _role_display(role: str) -> str:


def _strip_inline_tags(s: str) -> str:
"""Drop any `<...>` HTML tag and surrounding whitespace from a snippet of text."""
return _MD_STRIP_TAGS_RE.sub("", s).strip()


Expand Down Expand Up @@ -920,6 +944,7 @@ async def read_artifact(
has_more = comments_offset + len(sliced) < total

def _comment_text(c: dict[str, Any]) -> str:
"""Return the comment body in the format requested by the caller (markdown or raw HTML)."""
raw = c.get("comment_html") or ""
return html_to_markdown(raw) if description_format == "markdown" else raw

Expand Down Expand Up @@ -999,6 +1024,7 @@ async def list_comments(
has_more = offset + len(sliced) < total

def _comment_text(c: dict[str, Any]) -> str:
"""Return the comment body in the format requested by the caller (markdown or raw HTML)."""
raw = c.get("comment_html") or ""
return html_to_markdown(raw) if description_format == "markdown" else raw

Expand Down Expand Up @@ -1550,10 +1576,12 @@ async def update_comment(


def _conductor_dir() -> Path:
"""Resolve the directory holding per-workspace `*.yaml` config (overridable via `CONDUCTOR_DIR`)."""
return Path(os.environ.get("CONDUCTOR_DIR", "/etc/plane-conductor/conductor.d"))


async def _async_main() -> None:
"""Boot the tower: build the workspace registry from conductor.d then serve MCP over stdio."""
global _REGISTRY
_REGISTRY = await build_registry(_conductor_dir())
if not _REGISTRY.by_slug:
Expand Down
46 changes: 46 additions & 0 deletions tests/test_mcp_tower.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,11 @@ async def test_create_sub_issue_refuses_when_duplicate_exists(
registry: TowerRegistry, ctx: WorkspaceContext, project_id: str
) -> None:
base = f"https://plane.test/api/v1/workspaces/{ctx.config.workspace_slug}/projects/{project_id}"
respx.get(f"{base}/issues/{ROOT_UUID}/").mock(
return_value=httpx.Response(
200, json={"id": ROOT_UUID, "name": "Add user dashboard", "sequence_id": 42}
)
)
respx.get(f"{base}/issues/").mock(
return_value=httpx.Response(
200,
Expand All @@ -533,6 +538,47 @@ async def test_create_sub_issue_refuses_when_duplicate_exists(
)


@respx.mock
async def test_create_sub_issue_refuses_when_name_matches_even_without_label(
registry: TowerRegistry, ctx: WorkspaceContext, project_id: str
) -> None:
"""Second-layer defense: a sub-issue under the same root with the same
canonical title and empty `labels: []` still counts as a duplicate.

Models the COINEX-48 / COINEX-50 incident: two SPEC sub-issues were
created under the same root with `labels: []`, getting past the
label-only check. The name pattern always survives, so we use it as
a fallback.
"""
base = f"https://plane.test/api/v1/workspaces/{ctx.config.workspace_slug}/projects/{project_id}"
respx.get(f"{base}/issues/{ROOT_UUID}/").mock(
return_value=httpx.Response(
200, json={"id": ROOT_UUID, "name": "Add user dashboard", "sequence_id": 42}
)
)
respx.get(f"{base}/issues/").mock(
return_value=httpx.Response(
200,
json={
"results": [
{
"id": SPEC_SUB_UUID,
"parent": ROOT_UUID,
"labels": [],
"name": "SPEC: Add user dashboard (TEST-42)",
},
]
},
)
)
with pytest.raises(DuplicateSubIssueError, match="already exists"):
await create_sub_issue(
role="spec",
root_uuid=ROOT_UUID,
workspace=ctx.config.workspace_slug,
)


@respx.mock
async def test_create_sub_issue_unlabelled_post_condition_fails_loudly(
registry: TowerRegistry, ctx: WorkspaceContext, project_id: str
Expand Down