diff --git a/.github/workflows/examples-mpi.yml b/.github/workflows/examples-mpi.yml index 76048d1862..3bbebabb1c 100644 --- a/.github/workflows/examples-mpi.yml +++ b/.github/workflows/examples-mpi.yml @@ -65,12 +65,11 @@ jobs: python3 scripts/clear_devito_cache.py - name: Test mpi notebooks - continue-on-error: true run : | ./scripts/create_ipyparallel_mpi_profile.sh ipcluster start --profile=mpi --engines=mpi -n 4 --daemonize # A few seconds to ensure workers are ready - sleep 20 + sleep 10 py.test --nbval examples/mpi ipcluster stop --profile=mpi diff --git a/FAQ.md b/FAQ.md index 345efb2167..96cfb680dc 100644 --- a/FAQ.md +++ b/FAQ.md @@ -315,6 +315,12 @@ _DevitoPRO only._ Requires: `PLATFORM=amdgpuX` and `ARCH=hip`. +#### LANGUAGE=sycl + +_DevitoPRO only._ + +Requires: `PLATFORM=intelgpuX` or `PLATFORM=intel64`, and `ARCH=sycl`. + [top](#Frequently-Asked-Questions) ## How do you run the unit tests from the command line diff --git a/devito/passes/clusters/aliases.py b/devito/passes/clusters/aliases.py index 8b7f610564..ca5343c9ac 100644 --- a/devito/passes/clusters/aliases.py +++ b/devito/passes/clusters/aliases.py @@ -958,15 +958,58 @@ def pick_best(variants): flops = flops0 + flops1 - # Data movement in the two sweeps - indexeds0 = search([sa.pivot for sa in i.schedule], Indexed) + # Estimate the data movement in the two sweeps + + # With cross-loop blocking, a Function appearing in both sweeps is + # much more likely to be in cache during the second sweep, hence + # we count it only once + functions0 = set() + functions1 = set() + for sa in i.schedule: + indexeds0 = search(sa.pivot, Indexed) + + if any(d.is_Block for d in sa.ispace.itdims): + functions1.update({i.function for i in indexeds0}) + else: + functions0.update({i.function for i in indexeds0}) + indexeds1 = search(i.exprs, Indexed) + functions1.update({i.function for i in indexeds1}) + + nfunctions0 = len(functions0) + nfunctions1 = len(functions1) + + # All temporaries impact data movement, but some kind of temporaries + # are more likely to be in cache than others, so they are given a + # lighter weight + for ii in indexeds1: + grid = ii.function.grid + if grid is None: + continue - ntemps = len(i.schedule) - nfunctions0 = len({i.function for i in indexeds0}) - nfunctions1 = len({i.function for i in indexeds1}) + ntemps = 0 + for sa in i.schedule: + if len(sa.writeto) < grid.dim: + # Tiny temporary, extremely likely to be in cache, hardly + # impacting data movement in a significant way + ntemps += 0.1 + elif any(d.is_Block for d in sa.writeto.itdims): + # Cross-loop blocking temporary, likely to be in some level + # of cache (but unlikely to be in the fastest level) + ntemps += 1 + else: + # Grid-size temporary, likely _not_ to be in cache, and + # therefore requiring at least two costly accesses per + # grid point + ntemps += 2 + + ntemps = int(ntemps) + + break + else: + ntemps = len(i.schedule) - ws = ntemps*2 + nfunctions0 + nfunctions1 + ws = ntemps + nfunctions0 + nfunctions1 if best is None: best, best_flops, best_ws = i, flops, ws diff --git a/devito/types/sparse.py b/devito/types/sparse.py index a32eeca588..3e65800fbf 100644 --- a/devito/types/sparse.py +++ b/devito/types/sparse.py @@ -132,7 +132,26 @@ def __distributor_setup__(self, **kwargs): return distributor - def __subfunc_setup__(self, key, suffix, dtype=None): + def __subfunc_setup__(self, suffix, keys, dtype=None, inkwargs=False, **kwargs): + key = None + for k in keys: + if k not in kwargs: + continue + elif kwargs[k] is None: + # In cases such as rebuild, + # the subfunction may be passed explicitly as None + return None + else: + key = kwargs[k] + break + else: + if inkwargs: + # Only create the subfunction if provided. This is useful + # with PrecomputedSparseFunctions that can have different subfunctions + # to skip creating extra if another one has already + # been provided + return None + # Shape and dimensions from args name = '%s_%s' % (self.name, suffix) @@ -603,11 +622,15 @@ def _dist_subfunc_gather(self, sfuncd, subfunc): # in `_dist_scatter` is here received; a sparse point that is received in # `_dist_scatter` is here sent. - def _dist_scatter(self, data=None): + def _dist_scatter(self, alias=None, data=None): + key = alias or self mapper = {self: self._dist_data_scatter(data=data)} for i in self._sub_functions: - if getattr(self, i) is not None: - mapper.update(self._dist_subfunc_scatter(getattr(self, i))) + if getattr(key, i) is not None: + # Pick up alias' in case runtime SparseFunctions is missing + # a subfunction + sf = getattr(self, i) or getattr(key, i) + mapper.update(self._dist_subfunc_scatter(sf)) return mapper def _eval_at(self, func): @@ -629,7 +652,7 @@ def _arg_defaults(self, alias=None): # Add in the sparse data (as well as any SubFunction data) belonging to # self's local domain only - for k, v in self._dist_scatter().items(): + for k, v in self._dist_scatter(alias=alias).items(): args[mapper[k].name] = v for i, s in zip(mapper[k].indices, v.shape): args.update(i._arg_defaults(_min=0, size=s)) @@ -647,7 +670,7 @@ def _arg_values(self, **kwargs): else: # We've been provided a pure-data replacement (array) values = {} - for k, v in self._dist_scatter(new).items(): + for k, v in self._dist_scatter(data=new).items(): values[k.name] = v for i, s in zip(k.indices, v.shape): size = s - sum(k._size_nodomain[i]) @@ -844,8 +867,8 @@ def __init_finalize__(self, *args, **kwargs): super().__init_finalize__(*args, **kwargs) # Set up sparse point coordinates - coordinates = kwargs.get('coordinates', kwargs.get('coordinates_data')) - self._coordinates = self.__subfunc_setup__(coordinates, 'coords') + keys = ('coordinates', 'coordinates_data') + self._coordinates = self.__subfunc_setup__('coords', keys, **kwargs) self._dist_origin = {self._coordinates: self.grid.origin_offset} def __interp_setup__(self, interpolation='linear', r=None, **kwargs): @@ -1096,11 +1119,33 @@ class PrecomputedSparseFunction(AbstractSparseFunction): def __init_finalize__(self, *args, **kwargs): super().__init_finalize__(*args, **kwargs) - # Process kwargs - coordinates = kwargs.get('coordinates', kwargs.get('coordinates_data')) - gridpoints = kwargs.get('gridpoints', kwargs.get('gridpoints_data')) - interpolation_coeffs = kwargs.get('interpolation_coeffs', - kwargs.get('interpolation_coeffs_data')) + if not any(k in kwargs for k in ('coordinates', 'gridpoints', + 'coordinates_data', 'gridpoints_data')): + raise ValueError("PrecomputedSparseFunction requires `coordinates`" + "or `gridpoints` arguments") + + # Subfunctions setup + self._dist_origin = {} + dtype = kwargs.pop('dtype', self.grid.dtype) + self._gridpoints = self.__subfunc_setup__('gridpoints', + ('gridpoints', 'gridpoints_data'), + inkwargs=True, + dtype=np.int32, **kwargs) + self._coordinates = self.__subfunc_setup__('coords', + ('coordinates', 'coordinates_data'), + inkwargs=self._gridpoints is not None, + dtype=dtype, **kwargs) + + if self._coordinates is not None: + self._dist_origin.update({self._coordinates: self.grid.origin_offset}) + if self._gridpoints is not None: + self._dist_origin.update({self._gridpoints: self.grid.origin_ioffset}) + + # Setup the interpolation coefficients. These are compulsory + ckeys = ('interpolation_coeffs', 'interpolation_coeffs_data') + self._interpolation_coeffs = \ + self.__subfunc_setup__('interp_coeffs', ckeys, dtype=dtype, **kwargs) + # Grid points per sparse point (2 in the case of bilinear and trilinear) r = kwargs.get('r') if not is_integer(r): @@ -1108,8 +1153,8 @@ def __init_finalize__(self, *args, **kwargs): if r <= 0: raise ValueError('`r` must be > 0') # Make sure radius matches the coefficients size - if interpolation_coeffs is not None: - nr = interpolation_coeffs.shape[-1] + if any(c in kwargs for c in ckeys) and self._interpolation_coeffs is not None: + nr = self._interpolation_coeffs.shape[-1] if nr // 2 != r: if nr == r: r = r // 2 @@ -1117,31 +1162,6 @@ def __init_finalize__(self, *args, **kwargs): raise ValueError("Interpolation coefficients shape %d do " "not match specified radius %d" % (r, nr)) self._radius = r - - if coordinates is not None and gridpoints is not None: - raise ValueError("Either `coordinates` or `gridpoints` must be " - "provided, but not both") - - # Specifying only `npoints` is acceptable; this will require the user - # to setup the coordinates data later on - npoint = kwargs.get('npoint', None) - if self.npoint and coordinates is None and gridpoints is None: - coordinates = np.zeros((npoint, self.grid.dim)) - - if coordinates is not None: - self._coordinates = self.__subfunc_setup__(coordinates, 'coords') - self._gridpoints = None - self._dist_origin = {self._coordinates: self.grid.origin_offset} - else: - assert gridpoints is not None - self._coordinates = None - self._gridpoints = self.__subfunc_setup__(gridpoints, 'gridpoints', - dtype=np.int32) - self._dist_origin = {self._gridpoints: self.grid.origin_ioffset} - - # Setup the interpolation coefficients. These are compulsory - self._interpolation_coeffs = \ - self.__subfunc_setup__(interpolation_coeffs, 'interp_coeffs') self._dist_origin.update({self._interpolation_coeffs: None}) self.interpolator = PrecomputedInterpolator(self) @@ -2135,7 +2155,7 @@ def manual_scatter(self, *, data_all_zero=False): **self._build_par_dim_to_nnz(scattered_gp, active_mrow), } - def _dist_scatter(self, data=None): + def _dist_scatter(self, alias=None, data=None): assert data is None if self.scatter_result is None: raise Exception("_dist_scatter called before manual_scatter called") diff --git a/examples/mpi/overview.ipynb b/examples/mpi/overview.ipynb index ff03eacaad..256a0e09ec 100644 --- a/examples/mpi/overview.ipynb +++ b/examples/mpi/overview.ipynb @@ -245,17 +245,7 @@ "output_type": "stream", "text": [ "[stderr:0] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n", - "[stderr:1] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n", - "[stderr:2] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n", - "[stderr:3] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n" + "Operator `Kernel` ran in 0.01 s\n" ] } ], @@ -447,10 +437,10 @@ " double section0;\n", "} ;\n", "\n", - "static void gather0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy);\n", - "static void scatter0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy);\n", "static void sendrecv0(struct dataobj *restrict u_vec, const int x_size, const int y_size, int ogtime, int ogx, int ogy, int ostime, int osx, int osy, int fromrank, int torank, MPI_Comm comm);\n", "static void haloupdate0(struct dataobj *restrict u_vec, MPI_Comm comm, struct neighborhood * nb, int otime);\n", + "static void gather0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy);\n", + "static void scatter0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy);\n", "\n", "int Kernel(struct dataobj *restrict u_vec, const float h_x, const int time_M, const int time_m, const int x_M, const int x_m, const int y_M, const int y_m, MPI_Comm comm, struct neighborhood * nb, struct profiler * timers)\n", "{\n", @@ -471,7 +461,7 @@ " #pragma omp simd aligned(u:64)\n", " for (int y = y_m; y <= y_M; y += 1)\n", " {\n", - " u[t1][x + 2][y + 2] = r0*(-u[t0][x + 2][y + 2]) + r0*u[t0][x + 3][y + 2] + 1;\n", + " u[t1][x + 2][y + 2] = -r0*u[t0][x + 2][y + 2] + r0*u[t0][x + 3][y + 2] + 1;\n", " }\n", " }\n", " STOP(section0,timers)\n", @@ -480,6 +470,38 @@ " return 0;\n", "}\n", "\n", + "static void sendrecv0(struct dataobj *restrict u_vec, const int x_size, const int y_size, int ogtime, int ogx, int ogy, int ostime, int osx, int osy, int fromrank, int torank, MPI_Comm comm)\n", + "{\n", + " MPI_Request rrecv;\n", + " MPI_Request rsend;\n", + "\n", + " float *restrict bufg_vec __attribute__ ((aligned (64)));\n", + " posix_memalign((void**)(&bufg_vec),64,x_size*y_size*sizeof(float));\n", + " float *restrict bufs_vec __attribute__ ((aligned (64)));\n", + " posix_memalign((void**)(&bufs_vec),64,x_size*y_size*sizeof(float));\n", + "\n", + " MPI_Irecv(bufs_vec,x_size*y_size,MPI_FLOAT,fromrank,13,comm,&(rrecv));\n", + " if (torank != MPI_PROC_NULL)\n", + " {\n", + " gather0(bufg_vec,x_size,y_size,u_vec,ogtime,ogx,ogy);\n", + " }\n", + " MPI_Isend(bufg_vec,x_size*y_size,MPI_FLOAT,torank,13,comm,&(rsend));\n", + " MPI_Wait(&(rsend),MPI_STATUS_IGNORE);\n", + " MPI_Wait(&(rrecv),MPI_STATUS_IGNORE);\n", + " if (fromrank != MPI_PROC_NULL)\n", + " {\n", + " scatter0(bufs_vec,x_size,y_size,u_vec,ostime,osx,osy);\n", + " }\n", + "\n", + " free(bufg_vec);\n", + " free(bufs_vec);\n", + "}\n", + "\n", + "static void haloupdate0(struct dataobj *restrict u_vec, MPI_Comm comm, struct neighborhood * nb, int otime)\n", + "{\n", + " sendrecv0(u_vec,u_vec->hsize[3],u_vec->npsize[2],otime,u_vec->oofs[2],u_vec->hofs[4],otime,u_vec->hofs[3],u_vec->hofs[4],nb->rc,nb->lc,comm);\n", + "}\n", + "\n", "static void gather0(float *restrict buf_vec, int bx_size, int by_size, struct dataobj *restrict u_vec, const int otime, const int ox, const int oy)\n", "{\n", " float (*restrict buf)[bx_size][by_size] __attribute__ ((aligned (64))) = (float (*)[bx_size][by_size]) buf_vec;\n", @@ -489,6 +511,7 @@ " const int y_m = 0;\n", " const int x_M = bx_size - 1;\n", " const int y_M = by_size - 1;\n", + "\n", " for (int x = x_m; x <= x_M; x += 1)\n", " {\n", " #pragma omp simd aligned(u:64)\n", @@ -518,38 +541,6 @@ " }\n", " }\n", "}\n", - "\n", - "static void sendrecv0(struct dataobj *restrict u_vec, const int x_size, const int y_size, int ogtime, int ogx, int ogy, int ostime, int osx, int osy, int fromrank, int torank, MPI_Comm comm)\n", - "{\n", - " float *restrict bufg_vec __attribute__ ((aligned (64)));\n", - " posix_memalign((void**)(&bufg_vec),64,x_size*y_size*sizeof(float));\n", - " float *restrict bufs_vec __attribute__ ((aligned (64)));\n", - " posix_memalign((void**)(&bufs_vec),64,x_size*y_size*sizeof(float));\n", - "\n", - " MPI_Request rrecv;\n", - " MPI_Request rsend;\n", - "\n", - " MPI_Irecv(bufs_vec,x_size*y_size,MPI_FLOAT,fromrank,13,comm,&(rrecv));\n", - " if (torank != MPI_PROC_NULL)\n", - " {\n", - " gather0(bufg_vec,x_size,y_size,u_vec,ogtime,ogx,ogy);\n", - " }\n", - " MPI_Isend(bufg_vec,x_size*y_size,MPI_FLOAT,torank,13,comm,&(rsend));\n", - " MPI_Wait(&(rsend),MPI_STATUS_IGNORE);\n", - " MPI_Wait(&(rrecv),MPI_STATUS_IGNORE);\n", - " if (fromrank != MPI_PROC_NULL)\n", - " {\n", - " scatter0(bufs_vec,x_size,y_size,u_vec,ostime,osx,osy);\n", - " }\n", - "\n", - " free(bufg_vec);\n", - " free(bufs_vec);\n", - "}\n", - "\n", - "static void haloupdate0(struct dataobj *restrict u_vec, MPI_Comm comm, struct neighborhood * nb, int otime)\n", - "{\n", - " sendrecv0(u_vec,u_vec->hsize[3],u_vec->npsize[2],otime,u_vec->oofs[2],u_vec->hofs[4],otime,u_vec->hofs[3],u_vec->hofs[4],nb->rc,nb->lc,comm);\n", - "}\n", "\n" ] }, @@ -789,17 +780,7 @@ "output_type": "stream", "text": [ "[stderr:0] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n", - "[stderr:1] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n", - "[stderr:2] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n", - "[stderr:3] \n", - "Operator `Kernel` ran in 0.01 s\n", - "INFO:Devito:Operator `Kernel` ran in 0.01 s\n" + "Operator `Kernel` ran in 0.01 s\n" ] } ], diff --git a/requirements-mpi.txt b/requirements-mpi.txt index 98557789c5..2c03d50a16 100644 --- a/requirements-mpi.txt +++ b/requirements-mpi.txt @@ -1,2 +1,2 @@ mpi4py<4.0.2 -ipyparallel<8.9 \ No newline at end of file +ipyparallel<9.1 \ No newline at end of file diff --git a/tests/test_dse.py b/tests/test_dse.py index 49cece3b34..33f93bf5e5 100644 --- a/tests/test_dse.py +++ b/tests/test_dse.py @@ -1915,6 +1915,70 @@ def test_tti_adjoint_akin_v2(self): # all redundancies have been detected correctly assert summary1[('section1', None)].ops == 75 + @switchconfig(profiling='advanced') + def test_tti_adjoint_akin_v3(self): + so = 8 + fd_order = 2 + + grid = Grid(shape=(20, 20, 20)) + x, y, z = grid.dimensions + + vx = TimeFunction(name="vx", grid=grid, space_order=so) + vy = TimeFunction(name="vy", grid=grid, space_order=so) + vz = TimeFunction(name="vz", grid=grid, space_order=so) + txy = TimeFunction(name="txy", grid=grid, space_order=so) + txz = TimeFunction(name="txz", grid=grid, space_order=so) + theta = Function(name='theta', grid=grid, space_order=so) + phi = Function(name='phi', grid=grid, space_order=so) + + r00 = cos(theta)*cos(phi) + r01 = cos(theta)*sin(phi) + r02 = -sin(theta) + r10 = -sin(phi) + r11 = cos(phi) + r12 = cos(theta) + r20 = sin(theta)*cos(phi) + r21 = sin(theta)*sin(phi) + r22 = cos(theta) + + def foo0(field): + return ((r00 * field).dx(x0=x+x.spacing/2) + + Derivative(r01 * field, x, deriv_order=0, fd_order=fd_order, + x0=x+x.spacing/2).dy(x0=y) + + Derivative(r02 * field, x, deriv_order=0, fd_order=fd_order, + x0=x+x.spacing/2).dz(x0=z)) + + def foo1(field): + return (Derivative(r10 * field, y, deriv_order=0, fd_order=fd_order, + x0=y+y.spacing/2).dx(x0=x) + + (r11 * field).dy(x0=y+y.spacing/2) + + Derivative(r12 * field, y, deriv_order=0, fd_order=fd_order, + x0=y+y.spacing/2).dz(x0=z)) + + def foo2(field): + return (Derivative(r20 * field, z, deriv_order=0, fd_order=fd_order, + x0=z+z.spacing/2).dx(x0=x) + + Derivative(r21 * field, z, deriv_order=0, fd_order=fd_order, + x0=z+z.spacing/2).dy(x0=y) + + (r22 * field).dz(x0=z+z.spacing/2)) + + eqns = [Eq(txz.forward, txz + foo0(vz.forward) + foo2(vx.forward)), + Eq(txy.forward, txy + foo0(vy.forward) + foo1(vx.forward))] + + op = Operator(eqns, subs=grid.spacing_map, + opt=('advanced', {'openmp': True, + 'cire-rotate': True})) + + # Check code generation + bns, _ = assert_blocking(op, {'x0_blk0'}) + xs, ys, zs = get_params(op, 'x0_blk0_size', 'y0_blk0_size', 'z_size') + arrays = [i for i in FindSymbols().visit(bns['x0_blk0']) if i.is_Array] + assert len(arrays) == 10 + assert len([i for i in arrays if i.shape == (zs,)]) == 2 + assert len([i for i in arrays if i.shape == (9, zs)]) == 2 + + assert op._profiler._sections['section1'].sops == 184 + @pytest.mark.parametrize('rotate', [False, True]) @switchconfig(profiling='advanced') def test_nested_first_derivatives(self, rotate): @@ -2153,7 +2217,7 @@ def test_multiple_rotating_dims(self): # to jit-compile `op1`. However, we also check numerical correctness op1.apply(time_m=0, time_M=nt-2, dt=dt, u=u1, vx=vx1, vy=vy1) - assert np.allclose(u.data, u1.data, rtol=1e-5) + assert np.allclose(u.data, u1.data, rtol=1e-3) def test_maxpar_option_v2(self): """ diff --git a/tests/test_rebuild.py b/tests/test_rebuild.py index 5498d819a7..79b2765a30 100644 --- a/tests/test_rebuild.py +++ b/tests/test_rebuild.py @@ -1,8 +1,8 @@ import numpy as np import pytest -from devito import Dimension, Function -from devito.types import StencilDimension +from devito import Dimension, Function, Grid +from devito.types import StencilDimension, SparseFunction, PrecomputedSparseFunction from devito.data.allocators import DataReference @@ -65,3 +65,19 @@ def test_stencil_dimension_borked(self): # TODO: Look into Symbol._cache_key and the way the key is generated assert sd0 is sd1 + + +class TestSparseFunction: + + @pytest.mark.parametrize('sfunc', [SparseFunction, PrecomputedSparseFunction]) + def test_none_subfunc(self, sfunc): + grid = Grid((4, 4)) + coords = np.zeros((5, 2)) + + s = sfunc(name='s', grid=grid, npoint=5, coordinates=coords, r=1) + + assert s.coordinates is not None + + # Explicity set coordinates to None + sr = s._rebuild(function=None, initializer=None, coordinates=None) + assert sr.coordinates is None