Skip to content

Commit

Permalink
adds job type guard, flush_redis prompts, improved status output
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Sep 29, 2024
1 parent fbea005 commit dcbecd1
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pychunkedgraph/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "3.0.5"
__version__ = "3.0.6"
27 changes: 22 additions & 5 deletions pychunkedgraph/ingest/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,26 @@
bootstrap,
chunk_id_str,
print_completion_rate,
print_ingest_status,
print_status,
queue_layer_helper,
job_type_guard,
)
from .simple_tests import run_all
from .create.parent_layer import add_parent_chunk
from ..graph.chunkedgraph import ChunkedGraph
from ..utils.redis import get_redis_connection, keys as r_keys

ingest_cli = AppGroup("ingest")
group_name = "ingest"
ingest_cli = AppGroup(group_name)


def init_ingest_cmds(app):
app.cli.add_command(ingest_cli)


@ingest_cli.command("flush_redis")
@click.confirmation_option(prompt="Are you sure you want to flush redis?")
@job_type_guard(group_name)
def flush_redis():
"""FLush redis db."""
redis = get_redis_connection()
Expand All @@ -44,13 +48,16 @@ def flush_redis():
@click.option("--raw", is_flag=True, help="Read edges from agglomeration output.")
@click.option("--test", is_flag=True, help="Test 8 chunks at the center of dataset.")
@click.option("--retry", is_flag=True, help="Rerun without creating a new table.")
@job_type_guard(group_name)
def ingest_graph(
graph_id: str, dataset: click.Path, raw: bool, test: bool, retry: bool
):
"""
Main ingest command.
Takes ingest config from a yaml file and queues atomic tasks.
"""
redis = get_redis_connection()
redis.set(r_keys.JOB_TYPE, group_name)
with open(dataset, "r") as stream:
config = yaml.safe_load(stream)

