Commit a78ee54

[GOBBLIN-2007] Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` generation (for arbitrary `Source`) (#3880)

* Implement Distributed Data Movement (DDM) Gobblin-on-Temporal `WorkUnit` generation (for arbitrary `Source`)

* Update in response to review comments
phet authored Feb 26, 2024
1 parent bdbbfe1 commit a78ee54
Showing 18 changed files with 542 additions and 53 deletions.
@@ -301,7 +301,7 @@ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent workUnitChangeEvent)
WorkUnitStream workUnitStream = new BasicWorkUnitStream.Builder(workUnits).build();
// For streaming use case, this might be a necessary step to find dataset specific namespace so that each workUnit
// can create staging and temp directories with the correct determination of shard-path
- workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
+ workUnitStream = this.destDatasetHandlerService.executeHandlers(workUnitStream);
workUnitStream = this.processWorkUnitStream(workUnitStream, jobState);
workUnits = materializeWorkUnitList(workUnitStream);
try {
@@ -516,7 +516,7 @@ public void apply(JobListener jobListener, JobContext jobContext)
this.canCleanUpStagingData = this.canCleanStagingData(this.jobContext.getJobState());
this.destDatasetHandlerService = new DestinationDatasetHandlerService(jobState, canCleanUpStagingData, this.eventSubmitter);
closer.register(this.destDatasetHandlerService);
- workUnitStream = this.executeHandlers(workUnitStream, this.destDatasetHandlerService);
+ workUnitStream = this.destDatasetHandlerService.executeHandlers(workUnitStream);

//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState, workUnitStream)).initialize();
@@ -698,10 +698,6 @@ private void executeUnfinishedCommitSequences(String jobName)
}
}

- protected WorkUnitStream executeHandlers(WorkUnitStream workUnitStream, DestinationDatasetHandlerService datasetHandlerService) {
-   return datasetHandlerService.executeHandlers(workUnitStream);
- }

protected WorkUnitStream processWorkUnitStream(WorkUnitStream workUnitStream, JobState jobState) {
// Add task ids
workUnitStream = prepareWorkUnits(workUnitStream, jobState);
@@ -42,6 +42,7 @@ public interface GobblinTemporalConfigurationKeys {
String DEFAULT_GOBBLIN_TEMPORAL_JOB_LAUNCHER_CLASS = HelloWorldJobLauncher.class.getName();

String GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_PREFIX = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "arg.";
+ String GOBBLIN_TEMPORAL_JOB_LAUNCHER_CONFIG_OVERRIDES = GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "config.overrides";

/**
* Suffix for metrics emitted by GobblinTemporalJobLauncher for preventing collisions with prod jobs
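The new `config.overrides` key pairs with `applyJobLauncherOverrides(...)`, which `GenerateWorkUnitsJobLauncher` (later in this diff) applies to the job config before launching the workflow. A minimal sketch of plausible merge semantics, assuming a HOCON-string value layered over the base job config (the actual `applyJobLauncherOverrides` implementation is not shown in this diff):

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public final class LauncherOverridesSketch {
  /** Hypothetical merge (not from this commit): overrides win; the base job config is the fallback. */
  public static Config applyOverrides(Config jobConfig, String overridesHocon) {
    Config overrides = ConfigFactory.parseString(overridesHocon); // e.g. "writer.staging.dir = /tmp/stage"
    return overrides.withFallback(jobConfig).resolve();
  }
}
```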
@@ -78,7 +78,7 @@ protected WorkerOptions createWorkerOptions() {
protected abstract Object[] getActivityImplInstances();

private final void stashWorkerConfig(Config cfg) {
- // stash in association with...
+ // stash to associate with...
WorkerConfig.forWorker(this.getClass(), cfg); // the worker itself
Arrays.stream(getWorkflowImplClasses()).forEach(clazz -> WorkerConfig.withImpl(clazz, cfg)); // its workflow impls
Arrays.stream(getActivityImplInstances()).forEach(obj -> WorkerConfig.withImpl(obj.getClass(), cfg)); // its activity impls
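The stash half of this pairing is consumed later in this diff: `GenerateWorkUnitsImpl` recovers its worker's config via `WorkerConfig.of(this)`. A rough sketch of the round trip, using only the calls visible in this commit (how `WorkerConfig` keys its entries is an internal detail):

```java
import java.util.Optional;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;

public final class WorkerConfigRoundTripSketch {
  public static void main(String[] args) {
    Config cfg = ConfigFactory.parseString("gobblin.temporal.task.queue = ddm-queue"); // placeholder contents
    WorkerConfig.withImpl(GenerateWorkUnitsImpl.class, cfg);              // the worker stashes per impl class
    Optional<Config> found = WorkerConfig.of(new GenerateWorkUnitsImpl()); // an activity instance looks it up
    System.out.println(found.isPresent());
  }
}
```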
@@ -246,10 +246,12 @@ private TemporalWorker initiateWorker() throws Exception {

String workerClassName = ConfigUtils.getString(clusterConfig,
GobblinTemporalConfigurationKeys.WORKER_CLASS, GobblinTemporalConfigurationKeys.DEFAULT_WORKER_CLASS);
logger.info("Creating worker - class: '{}'", workerClassName);
Config workerConfig = clusterConfig;
TemporalWorker worker = GobblinConstructorUtils.invokeLongestConstructor(
- (Class<TemporalWorker>)Class.forName(workerClassName), clusterConfig, client);
+ (Class<TemporalWorker>)Class.forName(workerClassName), workerConfig, client);
worker.start();
logger.info("A new worker is started.");
logger.info("Finished starting worker - class: '{}'", workerClassName);
return worker;
}

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity;

import java.util.Properties;

import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;


/** Activity for generating {@link WorkUnit}s and persisting them to the {@link org.apache.hadoop.fs.FileSystem}, per "job properties" */
@ActivityInterface
public interface GenerateWorkUnits {
/** @return the number of {@link WorkUnit}s generated and persisted */
@ActivityMethod
int generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext);
}
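For orientation, the `GenerateWorkUnitsWorkflow` fronting this activity (referenced by the launcher later in this diff, though its impl is not shown here) would follow the standard Temporal pattern of calling through an activity stub. A minimal sketch, with a placeholder timeout:

```java
import java.time.Duration;
import java.util.Properties;

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;

import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;

/** Hypothetical workflow impl (not part of this diff) delegating to the activity. */
public class GenerateWorkUnitsWorkflowSketch {
  private final GenerateWorkUnits activityStub = Workflow.newActivityStub(
      GenerateWorkUnits.class,
      ActivityOptions.newBuilder()
          .setStartToCloseTimeout(Duration.ofHours(1)) // placeholder; tune to expected WU-generation time
          .build());

  public int generate(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
    // Blocks the workflow until the activity completes (or fails, e.g. with ApplicationFailure)
    return activityStub.generateWorkUnits(jobProps, eventSubmitterContext);
  }
}
```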
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.activity.impl;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

import com.google.api.client.util.Lists;
import com.google.common.io.Closer;
import com.typesafe.config.Config;

import io.temporal.failure.ApplicationFailure;

import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;


@Slf4j
public class GenerateWorkUnitsImpl implements GenerateWorkUnits {

@Override
public int generateWorkUnits(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
// TODO: decide whether to acquire a job lock (as MR did)!
// TODO: provide for job cancellation (unless handling at the temporal-level of parent workflows)!
JobState jobState = new JobState(jobProps);
log.info("Created jobState: {}", jobState.toJsonString(true));
Optional<Config> thisClassConfig = WorkerConfig.of(this);
log.info("Obtained class config: {}", thisClassConfig.isPresent() ? thisClassConfig.get() : "NO WORKER CONFIG: ERROR!");

Path workDirRoot = JobStateUtils.getWorkDirRoot(jobState);
log.info("Using work dir root path for job '{}' - '{}'", jobState.getJobId(), workDirRoot);

// TODO: determine whether these are actually necessary to do (as MR/AbstractJobLauncher did)!
// SharedResourcesBroker<GobblinScopeTypes> jobBroker = JobStateUtils.getSharedResourcesBroker(jobState);
// jobState.setBroker(jobBroker);
// jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));

try (Closer closer = Closer.create()) {
// before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
FileSystem fs = JobStateUtils.openFileSystem(jobState);
fs.mkdirs(workDirRoot);

List<WorkUnit> workUnits = generateWorkUnitsForJobState(jobState, eventSubmitterContext, closer);

JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs);

return workUnits.size();
} catch (ReflectiveOperationException roe) {
String errMsg = "Unable to construct a source for generating workunits for job " + jobState.getJobId();
log.error(errMsg, roe);
throw ApplicationFailure.newNonRetryableFailureWithCause(errMsg, "Failure: new Source()", roe);
} catch (IOException ioe) {
String errMsg = "Failed to generate workunits for job " + jobState.getJobId();
log.error(errMsg, ioe);
throw ApplicationFailure.newFailureWithCause(errMsg, "Failure: generating/writing workunits", ioe);
} finally {
// TODO: implement Troubleshooter integration!
}
}

protected static List<WorkUnit> generateWorkUnitsForJobState(JobState jobState, EventSubmitterContext eventSubmitterContext, Closer closer)
throws ReflectiveOperationException {
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
: new BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();

// TODO: report (timer) metrics for workunits creation
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // indicates a problem getting the WUs
String errMsg = "Failure in getting work units for job " + jobState.getJobId();
log.error(errMsg);
// TODO: decide whether a non-retryable failure is too severe... (in most circumstances, it's likely what we want)
throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: Source.getWorkUnits()");
}

if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: entirely normal result (not a failure)
log.warn("No work units created for job " + jobState.getJobId());
return Lists.newArrayList();
}

// TODO: count total bytes for progress tracking!

boolean canCleanUp = canCleanStagingData(jobState);
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUp, eventSubmitterContext.create()));
WorkUnitStream handledWorkUnitStream = datasetHandlerService.executeHandlers(workUnitStream);

// initialize writer and converter(s)
// TODO: determine whether registration here is effective, or the lifecycle of this activity is too brief (as is likely!)
closer.register(WriterInitializerFactory.newInstace(jobState, handledWorkUnitStream)).initialize();
closer.register(ConverterInitializerFactory.newInstance(jobState, handledWorkUnitStream)).initialize();

// update jobState before it gets serialized
long startTime = System.currentTimeMillis();
jobState.setStartTime(startTime);
jobState.setState(JobState.RunningState.RUNNING);

log.info("Starting job " + jobState.getJobId());
// TODO: report (timer) metrics for workunits preparation
WorkUnitStream preparedWorkUnitStream = AbstractJobLauncher.prepareWorkUnits(handledWorkUnitStream, jobState);

// TODO: gobblinJobMetricsReporter.reportWorkUnitCountMetrics(this.jobContext.getJobState().getPropAsInt(NUM_WORKUNITS), jobState);

// dump the work unit if tracking logs are enabled (post any materialization for counting)
WorkUnitStream trackedWorkUnitStream = AbstractJobLauncher.addWorkUnitTrackingPerConfig(preparedWorkUnitStream, jobState, log);

return AbstractJobLauncher.materializeWorkUnitList(trackedWorkUnitStream);
}

protected static boolean canCleanStagingData(JobState jobState) {
if (DeliverySemantics.EXACTLY_ONCE.equals(DeliverySemantics.parse(jobState))) {
String errMsg = "DeliverySemantics.EXACTLY_ONCE NOT currently supported; job " + jobState.getJobId();
log.error(errMsg);
throw ApplicationFailure.newNonRetryableFailure(errMsg, "Unsupported: DeliverySemantics.EXACTLY_ONCE");
}
return true;
}
}
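For this activity impl to be dispatchable, it must be registered with a Temporal worker. Gobblin does that generically through the worker classes earlier in this diff (via `getActivityImplInstances()`); the underlying SDK calls look roughly like this sketch (the task-queue name is a placeholder):

```java
import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;

import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;

public final class RegistrationSketch {
  /** Hypothetical wiring: put the activity impl on a task queue; assumes a connected WorkflowClient. */
  public static void register(WorkflowClient client) {
    WorkerFactory factory = WorkerFactory.newInstance(client);
    Worker worker = factory.newWorker("ddm-task-queue"); // placeholder queue name
    worker.registerActivitiesImplementations(new GenerateWorkUnitsImpl());
    factory.start(); // begin polling for activity tasks
  }
}
```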
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.ddm.launcher;

import com.typesafe.config.Config;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import lombok.extern.slf4j.Slf4j;

import com.google.common.eventbus.EventBus;
import com.typesafe.config.ConfigFactory;

import io.temporal.client.WorkflowOptions;

import org.apache.hadoop.fs.Path;

import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.GenerateWorkUnitsWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobScheduler;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.util.ConfigUtils;


/**
* A {@link JobLauncher} for the initial triggering of a Temporal workflow that generates {@link WorkUnit}s per an arbitrary
* {@link org.apache.gobblin.source.Source}; see: {@link GenerateWorkUnitsWorkflow}
*
* <p>
* This class is instantiated by the {@link GobblinTemporalJobScheduler#buildJobLauncher(Properties)} on every job submission to launch the Gobblin job.
* The actual task execution happens in the {@link GobblinTemporalTaskRunner}, usually in a different process.
* </p>
*/
@Slf4j
public class GenerateWorkUnitsJobLauncher extends GobblinTemporalJobLauncher {

public static final String WORKFLOW_ID_BASE = "GenerateWorkUnits";

public GenerateWorkUnitsJobLauncher(
Properties jobProps,
Path appWorkDir,
List<? extends Tag<?>> metadataTags,
ConcurrentHashMap<String, Boolean> runningMap,
EventBus eventBus
) throws Exception {
super(jobProps, appWorkDir, metadataTags, runningMap, eventBus);
}

@Override
public void submitJob(List<WorkUnit> workunits) {
try {
WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, ConfigFactory.parseProperties(jobProps)))
.build();
GenerateWorkUnitsWorkflow workflow = this.client.newWorkflowStub(GenerateWorkUnitsWorkflow.class, options);

Config jobConfigWithOverrides = applyJobLauncherOverrides(ConfigUtils.propertiesToConfig(this.jobProps));

Help.propagateGaaSFlowExecutionContext(this.jobProps);
EventSubmitterContext eventSubmitterContext = new EventSubmitterContext(this.eventSubmitter);

int numWorkUnits = workflow.generate(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
log.info("FINISHED - GenerateWorkUnitsWorkflow.generate = {}", numWorkUnits);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
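Selecting this launcher is config-driven. A sketch, assuming the launcher class is chosen by a `"class"`-suffixed key under `GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX` (only the prefix and the HelloWorld default appear in this diff, so verify the exact key against `GobblinTemporalConfigurationKeys`):

```java
import java.util.Properties;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.ddm.launcher.GenerateWorkUnitsJobLauncher;

public final class LauncherSelectionSketch {
  public static Properties jobProps() {
    Properties props = new Properties();
    // Assumed key shape (prefix + "class"); confirm against GobblinTemporalConfigurationKeys
    props.setProperty(
        GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_JOB_LAUNCHER_PREFIX + "class",
        GenerateWorkUnitsJobLauncher.class.getName());
    return props;
  }
}
```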
@@ -98,7 +98,7 @@ public void submitJob(List<WorkUnit> workunits) {

WorkflowOptions options = WorkflowOptions.newBuilder()
.setTaskQueue(this.queueName)
- .setWorkflowId(Help.qualifyNamePerExec(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)))
+ .setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(WORKFLOW_ID_BASE, wuSpec, ConfigFactory.parseProperties(jobProps)))
.build();
ProcessWorkUnitsWorkflow workflow = this.client.newWorkflowStub(ProcessWorkUnitsWorkflow.class, options);
workflow.process(wuSpec);