OpenSearchで再インデックスバッチを実現する
OpenSearch
Typescript
再インデックスバッチを実現するために、以下を考慮した。
- 現状動いているOpenSearchを止めず、裏でインデックスを切り替える
- 特定のテナントのみ再インデックスできること
- データ初期登録の場合もインデックスできること
- 既存のインデックスがある場合、再インデックス中に更新、削除があった場合データを救う
- batchSizeが大きすぎる場合、一時的にbatchSizeを小さくして再実行する
細かい部分は記載しないが、基本的な処理
async run(input: string): Promise<void> {
const {
batchSize,
tenantIds: inputTenantIds,
indexName,
} = this.validateInput(input);
this.defaultBatchSize = batchSize;
const tenantIds =
inputTenantIds && inputTenantIds.length !== 0
? inputTenantIds
: (await this.tenantRepository.search()).map((tenant) => tenant.id);
const configsToProcess = indexName
? openSearchIndexConfigs.filter((config) => config.name === indexName)
: openSearchIndexConfigs;
for (const { name: aliasName, settings, mappings } of configsToProcess) {
const newIndexName = `${aliasName}_${Date.now()}`;
const sinceTimestamp = new Date();
try {
// 新しいインデックスの作成
await this.openSearchService.createIndex(
newIndexName,
settings,
mappings,
);
// 既存インデックスの確認
const oldIndexName =
(await this.openSearchService.getAlias(aliasName)) ?? undefined;
// 既存データのコピー(部分更新時のみ)
if (oldIndexName && inputTenantIds) {
this.logger.log(
`Reindexing data from ${oldIndexName} to ${newIndexName}`,
);
await this.openSearchService.reIndex(oldIndexName, newIndexName);
}
// バッチ処理の実行
for await (const { tenantId, documents } of this.streamRecords<
AliasTypeMap[typeof aliasName]
>(tenantIds, aliasName)) {
await this.upsertDocumentsWithRetry(
aliasName,
newIndexName,
tenantId,
documents,
);
}
// 既存のインデックスが存在する場合は、差分を同期
if (oldIndexName) {
await this.syncChangedRecords(
aliasName,
newIndexName,
sinceTimestamp,
);
}
// インデックスの切り替えと旧インデックスの削除
await this.openSearchService.updateAlias(
aliasName,
newIndexName,
oldIndexName,
);
if (oldIndexName) {
await this.openSearchService.deleteIndex(oldIndexName);
}
this.logger.log(`Successfully reindexed ${aliasName}`);
} catch (error) {
this.logger.error(
`Failed to data index OpenSearch index. Cause: ${error}`,
);
const exists = await this.openSearchService.exists(newIndexName);
if (exists) {
await this.openSearchService.deleteIndex(newIndexName);
}
throw error;
}
}
this.logger.log('OpenSearch indices data indexed.');
}