Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion samples/AgentServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@

var builder = WebApplication.CreateBuilder(args);

// Configure CORS to allow all origins
builder.Services.AddCors(options =>
{
options.AddDefaultPolicy(policy =>
{
// Common local dev server ports: React (3000), ASP.NET (5000), Vite (5173)
policy.WithOrigins("http://localhost:3000", "http://localhost:5000", "http://localhost:5173")
.AllowAnyMethod()
.AllowAnyHeader();
});
});

// Configure OpenTelemetry
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource.AddService("A2AAgentServer"))
Expand All @@ -26,6 +38,7 @@
var app = builder.Build();

app.UseHttpsRedirection();
app.UseCors();

// Add health endpoint
app.MapGet("/health", () => Results.Ok(new { Status = "Healthy", Timestamp = DateTimeOffset.UtcNow }));
Expand Down Expand Up @@ -68,6 +81,14 @@
app.MapWellKnownAgentCard(taskManager, "/speccompliance");
break;

case "streaming":
var streamingAgent = new StreamingArtifactAgent();
streamingAgent.Attach(taskManager);
app.MapA2A(taskManager, "/streaming");
app.MapWellKnownAgentCard(taskManager, "/streaming");
app.MapHttpA2A(taskManager, "/streaming");
break;

default:
Console.WriteLine($"Unknown agent type: {agentType}");
Environment.Exit(1);
Expand All @@ -88,6 +109,6 @@ static string GetAgentTypeFromArgs(string[] args)
}

