MyLab.Search.IndexerClient
2.0.0
dotnet add package MyLab.Search.IndexerClient --version 2.0.0
NuGet\Install-Package MyLab.Search.IndexerClient -Version 2.0.0
<PackageReference Include="MyLab.Search.IndexerClient" Version="2.0.0" />
paket add MyLab.Search.IndexerClient --version 2.0.0
#r "nuget: MyLab.Search.IndexerClient, 2.0.0"
// Install MyLab.Search.IndexerClient as a Cake Addin #addin nuget:?package=MyLab.Search.IndexerClient&version=2.0.0 // Install MyLab.Search.IndexerClient as a Cake Tool #tool nuget:?package=MyLab.Search.IndexerClient&version=2.0.0
MyLab.Search.Indexer
Индексирует данные из базы данных и/или RabbitMQ
в ElasticSearch
.
Обзор
На схеме выше показаны все участники процесса индексации и их связи.
По пути передачи документов для индексации различаются:
БД
- индексация по расписанию из БД (синхронизация БД и индексов поиска вElasticsearch
);API
-REST-API
для синхронной индексации;MQ
- отложенная индесация через очередьRabbitMQ
.
Синхронизация БД различает два способа организации хранения докеументов:
heap
- документы создаются и изменяются в произвольное время;stream
- идентификаторы документов - последовательное число; документы никогда не изменяются.
Операции индексации через API
или MQ
:
post
- создание нового документа в индексе;put
- создание нового документа или полное обновление индесированного ранее (сопоставление по идентификатору);patch
- частичное обновление индексированного документа (сопоставление по идентификатору, передаётся частичное содержание тела документа);delete
- удаление индексированного документа (передаётся идентификатор);kick
- добавление или полное обновление документа из БД (передаётся идентификатор).
База данных является приоритетным источником, т.е. если данные об индексируемой сущности приходят через очередь или api
, то они перезапишутся данными из БД в ближайшей итерации по расписанию;
Множественная индексация
Приложение Indexer
определяет абстракцию индекс, объединяющую настроки индексации документов из БД, очереди и api
. Indexer
через конфигурацию поддерживает указание нескольких индексов, что позволяет использовать один экземпляр Indexer
для индекксирования нескольких типов документов, которые индексируются в разных индексах Elasticsearch
по разным правилам.
Все объявленные индексы синхронизируются последовательно, в соответствии с конфигурацией в рамках каждой итерации. При передаче сообщения через MQ
, идентификатор индекса передаётся в теле сообщения. В случае передаче запроса через API
, идентификатор индекса передаётся в адресе запроса.
Индексация БД
Индексайия БД по сути - это синхранизация документов в БД с индексом в Elasticsearch
.
Различаются два вида организации данных в БД:
heap
(куча):stream
(поток)
Heap
В куче записи могут не иметь явной логической порследовательности. Идентификаторы записей могут иметь строковой или численный тип. Записи могут изменяться в любое время.
Записи должны иметь поле типа дата+время, которое отражает дату и время последнего изменения записи. Поле может быть nullable
. Пустое значение будет интерпретироваться как то, что запись никогда не была изменена с момента её создания.
Примером такой кучи может быть база заявок, статус которых может меняться со временем.
Stream
В потоке записи логически выстроены во времени. Идентификатор записи - целочисленное поле с автоинкрементом. Т.е. более поздние записи имет больший идентификатор. Записи в такой базе не должны меняться со временем.
Например потока - протокол действий пользователя.
Синхронизация БД
Запуск
Синхронизация БД происходит по запросу в Task-API
сторонним процессом:
POST /processing
При этом запуск синхронизации выполняется асинхронно, т.е. обработка запроса заканчивается правкитески моментально, а процесс синхронизации продолжает работу. Если запрос пришёл во время синхронизации, то он будет проигнорирован.
Поддерживается получение статуса процесса синхронизации:
GET /processing
Описание статуса обработки тут.
Выборка
Более подробно о настройках в разделе Конфигурирование
Более подробно о файлах ресурсов в разделе Файлы ресурсов
В каждой итерации индексатор перебирает все индексы из конфигурации, для которых не отключена синхронизация, в порядке их описания. Доступность синхронизации для индекса опредлеяется в настройках индекса в поле EnableSync
.
По каждому индексу последовательно происходит индексация новых данных из БД:
- загружается скрипт выборки: из поля конфигурации
SyncDbQuery
или из соответствующего файла - загружается предыдущее состояние выборки
seed
или устанавливается минимальное- для
heap
- дата+время изменения последнего проиндексированного докумекнта - для
stream
- идентификатор последнего проиндексированного документа
- для
- происходит порциональная выборка данных и по каждой порции:
- размер порции определяется настройками индекса в поле
SyncPageSize
- документы индексируются в индексе
Elasticsearch
, имя которго указано в настройках индекса в полеEsIndex
или используется идентификатор инлекса из запроса; - сохраняется состояние выборки
seed
- для
heap
- дата+время изменения последнего проиндексированного докумекнта - для
stream
- идентификатор последнего проиндексированного документа
- для
- размер порции определяется настройками индекса в поле
Требования к скрипту синхронизации
Скрипт синхронизации индекса может быть указан:
- в настройках индекса в поле
SyncDbQuery
- в файле
sync.sql
из директории ресурсов индекса
Более подробно требованиях к скрипту синхронизации в разделе Файлы ресурсов
Индексация через API
Актуальная спецификация API: .
Индексация через API - методика передачи запросов через REST-API индесатора для выполнения одиночных синхронных операций с индексами.
API позволяет выполнить запросы:
post
- создать новый документ в индексе:- ошибка, если документ уже существует
- тело документа передаётся полностью
put
- создать новый документ в индексе или полностью обновить документ в индексе- тело документа передаётся полностью
patch
- частично обновить документ в индексе- тело запроса может содержать часть документа - только те поля. которые надо заменить или добавить
kick
- выполнить синхронизацию документа из БД с индексом- передаётся идентификатор документа
- ошибка, если не найден
delete
- осуществляется удаление документа из индекса- передаётся идентификатор документа
Термины, приведённые в списке по большей части сооответствуют, но не являются http-методами.
Каждый метод API выполняет действие над одним документом. Документ должен иметь поле с именем id
(в любом регистре), содержащим идентификатор документа.
Индексация через очередь
Более подробно о настройках в разделе Конфигурирование
Индексация через очередь - слабосвязанная отложенная индексация через брокер очередей RabbitMQ
. Имя очереди, которую слушает индексатор, указывается в настройках в поле MqQueue
.
Индексация через очередь позволяет выполнить операции:
post
- создать новый документ в индексе:- тело документа передаётся полностью
put
- создать новый документ в индексе или полностью обновить документ в индексе- тело документа передаётся полностью
patch
- частично обновить документ в индексе- тело запроса может содержать часть документа - только те поля. которые надо заменить или добавить
kick
- выполнить синхронизацию документа из БД с индексом- передаётся идентификатор документа
- ошибка, если не найден
delete
- осуществляется удаление документа из индекса- передаётся идентификатор документа
Каждое сообщение содержит:
- операции только для одного индекса
- массивы каждого типа операций:
post
,put
,patch
,kick
,delete
Пример сообщения:
{
"indexId": "foo",
"kick": [
"979968895"
]
}
Json
-хема сообщения тут.
Передаваемые в списках операций документы должны иметь поле с именем id
(в любом регистре), содержащим идентификатор документа.
Kick индексация
Более подробно о настройках в разделе Конфигурирование
Более подробно о файлах ресурсов в разделе Файлы ресурсов
Kick-индексайия - методика индексирования, когда сервис индексирует отдельный документ или документы из БД, идентификаторы которых были ему переданы.
Kick-индексация может быть выполнена с одиночным документом через REST-API или с несколькими документами через очередь.
При Kick-индексации:
- загружается скрипт выборки из настроек индекса поля
KickDbQuery
или соответствующего файла - происходит выборка указанных документов из БД
- документы индексируются с помощью
put
-индексации, т.е. документы создаются в индексе или заменяют ранее проиндексированные доекменты с тем же идентификатором
Требования к скрипту выборки
Более подробно о файлах ресурсов в разделе Файлы ресурсов
Cкрипт выборки индекса может быть указан:
- в настройках индекса в поле
KickDbQuery
- в файле
kick.sql
из директории ресурсов индекса
Более подробно требованиях к скрипту выборки в разделе Файлы ресурсов
Создание Elasticsearch индексов
Более подробно о настройках в разделе Конфигурирование
При запуске индексатор перебирает все указанные в настройках индексы.
По каждому индексу выполняется:
- определение имени индекса в
Elasticsearch
, которое берётся из настроек индекса из поляEsIndex
или используется идентификатор инлекса из запроса; - проверка существования индекса в
Elasticsearch
по имени - если индекс не существует, то:
- загружается запрос создания индекса из файла
index.json
из директории файлов ресурсов индекса - создаётся индекс в
Elasticsearch
- загружается запрос создания индекса из файла
Более подробно о файле в разделе Файлы ресурсов
Конфигурирование
Настроки конфигурации делятся на следующие группы, представленные узлами конфигурации:
DB
- настройки работы с БД;MQ
- настройки работы сRabbitMQ
;ES
- настройки работы сElasticSearch
;Indexer
- настройки логики индексирования.
DB
настроойки
Формат узла конфигурации должен соответствовать формату MyLab.Db со строкой подключения по умолчанию. Кроме того, в узле должны быть указаны дополнительные параметры:
Provider
- имя поставщика данных (характеризует субд):sqlite
mysql
oracel
Пример узла конфигурации DB
:
{
"DB": {
"User": "foo",
"Password": "bar",
"ConnectionString": "Server=myServerAddress;Database=myDataBase;Uid={User};Pwd={Password};",
"Provider": "sqlite"
}
}
MQ
настройки
Формат узла конфигурации должен соответствовать формату MyLab.Mq:
{
"MQ": {
"Host" : "myhost.com",
"VHost" : "test-host",
"User" : "foo",
"Password" : "foo-pass"
}
}
ES
настройки
Данный узел должен содержать следущие параметры:
Url
- адрес подключения кElasticSearch
.
Пример узла конфигурации ES
:
{
"ES": {
"Url" : "http://localhost:9200"
}
}
Indexer
настройки логики индексирования
Данный узел должен содержать следущие параметры:
ResourcePath
- базовый путь к директориям ресурсов индексов. По умолчанию -/etc/mylab-indexer/indexes
;SeedPath
- базовый путь к директории, где будут хранитьсяseed
-ы индексов. По умолчанию -/var/libs/mylab-indexer/seeds
EsIndexNamePrefix
- префикс, который будет добавляться к имени индексаElasticsearch
всех индексов (будет переведён в нижний регистр);EsIndexNamePostfix
- постфикс, который будет добавляться к имени индексаElasticsearch
всех индексов (будет переведён в нижний регистр);MqQueue
- имя очереди вRabbitMQ
для входящих сообщений индексации;Indexes
- настройки индексов:Id
- литеральный идентификатор индекса. Например,users
при индексации информации о пользователях;IndexType
- тип организации индексируемых данных:Heap
(по умолчанию) /Stream
;IdPropertyType
- тип свойства, идентифицирующего сущность:String
/Int
. Нет значения по умолчанию;EnabledSync
- включение синхронизации БД:true
(по умолчанию) /false
;SyncPageSize
- размер страницы выборки при синхронизации;SyncDbQuery
-SQL
запрос выборки данных для индексации при синхронизации;KickDbQuery
-SQL
запрос выборки данных дляkick
-индексации;EsIndex
- если указан, определяет имя целевого индекса вElasticsearcch
(будет переведён в нижний регистр).
DefaultIndexOptions
- настройки индексов по умолчанию (если нет соответствующей записи вIndexes
)IdPropertyType
- тип свойства, идентифицирующего сущность:String
/Int
. Нет значения по умолчанию;IndexType
- тип организации индексируемых данных:Heap
(по умолчанию) /Stream
;
Пример узла конфигурации Indexer
:
{
"Indexer": {
"MqQueue": "my-queue",
"Indexes":[
{
"Id": "users",
"IdPropertyType" : "Int",
"LastChangeProperty" : "LastChangeDt",
"SyncPageSize": "100",
"SyncDbQuery": "select Id, Content from test_tb where Id > @seed limit @limit offset @offset",
"EsIndex": "users-index"
}
]
}
}
Файлы ресурсов
Файлы реусурсов индексов распологаются в директории, указанной в настройках индексатора в поле ResourcePath
. По умолчанию /etc/mylab-indexer/indexes
.
Индексатор ищет файлы ресурсов индексов в поддиректории с имененм, соответствующим идентифиактору индекса.
Например, для индекса foo
, директория с ресурсами будет
/etc/mylab-indexer/indexes/foo
sync.sql
Файл sync.sql
содержит SQL
-скрипт для выборки данных из БД с целью синхронизации с индексом.
Скрипт должен содержать:
список выранных полей с именами или псевданимами (alias), соответствующими именам полей индексируемых документов, включая регистр;
список выбранных полей должен содержать поле с именем
id
(в любом регистре), содержащий идентификатор документа;параметры пагинации с применением переменных
offset
иlimit
, которые указывают сдвиг и лимит выборки, соответственно;условие выборки с применением переменной
seed
:для
heap
:тип переменной
seed
определяется приложением, как дата+времяусловие сравнения должно быть (
[value]
- условное обозначение):[value] > @seed
если значение не-nullable
[value] is null or [value] > @seed
если значениеnullable
значение для сравнения должно содержать дату+время последнего изменения записи
для
stream
:- тип переменной
seed
определяется приложением, как целочисленное - условие сравнения должно быть
value > @seed
- значение для сравнения должно содержать идентификатор записи
- тип переменной
Пример скрипта:
select id, content from test_doc where changed is null or changed > @seed limit @offset, @limit
kick.sql
Файл kick.sql
содержит SQL
-скрипт для выборки данных БД по идентификаторам с целью индексации этих данных.
Скрипт должен содержать:
- список выранных полей с именами или псевданимами (alias), соответствующими именам полей индексируемых документов, включая регистр;
- список выбранных полей должен содержать поле с именем
id
(в любом регистре), содержащий иеднтификатор документа; - условие выборки с применением переменной
id
в виде ([value]
- условное обозначение):... where [value] in (@id)
если нужна поддержка множественной kick-индексации... where [value] = @id
если нет MQ взаимодействия
Пример скрипта:
select id, content from test_doc where id in (@id)
index.json
Файл index.json
содержит тело запроса создания индекса в Elasticsearch
.
Содержимое файла должно быть в формате JSON
и соответствовать документации Elasticsearch.
Общий index.json
По содержанию аналогичен index.json
для конкретного индекса. При наличии, объединяется с index.json
конкретного индекса или используется вместо него, если тот отсутствует. Располагается в корне директории ресурсов индексов.
Развёртывание
Развёртывание сервиса предусмотрено в виде docker
-контейнера.
Пример docker-compose.yml
файла:
version: '3.2'
services:
mylab-search-indexer:
container_name: mylab-search-indexer
image: ghcr.io/mylab-search-fx/indexer:latest
volumes:
- ./appsettings.json:/app/appsettings.json
- ./new-index-request.json:/etc/mylab-indexer/indexes/users/index.json
Клиент
Для индексатора разработан клиент на .NET Core
, доступный в виде nuget
пакета - MyLab.Search.IndexerClient.
Ключ клиента для конфигурирования - indexer
. Подробнее о конфигурировании клиента - тут.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp3.1 is compatible. |
-
.NETCoreApp 3.1
- MyLab.ApiClient (>= 3.15.25)
- Newtonsoft.Json (>= 13.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.