TJC.Cyclops.TaskSystem
2025.12.4.1
There is a newer version of this package available.
See the version list below for details.
See the version list below for details.
dotnet add package TJC.Cyclops.TaskSystem --version 2025.12.4.1
NuGet\Install-Package TJC.Cyclops.TaskSystem -Version 2025.12.4.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="TJC.Cyclops.TaskSystem" Version="2025.12.4.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="TJC.Cyclops.TaskSystem" Version="2025.12.4.1" />
<PackageReference Include="TJC.Cyclops.TaskSystem" />
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add TJC.Cyclops.TaskSystem --version 2025.12.4.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: TJC.Cyclops.TaskSystem, 2025.12.4.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package TJC.Cyclops.TaskSystem@2025.12.4.1
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=TJC.Cyclops.TaskSystem&version=2025.12.4.1
#tool nuget:?package=TJC.Cyclops.TaskSystem&version=2025.12.4.1
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
Cyclops.TaskSystem
项目概述
Cyclops.TaskSystem是Cyclops.Framework框架中的任务系统组件,提供任务调度、执行、监控和管理功能。该组件支持定时任务、延时任务、分布式任务等多种任务类型,并提供任务状态跟踪、失败重试、任务依赖管理等高级特性,适用于后台任务处理、定时作业执行、批量处理等应用场景。
核心功能模块
任务调度器
- 基于Quartz.NET的任务调度
- Cron表达式支持
- 任务优先级管理
- 并发控制
任务执行器
- 同步/异步任务执行
- 任务超时控制
- 资源限制管理
- 执行结果处理
任务管理
- 任务创建与配置
- 任务状态跟踪
- 任务暂停/恢复/取消
- 任务参数管理
失败处理
- 自动重试机制
- 失败通知
- 死信队列
- 异常捕获与记录
分布式支持
- 分布式锁
- 任务分片执行
- 节点健康检查
- 负载均衡
高级功能
- 任务依赖链
- 工作流集成
- 任务版本控制
- 性能监控与统计
技术栈
- .NET 8.0
- Quartz.NET(任务调度)
- Polly(重试策略)
- Cyclops.Common
- Cyclops.DI
- Cyclops.Logging
环境依赖
- .NET 8.0 SDK
- 可选:数据库(用于持久化任务状态)
安装配置
NuGet安装
Install-Package Cyclops.TaskSystem
基本配置
在应用程序启动时进行配置:
// 在Program.cs或Startup.cs中
using Cyclops.TaskSystem;
var builder = WebApplication.CreateBuilder(args);
// 添加Cyclops.TaskSystem服务
builder.Services.AddCyclopsTaskSystem(options => {
// 调度器配置
options.SchedulerOptions = new SchedulerOptions {
ThreadPoolSize = 10,
EnableBatchProcessing = true,
BatchSize = 50
}
## 版本信息
- 当前版本[](https://www.nuget.org/packages?q=TJC.Cyclops)
- 作者:yswenli
- 描述:企服版框架中任务系统组件
## 贡献者
- yswenli
## 许可证
保留所有权利;
// 持久化配置
options.StorageOptions = new TaskStorageOptions {
StorageType = TaskStorageType.InMemory, // 可选:InMemory, SqlServer, MySql等
ConnectionString = builder.Configuration["TaskSystem:StorageConnectionString"]
};
// 分布式配置(可选)
options.DistributedOptions = new DistributedTaskOptions {
Enabled = false,
LockExpiration = TimeSpan.FromMinutes(5),
NodeId = Environment.MachineName
};
// 重试配置
options.RetryOptions = new RetryOptions {
MaxRetries = 3,
RetryDelay = TimeSpan.FromSeconds(10),
RetryBackoffMultiplier = 2
};
// 启用日志
options.EnableLogging = true;
});
// ...
代码示例
基本任务定义与调度示例
using Cyclops.TaskSystem.Attributes;
using Cyclops.TaskSystem.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
// 定义任务类
[TaskName("SampleTask")]
[Description("示例任务")]
public class SampleTask : ITask
{
private readonly ILogger<SampleTask> _logger;
public SampleTask(ILogger<SampleTask> logger)
{
_logger = logger;
}
public async Task<TaskResult> ExecuteAsync(TaskContext context)
{
try
{
_logger.LogInformation("任务开始执行,参数: {Parameters}", context.Parameters);
// 模拟任务执行
await Task.Delay(1000);
// 获取任务参数
var message = context.Parameters?.GetValueOrDefault("message", "无消息")?.ToString();
_logger.LogInformation("任务执行成功: {Message}", message);
// 返回成功结果
return TaskResult.Success(new Dictionary<string, object> {
{ "executedAt", DateTime.Now },
{ "result", "任务执行完成" }
});
}
catch (Exception ex)
{
_logger.LogError(ex, "任务执行失败");
return TaskResult.Failure(ex.Message);
}
}
}
## 版本信息
- 当前版本[](https://www.nuget.org/packages?q=TJC.Cyclops)
- 作者:yswenli
- 描述:企服版框架中任务系统组件
## 贡献者
- yswenli
## 许可证
保留所有权利
// 在服务中调度任务
public class TaskSchedulerService
{
private readonly ITaskScheduler _taskScheduler;
public TaskSchedulerService(ITaskScheduler taskScheduler)
{
_taskScheduler = taskScheduler;
}
public async Task ScheduleSampleTaskAsync()
{
// 立即执行任务
var immediateTaskId = await _taskScheduler.ScheduleTaskAsync<SampleTask>(new Dictionary<string, object> {
{ "message", "立即执行的任务" }
});
Console.WriteLine($"立即执行任务ID: {immediateTaskId}");
// 延时执行任务(10秒后)
var delayedTaskId = await _taskScheduler.ScheduleDelayedTaskAsync<SampleTask>(
delay: TimeSpan.FromSeconds(10),
parameters: new Dictionary<string, object> {
{ "message", "延时执行的任务" }
}
);
Console.WriteLine($"延时任务ID: {delayedTaskId}");
// 定时执行任务(使用Cron表达式,每分钟执行一次)
var scheduledTaskId = await _taskScheduler.ScheduleRecurringTaskAsync<SampleTask>(
cronExpression: "0 * * * * ?", // 每分钟执行
jobKey: "SampleTask_Recurring",
parameters: new Dictionary<string, object> {
{ "message", "定时重复执行的任务" }
}
);
Console.WriteLine($"定时任务ID: {scheduledTaskId}");
}
}
任务管理示例
using Cyclops.TaskSystem.Enums;
using Cyclops.TaskSystem.Services;
using Microsoft.Extensions.DependencyInjection;
// 在服务中管理任务
public class TaskManagementService
{
private readonly ITaskManager _taskManager;
public TaskManagementService(ITaskManager taskManager)
{
_taskManager = taskManager;
}
public async Task ManageTasksAsync()
{
// 获取任务状态
var taskId = "your-task-id";
var taskStatus = await _taskManager.GetTaskStatusAsync(taskId);
Console.WriteLine($"任务状态: {taskStatus.Status}");
Console.WriteLine($"创建时间: {taskStatus.CreatedAt}");
Console.WriteLine($"开始时间: {taskStatus.StartedAt}");
Console.WriteLine($"完成时间: {taskStatus.CompletedAt}");
Console.WriteLine($"执行次数: {taskStatus.ExecutionCount}");
// 暂停任务(仅对定时任务有效)
await _taskManager.PauseTaskAsync("recurring-task-key");
Console.WriteLine("任务已暂停");
// 恢复任务
await _taskManager.ResumeTaskAsync("recurring-task-key");
Console.WriteLine("任务已恢复");
// 取消任务
await _taskManager.CancelTaskAsync(taskId);
Console.WriteLine("任务已取消");
// 删除任务(从调度器中移除)
await _taskManager.DeleteTaskAsync("recurring-task-key");
Console.WriteLine("任务已删除");
// 查询任务列表
var tasks = await _taskManager.GetTasksAsync(new TaskQuery {
Status = TaskStatus.Completed,
CreatedAfter = DateTime.Now.AddDays(-7),
PageSize = 20,
PageIndex = 1
});
Console.WriteLine($"查询到 {tasks.TotalCount} 个任务");
foreach (var task in tasks.Items)
{
Console.WriteLine($"- 任务ID: {task.Id}, 状态: {task.Status}, 类型: {task.TaskType}");
}
}
}
分布式任务示例
using Cyclops.TaskSystem.Attributes;
using Cyclops.TaskSystem.Services;
using Microsoft.Extensions.DependencyInjection;
// 定义可分片的分布式任务
[TaskName("DistributedDataProcessTask")]
public class DistributedDataProcessTask : IDistributedTask
{
private readonly IDataService _dataService;
private readonly ILogger<DistributedDataProcessTask> _logger;
public DistributedDataProcessTask(IDataService dataService, ILogger<DistributedDataProcessTask> logger)
{
_dataService = dataService;
_logger = logger;
}
// 获取任务分片信息
public async Task<IEnumerable<TaskShard>> GetTaskShardsAsync(TaskContext context)
{
// 获取总数据量
var totalCount = await _dataService.GetTotalRecordCountAsync();
var shardSize = 1000; // 每个分片处理1000条记录
var shardCount = (int)Math.Ceiling((double)totalCount / shardSize);
var shards = new List<TaskShard>();
for (int i = 0; i < shardCount; i++)
{
shards.Add(new TaskShard {
ShardId = i.ToString(),
Parameters = new Dictionary<string, object> {
{ "offset", i * shardSize },
{ "limit", shardSize }
}
});
}
return shards;
}
// 执行单个分片
public async Task<TaskResult> ExecuteShardAsync(TaskContext context, TaskShard shard)
{
try
{
var offset = (int)shard.Parameters["offset"];
var limit = (int)shard.Parameters["limit"];
_logger.LogInformation("开始处理分片: {ShardId}, 偏移量: {Offset}, 数量: {Limit}",
shard.ShardId, offset, limit);
// 处理数据
var processedCount = await _dataService.ProcessDataBatchAsync(offset, limit);
_logger.LogInformation("分片处理完成: {ShardId}, 处理记录数: {Count}",
shard.ShardId, processedCount);
return TaskResult.Success(new Dictionary<string, object> {
{ "processedCount", processedCount },
{ "shardId", shard.ShardId }
});
}
catch (Exception ex)
{
_logger.LogError(ex, "分片处理失败: {ShardId}", shard.ShardId);
return TaskResult.Failure(ex.Message);
}
}
}
// 调度分布式任务
public class DistributedTaskService
{
private readonly IDistributedTaskScheduler _distributedTaskScheduler;
public DistributedTaskService(IDistributedTaskScheduler distributedTaskScheduler)
{
_distributedTaskScheduler = distributedTaskScheduler;
}
public async Task ScheduleDistributedTaskAsync()
{
// 配置分布式任务选项
var options = new DistributedTaskOptions {
MaxConcurrency = 5, // 最大并发分片数
EnableShardRetry = true,
ShardRetryCount = 3,
ProgressCallback = async (progress) => {
Console.WriteLine($"任务进度: {progress.CompletedShards}/{progress.TotalShards} ({progress.CompletionPercentage:P2})");
}
};
// 执行分布式任务
var taskId = await _distributedTaskScheduler.ExecuteDistributedTaskAsync<DistributedDataProcessTask>(
options: options,
parameters: new Dictionary<string, object> {
{ "processDate", DateTime.Today }
}
);
Console.WriteLine($"分布式任务ID: {taskId}");
// 监控任务进度
var taskProgress = await _distributedTaskScheduler.GetTaskProgressAsync(taskId);
Console.WriteLine($"任务进度详情:");
Console.WriteLine($"- 总分片数: {taskProgress.TotalShards}");
Console.WriteLine($"- 已完成分片: {taskProgress.CompletedShards}");
Console.WriteLine($"- 失败分片: {taskProgress.FailedShards}");
Console.WriteLine($"- 完成百分比: {taskProgress.CompletionPercentage:P2}");
Console.WriteLine($"- 开始时间: {taskProgress.StartedAt}");
Console.WriteLine($"- 预计完成时间: {taskProgress.EstimatedCompletionTime}");
}
}
任务依赖示例
using Cyclops.TaskSystem.Attributes;
using Cyclops.TaskSystem.Services;
using Microsoft.Extensions.DependencyInjection;
// 定义多个相关任务
[TaskName("DataExtractionTask")]
public class DataExtractionTask : ITask
{
public async Task<TaskResult> ExecuteAsync(TaskContext context)
{
Console.WriteLine("执行数据提取任务");
// 模拟数据提取
await Task.Delay(2000);
return TaskResult.Success(new Dictionary<string, object> {
{ "extractedDataCount", 1000 },
{ "extractionTime", DateTime.Now }
});
}
}
[TaskName("DataTransformationTask")]
public class DataTransformationTask : ITask
{
public async Task<TaskResult> ExecuteAsync(TaskContext context)
{
Console.WriteLine("执行数据转换任务");
// 获取前置任务结果
var extractedCount = context.PreviousTaskResult?["extractedDataCount"] as int? ?? 0;
// 模拟数据转换
await Task.Delay(3000);
return TaskResult.Success(new Dictionary<string, object> {
{ "transformedDataCount", extractedCount },
{ "transformationTime", DateTime.Now }
});
}
}
[TaskName("DataLoadingTask")]
public class DataLoadingTask : ITask
{
public async Task<TaskResult> ExecuteAsync(TaskContext context)
{
Console.WriteLine("执行数据加载任务");
// 获取前置任务结果
var transformedCount = context.PreviousTaskResult?["transformedDataCount"] as int? ?? 0;
// 模拟数据加载
await Task.Delay(2000);
return TaskResult.Success(new Dictionary<string, object> {
{ "loadedDataCount", transformedCount },
{ "loadingTime", DateTime.Now }
});
}
}
// 调度任务依赖链
public class TaskChainService
{
private readonly ITaskChainBuilder _taskChainBuilder;
public TaskChainService(ITaskChainBuilder taskChainBuilder)
{
_taskChainBuilder = taskChainBuilder;
}
public async Task BuildAndExecuteTaskChainAsync()
{
// 构建任务链
var taskChain = _taskChainBuilder
.AddTask<DataExtractionTask>()
.AddTask<DataTransformationTask>()
.AddTask<DataLoadingTask>()
.Build();
// 配置任务链选项
var options = new TaskChainOptions {
ContinueOnFailure = false, // 一个任务失败则整个链中断
EnableProgressTracking = true,
ProgressCallback = (progress) => {
Console.WriteLine($"任务链进度: {progress.CurrentTaskIndex + 1}/{progress.TotalTasks}, " +
$"当前任务: {progress.CurrentTaskName}, " +
$"状态: {progress.CurrentTaskStatus}");
}
};
// 执行任务链
var chainResult = await taskChain.ExecuteAsync(options);
Console.WriteLine($"任务链执行状态: {chainResult.Status}");
if (chainResult.Status == TaskChainStatus.Completed)
{
Console.WriteLine("任务链执行成功!");
// 输出每个任务的结果
foreach (var taskResult in chainResult.TaskResults)
{
Console.WriteLine($"- 任务: {taskResult.TaskName}, " +
$"状态: {taskResult.Status}, " +
$"执行时间: {taskResult.ExecutionTime.TotalMilliseconds}ms");
}
}
else
{
Console.WriteLine($"任务链执行失败: {chainResult.FailureReason}");
Console.WriteLine($"失败任务: {chainResult.FailedTaskName}");
}
}
}
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
net8.0
- Quartz (>= 3.15.1)
- Quartz.AspNetCore (>= 3.15.1)
- SqlSugarCore (>= 5.1.4.210)
- TJC.Cyclops.Common (>= 2025.12.4.1)
- TJC.Cyclops.Redis (>= 2025.12.4.1)
- TJC.Cyclops.Web.Core (>= 2025.12.4.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 2026.1.7.1 | 91 | 1/7/2026 |
| 2025.12.23.1 | 183 | 12/23/2025 |
| 2025.12.16.1 | 298 | 12/16/2025 |
| 2025.12.15.2 | 243 | 12/15/2025 |
| 2025.12.15.1 | 221 | 12/15/2025 |
| 2025.12.12.1 | 144 | 12/12/2025 |
| 2025.12.11.1 | 428 | 12/11/2025 |
| 2025.12.4.1 | 212 | 12/4/2025 |
| 2025.12.3.3 | 677 | 12/3/2025 |
| 2025.12.3.2 | 687 | 12/3/2025 |
| 2025.12.3.1 | 676 | 12/3/2025 |
| 2025.12.2.1 | 681 | 12/2/2025 |
| 2025.11.28.1 | 171 | 11/28/2025 |
| 2025.11.25.1 | 211 | 11/25/2025 |
| 2025.11.21.1 | 413 | 11/21/2025 |
| 2025.11.20.1 | 412 | 11/20/2025 |
| 2025.11.19.2 | 411 | 11/19/2025 |
| 2025.11.19.1 | 404 | 11/19/2025 |
| 2025.11.18.2 | 411 | 11/18/2025 |
| 2025.11.18.1 | 424 | 11/18/2025 |
Loading failed
企服版任务核心