From 49864dfab209ec9b2289637441a02206264a4b95 Mon Sep 17 00:00:00 2001 From: Vincent Gao Date: Wed, 31 Jul 2024 23:09:00 +1000 Subject: [PATCH] add a custom Destination sdp_elasticsearch --- .github/workflows/build.yml | 2 +- .../TideElasticSearchDestination.php | 85 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2a99511..18d6875 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,7 +16,7 @@ jobs: tide_build: name: tide_build secrets: inherit - uses: dpc-sdp/github-actions/.github/workflows/tide_build.yml@feature/add-phpunit-test-support + uses: dpc-sdp/github-actions/.github/workflows/tide_build.yml@main with: module_build: true runner: biggy-tide diff --git a/modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php b/modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php new file mode 100644 index 0000000..c9df569 --- /dev/null +++ b/modules/tide_data_pipeline/src/Plugin/DatasetDestination/TideElasticSearchDestination.php @@ -0,0 +1,85 @@ + '', + ] + parent::defaultConfiguration(); + } + + /** + * {@inheritdoc} + */ + public function buildConfigurationForm(array $form, FormStateInterface $form_state): array { + $form = parent::buildConfigurationForm($form, $form_state); + $form['hash_prefix'] = [ + '#type' => 'textfield', + '#title' => $this->t('Index Hash Prefix'), + '#description' => $this->t('The index hash prefix to use.'), + '#default_value' => $this->configuration['hash_prefix'], + ]; + return $form; + } + + /** + * {@inheritdoc} + */ + public function processCleanup(DatasetInterface $dataset, array $invalidDeltas): bool { + $full_index_id = $this->getFullIndexId($dataset->getMachineName()); + try { + $bulk = ['body' => []]; + foreach ($invalidDeltas as $delta) { + $bulk['body'][] = [ + 'delete' => [ + '_index' => $full_index_id, + '_id' => $dataset->getMachineName() . ':' . $delta, + ], + ]; + } + if (count($bulk['body']) > 0) { + $this->getClient()->bulk($bulk); + } + return TRUE; + } + catch (\Exception $e) { + $this->logger->error("The invalid dataset data could not be purged due to @message", [ + '@message' => $e->getMessage(), + ]); + } + return FALSE; + } + + /** + * Returns the actual index id. + */ + protected function getFullIndexId(string $machineName): string { + $hashPrefix = $this->configuration['hash_prefix'] ?? ''; + $prefix = $this->configuration['prefix'] ?? ''; + + if ($hashPrefix && $prefix) { + return "{$hashPrefix}--{$prefix}{$machineName}"; + } + + return ($prefix ?: '') . $machineName; + } + +}