Skip to content

Commit

Permalink
kagkarlsson#491 - Initial support for observability
Browse files Browse the repository at this point in the history
  • Loading branch information
NicklasWallgren committed May 31, 2024
1 parent 823c9d2 commit 5e2d23d
Show file tree
Hide file tree
Showing 12 changed files with 430 additions and 163 deletions.
13 changes: 12 additions & 1 deletion db-scheduler-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,23 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.38.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
<version>1.38.0</version>
<optional>true</optional>
</dependency>

<!-- Annotations -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler.boot;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.CurrentlyExecuting;
import com.github.kagkarlsson.scheduler.ExecutePicked;
import com.github.kagkarlsson.scheduler.Executor;
import com.github.kagkarlsson.scheduler.HeartbeatConfig;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerClientEventListener;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.CompletionHandler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.ExecutionComplete;
import com.github.kagkarlsson.scheduler.task.ExecutionContext;
import com.github.kagkarlsson.scheduler.task.ExecutionOperations;
import com.github.kagkarlsson.scheduler.task.Task;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("rawtypes")
class ObservableExecutePicked implements ExecutePicked {

private static final Logger LOG = LoggerFactory.getLogger(ObservableExecutePicked.class);

private static final Tracer TRACER =
GlobalOpenTelemetry.get()
.tracerBuilder("com.github.kagkarlsson.db-scheduler-spring-boot-starter")
.setInstrumentationVersion("4.0.0") // TODO (nw) should not be hardcoded
.build();

private final Executor executor;
private final TaskRepository taskRepository;
private SchedulerClientEventListener earlyExecutionListener;
private final SchedulerClient schedulerClient;
private final StatsRegistry statsRegistry;
private final TaskResolver taskResolver;
private final SchedulerState schedulerState;
private final ConfigurableLogger failureLogger;
private final Clock clock;
private HeartbeatConfig heartbeatConfig;
private final Execution pickedExecution;

public ObservableExecutePicked(
Executor executor,
TaskRepository taskRepository,
SchedulerClientEventListener earlyExecutionListener,
SchedulerClient schedulerClient,
StatsRegistry statsRegistry,
TaskResolver taskResolver,
SchedulerState schedulerState,
ConfigurableLogger failureLogger,
Clock clock,
HeartbeatConfig heartbeatConfig,
Execution pickedExecution) {
this.executor = executor;
this.taskRepository = taskRepository;
this.earlyExecutionListener = earlyExecutionListener;
this.schedulerClient = schedulerClient;
this.statsRegistry = statsRegistry;
this.taskResolver = taskResolver;
this.schedulerState = schedulerState;
this.failureLogger = failureLogger;
this.clock = clock;
this.heartbeatConfig = heartbeatConfig;
this.pickedExecution = pickedExecution;
}

@Override
public void run() {
// FIXLATER: need to cleanup all the references back to scheduler fields
CurrentlyExecuting currentlyExecuting =
new CurrentlyExecuting(pickedExecution, clock, heartbeatConfig);
final UUID executionId = executor.addCurrentlyProcessing(currentlyExecuting);

try {
statsRegistry.register(StatsRegistry.CandidateStatsEvent.EXECUTED);
executePickedExecution(pickedExecution, currentlyExecuting);
} finally {
executor.removeCurrentlyProcessing(executionId);
}
}

private void executePickedExecution(Execution execution, CurrentlyExecuting currentlyExecuting) {
final Optional<Task> task = taskResolver.resolve(execution.taskInstance.getTaskName());
if (!task.isPresent()) {
LOG.error(
"Failed to find implementation for task with name '{}'. Should have been excluded in JdbcRepository.",
execution.taskInstance.getTaskName());
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
return;
}

Instant executionStarted = clock.now();

Span span =
TRACER
.spanBuilder("queue.task")
.setSpanKind(SpanKind.CONSUMER)
.setParent(Context.current())
.setAttribute("queue.task.id", execution.taskInstance.getId())
.setAttribute("queue.task.name", execution.taskInstance.getTaskName())
.startSpan();

try (Scope scope = span.makeCurrent()) {
LOG.debug("Executing: " + execution);
CompletionHandler completion =
task.get()
.execute(
execution.taskInstance,
new ExecutionContext(
schedulerState, execution, schedulerClient, currentlyExecuting));
LOG.debug("Execution done: " + execution);

complete(completion, execution, executionStarted);
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.COMPLETED);

Span.current().addEvent("Finished working.");
} catch (RuntimeException unhandledException) {
span.recordException(unhandledException);
span.end();

failure(task.get(), execution, unhandledException, executionStarted, "Unhandled exception");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
} catch (Throwable unhandledError) {
span.recordException(unhandledError);
span.end();

failure(task.get(), execution, unhandledError, executionStarted, "Error");
statsRegistry.register(StatsRegistry.ExecutionStatsEvent.FAILED);
}
}

