MyLab.AsyncProcessor.Sdk 1.8.12

dotnet add package MyLab.AsyncProcessor.Sdk --version 1.8.12                
NuGet\Install-Package MyLab.AsyncProcessor.Sdk -Version 1.8.12                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MyLab.AsyncProcessor.Sdk" Version="1.8.12" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add MyLab.AsyncProcessor.Sdk --version 1.8.12                
#r "nuget: MyLab.AsyncProcessor.Sdk, 1.8.12"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install MyLab.AsyncProcessor.Sdk as a Cake Addin
#addin nuget:?package=MyLab.AsyncProcessor.Sdk&version=1.8.12

// Install MyLab.AsyncProcessor.Sdk as a Cake Tool
#tool nuget:?package=MyLab.AsyncProcessor.Sdk&version=1.8.12                

MyLab.AsyncProcessor

SDK: NuGet Version and Downloads count

Docker образ: Docker image

Спецификация API: API specification API specification

Поддерживаемые платформы: .NET Core 3.1+

Ознакомьтесь с последними изменениями в журнале изменений.

Обзор

MyLab.AsyncProcessor - платформа для разработки асинхронных обработчиков запросов с использованием микросервисной архитектуры.

mylab-asyncproc-simple

Архитектура асинхронного процессора подразумевает, что:

  • взаимодействие происходит через API
  • обработка запроса происходит в обработчике запросов бизнес-логики BizProc
  • BizProcполучает запросы по событийной модели
  • API и BizProcмогут быть масштабированы

На диаграмме ниже изображено взаимодествие клиента с асинхронным процессором через API:

alternate text is missing from this package README image

Также опционально можно использовать событийную можель взаимодействия:

alternate text is missing from this package README image

Внутри

alternate text is missing from this package README image

Основные компоненты асинхронного процессора:

  • API - обрабатывает входящие служебные запросы для подачи бизнес-запроса на обработку, получения статуса обработки, а также для получения результатов обработки;
  • BizProc - бизнес-процессор. Выполняет обрабтку поступившего бизнес-запроса в соответствии со специической логикой;
  • Redis - используется для временного хранения состояния выполнения запроса и информации о результате его выполнения;
  • RabbitMQ - используется для организации событийной модели при передаче запросов от API к BizProc.

Особенности взаимодействия компонентов:

  • API передаёт бизнес-запрос на обработку через очередь или через обменник. Запрос от клиента может содержать routing-key, который будет использоваться в случае с обменником;
  • BizProc сообщает об изменении состояния обработки запроса напряму в API;
  • API хранит состояние обработки запроса и результаты в Redis;
  • обработка бизнес-запроса не обязательно должна выполняться в один поток в пределах BizProc. Но BizProc ответственнен за начало этой обработки;
  • если в настройках указан Сallback (обменник), то API при изменении состояния запроса будет отправлять сообщение в этот обменник с соответствующим содержанием. В качестве ключа ротуинга будет использоваться переданный в запросе параметр callbackRouting.

Запрос

Структура

CreateRequest:
      type: object
      description: Contains request parameters
      properties:
      	requestId:
          type: string
          description:  Defines predefined request id
          example: '54f5d4d35d47497a92386e46acc6d3be'
        procRouting:
          type: string
          description: Defines queue routing to send message to processor
          example: big
        callbackRouting:
          type: string
          description: Defines queue routing to send callback messages through callback exchange
          example: client1
        content: 
          type: object
          additionalProperties: true
          description: Request content
          example: { "value": "123", "dt": "10.10.2010"  }
      required:
      - content

Жизненный цикл

alternate text is missing from this package README image

Время жизни запроса

В настройках API предусмотрено два параметра, влияющих на время жизни запроса:

  • ProcessingTimeout - период, выделенный на обработку запроса. Если опистечении этого времени процессор не сообщил в API об окончании обработки, то запрос удаляется. Это приводит к истечению срока жизни записи в Redis и сообщения в RabbitMQ;
  • RestTimeout - период действия данных о запросе, которые можно запросить через API после перехода запроса в состояние Completed. По истечение этого срока, записи о запросе в Redis экспирируются и API на все запросы по нему отвечает 404.

Статус запроса

Содержание статуса

