ユーザーアイコン

mizuko

約1か月前

0
0

OpenSearchで再インデックスバッチを実現する

OpenSearch
Typescript

再インデックスバッチを実現するために、以下を考慮した。

  • 現状動いているOpenSearchを止めず、裏でインデックスを切り替える
  • 特定のテナントのみ再インデックスできること
  • データ初期登録の場合もインデックスできること
  • 既存のインデックスがある場合、再インデックス中に更新、削除があった場合データを救う
  • batchSizeが大きすぎる場合、一時的にbatchSizeを小さくして再実行する

細かい部分は記載しないが、基本的な処理

async run(input: string): Promise<void> { const { batchSize, tenantIds: inputTenantIds, indexName, } = this.validateInput(input); this.defaultBatchSize = batchSize; const tenantIds = inputTenantIds && inputTenantIds.length !== 0 ? inputTenantIds : (await this.tenantRepository.search()).map((tenant) => tenant.id); const configsToProcess = indexName ? openSearchIndexConfigs.filter((config) => config.name === indexName) : openSearchIndexConfigs; for (const { name: aliasName, settings, mappings } of configsToProcess) { const newIndexName = `${aliasName}_${Date.now()}`; const sinceTimestamp = new Date(); try { // 新しいインデックスの作成 await this.openSearchService.createIndex( newIndexName, settings, mappings, ); // 既存インデックスの確認 const oldIndexName = (await this.openSearchService.getAlias(aliasName)) ?? undefined; // 既存データのコピー(部分更新時のみ) if (oldIndexName && inputTenantIds) { this.logger.log( `Reindexing data from ${oldIndexName} to ${newIndexName}`, ); await this.openSearchService.reIndex(oldIndexName, newIndexName); } // バッチ処理の実行 for await (const { tenantId, documents } of this.streamRecords< AliasTypeMap[typeof aliasName] >(tenantIds, aliasName)) { await this.upsertDocumentsWithRetry( aliasName, newIndexName, tenantId, documents, ); } // 既存のインデックスが存在する場合は、差分を同期 if (oldIndexName) { await this.syncChangedRecords( aliasName, newIndexName, sinceTimestamp, ); } // インデックスの切り替えと旧インデックスの削除 await this.openSearchService.updateAlias( aliasName, newIndexName, oldIndexName, ); if (oldIndexName) { await this.openSearchService.deleteIndex(oldIndexName); } this.logger.log(`Successfully reindexed ${aliasName}`); } catch (error) { this.logger.error( `Failed to data index OpenSearch index. Cause: ${error}`, ); const exists = await this.openSearchService.exists(newIndexName); if (exists) { await this.openSearchService.deleteIndex(newIndexName); } throw error; } } this.logger.log('OpenSearch indices data indexed.'); }