Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove deprecated unbounded queue #2925

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
6 changes: 3 additions & 3 deletions check.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/bin/bash

set -ex
set -o pipefail

ON_GITHUB_CI=true
EXIT_STATUS=0
Expand Down Expand Up @@ -55,16 +56,15 @@ MYPY=0
echo "::group::Mypy"
# Cleanup previous runs.
rm -f mypy_annotate.dat
# Pipefail makes these pipelines fail if mypy does, even if mypy_annotate.py succeeds.
set -o pipefail

mypy --show-error-end --platform linux | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Linux \
|| { echo "* Mypy (Linux) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
# Darwin tests FreeBSD too
mypy --show-error-end --platform darwin | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Mac \
|| { echo "* Mypy (Mac) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
mypy --show-error-end --platform win32 | python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat --platform Windows \
|| { echo "* Mypy (Windows) found type errors." >> "$GITHUB_STEP_SUMMARY"; MYPY=1; }
set +o pipefail

# Re-display errors using Github's syntax, read out of mypy_annotate.dat
python ./src/trio/_tools/mypy_annotate.py --dumpfile mypy_annotate.dat
# Then discard.
Expand Down
1 change: 0 additions & 1 deletion src/trio/_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
temporarily_detach_coroutine_object,
wait_task_rescheduled,
)
from ._unbounded_queue import UnboundedQueue, UnboundedQueueStatistics

# Windows imports
if sys.platform == "win32":
Expand Down
4 changes: 2 additions & 2 deletions src/trio/_core/_generated_io_kqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
if TYPE_CHECKING:
import select

from .. import _core
from .._channel import MemoryReceiveChannel
from .._file_io import _HasFileNo
from ._traps import Abort, RaiseCancelT
import sys
Expand Down Expand Up @@ -43,7 +43,7 @@ def current_kqueue() -> select.kqueue:

def monitor_kevent(
ident: int, filter: int
) -> ContextManager[_core.UnboundedQueue[select.kevent]]:
) -> ContextManager[MemoryReceiveChannel[select.kevent]]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh actually I'm just realizing this is potentially a compat break. I can work around this using a kwarg but I need to see if this currently warns or not...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Private subclass which adds the unbounded queue methods but warns if you call them?

"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
<https://github.com/python-trio/trio/issues/26>`__.
Expand Down
6 changes: 4 additions & 2 deletions src/trio/_core/_generated_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
if TYPE_CHECKING:
from typing_extensions import Buffer

from .._channel import MemoryReceiveChannel
from .._file_io import _HasFileNo
from ._unbounded_queue import UnboundedQueue
from ._windows_cffi import CData, Handle
import sys

Expand Down Expand Up @@ -189,7 +189,9 @@ def current_iocp() -> int:
raise RuntimeError("must be called from async context") from None


def monitor_completion_key() -> ContextManager[tuple[int, UnboundedQueue[object]]]:
def monitor_completion_key() -> (
ContextManager[tuple[int, MemoryReceiveChannel[object]]]
):
"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
<https://github.com/python-trio/trio/issues/26>`__ and `#52
Expand Down
31 changes: 16 additions & 15 deletions src/trio/_core/_io_kqueue.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import errno
import math
import select
import sys
from contextlib import contextmanager
Expand All @@ -16,7 +17,8 @@
if TYPE_CHECKING:
from typing_extensions import TypeAlias

from .._core import Abort, RaiseCancelT, Task, UnboundedQueue
from .._channel import MemoryReceiveChannel, MemorySendChannel
from .._core import Abort, RaiseCancelT, Task
from .._file_io import _HasFileNo

assert not TYPE_CHECKING or (sys.platform != "linux" and sys.platform != "win32")
Expand All @@ -34,10 +36,9 @@
@attr.s(slots=True, eq=False)
class KqueueIOManager:
_kqueue: select.kqueue = attr.ib(factory=select.kqueue)
# {(ident, filter): Task or UnboundedQueue}
_registered: dict[tuple[int, int], Task | UnboundedQueue[select.kevent]] = attr.ib(
factory=dict
)
_registered: dict[
tuple[int, int], Task | MemorySendChannel[select.kevent]
] = attr.ib(factory=dict)
_force_wakeup: WakeupSocketpair = attr.ib(factory=WakeupSocketpair)
_force_wakeup_fd: int | None = attr.ib(default=None)

Expand Down Expand Up @@ -94,7 +95,7 @@
if isinstance(receiver, _core.Task):
_core.reschedule(receiver, outcome.Value(event))
else:
receiver.put_nowait(event)
receiver.send_nowait(event)

Check warning on line 98 in src/trio/_core/_io_kqueue.py

View check run for this annotation

Codecov / codecov/patch

src/trio/_core/_io_kqueue.py#L98

Added line #L98 was not covered by tests

