Commit
Merge pull request #1456 from actonlang/rq
Per thread malloc and queue optimizations
plajjan authored Aug 28, 2023
2 parents e7a296f + 11aa0da commit dee278b
Showing 12 changed files with 416 additions and 112 deletions.
34 changes: 34 additions & 0 deletions CHANGELOG.md
@@ -19,6 +19,40 @@
single files [#1438]
- It is now possible to have mutually recursive definitions between function
defs, classes and actors [#1433] [#1435]
- RTS now uses a per-thread malloc implementation [#1456]
  - `malloc` is part of libgc, which provides two flavours: a global malloc
    guarded by a lock, and a per-thread allocator
  - During the restructuring to use Zig's build system, the compilation option
    to use the per-thread allocator was lost. It is now back!
  - For parallel workloads across multiple RTS worker threads, there was a
    clear bottleneck around malloc... not very surprisingly ;)
- RTS readyQ and per-actor message queues have been optimized [#1456]
  - There is now a tail pointer, which significantly speeds up insertions (see
    the sketch after this list)
  - The struct fields have been slightly reorganized so that the most important
    ones come first and can be fetched in a single cache line
  - Before, having many concurrent actors with many outstanding messages would
    scale poorly. On an AMD 5950X running the pairs benchmark:
    - before:
      - 1 actor pair & 1 token: ~4M continuations per second
      - 10 actor pairs & 1 token: ~650K continuations per second
      - 1000 actor pairs & 1 token: ~280K continuations per second
      - 1000 actor pairs & 500 tokens: ~280K continuations per second
    - after readyQ tail optimization:
      - 1 actor pair & 1 token: ~3.8M continuations per second
      - 1000 actor pairs & 1 token: ~3.6M continuations per second
      - 1000 actor pairs & 500 tokens: ~700K continuations per second
    - after msg queue tail optimization:
      - 1 actor pair & 1 token: ~3.8M continuations per second
      - 1000 actor pairs & 1 token: ~3.6M continuations per second
      - 1000 actor pairs & 500 tokens: ~3.6M continuations per second
  - This is an artificial benchmark where extremely little real work is done,
    so continuation switching becomes the dominant load and the global readyQ
    becomes the bottleneck. Using more than 1 worker thread only leads to lock
    contention, so the benchmarks were run with 1 worker thread.
  - Since the message queue is per actor, multiple worker threads could in
    theory be faster, but in practice they step on each other's toes enough
    around the global readyQ that it is slower overall.
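
To illustrate the tail-pointer change, here is a minimal sketch with simplified
type and field names; the actual implementation in base/rts/q.c (shown below)
additionally guards the queue with a spinlock and keeps a count:

```c
#include <stddef.h>

typedef struct node { struct node *next; } node;
typedef struct queue { node *head; node *tail; } queue;

// Before: O(n) enqueue that walks the whole list to find the last element.
void enq_walk(queue *q, node *n) {
    n->next = NULL;
    if (q->head) {
        node *x = q->head;
        while (x->next)
            x = x->next;
        x->next = n;
    } else {
        q->head = n;
    }
}

// After: O(1) enqueue via the tail pointer; both head and tail must be
// updated when the queue goes from empty to non-empty.
void enq_tail(queue *q, node *n) {
    n->next = NULL;
    if (q->tail) {
        q->tail->next = n;
        q->tail = n;
    } else {
        q->head = n;
        q->tail = n;
    }
}
```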


### Changed
- `net.TCPIPConnection` is removed and replaced by `net.TCPConnection`
2 changes: 1 addition & 1 deletion Makefile
@@ -217,7 +217,7 @@ dist/depsout/lib/libbsdnt.a: dist/deps/libbsdnt $(DIST_ZIG)
cd $< && $(ZIG) build $(ZIG_TARGET) --prefix $(TD)/dist/depsout

# /deps/libgc --------------------------------------------
LIBGC_REF=ead2119801a6659d1d78406563d5acc6df4d94e3
LIBGC_REF=6d884227b4db1f5bd3e09a86e65a1caed32f3174
deps-download/$(LIBGC_REF).tar.gz:
mkdir -p deps-download
curl -f -L -o $@ https://github.com/actonlang/bdwgc/archive/$(LIBGC_REF).tar.gz
192 changes: 192 additions & 0 deletions base/rts/q.c
@@ -0,0 +1,192 @@
#include "rts.h"
#include "q.h"

// Minimal test-and-set spinlock built on a C11 atomic_flag.
static inline void spinlock_lock($Lock *f) {
    while (atomic_flag_test_and_set(f) == true) {
        // spin until we can set the flag
    }
}
static inline void spinlock_unlock($Lock *f) {
    atomic_flag_clear(f);
}

#if defined MPMC && MPMC == 3
int ENQ_ready($Actor a) {
    // TODO: atomics! Not yet implemented; q.h currently sets MPMC to 2.
    return a->$affinity; // placeholder so the stub compiles
}
#elif defined MPMC && MPMC == 2
int ENQ_ready($Actor a) {
    int i = a->$affinity;
    spinlock_lock(&rqs[i].lock);
    if (rqs[i].tail) {
        rqs[i].tail->$next = a;
        rqs[i].tail = a;
    } else {
        rqs[i].head = a;
        rqs[i].tail = a;
    }
    a->$next = NULL;
    rqs[i].count++;
    spinlock_unlock(&rqs[i].lock);
    // If we enqueue to someone who is not us, immediately wake them up...
    int our_wtid = (int)pthread_getspecific(pkey_wtid);
    if (our_wtid != i)
        wake_wt(i);
    return i;
}
#else
// First version: O(n) enqueue that walks the list to find the last element.
int ENQ_ready($Actor a) {
    int i = a->$affinity;
    spinlock_lock(&rqs[i].lock);
    if (rqs[i].head) {
        $Actor x = rqs[i].head;
        while (x->$next)
            x = x->$next;
        x->$next = a;
    } else {
        rqs[i].head = a;
    }
    a->$next = NULL;
    spinlock_unlock(&rqs[i].lock);
    // If we enqueue to someone who is not us, immediately wake them up...
    int our_wtid = (int)pthread_getspecific(pkey_wtid);
    if (our_wtid != i)
        wake_wt(i);
    return i;
}
#endif

// ENQ_ready (above) atomically enqueues actor "a" onto the right ready-queue:
// either a thread-local one or the "default" queue 0, which is shared among all
// worker threads. The index is offset by 1 so worker thread 0 is at index 1.

// Atomically dequeue and return the first actor from a ready-queue, trying the
// thread-specific queue first and the global shared readyQ second; return NULL
// if no work is found.
#if defined MPMC && MPMC == 3
$Actor _DEQ_ready(int idx) {
    // TODO: atomics! Not yet implemented; q.h currently sets MPMC to 2.
    return NULL; // placeholder so the stub compiles
}
#elif defined MPMC && MPMC == 2
$Actor _DEQ_ready(int idx) {
    $Actor res = NULL;
    if (rqs[idx].head == NULL) {
        return res;
    }

    spinlock_lock(&rqs[idx].lock);
    res = rqs[idx].head;
    if (res) {
        rqs[idx].head = res->$next;
        res->$next = NULL;
        if (rqs[idx].head == NULL) {
            rqs[idx].tail = NULL;
        }
        // Only decrement the count when something was actually dequeued; the
        // unlocked head check above may race with another consumer emptying
        // the queue.
        rqs[idx].count--;
    } else {
        rqs[idx].tail = NULL;
    }
    spinlock_unlock(&rqs[idx].lock);
    return res;
}
#else
// First version
$Actor _DEQ_ready(int idx) {
    $Actor res = NULL;
    if (rqs[idx].head == NULL)
        return res;

    spinlock_lock(&rqs[idx].lock);
    res = rqs[idx].head;
    if (res) {
        rqs[idx].head = res->$next;
        res->$next = NULL;
    }
    spinlock_unlock(&rqs[idx].lock);
    return res;
}
#endif

$Actor DEQ_ready(int idx) {
    $Actor res = _DEQ_ready(idx);
    if (res)
        return res;
    res = _DEQ_ready(0);
    return res;
}


#if MSGQ == 2
// Atomically enqueue message "m" onto the queue of actor "a",
// return true if the queue was previously empty.
bool ENQ_msg(B_Msg m, $Actor a) {
    bool did_enq = true;
    spinlock_lock(&a->B_Msg_lock);
    m->$next = NULL;
    if (a->B_Msg_tail) {
        a->B_Msg_tail->$next = m;
        a->B_Msg_tail = m;
        did_enq = false;
    } else {
        a->B_Msg = m;
        a->B_Msg_tail = m;
    }
    spinlock_unlock(&a->B_Msg_lock);
    return did_enq;
}

// Atomically dequeue the first message from the queue of actor "a",
// return true if the queue still holds messages.
bool DEQ_msg($Actor a) {
    bool has_more = false;
    spinlock_lock(&a->B_Msg_lock);
    B_Msg x = a->B_Msg;
    if (x) {
        a->B_Msg = x->$next;
        x->$next = NULL;
        if (a->B_Msg == NULL) {
            a->B_Msg_tail = NULL;
        }
        has_more = a->B_Msg != NULL;
    } else {
        a->B_Msg_tail = NULL;
    }
    spinlock_unlock(&a->B_Msg_lock);
    return has_more;
}
#else // MSGQ == 1
// Atomically enqueue message "m" onto the queue of actor "a",
// return true if the queue was previously empty.
bool ENQ_msg(B_Msg m, $Actor a) {
    bool did_enq = true;
    spinlock_lock(&a->B_Msg_lock);
    m->$next = NULL;
    if (a->B_Msg) {
        B_Msg x = a->B_Msg;
        while (x->$next)
            x = x->$next;
        x->$next = m;
        did_enq = false;
    } else {
        a->B_Msg = m;
    }
    spinlock_unlock(&a->B_Msg_lock);
    return did_enq;
}

// Atomically dequeue the first message from the queue of actor "a",
// return true if the queue still holds messages.
bool DEQ_msg($Actor a) {
    bool has_more = false;
    spinlock_lock(&a->B_Msg_lock);
    if (a->B_Msg) {
        B_Msg x = a->B_Msg;
        a->B_Msg = x->$next;
        x->$next = NULL;
        has_more = a->B_Msg != NULL;
    }
    spinlock_unlock(&a->B_Msg_lock);
    return has_more;
}
#endif // MSGQ
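
The boolean return values of ENQ_msg and DEQ_msg suggest how callers drive
scheduling: an actor whose message queue goes from empty to non-empty becomes
runnable, and an actor that still has messages after processing one stays
runnable. A minimal sketch of that pattern follows, assuming this is roughly
how the RTS wires the primitives together; deliver_msg and msg_done are
hypothetical helper names, not part of the RTS:

```c
// Hypothetical helpers showing the intended use of the queue primitives;
// the names deliver_msg and msg_done are illustrative only.

// Sending a message: if the actor's queue was previously empty, the actor
// was idle, so put it on a ready-queue (ENQ_ready also wakes a worker
// thread when the target queue belongs to another thread).
void deliver_msg(B_Msg m, $Actor a) {
    if (ENQ_msg(m, a))
        ENQ_ready(a);
}

// After processing one message: if more messages remain, the actor stays
// runnable and goes back on a ready-queue.
void msg_done($Actor a) {
    if (DEQ_msg(a))
        ENQ_ready(a);
}
```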
25 changes: 25 additions & 0 deletions base/rts/q.h
@@ -0,0 +1,25 @@
#pragma once

#define MPMC 2

#include "rts.h"


#if defined MPMC && MPMC == 3
// TODO: do atomics! For now this variant is identical to the locked one below.
struct mpmcq {
    $Actor head;
    $Actor tail;
    unsigned long long count;
    $Lock lock;
};
#else
struct mpmcq {
    $Actor head;
    $Actor tail;
    unsigned long long count;
    $Lock lock;
};
#endif

extern struct mpmcq rqs[MAX_WTHREADS+1];