Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement optional concurrency limit for parallel blocks #57

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,19 @@ compared to join plugin, parallel can be used for more complex workflows where t
}
)

it is possible to limit the concurrency of parallel blocks, preventing overwhelming concurrency and/or a very large queue :

parallel ( 2,
// jobs 1-6 will be scheduled in parallel,
// but only two will be executing and/or queued at any given time
{ build("job1") },
{ build("job2") },
{ build("job3") }
{ build("job4") }
{ build("job5") }
{ build("job6") }
)

you also can "name" parallel executions, so you can later use reference to extract parameters / status :

join = parallel ([
Expand Down
41 changes: 29 additions & 12 deletions src/main/groovy/com/cloudbees/plugins/flow/FlowDSL.groovy
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*
* The MIT License
*
* Copyright (c) 2013, CloudBees, Inc., Nicolas De Loof.
* Cisco Systems, Inc., a California corporation
* Copyright (c) 2013-2015, CloudBees, Inc., Nicolas De Loof.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please feel free to add a but I'm hesitant about changing existing notices.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... note we are in 2016 too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it is basically impossible to separate the copyright ownership of the various pieces, I feel that a single date range and a list of the contributes makes more sense. The original work was done in 2015, but I will update due to review changes.

* Cisco Systems, Inc., a California corporation
* SAP SE
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -41,6 +42,7 @@ import org.codehaus.groovy.control.customizers.ImportCustomizer
import java.util.concurrent.*
import java.util.logging.Logger

import static hudson.model.Result.ABORTED
import static hudson.model.Result.FAILURE
import static hudson.model.Result.SUCCESS

Expand Down Expand Up @@ -103,6 +105,9 @@ public class FlowDSL {
flowRun.state.result = Executor.currentExecutor().abortResult();
Executor.currentExecutor().recordCauseOfInterruption(flowRun, listener);

flowRun.isAborting = true
flowRun.abortUnstartedFutures()

def graph = flowRun.jobsGraph
graph.vertexSet().each() { ji ->
if (flowRun.project != ji.project) {
Expand Down Expand Up @@ -395,41 +400,45 @@ public class FlowDelegate {
}

// allows syntax like : parallel(["Kohsuke","Nicolas"].collect { name -> return { build("job1", param1:name) } })
def List<FlowState> parallel(Collection<? extends Closure> closures) {
parallel(closures as Closure[])
def List<FlowState> parallel(int maxThreads = 0, Collection<? extends Closure> closures) {
parallel(maxThreads, closures as Closure[])
}

// allows collecting job status by name rather than by index
// inspired by https://github.com/caolan/async#parallel
def Map<?, FlowState> parallel(Map<?, ? extends Closure> args) {
def Map<?, FlowState> parallel(int maxThreads = 0, Map<?, ? extends Closure> args) {
def keys = new ArrayList<?>()
def closures = new ArrayList<? extends Closure>()
args.entrySet().each { e ->
keys.add(e.key)
closures.add(e.value)
}
def results = new LinkedHashMap<?, FlowState>()
def flowStates = parallel(closures) // as List<FlowState>
def flowStates = parallel(maxThreads, closures) // as List<FlowState>
flowStates.eachWithIndex { v, i -> results[keys[i]] = v }
results
}

def List<FlowState> parallel(Closure ... closures) {
def List<FlowState> parallel(int maxThreads = 0, Closure ... closures) {
statusCheck()
// TODO use NamingThreadFactory since Jenkins 1.541
ExecutorService pool = Executors.newCachedThreadPool(new ThreadFactory() {
ThreadFactory tf = new ThreadFactory() {
public Thread newThread(Runnable r) {
def thread = Executors.defaultThreadFactory().newThread(r);
thread.name = "BuildFlow parallel statement thread for " + flowRun.parent.fullName;
return thread;
}
});
}
ExecutorService pool = (maxThreads <= 0) ?
Executors.newCachedThreadPool(tf) : Executors.newFixedThreadPool(maxThreads, tf)
Set<Run> upstream = flowRun.state.lastCompleted
Set<Run> lastCompleted = Collections.synchronizedSet(new HashSet<Run>())
def results = new CopyOnWriteArrayList<FlowState>()
def tasks = new ArrayList<Future<FlowState>>()

println("parallel {")
def startMsg = "parallel"
if ( maxThreads > 0 ) startMsg += "( "+maxThreads+" )"
println(startMsg + " {")
++indent

def current_state = flowRun.state
Expand All @@ -450,26 +459,34 @@ public class FlowDelegate {

tasks.add(pool.submit(track_closure as Callable))
}
// add the full list of futures to a build wide list
// to account for possible nested parallel blocks
flowRun.addNewFutures(tasks)

tasks.each {task ->
try {
def final_state = task.get()
Result result = final_state.result
results.add(final_state)
current_state.result = current_state.result.combine(result)
} catch(ExecutionException e)
{
}
catch(ExecutionException e) {
// TODO perhaps rethrow?
current_state.result = FAILURE
listener.error("Failed to run DSL Script")
e.printStackTrace(listener.getLogger())
}
catch(CancellationException e) {
current_state.result = ABORTED
}
}

pool.shutdown()
pool.awaitTermination(1, TimeUnit.DAYS)
current_state.lastCompleted =lastCompleted
} finally {
pool.shutdown()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment to explain why we are doing shutdown here and not waiting for task termination.

tasks.each {task -> task.cancel(false)}
flowRun.state = current_state
--indent
println("}")
Expand Down
33 changes: 32 additions & 1 deletion src/main/java/com/cloudbees/plugins/flow/FlowRun.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/*
* The MIT License
*
* Copyright (c) 2013, CloudBees, Inc., Nicolas De Loof.
* Copyright (c) 2013-2015, CloudBees, Inc., Nicolas De Loof.
* SAP SE
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -35,11 +36,14 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

Expand Down Expand Up @@ -67,10 +71,15 @@ public class FlowRun extends Build<BuildFlow, FlowRun> {

private DirectedGraph<JobInvocation, JobEdge> jobsGraph;

// Note: synchronization necessary
private transient final List<Future<FlowState>> futuresList = new ArrayList<Future<FlowState>>();

private transient ThreadLocal<FlowState> state = new ThreadLocal<FlowState>();

private transient AtomicInteger buildIndex = new AtomicInteger(1);

private transient volatile boolean isAborting = false;

public FlowRun(BuildFlow job, File buildDir) throws IOException {
super(job, buildDir);
setup(job);
Expand Down Expand Up @@ -156,6 +165,28 @@ public void run() {
}
}

protected void addNewFutures( List<Future<FlowState>> newFutures ) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add doc

synchronized(futuresList) {
if ( !isAborting ) {
futuresList.addAll(newFutures);
}
else {
for ( Future f : newFutures ) {
f.cancel(false);
}
}
}
}

protected void abortUnstartedFutures() {
synchronized(futuresList) {
isAborting = true;
for ( Future f : futuresList ) {
f.cancel(false);
}
}
}

protected class BuildWithWorkspaceRunnerImpl extends AbstractRunner {

private final String dsl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class AbortTest extends DSLTestCase {

def flow = future.waitForStart()
// wait for job1 to start
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix comment

while (!job1.building) {
while (!job1.building || !job2.building) {
Thread.sleep(10L)
}
println("job has started")
Expand Down
31 changes: 29 additions & 2 deletions src/test/groovy/com/cloudbees/plugins/flow/DSLTestCase.groovy
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/*
* The MIT License
*
* Copyright (c) 2013, CloudBees, Inc., Nicolas De Loof.
* Cisco Systems, Inc., a California corporation
* Copyright (c) 2013-2015, CloudBees, Inc., Nicolas De Loof.
* Cisco Systems, Inc., a California corporation
* SAP SE
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -49,7 +50,9 @@ import hudson.model.BuildListener
import hudson.tasks.Builder
import com.cloudbees.plugin.flow.ConfigurableFailureBuilder
import com.cloudbees.plugin.flow.BlockingBuilder
import com.cloudbees.plugin.flow.SleepingBuilder
import hudson.model.Job
import com.cloudbees.plugin.flow.LimitedBuilder

import static hudson.model.Result.UNSTABLE

Expand All @@ -67,6 +70,14 @@ abstract class DSLTestCase extends HudsonTestCase {
return jobs
}

def createSleepingJobs = { names ->
def jobs = []
names.each {
jobs.add(createSleepingJob(it))
}
return jobs
}

def createFailJob = {String name, int failures = Integer.MAX_VALUE ->
def job = createJob(name)
job.getBuildersList().add(new ConfigurableFailureBuilder(failures));
Expand All @@ -85,6 +96,22 @@ abstract class DSLTestCase extends HudsonTestCase {
return job
}

def createSleepingJob = {String name, int seconds = 10 ->
def job = createJob(name)
job.getBuildersList().add(new SleepingBuilder(seconds));
return job
}

def initLimitedJobKey( String key, int permits ) {
LimitedBuilder.initialize(key, permits)
}

def createLimitedJob = {String name, String key ->
def job = createJob(name)
job.getBuildersList().add(new LimitedBuilder(key));
return job
}

def run = { script ->
BuildFlow flow = new BuildFlow(Jenkins.instance, getName())
flow.dsl = script
Expand Down
41 changes: 40 additions & 1 deletion src/test/groovy/com/cloudbees/plugins/flow/ParallelTest.groovy
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/*
* The MIT License
*
* Copyright (c) 2013, CloudBees, Inc., Nicolas De Loof.
* Copyright (c) 2013-2015, CloudBees, Inc., Nicolas De Loof.
* SAP SE
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -45,6 +46,25 @@ class ParallelTest extends DSLTestCase {
println flow.jobsGraph.edgeSet()
}

public void testParallelLimitConcurrency() {
initLimitedJobKey(getName(), 1)
def jobs = [
createLimitedJob("job1", getName()),
createLimitedJob("job2", getName()),
createLimitedJob("job3", getName()),
]
def flow = run("""
parallel( 1,
{ build("job1") },
{ build("job2") },
{ build("job3") },
)
""")
assertAllSuccess(jobs)
assert SUCCESS == flow.result
println flow.jobsGraph.edgeSet()
}

public void testFailOnParallelFailed() {
createJobs(["job1", "job2"])
createFailJob("willFail")
Expand Down Expand Up @@ -128,4 +148,23 @@ class ParallelTest extends DSLTestCase {
println flow.jobsGraph.edgeSet()
}

public void testParallelMapLimitConcurrency() {
initLimitedJobKey(getName(), 1)
def jobs = [
createLimitedJob("job1", getName()),
createLimitedJob("job2", getName()),
createLimitedJob("job3", getName()),
]
def flow = run("""
join = parallel ( 1, [
first: { build("job1") },
second: { build("job2") },
third: { build("job3") }
])
""")
assertAllSuccess(jobs)
assert SUCCESS == flow.result
println flow.jobsGraph.edgeSet()
}

}
Loading