Skip to content

Commit

Permalink
simplify the dependencies on XFuture so APIs only depend on this 1 we…
Browse files Browse the repository at this point in the history
…bpieces jar (#134)

Co-authored-by: Dean <[email protected]>
  • Loading branch information
deanhiller and Dean authored Aug 29, 2022
1 parent a1e7d1d commit 60e988a
Show file tree
Hide file tree
Showing 15 changed files with 633 additions and 0 deletions.
1 change: 1 addition & 0 deletions config/global.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ ext {
'core-logging' : 'org.webpieces.core:core-logging',
'core-metrics' : 'org.webpieces.core:core-metrics',
'core-jackson' : 'org.webpieces.core:core-jackson',
'core-future' : 'org.webpieces.core:core-future',
'core-util' : 'org.webpieces.core:core-util',
'core-ddl' : 'org.webpieces.core:core-ddl',
'runtimecompile' : 'org.webpieces.core:runtimecompile',
Expand Down
32 changes: 32 additions & 0 deletions core/core-future/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
plugins {
id 'java-library'
id 'checkstyle'
id 'jacoco' //code coverage
id 'eclipse'
id 'idea'
id 'signing'
id 'maven-publish'
}

group = 'org.webpieces.core'

apply from: '../../config/global.gradle'

dependencies {
//THIS IS USED BY ANDROID projects as well!
//PLEASE do NOT bring anything in here. this is core-future and is used by many so if someone brings this in,
//we don't want to carry baggage into the client project
//ie. you add some apache lib and only org.webpieces.util.time uses it
//well, MANY clients are NOT using that package and now they have to drag along extra baggage....instead, create another util or something
//crap but everyone is using this anyways or at least should be just like logging
}

publishing {
publications {
mavenJava(MavenPublication) {
pom {
description = 'A simple utility library with special Executor but rather small amount of code'
}
}
}
}
1 change: 1 addition & 0 deletions core/core-future/settings.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package org.webpieces.util.futures;

import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.Callable;
import org.webpieces.util.futures.XFuture;
import java.util.function.Function;

import javax.inject.Singleton;

@Singleton
public class FutureHelper {

/**
* USAGE:
*
* XFuture<Response> future = futureUtil.tryCatchFinallyBlock(
* () -> handleCompleteRequestImpl(),
* () -> runOnSuccessOfAboveMethod()
* () -> runOnSuccessOrFailureOfAboveTwoMethods()
* );
*/
public <T> XFuture<T> trySuccessFinallyBlock(
Callable<XFuture<T>> function,
Callable<XFuture<T>> successFunction,
Runnable finallyCode
) {
//convert sync exceptions into async exceptions so we can re-use same exception handling logic..
XFuture<T> future = syncToAsyncException(function);
XFuture<T> newFuture = future.thenCompose( result -> syncToAsyncException(successFunction));

XFuture<T> lastFuture = finallyBlock(
() -> newFuture,
() -> finallyCode.run()
);

return lastFuture;
}

/**
* USAGE:
*
* //In this example, you might set the MDC and clear it in a finally block like so
* MDC.put("txId", generate());
*
* XFuture<Response> future = futureUtil.finallyBlock(
* () -> handleCompleteRequestImpl(),
* () -> MDC.put("txId", null)
* );
*/
public <T> XFuture<T> finallyBlock(
Callable<XFuture<T>> function,
Runnable finallyCode
) {
//convert sync exceptions into async exceptions so we can re-use same exception handling logic..
XFuture<T> future = syncToAsyncException(function);

//Now, handle ANY (sync or aysnc) exceptions the same by running a finally block......
XFuture<T> local = future.handle((r, t) -> {
//RUN the finally code BUT try....catch it
try {
finallyCode.run();
} catch (Exception e) {
XFuture<T> failedFuture = new XFuture<>();
if(t != null) {
failedFuture.completeExceptionally(t);
t.addSuppressed(e);
return failedFuture;
} else {
failedFuture.completeExceptionally(e);
return failedFuture;

}
}

//r and t can BOTH be null but t is not null on failure and null on good response
if(t == null) //response is good so return it
return XFuture.completedFuture(r);

XFuture<T> failedFuture = new XFuture<>();
failedFuture.completeExceptionally(t);
return failedFuture;
}).thenCompose(Function.identity());

return local;
}

/**
* USAGE:
*
* //In this example, you may catch and recover or catch and fail like a normal
* //synchronous catch block EXCEPT this is for both async and sync exceptions
* //from the function
*
* XFuture<Response> future = futureUtil.catchBlock(
* () -> handleCompleteRequestImpl(),
* (t) -> runCatchBlock(t) //can return success if recovering OR failure if not
* );
*/
public <T> XFuture<T> catchBlock(
Callable<XFuture<T>> function,
Function<Throwable, XFuture<T>> catchBlock
) {
//convert sync exceptions into async exceptions so we can re-use same exception handling logic..
XFuture<T> future = syncToAsyncException(function);

//Now, handle ANY (sync or aysnc) exceptions the same by running a finally block......
XFuture<T> local = future.handle((r, t) -> {
//r and t can BOTH be null but t is not null on failure and null on good response
if(t == null) //response is good so return it
return XFuture.completedFuture(r);

//RUN the finally code BUT try....catch it
try {
return catchBlock.apply(t);
} catch (Throwable e) {
//IF the catch block throws sync exception, they get screwed so just pass original exception up the chain
//along with the suppressed exception inside that....
t.addSuppressed(e);
return this.<T>failedFuture(t);
}

}).thenCompose(Function.identity());

return local;
}

/**
* This is just a version of catchBlock above except this type of catch block always fails
* so it's not meant for recovery situations on returning a success response
*
* USAGE:
*
* XFuture<Response> response = futureUtil.wrap(
* () -> pageNotFoundRouter.invokeNotFoundRoute(requestCtx, responseCb),
* (e) -> new RuntimeException("NotFound Route had an exception", e)
* );
*/
public <T> XFuture<T> catchBlockWrap(
Callable<XFuture<T>> function,
Function<Throwable, Throwable> wrapException
) {
return catchBlock(function, (t) -> failedFuture(wrapException.apply(t)));
}

/**
* Copying Twitter filters, we convert all synchronous exceptions to XFuture.failedFuture
* as there has never been a need over all twitter's 1000's of servers over 10 years where we need
* to distinguish between a synchronous exception and an asynchronous one. IF YOU HAPPEN to find a
* single case, 1. create a special AsyncException and key off that and 2. PLEASE do tell me as I am
* very curious if a case like that exists
*
* USAGE:
*
* XFuture<InitiationResult> future =
* futureUtil.syncToAsyncException(
* () -> http11Handler.initialData(socket, b)
* );
*
*/
//Change to Runnable...
public <T> XFuture<T> syncToAsyncException(Callable<XFuture<T>> callable) {
try {
//DAMN these damn InvocationTargetExceptions that just fucking wrap the original
//GET rid of checked exceptions....in reality InvocationTargetException == FUCKING ANYTHING!!!
//This ensures the original thrown exception comes through.

//specifically, it's important that NotFoundException is unwrapped so above layers can catch
//that instead of this damn InvocationTargetException
return callable.call();
} catch (InvocationTargetException e) {
if(e.getCause() != null)
return failedFuture(e.getCause());
return failedFuture(e);
} catch (Exception ex) {
return failedFuture(ex);
}
}

//In jdk8, they don't have XFuture.failedFuture yet :(
public <T> XFuture<T> failedFuture(Throwable ex) {
XFuture<T> future = new XFuture<>();
future.completeExceptionally(ex);
return future;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.webpieces.util.futures;

import java.util.List;
import org.webpieces.util.futures.XFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

//TODO(dhiller): Use this http2engine(client/server), http1 client, http2to11client AND frontend as
//all of them do this pattern and we should wire that all up
public class LoopingChain<T> {

public XFuture<Void> runLoop(List<T> newData, Session session, Processor<T> processFunction) {

//All the below futures must be chained with previous ones in case previous ones are not
//done which will serialize it all to be in sequence
XFuture<Void> future = session.getProcessFuture();

for(T data : newData) {
//VERY IMPORTANT: Writing the code like this would slam through calling process N times
//BUT it doesn't give the clients a chance to seet a flag between packets
//Mainly done for exceptions and streaming so you can log exc, set a boolean so you
//don't get 100 exceptions while something is happening like socket disconnect
//In these 2 lines of code, processCorrectly is CALLED N times RIGHT NOW
//The code below this only calls them right now IF AND ONLY IF the client returns
//a completed future each time!!!

//This seems to have memory issues as well....
//XFuture<Void> temp = processFunction.process(data);
//future = future.thenCompose(f -> temp);

//future = future.thenComposeAsync( voidd -> processFunction.process(data), executor );
future = future.thenCompose( voidd -> processFunction.process(data) );
}

//comment this out and memory leak goes away of course.......
session.setProcessFuturee(future);

return future;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.webpieces.util.futures;

import org.webpieces.util.futures.XFuture;

public interface Processor<T> {

public XFuture<Void> process(T item);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.webpieces.util.futures;

import org.webpieces.util.futures.XFuture;

public interface Session {

void setProcessFuturee(XFuture<Void> future);

XFuture<Void> getProcessFuture();

}
Loading

0 comments on commit 60e988a

Please sign in to comment.