Showing 7 changed files with 520 additions and 60 deletions.
@@ -0,0 +1,12 @@
Context: https://www.notion.so/aptoslabs/WIP-Replay-Verify-Service-Design-1128b846eb728029bc51fcafb3de6fca?pvs=4

## Prerequisite
Install minikube.

## Local test
minikube start --mount --mount-string="/mnt/testnet_archive:/mnt/testnet_archive" --memory=81920 --cpus=17
kubectl apply -f ./testnet-archive.yaml

poetry shell
poetry install  # installs the kubernetes client
python main.py
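
To sanity-check that the cluster is reachable before kicking off a run, the same kubernetes Python client that main.py relies on can be exercised directly (a minimal sketch, assuming `poetry install` has pulled in the client):

from kubernetes import client, config

config.load_kube_config()  # reads the kubeconfig written by `minikube start`
for node in client.CoreV1Api().list_node().items:
    print(node.metadata.name)  # expect the single minikube node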
@@ -1,47 +1,298 @@
import yaml
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import time
import logging
import os
from enum import Enum

def init():
    print("Initializing the cluster ...")
    config.load_kube_config()


def list_nodes():
    v1 = client.CoreV1Api()
    print("Listing nodes:")
    ret = v1.list_node(watch=False)
    for node in ret.items:
        print(f"Node Name: {node.metadata.name}")


def create_pod(pod_name, image_name, namespace="default"):
    v1 = client.CoreV1Api()

    # Define the pod spec
    pod = client.V1Pod(
        metadata=client.V1ObjectMeta(name=pod_name),
        spec=client.V1PodSpec(
            containers=[
                client.V1Container(
                    name=pod_name,
                    image=image_name,
                )
            ],
            restart_policy="Never",
        ),
    )

    try:
        # Create the pod
        response = v1.create_namespaced_pod(namespace=namespace, body=pod)
        print(f"Pod {pod_name} created. Status: {response.status.phase}")
    except client.exceptions.ApiException as e:
        print(f"Exception when creating pod {pod_name}: {e}")

# Hyperparameters
NUM_OF_WORKERS = 2
RANGE_SIZE = 100_000
SHARDING_ENABLED = False
MAX_RETRIES = 3
RETRY_DELAY = 5  # seconds
QUERY_DELAY = 10  # seconds
CONCURRENT_REPLAY = 2
REPLAY_CONCURRENCY_LEVEL = 1


class Network(Enum):
    TESETNET = 1
    MAINNET = 2


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

PUSH_METRICS_ENDPOINT = "PUSH_METRICS_ENDPOINT"

def set_env_var(container, name, value):
    if "env" not in container:
        container["env"] = []

    # Check if the environment variable already exists
    for env_var in container["env"]:
        if env_var["name"] == name:
            env_var["value"] = value
            return

    # If it doesn't exist, add it
    container["env"].append({"name": name, "value": value})


def get_env_var(name, default_value=""):
    return os.getenv(name, default_value)

class WorkerPod:
    def __init__(
        self,
        start_version,
        end_version,
        label,
        network=Network.TESETNET,
        namespace="default",
    ):
        self.client = client.CoreV1Api()
        self.name = f"replay-verify-{start_version}-{end_version}"
        self.start_version = start_version
        self.end_version = end_version
        self.status = None
        self.log = None
        self.namespace = namespace
        self.network = network
        self.label = label

    def update_status(self):
        self.status = self.get_pod_status()

    def is_completed(self):
        self.update_status()
        if self.status and self.status.status.phase in ["Succeeded", "Failed"]:
            return True
        return False

    def is_failed(self):
        self.update_status()
        if self.status and self.status.status.phase == "Failed":
            return True
        return False

    def get_phase(self):
        self.update_status()
        if self.status:
            return self.status.status.phase
        return None

    def has_txn_mismatch(self):
        if self.status:
            container_statuses = self.status.status.container_statuses
            if (
                container_statuses
                and container_statuses[0].state
                and container_statuses[0].state.terminated
            ):
                return container_statuses[0].state.terminated.exit_code == 2
        return False

    def get_target_db_dir(self):
        if self.network == Network.TESETNET:
            return "/mnt/testnet_archive/db"
        else:
            return "/mnt/mainnet_archive/db"

    def start(self):
        # Load the worker YAML from the file
        with open("replay-verify-worker-template.yaml", "r") as f:
            pod_manifest = yaml.safe_load(f)

        # Create the Kubernetes API client to start a pod
        api_instance = client.CoreV1Api()
        pod_manifest["metadata"]["name"] = self.name  # Unique name for each pod
        pod_manifest["metadata"]["labels"]["run"] = self.label
        pod_manifest["spec"]["containers"][0]["name"] = self.name
        pod_manifest["spec"]["containers"][0]["command"] = [
            "aptos-debugger",
            "aptos-db",
            "replay-on-archive",
            "--start-version",
            str(self.start_version),
            "--end-version",
            str(self.end_version),
            "--target-db-dir",
            self.get_target_db_dir(),
            "--concurrent-replay",
            f"{CONCURRENT_REPLAY}",
            "--replay-concurrency-level",
            f"{REPLAY_CONCURRENCY_LEVEL}",
        ]
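        # For illustration (version numbers assumed, not taken from the template):
        # with the defaults above, a testnet worker covering versions 1_000_000 to
        # 1_100_000 ends up running roughly:
        #   aptos-debugger aptos-db replay-on-archive \
        #     --start-version 1000000 --end-version 1100000 \
        #     --target-db-dir /mnt/testnet_archive/db \
        #     --concurrent-replay 2 --replay-concurrency-level 1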

        if SHARDING_ENABLED:
            pod_manifest["spec"]["containers"][0]["command"].append(
                "--enable-storage-sharding"
            )
        set_env_var(
            pod_manifest["spec"]["containers"][0],
            PUSH_METRICS_ENDPOINT,
            get_env_var(PUSH_METRICS_ENDPOINT, "http://localhost:9091"),
        )
        retries = 0
        while retries <= MAX_RETRIES:
            try:
                retries += 1
                response = api_instance.create_namespaced_pod(
                    namespace=self.namespace, body=pod_manifest
                )
                logger.info(f"Created pod {self.name}")
                return
            except ApiException as e:
                logger.warning(
                    f"Retry {retries}/{MAX_RETRIES} for pod {self.name} failed: {e}"
                )
                time.sleep(RETRY_DELAY)

    def get_pod_exit_code(self):
        # Check the status of the pod containers
        for container_status in self.status.status.container_statuses:
            if container_status.state.terminated:
                return container_status.state.terminated.exit_code
        return None

    def get_pod_status(self):
        pod_status = self.client.read_namespaced_pod_status(
            name=self.name, namespace=self.namespace
        )
        return pod_status

    def get_humio_log_link(self):
        # TODO: Implement this ref:get_humio_link_for_node_logs
        return f"https://humio.com/search?query=namespace%3D%22{self.namespace}%22%20pod%3D%22{self.name}%22"


class ReplayScheduler:
    def __init__(
        self,
        id,
        start_version,
        end_version,
        ranges_to_skip,
        worker_cnt=NUM_OF_WORKERS,
        range_size=RANGE_SIZE,
        network=Network.TESETNET,
    ):
        config.load_kube_config()
        self.client = client.CoreV1Api()
        self.id = id
        self.namespace = "default"
        self.start_version = start_version
        self.end_version = end_version
        self.ranges_to_skip = ranges_to_skip
        self.range_size = range_size
        self.current_workers = [None] * worker_cnt
        self.tasks = []
        self.network = network
        self.failed_workpod_logs = []
        self.txn_mismatch_logs = []

    def get_label(self):
        return f"{self.id}-{self.network}"

    # Create ranges consisting of start_version and end_version
    def create_ranges(self):
        ranges = []
        current = self.start_version

        # Sort ranges_to_skip to ensure we process them in order
        sorted_skips = sorted(self.ranges_to_skip, key=lambda x: x[0])

        while current < self.end_version:
            # Check if current position falls within any skip range
            should_skip = False
            for skip_start, skip_end in sorted_skips:
                if skip_start <= current < skip_end:
                    current = skip_end  # Jump to end of skip range
                    should_skip = True
                    break

            if not should_skip:
                range_end = min(current + self.range_size, self.end_version)
                ranges.append((current, range_end))
                current += self.range_size

        self.tasks = ranges

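    # Worked example (values assumed): with start_version=0, end_version=300_000,
    # range_size=100_000 and ranges_to_skip=[(150_000, 250_000)], create_ranges()
    # produces [(0, 100_000), (100_000, 200_000), (250_000, 300_000)]: a chunk is
    # dropped only when its start version falls inside a skip range, so the
    # (100_000, 200_000) chunk still overlaps the tail of the skipped span.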
    def schedule(self, from_scratch=False):
        if from_scratch:
            self.kill_all_running_pods(self.get_label())
        self.create_ranges()

        while len(self.tasks) > 0:
            for i in range(len(self.current_workers)):
                if (
                    self.current_workers[i] is None
                    or self.current_workers[i].is_completed()
                ):
                    if (
                        self.current_workers[i] is not None
                        and self.current_workers[i].is_completed()
                    ):
                        self.collect_logs_from_completed_pods(self.current_workers[i])
                    if len(self.tasks) == 0:
                        break
                    chunk = self.tasks.pop(0)
                    worker_pod = WorkerPod(
                        chunk[0],
                        chunk[1],
                        self.get_label(),
                        self.network,
                        self.namespace,
                    )
                    self.current_workers[i] = worker_pod
                    worker_pod.start()
                if self.current_workers[i] is not None:
                    print(f"Checking worker {self.current_workers[i].get_phase()}")
            time.sleep(QUERY_DELAY)
        print("All tasks have been scheduled")

    def collect_logs_from_completed_pods(self, worker_pod):
        if worker_pod.has_txn_mismatch():
            logger.info(f"Worker {worker_pod.name} failed with txn mismatch")
            self.txn_mismatch_logs.append(worker_pod.get_humio_log_link())

        if worker_pod.is_failed():
            self.failed_workpod_logs.append(worker_pod.get_humio_log_link())

    def kill_all_running_pods(self, label):
        # Delete all pods in the namespace
        response = self.client.delete_collection_namespaced_pod(
            namespace=self.namespace,
            label_selector=f"run={label}",
        )

    def collect_all_failed_logs(self):
        logger.info("Collecting logs from remaining pods")
        all_completed = False
        while not all_completed:
            all_completed = True
            for worker in self.current_workers:
                if worker is not None:
                    if not worker.is_completed():
                        all_completed = False
                    else:
                        self.collect_logs_from_completed_pods(worker)
            time.sleep(QUERY_DELAY)
        return (self.failed_workpod_logs, self.txn_mismatch_logs)

def create_pods(image_name, count=2, namespace="default"):
    for i in range(count):
        pod_name = f"pod-{i+1}"
        create_pod(pod_name, image_name, namespace)


if __name__ == '__main__':
    print("Starting the script...")
    init()
    list_nodes()

    # Create pods using the aptoslabs/tools:nightly image
    create_pods("aptoslabs/tools:nightly", count=2, namespace="default")


if __name__ == "__main__":
    scheduler = ReplayScheduler(
        "test", 1000000000, 1000500000, [(1000, 2000), (3000, 4000)], range_size=200_000
    )
    scheduler.schedule(from_scratch=True)
    print(scheduler.collect_all_failed_logs())
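
When the run completes, collect_all_failed_logs() returns two lists of Humio log links: one for worker pods that ended in the Failed phase, and one for pods whose container exited with code 2, which the script treats as a transaction mismatch.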