[vcr-2.0] Measure Azure Storage Container lag in bytes (linkedin#2884)
This patch measures the drift of an Azure Storage container from an Ambry partition in bytes. It introduces a class responsible for aggregating container metrics. We deliberately avoid emitting per-container metrics to prevent an overwhelming increase in the number of metrics, which would strain the telemetry system; that approach was tried previously and was unsuccessful.
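
In condensed form, the emission pattern in the new AzureStorageContainerMetricsCollector looks roughly like the sketch below; the class and method names here are illustrative, and only the registry and scheduler calls mirror the real code.

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: one counter for all containers, incremented on a fixed schedule,
// instead of one metric per container.
class LagEmitterSketch {
  static void start(MetricRegistry registry, ConcurrentHashMap<Long, Long> partitionLagBytes) {
    Counter lagCounter = registry.counter(MetricRegistry.name("AzureMetrics", "AzureContainerLagBytesCount"));
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    executor.scheduleWithFixedDelay(() -> {
      long totalLag = partitionLagBytes.values().stream().reduce(0L, Long::sum);
      lagCounter.inc(totalLag); // a single aggregate data point per cycle
    }, 0, 2, TimeUnit.MINUTES);
  }
}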

A daemon runs periodically, emitting aggregate metrics in a controlled and predictable manner. The drift of the Azure container from the Ambry partition is set using a compare-and-set mechanism to prevent accidental multithreading errors. However, it is unlikely that multiple threads will handle the same partition in the VCR, since a single thread manages all replicas of a partition under the ROUND_ROBIN policy.
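
A minimal sketch of that update path, assuming illustrative names (hostname, newLagInBytes); the real bookkeeping lives in AzureStorageContainerMetrics below.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch of the per-replica lag bookkeeping.
class ReplicaLagSketch {
  private final ConcurrentHashMap<String, AtomicLong> replicaLag = new ConcurrentHashMap<>();

  void addReplica(String hostname) {
    // Long.MAX_VALUE marks a replica whose lag is not yet known, e.g. one still bootstrapping.
    replicaLag.putIfAbsent(hostname, new AtomicLong(Long.MAX_VALUE));
  }

  void setLag(String hostname, long newLagInBytes) {
    AtomicLong current = replicaLag.get(hostname);
    // compare-and-set so a concurrent writer cannot silently clobber an in-flight update
    current.compareAndSet(current.get(), newLagInBytes);
  }
}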

We use the min() function because bootstrapping replicas can skew the data, falsely indicating a large drift even when the partition is fully backed up. If the lag or drift is -1, we round it up to 0 and proceed.
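
With illustrative numbers, the reduction and the rounding behave like this:

import java.util.Arrays;
import java.util.List;

// host1 is fully caught up (0 bytes behind); host2 is still bootstrapping (5 MB behind).
class MinLagExample {
  public static void main(String[] args) {
    List<Long> replicaLags = Arrays.asList(0L, 5_000_000L);
    // min() over replicas: the partition is reported as 0 bytes behind, not 5 MB.
    long partitionLag = replicaLags.stream().reduce(Long.MAX_VALUE, Long::min);
    // a -1 lag from the metadata exchange is rounded up to 0 before recording
    long reported = Math.max(0, -1L);
    System.out.println(partitionLag + " " + reported);
  }
}
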
snalli authored Sep 13, 2024
1 parent 9f410bc commit f591e30
Showing 7 changed files with 209 additions and 5 deletions.
@@ -17,6 +17,7 @@
import com.azure.data.tables.models.TableEntity;
import com.github.ambry.cloud.azure.AzureCloudConfig;
import com.github.ambry.cloud.azure.AzureMetrics;
import com.github.ambry.cloud.azure.AzureStorageContainerMetricsCollector;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.ReplicaSyncUpManager;
@@ -60,6 +61,7 @@
*/
public class VcrReplicaThread extends ReplicaThread {
private static final Logger logger = LoggerFactory.getLogger(VcrReplicaThread.class);
protected final AzureStorageContainerMetricsCollector azureStorageContainerMetricsCollector;
protected CloudConfig vcrNodeConfig;
protected ReplicaComparator comparator;
protected String azureTableNameReplicaTokens;
@@ -89,6 +91,7 @@ public VcrReplicaThread(String threadName, FindTokenHelper findTokenHelper, Clus
this.azureTableNameReplicaTokens = this.azureCloudConfig.azureTableNameReplicaTokens;
this.azureMetrics = new AzureMetrics(clusterMap.getMetricRegistry());
this.numReplIter = 0;
this.azureStorageContainerMetricsCollector = AzureStorageContainerMetricsCollector.getInstance(clusterMap.getMetricRegistry(), properties);
comparator = new ReplicaComparator();
}

@@ -223,6 +226,10 @@ public void advanceToken(RemoteReplicaInfo remoteReplicaInfo, ExchangeMetadataRe
StoreFindToken oldToken = (StoreFindToken) remoteReplicaInfo.getToken();
// The parent method sets in-memory token
super.advanceToken(remoteReplicaInfo, exchangeMetadataResponse);
// The lag can be -1 at times, so just round up to 0 and move on
azureStorageContainerMetricsCollector.setPartitionReplicaLag(
remoteReplicaInfo,
Math.max(0, exchangeMetadataResponse.getLocalLagFromRemoteInBytes()));
StoreFindToken token = (StoreFindToken) remoteReplicaInfo.getToken();
if (token == null) {
azureMetrics.replicaTokenWriteErrorCount.inc();
@@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.ambry.cloud.azure.AzureCloudConfig;
import com.github.ambry.cloud.azure.AzureMetrics;
import com.github.ambry.cloud.azure.AzureStorageContainerMetricsCollector;
import com.github.ambry.clustermap.CloudReplica;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterMapChangeListener;
@@ -83,6 +84,7 @@ public class VcrReplicationManager extends ReplicationEngine {
private final VcrMetrics vcrMetrics;
private final VcrClusterParticipant vcrClusterParticipant;
private final String localDatacenterName;
protected final AzureStorageContainerMetricsCollector azureStorageContainerMetricsCollector;
protected String azureTableNameReplicaTokens;
protected AzureCloudConfig azureCloudConfig;
protected AzureMetrics azureMetrics;
@@ -123,6 +125,7 @@ public VcrReplicationManager(VerifiableProperties properties, StoreManager store
this.azureCloudConfig = new AzureCloudConfig(properties);
this.vcrMetrics = new VcrMetrics(metricRegistry);
this.azureMetrics = new AzureMetrics(metricRegistry);
this.azureStorageContainerMetricsCollector = AzureStorageContainerMetricsCollector.getInstance(metricRegistry, properties);
this.vcrClusterParticipant = vcrClusterParticipant;
try {
vcrHelixConfig =
@@ -216,6 +219,7 @@ protected void addPartitionToReplicaThread(PartitionId partitionId, List<RemoteR
rinfo.setReplicaThread(rthread);
logger.info("[PARTITION] Added replica {} to thread {}", rinfo, rthread.getName());
}
azureStorageContainerMetricsCollector.addPartitionReplicas(remoteReplicaInfos);
partitionToPartitionInfo.get(partitionId).setReplicaThread(rthread);
rthread.startThread();
}
@@ -246,6 +250,7 @@ protected void addRemoteReplicaInfoToReplicaThread(List<RemoteReplicaInfo> remot
}
rthread.addRemoteReplicaInfo(rinfo);
rinfo.setReplicaThread(rthread);
azureStorageContainerMetricsCollector.addPartitionReplicas(Collections.singletonList(rinfo));
logger.info("[REPLICA] Added replica {} to thread {}", rinfo, rthread.getName());
} catch (Throwable e) {
vcrMetrics.addPartitionErrorCount.inc();
@@ -258,6 +263,17 @@ protected void addRemoteReplicaInfoToReplicaThread(List<RemoteReplicaInfo> remot
}
}

/**
* Remove a list of {@link RemoteReplicaInfo} from their respective {@link ReplicaThread}s.
* @param remoteReplicaInfos List of {@link RemoteReplicaInfo} to remove.
*/
@Override
protected void removeRemoteReplicaInfoFromReplicaThread(List<RemoteReplicaInfo> remoteReplicaInfos) {
super.removeRemoteReplicaInfoFromReplicaThread(remoteReplicaInfos);
azureStorageContainerMetricsCollector.removePartitionReplicas(remoteReplicaInfos);
}


@Override
public void retrieveReplicaTokensAndPersistIfNecessary(String mountPath) {
// nothing to do as tokens are loaded in reloadReplicationTokenIfExists() when helix adds replica
@@ -420,6 +436,7 @@ void removePartition(PartitionId partitionId) {
storeManager.shutdownBlobStore(partitionId);
storeManager.removeBlobStore(partitionId);
partitionInfo.setReplicaThread(null);
azureStorageContainerMetricsCollector.removePartition(partitionId.getId());
logger.info("Partition {} removed from {}", partitionId, dataNodeId);
} catch (Throwable e) {
// Helix will run into error state if exception throws in Helix context.
@@ -452,14 +469,12 @@ public CloudStorageCompactor getCloudStorageCompactor() {
@Override
public void updateTotalBytesReadByRemoteReplica(PartitionId partitionId, String hostName, String replicaPath,
long totalBytesRead) {
// Since a replica metadata request for a single partition can go to multiple VCR nodes, totalBytesReadByRemoteReplica
// cannot be populated locally on any VCR node.
throw new UnsupportedOperationException("Unimplemented updateTotalBytesReadByRemoteReplica() in VCR");
}

@Override
public long getRemoteReplicaLagFromLocalInBytes(PartitionId partitionId, String hostName, String replicaPath) {
// TODO get replica lag from cosmos?
return -1;
throw new UnsupportedOperationException("Unimplemented getRemoteReplicaLagFromLocalInBytes() in VCR");
}

@Override
@@ -21,7 +21,8 @@
// extend AzureMetricsOld to not break old-code
public class AzureMetrics extends AzureMetricsOld {

// Metric name constants
// Metric name constants - end only with "Count", "Rate" or "Latency"
public static final String AZURE_CONTAINER_LAG_BYTES_COUNT = "AzureContainerLagBytesCount";
public static final String BLOB_UPLOAD_SUCCESS_RATE = "BlobUploadSuccessRate";
public static final String BLOB_UPDATE_TTL_SUCCESS_RATE = "BlobUpdateTTLSuccessRate";
public static final String BLOB_UPDATE_TTL_LATENCY = "BlobUpdateTTLLatency";
@@ -58,6 +59,7 @@ public class AzureMetrics extends AzureMetricsOld {
public static final String BLOB_BATCH_UPLOAD_LATENCY = "BlobBatchUploadLatency";

// Azure Storage metrics
public final Counter azureContainerLagBytesCount;
public final Counter blobContainerErrorCount;
public final Timer blobCompactionLatency;
public final Timer partitionCompactionLatency;
@@ -97,6 +99,8 @@ public AzureMetrics(MetricRegistry registry) {
super(registry);
// V2 metrics
// These are registered in the closed-source version of Ambry
azureContainerLagBytesCount = registry.counter(MetricRegistry.name(AzureMetrics.class,
AZURE_CONTAINER_LAG_BYTES_COUNT));
blobBatchUploadLatency = registry.timer(MetricRegistry.name(AzureMetrics.class, BLOB_BATCH_UPLOAD_LATENCY));
blobCheckError = registry.counter(MetricRegistry.name(AzureMetrics.class, BLOB_CHECK_ERROR));
blobCompactionErrorCount = registry.counter(MetricRegistry.name(AzureMetrics.class, BLOB_COMPACTION_ERROR_COUNT));
@@ -0,0 +1,56 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud.azure;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;


/**
* A class that holds all metrics pertaining to one Azure Storage Container.
*/
public class AzureStorageContainerMetrics {
/**
* id is the unique identifier of the azure-container or ambry-partition.
*/
Long id;
/**
* lag is the number of bytes that the azure-container is behind or ahead of the associated ambry-partition.
* Although we don't emit a positive drift, it is possible to have a positive drift if the azure-container is ahead
* of a bootstrapping ambry-partition.
*/
ConcurrentHashMap<String, AtomicLong> replicaLag;

public AzureStorageContainerMetrics(Long id) {
this.id = id;
replicaLag = new ConcurrentHashMap<>();
}

public void addPartitionReplica(String hostname) {
replicaLag.putIfAbsent(hostname, new AtomicLong(Long.MAX_VALUE));
}

public void removePartitionReplica(String hostname) {
replicaLag.remove(hostname);
}

public Long getPartitionLag() {
return replicaLag.values().stream().map(AtomicLong::get).reduce(Long.MAX_VALUE, Long::min);
}

public void setPartitionReplicaLag(String hostname, long update) {
this.replicaLag.get(hostname).compareAndSet(this.replicaLag.get(hostname).get(), update);
}

}
@@ -0,0 +1,115 @@
/**
* Copyright 2024 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.cloud.azure;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.replication.RemoteReplicaInfo;
import com.github.ambry.utils.Utils;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A class that aggregates container metrics. We do _NOT_ want to emit per-container metrics because then the number
* of metrics increases in proportion to the number of containers, overwhelming telemetry. This was attempted and failed.
*
* A daemon will run at regular intervals emitting aggregate metrics in a controlled and predictable manner.
* This is a singleton class to avoid multiple collector threads.
*/
public class AzureStorageContainerMetricsCollector {
private final AzureMetrics metrics;
private final ConcurrentHashMap<Long, AzureStorageContainerMetrics> metricMap;
private final ScheduledExecutorService executor;
private static AzureStorageContainerMetricsCollector instance;
private final VerifiableProperties properties;

public static final Logger logger = LoggerFactory.getLogger(AzureStorageContainerMetricsCollector.class);

private AzureStorageContainerMetricsCollector(MetricRegistry metrics, VerifiableProperties properties) {
metricMap = new ConcurrentHashMap<>();
this.metrics = new AzureMetrics(metrics);
this.properties = properties;
executor = Utils.newScheduler(1, "azure_storage_container_metrics_collector_", true);
executor.scheduleWithFixedDelay(getCollector(), 0, 2, TimeUnit.MINUTES);
logger.info("Started AzureStorageContainerMetricsCollector");
}

private Runnable getCollector() {
return () -> {
Long totalLag = metricMap.values().stream()
.map(container -> container.getPartitionLag())
.reduce(0L, Long::sum);
this.metrics.azureContainerLagBytesCount.inc(totalLag);
};
}

/**
* Thread-safe singleton initializer.
* @param metrics MetricRegistry to register the aggregate counter with
* @param properties VerifiableProperties passed to the collector
* @return collector instance
*/
public static synchronized AzureStorageContainerMetricsCollector getInstance(MetricRegistry metrics,
VerifiableProperties properties) {
if (instance == null) {
instance = new AzureStorageContainerMetricsCollector(metrics, properties);
}
return instance;
}

public void addPartitionReplicas(List<RemoteReplicaInfo> remoteReplicaInfos) {
for (RemoteReplicaInfo rinfo : remoteReplicaInfos) {
// Don't store any references to PartitionId or RemoteReplicaInfo.
// With improper clean up, these references linger around and cause memory leaks.
long pid = rinfo.getReplicaId().getPartitionId().getId();
String rid = rinfo.getReplicaId().getDataNodeId().getHostname();
metricMap.putIfAbsent(pid, new AzureStorageContainerMetrics(pid));
metricMap.get(pid).addPartitionReplica(rid);
}
}

public void removePartitionReplicas(List<RemoteReplicaInfo> remoteReplicaInfos) {
for (RemoteReplicaInfo rinfo : remoteReplicaInfos) {
long pid = rinfo.getReplicaId().getPartitionId().getId();
String rid = rinfo.getReplicaId().getDataNodeId().getHostname();
if (metricMap.containsKey(pid)) {
metricMap.get(pid).removePartitionReplica(rid);
}
}
}

public void removePartition(Long id) {
metricMap.remove(id);
}

/**
* Sets the lag of the azure-container from the ambry-partition.
* We use a compare-and-set to guard against accidental multithreading errors, although two threads will most likely
* not be responsible for a single partition in VCR. A single thread handles all replicas of a partition.
* However, we want to avoid any races between readers and writers.
* Use min() as bootstrapping replicas can give a wrong picture and indicate a large lag even though the partition
* is fully backed up in Azure.
* @param rinfo RemoteReplicaInfo
* @param lag Lag in bytes
*/
public synchronized void setPartitionReplicaLag(RemoteReplicaInfo rinfo, long lag) {
long pid = rinfo.getReplicaId().getPartitionId().getId();
String rid = rinfo.getReplicaId().getDataNodeId().getHostname();
metricMap.get(pid).setPartitionReplicaLag(rid, lag);
}
}
@@ -1848,6 +1848,10 @@ public ExchangeMetadataResponse(Set<MessageInfo> missingStoreMessages, FindToken
this.receivedStoreMessagesWithUpdatesPending = other.receivedStoreMessagesWithUpdatesPending;
}

public long getLocalLagFromRemoteInBytes() {
return localLagFromRemoteInBytes;
}

/**
* Checks if there are any missing store messages in this metadata exchange.
* @return set of missing store messages
@@ -23,6 +23,7 @@
import com.github.ambry.cloud.azure.AzureCloudConfig;
import com.github.ambry.cloud.azure.AzureCloudDestinationFactory;
import com.github.ambry.cloud.azure.AzureCloudDestinationSync;
import com.github.ambry.cloud.azure.AzureStorageContainerMetricsCollector;
import com.github.ambry.cloud.azure.AzuriteUtils;
import com.github.ambry.clustermap.CloudDataNode;
import com.github.ambry.clustermap.CloudReplica;
@@ -144,6 +145,8 @@ public void beforeTest() throws ReflectiveOperationException {
// Create remote-replica info
replica = new RemoteReplicaInfo(mockPartitionId.getReplicaIds().get(0), null, null,
null, Long.MAX_VALUE, SystemTime.getInstance(), null);
AzureStorageContainerMetricsCollector.getInstance(mockClusterMap.getMetricRegistry(), verifiableProperties)
.addPartitionReplicas(Collections.singletonList(replica));
}

protected Pair<TableEntity, StoreFindToken> getTokenFromAzureTable() throws IOException {
