Skip to content

Commit

Permalink
Merge pull request #155 from kagkarlsson/scheduler_client_improvements
Browse files Browse the repository at this point in the history
Scheduler client improvements
  • Loading branch information
kagkarlsson authored Nov 26, 2020
2 parents fd497d5 + bdbcc31 commit 2c690f2
Show file tree
Hide file tree
Showing 15 changed files with 487 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void it_should_initialize_an_empty_scheduler() {
assertThat(ctx).hasSingleBean(DataSource.class);
assertThat(ctx).hasSingleBean(Scheduler.class);

ctx.getBean(Scheduler.class).getScheduledExecutions(execution -> {
ctx.getBean(Scheduler.class).fetchScheduledExecutions(execution -> {
fail("No scheduled executions should be present", execution);
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.github.kagkarlsson.jdbc.ResultSetMapper;
import com.github.kagkarlsson.jdbc.SQLRuntimeException;
import com.github.kagkarlsson.scheduler.TaskResolver.UnresolvedTask;
import com.github.kagkarlsson.scheduler.jdbc.AndCondition;
import com.github.kagkarlsson.scheduler.jdbc.AutodetectJdbcCustomization;
import com.github.kagkarlsson.scheduler.jdbc.JdbcCustomization;
import com.github.kagkarlsson.scheduler.jdbc.QueryBuilder;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.Task;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
Expand Down Expand Up @@ -108,27 +110,24 @@ public boolean createIfNotExists(Execution execution) {
}

@Override
public void getScheduledExecutions(Consumer<Execution> consumer) {
public void getScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<Execution> consumer) {
UnresolvedFilter unresolvedFilter = new UnresolvedFilter(taskResolver.getUnresolved());
jdbcRunner.query(
"select * from " + tableName + " where picked = ? " + unresolvedFilter.andCondition() + " order by execution_time asc",
(PreparedStatement p) -> {
int index = 1;
p.setBoolean(index++, false);
unresolvedFilter.setParameters(p, index);
},
new ExecutionResultSetConsumer(consumer)
);

QueryBuilder q = queryForFilter(filter);
if (unresolvedFilter.isActive()) {
q.andCondition(unresolvedFilter);
}

jdbcRunner.query(q.getQuery(), q.getPreparedStatementSetter(), new ExecutionResultSetConsumer(consumer));
}

@Override
public void getScheduledExecutions(String taskName, Consumer<Execution> consumer) {
jdbcRunner.query(
"select * from " + tableName + " where picked = ? and task_name = ? order by execution_time asc",
(PreparedStatement p) -> {
p.setBoolean(1, false);
p.setString(2, taskName);
},
public void getScheduledExecutions(ScheduledExecutionsFilter filter, String taskName, Consumer<Execution> consumer) {
QueryBuilder q = queryForFilter(filter);
q.andCondition(new TaskCondition(taskName));

jdbcRunner.query(q.getQuery(),
q.getPreparedStatementSetter(),
new ExecutionResultSetConsumer(consumer)
);
}
Expand Down Expand Up @@ -335,6 +334,19 @@ public int removeExecutions(String taskName) {
});
}

private QueryBuilder queryForFilter(ScheduledExecutionsFilter filter) {
final QueryBuilder q = QueryBuilder.selectFromTable(tableName);

filter.getPickedValue().ifPresent(value -> {
q.andCondition(new PickedCondition(value));
});

q.orderBy("execution_time asc");
return q;
}



private class ExecutionResultSetMapper implements ResultSetMapper<List<Execution>> {

private final ArrayList<Execution> executions;
Expand Down Expand Up @@ -420,16 +432,23 @@ private static class NewData {
}
}

private static class UnresolvedFilter {
private static class UnresolvedFilter implements AndCondition {
private final List<UnresolvedTask> unresolved;

public UnresolvedFilter(List<UnresolvedTask> unresolved) {
this.unresolved = unresolved;
}

public boolean isActive() {
return !unresolved.isEmpty();
}

public String andCondition() {
return unresolved.isEmpty() ? "" :
"and task_name not in (" + unresolved.stream().map(ignored -> "?").collect(joining(",")) + ")";
return unresolved.isEmpty() ? "" : "and " + getQueryPart();
}

public String getQueryPart() {
return "task_name not in (" + unresolved.stream().map(ignored -> "?").collect(joining(",")) + ")";
}

public int setParameters(PreparedStatement p, int index) throws SQLException {
Expand All @@ -440,4 +459,42 @@ public int setParameters(PreparedStatement p, int index) throws SQLException {
return index;
}
}

private static class PickedCondition implements AndCondition {
private final boolean value;

public PickedCondition(boolean value) {
this.value = value;
}

@Override
public String getQueryPart() {
return "picked = ?";
}

@Override
public int setParameters(PreparedStatement p, int index) throws SQLException {
p.setBoolean(index++, value);
return index;
}
}

private static class TaskCondition implements AndCondition {
private final String value;

public TaskCondition(String value) {
this.value = value;
}

@Override
public String getQueryPart() {
return "task_name = ?";
}

@Override
public int setParameters(PreparedStatement p, int index) throws SQLException {
p.setString(index++, value);
return index;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,55 +1,79 @@
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Instant;
import java.util.Objects;
public class ScheduledExecution<DATA_TYPE> {
private final Class<DATA_TYPE> dataClass;
private final Execution execution;
public ScheduledExecution(Class<DATA_TYPE> dataClass, Execution execution) {
this.dataClass = dataClass;
this.execution = execution;
}
public TaskInstanceId getTaskInstance() {
return execution.taskInstance;
}
public Instant getExecutionTime() {
return execution.getExecutionTime();
}
@SuppressWarnings("unchecked")
public DATA_TYPE getData() {
if (dataClass.isInstance(this.execution.taskInstance.getData())) {
return (DATA_TYPE) this.execution.taskInstance.getData();
}
throw new DataClassMismatchException();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScheduledExecution<?> that = (ScheduledExecution<?>) o;
return Objects.equals(execution, that.execution);
}
@Override
public int hashCode() {
return Objects.hash(execution);
}
public static class DataClassMismatchException extends RuntimeException {
}
}
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;
import com.github.kagkarlsson.scheduler.task.Execution;
import com.github.kagkarlsson.scheduler.task.TaskInstance;
import com.github.kagkarlsson.scheduler.task.TaskInstanceId;
import java.time.Instant;
import java.util.Objects;
public class ScheduledExecution<DATA_TYPE> {
private final Class<DATA_TYPE> dataClass;
private final Execution execution;
public ScheduledExecution(Class<DATA_TYPE> dataClass, Execution execution) {
this.dataClass = dataClass;
this.execution = execution;
}

public TaskInstanceId getTaskInstance() {
return execution.taskInstance;
}

public Instant getExecutionTime() {
return execution.getExecutionTime();
}

@SuppressWarnings("unchecked")
public DATA_TYPE getData() {
if (dataClass.isInstance(this.execution.taskInstance.getData())) {
return (DATA_TYPE) this.execution.taskInstance.getData();
}
throw new DataClassMismatchException();
}

public Instant getLastSuccess() {
return execution.lastSuccess;
}

public Instant getLastFailure() {
return execution.lastFailure;
}

public int getConsecutiveFailures() {
return execution.consecutiveFailures;
}

public boolean isPicked() {
return execution.picked;
}

public String getPickedBy() {
return execution.pickedBy;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ScheduledExecution<?> that = (ScheduledExecution<?>) o;
return Objects.equals(execution, that.execution);
}
@Override
public int hashCode() {
return Objects.hash(execution);
}
public static class DataClassMismatchException extends RuntimeException {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (C) Gustav Karlsson
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.kagkarlsson.scheduler;

import java.util.Optional;

public class ScheduledExecutionsFilter {

private Boolean pickedValue;

private ScheduledExecutionsFilter() {
}

public static ScheduledExecutionsFilter all() {
return new ScheduledExecutionsFilter();
}

public ScheduledExecutionsFilter withPicked(boolean pickedValue) {
this.pickedValue = pickedValue;
return this;
}

public Optional<Boolean> getPickedValue() {
return Optional.ofNullable(pickedValue);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,23 @@ public void cancel(TaskInstanceId taskInstanceId) {
}

@Override
public void getScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
this.delegate.getScheduledExecutions(consumer);
public void fetchScheduledExecutions(Consumer<ScheduledExecution<Object>> consumer) {
this.delegate.fetchScheduledExecutions(consumer);
}

@Override
public <T> void getScheduledExecutionsForTask(String taskName, Class<T> dataClass, Consumer<ScheduledExecution<T>> consumer) {
this.delegate.getScheduledExecutionsForTask(taskName, dataClass, consumer);
public void fetchScheduledExecutions(ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<Object>> consumer) {
this.delegate.fetchScheduledExecutions(filter, consumer);
}

@Override
public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, Consumer<ScheduledExecution<T>> consumer) {
this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, consumer);
}

@Override
public <T> void fetchScheduledExecutionsForTask(String taskName, Class<T> dataClass, ScheduledExecutionsFilter filter, Consumer<ScheduledExecution<T>> consumer) {
this.delegate.fetchScheduledExecutionsForTask(taskName, dataClass, filter, consumer);
}

@Override
Expand Down
Loading

0 comments on commit 2c690f2

Please sign in to comment.