Skip to content

Commit

Permalink
HDDS-11324. Negative value preOpLatencyMs in DN audit log
Browse files Browse the repository at this point in the history
  • Loading branch information
ChenSammi committed Aug 19, 2024
1 parent 7f24f2d commit e7ef8f2
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private static final String AUDIT_PARAM_FORCE_DELETE = "forceDelete";
private static final String AUDIT_PARAM_START_CONTAINER_ID = "startContainerID";
private static final String AUDIT_PARAM_BLOCK_DATA = "blockData";
private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "blockDataSize";
private static final String AUDIT_PARAM_BLOCK_DATA_OFFSET = "offset";
private static final String AUDIT_PARAM_BLOCK_DATA_SIZE = "size";
private static final String AUDIT_PARAM_BLOCK_DATA_STAGE = "stage";
private static final String AUDIT_PARAM_COUNT = "count";
private static final String AUDIT_PARAM_START_LOCAL_ID = "startLocalID";
private static final String AUDIT_PARAM_PREV_CHUNKNAME = "prevChunkName";
Expand All @@ -112,7 +114,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
private String clusterId;
private ContainerMetrics metrics;
private final TokenVerifier tokenVerifier;
private long slowOpThresholdMs;
private long slowOpThresholdNs;
private VolumeUsage.MinFreeSpaceCalculator freeSpaceCalculator;

