Skip to content

26.3 任务队列

📝 模块更新日志 新特性*

+ 新增 任务队列支持配置延迟队列是否立即执行一次 4\.9\.1\.56 ⏱️2024\.03\.21 [\#I9AF54](https://gitee.com/dotnetchina/Furion/issues/I9AF54)
+ 新增 任务队列支持配置特定的任务采用并行还是串行的执行方式 4\.9\.1\.38 ⏱️2024\.03\.07 [a732c72](https://gitee.com/dotnetchina/Furion/commit/a732c72bd28e2629de90bd858ca67c3816bb02c4)
+ 新增 任务队列可传入自定义任务 `TaskId` 4\.9\.1\.38 ⏱️2024\.03\.07 [a732c72](https://gitee.com/dotnetchina/Furion/commit/a732c72bd28e2629de90bd858ca67c3816bb02c4)
+ 新增 任务队列支持分配任务 `TaskId`,管道分类 `Channel` 和 `OnExecuted` 事件订阅 4\.9\.1\.28 ⏱️2024\.01\.31 [1c27434](https://gitee.com/dotnetchina/Furion/commit/1c274347517671e7611a8c578a05ce273248700d)
+ 新增 任务队列支持配置重试次数、重试间隔 4\.9\.1\.24 ⏱️2023\.01\.18 [1c27434](https://gitee.com/dotnetchina/Furion/commit/1c274347517671e7611a8c578a05ce273248700d)
+ 新增 任务队列支持配置 `Concurrent` 串行执行 4\.9\.1\.22 ⏱️2024\.01\.14 [\#I8VXFV](https://gitee.com/dotnetchina/Furion/issues/I8VXFV)
+ 新增 `Enqueue/EnqueueAsync` 支持 `Cron` 表达式 实例重载方法 4\.8\.4\.10 ⏱️2023\.01\.09 [\#I69HM4](https://gitee.com/dotnetchina/Furion/issues/I69HM4)
  • 突破性变化

    • 调整 任务入队返回值类型,由 Guid 改为 object 类型 4.9.1.38 ⏱️2024.03.07 a732c72
    • 问题修复

    • 修复 任务队列在个别情况下出现出队同步阻塞问题 4.9.2.43 ⏱️2024.05.08 f595b47

    • 修复 任务队列订阅任务执行结果处理程序中出现异常后重复触发问题 4.9.1.35 ⏱️2024.02.27 fa81c54
    • 其他更改

    • 调整 任务队列管道容量,从 3000 增加到 12000,提升消费吞吐量 4.9.4.1 ⏱️2024.06.17 4d9feb3

    • 调整 任务队列 concurrent 类型定义,由 object 调整为 bool? 4.9.1.57 ⏱️2024.03.22 cebb48d

版本说明以下内容仅限 Furion 4.8.3 + 版本使用。

小知识任务队列可取代旧版本定时任务的 SpareTime.DoIt()SpareTime.DoOnce 功能。

26.3.1 关于任务队列

任务队列常用于管理后台工作,通常这些后台工作在主线程响应之外,不会对主线程或当前线程响应阻塞。任务队列的一个显著特定就是它是一个队列,入队的顺序决定它出队执行的先后。

任务队列使用 Channel + Task + ThreadPool(线程池) 实现,入队/出队速度非常快,吞吐量极高,内存和 CPU 占用几乎忽略不计。

任务队列应用场景:对于可能需长时间运行的任务,或不是那么及时的需要立即反馈的任务。 比如发送邮件,发送短信等等。

26.3.2 与事件总线的区别

事件总线基于消息通讯,任务队列最显著的特点就是将操作依次加入队列,然后按照入队的顺序出队去执行。

前者(事件总线)是无序的,只有完全匹配的消息 Id 才会触发执行操作,否则处于 “静待” 状态。

而后者(任务队列)则是将可能耗时且一定会执行的操作放到队列中,之后依次出队执行

26.3.3 快速入门

任务队列使用非常简单,只需要注册 services.AddTaskQueue() 服务,之后通过依赖注入 ITaskQueue 服务或通过 TaskQueued 静态类使用即可,

1. 注册 TaskQueue 服务

services.AddTaskQueue();  

2. 使用 ITaskQueue 服务

using Furion.TaskQueue;  

namespace Your.Application;  

public class YourService : IYourService  
{  
    private readonly ITaskQueue _taskQueue;  
    public YourService(ITaskQueue taskQueue)  
    {  
        _taskQueue = taskQueue;  
    }  

    /// <summary>  
    /// 同步入队  
    /// </summary>  
    public void SyncTask()  
    {  
        _taskQueue.Enqueue(provider =>  
        {  
            Console.WriteLine("我是同步的");  
        });  

        // 如无需使用 provider 参数,可用 _ 替代  
        _taskQueue.Enqueue(_ => {});  
    }  

    /// <summary>  
    /// 同步入队,延迟 3 秒触发  
    /// </summary>  
    public void SyncTask2()  
    {  
        _taskQueue.Enqueue(provider =>  
        {  
            Console.WriteLine("我是同步的,但我延迟了 3 秒");  
        }, 3000);  

        // 如无需使用 provider 参数,可用 _ 替代  
        _taskQueue.Enqueue(_ => {}, 3000);  
    }  

    /// <summary>  
    /// 异步入队  
    /// </summary>  
    public async Task AsyncTask()  
    {  
        await _taskQueue.EnqueueAsync(async (provider, token) =>  
        {  
            Console.WriteLine("我是异步的");  
            await ValueTask.CompletedTask;  
        });  

        // 如无需使用 provider 和 token 参数,可用 _ 替代  
        await _taskQueue.EnqueueAsync(async (_, _) => {});  
    }  

    /// <summary>  
    /// 异步入队,延迟 3 秒触发  
    /// </summary>  
    public async Task AsyncTask2()  
    {  
        await _taskQueue.EnqueueAsync(async (provider, token) =>  
        {  
            Console.WriteLine("我是异步的,但我延迟了 3 秒");  
            await ValueTask.CompletedTask;  
        }, 3000);  

        // 如无需使用 provider 和 token 参数,可用 _ 替代  
        await _taskQueue.EnqueueAsync(async (_, _) => {}, 3000);  
    }  

    /// <summary>  
    /// 同步入队,支持 Cron 表达式延迟  
    /// </summary>  
    public void SyncTask3()  
    {  
        _taskQueue.Enqueue(provider =>  
        {  
            Console.WriteLine("Cron ...");  
        }, "* * * * *");  

        // 如无需使用 provider 参数,可用 _ 替代  
        _taskQueue.Enqueue(_ => {}, "* * * * *", CronStringFormat.Default);  
    }  

    /// <summary>  
    /// 异步入队,支持 Cron 表达式延迟  
    /// </summary>  
    public async Task AsyncTask3()  
    {  
        await _taskQueue.EnqueueAsync(async (provider, token) =>  
        {  
            Console.WriteLine("Cron ...");  
            await ValueTask.CompletedTask;  
        }, "* * * * *");  

        // 如无需使用 provider 和 token 参数,可用 _ 替代  
        await _taskQueue.EnqueueAsync(async (_, _) => {}, "* * * * *", CronStringFormat.Default);  
    }  

    /// <summary>  
    /// 同步入队,延迟 3 秒触发,并立即执行一次(Furion 4.9.1.56+ 版本支持)  
    /// </summary>  
    public void SyncTask3()  
    {  
        _taskQueue.Enqueue(provider =>  
        {  
            Console.WriteLine("我是同步的,但我延迟了 3 秒");  
        }, 3000, runOnceIfDelaySet: true);  
    }  

    /// <summary>  
    /// 异步入队,延迟 3 秒触发,并立即执行一次(Furion 4.9.1.56+ 版本支持)  
    /// </summary>  
    public async Task AsyncTask3()  
    {  
        await _taskQueue.EnqueueAsync(async (_, _) =>  
        {  
            Console.WriteLine("我是异步的,但我延迟了 3 秒");  
            await ValueTask.CompletedTask;  
        }, 3000, runOnceIfDelaySet: true);  
    }  
}  

注意事项框架内置了一套简单的错误策略机制,也就是如果任务执行失败会默认重试 3 次,每次间隔 1秒,该策略配置暂不对外公开(Furion 4.9.1.24+ 版本已提供配置)。

26.3.4 TaskQueued 静态类

框架还提供了 TaskQueued 静态类可在任何线程中操作,如:

// 同步入队  
TaskQueued.Enqueue((provider) => {}, [delay]);  
TaskQueued.Enqueue((provider) => {}, cronExpression, [format]);  

// 异步入队  
await TaskQueued.EnqueueAsync(async (provider, token) => {}, [delay]);  
await TaskQueued.EnqueueAsync(async (provider, token) => {}, cronExpression, [format]);  

26.3.5 在处理程序中使用服务

如果在任务队列处理程序中使用了外部的服务,如:

public class YourService : IYourService  
{  
    private readonly ITaskQueue _taskQueue;  
    private readonly ILogger<YourService> _logger  

    public YourService(ITaskQueue taskQueue  
    , ILogger<YourService> logger)  
    {  
        _taskQueue = taskQueue;  
        _logger = logger;  
    }  

    public void SyncTask()  
    {  
        _taskQueue.Enqueue(provider =>  
        {  
            _logger.LogInformation("我使用了外部的 logger");  
        });  
    }  
}  

那么需要注意的是,如果使用的外部服务是 单例 服务,那么无需任何处理,但如果使用的服务属于 瞬时范围 作用域,那么需要创建作用域,如:

_taskQueue.Enqueue(provider =>  
{  
    // Repository 注册为范围,需创建作用域  
    using var scoped = provider.CreateScope();  
    var repository = scoped.ServiceProvider.GetService<IRepository<User>>();  

    // Logger 注册为单例,可以直接使用  
    _logger.LogInformation("我使用了外部的 logger");  
});  

26.3.6 订阅执行任务意外异常

任务处理程序使用的是 Task 对象进行创建并执行,但可能存在一些意外且难以捕获的异常,这时候可以通过以下方式订阅:

services.AddTaskQueue(builder =>  
{  
    // 订阅 TaskQueue 意外未捕获异常  
    builder.UnobservedTaskExceptionHandler = (obj, args) =>  
    {  
        // ....  
    };  
});  

26.3.7 并行与串行

版本说明以下内容仅限 Furion 4.9.1.22 + 版本使用。

默认情况下,为了提升队列出队吞吐量,队列出队默认采用并行的方式执行,不会依次进行出队。如需配置依次出队(串行)方式,可通全局设置 Concurrent 属性:

services.AddTaskQueue(builder =>  
{  
    builder.Concurrent = false; // 串行执行  
});  

Furion 4.9.1.38+ 版本开始,可以进行局部配置任务采用 并行 还是 串行 的执行方式,如:

// 不设置,默认采用全局的配置  
await _taskQueue.EnqueueAsync(async (provider, token) =>  
{  
    // ...  
}, 3000);  

// 设置并行执行  
await _taskQueue.EnqueueAsync(async (provider, token) =>  
{  
    // ...  
}, 3000, concurrent: true);  

// 设置串行执行  
await _taskQueue.EnqueueAsync(async (provider, token) =>  
{  
    // ...  
}, 3000, concurrent: false);  

26.3.8 任务标识 TaskId

版本说明以下内容仅限 Furion 4.9.1.28 + 版本使用。

Furion 4.9.1.28 之前的版本,任务入队是没有返回值的,在之后的版本任务入队后会返回任务的唯一 TaskId,默认值是 Guid.NewGuid()

// Furion 4.9.1.28-(无返回值)  
await _taskQueue.EnqueueAsync(async (provider, token) =>  
{  
    // ...  
});  

// Furion 4.9.1.28+(返回任务唯一 ID)  
var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>  
{  
    // ...  
});  

Furion 4.9.1.38+ 版本支持传入自定义 TaskId,如:

var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>  
{  
    // ...  
}, taskId: "唯一 id");  // object 类型  

26.3.9 任务管道 Channel

版本说明以下内容仅限 Furion 4.9.1.28 + 版本使用。

Furion 4.9.1.28+ 版本,任务入队支持设置 管道/批次,通过 channel 参数传入,后续可以根据这个 channel 进行任务执行结果订阅进而触发回调操作。

var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>  
{  
    // ...  
}, channel: "批次1");  

26.3.10 异常重试配置

版本说明以下内容仅限 Furion 4.9.1.24 + 版本使用。

Furion 4.9.1.24 之前的版本并未公开任务队列执行异常后重试配置,该版本之后可自定义配置:

services.AddTaskQueue(builder =>  
{  
    builder.NumRetries = 3; // 默认重试 3 次  
    builder.RetryTimeout = 1000;    // 每次重试间隔 1000ms  
});  

26.3.11 任务执行结果订阅

版本说明以下内容仅限 Furion 4.9.1.28 + 版本使用。

Furion 4.9.1.28+ 版本,新增了 OnExecuted 事件订阅器,可以订阅事件执行成功还是失败,如:

// 订阅任务事件  
_taskQueue.OnExecuted += (sender, args) =>  
{  
    Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");  
}  

26.3.11.1 重复订阅问题

由于 ITaskQueue 注册为单例,所以如果在非单例服务或者特定代码范围内订阅,请务必订阅处理之后移除事件,避免重复多播订阅(除非需要多播订阅)

// 订阅局部或类全局事件方法  
void TaskSubscribe(object sender, TaskHandlerEventArgs args)  
{  
    Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");  
}  

// 订阅任务事件  
_taskQueue.OnExecuted += TaskSubscribe;  

// 其他操作。。。。  

// 移除订阅事件  
_taskQueue.OnExecuted -= TaskSubscribe;  


如果是在类构造函数中订阅,那么需要实现 IDisposable 接口并在其中移除订阅。

public class SomeService : ISomeService, IScoped, IDisposable  
{  
    private readonly ITaskQueue _taskQueue;  
    public SomeService(ITaskQueue taskQueue)  
    {  
        _taskQueue = taskQueue;  

        _taskQueue.OnExecuted += Subscribe;  
    }  

    void Subscribe(object sender, TaskHandlerEventArgs args)  
    {  
        Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");  
    }  

    public void Dispose()  
    {  
        _taskQueue.OnExecuted -= Subscribe;  
    }  
}  

26.3.11.2 订阅最佳推荐

  • Startup.cs 中订阅

由于在非单例的服务中存在重复订阅的问题,所以推荐在 Startup.csConfigure 中统一订阅,如:

Startup.cs

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)  
{  
    // 解析任务队列服务  
    var taskQueue = app.ApplicationServices.GetRequiredService<ITaskQueue>();  

    // 订阅  
    taskQueue.OnExecuted += (sender, args) =>  
    {  
        Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");  
    };  

    // ....  
}  

  • 在单例服务中订阅
public class SomeService : ISomeService, ISingleton  
{  
    private readonly ITaskQueue _taskQueue;  
    public SomeService(ITaskQueue taskQueue)  
    {  
        _taskQueue = taskQueue;  

        _taskQueue.OnExecuted += (sender, args) =>  
        {  
            Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");  
        };  
    }  

    // ...  
}  

之后再将 ISomeService 注入到其他类中使用即可。

26.3.12 反馈与建议

与我们交流给 Furion 提 Issue