[RFS] Improved RFS Worker exception logging #676

Merged
merged 6 commits into from
May 29, 2024
30 changes: 29 additions & 1 deletion RFS/src/main/java/com/rfs/RunRfsWorker.java
@@ -2,17 +2,22 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;


import com.rfs.cms.CmsClient;
import com.rfs.cms.CmsEntry;
import com.rfs.cms.OpenSearchCmsClient;
import com.rfs.common.ClusterVersion;
import com.rfs.common.ConnectionDetails;
@@ -32,7 +37,9 @@
import com.rfs.version_os_2_11.GlobalMetadataCreator_OS_2_11;
import com.rfs.worker.GlobalState;
import com.rfs.worker.MetadataRunner;
import com.rfs.worker.Runner;
import com.rfs.worker.SnapshotRunner;
import com.rfs.worker.WorkerStep;

public class RunRfsWorker {
private static final Logger logger = LogManager.getLogger(RunRfsWorker.class);
@@ -133,9 +140,30 @@
MetadataRunner metadataWorker = new MetadataRunner(globalState, cmsClient, snapshotName, metadataFactory, metadataCreator, transformer);
metadataWorker.run();

} catch (Runner.PhaseFailed e) {
logPhaseFailureRecord(e.phase, e.nextStep, e.cmsEntry, e.e);
throw e;

} catch (Exception e) {
logger.error("Error running RfsWorker", e);
logger.error("Unexpected error running RfsWorker", e);

throw e;
}
}

public static void logPhaseFailureRecord(GlobalState.Phase phase, WorkerStep nextStep, Optional<CmsEntry.Base> cmsEntry, Exception e) {
ObjectNode errorBlob = new ObjectMapper().createObjectNode();
errorBlob.put("exceptionMessage", e.getMessage());
errorBlob.put("exceptionClass", e.getClass().getSimpleName());
errorBlob.put("exceptionTrace", Arrays.toString(e.getStackTrace()));

errorBlob.put("phase", phase.toString());

String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
errorBlob.put("currentStep", currentStep);

String currentEntry = (cmsEntry.isPresent()) ? cmsEntry.toString() : "null";
errorBlob.put("cmsEntry", currentEntry);

logger.error(errorBlob.toString());
}

}
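The failure record assembled by `logPhaseFailureRecord` can be sketched without Jackson as follows. This is only an illustration: the class name `FailureRecordSketch`, the method `buildFailureRecord`, and the use of a plain `Map` in place of `ObjectNode` are assumptions, not part of this PR. It also shows one difference worth noting: calling `toString()` on an `Optional` directly typically yields the wrapper form (for example `Optional[...]`), whereas mapping over it yields the entry's own string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the error blob built in logPhaseFailureRecord.
public class FailureRecordSketch {
    public static Map<String, String> buildFailureRecord(
            String phase, Object nextStep, Optional<?> cmsEntry, Exception e) {
        // LinkedHashMap keeps the fields in insertion order for readable logs.
        Map<String, String> record = new LinkedHashMap<>();
        record.put("exceptionMessage", String.valueOf(e.getMessage()));
        record.put("exceptionClass", e.getClass().getSimpleName());
        record.put("phase", phase);

        // A missing step is recorded as the literal string "null".
        String currentStep = (nextStep != null) ? nextStep.getClass().getSimpleName() : "null";
        record.put("currentStep", currentStep);

        // Unwrap the Optional so the entry's own toString() is logged,
        // rather than the Optional wrapper's representation.
        record.put("cmsEntry", cmsEntry.map(Object::toString).orElse("null"));
        return record;
    }
}
```

A caller would pass the fields carried by the caught exception and log the resulting map as a single line.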
41 changes: 20 additions & 21 deletions RFS/src/main/java/com/rfs/cms/CmsClient.java
@@ -1,46 +1,45 @@
package com.rfs.cms;

import java.util.Optional;

/*
* Client to connect to and work with the Coordinating Metadata Store. The CMS could be implemented by any reasonable
* data store option (Postgres, AWS DynamoDB, Elasticsearch/Opensearch, etc).
*/
public interface CmsClient {
/*
* Creates a new entry in the CMS for the Snapshot's progress. Returns true if we created the entry, and false if
* the entry already exists.
*/
public boolean createSnapshotEntry(String snapshotName);

/*
* Attempt to retrieve the Snapshot entry from the CMS, if it exists; null if it doesn't currently exist
* Creates a new entry in the CMS for the Snapshot's progress. Returns an Optional; if the document was created, it
* will be the created object and empty otherwise.
*/
public CmsEntry.Snapshot getSnapshotEntry(String snapshotName);
public Optional<CmsEntry.Snapshot> createSnapshotEntry(String snapshotName);

/*
* Updates the status of the Snapshot entry in the CMS. Returns true if the update was successful, and false if
* something else updated it before we could
* Attempt to retrieve the Snapshot entry from the CMS. Returns an Optional; if the document exists, it will be the
* retrieved entry and empty otherwise.
*/
public boolean updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status);
public Optional<CmsEntry.Snapshot> getSnapshotEntry(String snapshotName);

/*
* Creates a new entry in the CMS for the Metadata Migration's progress. Returns true if we created the entry, and
* false if the entry already exists.
* Updates the Snapshot entry in the CMS. Returns an Optional; if the document was updated, it will be
* the updated entry and empty otherwise.
*/
public boolean createMetadataEntry();
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status);

/*
* Attempt to retrieve the Metadata Migration entry from the CMS, if it exists; null if it doesn't currently exist
* Creates a new entry in the CMS for the Metadata Migration's progress. Returns an Optional; if the document was
* created, it will be the created entry and empty otherwise.
*/
public CmsEntry.Metadata getMetadataEntry();
public Optional<CmsEntry.Metadata> createMetadataEntry();

/*
* Updates just the status field of the Metadata Migration entry in the CMS. Returns true if the update was successful,
* Attempt to retrieve the Metadata Migration entry from the CMS, if it exists. Returns an Optional; if the document
* exists, it will be the retrieved entry and empty otherwise.
*/
public boolean setMetadataMigrationStatus(CmsEntry.MetadataStatus status);
public Optional<CmsEntry.Metadata> getMetadataEntry();

/*
* Updates all fields of the Metadata Migration entry in the CMS. Returns true if the update was successful, and
* false if something else updated it before we could
* Updates the Metadata Migration entry in the CMS. Returns an Optional; if the document was updated,
* it will be the updated entry and empty otherwise.
*/
public boolean updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts);
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts);
}
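The Optional-returning contract described in the comments above can be illustrated with a small in-memory stand-in. This is a sketch under assumptions: `InMemoryCmsSketch` and its nested types are hypothetical, entries are keyed by snapshot name for simplicity, and it does not implement the real `CmsClient` interface or talk to any data store.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the create/get/update semantics.
public class InMemoryCmsSketch {
    public enum SnapshotStatus { NOT_STARTED, IN_PROGRESS, COMPLETED, FAILED }

    public static final class Snapshot {
        public final String name;
        public final SnapshotStatus status;

        public Snapshot(String name, SnapshotStatus status) {
            this.name = name;
            this.status = status;
        }
    }

    private final Map<String, Snapshot> store = new ConcurrentHashMap<>();

    // Returns the created entry, or empty if an entry already exists.
    public Optional<Snapshot> createSnapshotEntry(String name) {
        Snapshot fresh = new Snapshot(name, SnapshotStatus.NOT_STARTED);
        return store.putIfAbsent(name, fresh) == null ? Optional.of(fresh) : Optional.empty();
    }

    // Returns the stored entry, or empty if none exists.
    public Optional<Snapshot> getSnapshotEntry(String name) {
        return Optional.ofNullable(store.get(name));
    }

    // Returns the updated entry, or empty if there was nothing to update.
    public Optional<Snapshot> updateSnapshotEntry(String name, SnapshotStatus status) {
        Snapshot updated = new Snapshot(name, status);
        return store.replace(name, updated) != null ? Optional.of(updated) : Optional.empty();
    }
}
```

Callers can then branch on `isPresent()` or chain `map`/`orElse` instead of checking a boolean and issuing a second read.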
28 changes: 26 additions & 2 deletions RFS/src/main/java/com/rfs/cms/CmsEntry.java
@@ -3,21 +3,35 @@
import com.rfs.common.RfsException;

public class CmsEntry {
public abstract static class Base {
protected Base() {}
public abstract String toString();
}

public static enum SnapshotStatus {
NOT_STARTED,
IN_PROGRESS,
COMPLETED,
FAILED,
}

public static class Snapshot {
public static class Snapshot extends Base {
public final String name;
public final SnapshotStatus status;

public Snapshot(String name, SnapshotStatus status) {
super();
this.name = name;
this.status = status;
}

@Override
public String toString() {
return "Snapshot("
+ "name='" + name + "',"
+ "status=" + status +
")";
}
}

public static enum MetadataStatus {
@@ -26,7 +40,7 @@ public static enum MetadataStatus {
FAILED,
}

public static class Metadata {
public static class Metadata extends Base {
public static final int METADATA_LEASE_MS = 1 * 60 * 1000; // 1 minute, arbitrarily chosen
public static final int MAX_ATTEMPTS = 3; // arbitrarily chosen

@@ -50,10 +64,20 @@ public static String getLeaseExpiry(long currentTime, int numAttempts) {
public final Integer numAttempts;

public Metadata(MetadataStatus status, String leaseExpiry, int numAttempts) {
super();
this.status = status;
this.leaseExpiry = leaseExpiry;
this.numAttempts = numAttempts;
}

@Override
public String toString() {
return "Metadata("
+ "status=" + status.toString() + ","
+ "leaseExpiry=" + leaseExpiry + ","
+ "numAttempts=" + numAttempts.toString() +
")";
}
}

public static class CouldNotFindNextLeaseDuration extends RfsException {
63 changes: 25 additions & 38 deletions RFS/src/main/java/com/rfs/cms/OpenSearchCmsClient.java
@@ -1,15 +1,11 @@
package com.rfs.cms;

import java.net.HttpURLConnection;
import java.util.Optional;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.rfs.common.OpenSearchClient;
import com.rfs.common.RestClient;

public class OpenSearchCmsClient implements CmsClient {
private static final ObjectMapper objectMapper = new ObjectMapper();

public static final String CMS_INDEX_NAME = "cms-reindex-from-snapshot";
public static final String CMS_SNAPSHOT_DOC_ID = "snapshot_status";
public static final String CMS_METADATA_DOC_ID = "metadata_status";
@@ -21,54 +17,45 @@
}

@Override
public boolean createSnapshotEntry(String snapshotName) {
ObjectNode initial = OpenSearchCmsEntry.Snapshot.getInitial(snapshotName);
return client.createDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, initial);
}

@Override
public CmsEntry.Snapshot getSnapshotEntry(String snapshotName) {
RestClient.Response response = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID);

if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
return null;
}
return OpenSearchCmsEntry.Snapshot.fromJsonString(response.body);
public Optional<CmsEntry.Snapshot> createSnapshotEntry(String snapshotName) {
OpenSearchCmsEntry.Snapshot newEntry = OpenSearchCmsEntry.Snapshot.getInitial(snapshotName);
Optional<ObjectNode> createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, newEntry.toJson());
return createdEntry.map(OpenSearchCmsEntry.Snapshot::fromJson);

}

@Override
public boolean updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) {
OpenSearchCmsEntry.Snapshot snapshot = new OpenSearchCmsEntry.Snapshot(snapshotName, status);
return client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, snapshot.toJson());
public Optional<CmsEntry.Snapshot> getSnapshotEntry(String snapshotName) {
Optional<ObjectNode> document = client.getDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID);
return document.map(doc -> (ObjectNode) doc.get("_source"))
.map(OpenSearchCmsEntry.Snapshot::fromJson);

}

@Override
public boolean createMetadataEntry() {
ObjectNode metadataDoc = OpenSearchCmsEntry.Metadata.getInitial();
return client.createDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadataDoc);
public Optional<CmsEntry.Snapshot> updateSnapshotEntry(String snapshotName, CmsEntry.SnapshotStatus status) {
OpenSearchCmsEntry.Snapshot entry = new OpenSearchCmsEntry.Snapshot(snapshotName, status);
Optional<ObjectNode> updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_SNAPSHOT_DOC_ID, entry.toJson());
return updatedEntry.map(OpenSearchCmsEntry.Snapshot::fromJson);

}

@Override
public CmsEntry.Metadata getMetadataEntry() {
RestClient.Response response = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID);
public Optional<CmsEntry.Metadata> createMetadataEntry() {
OpenSearchCmsEntry.Metadata entry = OpenSearchCmsEntry.Metadata.getInitial();
Optional<ObjectNode> createdEntry = client.createDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, entry.toJson());
return createdEntry.map(OpenSearchCmsEntry.Metadata::fromJson);

if (response.code == HttpURLConnection.HTTP_NOT_FOUND) {
return null;
}
return OpenSearchCmsEntry.Metadata.fromJsonString(response.body);
}

@Override
public boolean setMetadataMigrationStatus(CmsEntry.MetadataStatus status) {
ObjectNode statusUpdate = objectMapper.createObjectNode();
statusUpdate.put(OpenSearchCmsEntry.Metadata.FIELD_STATUS, status.toString());
return client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, statusUpdate);
public Optional<CmsEntry.Metadata> getMetadataEntry() {
Optional<ObjectNode> document = client.getDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID);
return document.map(doc -> (ObjectNode) doc.get("_source"))
.map(OpenSearchCmsEntry.Metadata::fromJson);

}

@Override
public boolean updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) {
public Optional<CmsEntry.Metadata> updateMetadataEntry(CmsEntry.MetadataStatus status, String leaseExpiry, Integer numAttempts) {
OpenSearchCmsEntry.Metadata metadata = new OpenSearchCmsEntry.Metadata(status, leaseExpiry, numAttempts);
return client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson());
}

Optional<ObjectNode> updatedEntry = client.updateDocument(CMS_INDEX_NAME, CMS_METADATA_DOC_ID, metadata.toJson());
return updatedEntry.map(OpenSearchCmsEntry.Metadata::fromJson);

}
}
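The `document.map(doc -> doc.get("_source")).map(...::fromJson)` chain used in the getters can be sketched with plain maps. The names `SourceUnwrapSketch`, `Metadata`, and `toMetadata` are hypothetical, and a nested `Map` stands in for Jackson's `ObjectNode`. The point being illustrated is that `Optional.map` yields an empty Optional when the mapping function returns null, so a document without a `_source` field results in empty rather than a `NullPointerException` in the second step.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of unwrapping "_source" from a fetched document.
public class SourceUnwrapSketch {
    public static final class Metadata {
        public final String status;
        public final String leaseExpiry;
        public final Integer numAttempts;

        public Metadata(String status, String leaseExpiry, Integer numAttempts) {
            this.status = status;
            this.leaseExpiry = leaseExpiry;
            this.numAttempts = numAttempts;
        }
    }

    @SuppressWarnings("unchecked")
    public static Optional<Metadata> toMetadata(Optional<Map<String, Object>> document) {
        return document
            // If "_source" is absent, get() returns null and map() yields empty.
            .map(doc -> (Map<String, Object>) doc.get("_source"))
            // Only runs when a "_source" object was actually present.
            .map(src -> new Metadata(
                (String) src.get("status"),
                (String) src.get("leaseExpiry"),
                (Integer) src.get("numAttempts")));
    }
}
```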
59 changes: 29 additions & 30 deletions RFS/src/main/java/com/rfs/cms/OpenSearchCmsEntry.java
@@ -13,24 +13,18 @@
public static final String FIELD_NAME = "name";
public static final String FIELD_STATUS = "status";

public static ObjectNode getInitial(String name) {
ObjectNode node = objectMapper.createObjectNode();
node.put(FIELD_STATUS, name);
node.put(FIELD_STATUS, CmsEntry.SnapshotStatus.NOT_STARTED.toString());
return node;
public static Snapshot getInitial(String name) {
return new Snapshot(name, CmsEntry.SnapshotStatus.NOT_STARTED);

}

public static Snapshot fromJsonString(String json) {
public static Snapshot fromJson(ObjectNode node) {
try {
ObjectNode node = objectMapper.readValue(json, ObjectNode.class);
ObjectNode sourceNode = (ObjectNode) node.get("_source");

return new Snapshot(
sourceNode.get(FIELD_STATUS).asText(),
CmsEntry.SnapshotStatus.valueOf(sourceNode.get(FIELD_STATUS).asText())
node.get(FIELD_NAME).asText(),
CmsEntry.SnapshotStatus.valueOf(node.get(FIELD_STATUS).asText())

);
} catch (Exception e) {
throw new CantParseCmsEntryFromJson(Snapshot.class, json, e);
throw new CantParseCmsEntryFromJson(Snapshot.class, node.toString(), e);

}
}

@@ -44,37 +38,37 @@
node.put(FIELD_STATUS, status.toString());
return node;
}

@Override
public String toString() {
return this.toJson().toString();

}
}

public static class Metadata extends CmsEntry.Metadata {
public static final String FIELD_STATUS = "status";
public static final String FIELD_LEASE_EXPIRY = "leaseExpiry";
public static final String FIELD_NUM_ATTEMPTS = "numAttempts";

public static ObjectNode getInitial() {
ObjectNode metadataDoc = objectMapper.createObjectNode();
metadataDoc.put(FIELD_STATUS, CmsEntry.MetadataStatus.IN_PROGRESS.toString());
metadataDoc.put(FIELD_NUM_ATTEMPTS, 1);

// TODO: We should be ideally setting the lease using the server's clock, but it's unclear on the best way
// to do this. For now, we'll just use the client's clock.
metadataDoc.put(FIELD_LEASE_EXPIRY, CmsEntry.Metadata.getLeaseExpiry(Instant.now().toEpochMilli(), 1));

return metadataDoc;
public static Metadata getInitial() {
return new Metadata(

CmsEntry.MetadataStatus.IN_PROGRESS,
// TODO: We should be ideally setting the lease using the server's clock, but it's unclear on the best way
// to do this. For now, we'll just use the client's clock.
CmsEntry.Metadata.getLeaseExpiry(Instant.now().toEpochMilli(), 1),
1

);
}

public static Metadata fromJsonString(String json) {
public static Metadata fromJson(ObjectNode node) {
try {
ObjectNode node = objectMapper.readValue(json, ObjectNode.class);
ObjectNode sourceNode = (ObjectNode) node.get("_source");

return new Metadata(
CmsEntry.MetadataStatus.valueOf(sourceNode.get(FIELD_STATUS).asText()),
sourceNode.get(FIELD_LEASE_EXPIRY).asText(),
sourceNode.get(FIELD_NUM_ATTEMPTS).asInt()
CmsEntry.MetadataStatus.valueOf(node.get(FIELD_STATUS).asText()),
node.get(FIELD_LEASE_EXPIRY).asText(),
node.get(FIELD_NUM_ATTEMPTS).asInt()

);
} catch (Exception e) {
throw new CantParseCmsEntryFromJson(Metadata.class, json, e);
throw new CantParseCmsEntryFromJson(Metadata.class, node.toString(), e);

}
}

@@ -89,6 +83,11 @@
node.put(FIELD_NUM_ATTEMPTS, numAttempts);
return node;
}

@Override
public String toString() {
return this.toJson().toString();

}
}

public static class CantParseCmsEntryFromJson extends RfsException {