diff --git a/website/www/site/content/en/contribute/runner-guide.md b/website/www/site/content/en/contribute/runner-guide.md index d64ebd2164cc..cb3b52048606 100644 --- a/website/www/site/content/en/contribute/runner-guide.md +++ b/website/www/site/content/en/contribute/runner-guide.md @@ -41,8 +41,7 @@ element-wise, grouping, windowing, union) rather than a specific implementation decision. The same primitive may require a very different implementation based on how the user instantiates it. For example, a `ParDo` that uses state or timers may require key partitioning, a `GroupByKey` with speculative triggering -may require a more costly or complex implementation, and `Read` is completely -different for bounded and unbounded data. +may require a more costly or complex implementation. ### What if you haven't implemented some of these features? @@ -57,6 +56,19 @@ native environment, this may look like throwing an `UnsupportedOperationException`. The Runner API RPCs will make this explicit, for cross-language portability. +### Implementing the Impulse primitive + +`Impulse` is a PTransform that takes no inputs and produces exactly one output +during the lifetime of the pipeline which should be the empty bytes in the +global window with the minimum timestamp. This has the encoded value of +`7f df 3b 64 5a 1c ac 09 00 00 00 01 0f 00` when encoded with the standard +windowed value coder. + +Though `Impulse` is generally not invoked by a user, it is the only root +primitive operation, and other root operations (like `Read`s and `Create`) +are composite operations constructed from an `Impulse` followed by a series +of (possibly Splittable) `ParDo`s. + ### Implementing the ParDo primitive The `ParDo` primitive describes element-wise transformation for a @@ -72,11 +84,27 @@ can discuss it with pseudocode. I will also often refer to the Java support code, since I know it and most of our current and future runners are Java-based. +Generally, rather than applying a series of `ParDo`s one at a time over the +entire input data set, it is more efficient to fuse several `ParDo`s together +in a single executable stage that consists of a whole series (in general, +a DAG) of mapping operations. In addition to `ParDo`s, windowing operations, +local (pre- or post-GBK) combining operations, and other mapping operations +may be fused into these stages as well. + +As DoFns may execute code in a different language, or requiring a different +environment, than the runner itself, Beam provides the ability to call these +in a cross-process way. This is the crux of the +[Beam Fn API](https://beam.apache.org/contribute/runner-guide/#writing-an-sdk-independent-runner), +for which more detail can be found below. +It is, however, perfectly acceptable for a runner to invoke this user code +in process (for simplicity or efficiency) when the environments are +compatible. + #### Bundles For correctness, a `DoFn` _should_ represent an element-wise function, but in -fact is a long-lived object that processes elements in small groups called -bundles. +most SDKS this is a long-lived object that processes elements in small groups +called bundles. Your runner decides how many elements, and which elements, to include in a bundle, and can even decide dynamically in the middle of processing that the @@ -89,66 +117,25 @@ But if your data is arriving as a stream, then you will want to terminate a bundle in order to achieve appropriate latency, so bundles may be just a few elements. 
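As a concrete, purely illustrative example of the bundling decision above, the following sketch shows one way a streaming runner might cut its input into bundles using both a size cap and a latency deadline. All names here (`source`, `process_bundle`, the limits) are hypothetical stand-ins, not Beam APIs.

{{< highlight class="no-toggle" >}}
import time

# Illustrative only: how a runner might decide bundle boundaries.
# `source` yields elements; `process_bundle` hands one bundle to the SDK
# harness (or a directly-invoked DoFn). Neither is a Beam API.
def run_bundles(source, process_bundle, max_size=1000, max_wait_secs=1.0):
    bundle = []
    deadline = time.monotonic() + max_wait_secs
    for element in source:
        bundle.append(element)
        # Close the bundle when it is large enough (throughput)
        # or old enough (latency). A real runner would also flush
        # on a timer even if no new element arrives.
        if len(bundle) >= max_size or time.monotonic() >= deadline:
            process_bundle(bundle)
            bundle = []
            deadline = time.monotonic() + max_wait_secs
    if bundle:
        process_bundle(bundle)  # flush the final, possibly small, bundle
{{< /highlight >}}

A batch runner might instead derive bundle boundaries directly from how it has already partitioned the input.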
-#### The DoFn Lifecycle - -While each language's SDK is free to make different decisions, the Python and -Java SDKs share an API with the following stages of a DoFn's lifecycle. - -However, if you choose to execute a DoFn directly to improve performance or -single-language simplicity, then your runner is responsible for implementing -the following sequence: - - * _Setup_ - called once per DoFn instance before anything else; this has not been - implemented in the Python SDK so the user can work around just with lazy - initialization - * _StartBundle_ - called once per bundle as initialization (actually, lazy - initialization is almost always equivalent and more efficient, but this hook - remains for simplicity for users) - * _ProcessElement_ / _OnTimer_ - called for each element and timer activation - * _FinishBundle_ - essentially "flush"; required to be called before - considering elements as actually processed - * _Teardown_ - release resources that were used across bundles; calling this - can be best effort due to failures - -#### DoFnRunner(s) - -This is a support class that has manifestations in both the Java codebase and -the Python codebase. - -**Java** - -In Java, the `beam-runners-core-java` library provides an interface -`DoFnRunner` for bundle processing, with implementations for many situations. +A bundle is the unit of commitment in Beam. If an error is encountered while +processing a bundle, all the prior outputs of that bundle (including any +modifications to state or timers) must be discarded by the runner and the +entire bundle retried. Upon successful completion of a bundle, its outputs, +together with any state/timer modifications and watermark updates, must be +committed atomically. -{{< highlight class="language-java no-toggle" >}} -interface DoFnRunner { - void startBundle(); - void processElement(WindowedValue elem); - void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain); - void finishBundle(); -} -{{< /highlight >}} - -There are some implementations and variations of this for different scenarios: - - * [`SimpleDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java) - - not actually simple at all; implements lots of the core functionality of - `ParDo`. This is how most runners execute most `DoFns`. - * [`LateDataDroppingDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java) - - wraps a `DoFnRunner` and drops data from expired windows so the wrapped - `DoFnRunner` doesn't get any unpleasant surprises - * [`StatefulDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java) - - handles collecting expired state - * [`PushBackSideInputDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java) - - buffers input while waiting for side inputs to be ready - -These are all used heavily in implementations of Java runners. Invocations -via the [Fn API](#the-fn-api) may manifest as another implementation of -`DoFnRunner` even though it will be doing far more than running a `DoFn`. - -**Python** +#### The DoFn Lifecycle -See the [DoFnRunner pydoc](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.runners.html#apache_beam.runners.common.DoFnRunner). 
+`DoFn`s in many SDKs have several methods such as `setup`, `start_bundle`,
+`finish_bundle`, `teardown`, etc. in addition to the standard,
+element-wise `process` calls. Generally, proper invocation of
+[this lifecycle](https://beam.apache.org/documentation/programming-guide/#dofn)
+should be handled for you when invoking one or more
+`DoFn`s from the standard bundle processors (either via the FnAPI or directly
+using a BundleProcessor
+([Java](https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java),
+[Python](https://github.com/apache/beam/blob/release-2.49.0/sdks/python/apache_beam/runners/worker/bundle_processor.py#L852))).
+SDK-independent runners should never have to worry about these details directly.

#### Side Inputs

@@ -160,78 +147,86 @@ it from the main input, which is processed one element at a time. The SDK/user
prepares a `PCollection` adequately, the runner materializes it, and then the
runner feeds it to the `DoFn`.

-What you will need to implement is to inspect the materialization requested for
-the side input, and prepare it appropriately, and corresponding interactions
-when a `DoFn` reads the side inputs.
-
-The details and available support code vary by language.
-
-**Java**
-
-If you are using one of the above `DoFnRunner` classes, then the interface for
-letting them request side inputs is
-[`SideInputReader`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java).
-It is a simple mapping from side input and window to a value. The `DoFnRunner`
-will perform a mapping with the
-[`WindowMappingFn`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java)
-to request the appropriate window so you do not worry about invoking this UDF.
-When using the Fn API, it will be the SDK harness that maps windows as well.
-
-A simple, but not necessarily optimal approach to building a
-[`SideInputReader`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputReader.java)
-is to use a state backend. In our Java support code, this is called
-[`StateInternals`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java)
-and you can build a
-[`SideInputHandler`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java)
-that will use your `StateInternals` to materialize a `PCollection` into the
-appropriate side input view and then yield the value when requested for a
-particular side input and window.
+Unlike main input data, which is *pushed* by the runner to the `ParDo` (generally
+via the FnAPI Data channel), side input data is *pulled* by the `ParDo`
+from the runner (generally over the FnAPI State channel).
+
+A side input is accessed via a specific `access_pattern`.
+There are currently two access patterns enumerated in the
+`StandardSideInputTypes` proto: `beam:side_input:iterable:v1`, which indicates
+the runner must return all values in a PCollection corresponding to a specific
+window, and `beam:side_input:multimap:v1`, which indicates the runner must return
+all values corresponding to a specific key and window.
+Being able to serve these access patterns efficiently may influence how a
+runner materializes this PCollection.
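As a rough illustration of how the access pattern can shape materialization, the sketch below indexes a materialized side-input `PCollection` so that both standard access patterns can be served cheaply. The element layout and the class itself are hypothetical, not part of Beam.

{{< highlight class="no-toggle" >}}
from collections import defaultdict

# Illustrative only. Each materialized element is assumed to be a
# (window, key, value) triple after window assignment and KV decoding.
class SideInputIndex:
    def __init__(self, elements):
        self._by_window = defaultdict(list)           # beam:side_input:iterable:v1
        self._by_window_and_key = defaultdict(list)   # beam:side_input:multimap:v1
        for window, key, value in elements:
            self._by_window[window].append(value)
            self._by_window_and_key[(window, key)].append(value)

    def iterable(self, window):
        """All values for a window (iterable access pattern)."""
        return self._by_window.get(window, [])

    def multimap(self, window, key):
        """All values for a key and window (multimap access pattern)."""
        return self._by_window_and_key.get((window, key), [])
{{< /highlight >}}

A runner that only needs the iterable pattern could skip the per-key index entirely, while a multimap-heavy pipeline may be better served by a keyed state backend.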
+
+Side inputs can be detected by looking at the `side_inputs` map in the
+`ParDoPayload` of `ParDo` transforms.
+The `ParDo` operation itself is responsible for invoking the
+`window_mapping_fn` (before invoking the runner) and `view_fn` (on the
+runner-returned values), so the runner need not concern itself with these
+fields.

When a side input is needed but the side input has no data associated with it
for a given window, elements in that window must be deferred until the side
-input has some data. The aforementioned
+input has some data or the watermark has advanced sufficiently such that
+we can be sure there will be no data for that window. The
[`PushBackSideInputDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java)
-is used to implement this.
-
-**Python**
-
-In Python, [`SideInputMap`](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.transforms.html#apache_beam.transforms.sideinputs.SideInputMap) maps
-windows to side input values. The `WindowMappingFn` manifests as a simple
-function. See
-[sideinputs.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sideinputs.py).
+is an example of implementing this.

#### State and Timers

_Main design document:
[https://s.apache.org/beam-state](https://s.apache.org/beam-state)_

When a `ParDo` includes state and timers, its execution on your runner is usually
-very different. See the full details beyond those covered here.
+very different. In particular, the state must be persisted when the bundle
+completes and retrieved for future bundles. Timers that are set must also be
+injected into future bundles as the watermark advances sufficiently.

-State and timers are partitioned per key and window. You may need or want to
+State and timers are partitioned per key and window; that is, a `DoFn`
+processing a given key must have a consistent view of the state and timers
+across all elements that share this key. You may need or want to
explicitly shuffle data to support this.
+Once the watermark has passed the end of the window (plus an allowance for
+allowed lateness, if any), state associated with this window can be dropped.

-**Java**
-
-We provide
-[`StatefulDoFnRunner`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java)
-to help with state cleanup. The non-user-facing interface
-[`StateInternals`](https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java)
-is what a runner generally implements, and then the Beam support code can use
-this to implement user-facing state.
+State setting and retrieval is performed on the FnAPI State channel, whereas
+timer setting and firing happens on the FnAPI Data channel.

#### Splittable DoFn

_Main design document:
[https://s.apache.org/splittable-do-fn](https://s.apache.org/splittable-do-fn)_

-Splittable `DoFn` is a generalization and combination of `ParDo` and `Read`. It
-is per-element processing where each element has the capability of being "split"
-in the same ways as a `BoundedSource` or `UnboundedSource`. This enables better
-performance for use cases such as a `PCollection` of names of large files where
-you want to read each of them. Previously they would have to be static data in
-the pipeline or be read in a non-splittable manner.
-
-This feature is still under development, but likely to become the new primitive
-for reading.
It is best to be aware of it and follow developments.
+Splittable `DoFn` is a generalization of `ParDo` that is useful for high-fanout
+mappings that can be done in parallel. The prototypical example of such an
+operation is reading from a file, where a single file name (as an input element)
+can be mapped to all the elements contained in that file.
+The `DoFn` is considered splittable in the sense that an element representing,
+say, a single file can be split (e.g. into ranges of that file) to be processed
+(e.g. read) by different workers.
+The full power of this primitive is in the fact that these splits can happen
+dynamically rather than just statically (i.e. ahead of time), avoiding the
+problem of over- or undersplitting.
+
+A full explanation of Splittable `DoFn` is out of scope for this doc, but
+here is a brief overview as it pertains to its execution.
+
+A Splittable `DoFn` can participate in the dynamic splitting protocol by
+splitting within an element as well as between elements. Dynamic splitting
+is triggered by the runner issuing `ProcessBundleSplitRequest` messages on
+the control channel. The SDK will commit to process just a portion of the
+indicated element and return a description of the remainder (i.e. the
+unprocessed portion) to the runner in the `ProcessBundleSplitResponse`
+to be scheduled by the runner (e.g. on a different worker or as part of a
+different bundle).
+
+A Splittable `DoFn` can also initiate its own splitting, indicating that it has
+processed an element as far as it can for the moment (e.g. when tailing a file)
+but more remains. Such self-initiated splits most often occur when reading
+unbounded sources. In this case a set of elements representing the deferred work
+is passed back in the `residual_roots` field of the `ProcessBundleResponse`.
+At a future time, the runner must re-invoke these same operations with
+the elements given in `residual_roots`.

### Implementing the GroupByKey (and window) primitive

@@ -249,11 +244,12 @@ to group in a way that is consistent with grouping by those bytes, even if you
have some special knowledge of the types involved.

The elements you are processing will be key-value pairs, and you'll need to extract
-the keys. For this reason, the format of key-value pairs is standardized and
-shared across all SDKS. See either
-[`KvCoder`](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/coders/KvCoder.html)
+the keys. For this reason, the format of key-value pairs is
+[standardized and shared](https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L838)
+across all SDKs. See either
+[`KvCoder`](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/coders/KvCoder.html)
in Java or
-[`TupleCoder`](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.coders.html#apache_beam.coders.coders.TupleCoder.key_coder)
+[`TupleCoder`](https://beam.apache.org/releases/pydoc/current/apache_beam.coders.coders.html#apache_beam.coders.coders.TupleCoder)
in Python for documentation on the binary format.

#### Window Merging

@@ -266,11 +262,14 @@ grouping.

#### Implementing via GroupByKeyOnly + GroupAlsoByWindow

-The Java codebase includes support code for a particularly common way of
+The Java and Python codebases include support code for a particularly common way of
implementing the full `GroupByKey` operation: first group the keys, and then group
by window. For merging windows, this is essentially required, since merging is
per key.
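For intuition, here is a minimal in-memory sketch of that two-phase approach for non-merging windows, ignoring triggers, late data, and timestamp combining. The helper and its element layout are hypothetical, not Beam APIs.

{{< highlight class="no-toggle" >}}
from collections import defaultdict

# Illustrative only: group encoded KV pairs by key bytes, then by window.
# Each input is (encoded_key_bytes, value, windows), where `windows` is the
# set of windows already assigned to that element.
def group_by_key_and_window(elements):
    by_key = defaultdict(list)                      # "GroupByKeyOnly"
    for key_bytes, value, windows in elements:
        by_key[key_bytes].append((value, windows))

    results = []                                    # "GroupAlsoByWindow"
    for key_bytes, values in by_key.items():
        by_window = defaultdict(list)
        for value, windows in values:
            for window in windows:
                by_window[window].append(value)
        for window, grouped in by_window.items():
            results.append((window, key_bytes, grouped))
    return results
{{< /highlight >}}

With merging windows such as sessions, the per-key phase would additionally invoke the `WindowFn` to merge windows before emitting groups.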
+Often presenting the set of values in timestamp order can allow more
+efficient grouping of these values into their final windows.
+
#### Dropping late data

_Main design document:

@@ -298,6 +297,8 @@ In Java, there is a lot of support code for executing triggers in the
`GroupAlsoByWindow` implementations, `ReduceFnRunner` (legacy name), and
`TriggerStateMachine`, which is an obvious way of implementing all triggers as an
event-driven machine over elements and timers.
+In Python this is supported by the
+[TriggerDriver](https://github.com/apache/beam/blob/release-2.49.0/sdks/python/apache_beam/transforms/trigger.py#L1199) classes.

#### TimestampCombiner

@@ -321,6 +322,9 @@ To implement this primitive, you need to invoke the provided WindowFn on each
element, which will return some set of windows for that element to be a part of in
the output `PCollection`.

+Most runners implement this by fusing these window-altering mappings in with
+the `DoFns`.
+
**Implementation considerations**

A "window" is just a second grouping key that has a "maximum timestamp". It can

@@ -336,53 +340,13 @@ multiple windows". For values in the global window, you may want to use an even
further compressed representation that doesn't bother including the window at
all.

+We provide coders with these optimizations, such as
+[`PARAM_WINDOWED_VALUE`](https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L968),
+that can be used to reduce the size of serialized data.
+
In the future, this primitive may be retired as it can be implemented as a
ParDo if the capabilities of ParDo are enhanced to allow output to new windows.

-### Implementing the Read primitive
-
-You implement this primitive to read data from an external system. The APIs are
-carefully crafted to enable efficient parallel execution. Reading from an
-`UnboundedSource` is a bit different than reading from a `BoundedSource`.
-
-#### Reading from an UnboundedSource
-
-An `UnboundedSource` is a source of potentially infinite data; you can think of
-it like a stream. The capabilities are:
-
- * `split(int)` - your runner should call this to get the desired parallelism
- * `createReader(...)` - call this to start reading elements; it is an enhanced iterator that also provides:
- * watermark (for this source) which you should propagate downstream
- * timestamps, which you should associate with elements read
- * record identifiers, so you can dedup downstream if needed
- * progress indication of its backlog
- * checkpointing
- * `requiresDeduping` - this indicates that there is some chance that the source
- may emit duplicates; your runner should do its best to dedupe based on the
- identifier attached to emitted records
-
-An unbounded source has a custom type of checkpoints and an associated coder for serializing them.
-
-#### Reading from a BoundedSource
-
-A `BoundedSource` is a source of data that you know is finite, such as a static
-collection of log files, or a database table.
The capabilities are:
-
- * `split(int)` - your runner should call this to get desired initial parallelism (but you can often steal work later)
- * `getEstimatedSizeBytes(...)` - self explanatory
- * `createReader(...)` - call this to start reading elements; it is an enhanced iterator that also provides:
- * timestamps to associate with each element read
- * `splitAtFraction` for dynamic splitting to enable work stealing, and other
- methods to support it - see the [Beam blog post on dynamic work
- rebalancing](/blog/2016/05/18/splitAtFraction-method.html)
-
-The `BoundedSource` does not report a watermark currently. Most of the time, reading
-from a bounded source can be parallelized in ways that result in utterly out-of-order
-data, so a watermark is not terribly useful.
-Thus the watermark for the output `PCollection` from a bounded read should
-remain at the minimum timestamp throughout reading (otherwise data might get
-dropped) and advance to the maximum timestamp when all data is exhausted.
-
### Implementing the Flatten primitive

This one is easy - take as input a finite set of `PCollections` and outputs their

@@ -399,59 +363,30 @@ fast path as an optimization.

### Special mention: the Combine composite

A composite transform that is almost always treated specially by a runner is
-`Combine` (per key), which applies an associative and commutative operator to
+`CombinePerKey`, which applies an associative and commutative operator to
the elements of a `PCollection`. This composite is not a primitive. It is
implemented in terms of `ParDo` and `GroupByKey`, so your runner will work
without treating it - but it does carry additional information that you
probably want to use for optimizations: the associative-commutative operator,
known as a `CombineFn`.

+Generally, runners will want to implement this via what is called
+combiner lifting, where a new operation is placed before the `GroupByKey`
+that does partial (within-bundle) combining, which often requires a slight
+modification of what comes after the `GroupByKey` as well.
+An example of this transformation can be found in the
+[Python](https://github.com/apache/beam/blob/release-2.49.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1193)
+or [Go](https://github.com/apache/beam/blob/release-2.49.0/sdks/go/pkg/beam/runners/prism/internal/handlecombine.go#L67)
+implementations of this optimization.
+The resulting pre- and post-`GroupByKey` operations are generally fused in with
+the `ParDo`s and executed as above.
+
## Working with pipelines

-When you receive a pipeline from a user, you will need to translate it. This is
-a tour of the APIs that you'll use to do it.
-
-### Traversing a pipeline
-
-Something you will likely do is to traverse a pipeline, probably to translate
-it into primitives for your engine. The general pattern is to write a visitor
-that builds a job specification as it walks the graph of `PTransforms`.
-
-The entry point for this in Java is
-[`Pipeline.traverseTopologically`](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#traverseTopologically-org.apache.beam.sdk.Pipeline.PipelineVisitor-)
-and
-[`Pipeline.visit`](https://beam.apache.org/releases/pydoc/2.0.0/apache_beam.html#apache_beam.pipeline.Pipeline.visit)
-in Python. See the generated documentation for details.
-
-### Altering a pipeline
-
-Often, the best way to keep your
-translator simple will be to alter the pipeline prior to translation. Some
-alterations you might perform:
-
- * Elaboration of a Beam primitive into a composite transform that uses
- multiple runner-specific primitives
- * Optimization of a Beam composite into a specialized primitive for your
- runner
- * Replacement of a Beam composite with a different expansion more suitable for
- your runner
-
-The Java SDK and the "runners core construction" library (the artifact is
-`beam-runners-core-construction-java` and the namespaces is
-`org.apache.beam.runners.core.construction`) contain helper code for this sort
-of work. In Python, support code is still under development.
-
-All pipeline alteration is done via
-[`Pipeline.replaceAll(PTransformOverride)`](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/Pipeline.html#replaceAll-java.util.List-)
-method. A
-[`PTransformOverride`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverride.java)
-is a pair of a
-[`PTransformMatcher`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformMatcher.java)
-to select transforms for replacement and a
-[`PTransformOverrideFactory`](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/PTransformOverrideFactory.java)
-to produce the replacement. All `PTransformMatchers` that have been needed by
-runners to date are provided. Examples include: matching a specific class,
-matching a `ParDo` where the `DoFn` uses state or timers, etc.
+When you receive a pipeline from a user, you will need to translate it.
+An explanation of how Beam pipelines are represented can be found
+[here](https://docs.google.com/presentation/d/1atu-QC_mnK2SaeLhc0D78wZYgVOX1fN0H544QmBi3VA),
+which complements the [official proto declarations](https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto).

## Testing your runner

@@ -555,6 +490,9 @@ All runner code should go in it's own package in `apache_beam/runners` directory

Register the new runner in the `create_runner` function of `runner.py` so that
the partial name is matched with the correct class to be used.

+Python runners can also be identified (e.g. when passing the runner parameter)
+by their fully qualified name, whether or not they live in the Beam repository.
+
## Writing an SDK-independent runner

There are two aspects to making your runner SDK-independent, able to run

@@ -568,6 +506,9 @@ _Design documents:_

 - _[https://s.apache.org/beam-fn-api-processing-a-bundle](https://s.apache.org/beam-fn-api-processing-a-bundle)_
 - _[https://s.apache.org/beam-fn-api-send-and-receive-data](https://s.apache.org/beam-fn-api-send-and-receive-data)_
+ - _[Overview](https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_0_317)_
+ - _[Spec](https://github.com/apache/beam/blob/master/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto)_
+
To run a user's pipeline, you need to be able to invoke their UDFs. The Fn API
is an RPC interface for the standard UDFs of Beam, implemented using protocol
buffers over gRPC.

@@ -584,10 +525,10 @@ UDFs.

### The Runner API

-The Runner API is an SDK-independent schema for a pipeline along with RPC
-interfaces for launching a pipeline and checking the status of a job. The RPC
-interfaces are still in development so for now we focus on the SDK-agnostic
-representation of a pipeline.
By examining a pipeline only through Runner API +The [Runner API](https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_3736) +is an SDK-independent schema for a pipeline along with RPC +interfaces for launching a pipeline and checking the status of a job. +By examining a pipeline only through Runner API interfaces, you remove your runner's dependence on the SDK for its language for pipeline analysis and job translation. @@ -602,7 +543,7 @@ You are fully welcome to _also_ use the SDK for your language, which may offer useful utility code. The language-independent definition of a pipeline is described via a protocol -buffers schema, covered below for reference. But your runner _should not_ +buffers schema, covered below for reference. But your runner _need not_ directly manipulate protobuf messages. Instead, the Beam codebase provides utilities for working with pipelines so that you don't need to be aware of whether or not the pipeline has ever been serialized or transmitted, or what @@ -660,7 +601,7 @@ sense that includes side effects, etc. {{< highlight class="no-toggle" >}} message FunctionSpec { string urn; - google.protobuf.Any parameter; + bytes payload; } {{< /highlight >}} @@ -681,27 +622,14 @@ used in a `PTransform` it describes a function from `PCollection` to `PCollectio and cannot be specific to an SDK because the runner is in charge of evaluating transforms and producing `PCollections`. -### `SdkFunctionSpec` proto - -When a `FunctionSpec` represents a UDF, in general only the SDK that serialized -it will be guaranteed to understand it. So in that case, it will always come -with an environment that can understand and execute the function. This is -represented by the `SdkFunctionSpec`. - -{{< highlight class="no-toggle" >}} -message SdkFunctionSpec { - FunctionSpec spec; - bytes environment_id; -} -{{< /highlight >}} - -In the Runner API, many objects are stored by reference. Here in the -`environment_id` is a pointer, local to the pipeline and just made up by the -SDK that serialized it, that can be dereferenced to yield the actual -environment proto. - -Thus far, an environment is expected to be a Docker container specification for -an SDK harness that can execute the specified UDF. +It goes without saying that not every environment will be able to deserialize +every function spec. For this reason `PTransform`s have an `environment_id` +parameter that indicates at least one environment that is capable of interpreting +the contained URNs. This is a reference to an environment in the environments +map of the Pipeline proto and is typically defined by a docker image (possibly +with some extra dependencies). +There may be other environments that are also capable of +doing so, and a runner is free to use them if it has this knowledge. ### Primitive transform payload protos @@ -721,7 +649,7 @@ inputs, state declarations, timer declarations, etc. {{< highlight class="no-toggle" >}} message ParDoPayload { - SdkFunctionSpec do_fn; + FunctionSpec do_fn; map side_inputs; map state_specs; map timer_specs; @@ -729,29 +657,6 @@ message ParDoPayload { } {{< /highlight >}} -#### `ReadPayload` proto - -A `Read` transform carries an `SdkFunctionSpec` for its `Source` UDF. - -{{< highlight class="no-toggle" >}} -message ReadPayload { - SdkFunctionSpec source; - ... -} -{{< /highlight >}} - -#### `WindowIntoPayload` proto - -A `Window` transform carries an `SdkFunctionSpec` for its `WindowFn` UDF. 
It is -part of the Fn API that the runner passes this UDF along and tells the SDK -harness to use it to assign windows (as opposed to merging). - -{{< highlight class="no-toggle" >}} -message WindowIntoPayload { - SdkFunctionSpec window_fn; - ... -} -{{< /highlight >}} #### `CombinePayload` proto @@ -764,7 +669,7 @@ a reference to this coder. {{< highlight class="no-toggle" >}} message CombinePayload { - SdkFunctionSpec combine_fn; + FunctionSpec combine_fn; string accumulator_coder_id; ... } @@ -773,9 +678,7 @@ message CombinePayload { ### `PTransform` proto A `PTransform` is a function from `PCollection` to `PCollection`. This is -represented in the proto using a FunctionSpec. Note that this is not an -`SdkFunctionSpec`, since it is the runner that observes these. They will never -be passed back to an SDK harness; they do not represent a UDF. +represented in the proto using a FunctionSpec. {{< highlight class="no-toggle" >}} message PTransform { @@ -796,6 +699,13 @@ The input and output `PCollections` are unordered and referred to by a local name. The SDK decides what this name is, since it will likely be embedded in serialized UDFs. +A runner that understands the specification of a given `PTransform` (whether +primitive or composite), as defined by its `FunctionSpec`, is free to +substitute it with another `PTransform` (or set thereof) that has identical +semantics. +This is typically how `CombinePerKey` is handled, but many other substitutions +can be done as well. + ### `PCollection` proto A `PCollection` just stores a coder, windowing strategy, and whether or not it @@ -813,83 +723,46 @@ message PCollection { ### `Coder` proto This is a very interesting proto. A coder is a parameterized function that may -only be understood by a particular SDK, hence an `SdkFunctionSpec`, but also +only be understood by a particular SDK, hence an `FunctionSpec`, but also may have component coders that fully define it. For example, a `ListCoder` is only a meta-format, while `ListCoder(VarIntCoder)` is a fully specified format. {{< highlight class="no-toggle" >}} message Coder { - SdkFunctionSpec spec; + FunctionSpec spec; repeated string component_coder_ids; } {{< /highlight >}} -## The Runner API RPCs +There are a large number of +[standard coders](https://github.com/apache/beam/blob/release-2.49.0/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L829) +understood by most, if not all, +SDKs. Using these allows for cross-language transforms. -While your language's SDK will probably insulate you from touching the Runner -API protos directly, you may need to implement adapters for your runner, to -expose it to another language. So this section covers proto that you will -possibly interact with quite directly. +## The Jobs API RPCs + +[Overview](https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_3722) +[Spec](https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto) -The specific manner in which the existing runner method calls will be expressed -as RPCs is not implemented as proto yet. This RPC layer is to enable, for -example, building a pipeline using the Python SDK and launching it on a runner -that is written in Java. It is expected that a small Python shim will -communicate with a Java process or service hosting the Runner API. 
+While your language's SDK may insulate you from touching the Runner
+API protos directly, you may need to implement adapters for your runner, to
+expose it to another language.
+This allows a Python SDK to invoke a Java runner or vice versa.
+A typical implementation of this can be found in
+[local_job_service.py](https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/local_job_service.py),
+which is used directly to front several Python-implemented runners.

The RPCs themselves will necessarily follow the existing APIs of PipelineRunner
and PipelineResult, but altered to be the minimal backend channel, versus a
rich and convenient API.

-### `PipelineRunner.run(Pipeline)` RPC
-
-This will take the same form, but `PipelineOptions` will have to be serialized
-to JSON (or a proto `Struct`) and passed along.
+A key piece of this is the
+[Artifacts API](https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_artifact_api.proto),
+which allows a runner to fetch and deploy binary artifacts (such as jars,
+pypi packages, etc.) that are listed as dependencies in the various environments,
+and may have various representations. This is invoked after a pipeline
+is submitted, but before it is executed. The SDK submitting a pipeline acts
+as an artifact server to the runner receiving the request, and in turn the
+runner then acts as an artifact server to the workers (environments) hosting
+the user's UDFs.

-{{< highlight class="no-toggle" >}}
-message RunPipelineRequest {
-  Pipeline pipeline;
-  Struct pipeline_options;
-}
-{{< /highlight >}}
-
-{{< highlight class="no-toggle" >}}
-message RunPipelineResponse {
-  bytes pipeline_id;
-
-  // TODO: protocol for rejecting pipelines that cannot be executed
-  // by this runner. May just be REJECTED job state with error message.
-
-  // totally opaque to the SDK; for the shim to interpret
-  Any contents;
-}
-{{< /highlight >}}
-
-### `PipelineResult` aka "Job API"
-
-The two core pieces of functionality in this API today are getting the state of
-a job and canceling the job. It is very much likely to evolve, for example to
-be generalized to support draining a job (stop reading input and let watermarks
-go to infinity). Today, verifying our test framework benefits (but does not
-depend upon wholly) querying metrics over this channel.
-
-{{< highlight class="no-toggle" >}}
-message CancelPipelineRequest {
-  bytes pipeline_id;
-  ...
-}
-
-message GetStateRequest {
-  bytes pipeline_id;
-  ...
-}
-
-message GetStateResponse {
-  JobState state;
-  ...
-}
-
-enum JobState {
-  ...
-}
-{{< /highlight >}}
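To see these services from the submitting side, a pipeline can be pointed at any runner that exposes the Job API. For example, from the Python SDK (the endpoint address below is just an example; the runner must be serving the Job and Artifact APIs there):

{{< highlight class="no-toggle" >}}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Submits the pipeline over the Job API (and stages artifacts over the
# Artifact API) to whatever runner is serving the given job_endpoint.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=DOCKER",  # how the runner should launch SDK harnesses
])

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | beam.Create(["a", "b", "c"])
     | beam.Map(print))
{{< /highlight >}}

The job service receiving this submission is then responsible for the artifact staging and execution described above.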