Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/subhasis/zfp cudastream #203

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions include/zfp.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
#include "zfp/internal/zfp/system.h"
#include "zfp/internal/zfp/types.h"

#ifdef ZFP_WITH_CUDA
#include <cuda_runtime.h>
#endif

/* macros ------------------------------------------------------------------ */

/* default compression parameters */
Expand Down Expand Up @@ -133,6 +137,11 @@ typedef struct {
size_t nx, ny, nz, nw; /* sizes (zero for unused dimensions) */
ptrdiff_t sx, sy, sz, sw; /* strides (zero for contiguous array a[nw][nz][ny][nx]) */
void* data; /* pointer to array data */

#ifdef ZFP_WITH_CUDA
cudaStream_t cuStream; /* Provision to execute in stream */
#endif

} zfp_field;

#ifdef __cplusplus
Expand Down Expand Up @@ -580,6 +589,12 @@ zfp_field_set_metadata(
uint64 meta /* compact 52-bit encoding of metadata */
);

/* Set CUDA stream in case of stream execution */

#ifdef ZFP_WITH_CUDA
void zfp_field_set_cuda_stream(zfp_field* field, cudaStream_t custream);
#endif

/* high-level API: compression and decompression --------------------------- */

/* compress entire field (nonzero return value upon success) */
Expand Down
77 changes: 48 additions & 29 deletions src/cuda_zfp/cuZFP.cu
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ bool is_contigous(const uint dims[3], const int3 &stride, long long int &offset)
// encode expects device pointers
//
template<typename T>
size_t encode(uint dims[3], int3 stride, int bits_per_block, T *d_data, Word *d_stream)
size_t encode(uint dims[3], int3 stride, int bits_per_block, T *d_data, Word *d_stream, cudaStream_t custream)
{

int d = 0;
Expand All @@ -118,15 +118,15 @@ size_t encode(uint dims[3], int3 stride, int bits_per_block, T *d_data, Word *d_
{
int dim = dims[0];
int sx = stride.x;
stream_size = cuZFP::encode1<T>(dim, sx, d_data, d_stream, bits_per_block);
stream_size = cuZFP::encode1<T>(dim, sx, d_data, d_stream, bits_per_block, custream);
}
else if(d == 2)
{
uint2 ndims = make_uint2(dims[0], dims[1]);
int2 s;
s.x = stride.x;
s.y = stride.y;
stream_size = cuZFP::encode2<T>(ndims, s, d_data, d_stream, bits_per_block);
stream_size = cuZFP::encode2<T>(ndims, s, d_data, d_stream, bits_per_block, custream);
}
else if(d == 3)
{
Expand All @@ -135,7 +135,7 @@ size_t encode(uint dims[3], int3 stride, int bits_per_block, T *d_data, Word *d_
s.y = stride.y;
s.z = stride.z;
uint3 ndims = make_uint3(dims[0], dims[1], dims[2]);
stream_size = cuZFP::encode<T>(ndims, s, d_data, d_stream, bits_per_block);
stream_size = cuZFP::encode3<T>(ndims, s, d_data, d_stream, bits_per_block, custream);
}

errors.chk("Encode");
Expand All @@ -144,9 +144,11 @@ size_t encode(uint dims[3], int3 stride, int bits_per_block, T *d_data, Word *d_
}

template<typename T>
size_t decode(uint ndims[3], int3 stride, int bits_per_block, Word *stream, T *out)
size_t decode(uint ndims[3], int3 stride, int bits_per_block, Word *stream, T *out, cudaStream_t custream)
{

/* Include CUDA stream in decode call */

int d = 0;
size_t out_size = 1;
size_t stream_bytes = 0;
Expand All @@ -168,14 +170,14 @@ size_t decode(uint ndims[3], int3 stride, int bits_per_block, Word *stream, T *o
s.y = stride.y;
s.z = stride.z;

stream_bytes = cuZFP::decode3<T>(dims, s, stream, out, bits_per_block);
stream_bytes = cuZFP::decode3<T>(dims, s, stream, out, bits_per_block, custream);
}
else if(d == 1)
{
uint dim = ndims[0];
int sx = stride.x;

stream_bytes = cuZFP::decode1<T>(dim, sx, stream, out, bits_per_block);
stream_bytes = cuZFP::decode1<T>(dim, sx, stream, out, bits_per_block, custream);

}
else if(d == 2)
Expand All @@ -188,7 +190,7 @@ size_t decode(uint ndims[3], int3 stride, int bits_per_block, Word *stream, T *o
s.x = stride.x;
s.y = stride.y;

stream_bytes = cuZFP::decode2<T>(dims, s, stream, out, bits_per_block);
stream_bytes = cuZFP::decode2<T>(dims, s, stream, out, bits_per_block, custream);
}
else std::cerr<<" d == "<<d<<" not implemented\n";

Expand All @@ -207,7 +209,7 @@ Word *setup_device_stream_compress(zfp_stream *stream,const zfp_field *field)

Word *d_stream = NULL;
size_t max_size = zfp_stream_maximum_size(stream, field);
cudaMalloc(&d_stream, max_size);
cudaMallocAsync(&d_stream, max_size, field->cuStream);
return d_stream;
}

Expand All @@ -224,8 +226,11 @@ Word *setup_device_stream_decompress(zfp_stream *stream,const zfp_field *field)
Word *d_stream = NULL;
//TODO: change maximum_size to compressed stream size
size_t size = zfp_stream_maximum_size(stream, field);
cudaMalloc(&d_stream, size);
cudaMemcpy(d_stream, stream->stream->begin, size, cudaMemcpyHostToDevice);

/* Allocate memory per CUDA stream */

cudaMallocAsync(&d_stream, size, field->cuStream);
cudaMemcpyAsync(d_stream, stream->stream->begin, size, cudaMemcpyHostToDevice, field->cuStream);
return d_stream;
}

Expand Down Expand Up @@ -289,9 +294,11 @@ void *setup_device_field_compress(const zfp_field *field, const int3 &stride, lo
if(contig)
{
size_t field_bytes = type_size * field_size;
cudaMalloc(&d_data, field_bytes);
/* allocate memory async per stream */

cudaMemcpy(d_data, host_ptr, field_bytes, cudaMemcpyHostToDevice);
cudaMallocAsync(&d_data, field_bytes, field->cuStream);

cudaMemcpyAsync(d_data, host_ptr, field_bytes, cudaMemcpyHostToDevice, field->cuStream);
}
return offset_void(field->type, d_data, -offset);
}
Expand Down Expand Up @@ -328,12 +335,16 @@ void *setup_device_field_decompress(const zfp_field *field, const int3 &stride,
if(contig)
{
size_t field_bytes = type_size * field_size;
cudaMalloc(&d_data, field_bytes);
/* Allocate GPU memory per CUDA stream */

cudaMallocAsync(&d_data, field_bytes, field->cuStream);
}
return offset_void(field->type, d_data, -offset);
}

void cleanup_device_ptr(void *orig_ptr, void *d_ptr, size_t bytes, long long int offset, zfp_type type)
/* CUDA stream is assigned in the device cleanup */

void cleanup_device_ptr(void *orig_ptr, void *d_ptr, size_t bytes, long long int offset, zfp_type type, cudaStream_t custream)
{
bool device = cuZFP::is_gpu_ptr(orig_ptr);
if(device)
Expand All @@ -346,10 +357,10 @@ void cleanup_device_ptr(void *orig_ptr, void *d_ptr, size_t bytes, long long int

if(bytes > 0)
{
cudaMemcpy(h_offset_ptr, d_offset_ptr, bytes, cudaMemcpyDeviceToHost);
cudaMemcpyAsync(h_offset_ptr, d_offset_ptr, bytes, cudaMemcpyDeviceToHost, custream);
}

cudaFree(d_offset_ptr);
cudaFreeAsync(d_offset_ptr, custream);
}

} // namespace internal
Expand All @@ -366,6 +377,10 @@ cuda_compress(zfp_stream *stream, const zfp_field *field)
stride.x = field->sx ? field->sx : 1;
stride.y = field->sy ? field->sy : field->nx;
stride.z = field->sz ? field->sz : field->nx * field->ny;

/* CUDA stream implementation */

cudaStream_t cudastream = field->cuStream;

size_t stream_bytes = 0;
long long int offset = 0;
Expand All @@ -382,26 +397,26 @@ cuda_compress(zfp_stream *stream, const zfp_field *field)
if(field->type == zfp_type_float)
{
float* data = (float*) d_data;
stream_bytes = internal::encode<float>(dims, stride, (int)stream->maxbits, data, d_stream);
stream_bytes = internal::encode<float>(dims, stride, (int)stream->maxbits, data, d_stream, cudastream);
}
else if(field->type == zfp_type_double)
{
double* data = (double*) d_data;
stream_bytes = internal::encode<double>(dims, stride, (int)stream->maxbits, data, d_stream);
stream_bytes = internal::encode<double>(dims, stride, (int)stream->maxbits, data, d_stream, cudastream);
}
else if(field->type == zfp_type_int32)
{
int * data = (int*) d_data;
stream_bytes = internal::encode<int>(dims, stride, (int)stream->maxbits, data, d_stream);
stream_bytes = internal::encode<int>(dims, stride, (int)stream->maxbits, data, d_stream, cudastream);
}
else if(field->type == zfp_type_int64)
{
long long int * data = (long long int*) d_data;
stream_bytes = internal::encode<long long int>(dims, stride, (int)stream->maxbits, data, d_stream);
stream_bytes = internal::encode<long long int>(dims, stride, (int)stream->maxbits, data, d_stream, cudastream);
}

internal::cleanup_device_ptr(stream->stream->begin, d_stream, stream_bytes, 0, field->type);
internal::cleanup_device_ptr(field->data, d_data, 0, offset, field->type);
internal::cleanup_device_ptr(stream->stream->begin, d_stream, stream_bytes, 0, field->type, cudastream);
internal::cleanup_device_ptr(field->data, d_data, 0, offset, field->type, cudastream);

// zfp wants to flush the stream.
// set bits to wsize because we already did that.
Expand Down Expand Up @@ -436,30 +451,34 @@ cuda_decompress(zfp_stream *stream, zfp_field *field)
return;
}

/* Include CUDA Stream */

cudaStream_t cudastream = field->cuStream;

Word *d_stream = internal::setup_device_stream_decompress(stream, field);

if(field->type == zfp_type_float)
{
float *data = (float*) d_data;
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data);
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data, cudastream);
d_data = (void*) data;
}
else if(field->type == zfp_type_double)
{
double *data = (double*) d_data;
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data);
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data, cudastream);
d_data = (void*) data;
}
else if(field->type == zfp_type_int32)
{
int *data = (int*) d_data;
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data);
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data, cudastream);
d_data = (void*) data;
}
else if(field->type == zfp_type_int64)
{
long long int *data = (long long int*) d_data;
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data);
decoded_bytes = internal::decode(dims, stride, (int)stream->maxbits, d_stream, data, cudastream);
d_data = (void*) data;
}
else
Expand All @@ -480,8 +499,8 @@ cuda_decompress(zfp_stream *stream, zfp_field *field)
}