private void complete(
CompletionHandler completion, Execution execution, Instant executionStarted) {
ExecutionComplete completeEvent =
ExecutionComplete.success(execution, executionStarted, clock.now());
try {
completion.complete(
completeEvent,
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.COMPLETIONHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
LOG.error(
"Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. "
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
execution,
e);
}
}

private void failure(
Task task,
Execution execution,
Throwable cause,
Instant executionStarted,
String errorMessagePrefix) {
String logMessage =
errorMessagePrefix + " during execution of task with name '{}'. Treating as failure.";
failureLogger.log(logMessage, cause, task.getName());

ExecutionComplete completeEvent =
ExecutionComplete.failure(execution, executionStarted, clock.now(), cause);
try {
task.getFailureHandler()
.onFailure(
completeEvent,
new ExecutionOperations(taskRepository, earlyExecutionListener, execution));
statsRegistry.registerSingleCompletedExecution(completeEvent);
} catch (Throwable e) {
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.FAILUREHANDLER_ERROR);
statsRegistry.register(StatsRegistry.SchedulerStatsEvent.UNEXPECTED_ERROR);
LOG.error(
"Failed while completing execution {}. Execution will likely remain scheduled and locked/picked. "
+ "The execution should be detected as dead after a while, and handled according to the tasks DeadExecutionHandler.",
execution,
e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) Gustav Karlsson
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.kagkarlsson.scheduler.boot;

import com.github.kagkarlsson.scheduler.Clock;
import com.github.kagkarlsson.scheduler.ExecutePicked;
import com.github.kagkarlsson.scheduler.ExecutePickedFactory;
import com.github.kagkarlsson.scheduler.Executor;
import com.github.kagkarlsson.scheduler.HeartbeatConfig;
import com.github.kagkarlsson.scheduler.SchedulerClient;
import com.github.kagkarlsson.scheduler.SchedulerClientEventListener;
import com.github.kagkarlsson.scheduler.SchedulerState;
import com.github.kagkarlsson.scheduler.TaskRepository;
import com.github.kagkarlsson.scheduler.TaskResolver;
import com.github.kagkarlsson.scheduler.logging.ConfigurableLogger;
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.Execution;

public class ObservableExecutePickedFactory implements ExecutePickedFactory {

public ExecutePicked create(
Executor executor,
TaskRepository taskRepository,
SchedulerClientEventListener earlyExecutionListener,
SchedulerClient schedulerClient,
StatsRegistry statsRegistry,
TaskResolver taskResolver,
SchedulerState schedulerState,
ConfigurableLogger failureLogger,
Clock clock,
HeartbeatConfig heartbeatConfig,
Execution pickedExecution) {
return new ObservableExecutePicked(
executor,
taskRepository,
earlyExecutionListener,
schedulerClient,
statsRegistry,
taskResolver,
schedulerState,
failureLogger,
clock,
heartbeatConfig,
pickedExecution);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
*/
package com.github.kagkarlsson.scheduler.boot.autoconfigure;

import com.github.kagkarlsson.scheduler.DefaultExecutePickedFactory;
import com.github.kagkarlsson.scheduler.ExecutePickedFactory;
import com.github.kagkarlsson.scheduler.PollingStrategyConfig;
import com.github.kagkarlsson.scheduler.Scheduler;
import com.github.kagkarlsson.scheduler.SchedulerBuilder;
import com.github.kagkarlsson.scheduler.SchedulerName;
import com.github.kagkarlsson.scheduler.boot.ObservableExecutePickedFactory;
import com.github.kagkarlsson.scheduler.boot.config.DbSchedulerCustomizer;
import com.github.kagkarlsson.scheduler.boot.config.DbSchedulerProperties;
import com.github.kagkarlsson.scheduler.boot.config.DbSchedulerStarter;
Expand All @@ -28,6 +31,7 @@
import com.github.kagkarlsson.scheduler.stats.StatsRegistry;
import com.github.kagkarlsson.scheduler.task.OnStartup;
import com.github.kagkarlsson.scheduler.task.Task;
import io.opentelemetry.api.OpenTelemetry;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInput;
Expand All @@ -43,13 +47,16 @@
import org.springframework.boot.autoconfigure.AutoConfigurationPackage;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.sql.init.dependency.DependsOnDatabaseInitialization;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.ConfigurableObjectInputStream;
import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;

Expand Down Expand Up @@ -97,11 +104,26 @@ StatsRegistry noopStatsRegistry() {
return StatsRegistry.NOOP;
}

@ConditionalOnMissingClass("io.opentelemetry.api.OpenTelemetry")
@Bean
ExecutePickedFactory defaultExecutePickedFactory() {
return new DefaultExecutePickedFactory();
}

@ConditionalOnClass(OpenTelemetry.class)
@Bean
ExecutePickedFactory observableExecutePickedFactory() {
return new ObservableExecutePickedFactory();
}

@ConditionalOnBean(DataSource.class)
@ConditionalOnMissingBean
@DependsOnDatabaseInitialization
@Bean(destroyMethod = "stop")
public Scheduler scheduler(DbSchedulerCustomizer customizer, StatsRegistry registry) {
public Scheduler scheduler(
DbSchedulerCustomizer customizer,
StatsRegistry registry,
ExecutePickedFactory executePickedFactory) {
log.info("Creating db-scheduler using tasks from Spring context: {}", configuredTasks);

// Ensure that we are using a transactional aware data source
Expand Down Expand Up @@ -182,6 +204,8 @@ public Scheduler scheduler(DbSchedulerCustomizer customizer, StatsRegistry regis
// Shutdown max wait
builder.shutdownMaxWait(config.getShutdownMaxWait());

builder.executePickedFactory(executePickedFactory);

return builder.build();
}

Expand Down
Loading

0 comments on commit 5e2d23d

Please sign in to comment.