Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async/Batching of coroutines #2855

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import msgpack
from dataclasses_json import DataClassJsonMixin, dataclass_json
from flyteidl.core import literals_pb2
from fsspec.asyn import _run_coros_in_chunks # pylint: disable=W0212
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we bring in this code from fsspec? I rather not depend on private API from another library.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fsspec is not going anywhere. I don't understand why that particular method is private, but that function existed in its current form since 2021. IMO it's safe to assume that's going to live there for a long time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asked them to make it public: fsspec/filesystem_spec#1740. i think we should just use the private function until they make it public. if they refuse, we can always copy later. If we copy, i'll also be copying in the config code cuz I think it is nice to have some user facing controls around that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thomasjpfan? is this satisfactory?

from google.protobuf import json_format as _json_format
from google.protobuf import struct_pb2 as _struct
from google.protobuf.json_format import MessageToDict as _MessageToDict
Expand Down Expand Up @@ -1539,8 +1540,10 @@ async def async_to_literal(
raise TypeTransformerFailedError("Expected a list")

t = self.get_sub_type(python_type)
lit_list = [TypeEngine.async_to_literal(ctx, x, t, expected.collection_type) for x in python_val]
lit_list = await asyncio.gather(*lit_list)
lit_list = [
asyncio.create_task(TypeEngine.async_to_literal(ctx, x, t, expected.collection_type)) for x in python_val
]
lit_list = await _run_coros_in_chunks(lit_list)

return Literal(collection=LiteralCollection(literals=lit_list))

Expand All @@ -1562,7 +1565,7 @@ async def async_to_python_value( # type: ignore

st = self.get_sub_type(expected_python_type)
result = [TypeEngine.async_to_python_value(ctx, x, st) for x in lits]
result = await asyncio.gather(*result)
result = await _run_coros_in_chunks(result)
return result # type: ignore # should be a list, thinks its a tuple

def guess_python_type(self, literal_type: LiteralType) -> list: # type: ignore
Expand Down Expand Up @@ -1968,7 +1971,7 @@ async def async_to_literal(
TypeEngine.async_to_literal(ctx, v, cast(type, v_type), expected.map_value_type)
)

await asyncio.gather(*lit_map.values())
await _run_coros_in_chunks([c for c in lit_map.values()])
for k, v in lit_map.items():
lit_map[k] = v.result()

Expand All @@ -1994,7 +1997,7 @@ async def async_to_python_value(self, ctx: FlyteContext, lv: Literal, expected_p
fut = asyncio.create_task(TypeEngine.async_to_python_value(ctx, v, cast(Type, tp[1])))
py_map[k] = fut

await asyncio.gather(*py_map.values())
await _run_coros_in_chunks([c for c in py_map.values()])
for k, v in py_map.items():
py_map[k] = v.result()

Expand Down
Loading