Статус запроса содержит следующие параметры:

  • processStep - этап жизни запроса:
    • pending
    • processing
    • completed
  • bizStep - этап обработки в терминах бизнес-процесса. Перечень значений зависит от конкурентной предметной области;
  • successful- флаг, указывающий фактор успешности обработки запроса на шаге Completed. Если false - в процессе обработки, возникла ошибка, при этом должно быть заполнено поле error;
  • error - содержит информацию об ошибке, возникшей в процессе обработки запроса;
    • bizMgs - сообщение бизнес-уровня, понятное пользователю;
    • techMgs - техническое сообщение. Обычно, сообщение из исключения;
    • techInfo - развёрнутое техническое описание ошибки. Обычно - stacktrace исключения;
  • resultSize - размер результата обработки запроса;
  • resultMime - MIME тип результата обработки запроса.

Подробнее - тут.

Бизнес-шаг обработки (BizStep)

BizStep предназначен для того, чтобы детализировать этапы обработки запроса, специфичные для конкретной задачи. Эти шаги устанавливаются только в процессе обработки запроса, т.е. в то время, когда processStep == processing.

alternate text is missing from this package README image

API асинхронного процессора

Docker контейнер API

API доступно для развёртывания в качестве docker контейнера. Образ расположен на docker-hub.

Pull-команда:

docker pull ozzyext/mylab-async-proc-api

Конфигурация API

API поддерживает удалённый конфиг.

Настройки логики AsyncProc

Имя узла конфигурации - AsyncProc. Ниже приведён пример конфигурации:

{
  "RedisKeyPrefix": "myorder-async-proc:requests:",
  "ProcessingTimeout": "01:00",
  "RestTimeout": "1",
  "QueueExchange": "myorder:proc-exchange",
  "QueueRoutingKey": "myorder:proc-queue",
  "DeadLetter": "myorder:dead-letter",
  "Callback": "myorder:callback-exchange"
}

, где:

  • RedisKeyPrefix - префикс для ключей в Redis, в которых будет хранится состояние обработки запроса;
  • ProcessingTimeout - период жизни запроса в одижании обработки. В формате TimeSpan;
  • RestTimeout - период жизни запроса после перехода в конечное состояние Completed (после обработки). В формате TimeSpan;
  • QueueExchange - обменникRabbitMQ, в который будут подаваться сообщения для BizProc. Может быть не указан, если публикация должна осуществляться сразу в очередь;
  • QueueRoutingKey - очередь для публикации, если не указан QueueExchange или ключ маршрутизации (routing key) по умолчанию. Опциональный параметр;
  • DeadLetter - определяет имя обменника, куда будут направлены сообщения из основной очереди, если произойдут сбои обработки. Опциональный параметр;
  • Callback - определяет обменник для отправки уведомлений об изменении состояния запроса.

Для параметров, содержащих период времени, формат значения должен соответствовать формату, поддерживаемому TimeSpan:

  • 6 - 6 дней;
  • 03:00 - 3 часа;
  • 00:01 - 1 минута.

Подробнее о форматах TimeSpan тут.

Настройки подключения к RabbitMQ

Нстройки подключения к RabbitMQ заключены в узле Mq и соответствуют параметрам конфигурации MyLab.RabbitClient.

Настройки подключения к Redis

Нстройки подключения к Readis заключены в узле Redis и соответствуют параметрам конфигурации MyLab.RedisAdapter.

Настройки OpenTelemetry

Настройки OpenTelemetry заключены в узле Otlp и расширяют базовые настройки экспортёра телеметрии.

Основные базовые параметры:

  • Endpoint - конечная точка, куда экспортёр будет отправялть телеметрию;
  • Protocol - протокол передачи телеметрии:
    • grpc for protobuf-encoded data using gRPC wire format over HTTP/2 connection;
    • http/protobuf for protobuf-encoded data over HTTP connection;
    • http/json for JSON-encoded data over HTTP connection.

Специфичные параметры AsyncProc:

  • Enabled - true/false, включена/выключена сборка телеметрии;

  • ServiceName - имя сервиса, которе будет фигурировать в экспортированной телеметрии.

Событийная модель

