Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More flexible services for RPC #1206

Merged
merged 4 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Added `show_cells` flag to `compas.scene.VolMeshObject`.
* Added `compas.data.Data.to_jsonstring` and `compas.data.Data.from_jsonstring`.
* Added `compas.data.Data.attributes`.
* Added optional param `working_directory` to `compas.rpc.Proxy` to be able to start services defined in random locations.

### Changed

Expand All @@ -52,6 +53,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Changed `compas.datastructures.Network` to take additional `**kwargs`, instead of only `name=None` specifically.
* Changed `compas.datastructures.Halfedge` to take additional `**kwargs`, instead of only `name=None` specifically.
* Changed `compas.datastructures.Mesh` to take additional `**kwargs`, instead of only `name=None` specifically.
* Moved registration of `ping` and `remote_shutdown` of the RPC server to `compas.rpc.Server.__init__()`.
* Moved `FileWatcherService` to `compas.rpc.services.watcher` so it can be reused.

### Removed

Expand Down
16 changes: 16 additions & 0 deletions src/compas/rpc/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class Proxy(object):
capture_output : bool, optional
If True, capture the stdout/stderr output of the remote process.
In general, `capture_output` should be True when using a `pythonw` as executable (default).
path : str, optional
Path to the folder containing the module to be proxied.
This is useful for cases where the module to be proxied is not on the PYTHONPATH.
working_directory : str, optional
Current working directory for the process that will be started to run the server.
This is useful for cases where a custom service is used and the service is not on the PYTHONPATH.

Attributes
----------
Expand Down Expand Up @@ -109,6 +115,7 @@ def __init__(
autoreload=True,
capture_output=True,
path=None,
working_directory=None,
):
self._package = None
self._python = compas._os.select_python(python)
Expand All @@ -120,6 +127,7 @@ def __init__(
self._function = None
self._profile = None
self._path = path
self._working_directory = working_directory

self.service = service
self.package = package
Expand Down Expand Up @@ -256,13 +264,15 @@ def start_server(self):

"""
env = compas._os.prepare_environment()

# this part starts the server side of the RPC setup
# it basically launches a subprocess
# to start the default service
# the default service creates a server
# and registers a dispatcher for custom functionality
try:
Popen

except NameError:
self._process = Process()
for name in env:
Expand Down Expand Up @@ -291,12 +301,16 @@ def start_server(self):
if self.capture_output:
kwargs["stdout"] = PIPE
kwargs["stderr"] = PIPE
if self._working_directory:
kwargs["cwd"] = self._working_directory

self._process = Popen(args, **kwargs)

# this starts the client side
# it creates a proxy for the server
# and tries to connect the proxy to the actual server
server = ServerProxy(self.address)

print("Starting a new proxy server...")
success = False
attempt_count = 0
Expand All @@ -310,10 +324,12 @@ def start_server(self):
else:
success = True
break

if not success:
raise RPCServerError("The server is not available.")
else:
print("New proxy server started.")

return server

def stop_server(self):
Expand Down
5 changes: 5 additions & 0 deletions src/compas/rpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class DefaultService(Dispatcher):

"""

def __init__(self, address, *args, **kwargs):
super(Server, self).__init__(address, *args, **kwargs)
self.register_function(self.ping)
self.register_function(self.remote_shutdown)

def ping(self):
"""Simple function used to check if a remote server can be reached.

Expand Down
76 changes: 26 additions & 50 deletions src/compas/rpc/services/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,28 @@
it listens to requests on port ``1753``.

"""
import os
import sys

from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer

from compas.rpc import Dispatcher
from compas.rpc import Server
from compas.rpc import Dispatcher

from .watcher import FileWatcherService


class DefaultService(Dispatcher):
def __init__(self):
super(DefaultService, self).__init__()

def special(self):
return "special"

class FileWatcherService(Dispatcher):
def __init__(self):
super(FileWatcherService, self).__init__()
self.current_module = None
self.current_observer = None

def on_module_imported(self, module, newly_loaded_modules):
module_spec = module.__spec__
module_dir = os.path.dirname(module_spec.origin)

# Stop existing observer, if any
if self.current_module != module and self.current_observer:
self.current_observer.stop()

self.current_module = module
reload_event_handler = ModuleReloader(newly_loaded_modules)

print("Watching on {}".format(module_dir))
self.current_observer = Observer()
self.current_observer.schedule(reload_event_handler, module_dir, recursive=True)
self.current_observer.start()


class ModuleReloader(PatternMatchingEventHandler):
def __init__(self, module_names):
super(ModuleReloader, self).__init__(ignore_patterns=["__pycache__"])
self.module_names = module_names

def on_any_event(self, event):
if event.src_path.endswith(".py"):
# Unload modules so that they are reloaded on the next invocation
for module in self.module_names:
if module in sys.modules:
sys.modules.pop(module)


def start_service(port, autoreload, **kwargs):
def start_service(port=1753, autoreload=True, **kwargs):
print("Starting default RPC service on port {0}...".format(port))

# start the server on *localhost*
# and listen to requests on port *1753*
server = Server(("0.0.0.0", port))

# register a few utility functions
server.register_function(server.ping)
server.register_function(server.remote_shutdown)
host = "0.0.0.0"
address = host, port
server = Server(address)

# register an instance of the default service
# the default service extends the base service
Expand All @@ -88,22 +49,37 @@ def start_service(port, autoreload, **kwargs):
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--port", "-p", action="store", default=1753, type=int, help="RPC port number")

parser.add_argument(
"--port",
"-p",
action="store",
default=1753,
type=int,
help="RPC port number",
)

parser.add_argument(
"--autoreload",
dest="autoreload",
action="store_true",
help="Autoreload modules",
)

parser.add_argument(
"--no-autoreload",
dest="autoreload",
action="store_false",
help="Do not autoreload modules",
)
parser.set_defaults(autoreload=True, func=start_service)

parser.set_defaults(
autoreload=True,
func=start_service,
)

args = parser.parse_args()

if hasattr(args, "func"):
args.func(**vars(args))
else:
Expand Down
43 changes: 43 additions & 0 deletions src/compas/rpc/services/watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import os
import sys

from watchdog.events import PatternMatchingEventHandler
from watchdog.observers import Observer

from compas.rpc import Dispatcher


class FileWatcherService(Dispatcher):
def __init__(self):
super(FileWatcherService, self).__init__()
self.current_module = None
self.current_observer = None

def on_module_imported(self, module, newly_loaded_modules):
module_spec = module.__spec__
module_dir = os.path.dirname(module_spec.origin)

# Stop existing observer, if any
if self.current_module != module and self.current_observer:
self.current_observer.stop()

self.current_module = module
reload_event_handler = ModuleReloader(newly_loaded_modules)

print("Watching on {}".format(module_dir))
self.current_observer = Observer()
self.current_observer.schedule(reload_event_handler, module_dir, recursive=True)
self.current_observer.start()


class ModuleReloader(PatternMatchingEventHandler):
def __init__(self, module_names):
super(ModuleReloader, self).__init__(ignore_patterns=["__pycache__"])
self.module_names = module_names

def on_any_event(self, event):
if event.src_path.endswith(".py"):
# Unload modules so that they are reloaded on the next invocation
for module in self.module_names:
if module in sys.modules:
sys.modules.pop(module)
Loading