MyLab.Search.IndexerClient 2.0.0

dotnet add package MyLab.Search.IndexerClient --version 2.0.0                
NuGet\Install-Package MyLab.Search.IndexerClient -Version 2.0.0                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MyLab.Search.IndexerClient" Version="2.0.0" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add MyLab.Search.IndexerClient --version 2.0.0                
#r "nuget: MyLab.Search.IndexerClient, 2.0.0"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install MyLab.Search.IndexerClient as a Cake Addin
#addin nuget:?package=MyLab.Search.IndexerClient&version=2.0.0

// Install MyLab.Search.IndexerClient as a Cake Tool
#tool nuget:?package=MyLab.Search.IndexerClient&version=2.0.0                

MyLab.Search.Indexer

Docker образ: Docker image

Спецификация API : API specification

Клиент: NuGet

Индексирует данные из базы данных и/или RabbitMQ в ElasticSearch.

Обзор

alternate text is missing from this package README image

На схеме выше показаны все участники процесса индексации и их связи.

По пути передачи документов для индексации различаются:

  • БД - индексация по расписанию из БД (синхронизация БД и индексов поиска в Elasticsearch);
  • API - REST-API для синхронной индексации;
  • MQ - отложенная индесация через очередь RabbitMQ.

Синхронизация БД различает два способа организации хранения докеументов:

  • heap - документы создаются и изменяются в произвольное время;
  • stream - идентификаторы документов - последовательное число; документы никогда не изменяются.

Операции индексации через API или MQ:

  • post - создание нового документа в индексе;
  • put - создание нового документа или полное обновление индесированного ранее (сопоставление по идентификатору);
  • patch - частичное обновление индексированного документа (сопоставление по идентификатору, передаётся частичное содержание тела документа);
  • delete - удаление индексированного документа (передаётся идентификатор);
  • kick - добавление или полное обновление документа из БД (передаётся идентификатор).

База данных является приоритетным источником, т.е. если данные об индексируемой сущности приходят через очередь или api, то они перезапишутся данными из БД в ближайшей итерации по расписанию;

Множественная индексация

Приложение Indexer определяет абстракцию индекс, объединяющую настроки индексации документов из БД, очереди и api. Indexer через конфигурацию поддерживает указание нескольких индексов, что позволяет использовать один экземпляр Indexer для индекксирования нескольких типов документов, которые индексируются в разных индексах Elasticsearch по разным правилам.

Все объявленные индексы синхронизируются последовательно, в соответствии с конфигурацией в рамках каждой итерации. При передаче сообщения через MQ, идентификатор индекса передаётся в теле сообщения. В случае передаче запроса через API, идентификатор индекса передаётся в адресе запроса.

Индексация БД

Индексайия БД по сути - это синхранизация документов в БД с индексом в Elasticsearch.

Различаются два вида организации данных в БД:

  • heap (куча):
  • stream (поток)

Heap

В куче записи могут не иметь явной логической порследовательности. Идентификаторы записей могут иметь строковой или численный тип. Записи могут изменяться в любое время.

Записи должны иметь поле типа дата+время, которое отражает дату и время последнего изменения записи. Поле может быть nullable. Пустое значение будет интерпретироваться как то, что запись никогда не была изменена с момента её создания.

Примером такой кучи может быть база заявок, статус которых может меняться со временем.

Stream

В потоке записи логически выстроены во времени. Идентификатор записи - целочисленное поле с автоинкрементом. Т.е. более поздние записи имет больший идентификатор. Записи в такой базе не должны меняться со временем.

Например потока - протокол действий пользователя.

Синхронизация БД

Запуск

Синхронизация БД происходит по запросу в Task-API сторонним процессом:

POST /processing

При этом запуск синхронизации выполняется асинхронно, т.е. обработка запроса заканчивается правкитески моментально, а процесс синхронизации продолжает работу. Если запрос пришёл во время синхронизации, то он будет проигнорирован.

Поддерживается получение статуса процесса синхронизации:

GET /processing

Описание статуса обработки тут.

Выборка

Более подробно о настройках в разделе Конфигурирование

Более подробно о файлах ресурсов в разделе Файлы ресурсов