Expand All @@ -70,6 +77,7 @@ def ingest_graph(
@click.argument("graph_id", type=str)
@click.argument("dataset", type=click.Path(exists=True))
@click.option("--raw", is_flag=True)
@job_type_guard(group_name)
def pickle_imanager(graph_id: str, dataset: click.Path, raw: bool):
"""
Load ingest config into redis server.
Expand All @@ -83,11 +91,12 @@ def pickle_imanager(graph_id: str, dataset: click.Path, raw: bool):

meta, ingest_config, _ = bootstrap(graph_id, config=config, raw=raw)
imanager = IngestionManager(ingest_config, meta)
imanager.redis # pylint: disable=pointless-statement
imanager.redis.set(r_keys.JOB_TYPE, group_name)


@ingest_cli.command("layer")
@click.argument("parent_layer", type=int)
@job_type_guard(group_name)
def queue_layer(parent_layer):
"""
Queue all chunk tasks at a given layer.
Expand All @@ -100,16 +109,21 @@ def queue_layer(parent_layer):


@ingest_cli.command("status")
@job_type_guard(group_name)
def ingest_status():
"""Print ingest status to console by layer."""
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
print_ingest_status(imanager, redis)
try:
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
print_status(imanager, redis)
except TypeError as err:
print(f"\nNo current `{group_name}` job found in redis: {err}")


@ingest_cli.command("chunk")
@click.argument("queue", type=str)
@click.argument("chunk_info", nargs=4, type=int)
@job_type_guard(group_name)
def ingest_chunk(queue: str, chunk_info):
"""Manually queue chunk when a job is stuck for whatever reason."""
redis = get_redis_connection()
Expand All @@ -135,6 +149,7 @@ def ingest_chunk(queue: str, chunk_info):
@click.argument("graph_id", type=str)
@click.argument("chunk_info", nargs=4, type=int)
@click.option("--n_threads", type=int, default=1)
@job_type_guard(group_name)
def ingest_chunk_local(graph_id: str, chunk_info, n_threads: int):
"""Manually ingest a chunk on a local machine."""
layer, coords = chunk_info[0], chunk_info[1:]
Expand All @@ -150,6 +165,7 @@ def ingest_chunk_local(graph_id: str, chunk_info, n_threads: int):
@ingest_cli.command("rate")
@click.argument("layer", type=int)
@click.option("--span", default=10, help="Time span to calculate rate.")
@job_type_guard(group_name)
def rate(layer: int, span: int):
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
Expand All @@ -158,5 +174,6 @@ def rate(layer: int, span: int):

@ingest_cli.command("run_tests")
@click.argument("graph_id", type=str)
@job_type_guard(group_name)
def run_tests(graph_id):
run_all(ChunkedGraph(graph_id=graph_id))
29 changes: 21 additions & 8 deletions pychunkedgraph/ingest/cli_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,26 @@
from .utils import (
chunk_id_str,
print_completion_rate,
print_ingest_status,
print_status,
queue_layer_helper,
start_ocdbt_server,
job_type_guard,
)
from ..graph.chunkedgraph import ChunkedGraph, ChunkedGraphMeta
from ..utils.redis import get_redis_connection
from ..utils.redis import keys as r_keys

upgrade_cli = AppGroup("upgrade")
group_name = "upgrade"
upgrade_cli = AppGroup(group_name)


def init_upgrade_cmds(app):
app.cli.add_command(upgrade_cli)


@upgrade_cli.command("flush_redis")
@click.confirmation_option(prompt="Are you sure you want to flush redis?")
@job_type_guard(group_name)
def flush_redis():
"""FLush redis db."""
redis = get_redis_connection()
Expand All @@ -50,11 +54,13 @@ def flush_redis():
@click.argument("graph_id", type=str)
@click.option("--test", is_flag=True, help="Test 8 chunks at the center of dataset.")
@click.option("--ocdbt", is_flag=True, help="Store edges using ts ocdbt kv store.")
@job_type_guard(group_name)
def upgrade_graph(graph_id: str, test: bool, ocdbt: bool):
"""
Main upgrade command.
Takes upgrade config from a yaml file and queues atomic tasks.
Main upgrade command. Queues atomic tasks.
"""
redis = get_redis_connection()
redis.set(r_keys.JOB_TYPE, group_name)
ingest_config = IngestConfig(TEST_RUN=test)
cg = ChunkedGraph(graph_id=graph_id)
cg.client.add_graph_version(__version__, overwrite=True)
Expand Down Expand Up @@ -91,6 +97,7 @@ def upgrade_graph(graph_id: str, test: bool, ocdbt: bool):

@upgrade_cli.command("layer")
@click.argument("parent_layer", type=int)
@job_type_guard(group_name)
def queue_layer(parent_layer):
"""
Queue all chunk tasks at a given layer.
Expand All @@ -103,17 +110,22 @@ def queue_layer(parent_layer):


@upgrade_cli.command("status")
def ingest_status():
@job_type_guard(group_name)
def upgrade_status():
"""Print upgrade status to console."""
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
print_ingest_status(imanager, redis, upgrade=True)
try:
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
print_status(imanager, redis, upgrade=True)
except TypeError as err:
print(f"\nNo current `{group_name}` job found in redis: {err}")


@upgrade_cli.command("chunk")
@click.argument("queue", type=str)
@click.argument("chunk_info", nargs=4, type=int)
def ingest_chunk(queue: str, chunk_info):
@job_type_guard(group_name)
def upgrade_chunk(queue: str, chunk_info):
"""Manually queue chunk when a job is stuck for whatever reason."""
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
Expand All @@ -137,6 +149,7 @@ def ingest_chunk(queue: str, chunk_info):
@upgrade_cli.command("rate")
@click.argument("layer", type=int)
@click.option("--span", default=10, help="Time span to calculate rate.")
@job_type_guard(group_name)
def rate(layer: int, span: int):
redis = get_redis_connection()
imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER))
Expand Down
22 changes: 10 additions & 12 deletions pychunkedgraph/ingest/upgrade/parent_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import fastremap
import numpy as np
from multiwrapper import multiprocessing_utils as mu
from tqdm import tqdm

from pychunkedgraph.graph import ChunkedGraph
from pychunkedgraph.graph.attributes import Connectivity, Hierarchy
Expand Down Expand Up @@ -51,7 +51,7 @@ def _get_cx_edges_at_timestamp(node, response, ts):


def _populate_cx_edges_with_timestamps(
cg: ChunkedGraph, layer: int, nodes: list, nodes_ts:list, earliest_ts
cg: ChunkedGraph, layer: int, nodes: list, nodes_ts: list, earliest_ts
):
"""
Collect timestamps of edits from children, since we use the same timestamp
Expand Down Expand Up @@ -83,7 +83,6 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l
try:
cx_edges_d = CX_EDGES[node][node_ts]
except KeyError:
print(CX_EDGES)
raise KeyError(f"{node}:{node_ts}")
edges = np.concatenate([empty_2d] + list(cx_edges_d.values()))
if edges.size:
Expand Down Expand Up @@ -158,15 +157,14 @@ def update_chunk(
chunked_nodes_ts = chunked(nodes_ts, task_size)
cg_info = cg.get_serialized_info()

multi_args = []
tasks = []
for chunk, ts_chunk in zip(chunked_nodes, chunked_nodes_ts):
args = (cg_info, layer, chunk, ts_chunk, earliest_ts)
multi_args.append(args)

print(f"nodes: {len(nodes)}, tasks: {len(multi_args)}, size: {task_size}")
mu.multiprocess_func(
_update_cross_edges_helper,
multi_args,
n_threads=min(len(multi_args), mp.cpu_count()),
)
tasks.append(args)

with mp.Pool(min(mp.cpu_count(), len(tasks))) as pool:
tqdm(
pool.imap_unordered(_update_cross_edges_helper, tasks),
total=len(tasks),
)
print(f"total elaspsed time: {time.time() - start}")
49 changes: 41 additions & 8 deletions pychunkedgraph/ingest/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# pylint: disable=invalid-name, missing-docstring

import logging
import functools
from os import environ
from time import sleep
from typing import Any, Generator, Tuple
Expand All @@ -16,6 +17,8 @@
from ..graph.client import BackendClientInfo
from ..graph.client.bigtable import BigTableConfig
from ..utils.general import chunked
from ..utils.redis import get_redis_connection
from ..utils.redis import keys as r_keys

chunk_id_str = lambda layer, coords: f"{layer}_{'_'.join(map(str, coords))}"

Expand Down Expand Up @@ -116,7 +119,7 @@ def print_completion_rate(imanager: IngestionManager, layer: int, span: int = 10
print(f"{rate} chunks per second.")


def print_ingest_status(imanager: IngestionManager, redis, upgrade: bool = False):
def print_status(imanager: IngestionManager, redis, upgrade: bool = False):
"""
Helper to print status to console.
If `upgrade=True`, status does not include the root layer,
Expand All @@ -128,6 +131,7 @@ def print_ingest_status(imanager: IngestionManager, redis, upgrade: bool = False
layer_counts = imanager.cg_meta.layer_chunk_counts

pipeline = redis.pipeline()
pipeline.get(r_keys.JOB_TYPE)
worker_busy = []
for layer in layers:
pipeline.scard(f"{layer}c")
Expand All @@ -138,25 +142,32 @@ def print_ingest_status(imanager: IngestionManager, redis, upgrade: bool = False
worker_busy.append(sum([w.get_state() == WorkerStatus.BUSY for w in workers]))

results = pipeline.execute()
job_type = "not_available"
if results[0] is not None:
job_type = results[0].decode()
completed = []
queued = []
failed = []
for i in range(0, len(results), 3):
for i in range(1, len(results), 3):
result = results[i : i + 3]
completed.append(result[0])
queued.append(result[1])
failed.append(result[2])

print(f"version: \t{imanager.cg.version}")
print(f"graph_id: \t{imanager.cg.graph_id}")
print(f"chunk_size: \t{imanager.cg.meta.graph_config.CHUNK_SIZE}")
print("\nlayer status:")
header = (
f"\njob_type: \t{job_type}"
f"\nversion: \t{imanager.cg.version}"
f"\ngraph_id: \t{imanager.cg.graph_id}"
f"\nchunk_size: \t{imanager.cg.meta.graph_config.CHUNK_SIZE}"
"\n\nlayer status:"
)
print(header)
for layer, done, count in zip(layers, completed, layer_counts):
print(f"{layer}\t: {done:<9} / {count}")
print(f"{layer}\t| {done:9} / {count} \t| {done/count:6.1%}")

print("\n\nqueue status:")
for layer, q, f, wb in zip(layers, queued, failed, worker_busy):
print(f"l{layer}\t: queued: {q:<10} failed: {f:<10} busy: {wb}")
print(f"l{layer}\t| queued: {q:<10} failed: {f:<10} busy: {wb}")


def queue_layer_helper(parent_layer: int, imanager: IngestionManager, fn):
Expand Down Expand Up @@ -190,3 +201,25 @@ def queue_layer_helper(parent_layer: int, imanager: IngestionManager, fn):
)
)
q.enqueue_many(job_datas)


def job_type_guard(job_type: str):
def decorator_job_type_guard(func):
@functools.wraps(func)
def wrapper_job_type_guard(*args, **kwargs):
redis = get_redis_connection()
current_type = redis.get(r_keys.JOB_TYPE)
if current_type is not None:
current_type = current_type.decode()
msg = (
f"Currently running `{current_type}`. You're attempting to run `{job_type}`."
f"\nRun `[flask] {current_type} flush_redis` to clear the current job and restart."
)
if current_type != job_type:
print(f"\n*WARNING*\n{msg}")
exit(1)
return func(*args, **kwargs)

return wrapper_job_type_guard

return decorator_job_type_guard
4 changes: 2 additions & 2 deletions pychunkedgraph/utils/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "")
REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0"

keys_fields = ("INGESTION_MANAGER",)
keys_defaults = ("pcg:imanager",)
keys_fields = ("INGESTION_MANAGER", "JOB_TYPE")
keys_defaults = ("pcg:imanager", "pcg:job_type")
Keys = namedtuple("keys", keys_fields, defaults=keys_defaults)

keys = Keys()
Expand Down

0 comments on commit dcbecd1

Please sign in to comment.