Skip to content

22. 事件总线

📝 模块更新日志 新特性*

+ 新增 事件总线支持事件处理程序执行结果订阅 4\.9\.1\.47 ⏱️2024\.03\.13 [f3028fa](https://gitee.com/dotnetchina/Furion/commit/f3028fa8a397c326073683797aac7d038e2b4dc7)
+ 新增 **事件总线支持配置处理程序只消费一次** 4\.9\.1\.24 ⏱️2024\.01\.19 [dc882eb](https://gitee.com/dotnetchina/Furion/commit/dc882ebc8f8e37977b4a55963711b0fa86f2c741)
+ 新增 事件总线支持简单的 `Order` 编排规则 4\.8\.0 [833c0d4](https://gitee.com/dotnetchina/Furion/commit/833c0d4d069bca5f5304aa21cb32c7f902c20c69)
+ 新增 事件总线 `.ReplaceStorerOrFallback` 自定义事件源存储器方法,可在自定义初始失败时回退到默认值 4\.7\.6 [\#I602NU](https://gitee.com/dotnetchina/Furion/issues/I602NU)
+ 新增 事件总线模块重试失败后支持回调 4\.6\.1 [\#I5UVMV](https://gitee.com/dotnetchina/Furion/issues/I5UVMV)
+ 新增 事件总线 `LogEnabled` 配置,可控制是否输出服务日志 [\#I5QLY5](https://gitee.com/dotnetchina/Furion/issues/I5QLY5)
+ 新增 **事件总线 `MessageCenter` 静态类,解决从 `Fur v1.x` 版本升级问题 [a29fc7c](https://gitee.com/dotnetchina/Furion/commit/a29fc7cf63a3ea41b1617a6ad98a701a243e24f8)**
+ 新增 **事件总线工厂,支持运行时动态添加订阅程序和移除订阅程序** [\#I5NNQX](https://gitee.com/dotnetchina/Furion/issues/I5NNQX)
+ 新增 **事件总线 `[EventSubscribe]` 事件 `Id` 支持正则表达式匹配** [\#I5NNQX](https://gitee.com/dotnetchina/Furion/issues/I5NNQX)
+ 新增 **事件总线 `[EventSubscribe]` 支持局部失败重试配置** [\#I5NNQX](https://gitee.com/dotnetchina/Furion/issues/I5NNQX)
+ 新增 事件总线 `options.AddSubscriber(Type)` 重载 [42446078](https://gitee.com/dotnetchina/Furion/blob/424460780b630e1c71de4db84ad8fd14e33a09f5/framework/Furion.Pure/EventBus/Builders/EventBusOptionsBuilder.cs)
+ 新增 事件总线 `UseUtcTimestamp` 选项配置,可选择使用 `DateTime.UtcNow` 还是 `DateTime.Now`,默认是 `DateTime.Now` [\#I5JSEU](https://gitee.com/dotnetchina/Furion/issues/I5JSEU)
+ 新增 事件总线模块事件 `Id` 支持枚举类型 [2f328aa](https://gitee.com/dotnetchina/Furion/commit/2f328aa8213c8efe7a8480116985573cc6b7fce6)
+ 新增 事件总线模块发布者 `PublishAsync` 和 `PublishDelayAsync` 重载 [2f328aa](https://gitee.com/dotnetchina/Furion/commit/2f328aa8213c8efe7a8480116985573cc6b7fce6)
+ 新增 事件总线模块拓展方法:`Enum.ParseToString()` 和 `String.ParseToEnum()` [2f328aa](https://gitee.com/dotnetchina/Furion/commit/2f328aa8213c8efe7a8480116985573cc6b7fce6)
  • 突破性变化

    • 新增 事件总线支持配置处理程序只消费一次 4.9.1.24 ⏱️2024.01.19 dc882eb
    • 调整 事件总线触发处理程序的逻辑,由过去的 foreach 改为 Parallel.ForEach,吞吐量提升近 4 倍 4.6.4 7384c9c
    • 调整 事件总线 IEventBusFactory 事件工厂方法 AddSubscriber -> SubscribeRemoveSubscriber -> Unsubscribe a29fc7c
    • 问题修复

    • 修复 事件总线在 WinForm 中将组件作为事件总线处理程序不能更新 UI 线程问题 4.9.5.12 ⏱️2024.09.23 #IAT1JG

    • 修复 事件总线因 4.9.1.330bbeb49) 版本导致自定义事件存储器无法消费动态订阅器问题 4.9.1.34 ⏱️2024.02.28 d66175c
    • 修复 超高频率下发送事件总线消息,但是 GC 来不及回收导致内存和 CPU 爆掉问题 4.6.8 dbc7935
    • 修复 基于 Redis 重写事件存储器序列化 IEventSource 实例异常问题 4.4.7 3e45020
    • 修复 事件总线默认 Channel 管道初始化时机过晚问题,解决部分第三方依赖使用问题 #I5MM3O
    • 修复 事件总线默认开启模糊匹配(正则表达式)导致不必要的订阅 #I5NVOP
    • 其他调整

    • 移除 事件总线针对 WinForm 的更改 4.9.5.13 ⏱️2024.09.29 #IAT1JG 3d33093

    • 优化 事件总线在超高频事件中内存占用(约 9%) 4.9.1.33 ⏱️2024.02.25 0bbeb49
    • 调整 事件总线默认日志类名为 System.Logging.EventBusService #I5QLY5
    • 调整 事件总线默认 Channel 管道初始化时机,解决部分第三方依赖使用问题 #I5MM3O

v2.20 之前版本说明Furion 2.20+ 版本采用 Jaina 事件总线替换原有的 EventBus查看旧文档

版本说明以下内容仅限 Furion 2.20.0 + 版本使用。

22.1 关于事件总线

事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,允许不同的组件之间进行彼此通信而又不需要相互依赖,达到一种解耦的目的。

22.2 快速入门

22.2.1 定义事件处理程序

定义事件订阅者 ToDoEventSubscriber

// 实现 IEventSubscriber 接口  
public class ToDoEventSubscriber : IEventSubscriber  
{  
    private readonly ILogger<ToDoEventSubscriber> _logger;  
    public ToDoEventSubscriber(ILogger<ToDoEventSubscriber> logger)  
    {  
        _logger = logger;  
    }  

    [EventSubscribe("ToDo:Create")]  
    public async Task CreateToDo(EventHandlerExecutingContext context)  
    {  
        var todo = context.Source;  
        _logger.LogInformation("创建一个 ToDo:{Name}", todo.Payload);  
        await Task.CompletedTask;  
    }  

    // 支持多个  
    [EventSubscribe("ToDo:Create")]  
    [EventSubscribe("ToDo:Update")]  
    public async Task CreateOrUpdateToDo(EventHandlerExecutingContext context)  
    {  
        var todo = context.Source;  
        _logger.LogInformation("创建或更新一个 ToDo:{Name}", todo.Payload);  
        await Task.CompletedTask;  
    }  

    // 支持枚举类型,v3.4.3+ 版本支持  
    [EventSubscribe(YourEnum.Some)]  
    public async Task EnumHandler(EventHandlerExecutingContext context)  
    {  
        var eventEnum = context.Source.EventId.ParseToEnum(); // 将事件 Id 转换成枚举对象  
        await Task.CompletedTask;  
    }  

    // 支持正则表达式匹配,4.2.10+ 版本支持  
    [EventSubscribe("(^1[3456789][0-9]{9}$)|((^[0-9]{3,4}\\-[0-9]{3,8}$)|(^[0-9]{3,8}$)|(^\\([0-9]{3,4}\\)[0-9]{3,8}$)|(^0{0,1}13[0-9]{9}$))", FuzzyMatch = true)]  
    public async Task RegexHandler(EventHandlerExecutingContext context)  
    {  
        var eventId = context.Source.EventId;  
        await Task.CompletedTask;  
    }  

    // 支持多种异常重试配置,Furion 4.2.10+ 版本支持  
    [EventSubscribe("test:error", NumRetries = 3)]  
    [EventSubscribe("test:error", NumRetries = 3, RetryTimeout = 1000)] // 重试间隔时间  
    [EventSubscribe("test:error", NumRetries = 3, ExceptionTypes = new[] { typeof(ArgumentException) })]    // 特定类型异常才重试  
    public async Task ExceptionHandler(EventHandlerExecutingContext context)  
    {  
        var eventId = context.Source.EventId;  
        await Task.CompletedTask;  
    }  

    // 支持简单 Order 编排,Furion 4.8.0+ 版本支持  
    [EventSubscribe("test:order", Order = 1)]  
    public async Task ExceptionHandler(EventHandlerExecutingContext context)  
    {  
        var eventId = context.Source.EventId;  
        await Task.CompletedTask;  
    }  
}  

22.2.2 发布事件消息

创建控制器 ToDoController,依赖注入 IEventPublisher 服务:

public class ToDoController : ControllerBase  
{  
    // 依赖注入事件发布者 IEventPublisher  
    private readonly IEventPublisher _eventPublisher;  
    public ToDoController(IEventPublisher eventPublisher)  
    {  
        _eventPublisher = eventPublisher;  
    }  

    // 发布 ToDo:Create 消息  
    public async Task CreateDoTo(string name)  
    {  
        await _eventPublisher.PublishAsync(new ChannelEventSource("ToDo:Create", name));  
        // 也可以延迟发布,比如延迟 3s  
        await _eventPublisher.PublishDelayAsync(new ChannelEventSource("ToDo:Create", name), 3000);  
    }  

    // v3.4.3+ 版本支持发送消息简化  
    public async Task CreateDoTo(string name)  
    {  
        await _eventPublisher.PublishAsync("ToDo:Create", name);  
        // 也可以延迟发布,比如延迟 3s  
        await _eventPublisher.PublishDelayAsync("ToDo:Create", 3000, name);  
        // 也支持枚举  
        await _eventPublisher.PublishAsync(YourEnum.Some);  
    }  
}  

22.2.3 注册事件服务

Startup.cs 注册 EventBus 服务:

// 注册 EventBus 服务  
services.AddEventBus(builder =>  
{  
    // 注册 ToDo 事件订阅者  
    builder.AddSubscriber<ToDoEventSubscriber>();  

    // 通过类型注册,Furion 4.2.1+ 版本  
    builder.AddSubscriber(typeof(ToDoEventSubscriber));  

    // 批量注册事件订阅者  
    builder.AddSubscribers(ass1, ass2, ....);  
});  

懒人提醒在 Furion 中可以不用通过 builder.AddSubscriber<T>() 方式一一注册,只需要实现 ISingleton 接口即可,如:

public class ToDoEventSubscriber : IEventSubscriber, ISingleton  
{  
}  

这样就无需写 ~~builder.AddSubscriber<ToDoEventSubscriber>();~~ 代码了,只需保留 services.AddEventBus() 服务即可。

22.2.4 运行项目

运行项目并触发事件发布接口或方法。

info: Jaina.Samples.ToDoEventSubscriber[0]  
      创建一个 ToDo:Jaina  

22.3 自定义事件源

Furion 使用 IEventSource 作为消息载体,任何实现该接口的类都可以充当消息载体。

如需自定义,只需实现 IEventSource 接口即可:

public class ToDoEventSource : IEventSource  
{  
    public ToDoEventSource()  
    {  
    }  

    public ToDoEventSource(string eventId, string todoName)  
    {  
        EventId = eventId;  
        ToDoName = todoName;  
    }  

    // 自定义属性  
    public string ToDoName { get; set; }  

    /// <summary>  
    /// 事件 Id  
    /// </summary>  
    public string EventId { get; set; }  

    /// <summary>  
    /// 事件承载(携带)数据  
    /// </summary>  
    public object Payload { get; set; }  

    /// <summary>  
    /// 事件创建时间  
    /// </summary>  
    public DateTime CreatedTime { get; set; } = DateTime.UtcNow;  

    /// <summary>  
    /// 消息是否只消费一次  
    /// </summary>  
    public bool IsConsumOnce { get; set; }  // Furion 4.9.1.24 添加  

    /// <summary>  
    /// 取消任务 Token  
    /// </summary>  
    /// <remarks>用于取消本次消息处理</remarks>  
    [Newtonsoft.Json.JsonIgnore]  
    [System.Text.Json.Serialization.JsonIgnore]  
    public CancellationToken CancellationToken { get; set; }  
}  

使用:

await _eventPublisher.PublishAsync(new ToDoEventSource ("ToDo:Create", "我的 ToDo Name"));  

22.4 自定义事件源存储器

Furion 默认采用 Channel 作为事件源 IEventSource 存储器,开发者可以使用任何消息队列组件进行替换,如 Kafka、RabbitMQ、ActiveMQ 等,也可以使用部分数据库 Redis、SQL Server、MySql 实现。

如需自定义,只需实现 IEventSourceStorer 接口即可。

22.4.1 Redis 自定义指南

Windows 版本 Redis 安装包下载https://github.com/tporadowski/redis

1. 安装 Redis 拓展包

Install-Package Microsoft.Extensions.Caching.StackExchangeRedis  

也可以直接安装 StackExchange.Redis 包。

2. 创建 RedisEventSourceStorer 自定义存储器

using Furion.EventBus;  
using StackExchange.Redis;  
using System;  
using System.Text.Json;  
using System.Threading;  
using System.Threading.Channels;  
using System.Threading.Tasks;  

namespace Furion.Core;  

public sealed class RedisEventSourceStorer : IEventSourceStorer, IDisposable  
{  
    /// <summary>  
    /// 内存通道事件源存储器  
    /// </summary>  
    private readonly Channel<IEventSource> _channel;  

    /// <summary>  
    /// Redis 连接对象  
    /// </summary>  
    private readonly ConnectionMultiplexer _connectionMultiplexer;  

    /// <summary>  
    /// 路由键  
    /// </summary>  
    private readonly string _routeKey;  

    /// <summary>  
    /// 构造函数  
    /// </summary>  
    /// <param name="connectionMultiplexer">Redis 连接对象</param>  
    /// <param name="routeKey">路由键</param>  
    /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>  
    public RedisEventSourceStorer(ConnectionMultiplexer connectionMultiplexer, string routeKey, int capacity)  
    {  
        // 配置通道,设置超出默认容量后进入等待  
        var boundedChannelOptions = new BoundedChannelOptions(capacity)  
        {  
            FullMode = BoundedChannelFullMode.Wait  
        };  

        // 创建有限容量通道  
        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);  

        _connectionMultiplexer = connectionMultiplexer;  
        _routeKey = routeKey;  

        // 获取一个订阅对象  
        var subscriber = connectionMultiplexer.GetSubscriber();  

        // 订阅消息  
        subscriber.Subscribe(RedisChannel.Literal(_routeKey), (channel, data) =>  
        {  
            // 转换为 IEventSource,这里可以选择自己喜欢的序列化工具,如果自定义了 EventSource,注意属性是可读可写  
            var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(data.ToString());  

            // 写入内存管道存储器  
            Task.Run(async () =>   
            {  
                await _channel.Writer.WriteAsync(eventSource);  
            });  
        });  
    }  

    /// <summary>  
    /// 将事件源写入存储器  
    /// </summary>  
    /// <param name="eventSource">事件源对象</param>  
    /// <param name="cancellationToken">取消任务 Token</param>  
    /// <returns><see cref="ValueTask"/></returns>  
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)  
    {  
        // 空检查  
        if (eventSource == default)  
        {  
            throw new ArgumentNullException(nameof(eventSource));  
        }  

        // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource  
        if (eventSource is ChannelEventSource source)  
        {  
            // 序列化,这里可以选择自己喜欢的序列化工具  
            var data = JsonSerializer.Serialize(source);  

            // 获取一个订阅对象  
            var subscriber = _connectionMultiplexer.GetSubscriber();  
            await subscriber.PublishAsync(RedisChannel.Literal(_routeKey), data);  
        }  
        else  
        {  
            // 这里处理动态订阅问题  
            await _channel.Writer.WriteAsync(eventSource, cancellationToken);  
        }  
    }  

    /// <summary>  
    /// 从存储器中读取一条事件源  
    /// </summary>  
    /// <param name="cancellationToken">取消任务 Token</param>  
    /// <returns>事件源对象</returns>  
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)  
    {  
        // 读取一条事件源  
        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);  
        return eventSource;  
    }  

    /// <summary>  
    /// 释放非托管资源  
    /// </summary>  
    public void Dispose()  
    {  
        _connectionMultiplexer.Dispose();  
    }  
}  

3. 替换默认事件存储器

services.AddEventBus(options =>  
{  
    // 创建 Redis 连接对象  
    var connectionMultiplexer = ConnectionMultiplexer.Connect("localhost");  

    // 创建默认内存通道事件源对象,可自定义队列路由 key,比如这里是启动项目名称  
    var redisEventSourceStorer = new RedisEventSourceStorer(connectionMultiplexer, Assembly.GetExecutingAssembly().GetName().Name, 12000);  

    // 替换默认事件总线存储器  
    options.ReplaceStorer(serviceProvider =>  
    {  
        return redisEventSourceStorer;  
    });  
});  

关于 StackExchange.Redis 超时问题如遇到 StackExchange.Redis 超时(Timeout)问题,可编辑启动层的 .csproj 添加以下配置:

<Project Sdk="Microsoft.NET.Sdk">  

  <PropertyGroup>  
    <ThreadPoolMinThreads>50</ThreadPoolMinThreads>  
  </PropertyGroup>  

</Project>  

如果以上配置依然存在超时问题,那么可以在 Program.cs/Startup.cs 中添加 ThreadPool.SetMinThreads(200, 200);

22.4.2 RabbitMQ 自定义指南

版本说明以下内容仅限 Furion 4.3.4 + 版本使用。

由于使用 RabbitMQ 作为事件总线存储器的比较多,所以这里提供了完整的使用例子。

1. 安装 RabbitMQ.Client 拓展包

Install-Package RabbitMQ.Client  

2. 创建 RabbitMQEventSourceStorer 自定义存储器

using Furion.EventBus;  
using RabbitMQ.Client;  
using RabbitMQ.Client.Events;  
using System;  
using System.Text;  
using System.Text.Json;  
using System.Threading;  
using System.Threading.Channels;  
using System.Threading.Tasks;  

namespace Furion.Core;  

public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable  
{  
    /// <summary>  
    /// 内存通道事件源存储器  
    /// </summary>  
    private readonly Channel<IEventSource> _channel;  

    /// <summary>  
    /// 通道对象  
    /// </summary>  
    private readonly IModel _model;  

    /// <summary>  
    /// 连接对象  
    /// </summary>  
    private readonly IConnection _connection;  

    /// <summary>  
    /// 路由键  
    /// </summary>  
    private readonly string _routeKey;  

    /// <summary>  
    /// 构造函数  
    /// </summary>  
    /// <param name="factory">连接工厂</param>  
    /// <param name="routeKey">路由键</param>  
    /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>  
    public RabbitMQEventSourceStorer(ConnectionFactory factory, string routeKey, int capacity)  
    {  
        // 配置通道,设置超出默认容量后进入等待  
        var boundedChannelOptions = new BoundedChannelOptions(capacity)  
        {  
            FullMode = BoundedChannelFullMode.Wait  
        };  

        // 创建有限容量通道  
        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);  

        // 创建连接  
        _connection = factory.CreateConnection();  
        _routeKey = routeKey;  

        // 创建通道  
        _model = _connection.CreateModel();  

        // 声明路由队列  
        _model.QueueDeclare(routeKey, false, false, false, null);  

        // 创建消息订阅者  
        var consumer = new EventingBasicConsumer(_model);  

        // 订阅消息并写入内存 Channel  
        consumer.Received += (ch, ea) =>  
        {  
            // 读取原始消息  
            var stringEventSource = Encoding.UTF8.GetString(ea.Body.ToArray());  

            // 转换为 IEventSource,这里可以选择自己喜欢的序列化工具,如果自定义了 EventSource,注意属性是可读可写  
            var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(stringEventSource);  

            // 写入内存管道存储器  
            Task.Run(async () =>   
            {  
                await _channel.Writer.WriteAsync(eventSource);  
            });  

            // 确认该消息已被消费  
            _model.BasicAck(ea.DeliveryTag, false);  
        };  

        // 启动消费者 设置为手动应答消息  
        _model.BasicConsume(routeKey, false, consumer);  
    }  

    /// <summary>  
    /// 将事件源写入存储器  
    /// </summary>  
    /// <param name="eventSource">事件源对象</param>  
    /// <param name="cancellationToken">取消任务 Token</param>  
    /// <returns><see cref="ValueTask"/></returns>  
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)  
    {  
        // 空检查  
        if (eventSource == default)  
        {  
            throw new ArgumentNullException(nameof(eventSource));  
        }  

        // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource  
        if (eventSource is ChannelEventSource source)  
        {  
            // 序列化,这里可以选择自己喜欢的序列化工具  
            var data = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(source));  

            // 发布  
            _model.BasicPublish("", _routeKey, null, data);  
        }  
        else  
        {  
            // 这里处理动态订阅问题  
            await _channel.Writer.WriteAsync(eventSource, cancellationToken);  
        }  
    }  

    /// <summary>  
    /// 从存储器中读取一条事件源  
    /// </summary>  
    /// <param name="cancellationToken">取消任务 Token</param>  
    /// <returns>事件源对象</returns>  
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)  
    {  
        // 读取一条事件源  
        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);  
        return eventSource;  
    }  

    /// <summary>  
    /// 释放非托管资源  
    /// </summary>  
    public void Dispose()  
    {  
        _model.Dispose();  
        _connection.Dispose();  
    }  
}  

3. 替换默认事件存储器

services.AddEventBus(options =>  
{  
    // 创建连接工厂  
    var factory = new ConnectionFactory  
    {  
        UserName = "admin",  
        Password = "q1w2e3",  
    };  

    // 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是启动项目名称  
    var rbmqEventSourceStorer = new RabbitMQEventSourceStorer(factory, Assembly.GetExecutingAssembly().GetName().Name, 12000);  

    // 替换默认事件总线存储器  
    options.ReplaceStorer(serviceProvider =>  
    {  
        return rbmqEventSourceStorer;  
    });  
});  

22.4.3 Kafka 自定义指南

1. 安装 Confluent.Kafka 拓展包

Install-Package Confluent.Kafka  

2. 创建 EventConsumer 订阅类

using Confluent.Kafka;  

namespace Furion.Core;  

/// <summary>  
/// Kafka 消息扩展  
/// </summary>  
/// <typeparam name="TKey"></typeparam>  
/// <typeparam name="TValue"></typeparam>  
public class EventConsumer<TKey, TValue> : IDisposable  
{  
    private Task _consumerTask;  
    private CancellationTokenSource _consumerCts;  

    /// <summary>  
    /// 消费者  
    /// </summary>  
    public IConsumer<TKey, TValue> Consumer { get; }  

    /// <summary>  
    /// ConsumerBuilder  
    /// </summary>  
    public ConsumerBuilder<TKey, TValue> Builder { get; set; }  

    /// <summary>  
    /// 消息回调  
    /// </summary>  
    public event EventHandler<ConsumeResult<TKey, TValue>> Received;  

    /// <summary>  
    /// 异常回调  
    /// </summary>  
    public event EventHandler<ConsumeException> OnConsumeException;  

    /// <summary>  
    /// 构造函数  
    /// </summary>  
    /// <param name="config"></param>  
    public EventConsumer(IEnumerable<KeyValuePair<string, string>> config)  
    {  
        Builder = new ConsumerBuilder<TKey, TValue>(config);  
        Consumer = Builder.Build();  
    }  

    /// <summary>  
    /// 启动  
    /// </summary>  
    /// <exception cref="InvalidOperationException"></exception>  
    public void Start()  
    {  
        if (Consumer.Subscription?.Any() != true)  
        {  
            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");  
        }  
        if (_consumerTask != null)  
        {  
            return;  
        }  
        _consumerCts = new CancellationTokenSource();  
        var ct = _consumerCts.Token;  
        _consumerTask = Task.Factory.StartNew(() =>  
        {  
            while (!ct.IsCancellationRequested)  
            {  
                try  
                {  
                    var cr = Consumer.Consume(TimeSpan.FromSeconds(1));  
                    if (cr == null) continue;  
                    Received?.Invoke(this, cr);  
                }  
                catch (ConsumeException e)  
                {  
                    OnConsumeException?.Invoke(this, e);  
                }  
            }  
        }, ct, TaskCreationOptions.LongRunning, System.Threading.Tasks.TaskScheduler.Default);  
    }  

    /// <summary>  
    /// 停止  
    /// </summary>  
    /// <returns></returns>  
    public async Task Stop()  
    {  
        if (_consumerCts == null || _consumerTask == null) return;  
        _consumerCts.Cancel();  
        try  
        {  
            await _consumerTask;  
        }  
        finally  
        {  
            _consumerTask = null;  
            _consumerCts = null;  
        }  
    }  

    /// <summary>  
    /// 释放  
    /// </summary>  
    public void Dispose()  
    {  
        Dispose(true);  
        GC.SuppressFinalize(this);  
    }  

    /// <summary>  
    /// 释放  
    /// </summary>  
    /// <param name="disposing"></param>  
    protected virtual void Dispose(bool disposing)  
    {  
        if (disposing)  
        {  
            if (_consumerTask != null)  
            {  
                Stop().Wait();  
            }  
            Consumer?.Dispose();  
        }  
    }  
}  

3. 创建 KafkaEventSourceStore 自定义存储器

using Confluent.Kafka;  
using Furion.EventBus;  
using Newtonsoft.Json;  
using System.Threading.Channels;  

namespace Furion.Core;  

/// <summary>  
/// Kafka 存储源  
/// </summary>  
public class KafkaEventSourceStore : IEventSourceStorer, IDisposable  
{  
    /// <summary>  
    /// 内存通道事件源存储器  
    /// </summary>  
    private readonly Channel<IEventSource> _channel;  
    /// <summary>  
    /// 主题  
    /// </summary>  
    private readonly string _topic;  
    /// <summary>  
    /// 消费者  
    /// </summary>  
    private readonly EventConsumer<Null, string> _eventConsumer;  
    /// <summary>  
    /// 生产者  
    /// </summary>  
    private readonly IProducer<Null, string> _producer;  

    /// <summary>  
    /// 构造函数  
    /// </summary>  
    /// <param name="consumerConf">消费者配置</param>  
    /// <param name="producerConf">生产者配置</param>  
    /// <param name="topic">主题</param>  
    /// <param name="capacity">存储器最多能够处理多少消息,超过该容量进入等待写入</param>  
    public KafkaEventSourceStore(ConsumerConfig consumerConf, ProducerConfig producerConf, string topic, int capacity)  
    {  
        // 配置通道,设置超出默认容量后进入等待  
        var boundedChannelOptions = new BoundedChannelOptions(capacity)  
        {  
            FullMode = BoundedChannelFullMode.Wait  
        };  

        // 创建有限容量通道  
        _channel = Channel.CreateBounded<IEventSource>(boundedChannelOptions);  

        // 主题  
        _topic = topic;  

        // 创建消息订阅者  
        _eventConsumer = new EventConsumer<Null, string>(consumerConf);  
        _eventConsumer.Consumer.Subscribe(new[] { topic });  

        // 订阅消息写入 Channel  
        _eventConsumer.Received += (send, cr) =>  
        {  
            // 反序列化消息  
            var eventSource = JsonConvert.DeserializeObject<ChannelEventSource>(cr.Message.Value);  

            // 写入内存管道存储器  
            Task.Run(async () =>   
            {  
                await _channel.Writer.WriteAsync(eventSource);  
            });  
        };  

        // 启动消费者  
        _eventConsumer.Start();  

        // 创建生产者  
        _producer = new ProducerBuilder<Null, string>(producerConf).Build();  
    }  

    /// <summary>  
    /// 将事件源写入存储器  
    /// </summary>  
    /// <param name="eventSource">事件源对象</param>  
    /// <param name="cancellationToken">取消任务 Token</param>  
    /// <returns><see cref="ValueTask"/></returns>  
    public async ValueTask WriteAsync(IEventSource eventSource, CancellationToken cancellationToken)  
    {  
        if (eventSource == default)  
        {  
            throw new ArgumentNullException(nameof(eventSource));  
        }  

        // 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource  
        if (eventSource is ChannelEventSource source)  
        {  
            // 序列化  
            var data = JsonConvert.SerializeObject(source);  
            // 异步发布  
            await _producer.ProduceAsync(_topic, new Message<Null, string>  
            {  
                Value = data  
            }, cancellationToken);  
        }  
        else  
        {  
            // 这里处理动态订阅问题  
            await _channel.Writer.WriteAsync(eventSource, cancellationToken);  
        }  
    }  

    /// <summary>  
    /// 从存储器中读取一条事件源  
    /// </summary>  
    /// <param name="cancellationToken">取消任务 Token</param>  
    /// <returns>事件源对象</returns>  
    public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)  
    {  
        // 读取一条事件源  
        var eventSource = await _channel.Reader.ReadAsync(cancellationToken);  
        return eventSource;  
    }  

    /// <summary>  
    /// 释放非托管资源  
    /// </summary>  
    public async void Dispose()  
    {  
        await _eventConsumer.Stop();  
        GC.SuppressFinalize(this);  
    }  
}  

4. 替换默认事件存储器

services.AddEventBus(options =>  
{  
    var consumerConf = new ConsumerConfig  
    {  
        BootstrapServers = "xxx.xxx.xxx.xxx:9092",  
        GroupId = "Consumer",  
        AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起  
    };  

    var producerConf = new ProducerConfig  
    {  
        BootstrapServers = "xxx.xxx.xxx.xxx:9092",  
        BatchSize = 16384, // 修改批次大小为16K  
        LingerMs = 20 // 修改等待时间为20ms  
    };  

    // 创建默认内存通道事件源对象,可自定义队列路由key,比如这里是启动项目名称  
    var kafkaEventSourceStorer = new KafkaEventSourceStore(consumerConf, producerConf, Assembly.GetExecutingAssembly().GetName().Name, 12000);  

    // 替换默认事件总线存储器  
    options.ReplaceStorer(serviceProvider =>  
    {  
        return kafkaEventSourceStorer;  
    });  
});  

22.5 自定义事件发布者

Furion 默认内置基于 Channel 的事件发布者 ChannelEventPublisher

如需自定义,只需实现 IEventPublisher 接口即可:

public class ToDoEventPublisher : IEventPublisher  
{  
    private readonly IEventSourceStorer _eventSourceStorer;  

    public ToDoEventPublisher(IEventSourceStorer eventSourceStorer)  
    {  
        _eventSourceStorer = eventSourceStorer;  
    }  

    public async Task PublishAsync(IEventSource eventSource)  
    {  
        await _eventSourceStorer.WriteAsync(eventSource, eventSource.CancellationToken);  
    }  

    // .... 其他接口实现  
}  

最后,在注册 EventBus 服务中替换默认 IEventPublisher

services.AddEventBus(builder =>  
{  
    // 替换事件源存储器  
    builder.ReplacePublisher<ToDoEventPublisher>();  
});  

22.6 添加事件执行监视器

Furion 提供了 IEventHandlerMonitor 监视器接口,实现该接口可以监视所有订阅事件,包括 执行之前、执行之后,执行异常,共享上下文数据

如添加 ToDoEventHandlerMonitor

public class ToDoEventHandlerMonitor : IEventHandlerMonitor  
{  
    private readonly ILogger<ToDoEventHandlerMonitor> _logger;  
    public ToDoEventHandlerMonitor(ILogger<ToDoEventHandlerMonitor> logger)  
    {  
        _logger = logger;  
    }  

    public Task OnExecutingAsync(EventHandlerExecutingContext context)  
    {  
        _logger.LogInformation("执行之前:{EventId}", context.Source.EventId);  
        return Task.CompletedTask;  
    }  

    public Task OnExecutedAsync(EventHandlerExecutedContext context)  
    {  
        _logger.LogInformation("执行之后:{EventId}", context.Source.EventId);  

        if (context.Exception != null)  
        {  
            _logger.LogError(context.Exception, "执行出错啦:{EventId}", context.Source.EventId);  
        }  

        return Task.CompletedTask;  
    }  
}  

最后,在注册 EventBus 服务中注册 ToDoEventHandlerMonitor

services.AddEventBus(builder =>  
{  
    // 添加事件执行监视器  
    builder.AddMonitor<ToDoEventHandlerMonitor>();  
});  

22.7 添加事件执行器

Furion 提供了 IEventHandlerExecutor 执行器接口,可以让开发者自定义事件处理函数执行策略,如 超时控制,失败重试、熔断等等

如添加 RetryEventHandlerExecutor

public class RetryEventHandlerExecutor : IEventHandlerExecutor  
{  
    public async Task ExecuteAsync(EventHandlerExecutingContext context, Func<EventHandlerExecutingContext, Task> handler)  
    {  
        // 如果执行失败,每隔 1s 重试,最多三次  
        await Retry.InvokeAsync(async () => {  
            await handler(context);  
        }, 3, 1000);  
    }  
}  

最后,在注册 EventBus 服务中注册 RetryEventHandlerExecutor

services.AddEventBus(builder =>  
{  
    // 添加事件执行器  
    builder.AddExecutor<RetryEventHandlerExecutor>();  
});  

22.8 使用有作用域的服务

Furion 中, Event Bus 所有服务均注册为单例,如需使用作用域服务(单例服务可直接注入),可通过依赖注入 IServiceScopeFactory 实例并通过 CreateScope() 创建一个作用域,如:

public class ToDoEventSubscriber : IEventSubscriber  
{  
    private readonly ILogger<ToDoEventSubscriber> _logger;  

    public ToDoEventSubscriber(IServiceScopeFactory scopeFactory    // 单例服务可以直接注入  
        , ILogger<ToDoEventSubscriber> logger)  // 单例服务可以直接注入  
    {  
        ScopeFactory = scopeFactory;  
        _logger = logger;  
        // 避免在这里解析非单例服务  
    }  

    public IServiceScopeFactory ScopeFactory { get; }  

    [EventSubscribe("ToDo:Create")]  
    public async Task CreateToDo(EventHandlerExecutingContext context)  
    {  
        // 创建新的作用域  
        using var scope = ScopeFactory.CreateScope();  

        // 解析服务  
        var scopedProcessingService = scope.ServiceProvider.GetRequiredService<IScopedProcessingService>();  
        // ....  
    }  
}  

特别注意如果是非单例对象,应避免在构造函数中解析服务,如把 scope.ServiceProvider.GetRequiredService<T>() 放在构造函数中初始化会导致一些服务无法使用。

22.9 订阅执行任务意外异常

事件处理程序使用的是 Task 对象进行创建并执行,但可能存在一些意外且难以捕获的异常,这时候可以通过以下方式订阅:

services.AddEventBus(builder =>  
{  
    // 订阅 EventBus 意外未捕获异常  
    builder.UnobservedTaskExceptionHandler = (obj, args) =>  
    {  
        // ....  
    };  
});  

22.10 事件总线工厂

版本说明以下内容仅限 Furion 4.2.10 + 版本使用。

在该版本中,Furion 提供了 IEventBusFactory 工厂服务,可在运行时动态新增或删除订阅,如:

public class TestEventBus : IDynamicApiController  
{  
    private readonly IEventPublisher _eventPublisher;  
    private readonly IEventBusFactory _eventBusFactory;  
    public TestEventBus(IEventPublisher eventPublisher, IEventBusFactory eventBusFactory)  
    {  
        _eventPublisher = eventPublisher;  
        _eventBusFactory = eventBusFactory;  
    }  

    // 运行时动态添加一个订阅器  
    public async Task AddSubscriber()  
    {  
        await _eventBusFactory.Subscribe("xxx", async (ctx) =>  
        {  
            Console.WriteLine("我是动态的");  
            await Task.CompletedTask;  
        });  
    }  

    // 运行时动态删除一个订阅器  
    public async Task RemoveDynamic(string eventId)  
    {  
        await _eventBusFactory.Unsubscribe(eventId);  
    }  
}  

22.11 MessageCenter 静态类

版本说明以下内容仅限 Furion 4.3.3 + 版本使用。

Furion 4.3.3 版本新增了 MessageCenter 静态类,可在任何地方发送事件消息或订阅消息。

// 发送消息(含诸多重载)  
await MessageCenter.PublishAsync("messageId", new {});  

// 动态订阅消息  
MessageCenter.Subscribe("messageId", async (ctx) => {  
    Console.WriteLine("我是动态的");  
    await Task.CompletedTask;  
});  

// 取消订阅  
MessageCenter.Unsubscribe("messageId");  

22.12 配置重试失败回调

版本说明以下内容仅限 Furion 4.6.1 + 版本使用。

  1. 创建 IEventFallbackPolicy 实现类并实现 Callback 方法,如 EventFallbackPolicy
public class EventFallbackPolicy : IEventFallbackPolicy  
{  
    private readonly ILogger<EventFallbackPolicy> _logger;  
    public EventFallbackPolicy(ILogger<EventFallbackPolicy> logger)  
    {  
        _logger = logger;  
    }  

    public async Task CallbackAsync(EventHandlerExecutingContext context, Exception ex)  
    {  
        _logger.LogError(ex, "重试了多次最终还是失败了");  
        await Task.CompletedTask;  
    }  
}  

  1. 注册 EventFallbackPolicy 类型服务
services.AddEventBus(options =>  
{  
    options.AddFallbackPolicy<EventFallbackPolicy>();  
});  

  1. 通过 [EventSubscribe] 特性配置 FallbackPolicy 属性使用
[EventSubscribe("test:error", NumRetries = 3, FallbackPolicy = typeof(EventFallbackPolicy))]  // 重试三次  
public async Task TestError(EventHandlerExecutingContext context)  
{  
    Console.WriteLine("我执行啦~~");  
    throw new NotImplementedException();  
}  

小知识可以定义多个 IEventFallbackPolicy 实现类,然后通过 options.AddFallbackPolicy<T>() 注册多个,这样实现不同的事件订阅程序执行不同的策略。如:

[EventSubscribe("test:error", NumRetries = 3, FallbackPolicy = typeof(EventFallbackPolicy))]  

[EventSubscribe("test:error", NumRetries = 1010, FallbackPolicy = typeof(OtherEventFallbackPolicy))]  

22.13 自定义事件源存储器不可用时回退

有时候我们自定义了事件源存储器如使用 RedisRabbitMQ 等第三方存储介质,但由于一些原因导致启动时不可用如 Redis 服务未启动,这时候我们可以配置选择回退到默认事件源存储器(内存)中,如:

services.AddEventBus(options =>  
{  
    options.ReplaceStorerOrFallback(() => new RedisEventSourceStorer(......));  
});  

// 还可以解析服务传入  
services.AddEventBus(options =>  
{  
    options.ReplaceStorerOrFallback(serviceProvider =>    // 提供 IServiceProvider 可解析服务  
    {  
        // var someService = serviceProvider.GetRequiredService<ISomeSerivce>();  
        return new RedisEventSourceStorer(......);  
    });  
});  

特别说明目前只支持在应用启动时检查,不支持在运行时实现故障转移切换。

22.14 EventBusOptionsBuilder 配置

EventBusOptionsBuilderAddEventBus 构建服务选项,该选项包含以下属性和方法:

  • 属性
    • ChannelCapacity:内存通道容量,默认 12000
    • UnobservedTaskExceptionHandler:订阅执行任务未察觉异常
    • UseUtcTimestamp:是否使用 UTC 时间,bool 类型,默认 false
    • FuzzyMatch:是否开启全局模糊匹配(正则表达式)事件 Idbool 类型,默认 false
    • LogEnabled:是否启用日志输出,bool 类型,默认 true
  • 方法
    • AddSubscriber<TEventSubscriber>:添加订阅者
    • ReplacePublisher<TEventPublisher>:替换发布者
    • ReplaceStorer(Func<IServiceProvider, IEventSourceStorer>):替换存储器
    • ReplaceStorerOrFallback(Func<IServiceProvider, IEventSourceStorer>):替换存储器如果失败则回滚默认
    • AddMonitor<TEventHandlerMonitor>:添加监视器
    • AddExecutor<TEventHandlerExecutor>:添加执行器

22.15 关于高频消息处理方式

在一些高频发送消息的场景如 IoT、日志记录、数据采集,为避免频繁解析服务和创建作用域,可使用 类全局作用域 和所有服务都采取单例的方式:

public class ToDoEventSubscriber : IEventSubscriber, IDisposable  
{  
    private readonly ILogger<ToDoEventSubscriber> _logger;  
    private readonly IServiceScope _serviceScope;  

    public ToDoEventSubscriber(IServiceScopeFactory scopeFactory  
        , ILogger<ToDoEventSubscriber> logger)  
    {  
        _serviceScope = scopeFactory.CreateScope();  
        _logger = logger;  
    }  

    [EventSubscribe("iot:log")]  
    public async Task LogFromIoT(EventHandlerExecutingContext context)  
    {  
        // 解析服务  
        var scopedProcessingService = _serviceScope.ServiceProvider.GetRequiredService<IScopedProcessingService>();  
        // ....  
    }  

    /// <summary>  
    /// 释放服务作用域  
    /// </summary>  
    public void Dispose()  
    {  
        _serviceScope.Dispose();  
    }  
}  

22.16 事件处理执行结果订阅

版本说明以下内容仅限 Furion 4.9.1.47 + 版本使用。

Furion 4.9.1.47+ 版本,新增了 OnExecuted 事件订阅器,可以订阅事件执行成功还是失败,如:

// 订阅事件处理程序事件  
_eventPublisher.OnExecuted += (sender, args) =>  
{  
    Console.WriteLine($"事件 {args.Source.EventId} 执行结果:{args.Status},异常:{args.Exception}");  
}  

22.16.1 重复订阅问题

由于 IEventPublisher 注册为单例,所以如果在非单例服务或者特定代码范围内订阅,请务必订阅处理之后移除事件,避免重复多播订阅(除非需要多播订阅)

// 订阅局部或类全局事件方法  
void Subscribe(object sender, EventHandlerEventArgs args)  
{  
    Console.WriteLine($"事件 {args.Source.EventId} 执行结果:{args.Status},异常:{args.Exception}");  
}  

// 订阅任务事件  
_eventPublisher.OnExecuted += Subscribe;  

// 其他操作。。。。  

// 移除订阅事件  
_eventPublisher.OnExecuted -= Subscribe;  


如果是在类构造函数中订阅,那么需要实现 IDisposable 接口并在其中移除订阅。

public class SomeService : ISomeService, IScoped, IDisposable  
{  
    private readonly IEventPublisher _eventPublisher;  
    public SomeService(IEventPublisher eventPublisher)  
    {  
        _eventPublisher = eventPublisher;  

        _eventPublisher.OnExecuted += Subscribe;  
    }  

    void Subscribe(object sender, EventHandlerEventArgs args)  
    {  
        Console.WriteLine($"事件 {args.Source.EventId} 执行结果:{args.Status},异常:{args.Exception}");  
    }  

    public void Dispose()  
    {  
        _taskQueue.OnExecuted -= Subscribe;  
    }  
}  

22.16.2 订阅最佳推荐

  • Startup.cs 中订阅

由于在非单例的服务中存在重复订阅的问题,所以推荐在 Startup.csConfigure 中统一订阅,如:

Startup.cs

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)  
{  
    // 解析事件总线发布服务  
    var eventPublisher = app.ApplicationServices.GetRequiredService<IEventPublisher>();  

    // 订阅  
    eventPublisher.OnExecuted += (sender, args) =>  
    {  
        Console.WriteLine($"事件 {args.Source.EventId} 执行结果:{args.Status},异常:{args.Exception}");  
    };  

    // ....  
}  

  • 在单例服务中订阅
public class SomeService : ISomeService, ISingleton  
{  
    private readonly IEventPublisher _eventPublisher;  
    public SomeService(IEventPublisher eventPublisher)  
    {  
        _eventPublisher = eventPublisher;  

        _eventPublisher.OnExecuted += (sender, args) =>  
        {  
            Console.WriteLine($"事件 {args.Source.EventId} 执行结果:{args.Status},异常:{args.Exception}");  
        };  
    }  

    // ...  
}  

之后再将 ISomeService 注入到其他类中使用即可。

22.17 IIS 部署回收设置

如果在项目中使用了事件总线且部署到 IIS 中,那么需要设置 IIS 禁止回收,避免事件总线服务进入休眠,点击查看 IIS 回收问题解决方案

22.18 使用第三方事件总线 CAP 示例

builder.Services.AddCap(options =>  // Startup 模式使用 services.AddCap(...)  
{  
    options.UseInMemoryStorage();  
    options.UseInMemoryMessageQueue();  
}).AddSubscriberAssembly(App.Assemblies.ToArray());  

22.19 在 WinForm/WPF 中使用

事件总线支持在 WinForm/WPF 中完整的使用。

  • 发送事件
private async void button1_Click(object sender, EventArgs e)  
{  
    await _eventPublisher.PublishAsync("update.textbox", textBox1); // 可将 TextBox 控件作为 Payload 传递  
}  

  • 订阅事件并更新 UI
public class UIEventSubscriber : IEventSubscriber  
{  
    [EventSubscribe("update.textbox")]  
    public async Task UpdateTextBox(EventHandlerExecutingContext context)  
    {  
        // 将 Palload 转 TextBox  
        var textbox = context.Source.Payload as TextBox;  

        // 多线程更新 UI 组件(WinForm)  
        textbox?.BeginInvoke(() =>  
        {  
            textbox.Text = "我是事件总线更新的";  
        });  

        // 多线程更新 UI 组件(WPF)  
        textbox?.Dispatcher?.BeginInvoke(() =>  
        {  
            textbox.Content = " 我是事件总线更新的";  
        });  

        await Task.CompletedTask;  
    }  
}  

22.20 反馈与建议

与我们交流给 Furion 提 Issue