Error handlers are for acting on exceptions thrown from within a component; typical use-cases include recovering from the error or wrapping the original exception into another exception type.
There are three error handler types, one for each type of component:
- InitializerErrorHandler for initializers; they can produce a payload
- StepErrorHandler for steps; they can produce a Result
- SinkErrorHandler for sinks
An error handler implementation looks something like this:
public class MyHandler implements StepErrorHandler
{
    @Override
    public Result handle(Exception exception, Object input, Object payload, Results results, Context context)
    {
        logger.error("We did get an error.. ({})", exception.getMessage());
        return new MyResult("..but let's ignore it anyway");
    }
}
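The other use-case mentioned above, wrapping the original exception, follows the same contract. Below is a minimal sketch where MyDomainException is a hypothetical unchecked exception type:
public class WrappingHandler implements StepErrorHandler
{
    @Override
    public Result handle(Exception exception, Object input, Object payload, Results results, Context context)
    {
        // MyDomainException is a hypothetical unchecked exception wrapping the original one
        throw new MyDomainException("step failed for input " + input, exception);
    }
}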
A handy feature of error handlers is that they can be combined with their andThen method.
// Assuming a pipeline builder
pipelineBuilder.registerStep(builder -> builder
    .step(someStep)
    .withErrorHandler(someHandler.andThen(otherHandler))
);
All error handler interfaces have a no-op implementation named RETHROW_ALL which simply rethrows the original exception. By default, Pipeline instances are built with those as default handlers.
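Assuming RETHROW_ALL is exposed as a constant on the handler interface, registering it explicitly on a step would be equivalent to the default behaviour:
// Equivalent to the default: the original exception is simply rethrown
pipelineBuilder.registerStep(builder -> builder
    .step(someStep)
    .withErrorHandler(StepErrorHandler.RETHROW_ALL)
);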
Pipeline error handlers are responsible for managing exceptions occurring during the pipeline run, before they are thrown by the pipeline. Typical uses include running a recovery pipeline or wrapping outgoing exceptions before they are thrown.
An example of a recovery pattern could look like this:
/* Given a hypothetical pipeline builder */
PayloadPipelineBuilder<String> builder;

/* Given a hypothetical recovery pipeline with a String input */
Pipeline<String> recovery;

PipelineErrorHandler recoveryHandler = (exception, previous, input, context) -> recovery.run(
    (String) input,
    new SimpleContext(previous).copyFrom(context)
);

builder.setErrorHandler(recoveryHandler);
In the example above, if the pipeline resulting from builder fails, it will hand over processing to the recovery pipeline.
Note that the context inherits from the previous run, so Result instances created by the main pipeline, as well as the original context metadata, will be available in the recovery pipeline.
Wrappers are for encapsulating the execution of pipeline components; they allow additional logic to run before and/or after a component is executed.
They can do things as varied as retrying a failed component or enforcing rate limits (💡 notably, see the current resilience4j integrations).
At the time of this writing, only Step and Sink components can be wrapped.
Due to differences in their respective signatures and roles within a Pipeline, each has a matching wrapper contract:
- StepWrapper for steps
- SinkWrapper for sinks
A custom StepWrapper may look like the following; it takes the current step as input and returns the step that should run in its stead:
public class MyWrapper<T, I> implements StepWrapper<T, I>
{
    @Override
    public Step<T, I> wrap(Step<T, I> step)
    {
        return (object, input, payload, results, context) -> {
            System.out.println("This is something I need to print before");
            return step.execute(object, input, payload, results, context);
        };
    }
}
Similarly, a SinkWrapper:
public class MyWrapper implements SinkWrapper
{
    @Override
    public Sink wrap(Sink sink)
    {
        return (output, context) -> {
            System.out.println("This is something I need to print before");
            sink.execute(output, context);
        };
    }
}
Like error handlers, wrappers can be combined with their andThen method, and used like so:
// Assuming a pipeline builder
pipelineBuilder.registerStep(builder -> builder
    .step(someStep)
    .withWrapper(someWrapper.andThen(otherWrapper))
);
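As a sketch of the retry use-case mentioned earlier, a hand-rolled retry wrapper could look like the following; the attempt count is arbitrary and there is no backoff, the resilience4j integrations cover this more thoroughly:
public class RetryWrapper<T, I> implements StepWrapper<T, I>
{
    @Override
    public Step<T, I> wrap(Step<T, I> step)
    {
        return (object, input, payload, results, context) -> {
            Exception last = null;
            // Naive retry loop: up to 3 attempts, no backoff
            for (int attempt = 0; attempt < 3; attempt++)
            {
                try {
                    return step.execute(object, input, payload, results, context);
                }
                catch (Exception e) {
                    last = e;
                }
            }
            // Rethrown as unchecked so the sketch compiles regardless of the Step contract
            throw new RuntimeException("step failed after 3 attempts", last);
        };
    }
}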
💡 There are several implementations of wrappers available out-of-the-box, as documented in the integrations section.
All wrapper interfaces have a no-op implementation named noOp which simply returns the original component. By default, Pipeline instances are built with those as default wrappers.
When a pipeline is run, most of its behaviour is tracked and uniquely identified with a UID (💡 see the documentation on tags).
The UIDGenerator is the component responsible for producing UIDs within a Pipeline, and it can be tailored to your needs (e.g. persisting pipeline results in a database may require specific types of UIDs).
data-pipeline provides the following strategies out of the box:
- a ksuid generator (KSUIDGenerator) from ksuid-creator; this is the default implementation
- a tsid generator (TSIDGenerator) from tsid-creator
- a ulid generator (ULIDGenerator) from ulid-creator
- a uuid generator (UUIDGenerator) from java.util.UUID
💡 More on the subject of UUIDs in Twilio's "A brief history of the UUID"; the GitHub repositories above also have great descriptions of what each implementation can do. For instance, you can leverage the tsid implementation for generating UIDs similar to Twitter Snowflakes or Discord Snowflakes.
A custom UIDGenerator can look like this:
public class DiscordSnowflakeGenerator implements UIDGenerator
{
    private final TsidFactory factory;

    public DiscordSnowflakeGenerator(int worker, int process)
    {
        this.factory = TsidFactory.builder()
            // Discord Epoch starts in the first millisecond of 2015
            .withCustomEpoch(Instant.parse("2015-01-01T00:00:00.000Z"))
            // Discord Snowflakes have 5 bits for worker ID and 5 bits for process ID
            .withNode(worker << 5 | process)
            .build()
        ;
    }

    @Override
    public String generate()
    {
        // Tsid instances render as their canonical string form
        return this.factory.create().toString();
    }
}
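Using such a generator is then just a matter of calling generate(); the worker and process values below are arbitrary illustration values:
UIDGenerator generator = new DiscordSnowflakeGenerator(3, 1);
// Produces a Discord-Snowflake-like identifier as a String
String uid = generator.generate();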
An AuthorResolver is a component that is used upon launching a pipeline for flagging the pipeline execution with an author identifier.
This can be any String you want, like a username or entity reference.
A typical AuthorResolver may look like this:
public class MyAuthorResolver implements AuthorResolver<MyInputType>
{
    @Override
    public String resolve(MyInputType input, Context context)
    {
        return input.getUsername();
    }
}
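Assuming the interface qualifies as a functional interface, a lambda form should work as well; the anonymous fallback below is purely illustrative:
AuthorResolver<MyInputType> resolver = (input, context) ->
    input.getUsername() != null ? input.getUsername() : "anonymous";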
Authorship is used in pipeline and component tags, which are respectively produced at each pipeline and component (initializer, step, sink) run; 💡 see the relevant section for more details.
A TagResolver is a component that is used upon launching a pipeline for generating custom tags that will be used in metrics (micrometer / prometheus) and log markers (slf4j / logback).
A typical TagResolver may look like this:
public class MyTagResolver implements TagResolver<MyInputType>
{
    @Override
    public MetricTags resolve(MyInputType input, Context context)
    {
        return new MetricTags()
            .put("some_tag", input.getSome())
            .put("other_tag", input.getOther())
        ;
    }
}
In this case, metrics and logs will be annotated with the some_tag and other_tag tags, in addition to their default tags.
For instance, in prometheus this may result in the following breakdown:
pipeline_run_success_total{other_tag="abc",pipeline="my-pipeline",some_tag="123",} 687613.0
pipeline_run_success_total{other_tag="abc",pipeline="my-pipeline",some_tag="234",} 3278.0
pipeline_run_success_total{other_tag="bcd",pipeline="my-pipeline",some_tag="123",} 142.0
pipeline_run_success_total{other_tag="bcd",pipeline="my-pipeline",some_tag="234",} 1571632.0
...and the following labels in loki (only displaying data-pipeline labels):
Log labels:
    author      anonymous
    other_tag   abc
    level       INFO
    pipeline    my-pipeline
    some_tag    123