API асинхронного процессора поддерживает событийную модель взаимодействия. При изменении состояния запроса будет отправляться сообщение в обменник с соответствующим содежанием. Обменник указывается через конфигурацию - праметр Callback. Если требуется указать роутинг для обменникка, то необходимо указать его значение в запросе в поле callbackRouting .

Инициирующие события:

  • изменился шаг обработки запроса
  • изменился бизнес-шаг обрабтки запроса
  • обработка завершена с ошибкой
  • обработка завершена с результатом

Схема сообщения:

alternate text is missing from this package README image

Схема сообщения - тут.

Разработка процессора бизнес-запросов (BizProc)

Порядок действий

Для разработки процессора бизнес-запросов необходимо:

  • Создать проект .NET Core Web API;

  • Добавить nuget MyLab.AsyncProcessor.Sdk;

  • Добавить сборку с классом - моделью бизнес-запроса, который будет направляться в API для обработки;

  • Разработать логику обработки запроса, реализующую интерфейс IAsyncProcessingLogic<T>, где T - класс-модель бизнес-запроса;

  • В методе конфигурирования приложения, интегрировать логику асинхронного процессора:

    public class Startup
    {
         public Startup(IConfiguration configuration)
         {
             Configuration = configuration;
         }
    
         public IConfiguration Configuration { get; }
    
         public void ConfigureServices(IServiceCollection services)
         {
             //...
    
             services.AddAsyncProcessing<TestRequest, ProcessingLogic>(Configuration);
         }
    }
    

    , где:

    • TestRequest - класс-модель бизнес-запроса;
    • ProcessingLogic - логику обработки запроса.

Логика обработки

Класс логики обработки должен реализовывать интерфейс IAsyncProcessingLogic<T>. Интерфейс определяет один метод:

Task ProcessAsync(AsyncProcRequest<T> request, IProcessingOperator op);

, где:

  • request - объект запроса асинхронной обработки:
    • Id - идентификатор запроса, назначенный для асинхронной обработки;
    • IncomingDt - дата и время поступления запроса;
    • Content - содержательная часть исходного запроса, десериализованная в тип T;
    • Headers - заголовки исходного запроса;
  • op - объект предоставляющий необходимые средства для изменений состояния запроса.

IProcessingOperator предоставляет следующие методы:

  • SetBizStepAsync - устанавливает этап обработки бизнес-уровня;
  • CompleteWithErrorAsync - завершает обработку запроса с ошибкой;
  • CompleteWithResultAsync - завершает обработку запроса с результатом;
  • CompleteAsync - завершает обработку запроса без результата. Используется в случае, когда выполнение запроса не подразумевает возврат какого-либо результата.

Необработанные ошибки

Механизм, использующий логику обработки бизнес-процессов при вызове метода ProcessAsync перехватывает необработанные исключения и сам завершает обработку запроса с ошибкой. Поэтому делать это в объекте логике не обязательно:

//Нет необходимости так делать!

class ProcessingLogic : IAsyncProcessingLogic<TestRequest>
{
    public Task ProcessAsync(AsyncProcRequest<TestRequest> request, IProcessingOperator op)
    {
        try
        {
        	//...
        }
        catch (Exception e)
        {
        	return op.CompleteWithErrorAsync(e);
        }
    }
}

Проверка работоспособности

Проверка рбаотоспособности осуществляется через механизм HealthCheck. Результат проверки доступен по относительному адресу /health.

Пример запроса:

GET http://localhost/health

Пример ответа:

200 OK

Content-Type: text/plain

Healthy
Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
.NET Core netcoreapp3.1 is compatible. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.8.12 258 3/10/2023
1.8.11 285 2/1/2023
1.8.10 478 5/26/2022
1.7.10 692 2/24/2022
1.7.9 481 2/24/2022
1.7.8 461 2/18/2022
1.6.8 367 12/7/2021
1.5.8 523 9/21/2021
1.5.7 398 9/16/2021
1.4.7 365 9/16/2021
1.4.6 374 9/16/2021
1.4.5 368 9/16/2021
1.3.5 390 9/8/2021
1.2.5 379 7/28/2021
1.2.4 407 7/28/2021
1.2.3 619 2/8/2021
1.2.2 454 2/4/2021
1.1.2 468 12/10/2020
1.1.1 698 11/24/2020
1.1.0 464 11/24/2020
1.0.0 532 11/11/2020