// Default to echo if no agent specified
Console.WriteLine("No agent specified. Use --agent or -a parameter to specify agent type (echo, echotasks, researcher, speccompliance). Defaulting to 'echo'.");
Console.WriteLine("No agent specified. Use --agent or -a parameter to specify agent type (echo, echotasks, researcher, speccompliance, streaming). Defaulting to 'echo'.");
return "echo";
}
14 changes: 2 additions & 12 deletions samples/AgentServer/ResearcherAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public void Attach(ITaskManager taskManager)
// Initialize the agent state for the task
_agentStates[task.Id] = AgentState.Planning;
// Ignore other content in the task, just assume it is a text message.
var message = ((TextPart?)task.History?.Last()?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
var message = ((TextPart?)task.History?.LastOrDefault(m => m.Role == MessageRole.User)?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
await InvokeAsync(task.Id, message, cancellationToken);
};
_taskManager.OnTaskUpdated = async (task, cancellationToken) =>
{
// Note that the updated callback is helpful to know not to initialize the agent state again.
var message = ((TextPart?)task.History?.Last()?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
var message = ((TextPart?)task.History?.LastOrDefault(m => m.Role == MessageRole.User)?.Parts?.FirstOrDefault())?.Text ?? string.Empty;
await InvokeAsync(task.Id, message, cancellationToken);
};
_taskManager.OnAgentCardQuery = GetAgentCardAsync;
Expand All @@ -53,11 +53,6 @@ public async Task InvokeAsync(string taskId, string message, CancellationToken c
{
case AgentState.Planning:
await DoPlanningAsync(taskId, message, cancellationToken);
await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
{
Parts = [new TextPart() { Text = "When ready say go ahead" }],
},
cancellationToken: cancellationToken);
break;
case AgentState.WaitingForFeedbackOnPlan:
if (message == "go ahead") // Dumb check for now to avoid using an LLM
Expand All @@ -68,11 +63,6 @@ public async Task InvokeAsync(string taskId, string message, CancellationToken c
{
// Take the message and redo planning
await DoPlanningAsync(taskId, message, cancellationToken);
await _taskManager.UpdateStatusAsync(taskId, TaskState.InputRequired, new AgentMessage()
{
Parts = [new TextPart() { Text = "When ready say go ahead" }],
},
cancellationToken: cancellationToken);
}
break;
case AgentState.Researching:
Expand Down
109 changes: 109 additions & 0 deletions samples/AgentServer/StreamingArtifactAgent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using A2A;

namespace AgentServer;

/// <summary>
/// A sample agent that demonstrates streaming artifacts using TaskArtifactUpdateEvent.
/// It generates a story in chunks, streaming each paragraph as a separate artifact update.
/// </summary>
public class StreamingArtifactAgent
{
private ITaskManager? _taskManager;

public void Attach(ITaskManager taskManager)
{
_taskManager = taskManager;
taskManager.OnTaskCreated = ProcessMessageAsync;
taskManager.OnAgentCardQuery = GetAgentCardAsync;
}

private async Task ProcessMessageAsync(AgentTask task, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

var lastMessage = task.History!.Last();
var prompt = lastMessage.Parts.OfType<TextPart>().FirstOrDefault()?.Text ?? "a mysterious journey";

await _taskManager!.UpdateStatusAsync(
task.Id,
status: TaskState.Working,
cancellationToken: cancellationToken);

// Stream a story as multiple artifact chunks
var artifactId = $"story-{Guid.NewGuid():N}";
var paragraphs = GenerateStory(prompt);

for (int i = 0; i < paragraphs.Length; i++)
{
bool isFirst = i == 0;
bool isLast = i == paragraphs.Length - 1;

await _taskManager.ReturnArtifactStreamAsync(new TaskArtifactUpdateEvent
{
TaskId = task.Id,
Artifact = new Artifact
{
ArtifactId = artifactId,
Name = isFirst ? $"Story: {prompt}" : null,
Description = isFirst ? "A story generated in streaming chunks" : null,
Parts = [new TextPart { Text = paragraphs[i] }]
},
Append = !isFirst,
LastChunk = isLast
}, cancellationToken: cancellationToken);

// Simulate generation delay
await Task.Delay(500, cancellationToken);
}

await _taskManager.UpdateStatusAsync(
task.Id,
status: TaskState.Completed,
final: true,
cancellationToken: cancellationToken);
}

private static string[] GenerateStory(string prompt)
{
return
[
$"Once upon a time, in a land inspired by \"{prompt}\", there lived a curious adventurer.\n\n",
"The adventurer set out on a journey through enchanted forests and across vast mountains, seeking wisdom and wonder.\n\n",
"Along the way, they encountered a wise old owl who spoke of ancient secrets hidden beneath the stars.\n\n",
"With newfound knowledge, the adventurer returned home, forever changed by the journey.\n\nThe End."
];
}

private Task<AgentCard> GetAgentCardAsync(string agentUrl, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<AgentCard>(cancellationToken);
}

return Task.FromResult(new AgentCard
{
Name = "Streaming Story Agent",
Description = "Agent that generates stories streamed as artifact chunks, demonstrating TaskArtifactUpdateEvent with append and lastChunk semantics.",
Url = agentUrl,
Version = "1.0.0",
DefaultInputModes = ["text"],
DefaultOutputModes = ["text"],
Capabilities = new AgentCapabilities
{
Streaming = true,
PushNotifications = false,
},
Skills =
[
new AgentSkill
{
Id = "story-writer",
Name = "Story Writer",
Description = "Generates a short story based on a prompt, streamed in paragraph chunks.",
Tags = ["creative-writing", "streaming"]
}
],
});
}
}
128 changes: 128 additions & 0 deletions src/A2A/Server/ArtifactHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
using System.Text.Json;

namespace A2A;

/// <summary>
/// Provides helper methods for applying artifact updates to tasks.
/// </summary>
/// <remarks>
/// Centralizes the merge logic for <see cref="TaskArtifactUpdateEvent"/> so that all
/// <see cref="ITaskStore"/> implementations produce consistent results.
/// </remarks>
public static class ArtifactHelper
{
/// <summary>
/// Applies an artifact update to a task's artifact list using delta semantics.
/// </summary>
/// <remarks>
/// <para>
/// When <paramref name="append"/> is true, the update is treated as a delta:
/// <list type="bullet">
/// <item><description><b>Parts</b>: Appended to the existing parts list.</description></item>
/// <item><description><b>Metadata</b>: Upserted — new keys are added, existing keys are updated.</description></item>
/// <item><description><b>Extensions</b>: Appended — new values are added, duplicates are ignored.</description></item>
/// <item><description><b>Name/Description</b>: Updated if non-null/non-empty in the incoming artifact.</description></item>
/// </list>
/// If no existing artifact with the same <see cref="Artifact.ArtifactId"/> is found, a new artifact is created.
/// </para>
/// <para>
/// When <paramref name="append"/> is false, the incoming artifact replaces any existing artifact
/// with the same <see cref="Artifact.ArtifactId"/>, or is added if none exists.
/// </para>
/// </remarks>
/// <param name="task">The task to update. Its <see cref="AgentTask.Artifacts"/> list will be modified in place.</param>
/// <param name="artifact">The artifact or artifact chunk to apply.</param>
/// <param name="append">If true, apply delta semantics. If false, replace.</param>
public static void ApplyArtifactUpdate(AgentTask task, Artifact artifact, bool append)
{
task.Artifacts ??= [];

if (append)
{
var existingIndex = task.Artifacts.FindIndex(a => a.ArtifactId == artifact.ArtifactId);
if (existingIndex >= 0)
{
var existing = task.Artifacts[existingIndex];

// Parts: append
var mergedParts = new List<Part>(existing.Parts);
mergedParts.AddRange(artifact.Parts);

// Metadata: upsert
Dictionary<string, JsonElement>? mergedMetadata = null;
if (existing.Metadata != null || artifact.Metadata != null)
{
mergedMetadata = existing.Metadata != null ? new(existing.Metadata) : [];
if (artifact.Metadata != null)
{
foreach (var kvp in artifact.Metadata)
{
mergedMetadata[kvp.Key] = kvp.Value;
}
}
}

// Extensions: append (deduplicated)
List<string>? mergedExtensions = null;
if (existing.Extensions != null || artifact.Extensions != null)
{
mergedExtensions = existing.Extensions != null ? [.. existing.Extensions] : [];
if (artifact.Extensions != null)
{
foreach (var ext in artifact.Extensions)
{
if (!mergedExtensions.Contains(ext))
{
mergedExtensions.Add(ext);
}
}
}
}

// Build new artifact (immutable update)
task.Artifacts[existingIndex] = new Artifact
{
ArtifactId = artifact.ArtifactId,
Name = !string.IsNullOrEmpty(artifact.Name) ? artifact.Name : existing.Name,
Description = !string.IsNullOrEmpty(artifact.Description) ? artifact.Description : existing.Description,
Parts = mergedParts,
Metadata = mergedMetadata,
Extensions = mergedExtensions
};
}
else
{
// No existing artifact — create new copy
task.Artifacts.Add(CopyArtifact(artifact));
}
}
else
{
// Replace or add
var artifactCopy = CopyArtifact(artifact);
var existingIndex = task.Artifacts.FindIndex(a => a.ArtifactId == artifact.ArtifactId);
if (existingIndex >= 0)
{
task.Artifacts[existingIndex] = artifactCopy;
}
else
{
task.Artifacts.Add(artifactCopy);
}
}
}

/// <summary>
/// Creates a defensive copy of an artifact to prevent external mutation of stored state.
/// </summary>
/// <param name="artifact">The artifact to copy.</param>
internal static Artifact CopyArtifact(Artifact artifact) => new()
{
ArtifactId = artifact.ArtifactId,
Name = artifact.Name,
Description = artifact.Description,
Parts = [.. artifact.Parts],
Metadata = artifact.Metadata != null ? new(artifact.Metadata) : null,
Extensions = artifact.Extensions != null ? [.. artifact.Extensions] : null
};
}
24 changes: 24 additions & 0 deletions src/A2A/Server/ITaskManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ public interface ITaskManager
/// <returns>A task representing the asynchronous operation.</returns>
Task ReturnArtifactAsync(string taskId, Artifact artifact, CancellationToken cancellationToken = default);

/// <summary>
/// Streams a partial or complete artifact to a task with append and chunking semantics.
/// </summary>
/// <remarks>
/// <para>
/// This method allows streaming artifacts in chunks using TaskArtifactUpdateEvent objects with
/// append and lastChunk flags. When append is true, the artifact parts are appended to an existing
/// artifact with the same artifactId, or creates a new one if it doesn't exist.
/// When append is false, it replaces any existing artifact with the same artifactId.
/// </para>
/// <para>
/// Once lastChunk is true, the artifact is sealed. Any subsequent attempt to update a sealed artifact
/// will throw an <see cref="A2AException"/> with <see cref="A2AErrorCode.InvalidRequest"/>.
/// This is considered a bug in the agent implementation.
/// </para>
/// <para>
/// For complete artifacts sent in one call, use <see cref="ReturnArtifactAsync"/> instead.
/// </para>
/// </remarks>
/// <param name="artifactEvent">The artifact update event containing the artifact, taskId, append flag, and lastChunk flag.</param>
/// <param name="cancellationToken">A cancellation token that can be used to cancel the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
Task ReturnArtifactStreamAsync(TaskArtifactUpdateEvent artifactEvent, CancellationToken cancellationToken = default);

/// <summary>
/// Updates the status of a task and optionally adds a message to its history.
/// </summary>
Expand Down
Loading
Loading