Skip to content

Commit

Permalink
feat: post-process sv cross edges
Browse files Browse the repository at this point in the history
  • Loading branch information
akhileshh committed Aug 11, 2023
1 parent 0dec5a3 commit f3d3e5b
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
6 changes: 3 additions & 3 deletions pychunkedgraph/graph/attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,21 @@ class Connectivity:

L2CrossChunkEdge = _AttributeArray(
pattern=b"l2_cross_edge_%d",
family_id="3",
family_id="4",
serializer=serializers.NumPyArray(
dtype=basetypes.NODE_ID, shape=(-1, 2), compression_level=22
),
)

FakeEdges = _Attribute(
key=b"fake_edges",
family_id="3",
family_id="4",
serializer=serializers.NumPyArray(dtype=basetypes.NODE_ID, shape=(-1, 2)),
)

CrossChunkEdge = _AttributeArray(
pattern=b"atomic_cross_edges_%d",
family_id="4",
family_id="3",
serializer=serializers.NumPyArray(
dtype=basetypes.NODE_ID, shape=(-1, 2), compression_level=22
),
Expand Down
4 changes: 2 additions & 2 deletions pychunkedgraph/graph/client/bigtable/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,9 +636,9 @@ def _create_column_families(self):
f.create()
f = self._table.column_family("2")
f.create()
f = self._table.column_family("3")
f = self._table.column_family("3", gc_rule=MaxAgeGCRule(datetime.timedelta(days=1)))
f.create()
f = self._table.column_family("4", gc_rule=MaxAgeGCRule(datetime.timedelta(days=1)))
f = self._table.column_family("4")
f.create()

def _get_ids_range(self, key: bytes, size: int) -> typing.Tuple:
Expand Down
54 changes: 53 additions & 1 deletion pychunkedgraph/ingest/create/atomic_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ def _get_remapping(chunk_edges_d: dict):


def _process_component(
cg, chunk_edges_d, parent_id, node_ids, sparse_indices, remapping, time_stamp,
cg,
chunk_edges_d,
parent_id,
node_ids,
sparse_indices,
remapping,
time_stamp,
):
nodes = []
chunk_out_edges = [] # out = between + cross
Expand Down Expand Up @@ -145,3 +151,49 @@ def _get_outgoing_edges(node_id, chunk_edges_d, sparse_indices, remapping):
# edges that this node is part of
chunk_out_edges = np.concatenate([chunk_out_edges, edges[row_ids]])
return chunk_out_edges


def postprocess_atomic_chunk(
    cg: ChunkedGraph,
    chunk_coord: np.ndarray,
    time_stamp: Optional[datetime.datetime] = None,
):
    """Post-process cross-chunk edges of an atomic (layer 2) chunk.

    For every level-2 node in the chunk, reads its per-layer atomic
    cross-chunk edges, remaps each edge to ``[l2id, parent(partner_sv)]``
    by looking up the level-2 parents of the partner supervoxels, and
    writes the deduplicated result back as ``L2CrossChunkEdge`` columns.

    Args:
        cg: the chunked graph instance to read from and write to.
        chunk_coord: chunk coordinates ``[x, y, z]`` of the layer-2 chunk.
        time_stamp: timestamp for reads/writes; defaults to a validated
            "now" via ``get_valid_timestamp``.
    """
    time_stamp = get_valid_timestamp(time_stamp)

    chunk_id = cg.get_chunk_id(
        layer=2, x=chunk_coord[0], y=chunk_coord[1], z=chunk_coord[2]
    )

    # One column per layer: atomic cross edges for layers 2 .. layer_count-1.
    properties = [
        attributes.Connectivity.CrossChunkEdge[l] for l in range(2, cg.meta.layer_count)
    ]

    chunk_rr = cg.range_read_chunk(
        chunk_id, properties=properties, time_stamp=time_stamp
    )

    result = {}
    for l2id, raw_cx_edges in chunk_rr.items():
        try:
            # Latest cell value per layer column; copy so the buffer is writable.
            result[l2id] = {
                prop.index: val[0].value.copy() for prop, val in raw_cx_edges.items()
            }
        except KeyError:
            # NOTE(review): skips nodes whose cross-edge cells are missing;
            # unclear what raises KeyError here (vs IndexError on empty
            # cell lists) — confirm against the bigtable read path.
            continue

    nodes = []
    for l2id, cx_edges in result.items():
        val_dict = {}
        for layer, edges in cx_edges.items():
            # Remap [sv1, sv2] -> [l2id, parent(sv2)] and deduplicate rows.
            l2_edges = np.zeros_like(edges)
            l2_edges[:, 0] = l2id
            l2_edges[:, 1] = cg.get_parents(edges[:, 1])
            col = attributes.Connectivity.L2CrossChunkEdge[layer]
            val_dict[col] = np.unique(l2_edges, axis=0)

        r_key = serializers.serialize_uint64(l2id)
        nodes.append(cg.client.mutate_row(r_key, val_dict, time_stamp=time_stamp))
    cg.client.write(nodes)

0 comments on commit f3d3e5b

Please sign in to comment.