Skip to content
This repository has been archived by the owner on Sep 30, 2024. It is now read-only.

Commit

Permalink
add a custom Destination sdp_elasticsearch
Browse files Browse the repository at this point in the history
  • Loading branch information
vincent-gao committed Aug 1, 2024
1 parent 5dfaf70 commit 49864df
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
tide_build:
name: tide_build
secrets: inherit
uses: dpc-sdp/github-actions/.github/workflows/tide_build.yml@feature/add-phpunit-test-support
uses: dpc-sdp/github-actions/.github/workflows/tide_build.yml@main
with:
module_build: true
runner: biggy-tide
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

namespace Drupal\tide_data_pipeline\Plugin\DatasetDestination;

use Drupal\Core\Form\FormStateInterface;
use Drupal\data_pipelines\Entity\DatasetInterface;
use Drupal\data_pipelines_elasticsearch\Plugin\DatasetDestination\ElasticSearchDestination;

/**
* A class for providing JSON as an output.
*
* @DatasetDestination(
* id="sdp_elasticsearch",
* label="SDP ElasticSearch",
* description="Writes datasets to an sdp managed elasticsearch index"
* )
*/
class TideElasticSearchDestination extends ElasticSearchDestination {

/**
* {@inheritdoc}
*/
public function defaultConfiguration(): array {
return [
'hash_prefix' => '',
] + parent::defaultConfiguration();
}

/**
* {@inheritdoc}
*/
public function buildConfigurationForm(array $form, FormStateInterface $form_state): array {
$form = parent::buildConfigurationForm($form, $form_state);
$form['hash_prefix'] = [
'#type' => 'textfield',
'#title' => $this->t('Index Hash Prefix'),
'#description' => $this->t('The index hash prefix to use.'),
'#default_value' => $this->configuration['hash_prefix'],
];
return $form;
}

/**
* {@inheritdoc}
*/
public function processCleanup(DatasetInterface $dataset, array $invalidDeltas): bool {
$full_index_id = $this->getFullIndexId($dataset->getMachineName());
try {
$bulk = ['body' => []];
foreach ($invalidDeltas as $delta) {
$bulk['body'][] = [
'delete' => [
'_index' => $full_index_id,
'_id' => $dataset->getMachineName() . ':' . $delta,
],
];
}
if (count($bulk['body']) > 0) {
$this->getClient()->bulk($bulk);
}
return TRUE;
}
catch (\Exception $e) {
$this->logger->error("The invalid dataset data could not be purged due to @message", [
'@message' => $e->getMessage(),
]);
}
return FALSE;
}

/**
* Returns the actual index id.
*/
protected function getFullIndexId(string $machineName): string {
$hashPrefix = $this->configuration['hash_prefix'] ?? '';
$prefix = $this->configuration['prefix'] ?? '';

if ($hashPrefix && $prefix) {
return "{$hashPrefix}--{$prefix}{$machineName}";
}

return ($prefix ?: '') . $machineName;
}

}

0 comments on commit 49864df

Please sign in to comment.