Break out nested classes from StreamingDataflowWorker. (#28537)
Use ActiveWorkState class instead of an activeWorkMap in ComputationState
m-trieu authored Sep 28, 2023
1 parent 56b1259 commit 170310e
Showing 19 changed files with 2,422 additions and 1,553 deletions.
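
As an editorial illustration of the refactor named in the commit message — encapsulating the per-computation active-work map behind an ActiveWorkState class instead of exposing a raw map in ComputationState — here is a minimal, hedged sketch. The generic key/work types and every method name are assumptions for illustration; the commit's actual ActiveWorkState API may differ:

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;
    import javax.annotation.concurrent.ThreadSafe;

    @ThreadSafe
    final class ActiveWorkState<K, W> {
      // Invariant: the head of each queue is the work currently processing for that key.
      private final Map<K, Deque<W>> activeWork = new HashMap<>();

      // Returns true if the work may start immediately; false if it was queued
      // behind other work for the same key.
      synchronized boolean activateWorkForKey(K key, W work) {
        Deque<W> queue = activeWork.computeIfAbsent(key, k -> new ArrayDeque<>());
        boolean canStart = queue.isEmpty();
        queue.addLast(work);
        return canStart;
      }

      // Completes the active work for the key and returns the next queued work,
      // or null if the key has no more work.
      synchronized W completeWorkAndGetNextWorkForKey(K key) {
        Deque<W> queue = activeWork.get(key);
        if (queue == null || queue.isEmpty()) {
          return null;
        }
        queue.removeFirst(); // drop the completed work
        if (queue.isEmpty()) {
          activeWork.remove(key);
          return null;
        }
        return queue.peekFirst();
      }
    }

Keeping the map private to one synchronized class gives ComputationState a single, testable owner for the queuing invariant instead of scattering map mutations across the worker.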
BatchDataflowWorker.java
@@ -60,22 +60,8 @@
})
public class BatchDataflowWorker implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BatchDataflowWorker.class);

/** A client to get and update work items. */
private final WorkUnitClient workUnitClient;

/**
* Pipeline options, initially provided via the constructor and partially provided via each
* work unit.
*/
private final DataflowWorkerHarnessOptions options;

/** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;

/** The idGenerator used to generate globally unique ids. */
private static final IdGenerator idGenerator = IdGenerators.decrementingLongs();

/**
* Function which converts map tasks to their network representation for execution.
*
@@ -90,64 +76,50 @@ public class BatchDataflowWorker implements Closeable {
new FixMultiOutputInfosOnParDoInstructions(idGenerator)
.andThen(new MapTaskToNetworkFunction(idGenerator));

/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();

/** Registry of known {@link SinkFactory SinkFactories}. */
private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();

/** A side input cache shared between all execution contexts. */
private final Cache<?, WeightedValue<?>> sideInputDataCache;

/**
* A side input cache shared between all execution contexts. This cache is meant to store values
* as weak references. This allows for insertion of logical keys with zero weight since they will
* only be scoped to the lifetime of the value being cached.
*/
private final Cache<?, ?> sideInputWeakReferenceCache;

private static final int DEFAULT_STATUS_PORT = 8081;

/** Status pages returning health of worker. */
private WorkerStatusPages statusPages;

/** Periodic sender of debug information to the debug capture service. */
private DebugCapture.Manager debugCaptureManager = null;

/**
* A weight in "bytes" for the overhead of a {@link Weighted} wrapper in the cache. It is just an
* approximation so it is OK for it to be fairly arbitrary as long as it is nonzero.
*/
private static final int OVERHEAD_WEIGHT = 8;

private static final long MEGABYTES = 1024 * 1024;

/**
* Limit the number of logical references. Weak references may never be cleared if the object is
* long-lived, irrespective of whether the user is still interested in the key lookup.
*/
private static final int MAX_LOGICAL_REFERENCES = 1_000_000;

/** The number of concurrent write operations to a cache that we allow. */
private static final int CACHE_CONCURRENCY_LEVEL = 4 * Runtime.getRuntime().availableProcessors();
/** A client to get and update work items. */
private final WorkUnitClient workUnitClient;
/**
* Pipeline options, initially provided via the constructor and partially provided via each
* work unit.
*/
private final DataflowWorkerHarnessOptions options;
/** The factory to create {@link DataflowMapTaskExecutor DataflowMapTaskExecutors}. */
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
/** Registry of known {@link ReaderFactory ReaderFactories}. */
private final ReaderRegistry readerRegistry = ReaderRegistry.defaultRegistry();
/** Registry of known {@link SinkFactory SinkFactories}. */
private final SinkRegistry sinkRegistry = SinkRegistry.defaultRegistry();
/** A side input cache shared between all execution contexts. */
private final Cache<?, WeightedValue<?>> sideInputDataCache;
/**
* A side input cache shared between all execution contexts. This cache is meant to store values
* as weak references. This allows for insertion of logical keys with zero weight since they will
* only be scoped to the lifetime of the value being cached.
*/
private final Cache<?, ?> sideInputWeakReferenceCache;
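// Editorial note (not part of this commit): a weak-valued cache like the one
// above can be built with Guava's CacheBuilder; a hedged sketch, assuming the
// constants declared in this class and default eviction behavior:
//
//   Cache<Object, Object> cache =
//       CacheBuilder.newBuilder()
//           .weakValues() // entries become collectable once the value is otherwise unreferenced
//           .maximumSize(MAX_LOGICAL_REFERENCES)
//           .concurrencyLevel(CACHE_CONCURRENCY_LEVEL)
//           .build();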

private final Function<MapTask, MutableNetwork<Node, Edge>> mapTaskToNetwork;

private final MemoryMonitor memoryMonitor;
private final Thread memoryMonitorThread;

/**
* Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
* execution.
*
* <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
* as there is not a compatible path forward for users.
*/
static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
return new BatchDataflowWorker(
workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
}
/** Status pages returning health of worker. */
private final WorkerStatusPages statusPages;
/** Periodic sender of debug information to the debug capture service. */
private DebugCapture.Manager debugCaptureManager = null;

protected BatchDataflowWorker(
WorkUnitClient workUnitClient,
@@ -188,6 +160,19 @@ protected BatchDataflowWorker(
ExecutionStateSampler.instance().start();
}

/**
* Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java
* execution.
*
* <p>This is also known as the "legacy" or "pre-portability" approach. It is not yet deprecated
* as there is not a compatible path forward for users.
*/
static BatchDataflowWorker forBatchIntrinsicWorkerHarness(
WorkUnitClient workUnitClient, DataflowWorkerHarnessOptions options) {
return new BatchDataflowWorker(
workUnitClient, IntrinsicMapTaskExecutorFactory.defaultFactory(), options);
}

private static DebugCapture.Manager initializeAndStartDebugCaptureManager(
DataflowWorkerHarnessOptions options, Collection<Capturable> debugCapturePages) {
DebugCapture.Manager result = new DebugCapture.Manager(options, debugCapturePages);
@@ -215,7 +200,7 @@ private static Thread startMemoryMonitorThread(MemoryMonitor memoryMonitor) {
*/
public boolean getAndPerformWork() throws IOException {
while (true) {
Optional<WorkItem> work = workUnitClient.getWorkItem();
Optional<WorkItem> work = Optional.fromJavaUtil(workUnitClient.getWorkItem());
if (work.isPresent()) {
WorkItemStatusClient statusProvider = new WorkItemStatusClient(workUnitClient, work.get());
return doWork(work.get(), statusProvider);
@@ -243,7 +228,7 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr
} else if (workItem.getSourceOperationTask() != null) {
stageName = workItem.getSourceOperationTask().getStageName();
} else {
throw new RuntimeException("Unknown kind of work item: " + workItem.toString());
throw new RuntimeException("Unknown kind of work item: " + workItem);
}

CounterSet counterSet = new CounterSet();
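
The second file below migrates DataflowWorkUnitClient from the vendored Guava Optional to java.util.Optional, which is why BatchDataflowWorker.getAndPerformWork above now wraps the result in Optional.fromJavaUtil. A minimal sketch of the correspondence, using plain Guava coordinates rather than Beam's vendored package for brevity; everything beyond the method names visible in the diff is illustrative:

    import java.util.Optional;

    public class OptionalMigrationSketch {
      public static void main(String[] args) {
        // Guava's Optional.absent() becomes java.util.Optional.empty():
        Optional<String> none = Optional.empty();
        // Guava's optional.orNull() becomes optional.orElse(null):
        String maybeNull = none.orElse(null);
        // Call sites still on Guava's type can bridge with Optional.fromJavaUtil
        // (available since Guava 21), as getAndPerformWork does above:
        com.google.common.base.Optional<String> guava =
            com.google.common.base.Optional.fromJavaUtil(none);
        System.out.println(maybeNull == null); // true
        System.out.println(guava.isPresent()); // false
      }
    }
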
DataflowWorkUnitClient.java
@@ -39,13 +39,13 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkProgressUpdater;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
@@ -87,7 +87,7 @@ class DataflowWorkUnitClient implements WorkUnitClient {
}

/**
* Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#absent()} if no
* Gets a {@link WorkItem} from the Dataflow service, or returns {@link Optional#empty()} if no
* work was found.
*
* <p>If work is returned, the calling thread should call reportWorkItemStatus after completing it
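// Editorial note (not part of this commit): the lease/report protocol the
// javadoc above describes; process(...) and the exact status type are
// assumptions, only getWorkItem and reportWorkItemStatus are named by the doc:
//
//   Optional<WorkItem> work = client.getWorkItem();
//   if (work.isPresent()) {
//     WorkItemStatus status = process(work.get()); // hypothetical helper
//     client.reportWorkItemStatus(status);
//   }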
@@ -116,11 +116,11 @@ public Optional<WorkItem> getWorkItem() throws IOException {
if (!workItem.isPresent()) {
// Normal case, this means that the response contained no work, i.e. no work is available
// at this time.
return Optional.absent();
return Optional.empty();
}
if (workItem.isPresent() && workItem.get().getId() == null) {
logger.debug("Discarding invalid work item {}", workItem.orNull());
return Optional.absent();
if (workItem.get().getId() == null) {
logger.debug("Discarding invalid work item {}", workItem.get());
return Optional.empty();
}

WorkItem work = workItem.get();
@@ -148,7 +148,7 @@ public Optional<WorkItem> getWorkItem() throws IOException {

/**
* Gets a global streaming config {@link WorkItem} from the Dataflow service, or returns {@link
* Optional#absent()} if no work was found.
* Optional#empty()} if no work was found.
*/
@Override
public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException {
@@ -158,7 +158,7 @@ public Optional<WorkItem> getGlobalStreamingConfigWorkItem() throws IOException

/**
* Gets a streaming config {@link WorkItem} for the given computation from the Dataflow service,
* or returns {@link Optional#absent()} if no work was found.
* or returns {@link Optional#empty()} if no work was found.
*/
@Override
public Optional<WorkItem> getStreamingConfigWorkItem(String computationId) throws IOException {
@@ -197,7 +197,7 @@ private Optional<WorkItem> getWorkItemInternal(
List<WorkItem> workItems = response.getWorkItems();
if (workItems == null || workItems.isEmpty()) {
// We didn't lease any work.
return Optional.absent();
return Optional.empty();
} else if (workItems.size() > 1) {
throw new IOException(
"This version of the SDK expects no more than one work item from the service: "