Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory map: mmap+memcpy instead of pread+pwrite #11484

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ auto_option(
PACKAGE_DEPENDS
jemalloc
)
auto_option(MMAP FEATURE_VAR TS_USE_MMAP DEFAULT OFF)
auto_option(
MIMALLOC
FEATURE_VAR
Expand Down
20 changes: 19 additions & 1 deletion include/iocore/aio/AIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
#include "iocore/eventsystem/EventSystem.h"
#include "records/RecProcess.h"

#if TS_USE_MMAP
#include <sys/mman.h>
#endif

static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0, ts::ModuleVersion::PUBLIC);

#define AIO_EVENT_DONE (AIO_EVENT_EVENTS_START + 0)
Expand All @@ -48,7 +52,21 @@ enum AIOBackend {
};

struct ink_aiocb {
int aio_fildes = -1; /* file descriptor or status: AIO_NOT_IN_PROGRESS */
#if TS_USE_MMAP
/// Stand-in for the integer file descriptor when AIO is backed by a memory
/// mapping (TS_USE_MMAP): the "descriptor" is the mapped address range.
struct aio_mmap {
  /// Base address of the mapping; MAP_FAILED while nothing is mapped.
  void *first = MAP_FAILED;
  /// Upper bound of the mapping used for range checks; nullptr until the
  /// code that creates the mapping fills it in.
  void *last = nullptr;

  /// Implicit view of the mapping base as a byte pointer, so the object can be
  /// used where a raw address is expected (comparisons, pointer arithmetic).
  operator char *() const { return static_cast<char *>(first); }

  /// Rebind to a new base address.
  /// The bound is cleared because it described the previous mapping; the
  /// caller must set `last` again before it is used for range checks.
  void
  operator=(void *p)
  {
    first = p;
    last  = nullptr;
  }
} aio_fildes;
#else
int aio_fildes = -1; /* file descriptor or status: AIO_NOT_IN_PROGRESS */
#endif
void *aio_buf = nullptr; /* buffer location */
size_t aio_nbytes = 0; /* length of transfer */
off_t aio_offset = 0; /* file offset */
Expand Down
4 changes: 2 additions & 2 deletions include/iocore/eventsystem/EThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,10 @@ class EThread : public Thread

/** Block of memory to allocate thread specific data e.g. stat system arrays. */
char thread_private[PER_THREAD_DATA];

#if !TS_USE_MMAP
/** Private Data for the Disk Processor. */
DiskHandler *diskHandler = nullptr;

#endif
/** Private Data for AIO. */
Que(Continuation, link) aio_ops;

Expand Down
1 change: 1 addition & 0 deletions include/tscore/ink_config.h.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ const int DEFAULT_STACKSIZE = @DEFAULT_STACK_SIZE@;
#cmakedefine01 TS_USE_ALLOCATOR_METRICS
#cmakedefine01 TS_USE_POSIX_CAP
#cmakedefine01 TS_USE_QUIC
#cmakedefine01 TS_USE_MMAP
JosiahWI marked this conversation as resolved.
Show resolved Hide resolved
#cmakedefine01 TS_USE_REMOTE_UNWINDING
#cmakedefine01 TS_USE_TLS13
#cmakedefine01 TS_USE_TLS_ASYNC
Expand Down
12 changes: 12 additions & 0 deletions src/api/InkAPI.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6868,7 +6868,11 @@ TSHttpTxnClientStreamPriorityGet(TSHttpTxn txnp, TSHttpPriority *priority)
}

TSReturnCode
#if TS_USE_MMAP
TSAIORead(ink_aiocb::aio_mmap &fd, off_t offset, char *buf, size_t buffSize, TSCont contp)
#else
TSAIORead(int fd, off_t offset, char *buf, size_t buffSize, TSCont contp)
#endif
{
sdk_assert(sdk_sanity_check_iocore_structure(contp) == TS_SUCCESS);

Expand All @@ -6887,6 +6891,10 @@ TSAIORead(int fd, off_t offset, char *buf, size_t buffSize, TSCont contp)
pAIO->action = pCont;
pAIO->thread = pCont->mutex->thread_holding;

#if TS_USE_MMAP
pAIO->mutex = pCont->mutex;
#endif

if (ink_aio_read(pAIO, 1) == 1) {
return TS_SUCCESS;
}
Expand All @@ -6909,7 +6917,11 @@ TSAIONBytesGet(TSAIOCallback data)
}

