Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Buffers;
using System.Collections.Immutable;
using System.Text.Json;
using HotChocolate.Fusion.Execution.Nodes;

namespace HotChocolate.Fusion.Execution.Clients;

Expand All @@ -27,14 +28,16 @@ public sealed class SourceSchemaErrors
/// <param name="json">
/// A <see cref="JsonElement"/> representing the "errors" array from a GraphQL response.
/// </param>
/// <param name="context"></param>
/// <param name="sourceNode"></param>
/// <returns>
/// A <see cref="SourceSchemaErrors"/> instance containing the parsed errors, or
/// <c>null</c> if the JSON is not a valid array format.
/// </returns>
/// <exception cref="InvalidOperationException">
/// Thrown when an error path contains unsupported element types (only strings and integer are supported).
/// </exception>
public static SourceSchemaErrors? From(JsonElement json)
public static SourceSchemaErrors? From(JsonElement json, OperationPlanContext context, ExecutionNode sourceNode)
{
if (json.ValueKind != JsonValueKind.Array)
{
Expand All @@ -48,13 +51,20 @@ public sealed class SourceSchemaErrors
{
var currentTrie = root;

var error = CreateError(jsonError);
var errorBuilder = CreateErrorBuilder(jsonError);

if (error is null)
if (errorBuilder is null)
{
continue;
}

if (context.CollectTelemetry)
{
errorBuilder.SetExtension(WellKnownErrorExtensions.SourceOperationPlanNodeId, sourceNode.Id);
}

var error = errorBuilder.Build();

if (error.Path is null)
{
rootErrors ??= ImmutableArray.CreateBuilder<IError>();
Expand Down Expand Up @@ -100,7 +110,7 @@ public sealed class SourceSchemaErrors
return new SourceSchemaErrors { RootErrors = rootErrors?.ToImmutableArray() ?? [], Trie = root };
}

private static IError? CreateError(JsonElement jsonError)
private static ErrorBuilder? CreateErrorBuilder(JsonElement jsonError)
{
if (jsonError.ValueKind is not JsonValueKind.Object)
{
Expand Down Expand Up @@ -133,7 +143,7 @@ public sealed class SourceSchemaErrors
}
}

return errorBuilder.Build();
return errorBuilder;
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public SourceSchemaResult(
_resource = resource;
Path = path;
Data = data;
Errors = SourceSchemaErrors.From(errors);
Errors = errors;
Extensions = extensions;
Final = final;
}
Expand All @@ -29,7 +29,7 @@ public SourceSchemaResult(

public JsonElement Data { get; }

public SourceSchemaErrors? Errors { get; }
public JsonElement Errors { get; }

public JsonElement Extensions { get; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public void CompleteNode(ExecutionNode node, ExecutionNodeResult result)
SpanId = result.Activity?.SpanId.ToHexString(),
Status = result.Status,
Duration = result.Duration,
VariableSets = result.VariableValueSets
VariableSets = result.VariableValueSets,
SchemaName = result.SchemaName
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public async Task ExecuteAsync(
Stopwatch.GetElapsedTime(start),
error,
context.GetDependentsToExecute(this),
context.GetVariableValueSets(this));
context.GetVariableValueSets(this),
context.GetSchemaName(this));

context.CompleteNode(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ internal sealed record ExecutionNodeResult(
TimeSpan Duration,
Exception? Exception,
ImmutableArray<ExecutionNode> DependentsToExecute,
ImmutableArray<VariableValues> VariableValueSets);
ImmutableArray<VariableValues> VariableValueSets,
string? SchemaName);
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,7 @@ public sealed class ExecutionNodeTrace

public required ExecutionStatus Status { get; init; }

public ImmutableArray<VariableValues> VariableSets { get; init; }
public required ImmutableArray<VariableValues> VariableSets { get; init; }

public required string? SchemaName { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ protected override ValueTask<ExecutionStatus> OnExecuteAsync(
|| !context.TryGetNodeLookupSchemaForType(typeName, out var schemaName))
{
// We have an invalid id or a valid id of a type that does not implement the Node interface
var error = ErrorBuilder.New()
var errorBuilder = ErrorBuilder.New()
.SetMessage("The node ID string has an invalid format.")
.SetExtension("originalValue", id)
.Build();
.SetExtension(WellKnownErrorExtensions.OriginalIdValue, id);

context.AddErrors(error, [_responseName], Path.Root);
if (context.CollectTelemetry)
{
errorBuilder.SetExtension(WellKnownErrorExtensions.SourceOperationPlanNodeId, Id);
}

context.AddErrors(errorBuilder.Build(), [_responseName], Path.Root);

return ValueTask.FromResult(ExecutionStatus.Failed);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,41 +117,49 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
}
catch (Exception exception)
{
AddErrors(context, exception, variables, _responseNames);
AddErrors(context, exception, variables);
return ExecutionStatus.Failed;
}

var index = 0;
var bufferLength = Math.Max(variables.Length, 1);
var buffer = ArrayPool<SourceSchemaResult>.Shared.Rent(bufferLength);
var resultBuffer = ArrayPool<SourceSchemaResult>.Shared.Rent(bufferLength);
var errorBuffer = ArrayPool<SourceSchemaErrors?>.Shared.Rent(bufferLength);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional renting just to set the extension, since I didn't want to bleed the execution nodes into the source schema result parsing... have to revisit...


try
{
await foreach (var result in response.ReadAsResultStreamAsync(cancellationToken))
{
buffer[index++] = result;
resultBuffer[index] = result;
errorBuffer[index] = SourceSchemaErrors.From(result.Errors, context, this);

index++;
}

context.AddPartialResults(_source, buffer.AsSpan(0, index), _responseNames);
context.AddPartialResults(
_source,
resultBuffer.AsSpan(0, index),
errorBuffer.AsSpan(0, index),
_responseNames);
}
catch (Exception exception)
{
// if there is an error, we need to make sure that the pooled buffers for the JsonDocuments
// are returned to the pool.
foreach (var result in buffer.AsSpan(0, index))
foreach (var result in resultBuffer.AsSpan(0, index))
{
// ReSharper disable once ConditionalAccessQualifierIsNonNullableAccordingToAPIContract
result?.Dispose();
}

AddErrors(context, exception, variables, _responseNames);
AddErrors(context, exception, variables);

return ExecutionStatus.Failed;
}
finally
{
buffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(buffer);
resultBuffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(resultBuffer);
}

return ExecutionStatus.Success;
Expand Down Expand Up @@ -194,23 +202,29 @@ internal async Task<SubscriptionResult> SubscribeAsync(
}
catch (Exception exception)
{
AddErrors(context, exception, variables, _responseNames);
AddErrors(context, exception, variables);

return SubscriptionResult.Failed();
}
}

private static void AddErrors(
private void AddErrors(
OperationPlanContext context,
Exception exception,
ImmutableArray<VariableValues> variables,
ReadOnlySpan<string> responseNames)
ImmutableArray<VariableValues> variables)
{
var error = ErrorBuilder.FromException(exception).Build();
var errorBuilder = ErrorBuilder.FromException(exception);

if (context.CollectTelemetry)
{
errorBuilder.SetExtension(WellKnownErrorExtensions.SourceOperationPlanNodeId, Id);
}

var error = errorBuilder.Build();

if (variables.Length == 0)
{
context.AddErrors(error, responseNames, Path.Root);
context.AddErrors(error, _responseNames, Path.Root);
}
else
{
Expand All @@ -224,7 +238,7 @@ private static void AddErrors(
pathBuffer[i] = variables[i].Path;
}

context.AddErrors(error, responseNames, pathBuffer.AsSpan(0, pathBufferLength));
context.AddErrors(error, _responseNames, pathBuffer.AsSpan(0, pathBufferLength));
}
finally
{
Expand Down Expand Up @@ -278,6 +292,7 @@ private sealed class SubscriptionEnumerator : IAsyncEnumerator<EventMessageResul
private readonly CancellationToken _cancellationToken;
private readonly IDisposable _subscriptionScope;
private readonly SourceSchemaResult[] _resultBuffer = new SourceSchemaResult[1];
private readonly SourceSchemaErrors?[] _errorsBuffer = new SourceSchemaErrors[1];
private bool _completed;
private bool _disposed;

Expand Down Expand Up @@ -322,8 +337,10 @@ public async ValueTask<bool> MoveNextAsync()

if (hasResult)
{
_resultBuffer[0] = _resultEnumerator.Current;
_context.AddPartialResults(_node._source, _resultBuffer, _node._responseNames);
var result = _resultEnumerator.Current;
_resultBuffer[0] = result;
_errorsBuffer[0] = SourceSchemaErrors.From(result.Errors, _context, _node);
_context.AddPartialResults(_node._source, _resultBuffer, _errorsBuffer, _node._responseNames);
}
}
catch (Exception exception)
Expand All @@ -340,7 +357,14 @@ public async ValueTask<bool> MoveNextAsync()
Exception: exception,
VariableValueSets: _context.GetVariableValueSets(_node));

var error = ErrorBuilder.FromException(exception).Build();
var errorBuilder = ErrorBuilder.FromException(exception);

if (_context.CollectTelemetry)
{
errorBuilder.SetExtension(WellKnownErrorExtensions.SourceOperationPlanNodeId, _node.Id);
}

var error = errorBuilder.Build();

_context.AddErrors(error, _node._responseNames, Path.Root);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ private static void WriteOperationNode(
jsonWriter.WriteNumber("id", node.Id);
jsonWriter.WriteString("type", node.Type.ToString());

if (!string.IsNullOrEmpty(node.SchemaName))
var schemaName = node.SchemaName ?? trace?.SchemaName;

if (!string.IsNullOrEmpty(schemaName))
{
jsonWriter.WriteString("schema", node.SchemaName);
jsonWriter.WriteString("schema", schemaName);
}

jsonWriter.WriteStartObject("operation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ private static void WriteOperationNode(OperationExecutionNode node, ExecutionNod

writer.WriteLine("type: {0}", "Operation");

if (node.SchemaName is not null)
var schemaName = node.SchemaName ?? trace?.SchemaName;

if (!string.IsNullOrEmpty(schemaName))
{
writer.WriteLine("schema: {0}", node.SchemaName);
writer.WriteLine("schema: {0}", schemaName);
}

writer.WriteLine("operation: >-");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,23 @@ internal ImmutableArray<VariableValues> GetVariableValueSets(ExecutionNode node)
return [];
}

return _nodeContexts.TryGetValue(node.Id, out var variableValueSets)
? variableValueSets.Variables
return _nodeContexts.TryGetValue(node.Id, out var context)
? context.Variables
: [];
}

internal string? GetSchemaName(ExecutionNode node)
{
if (!CollectTelemetry)
{
return null;
}

return _nodeContexts.TryGetValue(node.Id, out var context)
? context.SchemaName
: null;
}

internal void CompleteNode(ExecutionNodeResult result)
=> _executionState.EnqueueForCompletion(result);

Expand Down Expand Up @@ -182,8 +194,9 @@ internal ImmutableArray<VariableValues> CreateVariableValueSets(
internal void AddPartialResults(
SelectionPath sourcePath,
ReadOnlySpan<SourceSchemaResult> results,
ReadOnlySpan<SourceSchemaErrors?> errors,
ReadOnlySpan<string> responseNames)
=> _resultStore.AddPartialResults(sourcePath, results, responseNames);
=> _resultStore.AddPartialResults(sourcePath, results, errors, responseNames);

internal void AddPartialResults(ObjectResult result, ReadOnlySpan<Selection> selections)
=> _resultStore.AddPartialResults(result, selections);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ async IAsyncEnumerable<IOperationResult> CreateResponseStream()
eventArgs.Duration,
Exception: null,
DependentsToExecute: [],
VariableValueSets: eventArgs.VariableValueSets));
VariableValueSets: eventArgs.VariableValueSets,
SchemaName: null));

while (!cancellationToken.IsCancellationRequested && executionState.IsProcessing())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public void Reset(ResultPoolSession resultPoolSession)
public void AddPartialResults(
SelectionPath sourcePath,
ReadOnlySpan<SourceSchemaResult> results,
ReadOnlySpan<SourceSchemaErrors?> errors,
ReadOnlySpan<string> responseNames)
{
ObjectDisposedException.ThrowIf(_disposed, this);
Expand All @@ -88,6 +89,7 @@ public void AddPartialResults(
try
{
ref var result = ref MemoryMarshal.GetReference(results);
ref var sourceSchemaError = ref MemoryMarshal.GetReference(errors);
ref var dataElement = ref MemoryMarshal.GetReference(dataElementsSpan);
ref var errorTrie = ref MemoryMarshal.GetReference(errorTriesSpan);
ref var end = ref Unsafe.Add(ref result, results.Length);
Expand All @@ -97,15 +99,16 @@ public void AddPartialResults(
// we need to track the result objects as they used rented memory.
_memory.Push(result);

if (result.Errors?.RootErrors is { Length: > 0 } rootErrors)
if (sourceSchemaError?.RootErrors is { Length: > 0 } rootErrors)
{
_errors.AddRange(rootErrors);
}

dataElement = GetDataElement(sourcePath, result.Data);
errorTrie = GetErrorTrie(sourcePath, result.Errors?.Trie);
errorTrie = GetErrorTrie(sourcePath, sourceSchemaError?.Trie);

result = ref Unsafe.Add(ref result, 1)!;
sourceSchemaError = ref Unsafe.Add(ref sourceSchemaError, 1);
dataElement = ref Unsafe.Add(ref dataElement, 1);
errorTrie = ref Unsafe.Add(ref errorTrie, 1)!;
}
Expand Down
Loading
Loading