/**
Expand All @@ -134,7 +136,7 @@ public HddsDispatcher(ConfigurationSource config, ContainerSet contSet,
HddsConfigKeys.HDDS_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
this.tokenVerifier = tokenVerifier != null ? tokenVerifier
: new NoopTokenVerifier();
this.slowOpThresholdMs = getSlowOpThresholdMs(conf);
this.slowOpThresholdNs = getSlowOpThresholdMs(conf) * 1000000;

protocolMetrics =
new ProtocolMessageMetrics<>(
Expand Down Expand Up @@ -279,7 +281,7 @@ private ContainerCommandResponseProto dispatchRequest(
"ContainerID " + containerID
+ " has been lost and cannot be recreated on this DataNode",
ContainerProtos.Result.CONTAINER_MISSING);
audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}

Expand All @@ -306,7 +308,7 @@ private ContainerCommandResponseProto dispatchRequest(
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " creation failed",
responseProto.getResult());
audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
Expand All @@ -325,13 +327,13 @@ private ContainerCommandResponseProto dispatchRequest(
StorageContainerException sce = new StorageContainerException(
"ContainerID " + containerID + " does not exist",
ContainerProtos.Result.CONTAINER_NOT_FOUND);
audit(action, eventType, msg, AuditEventStatus.FAILURE, sce);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
containerType = getContainerType(container);
} else {
if (!msg.hasCreateContainer()) {
audit(action, eventType, msg, AuditEventStatus.FAILURE,
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception("MALFORMED_REQUEST"));
return malformedRequest(msg);
}
Expand All @@ -348,10 +350,10 @@ private ContainerCommandResponseProto dispatchRequest(
"ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
// log failure
audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE, ex);
return ContainerUtils.logAndReturnError(LOG, ex, msg);
}
perf.appendPreOpLatencyMs(Time.monotonicNow() - startTime);
perf.appendPreOpLatencyNano(Time.monotonicNowNanos() - startTime);
responseProto = handler.handle(msg, container, dispatcherContext);
long opLatencyNs = Time.monotonicNowNanos() - startTime;
if (responseProto != null) {
Expand Down Expand Up @@ -417,24 +419,24 @@ private ContainerCommandResponseProto dispatchRequest(
}
if (result == Result.SUCCESS) {
updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, msg, AuditEventStatus.SUCCESS, null);
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.SUCCESS, null);
} else {
//TODO HDDS-7096:
// This is a too general place for on demand scanning.
// Create a specific exception that signals for on demand scanning
// and move this general scan to where it is more appropriate.
// Add integration tests to test the full functionality.
OnDemandContainerDataScanner.scanContainer(container);
audit(action, eventType, msg, AuditEventStatus.FAILURE,
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception(responseProto.getMessage()));
}
perf.appendOpLatencyMs(opLatencyNs);
performanceAudit(action, msg, perf, opLatencyNs);
perf.appendOpLatencyNanos(opLatencyNs);
performanceAudit(action, msg, dispatcherContext, perf, opLatencyNs);

return responseProto;
} else {
// log failure
audit(action, eventType, msg, AuditEventStatus.FAILURE,
audit(action, eventType, msg, dispatcherContext, AuditEventStatus.FAILURE,
new Exception("UNSUPPORTED_REQUEST"));
return unsupportedRequest(msg);
}
Expand Down Expand Up @@ -547,7 +549,7 @@ public void validateContainerCommand(
StorageContainerException ex = new StorageContainerException(
"Invalid ContainerType " + containerType,
ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
audit(action, eventType, msg, AuditEventStatus.FAILURE, ex);
audit(action, eventType, msg, null, AuditEventStatus.FAILURE, ex);
throw ex;
}

Expand All @@ -567,12 +569,12 @@ public void validateContainerCommand(
// if the container is not open/recovering, no updates can happen. Just
// throw an exception
ContainerNotOpenException cex = new ContainerNotOpenException(log);
audit(action, eventType, msg, AuditEventStatus.FAILURE, cex);
audit(action, eventType, msg, null, AuditEventStatus.FAILURE, cex);
throw cex;
}
} else if (HddsUtils.isReadOnly(msg) && containerState == State.INVALID) {
InvalidContainerStateException iex = new InvalidContainerStateException(log);
audit(action, eventType, msg, AuditEventStatus.FAILURE, iex);
audit(action, eventType, msg, null, AuditEventStatus.FAILURE, iex);
throw iex;
}
}
Expand Down Expand Up @@ -678,14 +680,14 @@ private EventType getEventType(ContainerCommandRequestProto msg) {
}

private void audit(AuditAction action, EventType eventType,
ContainerCommandRequestProto msg, AuditEventStatus result,
Throwable exception) {
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext,
AuditEventStatus result, Throwable exception) {
Map<String, String> params;
AuditMessage amsg;
switch (result) {
case SUCCESS:
if (isAllowed(action.getAction())) {
params = getAuditParams(msg);
params = getAuditParams(msg, dispatcherContext);
if (eventType == EventType.READ &&
AUDIT.getLogger().isInfoEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForSuccess(action, params);
Expand All @@ -699,7 +701,7 @@ private void audit(AuditAction action, EventType eventType,
break;

case FAILURE:
params = getAuditParams(msg);
params = getAuditParams(msg, dispatcherContext);
if (eventType == EventType.READ &&
AUDIT.getLogger().isErrorEnabled(AuditMarker.READ.getMarker())) {
amsg = buildAuditMessageForFailure(action, params, exception);
Expand All @@ -719,9 +721,9 @@ private void audit(AuditAction action, EventType eventType,
}

private void performanceAudit(AuditAction action, ContainerCommandRequestProto msg,
PerformanceStringBuilder performance, long opLatencyMs) {
if (isOperationSlow(opLatencyMs)) {
Map<String, String> params = getAuditParams(msg);
DispatcherContext dispatcherContext, PerformanceStringBuilder performance, long opLatencyNs) {
if (isOperationSlow(opLatencyNs)) {
Map<String, String> params = getAuditParams(msg, dispatcherContext);
AuditMessage auditMessage =
buildAuditMessageForPerformance(action, params, performance);
AUDIT.logPerformance(auditMessage);
Expand Down Expand Up @@ -837,7 +839,7 @@ private static DNAction getAuditAction(Type cmdType) {
}

private static Map<String, String> getAuditParams(
ContainerCommandRequestProto msg) {
ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
Map<String, String> auditParams = new TreeMap<>();
Type cmdType = msg.getCmdType();
String containerID = String.valueOf(msg.getContainerID());
Expand Down Expand Up @@ -904,6 +906,8 @@ private static Map<String, String> getAuditParams(
case ReadChunk:
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockID.getFromProtobuf(msg.getReadChunk().getBlockID()).toString());
auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
String.valueOf(msg.getReadChunk().getChunkData().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getReadChunk().getChunkData().getLen()));
return auditParams;
Expand All @@ -918,8 +922,13 @@ private static Map<String, String> getAuditParams(
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockID.getFromProtobuf(msg.getWriteChunk().getBlockID())
.toString());
auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
String.valueOf(msg.getWriteChunk().getChunkData().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getWriteChunk().getChunkData().getLen()));
if (dispatcherContext != null && dispatcherContext.getStage() != null) {
auditParams.put(AUDIT_PARAM_BLOCK_DATA_STAGE, dispatcherContext.getStage().toString());
}
return auditParams;

case ListChunk:
Expand All @@ -936,6 +945,8 @@ private static Map<String, String> getAuditParams(
auditParams.put(AUDIT_PARAM_BLOCK_DATA,
BlockData.getFromProtoBuf(msg.getPutSmallFile()
.getBlock().getBlockData()).toString());
auditParams.put(AUDIT_PARAM_BLOCK_DATA_OFFSET,
String.valueOf(msg.getPutSmallFile().getChunkInfo().getOffset()));
auditParams.put(AUDIT_PARAM_BLOCK_DATA_SIZE,
String.valueOf(msg.getPutSmallFile().getChunkInfo().getLen()));
} catch (IOException ex) {
Expand Down Expand Up @@ -975,7 +986,7 @@ private static Map<String, String> getAuditParams(

}

private boolean isOperationSlow(long opLatencyMs) {
return opLatencyMs >= slowOpThresholdMs;
private boolean isOperationSlow(long opLatencyNs) {
return opLatencyNs >= slowOpThresholdNs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,20 @@ public void appendOpLatencyNanos(long nanos) {

/**
* Appends pre-operation operation latency in milliseconds.
* @param millis Latency in nanoseconds.
* @param millis Latency in milliseconds.
*/
public void appendPreOpLatencyMs(long millis) {
append("preOpLatencyMs", millis);
}

/**
* Appends pre-operation operation latency in milliseconds.
* @param nanos Latency in nanoseconds.
*/
public void appendPreOpLatencyNano(long nanos) {
append("preOpLatencyMs", TimeUnit.NANOSECONDS.toMillis(nanos));
}

/**
* Appends whole operation latency in milliseconds.
* @param millis Latency in milliseconds.
Expand Down

0 comments on commit e7ef8f2

Please sign in to comment.