unix-socket unexpectetly closed by client #554

Open
walterschneider opened this issue Apr 12, 2024 · 0 comments
Deleting a list of netrefs unexpectedly closes the connection.

  • depending on the size of the list and on net.core.wmem_default, the socket write fails
  • the exception is silently ignored and the connection is closed
  • the next operation fails because the connection is already closed

sock.send raises an exception (TimeoutError) and the connection is closed. Call stack:

write, stream.py:292
send, channel.py:78
_send, protocol.py:308
_async_request, protocol.py:750
async_request, protocol.py:768
asyncreq, netref.py:79
__del__, netref.py:122
<module>, client.py:50
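
The underlying socket behaviour can be sketched without rpyc. This is only an illustration under assumptions: it uses a plain `socket.socketpair()` whose peer never reads, and the helper name `fill_send_buffer` and the 0.2 s timeout are made up for the example. Once the kernel send buffer is full, `send` on a socket with a timeout raises `socket.timeout` (an alias of `TimeoutError` on recent Python versions).

```python
import socket


def fill_send_buffer(timeout=0.2, chunk=b"x" * 4096):
    """Write to a unix socket pair whose peer never reads.

    Returns (bytes_sent, timed_out). Hypothetical helper for illustration.
    """
    # Connected AF_UNIX stream pair; `b` plays a peer that is too busy to read.
    a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    a.settimeout(timeout)
    sent = 0
    timed_out = False
    try:
        while True:
            # Blocks once the send buffer is full, then times out.
            sent += a.send(chunk)
    except socket.timeout:
        timed_out = True
    finally:
        a.close()
        b.close()
    return sent, timed_out


sent, timed_out = fill_send_buffer()
print(f"sent {sent} bytes before the write timed out: {timed_out}")
```

The number of bytes accepted before the timeout depends on the kernel buffer settings (for example net.core.wmem_default), which would be consistent with the failure depending on the size of the list.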
Suggestions / Workarounds / Questions
  • call syncreq instead of asyncreq from BaseNetref.__del__
    class BaseNetref(object, metaclass=NetrefMetaclass):
    
        def __del__(self):
            try:
                # use syncreq here instead???
                asyncreq(self, consts.HANDLE_DEL, self.____refcount__)
            except Exception:
                # raised in a destructor, most likely on program termination,
                # when the connection might have already been closed.
                # it's safe to ignore all exceptions here
                pass
  • TCP socket connections are not affected
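
To show why the error only surfaces on the next operation, here is a toy model that does not use rpyc at all. `FakeConnection` and `FakeProxy` are hypothetical stand-ins: the connection closes itself when a write fails (as described above), and the proxy's destructor swallows every exception, like the `__del__` quoted above. It is a sketch of the suspected mechanism, not the library's implementation.

```python
import gc


class FakeConnection:
    """Hypothetical connection that closes itself when a write fails."""

    def __init__(self, writes_before_failure):
        self.closed = False
        self._writes_left = writes_before_failure

    def send(self, msg):
        if self.closed:
            raise EOFError("connection closed")
        if self._writes_left == 0:
            # Simulates the send buffer being full: close, then raise.
            self.closed = True
            raise EOFError("write timed out")
        self._writes_left -= 1


class FakeProxy:
    """Hypothetical proxy whose destructor notifies the peer."""

    def __init__(self, conn):
        self._conn = conn

    def __del__(self):
        try:
            self._conn.send("del")
        except Exception:
            # The failure is swallowed here, so the caller never sees it.
            pass


conn = FakeConnection(writes_before_failure=3)
proxies = [FakeProxy(conn) for _ in range(5)]
proxies = []   # dropping the list runs the destructors (CPython refcounting)
gc.collect()   # not strictly needed on CPython, kept for robustness

try:
    conn.send("next request")
    next_request_failed = False
except EOFError:
    next_request_failed = True

print(f"closed={conn.closed}, next request failed={next_request_failed}")
```

In this model, freeing the list succeeds silently and the `EOFError` appears only when the next request is sent, which matches the symptom reported here.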
Environment
  • rpyc 6.0.0
  • python 3.10
  • ubuntu 22.04
Minimal example

Server:

import os
import logging
import socket

import rpyc.utils.server
import rpyc.utils.classic

import argparse
Parser = argparse.ArgumentParser()
Parser.add_argument("--unix", action='store_true', help="use unix socket")
args = Parser.parse_args()

hostname = "localhost"
socket_path = "/tmp/rpyc-socket"

if os.path.exists(socket_path):
    os.remove(socket_path)

# setup logger
fmt = logging.Formatter("%(asctime)s [%(process)d] (%(levelname)s) %(module)s.%(funcName)s(%(lineno)d): %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(logging.DEBUG)

Logger = logging.Logger("rpyc_server")
Logger.addHandler(sh)

Logger.info("starting ThreadedServer - rpyc %s" % str(rpyc.__version__))

if args.unix:
    t = rpyc.utils.server.ThreadedServer(rpyc.core.SlaveService, socket_path=socket_path, logger=Logger)
else:
    t = rpyc.utils.server.ThreadedServer(rpyc.core.SlaveService, hostname=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, logger=Logger)
    t.listener.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

# setup socket
t._listen()
# run server
t.start()

Client:

import rpyc.utils.classic
import socket
import logging
import subprocess
import time

import argparse
Parser = argparse.ArgumentParser()
Parser.add_argument("--unix", action='store_true', help="use unix socket")
args = Parser.parse_args()

hostname = "localhost"
socket_path = "/tmp/rpyc-socket"

# setup logger
fmt = logging.Formatter("%(asctime)s [%(process)d] (%(levelname)s) %(module)s.%(funcName)s(%(lineno)d): %(message)s")
sh = logging.StreamHandler()
sh.setFormatter(fmt)
sh.setLevel(logging.DEBUG)

Logger = logging.Logger("rpyc_client")
Logger.addHandler(sh)

Logger.info(subprocess.Popen(["sysctl", "net.core.wmem_max"], stdout=subprocess.PIPE).stdout.readline().decode("ASCII").strip())
Logger.info(subprocess.Popen(["sysctl", "net.core.wmem_default"], stdout=subprocess.PIPE).stdout.readline().decode("ASCII").strip())

Logger.info(f"creating Connection - rpyc {rpyc.__version__}")

if args.unix:
    conn = rpyc.classic.factory.unix_connect(path=socket_path, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1, "nodelay": True})
else:
    #conn = rpyc.classic.factory.connect(host=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1})
    s = rpyc.core.stream.SocketStream.connect(host=hostname, port=rpyc.utils.classic.DEFAULT_SERVER_PORT, ipv6=False, keepalive=False, nodelay=True)
    conn = rpyc.classic.factory.connect_stream(s, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1})



Logger.info(f"Connection: {conn}")

conn.execute("import time")
rtime = conn.modules["time"]

for i in range(1, 6):
    try:
        Logger.info(f"creating list of struct_time ({i})")
        t0 = time.time()
        tl = [rtime.struct_time([0]*9) for _ in range(1000)]
        print(time.time()-t0)
        Logger.info(f"free list ({i})")
        tl = []
        Logger.info(f"done  ({i})")
    except EOFError as e:
        Logger.info(e)
        raise e

Output (Server):

/usr/bin/python3 /home/wasc/playground/rpyc/server.py --unix 
2024-04-12 11:19:09,668 [1282739] (INFO) server.<module>(28): starting ThreadedServer - rpyc 6.0.0
2024-04-12 11:19:09,668 [1282739] (INFO) server._listen(251): server started on []:/tmp/rpyc-socket
2024-04-12 11:19:21,567 [1282739] (INFO) server.accept(157): accepted  with fd 4
2024-04-12 11:19:21,567 [1282739] (INFO) server._serve_client(200): welcome 
2024-04-12 11:19:21,604 [1282739] (INFO) server._serve_client(207): goodbye 

Output (Client):

/usr/bin/python3 /home/wasc/playground/rpyc/client.py --unix 
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(24): net.core.wmem_max = 212992
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(25): net.core.wmem_default = 212992
2024-04-12 11:19:21,566 [1282744] (INFO) client.<module>(27): creating Connection - rpyc 6.0.0
Traceback (most recent call last):
  File "/home/wasc/playground/rpyc/client.py", line 30, in <module>
    conn = rpyc.classic.factory.unix_connect(path=socket_path, service=rpyc.classic.SlaveService, config={"sync_request_timeout": -1, "nodelay": True})
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 117, in unix_connect
    return connect_stream(s, service, config)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 60, in connect_stream
    return connect_channel(Channel(stream), service=service, config=config)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/utils/factory.py", line 48, in connect_channel
    return service._connect(channel, config)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/service.py", line 106, in _connect
    self.on_connect(conn)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/service.py", line 215, in on_connect
    self._install(conn, conn.root)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 777, in root
    self._remote_root = self.sync_request(consts.HANDLE_GETROOT)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 744, in sync_request
    return _async_res.value
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/async_.py", line 109, in value
    self.wait()
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/async_.py", line 51, in wait
    self._conn.serve(self._ttl, waiting=self._waiting)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 472, in serve
    self._dispatch(data)  # Dispatch will unbox, invoke callbacks, etc.
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 421, in _dispatch
    obj = self._unbox(args)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 343, in _unbox
    proxy = self._netref_factory(id_pack)
  File "/home/wasc/.local/lib/python3.10/site-packages/rpyc/core/protocol.py", line 363, in _netref_factory
    return cls(self, id_pack)
TypeError: rpyc.core.service.SlaveService() takes no arguments

Process finished with exit code 1
@walterschneider walterschneider changed the title unix-socket enexpectetly closed by client unix-socket unexpectetly closed by client Apr 16, 2024