Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Bootstrap Changes For DR #2906

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ public interface PartitionStateChangeListener {
*/
void onPartitionBecomeBootstrapFromOffline(String partitionName);

/**
* Action to take when partition becomes bootstrap from offline.
* @param partitionName of the partition.
*/
void onPartitionBecomeHydrateFromOffline(String partitionName);

/**
* Action to take when partition becomes bootstrap from offline.
* @param partitionName of the partition.
*/
void onPartitionBecomeBootstrapFromHydrate(String partitionName);

/**
* Action to take when partition becomes standby from bootstrap.
* @param partitionName of the partition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public enum ReplicaState {
*/
OFFLINE,

/**
* A new state to Be introduced between OFFline And Bootstrap
* Router should not send any request to replica in this state.
*/
HYDRATE,

/**
* Bootstrap state is an intermediate state between OFFLINE and STANDBY.
* This state allows replica to do some bootstrap work like checking replication lag and catching up with peers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public enum StateModelListenerType {
* keeps checking replication lag of this replica and ensures it catches up with its peer replicas.
*/
ReplicationManagerListener,

/**
*
*/
FileCopyManagerListener,
/**
* The partition state change listener owned by stats manager. It takes actions when new replica is added (OFFLINE ->
* BOOTSTRAP) or old replica is removed (INACTIVE -> OFFLINE)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.ambry.config;

public class FileCopyConfig {
public FileCopyConfig(VerifiableProperties verifiableProperties) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,4 +106,10 @@ default void handleAdminRequest(NetworkRequest request) throws InterruptedExcept
default void handleUndeleteRequest(NetworkRequest request) throws InterruptedException, IOException {
throw new UnsupportedOperationException("Undelete request not supported on this node");
}

void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException;

void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException;

void handleStopCompactionRequest(NetworkRequest request) throws InterruptedException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,41 @@
localPartitionAndState.put(partitionName, ReplicaState.BOOTSTRAP);
}

@Override
public void onPartitionBecomeHydrateFromOffline(String partitionName) {
PartitionStateChangeListener fileCopyListener =
partitionStateChangeListeners.get(StateModelListenerType.FileCopyManagerListener);
try{
if(fileCopyListener !=null){
//fileCopyListener.();
}
}
}

@Override
public void onPartitionBecomeBootstrapFromHydrate(String partitionName) {
PartitionStateChangeListener replicationManagerListener =
partitionStateChangeListeners.get(StateModelListenerType.ReplicationManagerListener);
try {
if (replicationManagerListener != null) {

replicationManagerListener.onPartitionBecomeStandbyFromBootstrap(partitionName);
// after bootstrap is initiated in ReplicationManager, transition is blocked here and wait until local replica has
// caught up with enough peer replicas.

Check failure on line 855 in ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java

View workflow job for this annotation

GitHub Actions / store-test

[Task :ambry-clustermap:compileJava FAILED] 'try' without 'catch', 'finally' or resource declarations try{ ^

Check failure on line 855 in ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java

View workflow job for this annotation

GitHub Actions / server-int-test

[Task :ambry-clustermap:compileJava FAILED] 'try' without 'catch', 'finally' or resource declarations try{ ^

Check failure on line 855 in ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java

View workflow job for this annotation

GitHub Actions / int-test

[Task :ambry-clustermap:compileJava FAILED] 'try' without 'catch', 'finally' or resource declarations try{ ^

Check failure on line 855 in ambry-clustermap/src/main/java/com/github/ambry/clustermap/HelixParticipant.java

View workflow job for this annotation

GitHub Actions / unit-test

[Task :ambry-clustermap:compileJava FAILED] 'try' without 'catch', 'finally' or resource declarations try{ ^
replicaSyncUpManager.waitBootstrapCompleted(partitionName);
}
} catch (InterruptedException e) {
logger.error("Bootstrap was interrupted on partition {}", partitionName);
localPartitionAndState.put(partitionName, ReplicaState.ERROR);
throw new StateTransitionException("Bootstrap failed or was interrupted", BootstrapFailure);
} catch (StateTransitionException e) {
logger.error("Bootstrap didn't complete on partition {}", partitionName, e);
localPartitionAndState.put(partitionName, ReplicaState.ERROR);
throw e;
}
localPartitionAndState.put(partitionName, ReplicaState.STANDBY);
}

@Override
public void onPartitionBecomeStandbyFromBootstrap(String partitionName) {
PartitionStateChangeListener replicationManagerListener =
Expand Down
19 changes: 19 additions & 0 deletions ambry-files-transfer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id 'java'
}

group = 'com.github.ambry'
version = '0.4.432'

repositories {
mavenCentral()
}

dependencies {
testImplementation platform('org.junit:junit-bom:5.9.1')
testImplementation 'org.junit.jupiter:junit-jupiter'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.github.ambry;

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterParticipant;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.PartitionStateChangeListener;
import com.github.ambry.clustermap.StateModelListenerType;
import com.github.ambry.config.ClusterMapConfig;
import com.github.ambry.config.FileCopyConfig;
import com.github.ambry.config.StoreConfig;
import com.github.ambry.network.NetworkClientFactory;
import com.github.ambry.server.StoreManager;
import com.github.ambry.store.StoreKeyFactory;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;

/*
Responsibilities of this class is to receive state transitions for a replica,
ensure that the these replicas are stored in a prioritised data structure.
Then for each file,
* */
public class FileCopyManager {
Map<String, String> diskToPartitionQueue;

public FileCopyManager(FileCopyConfig fileCopyConfig, ClusterMapConfig clusterMapConfig,
StoreConfig storeConfig, StoreManager storeManager, StoreKeyFactory storeKeyFactory, ClusterMap clusterMap,
ScheduledExecutorService scheduler, DataNodeId dataNode, NetworkClientFactory networkClientFactory,
MetricRegistry metricRegistry, ClusterParticipant clusterParticipant){
if(clusterParticipant != null){
clusterParticipant.registerPartitionStateChangeListener(StateModelListenerType.FileCopyManagerListener,
new PartitionStateChangeListenerImpl());
}
}
public void start() throws InterruptedException, IOException {

}
class PartitionStateChangeListenerImpl implements PartitionStateChangeListener {

public void waitFileCopyCompleted(){
return ;
}
@Override
public void onPartitionBecomeBootstrapFromOffline(String partitionName) {
//diskTopartitoionQueue.put(partitionName);
}

@Override
public void onPartitionBecomeHydrateFromOffline(String partitionName) {

}

@Override
public void onPartitionBecomeBootstrapFromHydrate(String partitionName) {

}

@Override
public void onPartitionBecomeStandbyFromBootstrap(String partitionName) {

}

@Override
public void onPartitionBecomeLeaderFromStandby(String partitionName) {

}

@Override
public void onPartitionBecomeStandbyFromLeader(String partitionName) {

}

@Override
public void onPartitionBecomeInactiveFromStandby(String partitionName) {

}

@Override
public void onPartitionBecomeOfflineFromInactive(String partitionName) {

}

@Override
public void onPartitionBecomeDroppedFromOffline(String partitionName) {

}
}
}
7 changes: 7 additions & 0 deletions ambry-files-transfer/src/main/java/com/github/ambry/Main.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.ambry;

public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.ambry;

public class ReplicaSyncUpManager {
public void waitFileCopyCompleted(){

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.github.ambry.protocol;

public class FileChunkRequest extends RequestOrResponse{

String partitionName;
final String fileName;
long startOffSet;
long sizeInBytes;


public FileChunkRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId,
String fileName) {
super(type, versionId, correlationId, clientId);
this.fileName = fileName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.github.ambry.protocol;

import com.github.ambry.server.ServerErrorCode;
import io.netty.buffer.ByteBuf;
import java.util.zip.CRC32;


public class FileChunkResponse extends Response{
String partitionName;
String fileName;
long startOffSet;
long sizeInBytes;

CRC32 crc32;
ByteBuf data;
public FileChunkResponse(RequestOrResponseType type, short requestResponseVersion, int correlationId, String clientId,
ServerErrorCode error) {
super(type, requestResponseVersion, correlationId, clientId, error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.github.ambry.protocol;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;


public class FileMetaDataRequest extends RequestOrResponse {

public FileMetaDataRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId) {
super(type, versionId, correlationId, clientId);
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.github.ambry.protocol;

import com.github.ambry.server.ServerErrorCode;


public class FileMetaDataResponse extends Response{
public FileMetaDataResponse(RequestOrResponseType type, short requestResponseVersion, int correlationId,
String clientId, ServerErrorCode error) {
super(type, requestResponseVersion, correlationId, clientId, error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,11 @@ public enum RequestOrResponseType {
PurgeRequest,
PurgeResponse,
BatchDeleteRequest,
BatchDeleteResponse
BatchDeleteResponse,
FileMetaDataRequest,
FileMetaDataResponse,
FileChunkRequest,
FileChunkResponse,
EnableDisableCompactionRequest,
EnableDisableCompactionResponse
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.github.ambry.protocol;

public class StopCompactionRequest extends RequestOrResponse{
public StopCompactionRequest(RequestOrResponseType type, short versionId, int correlationId, String clientId) {
super(type, versionId, correlationId, clientId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.github.ambry.protocol;

import com.github.ambry.server.ServerErrorCode;


public class StopCompactionResponse extends Response{
public StopCompactionResponse(RequestOrResponseType type, short requestResponseVersion, int correlationId,
String clientId, ServerErrorCode error) {
super(type, requestResponseVersion, correlationId, clientId, error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,13 @@ public void handleRequests(NetworkRequest networkRequest) throws InterruptedExce
case ReplicateBlobRequest:
handleReplicateBlobRequest(networkRequest);
break;
case FileMetaDataRequest:
handleFileMetaDataRequest(networkRequest);
break;
case FileChunkRequest:
handleFileChunkRequest(networkRequest);
case EnableDisableCompactionRequest:
handleStopCompactionRequest(networkRequest);
default:
throw new UnsupportedOperationException("Request type not supported");
}
Expand Down Expand Up @@ -1489,6 +1496,21 @@ public void handleUndeleteRequest(NetworkRequest request) throws IOException, In
metrics.undeleteBlobTotalTimeInMs, null, null, totalTimeSpent));
}

@Override
public void handleFileMetaDataRequest(NetworkRequest request) throws InterruptedException, IOException {

}

@Override
public void handleFileChunkRequest(NetworkRequest request) throws InterruptedException, IOException {

}

@Override
public void handleStopCompactionRequest(NetworkRequest request) throws InterruptedException, IOException {

}

/**
* Get the formatted messages which needs to be written to Store.
* @param receivedRequest received Put Request
Expand Down Expand Up @@ -1863,7 +1885,7 @@ public void visit(DeleteRequest deleteRequest) {

@Override
public void visit(BatchDeleteRequest deleteRequest) {

}

@Override
Expand Down
Loading
Loading