Skip to content

Commit

Permalink
Add manager offset reset functionality (slackhq#1064)
Browse files Browse the repository at this point in the history
Co-authored-by: Bryan Burkholder <[email protected]>
  • Loading branch information
bryanlb and bryanlb authored Sep 5, 2024
1 parent b4057a9 commit 83d2138
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class SnapshotMetadata extends AstraPartitionedMetadata {
public final String snapshotId;
public final long startTimeEpochMs;
public final long endTimeEpochMs;
public final long maxOffset;
public final String partitionId;
public long maxOffset;
public long sizeInBytesOnDisk;

public SnapshotMetadata(
Expand Down
39 changes: 39 additions & 0 deletions astra/src/main/java/com/slack/astra/server/ManagerApiGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -365,4 +366,42 @@ private static ImmutableList<DatasetPartitionMetadata> addNewPartition(
new DatasetPartitionMetadata(partitionCutoverTime + 1, MAX_TIME, newPartitionIdsList);
return builder.add(newPartitionMetadata).build();
}

/**
 * Resets the stored max offset to 0 on every snapshot whose partition ID equals the one in the
 * request, then reports how many snapshots matched.
 *
 * <p>When the request's dry-run flag is set, matching snapshots are only counted; no snapshot is
 * modified and no update is issued to the snapshot metadata store.
 *
 * @param request carries the partition ID to match and the dry-run flag
 * @param responseObserver receives a single response holding a status summary, then completion
 */
@Override
public void resetPartitionData(
    ManagerApi.ResetPartitionDataRequest request,
    StreamObserver<ManagerApi.ResetPartitionDataResponse> responseObserver) {
  final List<SnapshotMetadata> allSnapshots = snapshotMetadataStore.listSync();
  final String targetPartitionId = request.getPartitionId();
  final boolean dryRun = request.getDryRun();

  int matchedSnapshots = 0;
  for (SnapshotMetadata candidate : allSnapshots) {
    // Snapshots belonging to other partitions are left alone and not counted.
    if (!Objects.equals(candidate.partitionId, targetPartitionId)) {
      continue;
    }
    if (!dryRun) {
      candidate.maxOffset = 0;
      snapshotMetadataStore.updateSync(candidate);
    }
    matchedSnapshots++;
  }

  // Both outcomes report the same three values; only the wording differs.
  final String statusTemplate =
      dryRun
          ? "%s snapshots matching partitionId '%s' out of %s total snapshots, none were reset as this was a dry-run."
          : "Reset %s snapshots matching partitionId '%s' out of %s total snapshots.";
  final String status =
      String.format(statusTemplate, matchedSnapshots, targetPartitionId, allSnapshots.size());

  responseObserver.onNext(
      ManagerApi.ResetPartitionDataResponse.newBuilder().setStatus(status).build());
  responseObserver.onCompleted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ public long determineStartingOffset(
if (currentEndOffsetForPartition < highestDurableOffsetForPartition) {
final String message =
String.format(
"The current head for the partition %d can't "
+ "be lower than the highest durable offset for that partition %d",
"The current head for the partition %d can't be lower than the highest durable offset for that "
+ "partition %d. To manually reset this partition's stored offset, see the ResetPartitionData "
+ "function in the manager.",
currentEndOffsetForPartition, highestDurableOffsetForPartition);
LOG.error(message);
throw new IllegalStateException(message);
Expand Down
12 changes: 12 additions & 0 deletions astra/src/main/proto/manager_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ service ManagerApiService {

rpc RestoreReplica(RestoreReplicaRequest) returns (RestoreReplicaResponse) {}
rpc RestoreReplicaIds(RestoreReplicaIdsRequest) returns (RestoreReplicaIdsResponse) {}

// Resets the partition offset data to allow using a newer offset for a partition ID
rpc ResetPartitionData(ResetPartitionDataRequest) returns (ResetPartitionDataResponse) {}
}

// CreateDatasetMetadataRequest represents a new dataset with uninitialized throughput and partition assignments
Expand Down Expand Up @@ -93,3 +96,12 @@ message RestoreReplicaIdsRequest {
message RestoreReplicaIdsResponse {
string status = 1;
}

// ResetPartitionDataRequest asks the manager to reset the stored max offset of the snapshots
// belonging to a single partition.
message ResetPartitionDataRequest {
// Partition ID to match; only snapshots whose partition ID equals this value are affected
string partition_id = 1;
// When true, matching snapshots are counted and reported but none are modified
bool dry_run = 2;
}

// ResetPartitionDataResponse reports the outcome of a ResetPartitionData call.
message ResetPartitionDataResponse {
// Human-readable summary: how many snapshots matched the partition ID out of the total number of
// snapshots, and whether they were reset or the call was a dry-run
string status = 1;
}

0 comments on commit 83d2138

Please sign in to comment.