From 64c33200ef729e8d2319186df4597005da0294b7 Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Mon, 30 Sep 2024 14:41:42 +0530 Subject: [PATCH 1/2] Interface Change For File Copy --- .../com/github/ambry/protocol/RequestAPI.java | 6 +++ ambry-files-transfer/build.gradle | 19 +++++++++ .../com/github/ambry/FileCopyManager.java | 42 +++++++++++++++++++ .../src/main/java/com/github/ambry/Main.java | 7 ++++ .../ambry/protocol/FileChunkRequest.java | 7 ++++ .../ambry/protocol/FileChunkResponse.java | 11 +++++ .../ambry/protocol/FileMetaDataRequest.java | 17 ++++++++ .../ambry/protocol/FileMetaDataResponse.java | 11 +++++ .../ambry/protocol/RequestOrResponseType.java | 9 +++- .../ambry/protocol/StopCompactionRequest.java | 7 ++++ .../protocol/StopCompactionResponse.java | 11 +++++ .../github/ambry/server/AmbryRequests.java | 24 ++++++++++- settings.gradle | 1 + 13 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 ambry-files-transfer/build.gradle create mode 100644 ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java create mode 100644 ambry-files-transfer/src/main/java/com/github/ambry/Main.java create mode 100644 ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java create mode 100644 ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java create mode 100644 ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java create mode 100644 ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java create mode 100644 ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionRequest.java create mode 100644 ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionResponse.java diff --git a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java index cb912b7379..8b4ff236f4 100644 --- a/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java +++ b/ambry-api/src/main/java/com/github/ambry/protocol/RequestAPI.java @@ -106,4 +106,10 @@ default void handleAdminRequest(NetworkRequest request) throws InterruptedExcept default void handleUndeleteRequest(NetworkRequest request) throws InterruptedException, IOException { throw new UnsupportedOperationException("Undelete request not supported on this node"); } + + void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException; + + void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException; + + void handleStopCompactionRequest(NetworkRequest request) throws InterruptedException, IOException; } diff --git a/ambry-files-transfer/build.gradle b/ambry-files-transfer/build.gradle new file mode 100644 index 0000000000..5b04880927 --- /dev/null +++ b/ambry-files-transfer/build.gradle @@ -0,0 +1,19 @@ +plugins { + id 'java' +} + +group = 'com.github.ambry' +version = '0.4.432' + +repositories { + mavenCentral() +} + +dependencies { + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java b/ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java new file mode 100644 index 0000000000..fdb62f1d6f --- /dev/null +++ b/ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java @@ -0,0 +1,42 @@ +package com.github.ambry; + +import com.github.ambry.clustermap.PartitionStateChangeListener; + + +public class FileCopyManager { + class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { + @Override + public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + + } + + @Override + public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { + + } + + @Override + public void onPartitionBecomeLeaderFromStandby(String partitionName) { + + } + + @Override + public void onPartitionBecomeStandbyFromLeader(String partitionName) { + + } + + @Override + public void onPartitionBecomeInactiveFromStandby(String partitionName) { + + } + + @Override + public void onPartitionBecomeOfflineFromInactive(String partitionName) { + + } + + @Override + public void onPartitionBecomeDroppedFromOffline(String partitionName) { + + } + } diff --git a/ambry-files-transfer/src/main/java/com/github/ambry/Main.java b/ambry-files-transfer/src/main/java/com/github/ambry/Main.java new file mode 100644 index 0000000000..c152d820d7 --- /dev/null +++ b/ambry-files-transfer/src/main/java/com/github/ambry/Main.java @@ -0,0 +1,7 @@ +package com.github.ambry; + +public class Main { + public static void main(String[] args) { + System.out.println("Hello world!"); + } +} \ No newline at end of file diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java new file mode 100644 index 0000000000..e552f52404 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java @@ -0,0 +1,7 @@ +package com.github.ambry.protocol; + +public class FileChunkRequest extends RequestOrResponse{ + public FileChunkRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId) { + super(type, versionId, correlationId, clientId); + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java new file mode 100644 index 0000000000..bdafc07064 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java @@ -0,0 +1,11 @@ +package com.github.ambry.protocol; + +import com.github.ambry.server.ServerErrorCode; + + +public class FileChunkResponse extends Response{ + public FileChunkResponse(RequestOrResponseType type, short requestResponseVersion, int correlationId, String clientId, + ServerErrorCode error) { + super(type, requestResponseVersion, correlationId, clientId, error); + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java new file mode 100644 index 0000000000..6840e471c0 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataRequest.java @@ -0,0 +1,17 @@ +package com.github.ambry.protocol; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ServerCnxn; + + +public class FileMetaDataRequest extends RequestOrResponse { + + public FileMetaDataRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId) { + super(type, versionId, correlationId, clientId); + } + + +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java new file mode 100644 index 0000000000..fc33749b4b --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileMetaDataResponse.java @@ -0,0 +1,11 @@ +package com.github.ambry.protocol; + +import com.github.ambry.server.ServerErrorCode; + + +public class FileMetaDataResponse extends Response{ + public FileMetaDataResponse(RequestOrResponseType type, short requestResponseVersion, int correlationId, + String clientId, ServerErrorCode error) { + super(type, requestResponseVersion, correlationId, clientId, error); + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index 115562bd9a..a9c3774346 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -37,5 +37,12 @@ public enum RequestOrResponseType { PurgeRequest, PurgeResponse, BatchDeleteRequest, - BatchDeleteResponse + BatchDeleteResponse, + FileMetaDataRequest, + FileMetaDataResponse, + + FileChunkRequest, + FileChunkResponse, + StopCompactionRequest, + StopCompactionResponse } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionRequest.java new file mode 100644 index 0000000000..1c13ed1ab6 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionRequest.java @@ -0,0 +1,7 @@ +package com.github.ambry.protocol; + +public class StopCompactionRequest extends RequestOrResponse{ + public StopCompactionRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId) { + super(type, versionId, correlationId, clientId); + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionResponse.java new file mode 100644 index 0000000000..a64a17a537 --- /dev/null +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/StopCompactionResponse.java @@ -0,0 +1,11 @@ +package com.github.ambry.protocol; + +import com.github.ambry.server.ServerErrorCode; + + +public class StopCompactionResponse extends Response{ + public StopCompactionResponse(RequestOrResponseType type, short requestResponseVersion, int correlationId, + String clientId, ServerErrorCode error) { + super(type, requestResponseVersion, correlationId, clientId, error); + } +} diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index c24a641522..a650d5ff6b 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -225,6 +225,13 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce case ReplicateBlobRequest: handleReplicateBlobRequest(networkRequest); break; + case FileMetaDataRequest: + handleFileMetaDataRequest(networkRequest); + break; + case FileChunkRequest: + handleFileChunkRequest(networkRequest); + case StopCompactionRequest: + handleStopCompactionRequest(networkRequest); default: throw new UnsupportedOperationException("Request type not supported"); } @@ -1489,6 +1496,21 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent)); } + @Override + public void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException { + + } + + @Override + public void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException { + + } + + @Override + public void handleStopCompactionRequest(NetworkRequest request) throws InterruptedException, IOException { + + } + /** * Get the formatted messages which needs to be written to Store. * @param receivedRequest received Put Request @@ -1863,7 +1885,7 @@ public void visit(DeleteRequest deleteRequest) { @Override public void visit(BatchDeleteRequest deleteRequest) { - + } @Override diff --git a/settings.gradle b/settings.gradle index d19582e993..c4db1e3bb5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -31,4 +31,5 @@ include 'ambry-api', 'ambry-mysql', 'ambry-filesystem', 'ambry-vcr' +include 'ambry-files-transfer' From cdf4f95b46dde8ec556a09deae8a8df8fbc615ae Mon Sep 17 00:00:00 2001 From: Suryakant Agarwal Date: Tue, 1 Oct 2024 15:21:22 +0530 Subject: [PATCH 2/2] Adding Implementation Details For Bootstrap --- .../PartitionStateChangeListener.java | 12 +++++ .../github/ambry/clustermap/ReplicaState.java | 6 +++ .../clustermap/StateModelListenerType.java | 5 ++ .../github/ambry/config/FileCopyConfig.java | 6 +++ .../ambry/clustermap/HelixParticipant.java | 35 +++++++++++++ .../com/github/ambry/FileCopyManager.java | 51 ++++++++++++++++++- .../github/ambry/ReplicaSyncUpManager.java | 7 +++ .../ambry/protocol/FileChunkRequest.java | 11 +++- .../ambry/protocol/FileChunkResponse.java | 9 ++++ .../ambry/protocol/RequestOrResponseType.java | 5 +- .../github/ambry/server/AmbryRequests.java | 2 +- .../com/github/ambry/server/AmbryServer.java | 8 +++ 12 files changed, 150 insertions(+), 7 deletions(-) create mode 100644 ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java create mode 100644 ambry-files-transfer/src/main/java/com/github/ambry/ReplicaSyncUpManager.java diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/PartitionStateChangeListener.java b/ambry-api/src/main/java/com/github/ambry/clustermap/PartitionStateChangeListener.java index 63c6b69b21..138f11f9b8 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/PartitionStateChangeListener.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/PartitionStateChangeListener.java @@ -24,6 +24,18 @@ public interface PartitionStateChangeListener { */ void onPartitionBecomeBootstrapFromOffline(String partitionName); + /** + * Action to take when partition becomes bootstrap from offline. + * @param partitionName of the partition. + */ + void onPartitionBecomeHydrateFromOffline(String partitionName); + + /** + * Action to take when partition becomes bootstrap from offline. + * @param partitionName of the partition. + */ + void onPartitionBecomeBootstrapFromHydrate(String partitionName); + /** * Action to take when partition becomes standby from bootstrap. * @param partitionName of the partition. diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaState.java b/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaState.java index bb5c5f3bd0..ead577f603 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaState.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/ReplicaState.java @@ -24,6 +24,12 @@ public enum ReplicaState { */ OFFLINE, + /** + * A new state to Be introduced between OFFline And Bootstrap + * Router should not send any request to replica in this state. + */ + HYDRATE, + /** * Bootstrap state is an intermediate state between OFFLINE and STANDBY. * This state allows replica to do some bootstrap work like checking replication lag and catching up with peers. diff --git a/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java b/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java index 83c5f43618..8704f74dba 100644 --- a/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java +++ b/ambry-api/src/main/java/com/github/ambry/clustermap/StateModelListenerType.java @@ -31,6 +31,11 @@ public enum StateModelListenerType { * keeps checking replication lag of this replica and ensures it catches up with its peer replicas. */ ReplicationManagerListener, + + /** + * + */ + FileCopyManagerListener, /** * The partition state change listener owned by stats manager. It takes actions when new replica is added (OFFLINE -> * BOOTSTRAP) or old replica is removed (INACTIVE -> OFFLINE) diff --git a/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java new file mode 100644 index 0000000000..2c9308a9de --- /dev/null +++ b/ambry-api/src/main/java/com/github/ambry/config/FileCopyConfig.java @@ -0,0 +1,6 @@ +package com.github.ambry.config; + +public class FileCopyConfig { + public FileCopyConfig(VerifiableProperties verifiableProperties) {} + +} diff --git a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java index 88f61e0b19..68af24f238 100644 --- a/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java +++ b/ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java @@ -832,6 +832,41 @@ public void onPartitionBecomeBootstrapFromOffline(String partitionName) { localPartitionAndState.put(partitionName, ReplicaState.BOOTSTRAP); } + @Override + public void onPartitionBecomeHydrateFromOffline(String partitionName) { + PartitionStateChangeListener fileCopyListener = + partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener); + try{ + if(fileCopyListener !=null){ + //fileCopyListener.(); + } + } + } + + @Override + public void onPartitionBecomeBootstrapFromHydrate(String partitionName) { + PartitionStateChangeListener replicationManagerListener = + partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener); + try { + if (replicationManagerListener != null) { + + replicationManagerListener.onPartitionBecomeStandbyFromBootstrap(partitionName); + // after bootstrap is initiated in ReplicationManager, transition is blocked here and wait until local replica has + // caught up with enough peer replicas. + replicaSyncUpManager.waitBootstrapCompleted(partitionName); + } + } catch (InterruptedException e) { + logger.error("Bootstrap was interrupted on partition {}", partitionName); + localPartitionAndState.put(partitionName, ReplicaState.ERROR); + throw new StateTransitionException("Bootstrap failed or was interrupted", BootstrapFailure); + } catch (StateTransitionException e) { + logger.error("Bootstrap didn't complete on partition {}", partitionName, e); + localPartitionAndState.put(partitionName, ReplicaState.ERROR); + throw e; + } + localPartitionAndState.put(partitionName, ReplicaState.STANDBY); + } + @Override public void onPartitionBecomeStandbyFromBootstrap(String partitionName) { PartitionStateChangeListener replicationManagerListener = diff --git a/ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java b/ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java index fdb62f1d6f..fe370d9128 100644 --- a/ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java +++ b/ambry-files-transfer/src/main/java/com/github/ambry/FileCopyManager.java @@ -1,12 +1,58 @@ package com.github.ambry; +import com.codahale.metrics.MetricRegistry; +import com.github.ambry.clustermap.ClusterMap; +import com.github.ambry.clustermap.ClusterParticipant; +import com.github.ambry.clustermap.DataNodeId; import com.github.ambry.clustermap.PartitionStateChangeListener; - - +import com.github.ambry.clustermap.StateModelListenerType; +import com.github.ambry.config.ClusterMapConfig; +import com.github.ambry.config.FileCopyConfig; +import com.github.ambry.config.StoreConfig; +import com.github.ambry.network.NetworkClientFactory; +import com.github.ambry.server.StoreManager; +import com.github.ambry.store.StoreKeyFactory; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; + +/* +Responsibilities of this class is to receive state transitions for a replica, +ensure that the these replicas are stored in a prioritised data structure. +Then for each file, +* */ public class FileCopyManager { + Map diskToPartitionQueue; + + public FileCopyManager(FileCopyConfig fileCopyConfig, ClusterMapConfig clusterMapConfig, + StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap, + ScheduledExecutorService scheduler, DataNodeId dataNode, NetworkClientFactory networkClientFactory, + MetricRegistry metricRegistry, ClusterParticipant clusterParticipant){ + if(clusterParticipant != null){ + clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener, + new PartitionStateChangeListenerImpl()); + } + } + public void start() throws InterruptedException, IOException { + + } class PartitionStateChangeListenerImpl implements PartitionStateChangeListener { + + public void waitFileCopyCompleted(){ + return ; + } @Override public void onPartitionBecomeBootstrapFromOffline(String partitionName) { + //diskTopartitoionQueue.put(partitionName); + } + + @Override + public void onPartitionBecomeHydrateFromOffline(String partitionName) { + + } + + @Override + public void onPartitionBecomeBootstrapFromHydrate(String partitionName) { } @@ -40,3 +86,4 @@ public void onPartitionBecomeDroppedFromOffline(String partitionName) { } } +} diff --git a/ambry-files-transfer/src/main/java/com/github/ambry/ReplicaSyncUpManager.java b/ambry-files-transfer/src/main/java/com/github/ambry/ReplicaSyncUpManager.java new file mode 100644 index 0000000000..327d4819c5 --- /dev/null +++ b/ambry-files-transfer/src/main/java/com/github/ambry/ReplicaSyncUpManager.java @@ -0,0 +1,7 @@ +package com.github.ambry; + +public class ReplicaSyncUpManager { + public void waitFileCopyCompleted(){ + + } +} diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java index e552f52404..a4f1cd3433 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkRequest.java @@ -1,7 +1,16 @@ package com.github.ambry.protocol; public class FileChunkRequest extends RequestOrResponse{ - public FileChunkRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId) { + + String partitionName; + final String fileName; + long startOffSet; + long sizeInBytes; + + + public FileChunkRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId, + String fileName) { super(type, versionId, correlationId, clientId); + this.fileName = fileName; } } diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java index bdafc07064..ceb9bbc9d6 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/FileChunkResponse.java @@ -1,9 +1,18 @@ package com.github.ambry.protocol; import com.github.ambry.server.ServerErrorCode; +import io.netty.buffer.ByteBuf; +import java.util.zip.CRC32; public class FileChunkResponse extends Response{ + String partitionName; + String fileName; + long startOffSet; + long sizeInBytes; + + CRC32 crc32; + ByteBuf data; public FileChunkResponse(RequestOrResponseType type, short requestResponseVersion, int correlationId, String clientId, ServerErrorCode error) { super(type, requestResponseVersion, correlationId, clientId, error); diff --git a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java index a9c3774346..6ff51e6ff4 100644 --- a/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java +++ b/ambry-protocol/src/main/java/com/github/ambry/protocol/RequestOrResponseType.java @@ -40,9 +40,8 @@ public enum RequestOrResponseType { BatchDeleteResponse, FileMetaDataRequest, FileMetaDataResponse, - FileChunkRequest, FileChunkResponse, - StopCompactionRequest, - StopCompactionResponse + EnableDisableCompactionRequest, + EnableDisableCompactionResponse } diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java index a650d5ff6b..cfec9f1693 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryRequests.java @@ -230,7 +230,7 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce break; case FileChunkRequest: handleFileChunkRequest(networkRequest); - case StopCompactionRequest: + case EnableDisableCompactionRequest: handleStopCompactionRequest(networkRequest); default: throw new UnsupportedOperationException("Request type not supported"); diff --git a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java index 428bb55af2..b841aeda9d 100644 --- a/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java +++ b/ambry-server/src/main/java/com/github/ambry/server/AmbryServer.java @@ -15,6 +15,7 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.jmx.JmxReporter; +import com.github.ambry.FileCopyManager; import com.github.ambry.account.AccountService; import com.github.ambry.account.AccountServiceCallback; import com.github.ambry.account.AccountServiceFactory; @@ -43,6 +44,7 @@ import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.ConnectionPoolConfig; import com.github.ambry.config.DiskManagerConfig; +import com.github.ambry.config.FileCopyConfig; import com.github.ambry.config.Http2ClientConfig; import com.github.ambry.config.NettyConfig; import com.github.ambry.config.NetworkConfig; @@ -223,6 +225,8 @@ public void startup() throws InstantiationException { SSLConfig sslConfig = new SSLConfig(properties); ClusterMapConfig clusterMapConfig = new ClusterMapConfig(properties); StatsManagerConfig statsConfig = new StatsManagerConfig(properties); + FileCopyConfig fileCopyConfig = new FileCopyConfig(properties); + // verify the configs properties.verify(); @@ -256,6 +260,7 @@ public void startup() throws InstantiationException { new StorageManager(storeConfig, diskManagerConfig, scheduler, registry, storeKeyFactory, clusterMap, nodeId, new BlobStoreHardDelete(), clusterParticipants, time, new BlobStoreRecovery(), accountService); storageManager.start(); + networkClientFactory = new RecoveryNetworkClientFactory(properties, registry, clusterMap, storageManager, accountService); recoveryManager = @@ -361,6 +366,9 @@ public void startup() throws InstantiationException { skipPredicate); replicationManager.start(); + FileCopyManager fileCopyManager = new FileCopyManager(fileCopyConfig, clusterMapConfig, storeConfig, storageManager, storeKeyFactory, + clusterMap, scheduler, nodeId, networkClientFactory, registry, clusterParticipant); + fileCopyManager.start(); logger.info("Creating StatsManager to publish stats");