Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add lock-free thread pool #112

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion system/threading/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,29 @@ cc_library(
]
)

cc_library(
name = '_lock_free_thread_pool',
srcs = [
'lock_free_thread_pool.cpp'
],
deps = [
':_sync_object',
':_this_thread',
':_thread',
'//toft/base:closure',
'//toft/system/atomic:atomic'
]
)

cc_library(
name = 'threading',
deps = [
':_sync_object',
':_this_thread',
':_thread',
':_thread_group',
':_thread_pool'
':_thread_pool',
':_lock_free_thread_pool'
]
)

Expand Down Expand Up @@ -140,3 +155,18 @@ cc_test(
':threading',
]
)

cc_test(
name = 'lock_free_queue_test',
srcs = 'lock_free_queue_test.cpp',
deps = [
]
)

cc_test(
name = 'lock_free_thread_pool_test',
srcs = 'lock_free_thread_pool_test.cpp',
deps = [
':threading',
]
)
156 changes: 156 additions & 0 deletions system/threading/lock_free_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (C) 2013, The Toft Authors.
// Author: An Qin <[email protected]>
//
// Description:

#ifndef TOFT_SYSTEM_THREADING_LOCK_FREE_QUEUE_H
#define TOFT_SYSTEM_THREADING_LOCK_FREE_QUEUE_H

#include <stdint.h>
#include <stdlib.h>

namespace toft {

template<typename T, unsigned N = sizeof(uint32_t)>
struct _Pointer {
public:
union {
uint64_t ui;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个在x64下面不work

struct {
T* ptr;
size_t count;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count这个名字不理想,要不叫version?

};
};

_Pointer() : ptr(NULL), count(0) {}
_Pointer(T* p) : ptr(p), count(0) {}
_Pointer(T* p, size_t c) : ptr(p), count(c) {}

bool cas(_Pointer<T, N> const& nval, _Pointer<T, N> const & cmp) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cas操作在其他模块应该已经有了,还得考虑移植性的问题,最好别自己做了。

bool result;
__asm__ __volatile__(
"lock cmpxchg8b %1\n\t"
"setz %0\n"
: "=q" (result), "+m" (ui)
: "a" (cmp.ptr), "d" (cmp.count), "b" (nval.ptr), "c" (nval.count)
: "cc"
);
return result;
}

bool operator==(_Pointer<T,N> const&x) { return x.ui == ui; }
};

template<typename T>
struct _Pointer <T, sizeof(uint64_t)> {
public:
union {
uint64_t ui[2];
struct {
T* ptr;
size_t count;
} __attribute__ (( __aligned__( 16 ) ));
};

_Pointer() : ptr(NULL), count(0) {}
_Pointer(T* p) : ptr(p), count(0) {}
_Pointer(T* p, size_t c) : ptr(p), count(c) {}

bool cas(_Pointer<T, 8> const& nval, _Pointer<T, 8> const& cmp) {
bool result;
__asm__ __volatile__ (
"lock cmpxchg16b %1\n\t"
"setz %0\n"
: "=q" (result), "+m" (ui)
: "a" (cmp.ptr), "d" (cmp.count), "b" (nval.ptr), "c" (nval.count)
: "cc"
);
return result;
}

bool operator==(_Pointer<T, 8> const&x) {
return x.ptr == ptr && x.count == count;
}
};


/////////// lock-free queue ///////////

template<typename T>
class LockFreeQueue {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

原来的论文里面有一个fixlist方法,这个是必须的,你得给实现了。

public:
struct Node;
typedef _Pointer<Node, sizeof(size_t)> Pointer;

struct Node {
T value;
Pointer next;

Node() : next(NULL) {}
Node(T x, Node* nxt) : value(x), next(nxt) {}
};

Pointer m_head, m_tail;

LockFreeQueue() {
Node *node = new Node();
m_head.ptr = m_tail.ptr = node;
}

~LockFreeQueue() {
Node *node = m_head.ptr;
m_head.ptr = m_tail.ptr = NULL;
delete node;
}

void Enqueue(T x);
bool Dequeue(T& pvalue);
};

template<typename T>
void LockFreeQueue<T>::Enqueue(T x) {
Node *node = new Node(x, NULL);
Pointer tail, next;
do {
tail = m_tail;
next = tail.ptr->next;
if (tail == m_tail) {
if (next.ptr == NULL) {
if (tail.ptr->next.cas(Pointer(node, next.count + 1), next)) {
break;
}
} else {
m_tail.cas(Pointer(next.ptr, tail.count + 1), tail);
}
}
} while (true);
m_tail.cas(Pointer(node,tail.count + 1), tail);
}

template<typename T>
bool LockFreeQueue<T>::Dequeue(T& pvalue) {
Pointer head, tail, next;
do {
head = m_head;
tail = m_tail;
next = head.ptr->next;
if (head == m_head) {
if (head.ptr == tail.ptr) {
if (next.ptr == NULL) return false;
m_tail.cas(Pointer(next.ptr, tail.count + 1), tail);
} else {
pvalue = next.ptr->value;
if (m_head.cas(Pointer(next.ptr, head.count + 1), head)) {
break;
}
}
}
} while (true);

delete head.ptr;
return true;
}

} // namespace toft

#endif // TOFT_SYSTEM_THREADING_LOCK_FREE_QUEUE_H
29 changes: 29 additions & 0 deletions system/threading/lock_free_queue_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (C) 2013, The Toft Authors.
// Author: An Qin <[email protected]>
//
// Description:

#include "toft/system/threading/lock_free_queue.h"

#include "thirdparty/gtest/gtest.h"

namespace toft {

TEST(LockFreeQueue, General) {
LockFreeQueue<int*> my_queue;

for (int i = 0; i < 10; ++i) {
int* value = (int*) malloc(sizeof(int));
*value = i;
my_queue.Enqueue(value);
}
int* value = NULL;
for (int i = 0; i < 10; ++i) {
my_queue.Dequeue(value);
EXPECT_EQ(*value, i);
delete value;
value = NULL;
}
}

} // namespace toft
Loading