Skip to content

Commit

Permalink
Do not use threads, PETSc would not be happy
Browse files Browse the repository at this point in the history
  • Loading branch information
connorjward committed Nov 23, 2023
1 parent 894d1d4 commit d7eb841
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 58 deletions.
11 changes: 0 additions & 11 deletions pyop3/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,6 @@ class Configuration(dict):
cdim > 1 be built as block sparsities, or dof sparsities. The
former saves memory but changes which preconditioners are
available for the resulting matrices. (Default yes)
:param thread_model: Thread support provided by the underlying OS. This
is used for the interleaving of computation and communication in parallel
loops. Possible values are:
* "SINGLE": Only a single thread is supported.
* "SERIALIZED": Multiple threads are allowed but only one may make MPI calls.
* "MULTIPLE": Multiple threads are allowed and they can all make MPI calls.
This is the default.
Note that these names have been drawn from MPI_Init_thread terminology.
"""
# name, env variable, type, default, write once
Expand All @@ -74,7 +64,6 @@ class Configuration(dict):
"print_cache_size": ("PYOP3_PRINT_CACHE_SIZE", bool, False),
"matnest": ("PYOP3_MATNEST", bool, True),
"block_sparsity": ("PYOP3_BLOCK_SPARSITY", bool, True),
"thread_model": ("PYOP3_THREAD_MODEL", str, "MULTIPLE"),
}
"""Default values for PyOP2 configuration parameters"""

Expand Down
60 changes: 13 additions & 47 deletions pyop3/lang.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,59 +253,28 @@ def _array_updates(self):
(initializers, (finalizers0, finalizers1))
Collections of callables to be executed at the right times.
Notes
-----
To avoid blocking, updates are done using a different Python thread per
array per operation. The function returns a tuple of initializer callables
that trigger the operations, as well as two collections of finalizer callables.
These are separated into those that must be complete for "root" points to be
valid, and those that must be complete for "leaf" points to be valid.
This does not release the GIL but this is acceptable for this application.
"""
# NOTE: It is safe to include reductions in the finalizers because
# core entities (in the iterset) are defined as being those that do
# not overlap with any points in the star forest.
if config["thread_model"] in {"SINGLE", "SERIALIZED"}:
# SINGLE means other threads cannot be used and SERIALIZED means
# that ghost exchanges can only be done on a single thread
raise NotImplementedError

initializers = []
finalizerss = ([], [])
for array, intent, touches_ghost_points in self._distarray_args:
if intent in {READ, RW}:
if touches_ghost_points:
if not array._roots_valid:
bcast_thread = threading.Thread(
action=array._broadcast_roots_to_leaves,
)
# As soon as the reduction is done we start the broadcast. This
# is done on a different thread so we can use the termination of
# the original thread as a finalizer.
reduce_thread = threading.Thread(
action=map(
operator.call,
[array._reduce_leaves_to_roots, bcast_thread.start],
)
initializers.append(array._reduce_leaves_to_roots_begin)
finalizerss[0].extend(
[
array._reduce_leaves_to_roots_end,
array._broadcast_roots_to_leaves_begin,
]
)
initializers.append(reduce_thread.start)
finalizerss[0].append(reduce_thread.join)
finalizerss[1].append(bcast_thread.join)
finalizerss[1].append(array._broadcast_roots_to_leaves_end)
else:
thread = threading.Thread(
action=array._broadcast_roots_to_leaves
)
initializers.append(thread.start)
finalizerss[1].append(thread.join)
initializers.append(array._broadcast_roots_to_leaves_begin)
finalizerss[1].append(array._broadcast_roots_to_leaves_end)
else:
if not array._roots_valid:
thread = threading.Thread(
action=array._reduce_leaves_to_roots,
)
initializers.append(thread.start)
finalizerss[0].append(thread.join)
initializers.append(array._reduce_leaves_to_roots_begin)
finalizerss[0].append(array._reduce_leaves_to_roots_end)

elif intent == WRITE:
# Assumes that all points are written to (i.e. not a subset). If
Expand All @@ -328,11 +297,8 @@ def _array_updates(self):
# explained in the documentation.
if intent in {INC, MIN_RW, MAX_RW}:
assert array._pending_reduction is not None
thread = threading.Thread(
action=array._reduce_leaves_to_roots,
)
initializers.append(thread.start)
finalizerss[0].append(thread.join)
initializers.append(array._reduce_leaves_to_roots_begin)
finalizerss[0].append(array._reduce_leaves_to_roots_end)

# We are modifying owned values so the leaves must now be wrong
array._leaves_valid = False
Expand Down

0 comments on commit d7eb841

Please sign in to comment.