From f1fae1542311c9d4a307d287f43201114ed331bc Mon Sep 17 00:00:00 2001 From: mloubout Date: Tue, 7 Nov 2023 12:20:03 -0500 Subject: [PATCH 1/5] mpi: fix empty aindices halo --- devito/ir/stree/algorithms.py | 4 +-- devito/mpi/distributed.py | 2 ++ devito/operations/interpolators.py | 15 ++++---- devito/types/basic.py | 4 +++ devito/types/sparse.py | 5 +-- tests/test_mpi.py | 57 +++++++++++++++++++++++++++++- 6 files changed, 76 insertions(+), 11 deletions(-) diff --git a/devito/ir/stree/algorithms.py b/devito/ir/stree/algorithms.py index f40288946f..b679ef0d3a 100644 --- a/devito/ir/stree/algorithms.py +++ b/devito/ir/stree/algorithms.py @@ -150,7 +150,7 @@ def preprocess(clusters, options=None, **kwargs): for c in clusters: if c.is_halo_touch: hs = HaloScheme.union(e.rhs.halo_scheme for e in c.exprs) - queue.append(c.rebuild(halo_scheme=hs)) + queue.append(c.rebuild(expr=None, halo_scheme=hs)) elif c.is_critical_region and c.syncs: processed.append(c.rebuild(exprs=None, guards=c.guards, syncs=c.syncs)) elif c.is_wild: @@ -165,7 +165,7 @@ def preprocess(clusters, options=None, **kwargs): # Skip if the halo exchange would end up outside # its iteration space - if h_indices and not h_indices & dims: + if h_indices and dims and not h_indices & dims: continue diff = dims - distributed_aindices diff --git a/devito/mpi/distributed.py b/devito/mpi/distributed.py index 1ffaae1067..b9efa0010f 100644 --- a/devito/mpi/distributed.py +++ b/devito/mpi/distributed.py @@ -17,6 +17,8 @@ from devito.types.utils import DimensionTuple +__all__ = ['CustomTopology'] + # Do not prematurely initialize MPI # This allows launching a Devito program from within another Python program # that has *already* initialized MPI diff --git a/devito/operations/interpolators.py b/devito/operations/interpolators.py index c932802f4f..bee8bc0de4 100644 --- a/devito/operations/interpolators.py +++ b/devito/operations/interpolators.py @@ -7,7 +7,7 @@ from devito.finite_differences.differentiable import Mul from devito.finite_differences.elementary import floor from devito.symbolics import retrieve_function_carriers, retrieve_functions, INT -from devito.tools import as_tuple, flatten +from devito.tools import as_tuple, flatten, filter_ordered from devito.types import (ConditionalDimension, Eq, Inc, Evaluable, Symbol, CustomDimension) from devito.types.utils import DimensionTuple @@ -163,7 +163,7 @@ def r(self): @cached_property def _rdim(self): - parent = self.sfunction.dimensions[-1] + parent = self.sfunction._sparse_dim dims = [CustomDimension("r%s%s" % (self.sfunction.name, d.name), -self.r+1, self.r, 2*self.r, parent) for d in self._gdims] @@ -184,15 +184,18 @@ def _rdim(self): def _augment_implicit_dims(self, implicit_dims, extras=None): if extras is not None: - extra = set([i for v in extras for i in v.dimensions]) - set(self._gdims) + extra = filter_ordered([i for v in extras for i in v.dimensions + if i not in self._gdims and + i not in self.sfunction.dimensions]) extra = tuple(extra) else: extra = tuple() if self.sfunction._sparse_position == -1: - return self.sfunction.dimensions + as_tuple(implicit_dims) + extra + idims = self.sfunction.dimensions + as_tuple(implicit_dims) + extra else: - return as_tuple(implicit_dims) + self.sfunction.dimensions + extra + idims = extra + as_tuple(implicit_dims) + self.sfunction.dimensions + return tuple(filter_ordered(idims)) def _coeff_temps(self, implicit_dims): return [] @@ -283,7 +286,7 @@ def _interpolate(self, expr, increment=False, self_subs={}, implicit_dims=None): variables = list(retrieve_function_carriers(_expr)) # Implicit dimensions - implicit_dims = self._augment_implicit_dims(implicit_dims) + implicit_dims = self._augment_implicit_dims(implicit_dims, variables) # List of indirection indices for all adjacent grid points idx_subs, temps = self._interp_idx(variables, implicit_dims=implicit_dims) diff --git a/devito/types/basic.py b/devito/types/basic.py index e1ce7fea30..a06e9c9a09 100644 --- a/devito/types/basic.py +++ b/devito/types/basic.py @@ -1453,6 +1453,10 @@ def _hashable_content(self): def indices(self): return DimensionTuple(*super().indices, getters=self.function.dimensions) + @cached_property + def dimensions(self): + return self.function.dimensions + @property def function(self): return self.base.function diff --git a/devito/types/sparse.py b/devito/types/sparse.py index f268467dda..75dbdf502b 100644 --- a/devito/types/sparse.py +++ b/devito/types/sparse.py @@ -896,7 +896,7 @@ class SparseTimeFunction(AbstractSparseTimeFunction, SparseFunction): __rkwargs__ = tuple(filter_ordered(AbstractSparseTimeFunction.__rkwargs__ + SparseFunction.__rkwargs__)) - def interpolate(self, expr, u_t=None, p_t=None, increment=False): + def interpolate(self, expr, u_t=None, p_t=None, increment=False, implicit_dims=None): """ Generate equations interpolating an arbitrary expression into ``self``. @@ -921,7 +921,8 @@ def interpolate(self, expr, u_t=None, p_t=None, increment=False): if p_t is not None: subs = {self.time_dim: p_t} - return super().interpolate(expr, increment=increment, self_subs=subs) + return super().interpolate(expr, increment=increment, self_subs=subs, + implicit_dims=implicit_dims) def inject(self, field, expr, u_t=None, p_t=None, implicit_dims=None): """ diff --git a/tests/test_mpi.py b/tests/test_mpi.py index ce420558b0..dfa831a7e3 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -6,7 +6,8 @@ from devito import (Grid, Constant, Function, TimeFunction, SparseFunction, SparseTimeFunction, Dimension, ConditionalDimension, SubDimension, SubDomain, Eq, Ne, Inc, NODE, Operator, norm, inner, configuration, - switchconfig, generic_derivative, PrecomputedSparseFunction) + switchconfig, generic_derivative, PrecomputedSparseFunction, + DefaultDimension) from devito.arch.compiler import OneapiCompiler from devito.data import LEFT, RIGHT from devito.ir.iet import (Call, Conditional, Iteration, FindNodes, FindSymbols, @@ -601,6 +602,60 @@ def test_precomputed_sparse(self, r): Operator(sf1.interpolate(u))() assert np.all(sf1.data == 4) + @pytest.mark.parallel(mode=4) + def test_no_grid_dim_slow(self): + shape = (12, 13, 14) + nfreq = 5 + nrec = 2 + + grid = Grid(shape=shape) + f = DefaultDimension(name="f", default_value=nfreq) + + u = Function(name="u", grid=grid, dimensions=(*grid.dimensions, f), + shape=(*shape, nfreq), space_order=2) + u.data.fill(1) + + class CoordSlowSparseFunction(SparseFunction): + _sparse_position = 0 + + r = Dimension(name="r") + s = CoordSlowSparseFunction(name="s", grid=grid, dimensions=(r, f), + shape=(nrec, nfreq), npoint=nrec) + + rec_eq = s.interpolate(expr=u) + + op = Operator([Eq(u, 1)] + rec_eq) + print(op) + op.apply() + assert np.all(s.data == 1) + + @pytest.mark.parallel(mode=4) + def test_no_grid_dim_slow_time(self): + shape = (12, 13, 14) + nfreq = 5 + nrec = 2 + + grid = Grid(shape=shape) + t = grid.stepping_dim + f = DefaultDimension(name="f", default_value=nfreq) + + u = TimeFunction(name="u", grid=grid, dimensions=(t, *grid.dimensions, f), + shape=(2, *shape, nfreq), space_order=2) + + class CoordSlowSparseFunction(SparseTimeFunction): + _sparse_position = 0 + + r = Dimension(name="r") + s = CoordSlowSparseFunction(name="s", grid=grid, dimensions=(r, f), + shape=(nrec, nfreq), npoint=nrec, nt=5) + + rec_eq = s.interpolate(expr=u) + + op = Operator([Eq(u, 1)] + rec_eq) + print(op) + op.apply() + assert np.all(s.data == 1) + class TestOperatorSimple(object): From afd4f5ee1de16991208cad3548416b94479b7029 Mon Sep 17 00:00:00 2001 From: mloubout Date: Wed, 8 Nov 2023 15:05:06 -0500 Subject: [PATCH 2/5] mpi: patch missing dimensions for halospot --- devito/ir/iet/algorithms.py | 5 +++- devito/ir/stree/algorithms.py | 47 ++++++++++++++++++++++++++---- devito/mpi/halo_scheme.py | 4 +++ devito/operations/interpolators.py | 2 +- devito/passes/iet/mpi.py | 3 ++ examples/seismic/tti/operators.py | 12 +++----- tests/test_interpolation.py | 4 +-- tests/test_mpi.py | 42 +++++++++++++++++++++++--- 8 files changed, 98 insertions(+), 21 deletions(-) diff --git a/devito/ir/iet/algorithms.py b/devito/ir/iet/algorithms.py index 59f7237fca..ec6381698e 100644 --- a/devito/ir/iet/algorithms.py +++ b/devito/ir/iet/algorithms.py @@ -40,7 +40,10 @@ def iet_build(stree): nsections += 1 elif i.is_Halo: - body = HaloSpot(queues.pop(i), i.halo_scheme) + try: + body = HaloSpot(queues.pop(i), i.halo_scheme) + except KeyError: + body = HaloSpot(None, i.halo_scheme) elif i.is_Sync: body = SyncSpot(i.sync_ops, body=queues.pop(i, None)) diff --git a/devito/ir/stree/algorithms.py b/devito/ir/stree/algorithms.py index b679ef0d3a..15291dda31 100644 --- a/devito/ir/stree/algorithms.py +++ b/devito/ir/stree/algorithms.py @@ -81,10 +81,17 @@ def stree_build(clusters, profiler=None, **kwargs): tip = augment_whole_subtree(c, tip, mapper, it) # Attach NodeHalo if necessary - for it, v in mapper.items(): - if needs_nodehalo(it.dim, c.halo_scheme): - v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent) - break + if c.exprs: + for it, v in mapper.items(): + if needs_nodehalo(it.dim, c.halo_scheme): + v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent) + break + else: + for it, v in reversed(mapper.items()): + if needs_nodehalo_dim(it.dim, c.halo_scheme): + v.bottom.children = (*v.bottom.children, + NodeHalo(c.halo_scheme, v.bottom.parent)) + break # Add in NodeExprs exprs = [] @@ -182,7 +189,33 @@ def preprocess(clusters, options=None, **kwargs): processed.append(c.rebuild(exprs=[], ispace=ispace, syncs=syncs)) halo_scheme = HaloScheme.union([c1.halo_scheme for c1 in found]) - processed.append(c.rebuild(halo_scheme=halo_scheme)) + itdims = c.ispace.itdims + if halo_scheme: + # We make sure that the halo scheme placement is valid for this cluster + # Mainly if a cluster has iteration dimension {t, d, f} + # and the halo_scheme has distributed_aindices {d} and + # loc_dirs {f} then the haloscheme needs to be lifted into its own + # {t, f} cluster because it needs to be inside an {f} loop while it would + # be placed outside the {d} loop and therefore outside the {f} loop if + # attached to the cluster. + dindex = max([itdims.index(d) for d in halo_scheme.loc_dirs + if d in itdims] + [0]) + aindex = max([itdims.index(d) for d in halo_scheme.distributed_aindices + if d in itdims] + [0]) + if dindex > aindex: + hispace = c.ispace.project(itdims[:dindex]) + else: + hispace = None + else: + hispace = None + + if hispace and options['mpi']: + processed.append(c.rebuild(exprs=[], + ispace=hispace, + halo_scheme=halo_scheme)) + processed.append(c.rebuild(halo_scheme=None)) + else: + processed.append(c.rebuild(halo_scheme=halo_scheme)) # Sanity check! try: @@ -229,6 +262,10 @@ def needs_nodehalo(d, hs): return d and hs and d._defines.intersection(hs.distributed_aindices) +def needs_nodehalo_dim(d, hs): + return d and hs and d._defines.intersection(hs.loc_indices) + + def reuse_section(candidate, section): try: if not section or candidate.siblings[-1] is not section: diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py index 970e84633d..4a5a8b26fc 100644 --- a/devito/mpi/halo_scheme.py +++ b/devito/mpi/halo_scheme.py @@ -365,6 +365,10 @@ def distributed_aindices(self): def loc_indices(self): return set().union(*[i.loc_indices.keys() for i in self.fmapper.values()]) + @cached_property + def loc_dirs(self): + return set().union(*[i.loc_dirs.keys() for i in self.fmapper.values()]) + @cached_property def arguments(self): return self.dimensions | set(flatten(self.honored.values())) diff --git a/devito/operations/interpolators.py b/devito/operations/interpolators.py index bee8bc0de4..98e0105e8a 100644 --- a/devito/operations/interpolators.py +++ b/devito/operations/interpolators.py @@ -195,7 +195,7 @@ def _augment_implicit_dims(self, implicit_dims, extras=None): idims = self.sfunction.dimensions + as_tuple(implicit_dims) + extra else: idims = extra + as_tuple(implicit_dims) + self.sfunction.dimensions - return tuple(filter_ordered(idims)) + return tuple(idims) def _coeff_temps(self, implicit_dims): return [] diff --git a/devito/passes/iet/mpi.py b/devito/passes/iet/mpi.py index 00d96213aa..3436736711 100644 --- a/devito/passes/iet/mpi.py +++ b/devito/passes/iet/mpi.py @@ -91,6 +91,9 @@ def rule1(dep, candidates, loc_dims): for q in d._defines]) for n, i in enumerate(iters): + if i not in scopes: + continue + candidates = [i.dim._defines for i in iters[n:]] all_candidates = set().union(*candidates) diff --git a/examples/seismic/tti/operators.py b/examples/seismic/tti/operators.py index b6e8687c04..87a00f4cba 100644 --- a/examples/seismic/tti/operators.py +++ b/examples/seismic/tti/operators.py @@ -551,8 +551,7 @@ def ForwardOperator(model, geometry, space_order=4, # Source and receivers expr = src * dt / m if kernel == 'staggered' else src * dt**2 / m - stencils += src.inject(field=u.forward, expr=expr) - stencils += src.inject(field=v.forward, expr=expr) + stencils += src.inject(field=(u.forward, v.forward), expr=expr) stencils += rec.interpolate(expr=u + v) # Substitute spacing terms to reduce flops @@ -601,8 +600,7 @@ def AdjointOperator(model, geometry, space_order=4, # Construct expression to inject receiver values expr = rec * dt / m if kernel == 'staggered' else rec * dt**2 / m - stencils += rec.inject(field=p.backward, expr=expr) - stencils += rec.inject(field=r.backward, expr=expr) + stencils += rec.inject(field=(p.backward, r.backward), expr=expr) # Create interpolation expression for the adjoint-source stencils += srca.interpolate(expr=p + r) @@ -661,8 +659,7 @@ def JacobianOperator(model, geometry, space_order=4, eqn2 = FD_kernel(model, du, dv, space_order, qu=lin_usrc, qv=lin_vsrc) # Construct expression to inject source values, injecting at u0(t+dt)/v0(t+dt) - src_term = src.inject(field=u0.forward, expr=src * dt**2 / m) - src_term += src.inject(field=v0.forward, expr=src * dt**2 / m) + src_term = src.inject(field=(u0.forward, v0.forward), expr=src * dt**2 / m) # Create interpolation expression for receivers, extracting at du(t)+dv(t) rec_term = rec.interpolate(expr=du + dv) @@ -716,8 +713,7 @@ def JacobianAdjOperator(model, geometry, space_order=4, dm_update = Inc(dm, - (u0 * du.dt2 + v0 * dv.dt2)) # Add expression for receiver injection - rec_term = rec.inject(field=du.backward, expr=rec * dt**2 / m) - rec_term += rec.inject(field=dv.backward, expr=rec * dt**2 / m) + rec_term = rec.inject(field=(du.backward, dv.backward), expr=rec * dt**2 / m) # Substitute spacing terms to reduce flops return Operator(eqn + rec_term + [dm_update], subs=model.spacing_map, diff --git a/tests/test_interpolation.py b/tests/test_interpolation.py index efde867402..506a3b88de 100644 --- a/tests/test_interpolation.py +++ b/tests/test_interpolation.py @@ -703,8 +703,8 @@ class SparseFirst(SparseFunction): ds = DefaultDimension("ps", default_value=3) grid = Grid((11, 11)) dims = grid.dimensions - s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3)) - s.coordinates.data[:] = [[.5, .5], [.2, .2]] + s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3), + coordinates=[[.5, .5], [.2, .2]]) # Check dimensions and shape are correctly initialized assert s.indices[s._sparse_position] == dr diff --git a/tests/test_mpi.py b/tests/test_mpi.py index dfa831a7e3..9d5d09328d 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -602,6 +602,42 @@ def test_precomputed_sparse(self, r): Operator(sf1.interpolate(u))() assert np.all(sf1.data == 4) + @pytest.mark.parallel(mode=1) + def test_sparse_first(self): + """ + Tests custom sprase function with sparse dimension as first index. + """ + + class SparseFirst(SparseFunction): + """ Custom sparse class with the sparse dimension as the first one""" + _sparse_position = 0 + + dr = Dimension("cd") + ds = DefaultDimension("ps", default_value=3) + grid = Grid((11, 11)) + dims = grid.dimensions + s = SparseFirst(name="s", grid=grid, npoint=2, dimensions=(dr, ds), shape=(2, 3), + coordinates=[[.5, .5], [.2, .2]]) + + # Check dimensions and shape are correctly initialized + assert s.indices[s._sparse_position] == dr + assert s.shape == (2, 3) + assert s.coordinates.indices[0] == dr + + # Operator + u = TimeFunction(name="u", grid=grid, time_order=1) + fs = Function(name="fs", grid=grid, dimensions=(*dims, ds), shape=(11, 11, 3)) + + eqs = [Eq(u.forward, u+1), Eq(fs, u)] + # No time dependence so need the implicit dim + rec = s.interpolate(expr=s+fs, implicit_dims=grid.stepping_dim) + op = Operator(eqs + rec) + + op(time_M=10) + print + expected = 10*11/2 # n (n+1)/2 + assert np.allclose(s.data, expected) + @pytest.mark.parallel(mode=4) def test_no_grid_dim_slow(self): shape = (12, 13, 14) @@ -625,11 +661,10 @@ class CoordSlowSparseFunction(SparseFunction): rec_eq = s.interpolate(expr=u) op = Operator([Eq(u, 1)] + rec_eq) - print(op) op.apply() assert np.all(s.data == 1) - @pytest.mark.parallel(mode=4) + @pytest.mark.parallel(mode=1) def test_no_grid_dim_slow_time(self): shape = (12, 13, 14) nfreq = 5 @@ -652,8 +687,7 @@ class CoordSlowSparseFunction(SparseTimeFunction): rec_eq = s.interpolate(expr=u) op = Operator([Eq(u, 1)] + rec_eq) - print(op) - op.apply() + op.apply(time_M=5) assert np.all(s.data == 1) From 3a2857434da28550f8bcd4936319027cc387b23a Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Mon, 20 Nov 2023 08:53:09 +0000 Subject: [PATCH 3/5] compiler: Revert stree-level changes --- devito/ir/iet/algorithms.py | 5 +--- devito/ir/stree/algorithms.py | 51 +++++------------------------------ 2 files changed, 8 insertions(+), 48 deletions(-) diff --git a/devito/ir/iet/algorithms.py b/devito/ir/iet/algorithms.py index ec6381698e..59f7237fca 100644 --- a/devito/ir/iet/algorithms.py +++ b/devito/ir/iet/algorithms.py @@ -40,10 +40,7 @@ def iet_build(stree): nsections += 1 elif i.is_Halo: - try: - body = HaloSpot(queues.pop(i), i.halo_scheme) - except KeyError: - body = HaloSpot(None, i.halo_scheme) + body = HaloSpot(queues.pop(i), i.halo_scheme) elif i.is_Sync: body = SyncSpot(i.sync_ops, body=queues.pop(i, None)) diff --git a/devito/ir/stree/algorithms.py b/devito/ir/stree/algorithms.py index 15291dda31..f40288946f 100644 --- a/devito/ir/stree/algorithms.py +++ b/devito/ir/stree/algorithms.py @@ -81,17 +81,10 @@ def stree_build(clusters, profiler=None, **kwargs): tip = augment_whole_subtree(c, tip, mapper, it) # Attach NodeHalo if necessary - if c.exprs: - for it, v in mapper.items(): - if needs_nodehalo(it.dim, c.halo_scheme): - v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent) - break - else: - for it, v in reversed(mapper.items()): - if needs_nodehalo_dim(it.dim, c.halo_scheme): - v.bottom.children = (*v.bottom.children, - NodeHalo(c.halo_scheme, v.bottom.parent)) - break + for it, v in mapper.items(): + if needs_nodehalo(it.dim, c.halo_scheme): + v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent) + break # Add in NodeExprs exprs = [] @@ -157,7 +150,7 @@ def preprocess(clusters, options=None, **kwargs): for c in clusters: if c.is_halo_touch: hs = HaloScheme.union(e.rhs.halo_scheme for e in c.exprs) - queue.append(c.rebuild(expr=None, halo_scheme=hs)) + queue.append(c.rebuild(halo_scheme=hs)) elif c.is_critical_region and c.syncs: processed.append(c.rebuild(exprs=None, guards=c.guards, syncs=c.syncs)) elif c.is_wild: @@ -172,7 +165,7 @@ def preprocess(clusters, options=None, **kwargs): # Skip if the halo exchange would end up outside # its iteration space - if h_indices and dims and not h_indices & dims: + if h_indices and not h_indices & dims: continue diff = dims - distributed_aindices @@ -189,33 +182,7 @@ def preprocess(clusters, options=None, **kwargs): processed.append(c.rebuild(exprs=[], ispace=ispace, syncs=syncs)) halo_scheme = HaloScheme.union([c1.halo_scheme for c1 in found]) - itdims = c.ispace.itdims - if halo_scheme: - # We make sure that the halo scheme placement is valid for this cluster - # Mainly if a cluster has iteration dimension {t, d, f} - # and the halo_scheme has distributed_aindices {d} and - # loc_dirs {f} then the haloscheme needs to be lifted into its own - # {t, f} cluster because it needs to be inside an {f} loop while it would - # be placed outside the {d} loop and therefore outside the {f} loop if - # attached to the cluster. - dindex = max([itdims.index(d) for d in halo_scheme.loc_dirs - if d in itdims] + [0]) - aindex = max([itdims.index(d) for d in halo_scheme.distributed_aindices - if d in itdims] + [0]) - if dindex > aindex: - hispace = c.ispace.project(itdims[:dindex]) - else: - hispace = None - else: - hispace = None - - if hispace and options['mpi']: - processed.append(c.rebuild(exprs=[], - ispace=hispace, - halo_scheme=halo_scheme)) - processed.append(c.rebuild(halo_scheme=None)) - else: - processed.append(c.rebuild(halo_scheme=halo_scheme)) + processed.append(c.rebuild(halo_scheme=halo_scheme)) # Sanity check! try: @@ -262,10 +229,6 @@ def needs_nodehalo(d, hs): return d and hs and d._defines.intersection(hs.distributed_aindices) -def needs_nodehalo_dim(d, hs): - return d and hs and d._defines.intersection(hs.loc_indices) - - def reuse_section(candidate, section): try: if not section or candidate.siblings[-1] is not section: From 7544b89f8008417f8d77ad84733ebaef152503b4 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Mon, 20 Nov 2023 09:05:13 +0000 Subject: [PATCH 4/5] compiler: Revamp HaloScheme lowering --- devito/ir/clusters/algorithms.py | 59 ++++++++++++++++---------------- devito/ir/iet/algorithms.py | 5 ++- devito/ir/stree/algorithms.py | 34 +++++++++++++++--- devito/ir/support/space.py | 13 +++++++ devito/mpi/halo_scheme.py | 16 +++++---- devito/passes/iet/mpi.py | 7 ++-- tests/test_mpi.py | 18 +++++++--- 7 files changed, 105 insertions(+), 47 deletions(-) diff --git a/devito/ir/clusters/algorithms.py b/devito/ir/clusters/algorithms.py index 91a145d966..075f7275bd 100644 --- a/devito/ir/clusters/algorithms.py +++ b/devito/ir/clusters/algorithms.py @@ -374,54 +374,53 @@ class Communications(Queue): B = Symbol(name='⊥') - @timed_pass(name='schedule') + @timed_pass(name='communications') def process(self, clusters): return self._process_fatd(clusters, 1, seen=set()) def callback(self, clusters, prefix, seen=None): - if seen.issuperset(clusters): + if not prefix: return clusters d = prefix[-1].dim - # Construct the mock exprs representing the halo accesses - exprs = [] + # Construct a representation of the halo accesses + processed = [] for c in clusters: - if c.properties.is_sequential(d): + if c.properties.is_sequential(d) or \ + c in seen: continue - halo_scheme = HaloScheme(c.exprs, c.ispace) + hs = HaloScheme(c.exprs, c.ispace) + if hs.is_void or \ + not d._defines & hs.distributed_aindices: + continue - if not halo_scheme.is_void and \ - c.properties.is_parallel_relaxed(d): - points = set() - for f in halo_scheme.fmapper: - for a in c.scope.getreads(f): - points.add(a.access) + points = set() + for f in hs.fmapper: + for a in c.scope.getreads(f): + points.add(a.access) - # We also add all written symbols to ultimately create mock WARs - # with `c`, which will prevent the newly created HaloTouch to ever - # be rescheduled after `c` upon topological sorting - points.update(a.access for a in c.scope.accesses if a.is_write) + # We also add all written symbols to ultimately create mock WARs + # with `c`, which will prevent the newly created HaloTouch to ever + # be rescheduled after `c` upon topological sorting + points.update(a.access for a in c.scope.accesses if a.is_write) - # Sort for determinism - # NOTE: not sorting might impact code generation. The order of - # the args is important because that's what search functions honor! - points = sorted(points, key=str) + # Sort for determinism + # NOTE: not sorting might impact code generation. The order of + # the args is important because that's what search functions honor! + points = sorted(points, key=str) - rhs = HaloTouch(*points, halo_scheme=halo_scheme) + # Construct the HaloTouch Cluster + expr = Eq(self.B, HaloTouch(*points, halo_scheme=hs)) - # Insert only if not redundant, to avoid useless pollution - if not any(rhs == e.rhs for e in exprs): - exprs.append(Eq(self.B, rhs)) + key = lambda i: i in prefix[:-1] or i in hs.loc_indices + ispace = c.ispace.project(key) - processed = [] - if exprs: - ispace = prefix[:prefix.index(d)] - properties = prefix.properties.drop(d) + halo_touch = c.rebuild(exprs=expr, ispace=ispace) - processed.append(Cluster(exprs, ispace, c.guards, properties)) - seen.update(clusters) + processed.append(halo_touch) + seen.update({halo_touch, c}) processed.extend(clusters) diff --git a/devito/ir/iet/algorithms.py b/devito/ir/iet/algorithms.py index 59f7237fca..ec6381698e 100644 --- a/devito/ir/iet/algorithms.py +++ b/devito/ir/iet/algorithms.py @@ -40,7 +40,10 @@ def iet_build(stree): nsections += 1 elif i.is_Halo: - body = HaloSpot(queues.pop(i), i.halo_scheme) + try: + body = HaloSpot(queues.pop(i), i.halo_scheme) + except KeyError: + body = HaloSpot(None, i.halo_scheme) elif i.is_Sync: body = SyncSpot(i.sync_ops, body=queues.pop(i, None)) diff --git a/devito/ir/stree/algorithms.py b/devito/ir/stree/algorithms.py index f40288946f..19be9880d1 100644 --- a/devito/ir/stree/algorithms.py +++ b/devito/ir/stree/algorithms.py @@ -9,7 +9,7 @@ from devito.ir.support import (SEQUENTIAL, Any, Interval, IterationInterval, IterationSpace, normalize_properties, normalize_syncs) from devito.mpi.halo_scheme import HaloScheme -from devito.tools import Bunch, DefaultOrderedDict +from devito.tools import Bunch, DefaultOrderedDict, as_mapper __all__ = ['stree_build'] @@ -85,6 +85,10 @@ def stree_build(clusters, profiler=None, **kwargs): if needs_nodehalo(it.dim, c.halo_scheme): v.bottom.parent = NodeHalo(c.halo_scheme, v.bottom.parent) break + else: + if c.halo_scheme: + assert not c.exprs # See preprocess() -- we rarely end up here! + tip = NodeHalo(c.halo_scheme, v.bottom) # Add in NodeExprs exprs = [] @@ -150,11 +154,14 @@ def preprocess(clusters, options=None, **kwargs): for c in clusters: if c.is_halo_touch: hs = HaloScheme.union(e.rhs.halo_scheme for e in c.exprs) - queue.append(c.rebuild(halo_scheme=hs)) + queue.append(c.rebuild(exprs=[], halo_scheme=hs)) + elif c.is_critical_region and c.syncs: processed.append(c.rebuild(exprs=None, guards=c.guards, syncs=c.syncs)) + elif c.is_wild: continue + else: dims = set(c.ispace.promote(lambda d: d.is_Block).itdims) @@ -181,8 +188,27 @@ def preprocess(clusters, options=None, **kwargs): ispace = c.ispace.project(syncs) processed.append(c.rebuild(exprs=[], ispace=ispace, syncs=syncs)) - halo_scheme = HaloScheme.union([c1.halo_scheme for c1 in found]) - processed.append(c.rebuild(halo_scheme=halo_scheme)) + if all(c1.ispace.is_subset(c.ispace) for c1 in found): + # 99% of the cases we end up here + hs = HaloScheme.union([c1.halo_scheme for c1 in found]) + processed.append(c.rebuild(halo_scheme=hs)) + else: + # We end up here with e.g. `t,x,y,z,f` where `f` is a sequential + # dimension requiring a loc-index in the HaloScheme. The compiler + # will generate the non-perfect loop nest `t,f ; t,x,y,z,f`, with + # the first nest triggering all necessary halo exchanges along `f` + if not options['mpi']: + # Avoid ugly empty loops + processed.append(c) + else: + mapper = as_mapper(found, lambda c1: c1.ispace) + for ispace, v in mapper.items(): + hs = HaloScheme.union([c1.halo_scheme for c1 in v]) + processed.append( + c.rebuild(exprs=[], ispace=ispace, halo_scheme=hs) + ) + + processed.append(c) # Sanity check! try: diff --git a/devito/ir/support/space.py b/devito/ir/support/space.py index dc25bc54f5..bb8c3c7823 100644 --- a/devito/ir/support/space.py +++ b/devito/ir/support/space.py @@ -966,6 +966,19 @@ def reorder(self, relations=None, mode=None): return IterationSpace(intervals, self.sub_iterators, self.directions) + def is_subset(self, other): + """ + True if `self` is included within `other`, False otherwise. + """ + if not self: + return True + + d = self[-1].dim + try: + return self == other[:other.index(d) + 1] + except ValueError: + return False + def is_compatible(self, other): """ A relaxed version of ``__eq__``, in which only non-derived dimensions diff --git a/devito/mpi/halo_scheme.py b/devito/mpi/halo_scheme.py index 4a5a8b26fc..549995f3e9 100644 --- a/devito/mpi/halo_scheme.py +++ b/devito/mpi/halo_scheme.py @@ -107,7 +107,7 @@ def __len__(self): return len(self._mapper) def __hash__(self): - return (self._mapper.__hash__(), self.honored.__hash__()) + return hash((self._mapper.__hash__(), self.honored.__hash__())) @classmethod def build(cls, fmapper, honored): @@ -365,10 +365,6 @@ def distributed_aindices(self): def loc_indices(self): return set().union(*[i.loc_indices.keys() for i in self.fmapper.values()]) - @cached_property - def loc_dirs(self): - return set().union(*[i.loc_dirs.keys() for i in self.fmapper.values()]) - @cached_property def arguments(self): return self.dimensions | set(flatten(self.honored.values())) @@ -586,13 +582,21 @@ def _sympystr(self, printer): return str(self) def __hash__(self): - return id(self) + return hash(self.halo_scheme) def __eq__(self, other): return isinstance(other, HaloTouch) and self.halo_scheme == other.halo_scheme func = Reconstructable._rebuild + @property + def fmapper(self): + return self.halo_scheme.fmapper + + @property + def dims(self): + return frozenset().union(*[v.dims for v in self.fmapper.values()]) + def _uxreplace_dispatch_haloscheme(hs0, rule): changed = False diff --git a/devito/passes/iet/mpi.py b/devito/passes/iet/mpi.py index 3436736711..a5b97b37ff 100644 --- a/devito/passes/iet/mpi.py +++ b/devito/passes/iet/mpi.py @@ -254,9 +254,10 @@ def _mark_overlappable(iet): found = [] for hs in FindNodes(HaloSpot).visit(iet): expressions = FindNodes(Expression).visit(hs) - scope = Scope([i.expr for i in expressions]) + if not expressions: + continue - test = True + scope = Scope([i.expr for i in expressions]) # Comp/comm overlaps is legal only if the OWNED regions can grow # arbitrarly, which means all of the dependences must be carried @@ -273,6 +274,8 @@ def _mark_overlappable(iet): # f[x, y] = ... test = False break + else: + test = True # Heuristic: avoid comp/comm overlap for sparse Iteration nests if test: diff --git a/tests/test_mpi.py b/tests/test_mpi.py index 9d5d09328d..1bc5330889 100644 --- a/tests/test_mpi.py +++ b/tests/test_mpi.py @@ -602,7 +602,7 @@ def test_precomputed_sparse(self, r): Operator(sf1.interpolate(u))() assert np.all(sf1.data == 4) - @pytest.mark.parallel(mode=1) + @pytest.mark.parallel(mode=4) def test_sparse_first(self): """ Tests custom sprase function with sparse dimension as first index. @@ -633,12 +633,14 @@ class SparseFirst(SparseFunction): rec = s.interpolate(expr=s+fs, implicit_dims=grid.stepping_dim) op = Operator(eqs + rec) + # Check generated code -- expected one halo exchange + assert len(FindNodes(Call).visit(op)) == 1 + op(time_M=10) - print expected = 10*11/2 # n (n+1)/2 assert np.allclose(s.data, expected) - @pytest.mark.parallel(mode=4) + @pytest.mark.parallel(mode=[(4, 'diag2')]) def test_no_grid_dim_slow(self): shape = (12, 13, 14) nfreq = 5 @@ -661,10 +663,14 @@ class CoordSlowSparseFunction(SparseFunction): rec_eq = s.interpolate(expr=u) op = Operator([Eq(u, 1)] + rec_eq) + + # Check generated code -- expected one halo exchange + assert len(FindNodes(Call).visit(op)) == 1 + op.apply() assert np.all(s.data == 1) - @pytest.mark.parallel(mode=1) + @pytest.mark.parallel(mode=4) def test_no_grid_dim_slow_time(self): shape = (12, 13, 14) nfreq = 5 @@ -687,6 +693,10 @@ class CoordSlowSparseFunction(SparseTimeFunction): rec_eq = s.interpolate(expr=u) op = Operator([Eq(u, 1)] + rec_eq) + + # Check generated code -- expected one halo exchange + assert len(FindNodes(Call).visit(op)) == 1 + op.apply(time_M=5) assert np.all(s.data == 1) From d6d8a6e349df66f8e038d18068b048c85e90d378 Mon Sep 17 00:00:00 2001 From: Fabio Luporini Date: Tue, 21 Nov 2023 13:51:20 +0000 Subject: [PATCH 5/5] compiler: Refactor stree.preprocess --- devito/ir/stree/algorithms.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/devito/ir/stree/algorithms.py b/devito/ir/stree/algorithms.py index 19be9880d1..99113d9bb8 100644 --- a/devito/ir/stree/algorithms.py +++ b/devito/ir/stree/algorithms.py @@ -192,23 +192,19 @@ def preprocess(clusters, options=None, **kwargs): # 99% of the cases we end up here hs = HaloScheme.union([c1.halo_scheme for c1 in found]) processed.append(c.rebuild(halo_scheme=hs)) - else: + elif options['mpi']: # We end up here with e.g. `t,x,y,z,f` where `f` is a sequential # dimension requiring a loc-index in the HaloScheme. The compiler # will generate the non-perfect loop nest `t,f ; t,x,y,z,f`, with # the first nest triggering all necessary halo exchanges along `f` - if not options['mpi']: - # Avoid ugly empty loops - processed.append(c) - else: - mapper = as_mapper(found, lambda c1: c1.ispace) - for ispace, v in mapper.items(): - hs = HaloScheme.union([c1.halo_scheme for c1 in v]) - processed.append( - c.rebuild(exprs=[], ispace=ispace, halo_scheme=hs) - ) - - processed.append(c) + mapper = as_mapper(found, lambda c1: c1.ispace) + for k, v in mapper.items(): + hs = HaloScheme.union([c1.halo_scheme for c1 in v]) + processed.append(c.rebuild(exprs=[], ispace=k, halo_scheme=hs)) + processed.append(c) + else: + # Avoid ugly empty loops + processed.append(c) # Sanity check! try: