diff --git a/pychunkedgraph/__init__.py b/pychunkedgraph/__init__.py index e94f36fe8..6ed01825f 100644 --- a/pychunkedgraph/__init__.py +++ b/pychunkedgraph/__init__.py @@ -1 +1 @@ -__version__ = "3.0.5" +__version__ = "3.0.6" diff --git a/pychunkedgraph/ingest/cli.py b/pychunkedgraph/ingest/cli.py index 928e1852f..c50525ec6 100644 --- a/pychunkedgraph/ingest/cli.py +++ b/pychunkedgraph/ingest/cli.py @@ -16,15 +16,17 @@ bootstrap, chunk_id_str, print_completion_rate, - print_ingest_status, + print_status, queue_layer_helper, + job_type_guard, ) from .simple_tests import run_all from .create.parent_layer import add_parent_chunk from ..graph.chunkedgraph import ChunkedGraph from ..utils.redis import get_redis_connection, keys as r_keys -ingest_cli = AppGroup("ingest") +group_name = "ingest" +ingest_cli = AppGroup(group_name) def init_ingest_cmds(app): @@ -32,6 +34,8 @@ def init_ingest_cmds(app): @ingest_cli.command("flush_redis") +@click.confirmation_option(prompt="Are you sure you want to flush redis?") +@job_type_guard(group_name) def flush_redis(): """FLush redis db.""" redis = get_redis_connection() @@ -44,6 +48,7 @@ def flush_redis(): @click.option("--raw", is_flag=True, help="Read edges from agglomeration output.") @click.option("--test", is_flag=True, help="Test 8 chunks at the center of dataset.") @click.option("--retry", is_flag=True, help="Rerun without creating a new table.") +@job_type_guard(group_name) def ingest_graph( graph_id: str, dataset: click.Path, raw: bool, test: bool, retry: bool ): @@ -51,6 +56,8 @@ def ingest_graph( Main ingest command. Takes ingest config from a yaml file and queues atomic tasks. """ + redis = get_redis_connection() + redis.set(r_keys.JOB_TYPE, group_name) with open(dataset, "r") as stream: config = yaml.safe_load(stream) @@ -70,6 +77,7 @@ def ingest_graph( @click.argument("graph_id", type=str) @click.argument("dataset", type=click.Path(exists=True)) @click.option("--raw", is_flag=True) +@job_type_guard(group_name) def pickle_imanager(graph_id: str, dataset: click.Path, raw: bool): """ Load ingest config into redis server. @@ -83,11 +91,12 @@ def pickle_imanager(graph_id: str, dataset: click.Path, raw: bool): meta, ingest_config, _ = bootstrap(graph_id, config=config, raw=raw) imanager = IngestionManager(ingest_config, meta) - imanager.redis # pylint: disable=pointless-statement + imanager.redis.set(r_keys.JOB_TYPE, group_name) @ingest_cli.command("layer") @click.argument("parent_layer", type=int) +@job_type_guard(group_name) def queue_layer(parent_layer): """ Queue all chunk tasks at a given layer. @@ -100,16 +109,21 @@ def queue_layer(parent_layer): @ingest_cli.command("status") +@job_type_guard(group_name) def ingest_status(): """Print ingest status to console by layer.""" redis = get_redis_connection() - imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) - print_ingest_status(imanager, redis) + try: + imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) + print_status(imanager, redis) + except TypeError as err: + print(f"\nNo current `{group_name}` job found in redis: {err}") @ingest_cli.command("chunk") @click.argument("queue", type=str) @click.argument("chunk_info", nargs=4, type=int) +@job_type_guard(group_name) def ingest_chunk(queue: str, chunk_info): """Manually queue chunk when a job is stuck for whatever reason.""" redis = get_redis_connection() @@ -135,6 +149,7 @@ def ingest_chunk(queue: str, chunk_info): @click.argument("graph_id", type=str) @click.argument("chunk_info", nargs=4, type=int) @click.option("--n_threads", type=int, default=1) +@job_type_guard(group_name) def ingest_chunk_local(graph_id: str, chunk_info, n_threads: int): """Manually ingest a chunk on a local machine.""" layer, coords = chunk_info[0], chunk_info[1:] @@ -150,6 +165,7 @@ def ingest_chunk_local(graph_id: str, chunk_info, n_threads: int): @ingest_cli.command("rate") @click.argument("layer", type=int) @click.option("--span", default=10, help="Time span to calculate rate.") +@job_type_guard(group_name) def rate(layer: int, span: int): redis = get_redis_connection() imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) @@ -158,5 +174,6 @@ def rate(layer: int, span: int): @ingest_cli.command("run_tests") @click.argument("graph_id", type=str) +@job_type_guard(group_name) def run_tests(graph_id): run_all(ChunkedGraph(graph_id=graph_id)) diff --git a/pychunkedgraph/ingest/cli_upgrade.py b/pychunkedgraph/ingest/cli_upgrade.py index c77c0be64..84939544b 100644 --- a/pychunkedgraph/ingest/cli_upgrade.py +++ b/pychunkedgraph/ingest/cli_upgrade.py @@ -24,15 +24,17 @@ from .utils import ( chunk_id_str, print_completion_rate, - print_ingest_status, + print_status, queue_layer_helper, start_ocdbt_server, + job_type_guard, ) from ..graph.chunkedgraph import ChunkedGraph, ChunkedGraphMeta from ..utils.redis import get_redis_connection from ..utils.redis import keys as r_keys -upgrade_cli = AppGroup("upgrade") +group_name = "upgrade" +upgrade_cli = AppGroup(group_name) def init_upgrade_cmds(app): @@ -40,6 +42,8 @@ def init_upgrade_cmds(app): @upgrade_cli.command("flush_redis") +@click.confirmation_option(prompt="Are you sure you want to flush redis?") +@job_type_guard(group_name) def flush_redis(): """FLush redis db.""" redis = get_redis_connection() @@ -50,11 +54,13 @@ def flush_redis(): @click.argument("graph_id", type=str) @click.option("--test", is_flag=True, help="Test 8 chunks at the center of dataset.") @click.option("--ocdbt", is_flag=True, help="Store edges using ts ocdbt kv store.") +@job_type_guard(group_name) def upgrade_graph(graph_id: str, test: bool, ocdbt: bool): """ - Main upgrade command. - Takes upgrade config from a yaml file and queues atomic tasks. + Main upgrade command. Queues atomic tasks. """ + redis = get_redis_connection() + redis.set(r_keys.JOB_TYPE, group_name) ingest_config = IngestConfig(TEST_RUN=test) cg = ChunkedGraph(graph_id=graph_id) cg.client.add_graph_version(__version__, overwrite=True) @@ -91,6 +97,7 @@ def upgrade_graph(graph_id: str, test: bool, ocdbt: bool): @upgrade_cli.command("layer") @click.argument("parent_layer", type=int) +@job_type_guard(group_name) def queue_layer(parent_layer): """ Queue all chunk tasks at a given layer. @@ -103,17 +110,22 @@ def queue_layer(parent_layer): @upgrade_cli.command("status") -def ingest_status(): +@job_type_guard(group_name) +def upgrade_status(): """Print upgrade status to console.""" redis = get_redis_connection() - imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) - print_ingest_status(imanager, redis, upgrade=True) + try: + imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) + print_status(imanager, redis, upgrade=True) + except TypeError as err: + print(f"\nNo current `{group_name}` job found in redis: {err}") @upgrade_cli.command("chunk") @click.argument("queue", type=str) @click.argument("chunk_info", nargs=4, type=int) -def ingest_chunk(queue: str, chunk_info): +@job_type_guard(group_name) +def upgrade_chunk(queue: str, chunk_info): """Manually queue chunk when a job is stuck for whatever reason.""" redis = get_redis_connection() imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) @@ -137,6 +149,7 @@ def ingest_chunk(queue: str, chunk_info): @upgrade_cli.command("rate") @click.argument("layer", type=int) @click.option("--span", default=10, help="Time span to calculate rate.") +@job_type_guard(group_name) def rate(layer: int, span: int): redis = get_redis_connection() imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) diff --git a/pychunkedgraph/ingest/upgrade/parent_layer.py b/pychunkedgraph/ingest/upgrade/parent_layer.py index a7e79b8f0..7c95cc1b6 100644 --- a/pychunkedgraph/ingest/upgrade/parent_layer.py +++ b/pychunkedgraph/ingest/upgrade/parent_layer.py @@ -6,7 +6,7 @@ import fastremap import numpy as np -from multiwrapper import multiprocessing_utils as mu +from tqdm import tqdm from pychunkedgraph.graph import ChunkedGraph from pychunkedgraph.graph.attributes import Connectivity, Hierarchy @@ -51,7 +51,7 @@ def _get_cx_edges_at_timestamp(node, response, ts): def _populate_cx_edges_with_timestamps( - cg: ChunkedGraph, layer: int, nodes: list, nodes_ts:list, earliest_ts + cg: ChunkedGraph, layer: int, nodes: list, nodes_ts: list, earliest_ts ): """ Collect timestamps of edits from children, since we use the same timestamp @@ -83,7 +83,6 @@ def update_cross_edges(cg: ChunkedGraph, layer, node, node_ts, earliest_ts) -> l try: cx_edges_d = CX_EDGES[node][node_ts] except KeyError: - print(CX_EDGES) raise KeyError(f"{node}:{node_ts}") edges = np.concatenate([empty_2d] + list(cx_edges_d.values())) if edges.size: @@ -158,15 +157,14 @@ def update_chunk( chunked_nodes_ts = chunked(nodes_ts, task_size) cg_info = cg.get_serialized_info() - multi_args = [] + tasks = [] for chunk, ts_chunk in zip(chunked_nodes, chunked_nodes_ts): args = (cg_info, layer, chunk, ts_chunk, earliest_ts) - multi_args.append(args) - - print(f"nodes: {len(nodes)}, tasks: {len(multi_args)}, size: {task_size}") - mu.multiprocess_func( - _update_cross_edges_helper, - multi_args, - n_threads=min(len(multi_args), mp.cpu_count()), - ) + tasks.append(args) + + with mp.Pool(min(mp.cpu_count(), len(tasks))) as pool: + tqdm( + pool.imap_unordered(_update_cross_edges_helper, tasks), + total=len(tasks), + ) print(f"total elaspsed time: {time.time() - start}") diff --git a/pychunkedgraph/ingest/utils.py b/pychunkedgraph/ingest/utils.py index 3d573ce37..1692db43b 100644 --- a/pychunkedgraph/ingest/utils.py +++ b/pychunkedgraph/ingest/utils.py @@ -1,6 +1,7 @@ # pylint: disable=invalid-name, missing-docstring import logging +import functools from os import environ from time import sleep from typing import Any, Generator, Tuple @@ -16,6 +17,8 @@ from ..graph.client import BackendClientInfo from ..graph.client.bigtable import BigTableConfig from ..utils.general import chunked +from ..utils.redis import get_redis_connection +from ..utils.redis import keys as r_keys chunk_id_str = lambda layer, coords: f"{layer}_{'_'.join(map(str, coords))}" @@ -116,7 +119,7 @@ def print_completion_rate(imanager: IngestionManager, layer: int, span: int = 10 print(f"{rate} chunks per second.") -def print_ingest_status(imanager: IngestionManager, redis, upgrade: bool = False): +def print_status(imanager: IngestionManager, redis, upgrade: bool = False): """ Helper to print status to console. If `upgrade=True`, status does not include the root layer, @@ -128,6 +131,7 @@ def print_ingest_status(imanager: IngestionManager, redis, upgrade: bool = False layer_counts = imanager.cg_meta.layer_chunk_counts pipeline = redis.pipeline() + pipeline.get(r_keys.JOB_TYPE) worker_busy = [] for layer in layers: pipeline.scard(f"{layer}c") @@ -138,25 +142,32 @@ def print_ingest_status(imanager: IngestionManager, redis, upgrade: bool = False worker_busy.append(sum([w.get_state() == WorkerStatus.BUSY for w in workers])) results = pipeline.execute() + job_type = "not_available" + if results[0] is not None: + job_type = results[0].decode() completed = [] queued = [] failed = [] - for i in range(0, len(results), 3): + for i in range(1, len(results), 3): result = results[i : i + 3] completed.append(result[0]) queued.append(result[1]) failed.append(result[2]) - print(f"version: \t{imanager.cg.version}") - print(f"graph_id: \t{imanager.cg.graph_id}") - print(f"chunk_size: \t{imanager.cg.meta.graph_config.CHUNK_SIZE}") - print("\nlayer status:") + header = ( + f"\njob_type: \t{job_type}" + f"\nversion: \t{imanager.cg.version}" + f"\ngraph_id: \t{imanager.cg.graph_id}" + f"\nchunk_size: \t{imanager.cg.meta.graph_config.CHUNK_SIZE}" + "\n\nlayer status:" + ) + print(header) for layer, done, count in zip(layers, completed, layer_counts): - print(f"{layer}\t: {done:<9} / {count}") + print(f"{layer}\t| {done:9} / {count} \t| {done/count:6.1%}") print("\n\nqueue status:") for layer, q, f, wb in zip(layers, queued, failed, worker_busy): - print(f"l{layer}\t: queued: {q:<10} failed: {f:<10} busy: {wb}") + print(f"l{layer}\t| queued: {q:<10} failed: {f:<10} busy: {wb}") def queue_layer_helper(parent_layer: int, imanager: IngestionManager, fn): @@ -190,3 +201,25 @@ def queue_layer_helper(parent_layer: int, imanager: IngestionManager, fn): ) ) q.enqueue_many(job_datas) + + +def job_type_guard(job_type: str): + def decorator_job_type_guard(func): + @functools.wraps(func) + def wrapper_job_type_guard(*args, **kwargs): + redis = get_redis_connection() + current_type = redis.get(r_keys.JOB_TYPE) + if current_type is not None: + current_type = current_type.decode() + msg = ( + f"Currently running `{current_type}`. You're attempting to run `{job_type}`." + f"\nRun `[flask] {current_type} flush_redis` to clear the current job and restart." + ) + if current_type != job_type: + print(f"\n*WARNING*\n{msg}") + exit(1) + return func(*args, **kwargs) + + return wrapper_job_type_guard + + return decorator_job_type_guard diff --git a/pychunkedgraph/utils/redis.py b/pychunkedgraph/utils/redis.py index 420a849f1..fa43c867a 100644 --- a/pychunkedgraph/utils/redis.py +++ b/pychunkedgraph/utils/redis.py @@ -19,8 +19,8 @@ REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "") REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0" -keys_fields = ("INGESTION_MANAGER",) -keys_defaults = ("pcg:imanager",) +keys_fields = ("INGESTION_MANAGER", "JOB_TYPE") +keys_defaults = ("pcg:imanager", "pcg:job_type") Keys = namedtuple("keys", keys_fields, defaults=keys_defaults) keys = Keys()