
fix timeout related Python 3.12.0b4 incompatibilities #232

Merged
merged 10 commits into master from py312_compatibility
Aug 1, 2023

Conversation

d-maurer
Contributor

Fixes #231.

The reported test failures all have the same cause: Python 3.12 has changed the asyncio timeout handling; it now requires a proper task context.

ZEO (mostly) uses highly optimized tasks (maybe comparable to the new "eager tasks") instead of standard tasks. Those tasks do not call asyncio.tasks._{enter,leave}_task and therefore do not properly set up the task context.
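
For illustration only, here is a minimal sketch of that failure mode (it drives a coroutine from a plain loop callback; it is not ZEO's actual task machinery): without a registered current task, asyncio.timeout() -- on which asyncio.wait_for is built in 3.12 -- refuses to run.

import asyncio

async def uses_timeout():
    # requires asyncio.current_task() to be set
    async with asyncio.timeout(1):
        await asyncio.sleep(0)

def drive_without_task(loop, coro):
    # crude stand-in for a hand-driven coroutine: stepping it from a plain
    # loop callback means no asyncio Task is registered as "current"
    try:
        coro.send(None)
    except StopIteration:
        pass
    except RuntimeError as exc:
        print('failure mode:', exc)   # "Timeout should be used inside a task"
    finally:
        loop.stop()

loop = asyncio.new_event_loop()
loop.call_soon(drive_without_task, loop, uses_timeout())
loop.run_forever()
loop.close()
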
The PR does not change this but instead uses the timeout implementation from branch direct_socket_access_with_loop_lock (#225). That branch implements an idea from @navytux to access the socket directly; I have observed considerable speedup. Maybe @navytux finds it worthwhile to review, and we merge #225 eventually. Merging #225 would preclude the use of standard timeouts anyway.

@d-maurer d-maurer requested a review from navytux July 26, 2023 09:26
@d-maurer
Contributor Author

Recent changes on ZODB@master have broken the zodbmaster tests (not only here but in all branches). This PR contains the necessary adaptations to those changes (besides its primary aim).

@d-maurer
Contributor Author

However, the adaptation is incomplete -- most of the tests inherited from ZODB are no longer executed (they have the prefix `test` rather than the expected `check` -- see #233).

… for ZODB inherited tests with prefixes `check` and `test`
@d-maurer
Contributor Author

This PR now fixes #233, too: it looks for ZODB-inherited tests with both the old prefix `check` and the new prefix `test`.

@dataflake
Member

Just one test failing on Python 3.12 now

@d-maurer
Contributor Author

d-maurer commented Jul 28, 2023 via email

@d-maurer
Contributor Author

Jens Vagelpohl wrote at 2023-7-28 00:28 -0700:
Just one test failing on Python 3.12 now
This has been a race condition: ...

We could make the test more robust (should my assumption be correct):
A reconnection timeout is mapped to ClientDisconnected. Thus, currently, the test cannot distinguish a reconnection timeout from the initial ClientDisconnected.
However, the ClientDisconnected raised for a reconnection timeout carries a descriptive text ("timed out waiting for connection"). If the test checks for this text, it can recognize timeouts and avoid the race condition.
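
A sketch of how a test might apply this (is_reconnection_timeout is a hypothetical helper, not existing test code; the message text is the one quoted above):

from ZEO.Exceptions import ClientDisconnected

def is_reconnection_timeout(exc):
    # True only for the ClientDisconnected raised on a reconnection timeout,
    # which carries the descriptive text mentioned above
    return (isinstance(exc, ClientDisconnected)
            and 'timed out waiting for connection' in str(exc))

# e.g. in the test:
#   try:
#       self._dostore()
#   except ClientDisconnected as exc:
#       if not is_reconnection_timeout(exc):
#           raise   # an unexpected, genuine disconnect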

@dataflake
Member

Must be a race condition. Running the tests locally works fine, including on macOS. When I slow things down by running the tests for all Python versions in parallel, I start seeing ClientDisconnected in different and changing places.

@navytux
Contributor

navytux commented Jul 28, 2023

Dieter, thanks for looking into this problem and for asking for a review.
I still want to look into #225 and I'm sorry that this has not happened on my side yet.

Given that #225 might be the future, I generally agree that it makes sense to
pick up the timeout implementation from there instead of going in the direction of
setting up a full task context.

Regarding this PR, I have a few questions about await_with_timeout:

We invoke await_with_timeout from two places - from under
await_operational_co and from under close_co. Both of those functions are
async def, which the _co suffix also indicates, and so are run by the loop
on the IO thread. On the other hand, what await_with_timeout does is
loop.call_soon_threadsafe(setup_timeout), as if it were running on a thread
different from the IO thread. Maybe I'm missing something, but
await_with_timeout could be simpler if it were implemented like below:

async def await_with_timeout(f, timeout, loop):
    waiter = Future(loop)
    def stop():
        if not waiter.done():
            waiter.set_result(None)

    f.add_done_callback(stop)
    handle = loop.call_later(timeout, stop)
    await waiter
    try:
        if f.done():
            return f.result()
        else:
            f.remove_done_callback(stop)
            raise asyncio.TimeoutError
    finally:
        handle.cancel()

Is this indeed correct, or am I missing something and call_soon_threadsafe is
actually needed for a reason?

Another thing I wanted to ask is about

         if not self.operational:
+            if timeout <= 0:
+                raise ClientDisconnected("timed out waiting for connection")
+
             try:
-                await asyncio.wait_for(asyncio.shield(self.connected), timeout)
+                await await_with_timeout(self.connected, timeout, self.loop)
             except asyncio.TimeoutError:
                 raise ClientDisconnected("timed out waiting for connection")

Here, do we need to add that explicit `if timeout <= 0: raise ClientDisconnected`,
or can we rely on await_with_timeout raising TimeoutError
and the follow-up except block converting that to ClientDisconnected?
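
For illustration only (a sketch with a hypothetical name, not the PR's code), the check could be folded into a small wrapper so callers rely solely on the TimeoutError path:

import asyncio

async def await_with_timeout_checked(f, timeout, loop):
    # treat a non-positive timeout as an immediate timeout, so the caller's
    # `except asyncio.TimeoutError` branch handles both cases uniformly
    if f.done():
        return f.result()
    if timeout <= 0:
        raise asyncio.TimeoutError
    return await await_with_timeout(f, timeout, loop)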

Thanks once again for looking into this,
Kirill

@d-maurer
Contributor Author

d-maurer commented Jul 28, 2023 via email

@navytux
Contributor

navytux commented Jul 31, 2023

Dieter, thanks for the feedback.

So, to restore context, I first carefully reread our conversation on
#224 and
#225.

And while thinking it all over again, I started to question why asyncio
is needed for the ZEO client at all. Frankly, I do not see a good answer:
asyncio can be useful for a server that wants to handle many connections
without creating a thread for each client. But on the ZEO client we already
have clients, each running in its own thread and accessing ZODB via calls like
loadBefore.

What we currently do is forward those from-threads calls to the IO
thread, which in turn forwards the requests to the server. The IO thread is
implemented via asyncio, probably for historical reasons, which brings many
peculiarities if we want to, e.g., write directly to the socket without switching
to the IO thread.

The IO thread is also probably needed to demultiplex replies to multiple
requests issued by several threads in parallel. That is a simple solution, but it
costs another process switch in the reception path.

As #225 (comment) says,
for every loadBefore request we currently have 4 switches between processes,
while ideally there should be only 2. And as we have experienced, it is the process
switches that contribute the most to object-loading latency.

So I squeezed in some time to sketch a ZEO client that works without asyncio and
needs only 2 process switches for a loadBefore.

In #225 (comment) you wrote:

Possible in principle but not easy to achieve.
You would make the ZEO communication synchronous: the application
thread sends the request and waits for the result.
This would work with RPC calls BUT the server can also send
asynchronous calls AND you must guarantee that messages
are processed in sent order.

A further difficulty: long running requests (such as pack).
You would not want that during such calls other requests are blocked.
This means that you cannot use simple synchronous communication;
instead the communication channel must be multiplexed between
different application threads and an additional actor processing
asynchronous messages.

Please find below an outline implementation that satisfies all those
properties. If we can agree that this approach is sound, I think removing the
asyncio usage should be the future, instead of trying to monkey-patch it with
many limitations.

"""kzeo.py illustrates ideas for ZEO client with low-latency tx and rx.

With standard ZEO it takes 4 process switches to handle 1 loadBefore:

  1. Client thread -> IO thread via `call_soon_threadsafe` which internally sends wakeup via internal socket.
  2. IO thread     -> ZEO server process (request)
  3. IO thread     <- ZEO server         (reply)
  4. client thread <- IO thread          (result)

  ( https://github.com/zopefoundation/ZEO/pull/225#issuecomment-1414441686 )

This implementation avoids "1". It also avoids "4" for one of the client threads.

XXX this code was not tested at all. It currently only illustrates the idea.
"""

from ZEO.asyncio import marshal as zmarshal
from ZEO.asyncio.client import exc_factories as zexc_factories
from ZEO.Exceptions import ServerException
import struct
from time import time, sleep
import socket

from golang import func, defer, chan, select, default, go
from golang import sync


# ZEOClient sketches up a client for ZEO.
# It supports multithreading and can be used by multiple threads simultaneously.
class ZEOClient:
    # ._sk      socket              connection to ZEO server
    # ._tx_lock sync.Mutex          protects writing to _sk
    # ._rx_lock sema via chan(1)    protects reading from _sk + invoking received notifications
    # ._rx_buf  bytearray           buffer for rx data
    #
    # ._inflight_mu sync.Mutex              protects _inflight
    # ._inflight    {} msgid -> _Request    in flight requests
    #
    # ._trxloop_until_reply   float         time of last entry to _rxloop_until_reply
    #
    # lock ordering:
    #  _rx_lock > _inflight_mu
    pass

# _Request represents in-flight request to server.
class _Request:
    # .ready    chan[structZ]   becomes closed when request is complete
    # ._resp    *               response | error
    # .msgid    *               message ID of the request
    def __init__(self, msgid):
        self.ready = chan(dtype='C.structZ')
        self._resp = _missing
        self.msgid = msgid


# ZEOClient() creates new ZEO client connected to particular storage on ZEO server at addr.
@func(ZEOClient)
def __init__(self, addr, storage='1'):
    self._sk = socket.create_connection(addr)
    self._tx_lock = sync.Mutex()
    self._rx_lock = chan(1)
    self._rx_buf = bytearray()
    self._inflight_mu = sync.Mutex()
    self._inflight = {}
    self._trxloop_until_reply = float('-inf')

    # handshake
    hsrv = self._rx_pkt()
    encoding = hsrv[0]
    version  = hsrv[1:]
    assert encoding in (b'Z', b'M')
    assert version == b'5'
    hcli = encoding + version
    self._tx_pkt(hcli)

    # handshaked ok
    self.encoder = zmarshal.encoder(encoding)
    self.decoder = zmarshal.decoder(encoding)

    # XXX cache verification ...

    # spawn rx poller so that invalidation messages do not queue indefinitely if there is no local activity
    go(self._rxloop_poller)

    # now ready to serve clients


# example client request to handle
@func(ZEOClient)
def loadBefore(self, oid, tid):
    # XXX try cache first

    # see if another loading request for (oid, tid) is already in flight;
    # if yes - only wait for that one to complete
    msgid = (oid, tid)
    with self._inflight_mu:
        req = self._inflight.get(msgid)
        already = (req is not None)
        if not already:
            req = _Request(msgid)
            self._inflight[msgid] = req

    if already:
        req.ready.recv()
        return req.result()

    # this thread becomes responsible for issuing the request to the server
    req_pkt = self.encoder(msgid, False, 'loadBefore', (oid, tid))
    with self._tx_lock:
        self._tx_pkt(req_pkt)

    # and for waiting for and processing the reply.
    # But while we are waiting, replies to other requests and notifications might come first.
    # We handle them all until we get ours.
    self._rxloop_until_reply(req)
    return req.result()

# _rxloop_until_reply receives and processes incoming packets until the reply for req is received.
# it can be called simultaneously from multiple threads, but incoming messages are received and their handlers run strictly serially.
@func(ZEOClient)
def _rxloop_until_reply(self, req: _Request):
    # indicate to _rxloop_poller that there is local activity
    self._trxloop_until_reply = time()    # with gil

    # wait until either
    # - we get ._rx_lock, or
    # - req becomes completed by _rxloop_until_reply run in another thread while we are waiting.
    _, _rx = select(
        (self._rx_lock.send, 0),    # 0
        req.ready.recv,             # 1
    )
    if _ == 1:
        return

    # we've got _rx_lock
    defer(self._rx_lock.recv)

    # see if req perhaps became completed while we were acquiring the lock
    _, _rx = select(
        req.ready.recv,  # 0
        default,         # 1
    )
    if _ == 0:
        return

    # not yet. We are holding _rx_lock and no one else can access the reception until we release it.
    # let's receive and handle messages until we see req's reply.
    # when we see a notification - we invoke corresponding handler.
    # when we see a reply to another request - we notify its .ready to wakeup corresponding waiter
    while 1:
        msgid_ = self.__rx_and_handle1()
        if msgid_ == req.msgid:
            return

# _rxloop_poller polls rx from time to time to handle e.g. invalidations.
# it is run in its own thread so that server-originated messages do not queue
# up indefinitely if there is no local activity.
@func(ZEOClient)
def _rxloop_poller(self):
    # XXX stop on close
    while 1:
        self._rx_lock.send(0)
        try:
            self.__rx_and_handle1()
        finally:
            self._rx_lock.recv()

        # pause rx polling for a while if there is local activity
        # this way the client thread should receive its reply from the server directly, not via the rx poller thread
        while 1:
            now = time()
            trxloop_until_reply = self._trxloop_until_reply   # with gil
            dt = now - trxloop_until_reply
            if dt >= 1:
                break
            sleep(1-dt)


# __rx_and_handle1 receives one incoming packet and processes it.
# must be called under _rx_lock.
@func(ZEOClient)
def __rx_and_handle1(self): # -> msgid
    pkt = self._rx_pkt()
    msgid, async_, name, args = self.decoder(pkt)
    if name != '.reply':
        assert async_   # clients only get async calls
        if name not in self.client_methods:
            raise AttributeError(name)
        getattr(self, name)(*args)

    else:
        # name == '.reply'
        with self._inflight_mu:
            req = self._inflight.pop(msgid)
        if async_:  # ZEO5 exception
            klass, args = args
            factory = zexc_factories.get(klass)
            if factory:
                exc = factory(klass, args)
            else:
                exc = ServerException(klass, args)
            req._resp = exc
        else:
            req._resp = args
        req.ready.close()

    return msgid

# result turns received response into either plain return or raised exception.
@func(_Request)
def result(self):
    resp = self._resp
    assert resp is not _missing
    if isinstance(resp, Exception):
        raise resp
    else:
        return resp


# example notification handler
ZEOClient.client_methods = ('invalidateTransaction',)
@func(ZEOClient)
def invalidateTransaction(self, tid, oids):
    1/0 # XXX



# _tx_pkt wraps up raw payload with sized header and transmits it.
# it must be invoked only serially.
@func(ZEOClient)
def _tx_pkt(self, data):
    pkt = struct.pack('>I', len(data)) + data
    _sendall(self._sk, pkt)

# _rx_pkt receives sized message and returns its raw payload.
# it must be called only serially.
@func(ZEOClient)
def _rx_pkt(self): # -> data
    if len(self._rx_buf) < 4:
        self._rx_buf += _readatleast(self._sk, 4 - len(self._rx_buf))
    assert len(self._rx_buf) >= 4
    size, = struct.unpack('>I', self._rx_buf[:4])
    self._rx_buf = self._rx_buf[4:]
    if len(self._rx_buf) < size:
        self._rx_buf += _readatleast(self._sk, size - len(self._rx_buf))
    assert len(self._rx_buf) >= size
    data = self._rx_buf[:size]
    self._rx_buf = self._rx_buf[size:]
    return data



# _sendall sends all data to the socket.
def _sendall(sk, data):
    1/0 # XXX

# _readatleast reads at least specified data from the socket.
def _readatleast(sk, n): # -> data | error
    1/0 # XXX


# _missing represents something missing.
# It is not None because None is a valid RPC result.
_missing = object()
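
For illustration, a hypothetical (untested) use of the sketch from several application threads -- assuming the XXX stubs above (_sendall, _readatleast, the cache and invalidation handling) were filled in and a ZEO5 server listens on ('localhost', 8100) -- could look like:

from threading import Thread

def worker(client, i):
    oid = i.to_bytes(8, 'big')      # 8-byte object id
    tid = b'\x7f' + b'\xff' * 7     # "latest" upper bound for loadBefore
    # each thread issues its own loadBefore; whichever thread happens to hold
    # _rx_lock at the time demultiplexes the replies for everybody
    print(i, client.loadBefore(oid, tid))

client = ZEOClient(('localhost', 8100), storage='1')
threads = [Thread(target=worker, args=(client, i)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()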

Speaking about this PR, I suggest the following:

  1. move `if timeout <= 0: raise ...` into await_with_timeout so that that behaviour is obtained uniformly and, in particular, we do not need to duplicate `raise ClientDisconnected("timed out waiting for connection")`.
  2. either add a `_threadsafe` suffix to `await_with_timeout` or rename it to `await_with_timeout_co` and remove the multithreading support, as we currently do not use it. Personally, I believe going with `_co` and the simpler implementation is better at this step.

Kirill

@d-maurer
Contributor Author

d-maurer commented Jul 31, 2023 via email

@d-maurer
Contributor Author

d-maurer commented Aug 1, 2023

Speaking about hereby PR I suggest to do the following:

1. move `if timeout <= 0: raise ...` into await_with_timeout so that that behaviour is obtained uniformly and, in particular, we do not need to duplicate `raise ClientDisconnected("timed out waiting for connection")`.

2. either add a `_threadsafe` suffix to `await_with_timeout` or rename it to `await_with_timeout_co` and remove the multithreading support, as we currently do not use it.  Personally, I believe going with `_co` and the simpler implementation is better at this step.

Kirill

Thank you for your review.

I did not follow your first suggestion: await_with_timeout is designed for just its current two uses; the use in close_co always has timeout > 0, and only in await_operational_co is timeout <= 0 possible; handling that case there avoids a timeout -> ClientDisconnected remap. I have verified that calling await_with_timeout with timeout <= 0 does not cause an error; it is just not particularly efficient. Further optimizations (e.g. for the f.done() case) should be implemented only if await_with_timeout is to become a general replacement for asyncio.wait_for.

I have closed #225. Thus, it becomes unlikely that await_with_timeout will ever be called from outside the IO thread. I followed your second suggestion to simplify it accordingly.

@d-maurer d-maurer merged commit e563781 into master Aug 1, 2023
26 checks passed
@d-maurer d-maurer deleted the py312_compatibility branch August 1, 2023 05:34
@dataflake
Member

With this change, when I run the tests for all Python versions in parallel on my slower VM, only Python 3.12 still shows issues similar to this:

Error in test checkBadMessage2 (ZEO.tests.testConnection.MappingStorageConnectionTests.checkBadMessage2)
Traceback (most recent call last):
  File "/opt/zope/Python-3.12.0b4/lib/python3.12/unittest/case.py", line 58, in testPartExecutor
    yield
  File "/opt/zope/Python-3.12.0b4/lib/python3.12/unittest/case.py", line 634, in run
    self._callTestMethod(testMethod)
  File "/opt/zope/Python-3.12.0b4/lib/python3.12/unittest/case.py", line 589, in _callTestMethod
    if method() is not None:
  File "/home/zope/src/ZEO/src/ZEO/tests/ConnectionTests.py", line 434, in checkBadMessage2
    self._bad_message(msg)
  File "/home/zope/src/ZEO/src/ZEO/tests/ConnectionTests.py", line 472, in _bad_message
    self._dostore()
  File "/home/zope/src/ZEO/.tox/py312/lib/python3.12/site-packages/ZODB/tests/StorageTestBase.py", line 165, in _dostore
    self._storage.tpc_begin(t)
  File "/home/zope/src/ZEO/src/ZEO/ClientStorage.py", line 916, in tpc_begin
    self._async(
  File "/home/zope/src/ZEO/src/ZEO/asyncio/client.py", line 882, in async_
    raise ClientDisconnected
ZEO.Exceptions.ClientDisconnected

@navytux
Contributor

navytux commented Aug 1, 2023

Dieter, thanks for the feedback, and I apologize for the situation with
#225.

I originally tried to express my ideas in February in
#225 (comment), but, it
seems, I could not convey them well. And then a combination of stresses
struck me again, including fighting for someone close for a long time and, despite
that, finally losing him. That's why I asked for the pause in
#225 (comment). But I
can also understand your feelings: waiting half a year for feedback and
finally seeing a suggestion to go in a different direction does not feel good.

I'm, once again, sorry for that.


Speaking about my sketch:

The problem comes from the asynchronous calls (e.g. invalidations)
and the need to ensure that all messages are processed in
sent order.

The IO thread allows for an easy solution to this requirement.
It will be much more difficult when there is no single thread
which handles all incoming messages.

I fully agree: for correctness we absolutely must ensure that the order in
which messages are processed stays the same. I originally tried to explain
how to achieve that with just words in
#225 (comment). But
since I could not make it clear in February, this time I tried to show my ideas with code:

In my implementation, even though reception and processing of incoming messages can
jump from one thread to another all the time, it is still guaranteed that the
reception and processing is done serially, one message at a time, in exactly
the same order the messages were sent. This is clear to see because:

  1. the only place where messages are received and processed is __rx_and_handle1:

    https://lab.nexedi.com/kirr/ZEO/blob/bd123d87/kzeo.py#L179-207

  2. __rx_and_handle1 is always run by only one thread at a time. The mechanism to
    ensure this is to invoke __rx_and_handle1 only while holding _rx_lock

    https://lab.nexedi.com/kirr/ZEO/blob/bd123d87/kzeo.py#L33
    https://lab.nexedi.com/kirr/ZEO/blob/bd123d87/kzeo.py#L146-153
    https://lab.nexedi.com/kirr/ZEO/blob/bd123d87/kzeo.py#L162-166
    https://lab.nexedi.com/kirr/ZEO/blob/bd123d87/kzeo.py#L180

  3. __rx_and_handle1 receives and processes messages one by one, serially.
    And right after reception the corresponding notification handler, e.g.
    invalidateTransaction, is invoked.

    https://lab.nexedi.com/kirr/ZEO/blob/bd123d87/kzeo.py#L183-189

  4. In order to make sure we do not grow the notification queue indefinitely if
    there is no local activity, an analog of the IO thread - _rxloop_poller - is run
    in the background. But that thread monitors ZEOClient usage, and if it sees
    local activity, it pauses RX polling for a while, so that it is actually a
    client thread that receives the reply to its request directly:

    https://lab.nexedi.com/kirr/ZEO/blob/bd123d87/kzeo.py#L155-176

  5. Since both the rx poller and all client threads follow the same procedure to
    receive/process incoming messages (acquire _rx_lock, then invoke
    __rx_and_handle1), it should be clear that all incoming messages are
    received and processed serially and in exactly the same order they were sent
    by the server.

This will be a big change.
Earlier you stressed that stability is one of your major concerns.
Big changes have the potential to threaten stability.
Therefore, I suggest you implement your idea (completely)
and run our tests against it.
If this succeeds, we measure the performance
and if there is significant gain, I will take a close look
at your implementation with a good chance that I agree to its use.

This is fair. And yes, it is me who stresses correctness and stability first.
But given that we saw many issues with asyncio before, and we are now kind of
facing them from the performance point of view, thinking about finding a
way to improve the situation should still be possible.

For reference, my idea of how this big change could be made is as
follows: add a second implementation of the client in parallel to the old
asyncio-based one. Use the asyncio-based one by default, but allow users
to opt in to the new one. Include both clients in the test matrix. If,
over time, the new one proves to be faster and passes the tests reliably, switch
it to be the default. If after some time no significant problems pop up,
retire the old asyncio-based one.
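
Purely as an illustration of such an opt-in (hypothetical names, not an agreed API), a factory could select the implementation via an environment variable:

import os

def make_client(addr, **kw):
    # hypothetical factory: choose the client implementation at runtime,
    # defaulting to the existing asyncio-based ClientStorage
    if os.environ.get('ZEO_CLIENT_IMPL') == 'kzeo':
        from kzeo import ZEOClient                    # the sketch above
        return ZEOClient(addr, **kw)
    from ZEO.ClientStorage import ClientStorage       # current asyncio-based client
    return ClientStorage(addr, **kw)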

But Dieter, I cannot afford to invest the time until we generally agree and
see each other on a common track that doing things the way I propose is at
least clear and correct. So would you please spend a bit of your time to
review the general organization of how messages are sent, received and
processed in my sketch?

The whole thing is ~250 lines of code including comments. I kept it
small on purpose so that it is easier to oversee the approach.


Thanks also for accepting part of my suggestions for this PR.

Kirill

@d-maurer
Contributor Author

d-maurer commented Aug 1, 2023 via email

@navytux
Contributor

navytux commented Aug 1, 2023

Ok, clear and fair. Thanks for the feedback.
