Skip to content

Commit

Permalink
.Net Processes - Add Process-Level Error Handler (#9477)
Browse files Browse the repository at this point in the history
### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

Enabled support for function-specific error-handler step in this PR:
#9187

Fixes: #9291

This change provides the ability to define a _process scoped_ error
handler (as opposed to function specific).

When a function-scoped error-handler is defined, it will take
precedence.

### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

```c#
ProcessBuilder process = new(nameof(ProcessFunctionErrorHandledAsync));

ProcessStepBuilder errorStep = process.AddStepFromType<ErrorStep>();
process.OnError().SendEventTo(new ProcessFunctionTargetBuilder(errorStep));

class ErrorStep : KernelProcessStep
{
    [KernelFunction]
    public void GlobalErrorHandler(Exception exception) { }
}
```

**Notes:**
- Switch error handler from passing `Exception` object to a
`KernelProcessError` to satisfy serialization expectations
- Normalized namespaces for `Internal` shared code
- Introduced shared `ProcessConstants` file
- Opportunistically converted some `List` creation to `Array`
- Opportunistically included parameter name in some `Verify` assertions.
- Opportunistically removed a extraneous _not-null_ directives (`!`)
- Verified DAPR error handling in demo app (`True` means the expected
error handler was invoked):
<img width="449" alt="image"
src="https://github.com/user-attachments/assets/2d987378-edd2-4c9f-92dd-cf112888b8b0">


### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄
  • Loading branch information
crickman authored Oct 31, 2024
1 parent 303c202 commit 5ac8460
Show file tree
Hide file tree
Showing 30 changed files with 395 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public ProcessController(Kernel kernel)
public async Task<IActionResult> PostAsync(string processId)
{
var process = this.GetProcess();
var processContext = await process.StartAsync(this._kernel, new KernelProcessEvent() { Id = CommonEvents.StartProcess }, processId: processId);
var processContext = await process.StartAsync(new KernelProcessEvent() { Id = CommonEvents.StartProcess }, processId: processId);
var finalState = await processContext.GetStateAsync();

return this.Ok(processId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public void RenderDone()
/// Render exception
/// </summary>
[KernelFunction]
public void RenderError(Exception exception, ILogger logger)
public void RenderError(KernelProcessError error, ILogger logger)
{
string message = string.IsNullOrWhiteSpace(exception.Message) ? "Unexpected failure" : exception.Message;
Render($"ERROR: {message} [{exception.GetType().Name}]{Environment.NewLine}{exception.StackTrace}");
logger.LogError(exception, "Unexpected failure.");
string message = string.IsNullOrWhiteSpace(error.Message) ? "Unexpected failure" : error.Message;
Render($"ERROR: {message} [{error.GetType().Name}]{Environment.NewLine}{error.StackTrace}");
logger.LogError("Unexpected failure: {ErrorMessage} [{ErrorType}]", error.Message, error.Type);
}

/// <summary>
Expand Down
39 changes: 39 additions & 0 deletions dotnet/src/Experimental/Process.Abstractions/KernelProcessError.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Runtime.Serialization;

namespace Microsoft.SemanticKernel;

/// <summary>
/// Represents an failure that occurred during the execution of a process.
/// </summary>
/// <remarks>
/// Initializes a new instance of the <see cref="KernelProcessError"/> class.
/// </remarks>
/// <param name="Type">The exception type name</param>
/// <param name="Message">The exception message (<see cref="Exception.Message"/></param>
/// <param name="StackTrace">The exception stack-trace (<see cref="Exception.StackTrace"/></param>
[DataContract]
public sealed record KernelProcessError(
[property:DataMember]
string Type,
[property:DataMember]
string Message,
[property:DataMember]
string? StackTrace)
{
/// <summary>
/// The inner failure, when exists, as <see cref="KernelProcessError"/>.
/// </summary>
[DataMember]
public KernelProcessError? InnerError { get; init; }

/// <summary>
/// Factory method to create a <see cref="KernelProcessError"/> from a source <see cref="Exception"/> object.
/// </summary>
public static KernelProcessError FromException(Exception ex) =>
new(ex.GetType().Name, ex.Message, ex.StackTrace)
{
InnerError = ex.InnerException is not null ? FromException(ex.InnerException) : null
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Models;

namespace Microsoft.SemanticKernel;
Expand Down
17 changes: 3 additions & 14 deletions dotnet/src/Experimental/Process.Core/Internal/EndStep.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Models;

namespace Microsoft.SemanticKernel;
Expand All @@ -10,18 +11,6 @@ namespace Microsoft.SemanticKernel;
/// </summary>
internal sealed class EndStep : ProcessStepBuilder
{
private const string EndStepValue = "Microsoft.SemanticKernel.Process.EndStep";

/// <summary>
/// The name of the end step.
/// </summary>
public const string EndStepName = EndStepValue;

/// <summary>
/// The event ID for stopping a process.
/// </summary>
public const string EndStepId = EndStepValue;

/// <summary>
/// The static instance of the <see cref="EndStep"/> class.
/// </summary>
Expand All @@ -31,7 +20,7 @@ internal sealed class EndStep : ProcessStepBuilder
/// Represents the end of a process.
/// </summary>
internal EndStep()
: base(EndStepName)
: base(ProcessConstants.EndStepName)
{
}

Expand All @@ -49,6 +38,6 @@ internal override KernelProcessStepInfo BuildStep()
internal override KernelProcessStepInfo BuildStep(KernelProcessStepStateMetadata<object>? stateMetadata)
{
// The end step has no state.
return new KernelProcessStepInfo(typeof(KernelProcessStepState), new KernelProcessStepState(EndStepName), []);
return new KernelProcessStepInfo(typeof(KernelProcessStepState), new KernelProcessStepState(ProcessConstants.EndStepName), []);
}
}
14 changes: 14 additions & 0 deletions dotnet/src/Experimental/Process.Core/ProcessBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Models;

namespace Microsoft.SemanticKernel;
Expand Down Expand Up @@ -177,6 +178,19 @@ public ProcessEdgeBuilder OnInputEvent(string eventId)
return new ProcessEdgeBuilder(this, eventId);
}

/// <summary>
/// Provides an instance of <see cref="ProcessStepEdgeBuilder"/> for defining an edge to a
/// step that responds to an unhandled process error.
/// </summary>
/// <returns>An instance of <see cref="ProcessStepEdgeBuilder"/></returns>
/// <remarks>
/// To target a specific error source, use the <see cref="ProcessStepBuilder.OnFunctionError"/> on the step.
/// </remarks>
public ProcessEdgeBuilder OnError()
{
return new ProcessEdgeBuilder(this, ProcessConstants.GlobalErrorEventId);
}

/// <summary>
/// Retrieves the target for a given external event. The step associated with the target is the process itself (this).
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion dotnet/src/Experimental/Process.Core/ProcessStepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using Microsoft.SemanticKernel.Process;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Models;

namespace Microsoft.SemanticKernel;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using Microsoft.SemanticKernel.Process.Internal;

namespace Microsoft.SemanticKernel;

Expand Down Expand Up @@ -76,6 +77,6 @@ public void StopProcess()

var outputTarget = new ProcessFunctionTargetBuilder(EndStep.Instance);
this.Target = outputTarget;
this.Source.LinkTo(EndStep.EndStepName, this);
this.Source.LinkTo(ProcessConstants.EndStepName, this);
}
}
27 changes: 21 additions & 6 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Runtime;
using Microsoft.VisualStudio.Threading;

namespace Microsoft.SemanticKernel;

internal sealed class LocalProcess : LocalStep, IDisposable
{
private const string EndProcessId = "Microsoft.SemanticKernel.Process.EndStep";
private readonly JoinableTaskFactory _joinableTaskFactory;
private readonly JoinableTaskContext _joinableTaskContext;
private readonly Channel<KernelProcessEvent> _externalEventChannel;
Expand Down Expand Up @@ -240,11 +240,11 @@ private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSuperstep
}

// Complete the writing side, indicating no more messages in this superstep.
var messagesToProcess = messageChannel.ToList();
var messagesToProcess = messageChannel.ToArray();
messageChannel.Clear();

// If there are no messages to process, wait for an external event.
if (messagesToProcess.Count == 0)
if (messagesToProcess.Length == 0)
{
if (!keepAlive || !await this._externalEventChannel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
Expand All @@ -257,7 +257,7 @@ private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSuperstep
foreach (var message in messagesToProcess)
{
// Check for end condition
if (message.DestinationId.Equals(EndProcessId, StringComparison.OrdinalIgnoreCase))
if (message.DestinationId.Equals(ProcessConstants.EndStepName, StringComparison.OrdinalIgnoreCase))
{
this._processCancelSource?.Cancel();
break;
Expand Down Expand Up @@ -320,7 +320,7 @@ private void EnqueueExternalMessages(Queue<ProcessMessage> messageChannel)
private void EnqueueStepMessages(LocalStep step, Queue<ProcessMessage> messageChannel)
{
var allStepEvents = step.GetAllEvents();
foreach (var stepEvent in allStepEvents)
foreach (ProcessEvent stepEvent in allStepEvents)
{
// Emit the event out of the process (this one) if it's visibility is public.
if (stepEvent.Visibility == KernelProcessEventVisibility.Public)
Expand All @@ -329,10 +329,25 @@ private void EnqueueStepMessages(LocalStep step, Queue<ProcessMessage> messageCh
}

// Get the edges for the event and queue up the messages to be sent to the next steps.
foreach (var edge in step.GetEdgeForEvent(stepEvent.Id!))
bool foundEdge = false;
foreach (KernelProcessEdge edge in step.GetEdgeForEvent(stepEvent.Id))
{
ProcessMessage message = ProcessMessageFactory.CreateFromEdge(edge, stepEvent.Data);
messageChannel.Enqueue(message);
foundEdge = true;
}

// Error event was raised with no edge to handle it, send it to an edge defined as the global error target.
if (!foundEdge && stepEvent.IsError)
{
if (this._outputEdges.TryGetValue(ProcessConstants.GlobalErrorEventId, out List<KernelProcessEdge>? edges))
{
foreach (KernelProcessEdge edge in edges)
{
ProcessMessage message = ProcessMessageFactory.CreateFromEdge(edge, stepEvent.Data);
messageChannel.Enqueue(message);
}
}
}
}
}
Expand Down
44 changes: 28 additions & 16 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.SemanticKernel.Process.Internal;
using Microsoft.SemanticKernel.Process.Runtime;

namespace Microsoft.SemanticKernel;
Expand Down Expand Up @@ -111,9 +112,17 @@ internal IEnumerable<KernelProcessEdge> GetEdgeForEvent(string eventId)
/// </summary>
/// <param name="processEvent">The event to emit.</param>
/// <returns>A <see cref="ValueTask"/></returns>
public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
public ValueTask EmitEventAsync(KernelProcessEvent processEvent) => this.EmitEventAsync(processEvent, isError: false);

/// <summary>
/// Emits an event from the step.
/// </summary>
/// <param name="processEvent">The event to emit.</param>
/// <param name="isError">Flag indicating if the event being emitted is in response to a step failure</param>
/// <returns>A <see cref="ValueTask"/></returns>
internal ValueTask EmitEventAsync(KernelProcessEvent processEvent, bool isError)
{
this.EmitEvent(ProcessEvent.FromKernelProcessEvent(processEvent, this._eventNamespace));
this.EmitEvent(new ProcessEvent(this._eventNamespace, processEvent, isError));
return default;
}

Expand Down Expand Up @@ -148,7 +157,7 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message)

if (!this._inputs.TryGetValue(message.FunctionName, out Dictionary<string, object?>? functionParameters))
{
this._inputs[message.FunctionName] = new();
this._inputs[message.FunctionName] = [];
functionParameters = this._inputs[message.FunctionName];
}

Expand Down Expand Up @@ -179,28 +188,31 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message)
throw new ArgumentException($"Function {targetFunction} not found in plugin {this.Name}");
}

FunctionResult? invokeResult = null;
string? eventName = null;
object? eventValue = null;

// Invoke the function, catching all exceptions that it may throw, and then post the appropriate event.
#pragma warning disable CA1031 // Do not catch general exception types
try
{
invokeResult = await this.InvokeFunction(function, this._kernel, arguments).ConfigureAwait(false);
eventName = $"{targetFunction}.OnResult";
eventValue = invokeResult?.GetValue<object>();
FunctionResult invokeResult = await this.InvokeFunction(function, this._kernel, arguments).ConfigureAwait(false);
await this.EmitEventAsync(
new KernelProcessEvent
{
Id = $"{targetFunction}.OnResult",
Data = invokeResult.GetValue<object>(),
}).ConfigureAwait(false);
}
catch (Exception ex)
{
this._logger.LogError("Error in Step {StepName}: {ErrorMessage}", this.Name, ex.Message);
eventName = $"{targetFunction}.OnError";
eventValue = ex;
await this.EmitEventAsync(
new KernelProcessEvent
{
Id = $"{targetFunction}.OnError",
Data = KernelProcessError.FromException(ex),
},
isError: true).ConfigureAwait(false);
}
finally
{
await this.EmitEventAsync(new KernelProcessEvent { Id = eventName, Data = eventValue }).ConfigureAwait(false);

// Reset the inputs for the function that was just executed
this._inputs[targetFunction] = new(this._initialInputs[targetFunction] ?? []);
}
Expand All @@ -216,7 +228,7 @@ protected virtual async ValueTask InitializeStepAsync()
{
// Instantiate an instance of the inner step object
KernelProcessStep stepInstance = (KernelProcessStep)ActivatorUtilities.CreateInstance(this._kernel.Services, this._stepInfo.InnerStepType);
var kernelPlugin = KernelPluginFactory.CreateFromObject(stepInstance, pluginName: this._stepInfo.State.Name!);
var kernelPlugin = KernelPluginFactory.CreateFromObject(stepInstance, pluginName: this._stepInfo.State.Name);

// Load the kernel functions
foreach (KernelFunction f in kernelPlugin)
Expand Down Expand Up @@ -312,6 +324,6 @@ protected ProcessEvent ScopedEvent(ProcessEvent localEvent)
protected ProcessEvent ScopedEvent(KernelProcessEvent processEvent)
{
Verify.NotNull(processEvent, nameof(processEvent));
return ProcessEvent.FromKernelProcessEvent(processEvent, $"{this.Name}_{this.Id}");
return new ProcessEvent($"{this.Name}_{this.Id}", processEvent);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dapr.Actors.Runtime;
using Microsoft.SemanticKernel.Process.Runtime;
Expand All @@ -27,10 +26,10 @@ public EventBufferActor(ActorHost host) : base(host)
/// Dequeues an event.
/// </summary>
/// <returns>A <see cref="List{T}"/> where T is <see cref="ProcessEvent"/></returns>
public async Task<List<ProcessEvent>> DequeueAllAsync()
public async Task<IList<ProcessEvent>> DequeueAllAsync()
{
// Dequeue and clear the queue.
var items = this._queue!.ToList();
var items = this._queue!.ToArray();
this._queue!.Clear();

// Save the state.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Dapr.Actors.Runtime;
using Microsoft.SemanticKernel.Process.Runtime;
Expand All @@ -27,10 +26,10 @@ public MessageBufferActor(ActorHost host) : base(host)
/// Dequeues an event.
/// </summary>
/// <returns>A <see cref="List{T}"/> where T is <see cref="ProcessEvent"/></returns>
public async Task<List<ProcessMessage>> DequeueAllAsync()
public async Task<IList<ProcessMessage>> DequeueAllAsync()
{
// Dequeue and clear the queue.
var items = this._queue!.ToList();
var items = this._queue!.ToArray();
this._queue!.Clear();

// Save the state.
Expand Down
Loading

0 comments on commit 5ac8460

Please sign in to comment.