Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion a2aclient/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (t *RESTTransport) GetTask(ctx context.Context, params ServiceParams, req *

// ListTasks retrieves a list of tasks.
func (t *RESTTransport) ListTasks(ctx context.Context, params ServiceParams, req *a2a.ListTasksRequest) (*a2a.ListTasksResponse, error) {
path := rest.MakeTasksListPath()
path := rest.MakeListTasksPath()

query := url.Values{}
if req.ContextID != "" {
Expand Down
19 changes: 10 additions & 9 deletions a2aext/activator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func TestActivator(t *testing.T) {
t.Fatalf("a2aclient.NewFromEndpoints() error = %v", err)
}

_, err = client.SendMessage(ctx, &a2a.MessageSendParams{
Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.TextPart{Text: "verify extensions"}),
_, err = client.SendMessage(ctx, &a2a.SendMessageRequest{
Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("verify extensions")),
})
if err != nil {
t.Fatalf("client.SendMessage() error = %v", err)
Expand All @@ -123,15 +123,16 @@ func startServerWithExtensions(t *testing.T, executor a2asrv.AgentExecutor, exte
for _, uri := range extensionURIs {
extensions = append(extensions, a2a.AgentExtension{URI: uri})
}
card := &a2a.AgentCard{
Capabilities: a2a.AgentCapabilities{
Extensions: extensions,
},
}

reqHandler := a2asrv.NewHandler(executor)
server := httptest.NewServer(a2asrv.NewJSONRPCHandler(reqHandler))
card.URL = server.URL
card.PreferredTransport = a2a.TransportProtocolJSONRPC
t.Cleanup(server.Close)

card := &a2a.AgentCard{
Capabilities: a2a.AgentCapabilities{Extensions: extensions},
SupportedInterfaces: []a2a.AgentInterface{
{URL: server.URL, ProtocolBinding: a2a.TransportProtocolJSONRPC},
},
}
return card
}
19 changes: 9 additions & 10 deletions a2aext/propagator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ func TestTripleHopPropagation(t *testing.T) {
t.Fatalf("a2aclient.NewFromEndpoints() error = %v", err)
}

resp, err := client.SendMessage(ctx, &a2a.MessageSendParams{
Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.TextPart{Text: "Hi!"}),
resp, err := client.SendMessage(ctx, &a2a.SendMessageRequest{
Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("Hi!")),
Metadata: tc.clientSvcParams,
})
if err != nil {
Expand Down Expand Up @@ -233,8 +233,8 @@ func TestDefaultPropagation(t *testing.T) {
t.Fatalf("a2aclient.NewFromEndpoints() error = %v", err)
}

resp, err := client.SendMessage(ctx, &a2a.MessageSendParams{
Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.TextPart{Text: "Hi!"}),
resp, err := client.SendMessage(ctx, &a2a.SendMessageRequest{
Message: a2a.NewMessage(a2a.MessageRoleUser, a2a.NewTextPart("Hi!")),
Metadata: tc.clientSvcParams,
})
if err != nil {
Expand All @@ -260,7 +260,7 @@ func startServer(t *testing.T, interceptor a2asrv.CallInterceptor, executor a2as
reqHandler := a2asrv.NewHandler(executor, a2asrv.WithCallInterceptors(interceptor))
server := httptest.NewServer(a2asrv.NewJSONRPCHandler(reqHandler))
t.Cleanup(server.Close)
return a2a.AgentInterface{URL: server.URL, Transport: a2a.TransportProtocolJSONRPC}
return a2a.AgentInterface{URL: server.URL, ProtocolBinding: a2a.TransportProtocolJSONRPC}
}

func newAgentCard(endpoint a2a.AgentInterface, extensionURIs []string) *a2a.AgentCard {
Expand All @@ -269,10 +269,9 @@ func newAgentCard(endpoint a2a.AgentInterface, extensionURIs []string) *a2a.Agen
extensions[i] = a2a.AgentExtension{URI: uri}
}
return &a2a.AgentCard{
URL: endpoint.URL,
PreferredTransport: endpoint.Transport,
Capabilities: a2a.AgentCapabilities{
Extensions: extensions,
Capabilities: a2a.AgentCapabilities{Extensions: extensions},
SupportedInterfaces: []a2a.AgentInterface{
{URL: endpoint.URL, ProtocolBinding: endpoint.ProtocolBinding},
},
}
}
Expand Down Expand Up @@ -300,7 +299,7 @@ func newProxyExecutor(interceptor a2aclient.CallInterceptor, target proxyTarget)
yield(nil, err)
return
}
result, err := client.SendMessage(ctx, &a2a.MessageSendParams{
result, err := client.SendMessage(ctx, &a2a.SendMessageRequest{
Message: a2a.NewMessage(a2a.MessageRoleUser, execCtx.Message.Parts...),
})
if err != nil {
Expand Down
14 changes: 6 additions & 8 deletions a2asrv/agentexec.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (
// For streaming responses [a2a.TaskArtifactUpdatEvent]-s should be used.
// A2A server stops processing events after one of these events:
// - An [a2a.Message] with any payload.
// - An [a2a.TaskStatusUpdateEvent] with Final field set to true.
// - An [a2a.Task] with a [a2a.TaskState] for which Terminal() method returns true.
// - An [a2a.Task] or [a2a.TaskStatusUpdateEvent] with a [a2a.TaskState] for which Terminal() method returns true or it is TaskStateInputRequired.
//
// The following code can be used as a streaming implementation template with generateOutputs and toParts missing:
//
Expand Down Expand Up @@ -78,7 +77,6 @@ import (
// }
//
// event = a2a.NewStatusUpdateEvent(execCtx, a2a.TaskStateCompleted, nil)
// event.Final = true
// if err := queue.Write(ctx, event); err != nil {
// return fmt.Errorf("failed to write state working: %w", err)
// }
Expand Down Expand Up @@ -113,7 +111,7 @@ type factory struct {

var _ taskexec.Factory = (*factory)(nil)

func (f *factory) CreateExecutor(ctx context.Context, tid a2a.TaskID, params *a2a.MessageSendParams) (taskexec.Executor, taskexec.Processor, error) {
func (f *factory) CreateExecutor(ctx context.Context, tid a2a.TaskID, params *a2a.SendMessageRequest) (taskexec.Executor, taskexec.Processor, error) {
execCtx, err := f.loadExecutionContext(ctx, tid, params)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -149,7 +147,7 @@ type executionContext struct {
}

// loadExecutionContext returns the information necessary for creating agent executor and agent event processor.
func (f *factory) loadExecutionContext(ctx context.Context, tid a2a.TaskID, params *a2a.MessageSendParams) (*executionContext, error) {
func (f *factory) loadExecutionContext(ctx context.Context, tid a2a.TaskID, params *a2a.SendMessageRequest) (*executionContext, error) {
message := params.Message

taskStoreTask, err := f.taskStore.Get(ctx, tid)
Expand Down Expand Up @@ -207,7 +205,7 @@ func (f *factory) loadExecutionContext(ctx context.Context, tid a2a.TaskID, para
}, nil
}

func (f *factory) createNewExecutionContext(tid a2a.TaskID, params *a2a.MessageSendParams) (*executionContext, error) {
func (f *factory) createNewExecutionContext(tid a2a.TaskID, params *a2a.SendMessageRequest) (*executionContext, error) {
msg := params.Message
contextID := msg.ContextID
if contextID == "" {
Expand All @@ -222,7 +220,7 @@ func (f *factory) createNewExecutionContext(tid a2a.TaskID, params *a2a.MessageS
return &executionContext{ctx: execCtx, task: nil}, nil
}

func (f *factory) CreateCanceler(ctx context.Context, params *a2a.TaskIDParams) (taskexec.Canceler, taskexec.Processor, error) {
func (f *factory) CreateCanceler(ctx context.Context, params *a2a.CancelTaskRequest) (taskexec.Canceler, taskexec.Processor, error) {
storedTask, err := f.taskStore.Get(ctx, params.ID)
if err != nil {
return nil, nil, fmt.Errorf("failed to load a task: %w", err)
Expand All @@ -237,7 +235,7 @@ func (f *factory) CreateCanceler(ctx context.Context, params *a2a.TaskIDParams)
TaskID: task.ID,
StoredTask: task,
ContextID: task.ContextID,
Metadata: params.Metadata,
Metadata: nil, // TODO: Fix spec https://github.com/a2aproject/A2A/pull/1485
}
if callCtx, ok := CallContextFrom(ctx); ok {
execCtx.User = callCtx.User
Expand Down
Loading