Skip to content

Commit

Permalink
WIP: In the process of refactoring the unit test POC into a more prod…
Browse files Browse the repository at this point in the history
…uction ready interface that can be used to coordinate work.

The code doesn't compile, the requests aren't fully featured, and the test is in the process of being shelled and rewritten (let alone clocked up to stress the interfaces).

Signed-off-by: Greg Schohn <[email protected]>
  • Loading branch information
gregschohn committed Jun 13, 2024
1 parent 3d5a5a2 commit dc24af9
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 475 deletions.
55 changes: 54 additions & 1 deletion RFS/src/main/java/com/rfs/cms/AbstractedHttpClient.java
Original file line number Diff line number Diff line change
@@ -1,2 +1,55 @@
package com.rfs.cms;public class AbstractedHttpClient {
package com.rfs.cms;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public interface AbstractedHttpClient extends AutoCloseable {
interface AbstractHttpResponse {
Stream<Map.Entry<String, String>> getHeaders();

default byte[] getPayloadBytes() throws IOException {
return getPayloadStream().readAllBytes();
}

default InputStream getPayloadStream() throws IOException {
return new ByteArrayInputStream(getPayloadBytes());
}

String getStatusText();

int getStatusCode();

default String toDiagnosticString() {
String payloadStr;
try {
payloadStr = Arrays.toString(getPayloadBytes());
} catch (IOException e) {
payloadStr = "[EXCEPTION EVALUATING PAYLOAD]: " + e;
}
return getStatusText() + "/" + getStatusCode() +
getHeaders().map(kvp -> kvp.getKey() + ": " + kvp.getValue())
.collect(Collectors.joining(";", "[", "]")) +
payloadStr;
}
}

AbstractHttpResponse makeRequest(String method, String path,
Map<String, String> headers, String payload) throws IOException;

default AbstractHttpResponse makeJsonRequest(String method, String path,
Map<String, String> extraHeaders, String body) throws IOException {
var combinedHeaders = new LinkedHashMap<String, String>();
combinedHeaders.put("Content-Type", "application/json");
combinedHeaders.put("Accept-Encoding", "identity");
if (extraHeaders != null) {
combinedHeaders.putAll(extraHeaders);
}
return makeRequest(method, path, combinedHeaders, body);
}
}
86 changes: 85 additions & 1 deletion RFS/src/main/java/com/rfs/cms/ApacheHttpClient.java
Original file line number Diff line number Diff line change
@@ -1,2 +1,86 @@
package com.rfs.cms;public class ApacheHttpClient {
package com.rfs.cms;

import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpHead;
import org.apache.hc.client5.http.classic.methods.HttpOptions;
import org.apache.hc.client5.http.classic.methods.HttpPatch;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.entity.StringEntity;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Stream;

public class ApacheHttpClient implements AbstractedHttpClient {
private final CloseableHttpClient client = HttpClients.createDefault();
private final URI baseUri;

public ApacheHttpClient(URI baseUri) {
this.baseUri = baseUri;
}

private static HttpUriRequestBase makeRequestBase(URI baseUri, String method, String path) {
switch (method.toUpperCase()) {
case "GET":
return new HttpGet(baseUri + "/" + OpenSearchWorkCoordinator.INDEX_NAME + path);
case OpenSearchWorkCoordinator.POST_METHOD:
return new HttpPost(baseUri + "/" + OpenSearchWorkCoordinator.INDEX_NAME + path);
case OpenSearchWorkCoordinator.PUT_METHOD:
return new HttpPut(baseUri + "/" + OpenSearchWorkCoordinator.INDEX_NAME + path);
case "PATCH":
return new HttpPatch(baseUri + "/" + OpenSearchWorkCoordinator.INDEX_NAME + path);
case "HEAD":
return new HttpHead(baseUri + "/" + OpenSearchWorkCoordinator.INDEX_NAME + path);
case "OPTIONS":
return new HttpOptions(baseUri + "/" + OpenSearchWorkCoordinator.INDEX_NAME + path);
case "DELETE":
return new HttpDelete(baseUri + "/" + OpenSearchWorkCoordinator.INDEX_NAME + path);
default:
throw new IllegalArgumentException("Cannot map method to an Apache Http Client request: " + method);
}
}

@Override
public AbstractHttpResponse makeRequest(String method, String path,
Map<String, String> headers, String payload) throws IOException {
var request = makeRequestBase(baseUri, method, path);
request.setHeaders(request.getHeaders());
request.setEntity(new StringEntity(payload));
return client.execute(request, fr -> new AbstractHttpResponse() {
@Override
public InputStream getPayloadStream() throws IOException {
return fr.getEntity().getContent();
}

@Override
public String getStatusText() {
return fr.getReasonPhrase();
}

@Override
public int getStatusCode() {
return fr.getCode();
}

@Override
public Stream<Map.Entry<String, String>> getHeaders() {
return Arrays.stream(fr.getHeaders())
.map(h -> new AbstractMap.SimpleEntry<>(h.getName(), h.getValue()));
}
});
}

@Override
public void close() throws Exception {
client.close();
}
}
51 changes: 50 additions & 1 deletion RFS/src/main/java/com/rfs/cms/IWorkCoordinator.java
Original file line number Diff line number Diff line change
@@ -1,2 +1,51 @@
package com.rfs.cms;public class IWorkCoordinator {
package com.rfs.cms;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;

public interface IWorkCoordinator extends AutoCloseable {
void setup() throws IOException;

/**
* This represents when the lease wasn't acquired because another process already owned the
* lease or the workItem was already marked as completed.
*/
class LeaseNotAcquiredException extends Exception {
public LeaseNotAcquiredException() {}
}

@Getter
@AllArgsConstructor
class WorkItemAndDuration {
public final String workItemId;
public final Instant leaseExpirationTime;
}
WorkItemAndDuration acquireNextWorkItem() throws IOException;

/**
* @param workItemId - the name of the document/resource to create.
* This value will be used as a key to other methods that update leases and to close work out.
* @throws IOException if the document was not successfully create for any reason
*/
void createUnassignedWorkItem(String workItemId) throws IOException;

/**
* @param workItemId the item that the caller is trying to take ownership of
* @param leaseDuration the initial amount of time that the caller would like to own the lease for.
* Notice if other attempts have been made on this workItem, the lease will be
* greater than the requested amount.
* @return a tuple that contains the expiration time of the lease, at which point,
* this process must completely yield all work on the item
* @throws IOException if there was an error resolving the lease ownership
* @throws LeaseNotAcquiredException if the lease is owned by another process
*/
@NonNull WorkItemAndDuration createOrUpdateLeaseForWorkItem(String workItemId, Duration leaseDuration)
throws IOException, LeaseNotAcquiredException;

void completeWorkItem(String workItemId);
}
Loading

0 comments on commit dc24af9

Please sign in to comment.