setup infra for local replay-verify
areshand committed Oct 30, 2024
1 parent 44c6628 commit bdb718b
Showing 7 changed files with 521 additions and 60 deletions.
13 changes: 13 additions & 0 deletions testsuite/replay-verify/README.md
@@ -0,0 +1,13 @@
context: https://www.notion.so/aptoslabs/WIP-Replay-Verify-Service-Design-1128b846eb728029bc51fcafb3de6fca?pvs=4

## Prerequisite
Install minikube and kubectl

## Local test
minikube start --mount --mount-string="/path/to/your/archive_db_folder:/mnt/testnet_archive"
kubectl apply -f ./testnet-archive.yaml
# to reset a previous run: kubectl delete pods --all && kubectl delete pvc --all && kubectl delete pv --all

poetry shell
poetry install # installs the kubernetes Python client
python main.py
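
Before launching the scheduler, it can help to confirm that the Python client can reach the minikube cluster. A minimal sketch, assuming the default kubeconfig written by `minikube start` (the same `load_kube_config()` call that `main.py` makes on startup):

```python
from kubernetes import client, config

# Load the local kubeconfig; main.py does the same on startup.
config.load_kube_config()

# List cluster nodes as a connectivity check (a fresh minikube cluster shows one node).
v1 = client.CoreV1Api()
for node in v1.list_node(watch=False).items:
    print(f"Node: {node.metadata.name}")
```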
14 changes: 0 additions & 14 deletions testsuite/replay-verify/controller.yaml

This file was deleted.

337 changes: 294 additions & 43 deletions testsuite/replay-verify/main.py
@@ -1,47 +1,298 @@
import yaml
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import time
import logging
import os
from enum import Enum


# Hyperparameters
NUM_OF_WORKERS = 2
RANGE_SIZE = 100_000
SHARDING_ENABLED = False
MAX_RETRIES = 3
RETRY_DELAY = 5 # seconds
QUERY_DELAY = 10 # seconds
CONCURRENT_REPLAY = 2
REPLAY_CONCURRENCY_LEVEL = 1


class Network(Enum):
TESTNET = 1
MAINNET = 2


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

PUSH_METRICS_ENDPOINT = "PUSH_METRICS_ENDPOINT"


def set_env_var(container, name, value):
if "env" not in container:
container["env"] = []

# Check if the environment variable already exists
for env_var in container["env"]:
if env_var["name"] == name:
env_var["value"] = value
return

# If it doesn't exist, add it
container["env"].append({"name": name, "value": value})


def get_env_var(name, default_value=""):
return os.getenv(name, default_value)


class WorkerPod:
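# Represents a single replay-verify pod that replays the transaction range
# start_version..end_version against the mounted archive DB, with helpers to
# poll its phase, detect failures, and link to its logs.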
def __init__(
self,
start_version,
end_version,
label,
network=Network.TESTNET,
namespace="default",
):
self.client = client.CoreV1Api()
self.name = f"replay-verify-{start_version}-{end_version}"
self.start_version = start_version
self.end_version = end_version
self.status = None
self.log = None
self.namespace = namespace
self.network = network
self.label = label

def update_status(self):
self.status = self.get_pod_status()

def is_completed(self):
self.update_status()
if self.status and self.status.status.phase in ["Succeeded", "Failed"]:
return True
return False

def is_failed(self):
self.update_status()
if self.status and self.status.status.phase == "Failed":
return True
return False

def get_phase(self):
self.update_status()
if self.status:
return self.status.status.phase
return None

def has_txn_mismatch(self):
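# An exit code of 2 from the replay container is treated here as a
# transaction mismatch, as opposed to a generic failure.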
if self.status:
container_statuses = self.status.status.container_statuses
if (
container_statuses
and container_statuses[0].state
and container_statuses[0].state.terminated
):
return container_statuses[0].state.terminated.exit_code == 2
return False

def get_target_db_dir(self):
if self.network == Network.TESTNET:
return "/mnt/testnet_archive/db"
else:
return "/mnt/mainnet_archive/db"

def start(self):
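# Render the worker pod template with this pod's name, label, and replay
# command, then create the pod, retrying on transient API errors.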
# Load the worker YAML from the file
with open("replay-verify-worker-template.yaml", "r") as f:
pod_manifest = yaml.safe_load(f)

# Create the Kubernetes API client to start a pod
api_instance = client.CoreV1Api()
pod_manifest["metadata"]["name"] = self.name # Unique name for each pod
pod_manifest["metadata"]["labels"]["run"] = self.label
pod_manifest["spec"]["containers"][0]["name"] = self.name
pod_manifest["spec"]["containers"][0]["command"] = [
"aptos-debugger",
"aptos-db",
"replay-on-archive",
"--start-version",
str(self.start_version),
"--end-version",
str(self.end_version),
"--target-db-dir",
self.get_target_db_dir(),
"--concurrent-replay",
f"{CONCURRENT_REPLAY}",
"--replay-concurrency-level",
f"{REPLAY_CONCURRENCY_LEVEL}",
]

if SHARDING_ENABLED:
pod_manifest["spec"]["containers"][0]["command"].append(
"--enable-storage-sharding"
)
set_env_var(
pod_manifest["spec"]["containers"][0],
PUSH_METRICS_ENDPOINT,
get_env_var(PUSH_METRICS_ENDPOINT, "http://localhost:9091"),
)
retries = 0
while retries <= MAX_RETRIES:
try:
retries += 1
response = api_instance.create_namespaced_pod(
namespace=self.namespace, body=pod_manifest
)
logger.info(f"Created pod {self.name}")
return
except ApiException as e:
logger.warning(
f"Retry {retries}/{MAX_RETRIES} for pod {self.name} failed: {e}"
)
time.sleep(RETRY_DELAY)

def get_pod_exit_code(self):
# Check the status of the pod containers
for container_status in self.status.status.container_statuses:
if container_status.state.terminated:
return container_status.state.terminated.exit_code
return None

def get_pod_status(self):
pod_status = self.client.read_namespaced_pod_status(
name=self.name, namespace=self.namespace
)
return pod_status

def get_humio_log_link(self):
# TODO: Implement this ref:get_humio_link_for_node_logs
return f"https://humio.com/search?query=namespace%3D%22{self.namespace}%22%20pod%3D%22{self.name}%22"


class ReplayScheduler:
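# Splits the version span into fixed-size chunks (skipping ranges_to_skip),
# runs at most worker_cnt WorkerPods at a time, and collects Humio log links
# for failed pods and transaction mismatches.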
def __init__(
self,
id,
start_version,
end_version,
ranges_to_skip,
worker_cnt=NUM_OF_WORKERS,
range_size=RANGE_SIZE,
network=Network.TESTNET,
):
config.load_kube_config()
self.client = client.CoreV1Api()
self.id = id
self.namespace = "default"
self.start_version = start_version
self.end_version = end_version
self.ranges_to_skip = ranges_to_skip
self.range_size = range_size
self.current_workers = [None] * worker_cnt
self.tasks = []
self.network = network
self.failed_workpod_logs = []
self.txn_mismatch_logs = []

def get_label(self):
return f"{self.id}-{self.network}"

# Create (start, end) ranges covering start_version..end_version, skipping ranges_to_skip
def create_ranges(self):
ranges = []
current = self.start_version

# Sort ranges_to_skip to ensure we process them in order
sorted_skips = sorted(self.ranges_to_skip, key=lambda x: x[0])

while current < self.end_version:
# Check if current position falls within any skip range
should_skip = False
for skip_start, skip_end in sorted_skips:
if skip_start <= current < skip_end:
current = skip_end # Jump to end of skip range
should_skip = True
break

if not should_skip:
range_end = min(current + self.range_size, self.end_version)
ranges.append((current, range_end))
current += self.range_size

self.tasks = ranges

def schedule(self, from_scratch=False):
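# Poll the worker slots every QUERY_DELAY seconds; whenever a slot is empty or
# its pod has completed, collect its logs and start the next pending range.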
if from_scratch:
self.kill_all_running_pods(self.get_label())
self.create_ranges()

while len(self.tasks) > 0:
for i in range(len(self.current_workers)):
if (
self.current_workers[i] is None
or self.current_workers[i].is_completed()
):
if (
self.current_workers[i] is not None
and self.current_workers[i].is_completed()
):
self.collect_logs_from_completed_pods(self.current_workers[i])
if len(self.tasks) == 0:
break
chunk = self.tasks.pop(0)
worker_pod = WorkerPod(
chunk[0],
chunk[1],
self.get_label(),
self.network,
self.namespace,
)
self.current_workers[i] = worker_pod
worker_pod.start()
if self.current_workers[i] is not None:
print(f"Checking worker {self.current_workers[i].get_phase()}")
time.sleep(QUERY_DELAY)
print("All tasks have been scheduled")

def collect_logs_from_completed_pods(self, worker_pod):
if worker_pod.has_txn_mismatch():
logger.info(
f"Worker {self.current_workers[i].name} failed with txn mismatch"
)
self.txn_mismatch_logs.append(worker_pod.get_humio_log_link())

if worker_pod.is_failed():
self.failed_workpod_logs.append(worker_pod.get_humio_log_link())

def kill_all_running_pods(self, label):
# Delete all pods in the namespace that match this scheduler's run label
response = self.client.delete_collection_namespaced_pod(
namespace=self.namespace,
label_selector=f"run={label}",
)

def collect_all_failed_logs(self):
# Wait for all remaining workers to complete, then return the collected failure and mismatch log links
all_completed = False
while not all_completed:
all_completed = True
for worker in self.current_workers:
if worker is not None:
if not worker.is_completed():
all_completed = False
else:
self.collect_logs_from_completed_pods(worker)
time.sleep(QUERY_DELAY)
return (self.failed_workpod_logs, self.txn_mismatch_logs)


if __name__ == "__main__":
scheduler = ReplayScheduler(
"test", 1000000000, 1000500000, [(1000, 2000), (3000, 4000)], range_size=200_000
)

scheduler.schedule(from_scratch=True)
scheduler.collect_all_failed_logs()