diff --git a/include/sqsh_file.h b/include/sqsh_file.h index 91375987..d21554cb 100644 --- a/include/sqsh_file.h +++ b/include/sqsh_file.h @@ -161,6 +161,17 @@ sqsh_file_iterator_new(const struct SqshFile *file, int *err); SQSH_NO_UNUSED bool sqsh_file_iterator_next( struct SqshFileIterator *iterator, size_t desired_size, int *err); +/** + * @brief Checks if the current block is a zero block. + * @memberof SqshFileIterator + * + * @param[in] iterator The file iterator to check. + * + * @return true if the current block is a zero block, false otherwise. + */ +SQSH_NO_UNUSED bool +sqsh_file_iterator_is_zero_block(const struct SqshFileIterator *iterator); + /** * @deprecated Since 1.5.0. Use sqsh_file_iterator_skip2() instead. * @memberof SqshFileIterator diff --git a/include/sqsh_posix.h b/include/sqsh_posix.h index 5639cfc3..03196aca 100644 --- a/include/sqsh_posix.h +++ b/include/sqsh_posix.h @@ -35,6 +35,7 @@ #define SQSH_POSIX_H #include "sqsh_common.h" +#include #include #ifdef __cplusplus @@ -42,6 +43,15 @@ extern "C" { #endif struct SqshFile; +struct SqshFileIterator; + +struct SqshThreadpool; + +typedef void (*sqsh_file_iterator_mt_cb)( + const struct SqshFile *file, const struct SqshFileIterator *iterator, + uint64_t offset, void *data, int err); +typedef void (*sqsh_file_to_stream_mt_cb)( + const struct SqshFile *file, FILE *stream, void *data, int err); /** * @memberof SqshFile @@ -54,6 +64,42 @@ struct SqshFile; */ int sqsh_file_to_stream(const struct SqshFile *file, FILE *stream); +/** + * @memberof SqshFile + * @brief writes data to a file descriptor. + * + * @param[in] file The file context. + * @param[in] threadpool The threadpool to use. + * @param[in] stream The descriptor to write the file contents to. + * @param[in] cb The callback to call when the operation is done. + * @param[in] data The data to pass to the callback. + */ +SQSH_NO_UNUSED int sqsh_file_to_stream_mt( + const struct SqshFile *file, struct SqshThreadpool *threadpool, + FILE *stream, sqsh_file_to_stream_mt_cb cb, void *data); + +struct SqshThreadpool *sqsh_threadpool_new(size_t threads, int *err); + +int sqsh_threadpool_wait(struct SqshThreadpool *pool); + +/** + * @memberof SqshThreadpool + * @brief cleans up a threadpool. + * + * @param[in] pool The threadpool to uclean. + * @return The threadpool on success, NULL on error. + */ +int sqsh__threadpool_cleanup(struct SqshThreadpool *pool); + +/** + * @memberof SqshThreadpool + * @brief creates a new threadpool. + * + * @param[in] pool The threadpool to free. + * @return 0 on success, less than 0 on error. + */ +int sqsh_threadpool_free(struct SqshThreadpool *pool); + #ifdef __cplusplus } #endif diff --git a/libsqsh/include/sqsh_extract_private.h b/libsqsh/include/sqsh_extract_private.h index 9786a95e..a0ddc356 100644 --- a/libsqsh/include/sqsh_extract_private.h +++ b/libsqsh/include/sqsh_extract_private.h @@ -176,7 +176,7 @@ struct SqshExtractManager { /** * @privatesection */ - struct CxRcHashMap hash_map; + struct CxRcRadixTree cache; const struct SqshExtractorImpl *extractor_impl; uint32_t block_size; struct SqshMapManager *map_manager; @@ -192,14 +192,13 @@ struct SqshExtractManager { * @param[in] manager The manager to initialize. * @param[in] archive The archive to use. * @param[in] block_size The block size to use. - * @param[in] size The size of the manager. * @param[in] lru_size The size of the lru cache. * * @return 0 on success, a negative value on error. */ SQSH_NO_EXPORT SQSH_NO_UNUSED int sqsh__extract_manager_init( struct SqshExtractManager *manager, struct SqshArchive *archive, - uint32_t block_size, size_t size, size_t lru_size); + uint32_t block_size, size_t lru_size); /** * @internal @@ -222,12 +221,12 @@ SQSH_NO_EXPORT int sqsh__extract_manager_uncompress( * @brief releases a buffer retrieved by sqsh__extract_manager_uncompress. * * @param[in] manager The manager to use. - * @param[in] buffer The buffer to release. + * @param[in] address The address of the buffer to release. * * @return 0 on success, a negative value on error. */ SQSH_NO_EXPORT int sqsh__extract_manager_release( - struct SqshExtractManager *manager, const struct CxBuffer *buffer); + struct SqshExtractManager *manager, uint64_t address); /** * @internal @@ -254,6 +253,7 @@ struct SqshExtractView { */ struct SqshExtractManager *manager; const struct CxBuffer *buffer; + uint64_t address; size_t size; }; diff --git a/libsqsh/include/sqsh_posix_private.h b/libsqsh/include/sqsh_posix_private.h new file mode 100644 index 00000000..370e2743 --- /dev/null +++ b/libsqsh/include/sqsh_posix_private.h @@ -0,0 +1,56 @@ +/****************************************************************************** + * * + * Copyright (c) 2023-2024, Enno Boland * + * * + * Redistribution and use in source and binary forms, with or without * + * modification, are permitted provided that the following conditions are * + * met: * + * * + * * Redistributions of source code must retain the above copyright notice, * + * this list of conditions and the following disclaimer. * + * * Redistributions in binary form must reproduce the above copyright * + * notice, this list of conditions and the following disclaimer in the * + * documentation and/or other materials provided with the distribution. * + * * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS * + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR * + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * + * * + ******************************************************************************/ + +/** + * @author Enno Boland (mail@eboland.de) + * @file sqsh_file_private.h + */ + +#ifndef SQSH_POSIX_PRIVATE_H +#define SQSH_POSIX_PRIVATE_H + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct SqshThreadpool { + struct CxThreadpool pool; +}; + +int sqsh__threadpool_init(struct SqshThreadpool *pool, size_t threads); + +int sqsh__threadpool_cleanup(struct SqshThreadpool *pool); + +#ifdef __cplusplus +} +#endif +#endif /* SQSH_POSIX_PRIVATE_H */ diff --git a/libsqsh/src/archive/archive.c b/libsqsh/src/archive/archive.c index a9e84617..0eb979d3 100644 --- a/libsqsh/src/archive/archive.c +++ b/libsqsh/src/archive/archive.c @@ -60,23 +60,6 @@ is_initialized(const struct SqshArchive *archive, enum InitializedBitmap mask) { return archive->initialized & mask; } -static uint64_t -get_data_segment_size(const struct SqshSuperblock *superblock) { - const uint64_t inode_table_start = - sqsh_superblock_inode_table_start(superblock); - uint64_t res; - /* BUG: This function does not return exact results. It may report values - * that are too large, as it does not take into account the size of the - * compression options. This is not a problem for the current implementation - * as this size is only used for finding upper limits for the extract - * manager. */ - if (SQSH_SUB_OVERFLOW( - inode_table_start, sizeof(struct SqshDataSuperblock), &res)) { - return inode_table_start; - } - return res; -} - struct SqshArchive * sqsh_archive_open( const void *source, const struct SqshConfig *config, int *err) { @@ -153,24 +136,9 @@ sqsh__archive_init( goto out; } - uint64_t range; - if (SQSH_SUB_OVERFLOW( - sqsh_superblock_bytes_used(&archive->superblock), - get_data_segment_size(&archive->superblock), &range)) { - rv = -SQSH_ERROR_INTEGER_OVERFLOW; - goto out; - } - const uint64_t metablock_capacity = SQSH_DIVIDE_CEIL( - range, - sizeof(struct SqshDataMetablock) + SQSH_METABLOCK_BLOCK_SIZE); - if (metablock_capacity > SIZE_MAX) { - rv = -SQSH_ERROR_INTEGER_OVERFLOW; - goto out; - } rv = sqsh__extract_manager_init( &archive->metablock_extract_manager, archive, - SQSH_METABLOCK_BLOCK_SIZE, (size_t)metablock_capacity, - metablock_lru_size); + SQSH_METABLOCK_BLOCK_SIZE, metablock_lru_size); if (rv < 0) { goto out; } @@ -214,19 +182,12 @@ sqsh__archive_data_extract_manager( if (!is_initialized(archive, INITIALIZED_DATA_COMPRESSION_MANAGER)) { const struct SqshSuperblock *superblock = sqsh_archive_superblock(archive); - const uint64_t range = get_data_segment_size(superblock); - const uint64_t capacity = - SQSH_DIVIDE_CEIL(range, sqsh_superblock_block_size(superblock)); const uint32_t datablock_blocksize = sqsh_superblock_block_size(superblock); - if (capacity > SIZE_MAX) { - rv = -SQSH_ERROR_INTEGER_OVERFLOW; - goto out; - } rv = sqsh__extract_manager_init( &archive->data_extract_manager, archive, datablock_blocksize, - (size_t)capacity, data_lru_size); + data_lru_size); if (rv < 0) { goto out; } diff --git a/libsqsh/src/archive/inode_map.c b/libsqsh/src/archive/inode_map.c index ede112cb..1a04c6a9 100644 --- a/libsqsh/src/archive/inode_map.c +++ b/libsqsh/src/archive/inode_map.c @@ -122,6 +122,7 @@ static uint64_t dyn_map_get(const struct SqshInodeMap *map, uint32_t inode_number, int *err) { int rv = 0; uint64_t inode_ref = 0; + bool locked = false; if (inode_number == 0 || inode_number - 1 >= map->inode_count) { rv = -SQSH_ERROR_OUT_OF_BOUNDS; @@ -131,6 +132,7 @@ dyn_map_get(const struct SqshInodeMap *map, uint32_t inode_number, int *err) { if (rv < 0) { goto out; } + locked = true; sqsh_index_t index = inode_number - 1; sqsh_index_t inner_index = index & 0xff; @@ -148,12 +150,10 @@ dyn_map_get(const struct SqshInodeMap *map, uint32_t inode_number, int *err) { inode_ref = 0; goto out; } - rv = sqsh__mutex_unlock(map->mutex); - if (rv < 0) { - goto out; - } - out: + if (locked) { + sqsh__mutex_unlock(map->mutex); + } if (err != NULL) { *err = rv; } diff --git a/libsqsh/src/extract/extract_manager.c b/libsqsh/src/extract/extract_manager.c index 7a3363a8..3d778b16 100644 --- a/libsqsh/src/extract/extract_manager.c +++ b/libsqsh/src/extract/extract_manager.c @@ -41,39 +41,6 @@ #include #include -/** - * Calculates pow(x,y) % mod - */ -static uint64_t -mod_power(uint64_t x, uint64_t y, uint64_t mod) { - uint64_t res = 1; - - for (; y; y = y >> 1) { - if (y & 1) { - res = (res * x) % mod; - } - - x = (x * x) % mod; - } - - return res; -} - -static bool -maybe_prime(uint64_t n) { - static const uint64_t a = 2; - - return mod_power(a, n - 1, n) == 1; -} - -static size_t -find_next_maybe_prime(size_t n) { - for (; maybe_prime(n) == false; n++) { - } - - return n; -} - static void buffer_cleanup(void *buffer) { cx_buffer_cleanup(buffer); @@ -82,7 +49,7 @@ buffer_cleanup(void *buffer) { SQSH_NO_UNUSED int sqsh__extract_manager_init( struct SqshExtractManager *manager, struct SqshArchive *archive, - uint32_t block_size, size_t size, size_t lru_size) { + uint32_t block_size, size_t lru_size) { int rv; const struct SqshSuperblock *superblock = sqsh_archive_superblock(archive); enum SqshSuperblockCompressionId compression_id = @@ -93,24 +60,17 @@ sqsh__extract_manager_init( return -SQSH_ERROR_COMPRESSION_UNSUPPORTED; } - if (size == 0) { - return -SQSH_ERROR_SIZE_MISMATCH; - } - - /* Give a bit of room to avoid too many key hash collisions */ - size = find_next_maybe_prime(2 * size); - rv = sqsh__mutex_init(&manager->lock); if (rv < 0) { goto out; } - rv = cx_rc_hash_map_init( - &manager->hash_map, size, sizeof(struct CxBuffer), buffer_cleanup); + rv = cx_rc_radix_tree_init( + &manager->cache, sizeof(struct CxBuffer), buffer_cleanup); if (rv < 0) { goto out; } rv = cx_lru_init( - &manager->lru, lru_size, &cx_lru_rc_hash_map, &manager->hash_map); + &manager->lru, lru_size, &cx_lru_rc_radix_tree, &manager->cache); if (rv < 0) { goto out; } @@ -125,70 +85,102 @@ sqsh__extract_manager_init( return rv; } +static int +extract(struct SqshExtractManager *manager, const struct SqshMapReader *reader, + struct CxBuffer *buffer) { + int rv = 0; + struct SqshExtractor extractor = {0}; + const struct SqshExtractorImpl *extractor_impl = manager->extractor_impl; + const uint32_t block_size = manager->block_size; + const size_t size = sqsh__map_reader_size(reader); + + rv = cx_buffer_init(buffer); + if (rv < 0) { + goto out; + } + const uint8_t *data = sqsh__map_reader_data(reader); + + rv = sqsh__extractor_init(&extractor, buffer, extractor_impl, block_size); + if (rv < 0) { + goto out; + } + + rv = sqsh__extractor_write(&extractor, data, size); + if (rv < 0) { + goto out; + } + + rv = sqsh__extractor_finish(&extractor); + if (rv < 0) { + goto out; + } + +out: + if (rv < 0) { + cx_buffer_cleanup(buffer); + } + sqsh__extractor_cleanup(&extractor); + return rv; +} + int sqsh__extract_manager_uncompress( struct SqshExtractManager *manager, const struct SqshMapReader *reader, const struct CxBuffer **target) { int rv = 0; - struct SqshExtractor extractor = {0}; - const struct SqshExtractorImpl *extractor_impl = manager->extractor_impl; - const uint32_t block_size = manager->block_size; + bool locked = false; + struct CxBuffer *buffer = NULL; rv = sqsh__mutex_lock(&manager->lock); if (rv < 0) { goto out; } + locked = true; const uint64_t address = sqsh__map_reader_address(reader); - const size_t size = sqsh__map_reader_size(reader); - *target = cx_rc_hash_map_retain(&manager->hash_map, address); + buffer = cx_rc_radix_tree_retain(&manager->cache, address); - if (*target == NULL) { - struct CxBuffer buffer = {0}; - rv = cx_buffer_init(&buffer); + if (buffer == NULL) { + struct CxBuffer tmp_buffer = {0}; + rv = sqsh__mutex_unlock(&manager->lock); if (rv < 0) { goto out; } - const uint8_t *data = sqsh__map_reader_data(reader); + locked = false; - rv = sqsh__extractor_init( - &extractor, &buffer, extractor_impl, block_size); + rv = extract(manager, reader, &tmp_buffer); if (rv < 0) { goto out; } - rv = sqsh__extractor_write(&extractor, data, size); + rv = sqsh__mutex_lock(&manager->lock); if (rv < 0) { - cx_buffer_cleanup(&buffer); goto out; } + locked = true; - rv = sqsh__extractor_finish(&extractor); - if (rv < 0) { - cx_buffer_cleanup(&buffer); - goto out; - } - - *target = cx_rc_hash_map_put(&manager->hash_map, address, &buffer); + buffer = cx_rc_radix_tree_put(&manager->cache, address, &tmp_buffer); } - rv = cx_lru_touch(&manager->lru, address); + rv = cx_lru_touch_value(&manager->lru, address, buffer); + *target = buffer; out: - sqsh__extractor_cleanup(&extractor); - sqsh__mutex_unlock(&manager->lock); + if (locked) { + sqsh__mutex_unlock(&manager->lock); + } return rv; } int sqsh__extract_manager_release( - struct SqshExtractManager *manager, const struct CxBuffer *buffer) { + struct SqshExtractManager *manager, uint64_t address) { int rv = sqsh__mutex_lock(&manager->lock); if (rv < 0) { goto out; } - rv = cx_rc_hash_map_release(&manager->hash_map, buffer); + rv = cx_rc_radix_tree_release(&manager->cache, address); sqsh__mutex_unlock(&manager->lock); out: @@ -198,7 +190,7 @@ sqsh__extract_manager_release( int sqsh__extract_manager_cleanup(struct SqshExtractManager *manager) { cx_lru_cleanup(&manager->lru); - cx_rc_hash_map_cleanup(&manager->hash_map); + cx_rc_radix_tree_cleanup(&manager->cache); sqsh__mutex_destroy(&manager->lock); return 0; diff --git a/libsqsh/src/extract/extract_view.c b/libsqsh/src/extract/extract_view.c index ddfb2fab..833a8834 100644 --- a/libsqsh/src/extract/extract_view.c +++ b/libsqsh/src/extract/extract_view.c @@ -31,10 +31,10 @@ * @file extract_view.c */ -#include - #include #include +#include +#include int sqsh__extract_view_init( @@ -43,6 +43,7 @@ sqsh__extract_view_init( int rv = 0; view->manager = manager; view->buffer = NULL; + view->address = sqsh__map_reader_address(reader); rv = sqsh__extract_manager_uncompress(manager, reader, &view->buffer); if (rv < 0) { @@ -73,7 +74,7 @@ sqsh__extract_view_cleanup(struct SqshExtractView *view) { int rv = 0; if (view->manager != NULL) { - rv = sqsh__extract_manager_release(view->manager, view->buffer); + rv = sqsh__extract_manager_release(view->manager, view->address); } view->buffer = NULL; view->size = 0; diff --git a/libsqsh/src/file/file_iterator.c b/libsqsh/src/file/file_iterator.c index 9dd96e28..86b41d5e 100644 --- a/libsqsh/src/file/file_iterator.c +++ b/libsqsh/src/file/file_iterator.c @@ -260,6 +260,11 @@ map_fragment(struct SqshFileIterator *iterator) { } } +bool +sqsh_file_iterator_is_zero_block(const struct SqshFileIterator *iterator) { + return iterator->sparse_size > 0; +} + bool sqsh_file_iterator_next( struct SqshFileIterator *iterator, size_t desired_size, int *err) { @@ -279,7 +284,7 @@ sqsh_file_iterator_next( desired_size = 1; } - if (iterator->sparse_size > 0) { + if (sqsh_file_iterator_is_zero_block(iterator)) { rv = map_zero_block(iterator); } else if (iterator->block_index < block_count) { rv = map_block(iterator, desired_size); diff --git a/libsqsh/src/mapper/curl_mapper.c b/libsqsh/src/mapper/curl_mapper.c index d2c48494..0954791f 100644 --- a/libsqsh/src/mapper/curl_mapper.c +++ b/libsqsh/src/mapper/curl_mapper.c @@ -93,8 +93,6 @@ write_data(void *ptr, size_t size, size_t nmemb, void *userdata) { static int get_total_size(CURL *handle, uint64_t *total) { - static const char *format = CONTENT_RANGE_FORMAT; - CURLHcode hcode; uint64_t dummy; struct curl_header *header = NULL; @@ -106,7 +104,8 @@ get_total_size(CURL *handle, uint64_t *total) { return -SQSH_ERROR_CURL_INVALID_RANGE_HEADER; } - scanned_fields = sscanf(header->value, format, &dummy, &dummy, total); + scanned_fields = + sscanf(header->value, CONTENT_RANGE_FORMAT, &dummy, &dummy, total); if (scanned_fields != 3) { return -SQSH_ERROR_CURL_INVALID_RANGE_HEADER; diff --git a/libsqsh/src/meson.build b/libsqsh/src/meson.build index 5f45f433..1f76fbf7 100644 --- a/libsqsh/src/meson.build +++ b/libsqsh/src/meson.build @@ -53,6 +53,7 @@ if get_option('posix').allowed() libsqsh_sources += files( 'posix/file_ext.c', 'posix/mmap_mapper.c', + 'posix/threadpool.c', ) endif diff --git a/libsqsh/src/posix/file_ext.c b/libsqsh/src/posix/file_ext.c index e563b7d2..84cf70ee 100644 --- a/libsqsh/src/posix/file_ext.c +++ b/libsqsh/src/posix/file_ext.c @@ -32,12 +32,19 @@ */ #define _DEFAULT_SOURCE +#define _FILE_OFFSET_BITS 64 -#include -#include - +#include #include +#include #include +#include + +#include +#include +#include +#include +#include int sqsh_file_to_stream(const struct SqshFile *file, FILE *stream) { @@ -63,3 +70,200 @@ sqsh_file_to_stream(const struct SqshFile *file, FILE *stream) { sqsh__file_iterator_cleanup(&iterator); return rv; } + +struct FileIteratorMtBlock { + struct FileIteratorMt *mt; + uint64_t block_offset; +}; + +struct FileIteratorMt { + struct SqshFile file; + sqsh_file_iterator_mt_cb cb; + uint32_t chunk_size; + void *data; + atomic_int rv; + atomic_size_t remaining_blocks; + + struct FileIteratorMtBlock *blocks; +}; + +struct FileToStreamMt { + struct FileIteratorMt mt; + sqsh_file_to_stream_mt_cb cb; + void *data; + FILE *stream; + int fd; +}; + +static void +file_iterator_mt_cleanup(struct FileIteratorMt *mt, int rv) { + mt->cb(&mt->file, NULL, 0, mt->data, rv); + sqsh__file_cleanup(&mt->file); + free(mt->blocks); + free(mt); +} + +static void +iterator_worker(void *data) { + int rv = 0, rv2 = 0; + struct SqshFileIterator iterator = {0}; + + struct FileIteratorMtBlock *block = data; + struct FileIteratorMt *mt = block->mt; + + rv = sqsh__file_iterator_init(&iterator, &mt->file); + if (rv < 0) { + goto out; + } + + uint64_t offset = block->block_offset; + rv = sqsh_file_iterator_skip2(&iterator, &offset, 1); + if (rv < 0) { + goto out; + } + assert(offset == 0); + + mt->cb(&mt->file, &iterator, block->block_offset, mt->data, rv); + +out: + sqsh__file_iterator_cleanup(&iterator); + + assert(rv2 == 0); + + if (rv < 0) { + atomic_store(&mt->rv, rv); + } + + size_t remaining_blocks = atomic_fetch_sub(&mt->remaining_blocks, 1); + assert(remaining_blocks > 0); + if (remaining_blocks == 1) { + file_iterator_mt_cleanup(mt, atomic_load(&mt->rv)); + } +} + +static void +file_iterator_mt( + struct FileIteratorMt *mt, const struct SqshFile *file, + struct SqshThreadpool *threadpool, sqsh_file_iterator_mt_cb cb, + void *data, int rv) { + if (rv < 0) { + goto out; + } + const uint64_t inode_ref = sqsh_file_inode_ref(file); + const struct SqshSuperblock *superblock = + sqsh_archive_superblock(file->archive); + uint32_t block_size = sqsh_superblock_block_size(superblock); + + const uint64_t block_count = + SQSH_DIVIDE_CEIL(sqsh_file_size(file), block_size); + if (block_count > SIZE_MAX) { + rv = -SQSH_ERROR_INTEGER_OVERFLOW; + goto out; + } + + mt->cb = cb; + mt->data = data; + mt->chunk_size = block_size; + atomic_init(&mt->remaining_blocks, (size_t)block_count); + atomic_init(&mt->rv, 0); + + if (block_count == 0) { + goto out; + } + + rv = sqsh__file_init(&mt->file, file->archive, inode_ref); + if (rv < 0) { + goto out; + } + + mt->blocks = + calloc(sizeof(struct FileIteratorMtBlock), (size_t)block_count); + if (mt->blocks == NULL) { + rv = -SQSH_ERROR_MALLOC_FAILED; + goto out; + } + + size_t block_offset = 0; + for (sqsh_index_t i = 0; i < block_count; i++) { + mt->blocks[i].mt = mt; + mt->blocks[i].block_offset = block_offset; + rv = cx_threadpool_schedule( + &threadpool->pool, iterator_worker, &mt->blocks[i]); + if (rv < 0) { + goto out; + } + block_offset += block_size; + } + +out: + if (rv < 0 || block_count == 0) { + file_iterator_mt_cleanup(mt, rv); + } +} + +void +sqsh_file_iterator_mt( + const struct SqshFile *file, struct SqshThreadpool *threadpool, + sqsh_file_iterator_mt_cb cb, void *data) { + int rv = 0; + + struct FileIteratorMt *mt = calloc(sizeof(struct FileIteratorMt), 1); + if (mt == NULL) { + rv = -SQSH_ERROR_MALLOC_FAILED; + } + + file_iterator_mt(mt, file, threadpool, cb, data, rv); +} + +static void +stream_worker( + const struct SqshFile *file, const struct SqshFileIterator *iterator, + uint64_t offset, void *data, int err) { + int rv = 0; + struct FileToStreamMt *mt = data; + if (iterator == NULL) { + mt->cb(file, mt->stream, mt->data, err); + return; + } + + off_t written = 0; + const uint8_t *iterator_data = sqsh_file_iterator_data(iterator); + const size_t iterator_size = sqsh_file_iterator_size(iterator); + + while ((uint64_t)written != iterator_size) { + ssize_t chunk_written = pwrite( + mt->fd, iterator_data + written, + iterator_size - (size_t)written, (off_t)offset + written); + if (chunk_written < 0) { + rv = -errno; + goto out; + } + written += chunk_written; + } + +out: + if (rv < 0) { + atomic_store(&mt->mt.rv, rv); + } +} + +int +sqsh_file_to_stream_mt( + const struct SqshFile *file, struct SqshThreadpool *threadpool, + FILE *stream, sqsh_file_to_stream_mt_cb cb, void *data) { + int rv = 0; + + struct FileToStreamMt *mt = calloc(sizeof(struct FileToStreamMt), 1); + if (mt == NULL) { + rv = -SQSH_ERROR_MALLOC_FAILED; + goto out; + } + mt->cb = cb; + mt->data = data; + mt->stream = stream; + mt->fd = fileno(stream); + file_iterator_mt(&mt->mt, file, threadpool, stream_worker, mt, rv); + +out: + return rv; +} diff --git a/libsqsh/src/posix/threadpool.c b/libsqsh/src/posix/threadpool.c new file mode 100644 index 00000000..679b2439 --- /dev/null +++ b/libsqsh/src/posix/threadpool.c @@ -0,0 +1,63 @@ +/****************************************************************************** + * * + * Copyright (c) 2023-2024, Enno Boland * + * * + * Redistribution and use in source and binary forms, with or without * + * modification, are permitted provided that the following conditions are * + * met: * + * * + * * Redistributions of source code must retain the above copyright notice, * + * this list of conditions and the following disclaimer. * + * * Redistributions in binary form must reproduce the above copyright * + * notice, this list of conditions and the following disclaimer in the * + * documentation and/or other materials provided with the distribution. * + * * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS * + * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, * + * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR * + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR * + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, * + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR * + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * + * * + ******************************************************************************/ + +/** + * @author Enno Boland (mail@eboland.de) + * @file thread_pool.c + */ + +#define _DEFAULT_SOURCE + +#include +#include +#include + +int +sqsh__threadpool_init(struct SqshThreadpool *pool, size_t threads) { + return cx_threadpool_init(&pool->pool, threads); +} + +struct SqshThreadpool * +sqsh_threadpool_new(size_t threads, int *err) { + SQSH_NEW_IMPL(sqsh__threadpool_init, struct SqshThreadpool, threads); +} + +int +sqsh__threadpool_cleanup(struct SqshThreadpool *pool) { + return cx_threadpool_cleanup(&pool->pool); +} + +int +sqsh_threadpool_wait(struct SqshThreadpool *pool) { + return cx_threadpool_wait(&pool->pool); +} + +int +sqsh_threadpool_free(struct SqshThreadpool *pool) { + SQSH_FREE_IMPL(sqsh__threadpool_cleanup, pool); +} diff --git a/libsqsh/src/tree/path_resolver.c b/libsqsh/src/tree/path_resolver.c index 210811e9..5739bcf9 100644 --- a/libsqsh/src/tree/path_resolver.c +++ b/libsqsh/src/tree/path_resolver.c @@ -185,6 +185,9 @@ sqsh__path_resolver_to_ref( struct SqshPathResolver *resolver, uint64_t inode_ref) { int rv = 0; rv = sqsh__file_cleanup(&resolver->cwd); + if (rv < 0) { + goto out; + } rv = sqsh__file_init(&resolver->cwd, resolver->archive, inode_ref); if (rv < 0) { diff --git a/subprojects/cextras.wrap b/subprojects/cextras.wrap index 5562806c..89da79d5 100644 --- a/subprojects/cextras.wrap +++ b/subprojects/cextras.wrap @@ -1,6 +1,6 @@ [wrap-git] directory = cextras -revision = 63d5d1eae0ba7f66a34965d3c153bb0add64325d +revision = f3478d1e00d38846af03bd776d92336fbcf6e1f7 url = https://github.com/Gottox/cextras.git depth = 1 diff --git a/test/libsqsh/extract/extract_manager.c b/test/libsqsh/extract/extract_manager.c index 744e3333..80c95a2c 100644 --- a/test/libsqsh/extract/extract_manager.c +++ b/test/libsqsh/extract/extract_manager.c @@ -59,7 +59,7 @@ UTEST(directory_iterator, decompress) { rv = sqsh__map_reader_advance(&reader, 0, CHUNK_SIZE(ZLIB_ABCD)); ASSERT_EQ(0, rv); - rv = sqsh__extract_manager_init(&manager, &archive, 8192, 10, 128); + rv = sqsh__extract_manager_init(&manager, &archive, 8192, 128); ASSERT_EQ(0, rv); rv = sqsh__extract_manager_uncompress(&manager, &reader, &buffer); @@ -69,7 +69,7 @@ UTEST(directory_iterator, decompress) { ASSERT_EQ(0, memcmp(cx_buffer_data(buffer), "abcd", 4)); sqsh__map_reader_cleanup(&reader); - sqsh__extract_manager_release(&manager, buffer); + sqsh__extract_manager_release(&manager, sizeof(struct SqshDataSuperblock)); sqsh__extract_manager_cleanup(&manager); sqsh__archive_cleanup(&archive); } @@ -94,7 +94,7 @@ UTEST(directory_iterator, decompress_and_cached) { rv = sqsh__map_reader_advance(&reader, 0, CHUNK_SIZE(ZLIB_ABCD)); ASSERT_EQ(0, rv); - rv = sqsh__extract_manager_init(&manager, &archive, 8192, 10, 128); + rv = sqsh__extract_manager_init(&manager, &archive, 8192, 128); ASSERT_EQ(0, rv); rv = sqsh__extract_manager_uncompress(&manager, &reader, &buffer); @@ -108,8 +108,8 @@ UTEST(directory_iterator, decompress_and_cached) { ASSERT_EQ(cached_buffer, buffer); sqsh__map_reader_cleanup(&reader); - sqsh__extract_manager_release(&manager, buffer); - sqsh__extract_manager_release(&manager, cached_buffer); + sqsh__extract_manager_release(&manager, sizeof(struct SqshDataSuperblock)); + sqsh__extract_manager_release(&manager, sizeof(struct SqshDataSuperblock)); sqsh__extract_manager_cleanup(&manager); sqsh__archive_cleanup(&archive); } diff --git a/test/libsqsh/integration.c b/test/libsqsh/integration.c index ddd49ad9..1eda67bc 100644 --- a/test/libsqsh/integration.c +++ b/test/libsqsh/integration.c @@ -490,6 +490,7 @@ UTEST(integration, sqsh_test_xattr) { ASSERT_NE(NULL, xattr_iter); ASSERT_EQ(0, rv); has_next = sqsh_xattr_iterator_next(xattr_iter, &rv); + ASSERT_TRUE(has_next); ASSERT_EQ(0, rv); ASSERT_EQ(true, sqsh_xattr_iterator_is_indirect(xattr_iter)); name = sqsh_xattr_iterator_fullname_dup(xattr_iter); @@ -501,6 +502,7 @@ UTEST(integration, sqsh_test_xattr) { ASSERT_EQ(0, strcmp(expected_value, value)); free(value); has_next = sqsh_xattr_iterator_next(xattr_iter, &rv); + ASSERT_FALSE(has_next); ASSERT_EQ(0, rv); rv = sqsh_xattr_iterator_free(xattr_iter); ASSERT_EQ(0, rv); diff --git a/tools/src/unpack.c b/tools/src/unpack.c index 98f333b7..c7885eb8 100644 --- a/tools/src/unpack.c +++ b/tools/src/unpack.c @@ -36,16 +36,24 @@ #include #include #include +#include #include #include #include +#include #include #include #include +typedef int (*extract_fn)( + const char *, enum SqshFileType, const struct SqshFile *); + +sem_t file_descriptor_sem; +size_t extracted_files = 0; bool do_chown = false; bool verbose = false; const char *image_path; +struct SqshThreadpool *threadpool; static void (*print_segment)(const char *segment, size_t segment_size) = print_raw; @@ -71,7 +79,41 @@ static const struct option long_opts[] = { }; static int -prepare_dir(const char *path) { +update_metadata(const char *path, const struct SqshFile *file) { + int rv = 0; + struct timespec times[2] = {0}; + + times[0].tv_sec = times[1].tv_sec = sqsh_file_modified_time(file); + rv = utimensat(AT_FDCWD, path, times, AT_SYMLINK_NOFOLLOW); + if (rv < 0) { + perror(path); + goto out; + } + + uint16_t mode = sqsh_file_permission(file); + rv = fchmodat(AT_FDCWD, path, mode, AT_SYMLINK_NOFOLLOW); + if (rv < 0) { + perror(path); + goto out; + } + + if (do_chown) { + const uint32_t uid = sqsh_file_uid(file); + const uint32_t gid = sqsh_file_gid(file); + + rv = chown(path, uid, gid); + if (rv < 0) { + perror(path); + goto out; + } + } + +out: + return rv; +} + +static int +extract_dir(const char *path) { int rv = 0; rv = mkdir(path, 0700); if (rv < 0 && errno == EEXIST) { @@ -97,7 +139,7 @@ prepare_dir(const char *path) { } static int -extract_dir(const char *path, const struct SqshFile *file) { +update_metadata_dir(const char *path, const struct SqshFile *file) { // The directory should already be created by prepare_dir(), so we just // apply the permissions. int rv = 0; @@ -107,46 +149,112 @@ extract_dir(const char *path, const struct SqshFile *file) { goto out; } + rv = update_metadata(path, file); out: return rv; } +struct ExtractFileData { + char tmp_filename[32]; + char *path; +}; + +static void +extract_file_cleanup(struct ExtractFileData *data, FILE *stream) { + if (data == NULL) { + return; + } + if (stream != NULL) { + fclose(stream); + } + free(data->path); + free(data); +} +static void +extract_file_after( + const struct SqshFile *file, FILE *stream, void *d, int err) { + (void)stream; + int rv = 0; + struct ExtractFileData *data = d; + if (err < 0) { + sqsh_perror(err, data->path); + } + + rv = rename(data->tmp_filename, data->path); + if (rv < 0) { + rv = -errno; + goto out; + } + + fclose(stream); + rv = update_metadata(data->path, file); +out: + sem_post(&file_descriptor_sem); + if (rv < 0) { + sqsh_perror(rv, data->path); + } + extract_file_cleanup(data, NULL); +} + static int extract_file(const char *path, const struct SqshFile *file) { int rv = 0; + int fd = -1; FILE *stream = NULL; - char tmp_filename[] = ".sqsh-unpack-XXXXXX"; + struct ExtractFileData *data = calloc(1, sizeof(struct ExtractFileData)); + if (data == NULL) { + rv = -errno; + goto out; + } + data->path = strdup(path); + if (data->path == NULL) { + rv = -errno; + perror(path); + goto out; + } + strcpy(data->tmp_filename, ".sqsh-unpack-XXXXXX"); - int fd = mkstemp(tmp_filename); - if (fd < 0) { + rv = sem_wait(&file_descriptor_sem); + if (rv < 0) { rv = -errno; perror(path); goto out; } - stream = fdopen(fd, "w"); - if (stream == NULL) { + + fd = mkstemp(data->tmp_filename); + if (fd < 0) { + rv = -errno; perror(path); goto out; } - fd = -1; - rv = sqsh_file_to_stream(file, stream); + rv = ftruncate(fd, sqsh_file_size(file)); if (rv < 0) { - sqsh_perror(rv, path); + rv = -errno; + perror(path); goto out; } - rv = rename(tmp_filename, path); - if (rv < 0) { + stream = fdopen(fd, "w"); + if (stream == NULL) { + rv = -errno; perror(path); goto out; } -out: - if (stream != NULL) { - fclose(stream); + fd = -1; + + rv = sqsh_file_to_stream_mt( + file, threadpool, stream, extract_file_after, data); + if (rv < 0) { + goto out; } - if (fd > 0) { - close(fd); +out: + if (rv < 0) { + sqsh_perror(rv, path); + extract_file_cleanup(data, stream); + if (fd > 0) { + close(fd); + } } return rv; } @@ -162,6 +270,7 @@ extract_symlink(const char *path, const struct SqshFile *file) { goto out; } + rv = update_metadata(path, file); out: free(target); return rv; @@ -195,76 +304,56 @@ extract_device(const char *path, const struct SqshFile *file) { perror(path); goto out; } + + rv = update_metadata(path, file); out: return rv; } static int -extract(const char *path, const struct SqshFile *file) { - int rv = 0; - struct timespec times[2] = {0}; - - if (path[0] == 0) { - path = "."; - } - - enum SqshFileType type = sqsh_file_type(file); +extract_first_pass( + const char *path, enum SqshFileType type, const struct SqshFile *file) { switch (type) { case SQSH_FILE_TYPE_DIRECTORY: - rv = extract_dir(path, file); - break; + return 0; case SQSH_FILE_TYPE_FILE: - rv = extract_file(path, file); - break; + return extract_file(path, file); case SQSH_FILE_TYPE_SYMLINK: - rv = extract_symlink(path, file); - break; + return extract_symlink(path, file); case SQSH_FILE_TYPE_BLOCK: case SQSH_FILE_TYPE_CHAR: case SQSH_FILE_TYPE_FIFO: case SQSH_FILE_TYPE_SOCKET: - rv = extract_device(path, file); - break; + return extract_device(path, file); default: __builtin_unreachable(); } - if (rv < 0) { - sqsh_perror(rv, path); - goto out; - } - - times[0].tv_sec = times[1].tv_sec = sqsh_file_modified_time(file); - rv = utimensat(AT_FDCWD, path, times, AT_SYMLINK_NOFOLLOW); - if (rv < 0) { - perror(path); - goto out; - } +} - uint16_t mode = sqsh_file_permission(file); - rv = fchmodat(AT_FDCWD, path, mode, AT_SYMLINK_NOFOLLOW); - if (rv < 0) { - perror(path); - goto out; +static int +extract_second_pass( + const char *path, enum SqshFileType type, const struct SqshFile *file) { + if (type == SQSH_FILE_TYPE_DIRECTORY) { + return update_metadata_dir(path, file); + } else { + return 0; } +} - if (do_chown) { - const uint32_t uid = sqsh_file_uid(file); - const uint32_t gid = sqsh_file_gid(file); - - rv = chown(path, uid, gid); - if (rv < 0) { - perror(path); - goto out; - } +static int +extract(const char *path, const struct SqshFile *file, extract_fn func) { + if (path[0] == 0) { + path = "."; } -out: - return rv; + enum SqshFileType type = sqsh_file_type(file); + return func(path, type, file); } static int extract_from_traversal( - const char *target_path, const struct SqshTreeTraversal *iter) { + const char *target_path, const struct SqshTreeTraversal *iter, + extract_fn func) { int rv; char *path = sqsh_tree_traversal_path_dup(iter); enum SqshTreeTraversalState state = sqsh_tree_traversal_state(iter); @@ -276,13 +365,13 @@ extract_from_traversal( print_segment("/", 1); print_segment(path, strlen(path)); } - print_segment("\1", 1); + puts(""); } if (state == SQSH_TREE_TRAVERSAL_STATE_DIRECTORY_BEGIN) { // In case we hit a directory, we create it first. The directory meta // data will be set later - rv = prepare_dir(path); + rv = extract_dir(path); if (rv < 0) { goto out; } @@ -293,7 +382,7 @@ extract_from_traversal( goto out; } - rv = extract(path, file); + rv = extract(path, file, func); // Ignore errors, we want to extract as much as possible. rv = 0; } @@ -304,7 +393,8 @@ extract_from_traversal( } static int -extract_all(const char *target_path, const struct SqshFile *base) { +extract_all( + const char *target_path, const struct SqshFile *base, extract_fn func) { char *path = NULL; int rv = 0; struct SqshTreeTraversal *iter = NULL; @@ -334,7 +424,7 @@ extract_all(const char *target_path, const struct SqshFile *base) { } while (sqsh_tree_traversal_next(iter, &rv)) { - rv = extract_from_traversal(target_path, iter); + rv = extract_from_traversal(target_path, iter, func); if (rv < 0) { goto out; } @@ -359,6 +449,7 @@ main(int argc, char *argv[]) { struct SqshArchive *sqsh; struct SqshFile *src_root = NULL; uint64_t offset = 0; + struct rlimit limits = {0}; if (isatty(STDOUT_FILENO)) { print_segment = print_escaped; } @@ -408,6 +499,22 @@ main(int argc, char *argv[]) { goto out; } + threadpool = sqsh_threadpool_new(0, &rv); + + rv = getrlimit(RLIMIT_NOFILE, &limits); + if (rv < 0) { + perror("getrlimit"); + rv = EXIT_FAILURE; + goto out; + } + // Leave some file descriptors for the rest of the system + rv = sem_init(&file_descriptor_sem, 0, limits.rlim_cur - 32); + if (rv < 0) { + perror("sem_init"); + rv = EXIT_FAILURE; + goto out; + } + src_root = sqsh_lopen(sqsh, src_path, &rv); if (rv < 0) { sqsh_perror(rv, src_path); @@ -423,15 +530,30 @@ main(int argc, char *argv[]) { } if (sqsh_file_type(src_root) != SQSH_FILE_TYPE_DIRECTORY) { - rv = extract(target_path, src_root); + rv = extract(target_path, src_root, extract_first_pass); } else { - rv = extract_all(target_path, src_root); + rv = extract_all(target_path, src_root, extract_first_pass); + } + if (rv < 0) { + rv = EXIT_FAILURE; + goto out; } + rv = sqsh_threadpool_wait(threadpool); if (rv < 0) { rv = EXIT_FAILURE; goto out; } + sqsh_threadpool_free(threadpool); + threadpool = NULL; + + if (sqsh_file_type(src_root) != SQSH_FILE_TYPE_DIRECTORY) { + rv = extract(target_path, src_root, extract_second_pass); + } else { + rv = extract_all(target_path, src_root, extract_second_pass); + } out: + sem_destroy(&file_descriptor_sem); + sqsh_threadpool_free(threadpool); sqsh_close(src_root); sqsh_archive_close(sqsh); return rv;