В каждой итерации индексатор перебирает все индексы из конфигурации, для которых не отключена синхронизация, в порядке их описания. Доступность синхронизации для индекса опредлеяется в настройках индекса в поле EnableSync.

По каждому индексу последовательно происходит индексация новых данных из БД:

  • загружается скрипт выборки: из поля конфигурации SyncDbQuery или из соответствующего файла
  • загружается предыдущее состояние выборки seed или устанавливается минимальное
    • для heap - дата+время изменения последнего проиндексированного докумекнта
    • для stream - идентификатор последнего проиндексированного документа
  • происходит порциональная выборка данных и по каждой порции:
    • размер порции определяется настройками индекса в поле SyncPageSize
    • документы индексируются в индексе Elasticsearch, имя которго указано в настройках индекса в поле EsIndex или используется идентификатор инлекса из запроса;
    • сохраняется состояние выборки seed
      • для heap - дата+время изменения последнего проиндексированного докумекнта
      • для stream - идентификатор последнего проиндексированного документа

Требования к скрипту синхронизации

Скрипт синхронизации индекса может быть указан:

  • в настройках индекса в поле SyncDbQuery
  • в файле sync.sql из директории ресурсов индекса

Более подробно требованиях к скрипту синхронизации в разделе Файлы ресурсов

Индексация через API

Актуальная спецификация API: API specification.

Индексация через API - методика передачи запросов через REST-API индесатора для выполнения одиночных синхронных операций с индексами.

API позволяет выполнить запросы:

  • post - создать новый документ в индексе:
    • ошибка, если документ уже существует
    • тело документа передаётся полностью
  • put - создать новый документ в индексе или полностью обновить документ в индексе
    • тело документа передаётся полностью
  • patch - частично обновить документ в индексе
    • тело запроса может содержать часть документа - только те поля. которые надо заменить или добавить
  • kick - выполнить синхронизацию документа из БД с индексом
    • передаётся идентификатор документа
    • ошибка, если не найден
  • delete - осуществляется удаление документа из индекса
    • передаётся идентификатор документа

Термины, приведённые в списке по большей части сооответствуют, но не являются http-методами.

Каждый метод API выполняет действие над одним документом. Документ должен иметь поле с именем id (в любом регистре), содержащим идентификатор документа.

Индексация через очередь

Более подробно о настройках в разделе Конфигурирование

Индексация через очередь - слабосвязанная отложенная индексация через брокер очередей RabbitMQ. Имя очереди, которую слушает индексатор, указывается в настройках в поле MqQueue.

Индексация через очередь позволяет выполнить операции:

  • post - создать новый документ в индексе:
    • тело документа передаётся полностью
  • put - создать новый документ в индексе или полностью обновить документ в индексе
    • тело документа передаётся полностью
  • patch - частично обновить документ в индексе
    • тело запроса может содержать часть документа - только те поля. которые надо заменить или добавить
  • kick - выполнить синхронизацию документа из БД с индексом
    • передаётся идентификатор документа
    • ошибка, если не найден
  • delete - осуществляется удаление документа из индекса
    • передаётся идентификатор документа

Каждое сообщение содержит:

  • операции только для одного индекса
  • массивы каждого типа операций: post, put, patch, kick, delete

Пример сообщения:

{
    "indexId": "foo",
    "kick": [
        "979968895"
    ]
}

Json-хема сообщения тут.

Передаваемые в списках операций документы должны иметь поле с именем id (в любом регистре), содержащим идентификатор документа.

Kick индексация

Более подробно о настройках в разделе Конфигурирование

Более подробно о файлах ресурсов в разделе Файлы ресурсов

Kick-индексайия - методика индексирования, когда сервис индексирует отдельный документ или документы из БД, идентификаторы которых были ему переданы.

Kick-индексация может быть выполнена с одиночным документом через REST-API или с несколькими документами через очередь.

При Kick-индексации:

  • загружается скрипт выборки из настроек индекса поля KickDbQuery или соответствующего файла
  • происходит выборка указанных документов из БД
  • документы индексируются с помощью put-индексации, т.е. документы создаются в индексе или заменяют ранее проиндексированные доекменты с тем же идентификатором

