26.3 任务队列
📝 模块更新日志 新特性*
+ 新增 任务队列支持配置延迟队列是否立即执行一次 4\.9\.1\.56 ⏱️2024\.03\.21 [\#I9AF54](https://gitee.com/dotnetchina/Furion/issues/I9AF54)
+ 新增 任务队列支持配置特定的任务采用并行还是串行的执行方式 4\.9\.1\.38 ⏱️2024\.03\.07 [a732c72](https://gitee.com/dotnetchina/Furion/commit/a732c72bd28e2629de90bd858ca67c3816bb02c4)
+ 新增 任务队列可传入自定义任务 `TaskId` 4\.9\.1\.38 ⏱️2024\.03\.07 [a732c72](https://gitee.com/dotnetchina/Furion/commit/a732c72bd28e2629de90bd858ca67c3816bb02c4)
+ 新增 任务队列支持分配任务 `TaskId`,管道分类 `Channel` 和 `OnExecuted` 事件订阅 4\.9\.1\.28 ⏱️2024\.01\.31 [1c27434](https://gitee.com/dotnetchina/Furion/commit/1c274347517671e7611a8c578a05ce273248700d)
+ 新增 任务队列支持配置重试次数、重试间隔 4\.9\.1\.24 ⏱️2023\.01\.18 [1c27434](https://gitee.com/dotnetchina/Furion/commit/1c274347517671e7611a8c578a05ce273248700d)
+ 新增 任务队列支持配置 `Concurrent` 串行执行 4\.9\.1\.22 ⏱️2024\.01\.14 [\#I8VXFV](https://gitee.com/dotnetchina/Furion/issues/I8VXFV)
+ 新增 `Enqueue/EnqueueAsync` 支持 `Cron` 表达式 实例重载方法 4\.8\.4\.10 ⏱️2023\.01\.09 [\#I69HM4](https://gitee.com/dotnetchina/Furion/issues/I69HM4)
-
突破性变化
- 调整 任务入队返回值类型,由
Guid改为object类型 4.9.1.38 ⏱️2024.03.07 a732c72 -
问题修复
-
修复 任务队列在个别情况下出现出队同步阻塞问题 4.9.2.43 ⏱️2024.05.08 f595b47
- 修复 任务队列订阅任务执行结果处理程序中出现异常后重复触发问题 4.9.1.35 ⏱️2024.02.27 fa81c54
-
其他更改
-
调整 任务队列管道容量,从
3000增加到12000,提升消费吞吐量 4.9.4.1 ⏱️2024.06.17 4d9feb3 - 调整 任务队列
concurrent类型定义,由object调整为bool?4.9.1.57 ⏱️2024.03.22 cebb48d
- 调整 任务入队返回值类型,由
版本说明以下内容仅限 Furion 4.8.3 + 版本使用。
小知识任务队列可取代旧版本定时任务的 SpareTime.DoIt() 和 SpareTime.DoOnce 功能。
26.3.1 关于任务队列
任务队列常用于管理后台工作,通常这些后台工作在主线程响应之外,不会对主线程或当前线程响应阻塞。任务队列的一个显著特定就是它是一个队列,入队的顺序决定它出队执行的先后。
任务队列使用 Channel + Task + ThreadPool(线程池) 实现,入队/出队速度非常快,吞吐量极高,内存和 CPU 占用几乎忽略不计。
任务队列应用场景:对于可能需长时间运行的任务,或不是那么及时的需要立即反馈的任务。 比如发送邮件,发送短信等等。
26.3.2 与事件总线的区别
事件总线基于消息通讯,任务队列最显著的特点就是将操作依次加入队列,然后按照入队的顺序出队去执行。
前者(事件总线)是无序的,只有完全匹配的消息 Id 才会触发执行操作,否则处于 “静待” 状态。
而后者(任务队列)则是将可能耗时且一定会执行的操作放到队列中,之后依次出队执行。
26.3.3 快速入门
任务队列使用非常简单,只需要注册 services.AddTaskQueue() 服务,之后通过依赖注入 ITaskQueue 服务或通过 TaskQueued 静态类使用即可,
1. 注册 TaskQueue 服务
services.AddTaskQueue();
2. 使用 ITaskQueue 服务
using Furion.TaskQueue;
namespace Your.Application;
public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
public YourService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;
}
/// <summary>
/// 同步入队
/// </summary>
public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的");
});
// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {});
}
/// <summary>
/// 同步入队,延迟 3 秒触发
/// </summary>
public void SyncTask2()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的,但我延迟了 3 秒");
}, 3000);
// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, 3000);
}
/// <summary>
/// 异步入队
/// </summary>
public async Task AsyncTask()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的");
await ValueTask.CompletedTask;
});
// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {});
}
/// <summary>
/// 异步入队,延迟 3 秒触发
/// </summary>
public async Task AsyncTask2()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("我是异步的,但我延迟了 3 秒");
await ValueTask.CompletedTask;
}, 3000);
// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, 3000);
}
/// <summary>
/// 同步入队,支持 Cron 表达式延迟
/// </summary>
public void SyncTask3()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("Cron ...");
}, "* * * * *");
// 如无需使用 provider 参数,可用 _ 替代
_taskQueue.Enqueue(_ => {}, "* * * * *", CronStringFormat.Default);
}
/// <summary>
/// 异步入队,支持 Cron 表达式延迟
/// </summary>
public async Task AsyncTask3()
{
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
Console.WriteLine("Cron ...");
await ValueTask.CompletedTask;
}, "* * * * *");
// 如无需使用 provider 和 token 参数,可用 _ 替代
await _taskQueue.EnqueueAsync(async (_, _) => {}, "* * * * *", CronStringFormat.Default);
}
/// <summary>
/// 同步入队,延迟 3 秒触发,并立即执行一次(Furion 4.9.1.56+ 版本支持)
/// </summary>
public void SyncTask3()
{
_taskQueue.Enqueue(provider =>
{
Console.WriteLine("我是同步的,但我延迟了 3 秒");
}, 3000, runOnceIfDelaySet: true);
}
/// <summary>
/// 异步入队,延迟 3 秒触发,并立即执行一次(Furion 4.9.1.56+ 版本支持)
/// </summary>
public async Task AsyncTask3()
{
await _taskQueue.EnqueueAsync(async (_, _) =>
{
Console.WriteLine("我是异步的,但我延迟了 3 秒");
await ValueTask.CompletedTask;
}, 3000, runOnceIfDelaySet: true);
}
}
注意事项框架内置了一套简单的错误策略机制,也就是如果任务执行失败会默认重试 3 次,每次间隔 1秒,该策略配置暂不对外公开(Furion 4.9.1.24+ 版本已提供配置)。
26.3.4 TaskQueued 静态类
框架还提供了 TaskQueued 静态类可在任何线程中操作,如:
// 同步入队
TaskQueued.Enqueue((provider) => {}, [delay]);
TaskQueued.Enqueue((provider) => {}, cronExpression, [format]);
// 异步入队
await TaskQueued.EnqueueAsync(async (provider, token) => {}, [delay]);
await TaskQueued.EnqueueAsync(async (provider, token) => {}, cronExpression, [format]);
26.3.5 在处理程序中使用服务
如果在任务队列处理程序中使用了外部的服务,如:
public class YourService : IYourService
{
private readonly ITaskQueue _taskQueue;
private readonly ILogger<YourService> _logger
public YourService(ITaskQueue taskQueue
, ILogger<YourService> logger)
{
_taskQueue = taskQueue;
_logger = logger;
}
public void SyncTask()
{
_taskQueue.Enqueue(provider =>
{
_logger.LogInformation("我使用了外部的 logger");
});
}
}
那么需要注意的是,如果使用的外部服务是 单例 服务,那么无需任何处理,但如果使用的服务属于 瞬时 或 范围 作用域,那么需要创建作用域,如:
_taskQueue.Enqueue(provider =>
{
// Repository 注册为范围,需创建作用域
using var scoped = provider.CreateScope();
var repository = scoped.ServiceProvider.GetService<IRepository<User>>();
// Logger 注册为单例,可以直接使用
_logger.LogInformation("我使用了外部的 logger");
});
26.3.6 订阅执行任务意外异常
任务处理程序使用的是 Task 对象进行创建并执行,但可能存在一些意外且难以捕获的异常,这时候可以通过以下方式订阅:
services.AddTaskQueue(builder =>
{
// 订阅 TaskQueue 意外未捕获异常
builder.UnobservedTaskExceptionHandler = (obj, args) =>
{
// ....
};
});
26.3.7 并行与串行
版本说明以下内容仅限 Furion 4.9.1.22 + 版本使用。
默认情况下,为了提升队列出队吞吐量,队列出队默认采用并行的方式执行,不会依次进行出队。如需配置依次出队(串行)方式,可通全局设置 Concurrent 属性:
services.AddTaskQueue(builder =>
{
builder.Concurrent = false; // 串行执行
});
在 Furion 4.9.1.38+ 版本开始,可以进行局部配置任务采用 并行 还是 串行 的执行方式,如:
// 不设置,默认采用全局的配置
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, 3000);
// 设置并行执行
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, 3000, concurrent: true);
// 设置串行执行
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, 3000, concurrent: false);
26.3.8 任务标识 TaskId
版本说明以下内容仅限 Furion 4.9.1.28 + 版本使用。
在 Furion 4.9.1.28 之前的版本,任务入队是没有返回值的,在之后的版本任务入队后会返回任务的唯一 TaskId,默认值是 Guid.NewGuid()。
// Furion 4.9.1.28-(无返回值)
await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
});
// Furion 4.9.1.28+(返回任务唯一 ID)
var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
});
在 Furion 4.9.1.38+ 版本支持传入自定义 TaskId,如:
var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, taskId: "唯一 id"); // object 类型
26.3.9 任务管道 Channel
版本说明以下内容仅限 Furion 4.9.1.28 + 版本使用。
在 Furion 4.9.1.28+ 版本,任务入队支持设置 管道/批次,通过 channel 参数传入,后续可以根据这个 channel 进行任务执行结果订阅进而触发回调操作。
var taskId = await _taskQueue.EnqueueAsync(async (provider, token) =>
{
// ...
}, channel: "批次1");
26.3.10 异常重试配置
版本说明以下内容仅限 Furion 4.9.1.24 + 版本使用。
在 Furion 4.9.1.24 之前的版本并未公开任务队列执行异常后重试配置,该版本之后可自定义配置:
services.AddTaskQueue(builder =>
{
builder.NumRetries = 3; // 默认重试 3 次
builder.RetryTimeout = 1000; // 每次重试间隔 1000ms
});
26.3.11 任务执行结果订阅
版本说明以下内容仅限 Furion 4.9.1.28 + 版本使用。
在 Furion 4.9.1.28+ 版本,新增了 OnExecuted 事件订阅器,可以订阅事件执行成功还是失败,如:
// 订阅任务事件
_taskQueue.OnExecuted += (sender, args) =>
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
}
26.3.11.1 重复订阅问题
由于 ITaskQueue 注册为单例,所以如果在非单例服务或者特定代码范围内订阅,请务必订阅处理之后移除事件,避免重复多播订阅(除非需要多播订阅)。
// 订阅局部或类全局事件方法
void TaskSubscribe(object sender, TaskHandlerEventArgs args)
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
}
// 订阅任务事件
_taskQueue.OnExecuted += TaskSubscribe;
// 其他操作。。。。
// 移除订阅事件
_taskQueue.OnExecuted -= TaskSubscribe;
如果是在类构造函数中订阅,那么需要实现 IDisposable 接口并在其中移除订阅。
public class SomeService : ISomeService, IScoped, IDisposable
{
private readonly ITaskQueue _taskQueue;
public SomeService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;
_taskQueue.OnExecuted += Subscribe;
}
void Subscribe(object sender, TaskHandlerEventArgs args)
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
}
public void Dispose()
{
_taskQueue.OnExecuted -= Subscribe;
}
}
26.3.11.2 订阅最佳推荐
- 在
Startup.cs中订阅
由于在非单例的服务中存在重复订阅的问题,所以推荐在 Startup.cs 的 Configure 中统一订阅,如:
Startup.cs
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
// 解析任务队列服务
var taskQueue = app.ApplicationServices.GetRequiredService<ITaskQueue>();
// 订阅
taskQueue.OnExecuted += (sender, args) =>
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
};
// ....
}
- 在单例服务中订阅
public class SomeService : ISomeService, ISingleton
{
private readonly ITaskQueue _taskQueue;
public SomeService(ITaskQueue taskQueue)
{
_taskQueue = taskQueue;
_taskQueue.OnExecuted += (sender, args) =>
{
Console.WriteLine($"任务 {args.TaskId} 管道 {args.Channel},执行结果:{args.Status},异常:{args.Exception}");
};
}
// ...
}
之后再将 ISomeService 注入到其他类中使用即可。
26.3.12 反馈与建议
与我们交流给 Furion 提 Issue。