How to change MediatR publish strategy #736
Replies: 4 comments 15 replies
-
Are you using the DI extensions package? If so, you can specify the implementation type: services.AddMediatR(opt => opt.Using<MyCustomMediatorType>(), typeof(Program)); |
Beta Was this translation helpful? Give feedback.
-
EDIT: Do not use this implementation, it has some flaws. You can find a better one here. The example implementations are provided in the docs here. But, the example uses a separate construct
services.AddMediatR(options => options.Using<CustomMediator>().AsScoped(), typeof(Program));
public static class MediatorExtensions
{
/// <summary>
/// Publishes <paramref name="notification"/> using the requested <paramref name="strategy"/>.
/// Works only when <paramref name="mediator"/> is a <c>CustomMediator</c>; the call is forwarded
/// to its strategy-aware <c>Publish</c> overload.
/// </summary>
/// <exception cref="NotImplementedException">
/// Thrown when <paramref name="mediator"/> is not a <c>CustomMediator</c>.
/// </exception>
public static Task Publish<TNotification>(this IMediator mediator, TNotification notification, PublishStrategy strategy, CancellationToken cancellationToken)
    where TNotification : INotification
{
    // Single expression: forward on a type match, otherwise fail loudly.
    return mediator is CustomMediator strategyAware
        ? strategyAware.Publish(notification, strategy, cancellationToken)
        : throw new NotImplementedException("The custom mediator implementation is not registered!");
}
} The public class CustomMediator : Mediator
{
private readonly ServiceFactory _serviceFactory;
private readonly Func<IEnumerable<Func<INotification, CancellationToken, Task>>, INotification, CancellationToken, Task> _publish;
private readonly Dictionary<PublishStrategy, IMediator> _publishStrategies;
/// <summary>
/// Builds an inner mediator bound to a single publish delegate. Instances created here
/// do not get a strategy table: <c>_publishStrategies</c> is left null, so the
/// strategy-taking <c>Publish</c> overload must not be called on them.
/// </summary>
private CustomMediator(ServiceFactory serviceFactory, Func<IEnumerable<Func<INotification, CancellationToken, Task>>, INotification, CancellationToken, Task> publish)
    : base(serviceFactory)
{
    _publish = publish;
    _serviceFactory = serviceFactory;
    // Deliberately null; only the public constructor populates the table.
    _publishStrategies = null!;
}
/// <summary>
/// Creates the outward-facing mediator. Its own <c>PublishCore</c> delegates to the base
/// implementation, and one inner <c>CustomMediator</c> is created per
/// <see cref="PublishStrategy"/> value, each bound to the matching publish routine.
/// </summary>
public CustomMediator(ServiceFactory serviceFactory)
    : base(serviceFactory)
{
    _serviceFactory = serviceFactory;
    _publish = base.PublishCore;

    // One dedicated inner mediator per strategy, keyed by the enum value.
    _publishStrategies = new()
    {
        [PublishStrategy.Async] = new CustomMediator(_serviceFactory, AsyncContinueOnException),
        [PublishStrategy.ParallelNoWait] = new CustomMediator(_serviceFactory, ParallelNoWait),
        [PublishStrategy.ParallelWhenAll] = new CustomMediator(_serviceFactory, ParallelWhenAll),
        [PublishStrategy.ParallelWhenAny] = new CustomMediator(_serviceFactory, ParallelWhenAny),
        [PublishStrategy.SyncContinueOnException] = new CustomMediator(_serviceFactory, SyncContinueOnException),
        [PublishStrategy.SyncStopOnException] = new CustomMediator(_serviceFactory, SyncStopOnException),
    };
}
/// <summary>
/// Routes handler invocation through the publish delegate chosen at construction time.
/// </summary>
protected override Task PublishCore(IEnumerable<Func<INotification, CancellationToken, Task>> allHandlers, INotification notification, CancellationToken cancellationToken)
    => _publish(allHandlers, notification, cancellationToken);
/// <summary>
/// Publishes <paramref name="notification"/> through the inner mediator registered for
/// <paramref name="strategy"/>.
/// </summary>
/// <exception cref="ArgumentException">
/// Thrown when no mediator is registered for <paramref name="strategy"/>.
/// </exception>
public Task Publish<TNotification>(TNotification notification, PublishStrategy strategy, CancellationToken cancellationToken)
    where TNotification : INotification
{
    if (_publishStrategies.TryGetValue(strategy, out var strategyMediator))
    {
        return strategyMediator.Publish(notification, cancellationToken);
    }

    throw new ArgumentException($"Unknown strategy: {strategy}");
}
/// <summary>
/// Starts every handler via <c>Task.Run</c> and returns a task that completes when all of
/// them have completed.
/// </summary>
private Task ParallelWhenAll(IEnumerable<Func<INotification, CancellationToken, Task>> handlers, INotification notification, CancellationToken cancellationToken)
{
    // ToList() forces every handler to be scheduled before WhenAll is built.
    List<Task> running = handlers
        .Select(invoke => Task.Run(() => invoke(notification, cancellationToken)))
        .ToList();

    return Task.WhenAll(running);
}
/// <summary>
/// Starts every handler via <c>Task.Run</c> and returns a task that completes as soon as
/// any one of them completes. The returned task is the <c>Task.WhenAny</c> wrapper, so
/// awaiting it does not surface a handler's exception.
/// </summary>
/// <param name="handlers">Handler delegates to invoke.</param>
/// <param name="notification">Notification passed to each handler.</param>
/// <param name="cancellationToken">Token passed to each handler.</param>
/// <returns>
/// A task that completes when the first handler finishes, or an already-completed task
/// when <paramref name="handlers"/> is empty.
/// </returns>
private Task ParallelWhenAny(IEnumerable<Func<INotification, CancellationToken, Task>> handlers, INotification notification, CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    foreach (var handler in handlers)
    {
        tasks.Add(Task.Run(() => handler(notification, cancellationToken)));
    }

    // Task.WhenAny throws ArgumentException for an empty collection; with no
    // handlers there is nothing to wait for, so complete immediately instead.
    if (tasks.Count == 0)
    {
        return Task.CompletedTask;
    }

    return Task.WhenAny(tasks);
}
/// <summary>
/// Schedules every handler via <c>Task.Run</c> and returns immediately without waiting.
/// The scheduled tasks are not tracked, so their outcome (including any exception) is
/// never observed by the caller.
/// </summary>
private Task ParallelNoWait(IEnumerable<Func<INotification, CancellationToken, Task>> handlers, INotification notification, CancellationToken cancellationToken)
{
    foreach (var invoke in handlers)
    {
        // Fire-and-forget: the task is intentionally discarded.
        _ = Task.Run(() => invoke(notification, cancellationToken));
    }

    return Task.CompletedTask;
}
/// <summary>
/// Invokes every handler without awaiting in between, then waits for all of them.
/// Failures from every handler are collected and rethrown together.
/// </summary>
/// <param name="handlers">Handler delegates to invoke.</param>
/// <param name="notification">Notification passed to each handler.</param>
/// <param name="cancellationToken">Token passed to each handler.</param>
/// <exception cref="AggregateException">
/// Thrown after all handlers have finished when at least one handler failed; contains
/// the exceptions of all failed handlers.
/// </exception>
private async Task AsyncContinueOnException(IEnumerable<Func<INotification, CancellationToken, Task>> handlers, INotification notification, CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    var exceptions = new List<Exception>();

    foreach (var handler in handlers)
    {
        try
        {
            // A handler may throw synchronously before returning its task.
            tasks.Add(handler(notification, cancellationToken));
        }
        catch (Exception ex) when (!(ex is OutOfMemoryException || ex is StackOverflowException))
        {
            exceptions.Add(ex);
        }
    }

    // Keep a reference to the combined task: awaiting it rethrows only the FIRST
    // exception, while its Exception property carries the failures of all handlers.
    var whenAll = Task.WhenAll(tasks);
    try
    {
        await whenAll.ConfigureAwait(false);
    }
    catch (Exception ex) when (!(ex is OutOfMemoryException || ex is StackOverflowException))
    {
        if (whenAll.Exception != null)
        {
            exceptions.AddRange(whenAll.Exception.Flatten().InnerExceptions);
        }
        else
        {
            // No aggregate available (e.g. the combined task was cancelled rather than faulted).
            exceptions.Add(ex);
        }
    }

    if (exceptions.Count > 0)
    {
        throw new AggregateException(exceptions);
    }
}
/// <summary>
/// Awaits the handlers one at a time, in enumeration order. The first exception
/// propagates to the caller, so later handlers are not invoked.
/// </summary>
private async Task SyncStopOnException(IEnumerable<Func<INotification, CancellationToken, Task>> handlers, INotification notification, CancellationToken cancellationToken)
{
    foreach (var next in handlers)
    {
        Task pending = next(notification, cancellationToken);
        await pending.ConfigureAwait(false);
    }
}
/// <summary>
/// Awaits the handlers one at a time, in enumeration order, and keeps going when a
/// handler fails. All collected failures are rethrown together at the end.
/// </summary>
/// <exception cref="AggregateException">
/// Thrown after the last handler when at least one handler failed.
/// </exception>
private async Task SyncContinueOnException(IEnumerable<Func<INotification, CancellationToken, Task>> handlers, INotification notification, CancellationToken cancellationToken)
{
    var failures = new List<Exception>();

    foreach (var invoke in handlers)
    {
        try
        {
            await invoke(notification, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception caught) when (caught is AggregateException || !(caught is OutOfMemoryException || caught is StackOverflowException))
        {
            if (caught is AggregateException aggregate)
            {
                // Unwrap nested aggregates so callers see the leaf exceptions.
                failures.AddRange(aggregate.Flatten().InnerExceptions);
            }
            else
            {
                failures.Add(caught);
            }
        }
    }

    if (failures.Count > 0)
    {
        throw new AggregateException(failures);
    }
}
}
public enum PublishStrategy
{
/// <summary>
/// Run each notification handler after one another. Returns when all handlers are finished. In case of any exception(s), they will be captured in an AggregateException.
/// </summary>
SyncContinueOnException = 0,
/// <summary>
/// Run each notification handler after one another. Returns when all handlers are finished or an exception has been thrown. In case of an exception, any handlers after that will not be run.
/// </summary>
SyncStopOnException = 1,
/// <summary>
/// Run all notification handlers asynchronously. Returns when all handlers are finished. In case of any exception(s), they will be captured in an AggregateException.
/// </summary>
Async = 2,
/// <summary>
/// Run each notification handler on its own thread using Task.Run(). Returns immediately and does not wait for any handlers to finish. Note that you cannot capture any exceptions, even if you await the call to Publish.
/// </summary>
ParallelNoWait = 3,
/// <summary>
/// Run each notification handler on its own thread using Task.Run(). Returns when all threads (handlers) are finished. In case of any exception(s), they are captured in an AggregateException by Task.WhenAll.
/// </summary>
ParallelWhenAll = 4,
/// <summary>
/// Run each notification handler on its own thread using Task.Run(). Returns when any thread (handler) is finished. Note that you cannot capture any exceptions (see the MSDN documentation of Task.WhenAny).
/// </summary>
ParallelWhenAny = 5,
} |
Beta Was this translation helpful? Give feedback.
-
Why don't you simply provide a built-in solution? |
Beta Was this translation helpful? Give feedback.
-
Notifications / events are fairly complicated to implement and come with a substantial risk of introducing bugs into the battle-tested code base - which, by the way, hats off - is absolutely awesome. I'm not sure if the way I've wrapped my head around it from the examples is exactly what you intended, but I ended up in the same boat as those on this thread - implementing custom mediators, publishers, etc. Take this with a grain of salt, as I have not familiarized myself with the codebase well enough to say the suggestion I'm about to propose is sound. That said, it might be easier for end users to understand, and less risky to implement, if the design pattern were more paint job and fewer engine tweaks. Again, I'm a layman on the codebase, which is an awesome piece of work, so take this Hail Mary for what it is. What I'll call a "decorator" pattern, for lack of a better classification, seems (again - coming from a layman) to achieve the objective and seems to provide more (transparent) control over logic, sequencing, etc. See the base class below for an untested reference implementation as food for thought - I would love to contribute to the amazing work you've done or - more likely - be told to sit on the bench with the rest of the amateurs. /// Base class implementation - ProcessAsync is overridden for bespoke publishing implementations.
/// The method takes a bunch of parameters to cover the use cases from your example. These would
/// obviously make more sense passed to the constructor on a config object...
public abstract class PublisherDecoratorBase<TNotification> where TNotification : INotification
{
protected readonly Context _context;
/// <summary>
/// Wraps <paramref name="mediator"/> in a new <c>Context</c> that derived classes
/// reach through <c>_context</c>.
/// </summary>
public PublisherDecoratorBase(IMediator mediator)
{
    _context = new Context(mediator);
}
/// <summary>
/// Queues <paramref name="notification"/> by writing it to the context's notification
/// channel; it is not dispatched to the mediator here.
/// </summary>
public async Task PublishAsync(TNotification @notification)
{
    var writer = _context.Notifications.Writer;
    await writer.WriteAsync(@notification);
}
/// <summary>
/// Processes queued notifications; the concrete behaviour is supplied by the derived class.
/// Declared <c>public</c> because the visible override
/// (<c>AsyncPublisherDecorator.ProcessAsync</c>) is <c>public</c>, and C# does not allow an
/// override to change the accessibility of the member it overrides (CS0507).
/// </summary>
/// <param name="continueOnException">Whether processing should continue after a failure.</param>
/// <param name="throwException">Whether failures should be surfaced to the caller.</param>
/// <param name="token">Token used to cancel processing.</param>
public abstract Task ProcessAsync(bool continueOnException = false,
    bool throwException = true, CancellationToken token = default);
/// <summary>
/// Holds the state shared between the base class and its derived publishers:
/// the mediator and the channel on which notifications are queued.
/// </summary>
protected class Context
{
    public Context(IMediator mediator)
    {
        Mediator = mediator;
    }

    /// <summary>Mediator supplied at construction.</summary>
    public IMediator Mediator { get; init; }

    /// <summary>Queue of pending notifications (unbounded to simplify illustration).</summary>
    public Channel<TNotification> Notifications { get; } = Channel.CreateUnbounded<TNotification>();
}
} Pseudocode implementation below (it has never been run), but in the unlikely event you think it's an approach worth pursuing, I'll be more than happy to lend some elbow grease and polish it into an actual implementation. The class would obviously be injected into and inherited implementation of public class AsyncPublisherDecorator<TNotification> : PublisherDecoratorBase<TNotification>
where TNotification : INotification
{
/// <summary>
/// Passes <paramref name="mediator"/> straight to the base class; no additional state.
/// </summary>
public AsyncPublisherDecorator(IMediator mediator)
    : base(mediator)
{
}
// Overridden implementation which handles the use cases in your sample, as far as I
// understand them, except for parallelizing tasks, which would be handled by simply
// running multiple Task.Run(()=>PublishAsync()) calls to add simultaneous consumers
/// <summary>
/// Reads queued notifications from the context's channel and publishes each one through
/// the mediator, applying the requested failure policy.
/// </summary>
/// <param name="continueOnException">
/// When true, a failed publish is recorded and reading continues; when false, the first
/// failure ends the loop.
/// </param>
/// <param name="throwException">
/// When true, failures are surfaced: rethrown immediately if not continuing, otherwise
/// thrown as one <see cref="AggregateException"/> after the loop. When false, failures are
/// not thrown.
/// </param>
/// <param name="token">Token passed to the channel reader and to each publish call.</param>
public override async Task ProcessAsync(bool continueOnException = false,
    bool throwException = true, CancellationToken token = default)
{
    var collected = new List<Exception>();
    var reader = _context.Notifications.Reader;

    await foreach (var queued in reader.ReadAllAsync(token))
    {
        try
        {
            await _context.Mediator.Publish(queued, token);
        }
        catch (Exception failure)
        {
            if (continueOnException)
            {
                collected.Add(failure);
                continue;
            }

            if (throwException)
            {
                throw;
            }

            // Not continuing and not throwing: stop silently.
            break;
        }
    }

    if (throwException && collected.Count > 0)
    {
        throw new AggregateException(collected);
    }
}
} |
Beta Was this translation helpful? Give feedback.
-
How is it possible to configure another publishing strategy when using MediatR? Basically, I need to configure it to use PublishStrategy.ParallelNoWait. Any help would be appreciated, since I'm not able to find any documentation regarding this.
Beta Was this translation helpful? Give feedback.
All reactions