# kevent registration is complicated -- e.g. aio submission can
# implicitly perform a EV_ADD, and EVFILT_PROC with NOTE_TRACK will
Expand All @@ -119,21 +120,24 @@
@_public
def monitor_kevent(
self, ident: int, filter: int
) -> Iterator[_core.UnboundedQueue[select.kevent]]:
) -> Iterator[MemoryReceiveChannel[select.kevent]]:
"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
<https://github.com/python-trio/trio/issues/26>`__.
"""
from .._channel import open_memory_channel

Check warning on line 128 in src/trio/_core/_io_kqueue.py

View check run for this annotation

Codecov / codecov/patch

src/trio/_core/_io_kqueue.py#L128

Added line #L128 was not covered by tests

key = (ident, filter)
if key in self._registered:
raise _core.BusyResourceError(
"attempt to register multiple listeners for same ident/filter pair"
)
q = _core.UnboundedQueue[select.kevent]()
self._registered[key] = q
send, recv = open_memory_channel[select.kevent](math.inf)
self._registered[key] = send

Check warning on line 136 in src/trio/_core/_io_kqueue.py

View check run for this annotation

Codecov / codecov/patch

src/trio/_core/_io_kqueue.py#L135-L136

Added lines #L135 - L136 were not covered by tests
try:
yield q
yield recv

Check warning on line 138 in src/trio/_core/_io_kqueue.py

View check run for this annotation

Codecov / codecov/patch

src/trio/_core/_io_kqueue.py#L138

Added line #L138 was not covered by tests
finally:
send.close()

Check warning on line 140 in src/trio/_core/_io_kqueue.py

View check run for this annotation

Codecov / codecov/patch

src/trio/_core/_io_kqueue.py#L140

Added line #L140 was not covered by tests
del self._registered[key]

@_public
Expand Down Expand Up @@ -274,8 +278,5 @@
_core.reschedule(receiver, outcome.Error(exc))
del self._registered[key]
else:
# XX this is an interesting example of a case where being able
# to close a queue would be useful...
raise NotImplementedError(
"can't close an fd that monitor_kevent is using"
)
receiver.close()
del self._registered[key]

Check warning on line 282 in src/trio/_core/_io_kqueue.py

View check run for this annotation

Codecov / codecov/patch

src/trio/_core/_io_kqueue.py#L281-L282

Added lines #L281 - L282 were not covered by tests
20 changes: 13 additions & 7 deletions src/trio/_core/_io_windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import enum
import itertools
import math
import socket
import sys
from contextlib import contextmanager
Expand Down Expand Up @@ -43,9 +44,9 @@
if TYPE_CHECKING:
from typing_extensions import Buffer, TypeAlias

from .._channel import MemoryReceiveChannel, MemorySendChannel
from .._file_io import _HasFileNo
from ._traps import Abort, RaiseCancelT
from ._unbounded_queue import UnboundedQueue

EventResult: TypeAlias = int
T = TypeVar("T")
Expand Down Expand Up @@ -435,7 +436,7 @@ def __init__(self) -> None:
self._overlapped_waiters: dict[CData, _core.Task] = {}
self._posted_too_late_to_cancel: set[CData] = set()

self._completion_key_queues: dict[int, UnboundedQueue[object]] = {}
self._completion_key_queues: dict[int, MemorySendChannel[object]] = {}
self._completion_key_counter = itertools.count(CKeys.USER_DEFINED)

with socket.socket() as s:
Expand Down Expand Up @@ -610,7 +611,7 @@ def process_events(self, received: EventResult) -> None:
info = CompletionKeyEventInfo(
lpOverlapped=overlapped, dwNumberOfBytesTransferred=transferred
)
queue.put_nowait(info)
queue.send_nowait(info)

def _register_with_iocp(self, handle_: int | CData, completion_key: int) -> None:
handle = _handle(handle_)
Expand Down Expand Up @@ -981,16 +982,21 @@ def current_iocp(self) -> int:

@contextmanager
@_public
def monitor_completion_key(self) -> Iterator[tuple[int, UnboundedQueue[object]]]:
def monitor_completion_key(
self,
) -> Iterator[tuple[int, MemoryReceiveChannel[object]]]:
"""TODO: these are implemented, but are currently more of a sketch than
anything real. See `#26
<https://github.com/python-trio/trio/issues/26>`__ and `#52
<https://github.com/python-trio/trio/issues/52>`__.
"""
from .._channel import open_memory_channel

key = next(self._completion_key_counter)
queue = _core.UnboundedQueue[object]()
self._completion_key_queues[key] = queue
send, recv = open_memory_channel[object](math.inf)
self._completion_key_queues[key] = send
try:
yield (key, queue)
yield (key, recv)
finally:
send.close()
del self._completion_key_queues[key]
11 changes: 5 additions & 6 deletions src/trio/_core/_parking_lot.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
# theirs and our tasks are lighter, so for us #objects is smaller and #tasks
# is larger.
#
# This is in the core because for two reasons. First, it's used by
# UnboundedQueue, and UnboundedQueue is used for a number of things in the
# core. And second, it's responsible for providing fairness to all of our
# high-level synchronization primitives (locks, queues, etc.). For now with
# our FIFO scheduler this is relatively trivial (it's just a FIFO waitqueue),
# but in the future we ever start support task priorities or fair scheduling
# This is in the core because it's responsible for providing fairness to all
# of our high-level synchronization primitives (locks, queues, etc.). For now
# with our FIFO scheduler this is relatively trivial (it's just a FIFO
# waitqueue), but in the future we ever start support task priorities or fair
# scheduling
Comment on lines +27 to +28
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grammar here is a bit odd. I would change it to something like
"but in the future if we ever support task priorities or
fair scheduling it should be fairly simple."

#
# https://github.com/python-trio/trio/issues/32
#
Expand Down
154 changes: 0 additions & 154 deletions src/trio/_core/_tests/test_unbounded_queue.py

This file was deleted.

Loading