diff --git a/CMakeLists.txt b/CMakeLists.txt index 576d1e53169..f196c1861d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -90,6 +90,7 @@ auto_option( PACKAGE_DEPENDS jemalloc ) +auto_option(MMAP FEATURE_VAR TS_USE_MMAP DEFAULT OFF) auto_option( MIMALLOC FEATURE_VAR diff --git a/include/iocore/aio/AIO.h b/include/iocore/aio/AIO.h index fba6925faaa..5b571272dd0 100644 --- a/include/iocore/aio/AIO.h +++ b/include/iocore/aio/AIO.h @@ -34,6 +34,10 @@ #include "iocore/eventsystem/EventSystem.h" #include "records/RecProcess.h" +#if TS_USE_MMAP +#include +#endif + static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0, ts::ModuleVersion::PUBLIC); #define AIO_EVENT_DONE (AIO_EVENT_EVENTS_START + 0) @@ -48,7 +52,21 @@ enum AIOBackend { }; struct ink_aiocb { - int aio_fildes = -1; /* file descriptor or status: AIO_NOT_IN_PROGRESS */ +#if TS_USE_MMAP + struct aio_mmap { + void *first = MAP_FAILED; /* file descriptor or status: AIO_NOT_IN_PROGRESS */ + void *last = nullptr; + operator char *() const { return (char *)first; } + void + operator=(void *p) + { + first = p; + last = nullptr; + } + } aio_fildes; +#else + int aio_fildes = -1; /* file descriptor or status: AIO_NOT_IN_PROGRESS */ +#endif void *aio_buf = nullptr; /* buffer location */ size_t aio_nbytes = 0; /* length of transfer */ off_t aio_offset = 0; /* file offset */ diff --git a/include/iocore/eventsystem/EThread.h b/include/iocore/eventsystem/EThread.h index 8e404e7540b..b08f2a3cf01 100644 --- a/include/iocore/eventsystem/EThread.h +++ b/include/iocore/eventsystem/EThread.h @@ -326,10 +326,10 @@ class EThread : public Thread /** Block of memory to allocate thread specific data e.g. stat system arrays. */ char thread_private[PER_THREAD_DATA]; - +#if !TS_USE_MMAP /** Private Data for the Disk Processor. */ DiskHandler *diskHandler = nullptr; - +#endif /** Private Data for AIO. */ Que(Continuation, link) aio_ops; diff --git a/include/tscore/ink_config.h.cmake.in b/include/tscore/ink_config.h.cmake.in index 260ea7a2447..6859b08131c 100644 --- a/include/tscore/ink_config.h.cmake.in +++ b/include/tscore/ink_config.h.cmake.in @@ -151,6 +151,7 @@ const int DEFAULT_STACKSIZE = @DEFAULT_STACK_SIZE@; #cmakedefine01 TS_USE_ALLOCATOR_METRICS #cmakedefine01 TS_USE_POSIX_CAP #cmakedefine01 TS_USE_QUIC +#cmakedefine01 TS_USE_MMAP #cmakedefine01 TS_USE_REMOTE_UNWINDING #cmakedefine01 TS_USE_TLS13 #cmakedefine01 TS_USE_TLS_ASYNC diff --git a/src/api/InkAPI.cc b/src/api/InkAPI.cc index 4e81bef41f4..1cb021e7786 100644 --- a/src/api/InkAPI.cc +++ b/src/api/InkAPI.cc @@ -6868,7 +6868,11 @@ TSHttpTxnClientStreamPriorityGet(TSHttpTxn txnp, TSHttpPriority *priority) } TSReturnCode +#if TS_USE_MMAP +TSAIORead(ink_aiocb::aio_mmap &fd, off_t offset, char *buf, size_t buffSize, TSCont contp) +#else TSAIORead(int fd, off_t offset, char *buf, size_t buffSize, TSCont contp) +#endif { sdk_assert(sdk_sanity_check_iocore_structure(contp) == TS_SUCCESS); @@ -6887,6 +6891,10 @@ TSAIORead(int fd, off_t offset, char *buf, size_t buffSize, TSCont contp) pAIO->action = pCont; pAIO->thread = pCont->mutex->thread_holding; +#if TS_USE_MMAP + pAIO->mutex = pCont->mutex; +#endif + if (ink_aio_read(pAIO, 1) == 1) { return TS_SUCCESS; } @@ -6909,7 +6917,11 @@ TSAIONBytesGet(TSAIOCallback data) } TSReturnCode +#if TS_USE_MMAP +TSAIOWrite(ink_aiocb::aio_mmap &fd, off_t offset, char *buf, const size_t bufSize, TSCont contp) +#else TSAIOWrite(int fd, off_t offset, char *buf, const size_t bufSize, TSCont contp) +#endif { sdk_assert(sdk_sanity_check_iocore_structure(contp) == TS_SUCCESS); diff --git a/src/iocore/aio/AIO.cc b/src/iocore/aio/AIO.cc index c4990571037..7831d60b0e6 100644 --- a/src/iocore/aio/AIO.cc +++ b/src/iocore/aio/AIO.cc @@ -36,6 +36,12 @@ #include "iocore/aio/AIO_fault_injection.h" #endif +#if TS_USE_MMAP +#define PRI_FD "%p" +#else +#define PRI_FD "%i" +#endif + #define MAX_DISKS_POSSIBLE 100 // globals @@ -208,7 +214,11 @@ struct AIOThreadInfo : public Continuation { /* insert an entry for file descriptor fildes into aio_reqs */ static AIO_Reqs * +#if TS_USE_MMAP +aio_init_fildes(void *fildes, int fromAPI = 0) +#else aio_init_fildes(int fildes, int fromAPI = 0) +#endif { char thr_name[MAX_THREAD_NAME_LENGTH]; int i; @@ -222,8 +232,12 @@ aio_init_fildes(int fildes, int fromAPI = 0) RecInt thread_num; if (fromAPI) { - request->index = 0; - request->filedes = -1; + request->index = 0; +#if TS_USE_MMAP + request->filedes = MAP_FAILED; +#else + request->filedes = -1; +#endif aio_reqs[0] = request; thread_is_created = 1; thread_num = api_config_threads_per_disk; @@ -245,7 +259,7 @@ aio_init_fildes(int fildes, int fromAPI = 0) } else { thr_info = new AIOThreadInfo(request, 0); } - snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:%d]", i, fildes); + snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:" PRI_FD "]", i, fildes); ink_assert(eventProcessor.spawn_thread(thr_info, thr_name, stacksize)); } @@ -335,10 +349,18 @@ aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0) } op->aio_req = req; } +#if TS_USE_MMAP + if (fromAPI && (!req || req->filedes != MAP_FAILED)) { +#else if (fromAPI && (!req || req->filedes != -1)) { +#endif ink_mutex_acquire(&insert_mutex); if (aio_reqs[0] == nullptr) { +#if TS_USE_MMAP + req = aio_init_fildes(MAP_FAILED, 1); +#else req = aio_init_fildes(-1, 1); +#endif } else { req = aio_reqs[0]; } @@ -367,7 +389,7 @@ aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0) static inline int cache_op(AIOCallbackInternal *op) { - bool read = (op->aiocb.aio_lio_opcode == LIO_READ); + bool const read = (op->aiocb.aio_lio_opcode == LIO_READ); for (; op; op = (AIOCallbackInternal *)op->then) { ink_aiocb *a = &op->aiocb; ssize_t err, res = 0; @@ -378,15 +400,33 @@ cache_op(AIOCallbackInternal *op) #ifdef AIO_FAULT_INJECTION err = aioFaultInjection.pread(a->aio_fildes, (static_cast(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res); +#else +#if TS_USE_MMAP + if ((static_cast(a->aio_fildes.first) + a->aio_offset + a->aio_nbytes) <= a->aio_fildes.last) { + memcpy(static_cast(a->aio_buf) + res, static_cast(a->aio_fildes.first) + a->aio_offset + res, + err = a->aio_nbytes - res); + } else { + err = -1; + } #else err = pread(a->aio_fildes, (static_cast(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res); +#endif #endif } else { #ifdef AIO_FAULT_INJECTION err = aioFaultInjection.pwrite(a->aio_fildes, (static_cast(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res); +#else +#if TS_USE_MMAP + if ((static_cast(a->aio_fildes.first) + a->aio_offset + a->aio_nbytes) <= a->aio_fildes.last) { + memcpy(static_cast(a->aio_buf) + res, static_cast(a->aio_fildes.first) + a->aio_offset + res, + err = a->aio_nbytes - res); + } else { + err = -1; + } #else err = pwrite(a->aio_fildes, (static_cast(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res); +#endif #endif } } while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM)); diff --git a/src/iocore/aio/P_AIO.h b/src/iocore/aio/P_AIO.h index 0e1e6654878..c651e91c79d 100644 --- a/src/iocore/aio/P_AIO.h +++ b/src/iocore/aio/P_AIO.h @@ -84,11 +84,15 @@ struct AIO_Reqs { ASLL(AIOCallbackInternal, alink) aio_temp_list; ink_mutex aio_mutex; ink_cond aio_cond; - int index = 0; /* position of this struct in the aio_reqs array */ - int pending = 0; /* number of outstanding requests on the disk */ - int queued = 0; /* total number of aio_todo requests */ - int filedes = -1; /* the file descriptor for the requests or status IO_NOT_IN_PROGRESS */ - int requests_queued = 0; + int index = 0; /* position of this struct in the aio_reqs array */ + int pending = 0; /* number of outstanding requests on the disk */ + int queued = 0; /* total number of aio_todo requests */ +#if TS_USE_MMAP + void *filedes = MAP_FAILED; /* the file descriptor for the requests or status IO_NOT_IN_PROGRESS */ +#else + int filedes = -1; /* the file descriptor for the requests or status IO_NOT_IN_PROGRESS */ +#endif + int requests_queued = 0; }; TS_INLINE int diff --git a/src/iocore/aio/test_AIO.cc b/src/iocore/aio/test_AIO.cc index 065407bd9e9..5bbcdf7a300 100644 --- a/src/iocore/aio/test_AIO.cc +++ b/src/iocore/aio/test_AIO.cc @@ -302,6 +302,9 @@ AIO_Device::do_fd(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) io->aiocb.aio_offset = seq_read_point; io->aiocb.aio_nbytes = seq_read_size; io->aiocb.aio_lio_opcode = LIO_READ; +#if TS_USE_MMAP + io->mutex = mutex; +#endif ink_assert(ink_aio_read(io.get()) >= 0); seq_read_point += seq_read_size; if (seq_read_point > max_offset) { diff --git a/src/iocore/cache/CacheDir.cc b/src/iocore/cache/CacheDir.cc index 0cb0d80486d..547d256fd4b 100644 --- a/src/iocore/cache/CacheDir.cc +++ b/src/iocore/cache/CacheDir.cc @@ -32,6 +32,12 @@ #define DIR_LOOP_THRESHOLD 1000 #endif +#if TS_USE_MMAP +#define PRI_FD "%p" +#else +#define PRI_FD "%i" +#endif + namespace { @@ -574,7 +580,7 @@ dir_probe(const CacheKey *key, Stripe *stripe, Dir *result, Dir **last_collision goto Lcont; } if (dir_valid(stripe, e)) { - DDbg(dbg_ctl_dir_probe_hit, "found %X %X vol %d bucket %d boffset %" PRId64 "", key->slice32(0), key->slice32(1), + DDbg(dbg_ctl_dir_probe_hit, "found %X %X vol " PRI_FD " bucket %d boffset %" PRId64 "", key->slice32(0), key->slice32(1), stripe->fd, b, dir_offset(e)); dir_assign(result, e); *last_collision = e; @@ -601,7 +607,8 @@ dir_probe(const CacheKey *key, Stripe *stripe, Dir *result, Dir **last_collision collision = nullptr; goto Lagain; } - DDbg(dbg_ctl_dir_probe_miss, "missed %X %X on vol %d bucket %d at %p", key->slice32(0), key->slice32(1), stripe->fd, b, seg); + DDbg(dbg_ctl_dir_probe_miss, "missed %X %X on vol " PRI_FD " bucket %d at %p", key->slice32(0), key->slice32(1), stripe->fd, b, + seg); CHECK_DIR(d); return 0; } @@ -664,8 +671,8 @@ dir_insert(const CacheKey *key, Stripe *stripe, Dir *to_part) dir_assign_data(e, to_part); dir_set_tag(e, key->slice32(2)); ink_assert(stripe->vol_offset(e) < (stripe->skip + stripe->len)); - DDbg(dbg_ctl_dir_insert, "insert %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), stripe->fd, - bi, e, key->slice32(1), dir_tag(e), dir_offset(e)); + DDbg(dbg_ctl_dir_insert, "insert %p %X into vol " PRI_FD " bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), + stripe->fd, bi, e, key->slice32(1), dir_tag(e), dir_offset(e)); CHECK_DIR(d); stripe->header->dirty = 1; Metrics::Gauge::increment(cache_rsb.direntries_used); @@ -754,8 +761,8 @@ dir_overwrite(const CacheKey *key, Stripe *stripe, Dir *dir, Dir *overwrite, boo dir_assign_data(e, dir); dir_set_tag(e, t); ink_assert(stripe->vol_offset(e) < stripe->skip + stripe->len); - DDbg(dbg_ctl_dir_overwrite, "overwrite %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), - stripe->fd, bi, e, t, dir_tag(e), dir_offset(e)); + DDbg(dbg_ctl_dir_overwrite, "overwrite %p %X into vol " PRI_FD " bucket %d at %p tag %X %X boffset %" PRId64 "", e, + key->slice32(0), stripe->fd, bi, e, t, dir_tag(e), dir_offset(e)); CHECK_DIR(d); stripe->header->dirty = 1; return res; @@ -920,7 +927,11 @@ dir_sync_init() } void +#if TS_USE_MMAP +CacheSync::aio_write(ink_aiocb::aio_mmap &fd, char *b, int n, off_t o) +#else CacheSync::aio_write(int fd, char *b, int n, off_t o) +#endif { io.aiocb.aio_fildes = fd; io.aiocb.aio_offset = o; diff --git a/src/iocore/cache/CacheDisk.cc b/src/iocore/cache/CacheDisk.cc index 484a0ae6316..36ef1157232 100644 --- a/src/iocore/cache/CacheDisk.cc +++ b/src/iocore/cache/CacheDisk.cc @@ -24,6 +24,12 @@ #include "P_Cache.h" #include "P_CacheVol.h" +#if TS_USE_MMAP +#define PRI_FD "%p" +#else +#define PRI_FD "%i" +#endif + void CacheDisk::incrErrors(const AIOCallback *io) { @@ -37,7 +43,7 @@ CacheDisk::incrErrors(const AIOCallback *io) const char *opname = "unknown"; int opcode = io->aiocb.aio_lio_opcode; - int fd = io->aiocb.aio_fildes; + auto fd = io->aiocb.aio_fildes; switch (io->aiocb.aio_lio_opcode) { case LIO_READ: opname = "READ"; @@ -50,7 +56,7 @@ CacheDisk::incrErrors(const AIOCallback *io) default: break; } - Warning("failed operation: %s (opcode=%d), span: %s (fd=%d)", opname, opcode, path, fd); + Warning("failed operation: %s (opcode=%d), span: %s (fd=" PRI_FD ")", opname, opcode, path, fd); } int @@ -58,9 +64,15 @@ CacheDisk::open(char *s, off_t blocks, off_t askip, int ahw_sector_size, int fil { path = ats_strdup(s); hw_sector_size = ahw_sector_size; - fd = fildes; - skip = askip; - start = skip; +#if TS_USE_MMAP + fd.first = mmap(0, blocks * STORE_BLOCK_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fildes, 0); + fd.last = fd.first + blocks * STORE_BLOCK_SIZE - 1; + ink_assert(MAP_FAILED != fd); +#else + fd = fildes; +#endif + skip = askip; + start = skip; /* we can't use fractions of store blocks. */ len = blocks; io.aiocb.aio_fildes = fd; @@ -100,6 +112,9 @@ CacheDisk::open(char *s, off_t blocks, off_t askip, int ahw_sector_size, int fil // read disk header SET_HANDLER(&CacheDisk::openStart); +#if TS_USE_MMAP + io.mutex = mutex; +#endif io.aiocb.aio_offset = skip; io.aiocb.aio_buf = reinterpret_cast(header); io.aiocb.aio_nbytes = header_len; diff --git a/src/iocore/cache/CacheVC.cc b/src/iocore/cache/CacheVC.cc index 12ef88ef327..de2887e4eeb 100644 --- a/src/iocore/cache/CacheVC.cc +++ b/src/iocore/cache/CacheVC.cc @@ -494,6 +494,9 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) io.action = this; io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding; SET_HANDLER(&CacheVC::handleReadDone); +#if TS_USE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io) >= 0); // ToDo: Why are these for debug only ?? @@ -899,6 +902,9 @@ CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) io.aiocb.aio_nbytes = stripe->skip + stripe->len - io.aiocb.aio_offset; } offset = 0; +#if TS_USE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io) >= 0); Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject %" PRId64 " %zu", this, (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes); diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc index dec0da46872..cfc25823f56 100644 --- a/src/iocore/cache/CacheWrite.cc +++ b/src/iocore/cache/CacheWrite.cc @@ -631,6 +631,9 @@ Stripe::evac_range(off_t low, off_t high, int evac_phase) io.thread = AIO_CALLBACK_THREAD_ANY; DDbg(dbg_ctl_cache_evac, "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir)); SET_HANDLER(&Stripe::evacuateDocReadDone); +#if TS_USE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io) >= 0); return -1; } diff --git a/src/iocore/cache/P_CacheDir.h b/src/iocore/cache/P_CacheDir.h index e9473984e8d..e51e64edba4 100644 --- a/src/iocore/cache/P_CacheDir.h +++ b/src/iocore/cache/P_CacheDir.h @@ -259,7 +259,11 @@ struct CacheSync : public Continuation { Event *trigger = nullptr; ink_hrtime start_time = 0; int mainEvent(int event, Event *e); - void aio_write(int fd, char *b, int n, off_t o); +#if TS_USE_MMAP + void aio_write(ink_aiocb::aio_mmap &fd, char *b, int n, off_t o); +#else + void aio_write(int fd, char *b, int n, off_t o); +#endif CacheSync() : Continuation(new_ProxyMutex()) { SET_HANDLER(&CacheSync::mainEvent); } }; diff --git a/src/iocore/cache/P_CacheDisk.h b/src/iocore/cache/P_CacheDisk.h index f3bcea37e4f..fabc13f17a3 100644 --- a/src/iocore/cache/P_CacheDisk.h +++ b/src/iocore/cache/P_CacheDisk.h @@ -83,15 +83,19 @@ struct CacheDisk : public Continuation { off_t skip = 0; off_t num_usable_blocks = 0; int hw_sector_size = 0; - int fd = -1; - off_t free_space = 0; - off_t wasted_space = 0; - DiskStripe **disk_stripes = nullptr; - DiskStripe *free_blocks = nullptr; - int num_errors = 0; - int cleared = 0; - bool read_only_p = false; - bool online = true; /* flag marking cache disk online or offline (because of too many failures or by the operator). */ +#if TS_USE_MMAP + ink_aiocb::aio_mmap fd = {MAP_FAILED, nullptr}; +#else + int fd = -1; +#endif + off_t free_space = 0; + off_t wasted_space = 0; + DiskStripe **disk_stripes = nullptr; + DiskStripe *free_blocks = nullptr; + int num_errors = 0; + int cleared = 0; + bool read_only_p = false; + bool online = true; /* flag marking cache disk online or offline (because of too many failures or by the operator). */ // Extra configuration values int forced_volume_num = -1; ///< Volume number for this disk. diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h index aa3a5a86148..b369e900e07 100644 --- a/src/iocore/cache/P_CacheVol.h +++ b/src/iocore/cache/P_CacheVol.h @@ -43,24 +43,29 @@ #define ROUND_TO(_x, _y) INK_ALIGN((_x), (_y)) // Stripe -#define STRIPE_MAGIC 0xF1D0F00D -#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE -#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE) -#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB -#define STRIPE_BLOCK_SIZE (1024 * 1024 * 128) // 128MB -#define MIN_STRIPE_SIZE STRIPE_BLOCK_SIZE -#define MAX_STRIPE_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024) // 512TB -#define MAX_FRAG_SIZE (AGG_SIZE - sizeof(Doc)) // true max -#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE -#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk -#define STRIPE_HASH_TABLE_SIZE 32707 -#define STRIPE_HASH_EMPTY 0xFFFF -#define STRIPE_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit -#define LOOKASIDE_SIZE 256 -#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB -#define RECOVERY_SIZE EVACUATION_SIZE // 8MB -#define AIO_NOT_IN_PROGRESS -1 -#define AIO_AGG_WRITE_IN_PROGRESS -2 +#define STRIPE_MAGIC 0xF1D0F00D +#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE +#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE) +#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB +#define STRIPE_BLOCK_SIZE (1024 * 1024 * 128) // 128MB +#define MIN_STRIPE_SIZE STRIPE_BLOCK_SIZE +#define MAX_STRIPE_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024) // 512TB +#define MAX_FRAG_SIZE (AGG_SIZE - sizeof(Doc)) // true max +#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE +#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk +#define STRIPE_HASH_TABLE_SIZE 32707 +#define STRIPE_HASH_EMPTY 0xFFFF +#define STRIPE_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit +#define LOOKASIDE_SIZE 256 +#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB +#define RECOVERY_SIZE EVACUATION_SIZE // 8MB +#if TS_USE_MMAP +#define AIO_NOT_IN_PROGRESS reinterpret_cast(-1) +#define AIO_AGG_WRITE_IN_PROGRESS reinterpret_cast(-2) +#else +#define AIO_NOT_IN_PROGRESS -1 +#define AIO_AGG_WRITE_IN_PROGRESS -2 +#endif #define AUTO_SIZE_RAM_CACHE -1 // 1-1 with directory size #define DEFAULT_TARGET_FRAGMENT_SIZE (1048576 - sizeof(Doc)) // 1MB #define STORE_BLOCKS_PER_STRIPE (STRIPE_BLOCK_SIZE / STORE_BLOCK_SIZE) @@ -131,7 +136,11 @@ class Stripe : public Continuation char *path = nullptr; ats_scoped_str hash_text; CryptoHash hash_id; - int fd = -1; +#if TS_USE_MMAP + ink_aiocb::aio_mmap fd = {MAP_FAILED, nullptr}; +#else + int fd = -1; +#endif char *raw_dir = nullptr; Dir *dir = nullptr; diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc index e9feed52b3c..f64143c9d9f 100644 --- a/src/iocore/cache/Stripe.cc +++ b/src/iocore/cache/Stripe.cc @@ -158,6 +158,9 @@ Stripe::clear_dir_aio() io.action = this; io.thread = AIO_CALLBACK_THREAD_ANY; io.then = nullptr; +#if TS_USE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_write(&io)); return 0; @@ -171,12 +174,14 @@ Stripe::clear_dir() { size_t dir_len = this->dirlen(); this->_clear_init(); - +#if TS_USE_MMAP + memcpy(static_cast(this->fd) + this->skip, this->raw_dir, dir_len); +#else if (pwrite(this->fd, this->raw_dir, dir_len, this->skip) < 0) { Warning("unable to clear cache directory '%s'", this->hash_text.get()); return -1; } - +#endif return 0; } @@ -269,7 +274,11 @@ Stripe::handle_dir_clear(int event, void *data) if (!op->ok()) { Warning("unable to clear cache directory '%s'", hash_text.get()); disk->incrErrors(op); +#if TS_USE_MMAP + fd = MAP_FAILED; +#else fd = -1; +#endif } if (op->aiocb.aio_nbytes == dir_len) { @@ -278,6 +287,9 @@ Stripe::handle_dir_clear(int event, void *data) skip + len */ op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter)); op->aiocb.aio_offset = skip + dir_len; +#if TS_USE_MMAP + ink_assert(op->mutex); +#endif ink_assert(ink_aio_write(op)); return EVENT_DONE; } @@ -544,6 +556,9 @@ Stripe::handle_recover_from_data(int event, void * /* data ATS_UNUSED */) } prev_recover_pos = recover_pos; io.aiocb.aio_offset = recover_pos; +#if TS_USE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io)); return EVENT_CONT; @@ -672,6 +687,9 @@ Stripe::handle_header_read(int event, void *data) Note("using directory A for '%s'", hash_text.get()); } io.aiocb.aio_offset = skip; +#if TS_USE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io)); } // try B @@ -681,6 +699,9 @@ Stripe::handle_header_read(int event, void *data) Note("using directory B for '%s'", hash_text.get()); } io.aiocb.aio_offset = skip + this->dirlen(); +#if TS_USE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io)); } else { Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get()); @@ -708,7 +729,11 @@ Stripe::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */) ink_assert(!gstripes[i]); gstripes[i] = this; SET_HANDLER(&Stripe::aggWrite); +#if TS_USE_MMAP + cache->vol_initialized(fd != MAP_FAILED); +#else cache->vol_initialized(fd != -1); +#endif return EVENT_DONE; } } @@ -1005,7 +1030,12 @@ Stripe::shutdown(EThread *shutdown_thread) CHECK_DIR(d); size_t B = this->header->sync_serial & 1; off_t start = this->skip + (B ? dirlen : 0); - B = pwrite(this->fd, this->raw_dir, dirlen, start); +#if TS_USE_MMAP + B = dirlen; + memcpy(static_cast(this->fd) + start, this->raw_dir, dirlen); +#else + B = pwrite(this->fd, this->raw_dir, dirlen, start); +#endif ink_assert(B == dirlen); Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", this->hash_text.get()); } @@ -1016,9 +1046,20 @@ Stripe::flush_aggregate_write_buffer() // set write limit this->header->agg_pos = this->header->write_pos + this->_write_buffer.get_buffer_pos(); +#if TS_USE_MMAP + printf("flush_aggregate_write_buffer"); + int r = this->get_agg_buf_pos(); + ink_assert(static_cast(this->fd) + this->header->write_pos + this->_write_buffer.get_buffer_pos() <= this->fd.last); + memcpy(static_cast(this->fd.first) + this->header->write_pos, this->_write_buffer.get_buffer(), + this->_write_buffer.get_buffer_pos()); + if (r != get_agg_buf_pos()) { + ink_assert(!"flushing agg buffer failed"); + } +#else if (!this->_write_buffer.flush(this->fd, this->header->write_pos)) { return false; } +#endif this->header->last_write_pos = this->header->write_pos; this->header->write_pos += this->_write_buffer.get_buffer_pos(); ink_assert(this->header->write_pos == this->header->agg_pos);