TSReturnCode
#if TS_USE_MMAP
TSAIOWrite(ink_aiocb::aio_mmap &fd, off_t offset, char *buf, const size_t bufSize, TSCont contp)
#else
TSAIOWrite(int fd, off_t offset, char *buf, const size_t bufSize, TSCont contp)
#endif
{
sdk_assert(sdk_sanity_check_iocore_structure(contp) == TS_SUCCESS);

Expand Down
42 changes: 38 additions & 4 deletions src/iocore/aio/AIO.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@
#include "iocore/aio/AIO_fault_injection.h"
#endif

#if TS_USE_MMAP
#define PRI_FD "%p"
#else
#define PRI_FD "%i"
#endif

#define MAX_DISKS_POSSIBLE 100

// globals
Expand Down Expand Up @@ -208,7 +214,11 @@ struct AIOThreadInfo : public Continuation {

/* insert an entry for file descriptor fildes into aio_reqs */
static AIO_Reqs *
#if TS_USE_MMAP
aio_init_fildes(void *fildes, int fromAPI = 0)
#else
aio_init_fildes(int fildes, int fromAPI = 0)
#endif
{
char thr_name[MAX_THREAD_NAME_LENGTH];
int i;
Expand All @@ -222,8 +232,12 @@ aio_init_fildes(int fildes, int fromAPI = 0)
RecInt thread_num;

if (fromAPI) {
request->index = 0;
request->filedes = -1;
request->index = 0;
#if TS_USE_MMAP
request->filedes = MAP_FAILED;
#else
request->filedes = -1;
#endif
aio_reqs[0] = request;
thread_is_created = 1;
thread_num = api_config_threads_per_disk;
Expand All @@ -245,7 +259,7 @@ aio_init_fildes(int fildes, int fromAPI = 0)
} else {
thr_info = new AIOThreadInfo(request, 0);
}
snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:%d]", i, fildes);
snprintf(thr_name, MAX_THREAD_NAME_LENGTH, "[ET_AIO %d:" PRI_FD "]", i, fildes);
ink_assert(eventProcessor.spawn_thread(thr_info, thr_name, stacksize));
}

Expand Down Expand Up @@ -335,10 +349,18 @@ aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
}
op->aio_req = req;
}
#if TS_USE_MMAP
if (fromAPI && (!req || req->filedes != MAP_FAILED)) {
#else
if (fromAPI && (!req || req->filedes != -1)) {
#endif
ink_mutex_acquire(&insert_mutex);
if (aio_reqs[0] == nullptr) {
#if TS_USE_MMAP
req = aio_init_fildes(MAP_FAILED, 1);
#else
req = aio_init_fildes(-1, 1);
#endif
} else {
req = aio_reqs[0];
}
Expand Down Expand Up @@ -367,7 +389,7 @@ aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
static inline int
cache_op(AIOCallbackInternal *op)
{
bool read = (op->aiocb.aio_lio_opcode == LIO_READ);
bool const read = (op->aiocb.aio_lio_opcode == LIO_READ);
for (; op; op = (AIOCallbackInternal *)op->then) {
ink_aiocb *a = &op->aiocb;
ssize_t err, res = 0;
Expand All @@ -378,15 +400,27 @@ cache_op(AIOCallbackInternal *op)
#ifdef AIO_FAULT_INJECTION
err = aioFaultInjection.pread(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res,
a->aio_offset + res);
#else
#if TS_USE_MMAP
ink_assert((static_cast<char const *>(a->aio_fildes.first) + a->aio_offset + a->aio_nbytes) <= a->aio_fildes.last);
monkuta marked this conversation as resolved.
Show resolved Hide resolved
memcpy(static_cast<char *>(a->aio_buf) + res, static_cast<char const *>(a->aio_fildes.first) + a->aio_offset + res,
err = a->aio_nbytes - res);
#else
err = pread(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res);
#endif
#endif
} else {
#ifdef AIO_FAULT_INJECTION
err = aioFaultInjection.pwrite(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res,
a->aio_offset + res);
#else
#if TS_USE_MMAP
ink_assert((static_cast<char const *>(a->aio_fildes.first) + a->aio_offset + a->aio_nbytes) <= a->aio_fildes.last);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto here

memcpy(static_cast<char *>(a->aio_fildes.first) + a->aio_offset + res, static_cast<char const *>(a->aio_buf) + res,
err = a->aio_nbytes - res);
#else
err = pwrite(a->aio_fildes, (static_cast<char *>(a->aio_buf)) + res, a->aio_nbytes - res, a->aio_offset + res);
#endif
#endif
}
} while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
Expand Down
14 changes: 9 additions & 5 deletions src/iocore/aio/P_AIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,15 @@ struct AIO_Reqs {
ASLL(AIOCallbackInternal, alink) aio_temp_list;
ink_mutex aio_mutex;
ink_cond aio_cond;
int index = 0; /* position of this struct in the aio_reqs array */
int pending = 0; /* number of outstanding requests on the disk */
int queued = 0; /* total number of aio_todo requests */
int filedes = -1; /* the file descriptor for the requests or status IO_NOT_IN_PROGRESS */
int requests_queued = 0;
int index = 0; /* position of this struct in the aio_reqs array */
int pending = 0; /* number of outstanding requests on the disk */
int queued = 0; /* total number of aio_todo requests */
#if TS_USE_MMAP
void *filedes = MAP_FAILED; /* the file descriptor for the requests or status IO_NOT_IN_PROGRESS */
#else
int filedes = -1; /* the file descriptor for the requests or status IO_NOT_IN_PROGRESS */
#endif
int requests_queued = 0;
};

TS_INLINE int
Expand Down
3 changes: 3 additions & 0 deletions src/iocore/aio/test_AIO.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ AIO_Device::do_fd(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
io->aiocb.aio_offset = seq_read_point;
io->aiocb.aio_nbytes = seq_read_size;
io->aiocb.aio_lio_opcode = LIO_READ;
#if TS_USE_MMAP
io->mutex = mutex;
#endif
ink_assert(ink_aio_read(io.get()) >= 0);
seq_read_point += seq_read_size;
if (seq_read_point > max_offset) {
Expand Down
23 changes: 17 additions & 6 deletions src/iocore/cache/CacheDir.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
#define DIR_LOOP_THRESHOLD 1000
#endif

#if TS_USE_MMAP
#define PRI_FD "%p"
#else
#define PRI_FD "%i"
#endif

namespace
{

Expand Down Expand Up @@ -574,7 +580,7 @@ dir_probe(const CacheKey *key, Stripe *stripe, Dir *result, Dir **last_collision
goto Lcont;
}
if (dir_valid(stripe, e)) {
DDbg(dbg_ctl_dir_probe_hit, "found %X %X vol %d bucket %d boffset %" PRId64 "", key->slice32(0), key->slice32(1),
DDbg(dbg_ctl_dir_probe_hit, "found %X %X vol " PRI_FD " bucket %d boffset %" PRId64 "", key->slice32(0), key->slice32(1),
stripe->fd, b, dir_offset(e));
dir_assign(result, e);
*last_collision = e;
Expand All @@ -601,7 +607,8 @@ dir_probe(const CacheKey *key, Stripe *stripe, Dir *result, Dir **last_collision
collision = nullptr;
goto Lagain;
}
DDbg(dbg_ctl_dir_probe_miss, "missed %X %X on vol %d bucket %d at %p", key->slice32(0), key->slice32(1), stripe->fd, b, seg);
DDbg(dbg_ctl_dir_probe_miss, "missed %X %X on vol " PRI_FD " bucket %d at %p", key->slice32(0), key->slice32(1), stripe->fd, b,
seg);
CHECK_DIR(d);
return 0;
}
Expand Down Expand Up @@ -664,8 +671,8 @@ dir_insert(const CacheKey *key, Stripe *stripe, Dir *to_part)
dir_assign_data(e, to_part);
dir_set_tag(e, key->slice32(2));
ink_assert(stripe->vol_offset(e) < (stripe->skip + stripe->len));
DDbg(dbg_ctl_dir_insert, "insert %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), stripe->fd,
bi, e, key->slice32(1), dir_tag(e), dir_offset(e));
DDbg(dbg_ctl_dir_insert, "insert %p %X into vol " PRI_FD " bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0),
stripe->fd, bi, e, key->slice32(1), dir_tag(e), dir_offset(e));
CHECK_DIR(d);
stripe->header->dirty = 1;
Metrics::Gauge::increment(cache_rsb.direntries_used);
Expand Down Expand Up @@ -754,8 +761,8 @@ dir_overwrite(const CacheKey *key, Stripe *stripe, Dir *dir, Dir *overwrite, boo
dir_assign_data(e, dir);
dir_set_tag(e, t);
ink_assert(stripe->vol_offset(e) < stripe->skip + stripe->len);
DDbg(dbg_ctl_dir_overwrite, "overwrite %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0),
stripe->fd, bi, e, t, dir_tag(e), dir_offset(e));
DDbg(dbg_ctl_dir_overwrite, "overwrite %p %X into vol " PRI_FD " bucket %d at %p tag %X %X boffset %" PRId64 "", e,
key->slice32(0), stripe->fd, bi, e, t, dir_tag(e), dir_offset(e));
CHECK_DIR(d);
stripe->header->dirty = 1;
return res;
Expand Down Expand Up @@ -920,7 +927,11 @@ dir_sync_init()
}

void
#if TS_USE_MMAP
CacheSync::aio_write(ink_aiocb::aio_mmap &fd, char *b, int n, off_t o)
#else
CacheSync::aio_write(int fd, char *b, int n, off_t o)
#endif
{
io.aiocb.aio_fildes = fd;
io.aiocb.aio_offset = o;
Expand Down
25 changes: 20 additions & 5 deletions src/iocore/cache/CacheDisk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
#include "P_Cache.h"
#include "P_CacheVol.h"

#if TS_USE_MMAP
#define PRI_FD "%p"
#else
#define PRI_FD "%i"
#endif

void
CacheDisk::incrErrors(const AIOCallback *io)
{
Expand All @@ -37,7 +43,7 @@ CacheDisk::incrErrors(const AIOCallback *io)

const char *opname = "unknown";
int opcode = io->aiocb.aio_lio_opcode;
int fd = io->aiocb.aio_fildes;
auto fd = io->aiocb.aio_fildes;
switch (io->aiocb.aio_lio_opcode) {
case LIO_READ:
opname = "READ";
Expand All @@ -50,17 +56,23 @@ CacheDisk::incrErrors(const AIOCallback *io)
default:
break;
}
Warning("failed operation: %s (opcode=%d), span: %s (fd=%d)", opname, opcode, path, fd);
Warning("failed operation: %s (opcode=%d), span: %s (fd=" PRI_FD ")", opname, opcode, path, fd);
}

int
CacheDisk::open(char *s, off_t blocks, off_t askip, int ahw_sector_size, int fildes, bool clear)
{
path = ats_strdup(s);
hw_sector_size = ahw_sector_size;
fd = fildes;
skip = askip;
start = skip;
#if TS_USE_MMAP
fd.first = mmap(0, blocks * STORE_BLOCK_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, fildes, 0);
fd.last = fd.first + blocks * STORE_BLOCK_SIZE - 1;
ink_assert(MAP_FAILED != fd);
#else
fd = fildes;
#endif
skip = askip;
start = skip;
/* we can't use fractions of store blocks. */
len = blocks;
io.aiocb.aio_fildes = fd;
Expand Down Expand Up @@ -100,6 +112,9 @@ CacheDisk::open(char *s, off_t blocks, off_t askip, int ahw_sector_size, int fil

// read disk header
SET_HANDLER(&CacheDisk::openStart);
#if TS_USE_MMAP
io.mutex = mutex;
#endif
io.aiocb.aio_offset = skip;
io.aiocb.aio_buf = reinterpret_cast<char *>(header);
io.aiocb.aio_nbytes = header_len;
Expand Down
6 changes: 6 additions & 0 deletions src/iocore/cache/CacheVC.cc
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,9 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
io.action = this;
io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
SET_HANDLER(&CacheVC::handleReadDone);
#if TS_USE_MMAP
io.mutex = mutex;
#endif
ink_assert(ink_aio_read(&io) >= 0);

// ToDo: Why are these for debug only ??
Expand Down Expand Up @@ -899,6 +902,9 @@ CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
io.aiocb.aio_nbytes = stripe->skip + stripe->len - io.aiocb.aio_offset;
}
offset = 0;
#if TS_USE_MMAP
io.mutex = mutex;
#endif
ink_assert(ink_aio_read(&io) >= 0);
Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject %" PRId64 " %zu", this, (int64_t)io.aiocb.aio_offset,
(size_t)io.aiocb.aio_nbytes);
Expand Down
3 changes: 3 additions & 0 deletions src/iocore/cache/CacheWrite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,9 @@ Stripe::evac_range(off_t low, off_t high, int evac_phase)
io.thread = AIO_CALLBACK_THREAD_ANY;
DDbg(dbg_ctl_cache_evac, "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
SET_HANDLER(&Stripe::evacuateDocReadDone);
#if TS_USE_MMAP
io.mutex = mutex;
#endif
ink_assert(ink_aio_read(&io) >= 0);
return -1;
}
Expand Down
Loading