Skip to content

Commit

Permalink
starting work on providing a ray protocol for distributed composites/…
Browse files Browse the repository at this point in the history
…processes
  • Loading branch information
prismofeverything committed Nov 10, 2023
1 parent bd873d3 commit 98ff5e7
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 2 deletions.
2 changes: 1 addition & 1 deletion process_bigraph/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from process_bigraph.composite import Process, Step, Composite
from process_bigraph.type_system import types
from process_bigraph.protocols import local_lookup
from process_bigraph.protocols.local import local_lookup
from process_bigraph.registry import protocol_registry, process_registry
from process_bigraph.emitter import ConsoleEmitter, RAMEmitter

Expand Down
Empty file.
File renamed without changes.
66 changes: 66 additions & 0 deletions process_bigraph/protocols/ray.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
===============================================
Protocols for retrieving processes from address
===============================================
"""

import importlib
import sys
from process_bigraph.composite import Process
from process_bigraph.registry import protocol_registry

import ray


def local_lookup_module(address):
"""Local Module Protocol
Retrieves local module
"""
if '.' in address:
module_name, class_name = address.rsplit('.', 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)
else:
return getattr(sys.modules[__name__], address)


def local_lookup_registry(address):
"""Process Registry Protocol
Retrieves from the process registry
"""
return process_registry.access(address)


@ray.remote
class RayProcess(Process):
pass


class RayProcessFactory:
def __init__(self, instantiate):
self.instantiate = instantiate


def __call__(self, config):
self.instantiate(config)


def ray_lookup(address):
"""Ray Lookup Protocol
Retrieves processes that operate through Ray's distributed actor system, from the process registry or from a local module
"""
if address[0] == '!':
instantiate = local_lookup_module(address[1:])
else:
instantiate = local_lookup_registry(address)

factory = RayProcessFactory(instantiate)
return factory


protocol_registry.register('ray', ray_lookup)


73 changes: 72 additions & 1 deletion process_bigraph/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,49 @@ def test_composite():
assert updates[0]['exchange'] == 0.999


def test_nested_composite():
# TODO: add support for the various vivarium emitters

# increase = IncreaseProcess({'rate': 0.3})
# TODO: This is the config of the composite,
# we also need a way to serialize the entire composite

composite = Composite({
'state': {
'example': {
'_type': 'process',
'address': 'ray:',
'interval': 0.5,
'config': {
'composition': {
'increase': 'process[level:float]',
'value': 'float'},
'schema': {
'exchange': 'float'},
'bridge': {
'exchange': ['value']},
'state': {
'increase': {
'address': 'local:!process_bigraph.tests.IncreaseProcess',
'config': {'rate': 0.3},
'interval': 1.0,
'wires': {'level': ['value']}},
'value': 11.11}},
'wires': {}}}})

initial_state = {'exchange': 3.33}

updates = composite.update(initial_state, 10.0)

final_exchange = sum([
update['exchange']
for update in [initial_state] + updates])

assert composite.state['value'] > 45
assert 'exchange' in updates[0]
assert updates[0]['exchange'] == 0.999


def test_infer():
composite = Composite({
'state': {
Expand Down Expand Up @@ -343,6 +386,34 @@ def test_reaction():
'inner': ['inner']}}}}}}


def test_ray():
import process_bigraph.protocols.ray

composite = Composite({
'state': {
'increase': {
'_type': 'process',
'address': 'ray:!process_bigraph.tests.IncreaseProcess',
'config': {'rate': 0.3},
'interval': 1.0,
'wires': {'level': ['value']}},
'value': 11.11}})

initial_state = {'exchange': 3.33}

updates = composite.update(initial_state, 10.0)

final_exchange = sum([
update['exchange']
for update in [initial_state] + updates])

assert composite.state['value'] > 45
assert 'exchange' in updates[0]
assert updates[0]['exchange'] == 0.999




if __name__ == '__main__':
test_default_config()
test_merge_collections()
Expand All @@ -351,4 +422,4 @@ def test_reaction():
test_infer()
test_step_initialization()
test_dependencies()
# test_reaction()
# test_reaction()

0 comments on commit 98ff5e7

Please sign in to comment.