size_t bytes = type_size * field_size;
internal::cleanup_device_ptr(stream->stream->begin, d_stream, 0, 0, field->type);
internal::cleanup_device_ptr(field->data, d_data, bytes, offset, field->type);
internal::cleanup_device_ptr(stream->stream->begin, d_stream, 0, 0, field->type, cudastream);
internal::cleanup_device_ptr(field->data, d_data, bytes, offset, field->type, cudastream);

// this is how zfp determines if this was a success
size_t words_read = decoded_bytes / sizeof(Word);
Expand Down
16 changes: 9 additions & 7 deletions src/cuda_zfp/decode1.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ size_t decode1launch(uint dim,
int stride,
Word *stream,
Scalar *d_data,
uint maxbits)
uint maxbits,
cudaStream_t custream)
{
const int cuda_block_size = 128;

Expand Down Expand Up @@ -110,10 +111,10 @@ size_t decode1launch(uint dim,
cudaEventCreate(&start);
cudaEventCreate(&stop);

cudaEventRecord(start);
cudaEventRecord(start, custream);
#endif

cudaDecode1<Scalar> << < grid_size, block_size >> >
cudaDecode1<Scalar> << < grid_size, block_size, 0, custream >> >
(stream,
d_data,
dim,
Expand All @@ -123,9 +124,9 @@ size_t decode1launch(uint dim,
maxbits);

#ifdef CUDA_ZFP_RATE_PRINT
cudaEventRecord(stop);
cudaEventRecord(stop, custream);
cudaEventSynchronize(stop);
cudaStreamSynchronize(0);
cudaStreamSynchronize(custream);

float milliseconds = 0;
cudaEventElapsedTime(&milliseconds, start, stop);
Expand All @@ -145,9 +146,10 @@ size_t decode1(int dim,
int stride,
Word *stream,
Scalar *d_data,
uint maxbits)
uint maxbits,
cudaStream_t custream)
{
return decode1launch<Scalar>(dim, stride, stream, d_data, maxbits);
return decode1launch<Scalar>(dim, stride, stream, d_data, maxbits, custream);
}

} // namespace cuZFP
Expand Down
16 changes: 9 additions & 7 deletions src/cuda_zfp/decode2.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ size_t decode2launch(uint2 dims,
int2 stride,
Word *stream,
Scalar *d_data,
uint maxbits)
uint maxbits,
cudaStream_t custream)
{
const int cuda_block_size = 128;
dim3 block_size;
Expand Down Expand Up @@ -134,10 +135,10 @@ size_t decode2launch(uint2 dims,
cudaEvent_t start, stop;
cudaEventCreate(&start);
cudaEventCreate(&stop);
cudaEventRecord(start);
cudaEventRecord(start, custream);
#endif

cudaDecode2<Scalar, 16> << < grid_size, block_size >> >
cudaDecode2<Scalar, 16> << < grid_size, block_size, 0, custream >> >
(stream,
d_data,
dims,
Expand All @@ -146,9 +147,9 @@ size_t decode2launch(uint2 dims,
maxbits);

#ifdef CUDA_ZFP_RATE_PRINT
cudaEventRecord(stop);
cudaEventRecord(stop, custream);
cudaEventSynchronize(stop);
cudaStreamSynchronize(0);
cudaStreamSynchronize(custream);

float milliseconds = 0;
cudaEventElapsedTime(&milliseconds, start, stop);
Expand All @@ -168,9 +169,10 @@ size_t decode2(uint2 dims,
int2 stride,
Word *stream,
Scalar *d_data,
uint maxbits)
uint maxbits,
cudaStream_t custream)
{
return decode2launch<Scalar>(dims, stride, stream, d_data, maxbits);
return decode2launch<Scalar>(dims, stride, stream, d_data, maxbits, custream);
}

} // namespace cuZFP
Expand Down
Loading