Требования к скрипту выборки

Более подробно о файлах ресурсов в разделе Файлы ресурсов

Cкрипт выборки индекса может быть указан:

  • в настройках индекса в поле KickDbQuery
  • в файле kick.sql из директории ресурсов индекса

Более подробно требованиях к скрипту выборки в разделе Файлы ресурсов

Создание Elasticsearch индексов

Более подробно о настройках в разделе Конфигурирование

При запуске индексатор перебирает все указанные в настройках индексы.

По каждому индексу выполняется:

  • определение имени индекса в Elasticsearch, которое берётся из настроек индекса из поля EsIndex или используется идентификатор инлекса из запроса;
  • проверка существования индекса в Elasticsearch по имени
  • если индекс не существует, то:
    • загружается запрос создания индекса из файла index.json из директории файлов ресурсов индекса
    • создаётся индекс в Elasticsearch

Более подробно о файле в разделе Файлы ресурсов

Конфигурирование

Настроки конфигурации делятся на следующие группы, представленные узлами конфигурации:

  • DB - настройки работы с БД;
  • MQ - настройки работы с RabbitMQ;
  • ES - настройки работы с ElasticSearch;
  • Indexer - настройки логики индексирования.

DB настроойки

Формат узла конфигурации должен соответствовать формату MyLab.Db со строкой подключения по умолчанию. Кроме того, в узле должны быть указаны дополнительные параметры:

  • Provider - имя поставщика данных (характеризует субд):
    • sqlite
    • mysql
    • oracel

Пример узла конфигурации DB:

{
  "DB": {
    "User": "foo",
    "Password": "bar",
    "ConnectionString": "Server=myServerAddress;Database=myDataBase;Uid={User};Pwd={Password};",
    "Provider": "sqlite"
  }
}

MQ настройки

Формат узла конфигурации должен соответствовать формату MyLab.Mq:

{
  "MQ": {
    "Host" : "myhost.com",
    "VHost" : "test-host",
    "User" : "foo",
    "Password" : "foo-pass"
  }
}

ES настройки

Данный узел должен содержать следущие параметры:

  • Url - адрес подключения к ElasticSearch .

Пример узла конфигурации ES:

{
  "ES": {
    "Url" : "http://localhost:9200"
  }
}

Indexer настройки логики индексирования

Данный узел должен содержать следущие параметры:

  • ResourcePath - базовый путь к директориям ресурсов индексов. По умолчанию - /etc/mylab-indexer/indexes;
  • SeedPath - базовый путь к директории, где будут храниться seedиндексов. По умолчанию - /var/libs/mylab-indexer/seeds
  • EsIndexNamePrefix - префикс, который будет добавляться к имени индекса Elasticsearch всех индексов (будет переведён в нижний регистр);
  • EsIndexNamePostfix - постфикс, который будет добавляться к имени индекса Elasticsearch всех индексов (будет переведён в нижний регистр);
  • MqQueue - имя очереди в RabbitMQ для входящих сообщений индексации;
  • Indexes - настройки индексов:
    • Id - литеральный идентификатор индекса. Например, users при индексации информации о пользователях;
    • IndexType - тип организации индексируемых данных: Heap (по умолчанию) / Stream;
    • IdPropertyType - тип свойства, идентифицирующего сущность: String/Int. Нет значения по умолчанию;
    • EnabledSync - включение синхронизации БД: true(по умолчанию) / false;
    • SyncPageSize - размер страницы выборки при синхронизации;
    • SyncDbQuery - SQL запрос выборки данных для индексации при синхронизации;
    • KickDbQuery - SQL запрос выборки данных для kick-индексации;
    • EsIndex - если указан, определяет имя целевого индекса в Elasticsearcch (будет переведён в нижний регистр).
  • DefaultIndexOptions - настройки индексов по умолчанию (если нет соответствующей записи в Indexes)
    • IdPropertyType - тип свойства, идентифицирующего сущность: String/Int. Нет значения по умолчанию;
    • IndexType - тип организации индексируемых данных: Heap (по умолчанию) / Stream;

Пример узла конфигурации Indexer:

{
  "Indexer": {
	"MqQueue": "my-queue",
    "Indexes":[ 
      {
        "Id": "users",
        "IdPropertyType" : "Int",
        "LastChangeProperty" : "LastChangeDt",
        "SyncPageSize": "100",
        "SyncDbQuery": "select Id, Content from test_tb where Id > @seed limit @limit offset @offset",
        "EsIndex": "users-index"      
      }
    ]
  }
}

Файлы ресурсов

Файлы реусурсов индексов распологаются в директории, указанной в настройках индексатора в поле ResourcePath. По умолчанию /etc/mylab-indexer/indexes .

Индексатор ищет файлы ресурсов индексов в поддиректории с имененм, соответствующим идентифиактору индекса.

Например, для индекса foo, директория с ресурсами будет

/etc/mylab-indexer/indexes/foo

sync.sql

Файл sync.sql содержит SQL-скрипт для выборки данных из БД с целью синхронизации с индексом.

Скрипт должен содержать:

  • список выранных полей с именами или псевданимами (alias), соответствующими именам полей индексируемых документов, включая регистр;

  • список выбранных полей должен содержать поле с именем id (в любом регистре), содержащий идентификатор документа;

  • параметры пагинации с применением переменных offset и limit, которые указывают сдвиг и лимит выборки, соответственно;

  • условие выборки с применением переменной seed:

    • для heap:

      • тип переменной seed определяется приложением, как дата+время

      • условие сравнения должно быть ([value] - условное обозначение):

        • [value] > @seed если значение не-nullable
        • [value] is null or [value] > @seed если значение nullable
      • значение для сравнения должно содержать дату+время последнего изменения записи

    • для stream:

      • тип переменной seed определяется приложением, как целочисленное
      • условие сравнения должно быть value > @seed
      • значение для сравнения должно содержать идентификатор записи

Пример скрипта:

select id, content from test_doc where changed is null or changed > @seed limit @offset, @limit

kick.sql

Файл kick.sql содержит SQL-скрипт для выборки данных БД по идентификаторам с целью индексации этих данных.

Скрипт должен содержать:

  • список выранных полей с именами или псевданимами (alias), соответствующими именам полей индексируемых документов, включая регистр;
  • список выбранных полей должен содержать поле с именем id (в любом регистре), содержащий иеднтификатор документа;
  • условие выборки с применением переменной id в виде ([value] - условное обозначение):
    • ... where [value] in (@id) если нужна поддержка множественной kick-индексации
    • ... where [value] = @id если нет MQ взаимодействия

Пример скрипта:

select id, content from test_doc where id in (@id)

index.json

Файл index.json содержит тело запроса создания индекса в Elasticsearch.

Содержимое файла должно быть в формате JSON и соответствовать документации Elasticsearch.

Общий index.json

По содержанию аналогичен index.json для конкретного индекса. При наличии, объединяется с index.json конкретного индекса или используется вместо него, если тот отсутствует. Располагается в корне директории ресурсов индексов.

Развёртывание

Развёртывание сервиса предусмотрено в виде docker-контейнера.

Пример docker-compose.yml файла:

version: '3.2'

services:
  mylab-search-indexer:
    container_name: mylab-search-indexer
    image: ghcr.io/mylab-search-fx/indexer:latest
    volumes:
    - ./appsettings.json:/app/appsettings.json
    - ./new-index-request.json:/etc/mylab-indexer/indexes/users/index.json

Клиент

Для индексатора разработан клиент на .NET Core, доступный в виде nuget пакета - MyLab.Search.IndexerClient.

Ключ клиента для конфигурирования - indexer. Подробнее о конфигурировании клиента - тут.

Product Compatible and additional computed target framework versions.
.NET net5.0 was computed.  net5.0-windows was computed.  net6.0 was computed.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
.NET Core netcoreapp3.1 is compatible. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
2.0.0 474 7/7/2022
1.5.9 445 5/25/2022
1.4.7 770 12/9/2021
1.4.6 347 12/6/2021
1.4.5 268 12/3/2021
1.4.4 274 12/3/2021
1.3.4 845 12/3/2021