#pragma once

namespace KERF_NAMESPACE {

void cleanup_threads();

#pragma mark - Thread

struct THREAD
{
  static void* thread_start(void *arg);
  static void* thread_noop(void *arg);
  static void handle_SIGUSR2(int sig);
  static void handle_SIGABRT(int sig);

protected:
  // POP It's simpler for now to have every thread init and register with a normalized id.
  // Otherwise we'd need to track a separate pool [a separate, parallel registration process for
  // non-kerf_vm_stack thread ids and the like] whenever we want to deliver signals. Before
  // disabling this, consider using a thread pool instead.
  bool should_init_deinit_kerf_vm_thread_stuff = true; // POP false: saves resources; true: enables kerf vm use, pools, etc.

public:
  // Question. Should a kerf THREAD accept lambdas? Answer. Probably not, because then we would
  // have to manage the lambdas' lifetimes. We can use lambdas in the pool, however.
  decltype(&thread_start) function_to_call = &thread_start;
  void *function_data = nullptr;

  pthread_t thr;
  pthread_attr_t attr;
  I normalized_id = -1;

  // Thread attributes, presumably applied in init()/start(); see the sketch below.
  decltype(PTHREAD_CREATE_JOINABLE) detach_state = PTHREAD_CREATE_DETACHED;
  decltype(PTHREAD_SCOPE_PROCESS) scope = PTHREAD_SCOPE_SYSTEM;
  decltype(PTHREAD_EXPLICIT_SCHED) inherit_sched = PTHREAD_INHERIT_SCHED;
  decltype(SCHED_OTHER) sched_policy = SCHED_FIFO; // POSIX: SCHED_FIFO SCHED_RR SCHED_SPORADIC SCHED_OTHER
  // midpoint of the policy's valid priority range
  struct sched_param param = {.sched_priority=(sched_get_priority_max(sched_policy) + sched_get_priority_min(sched_policy))/2};
  decltype(PTHREAD_CANCEL_DEFERRED) cancel_type = PTHREAD_CANCEL_ASYNCHRONOUS;
  decltype(PTHREAD_CANCEL_DISABLE) cancel_state = PTHREAD_CANCEL_ENABLE;
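
  // A minimal sketch (not the actual implementation, which lives in init()/start() elsewhere) of
  // how these fields presumably map onto the POSIX calls; error handling and the kerf_init_wrapper
  // registration step are omitted, and passing `this` to pthread_create is an assumption:
  //
  //   pthread_attr_init(&attr);
  //   pthread_attr_setdetachstate(&attr, detach_state);
  //   pthread_attr_setscope(&attr, scope);
  //   pthread_attr_setinheritsched(&attr, inherit_sched);
  //   pthread_attr_setschedpolicy(&attr, sched_policy);
  //   pthread_attr_setschedparam(&attr, &param);
  //   pthread_create(&thr, &attr, kerf_init_wrapper, this);
  //   // cancel_type/cancel_state are per-thread state, so the new thread itself would call:
  //   //   pthread_setcanceltype(cancel_type, nullptr);
  //   //   pthread_setcancelstate(cancel_state, nullptr);
  //
  // NB. With PTHREAD_INHERIT_SCHED (the default above), POSIX ignores the policy/param in the attr
  // and inherits scheduling from the creating thread; PTHREAD_EXPLICIT_SCHED is required for
  // sched_policy/param to take effect.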
  static void* kerf_init_wrapper(void *arg);
  static void handle_error_en(int en, const char* msg);

  int init();
  void deinit();

  THREAD() = default;
  ~THREAD() = default;
  THREAD(const THREAD &) = default; // we rely on the default copy constructor currently

  int start();
  int join(void **retval = nullptr);
};
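
// A minimal usage sketch for THREAD (a hedged example, not part of this header): `my_worker` and
// `my_payload` are hypothetical names, and the start routine uses the `void*(*)(void*)` signature
// that `function_to_call` expects, mirroring how THREAD_POOL::init_helper below drives THREAD:
//
//   void* my_worker(void *arg) { /* cast and use arg */ return nullptr; }
//
//   int my_payload = 0;
//   THREAD t;
//   t.detach_state     = PTHREAD_CREATE_JOINABLE; // the default above is detached
//   t.function_to_call = my_worker;
//   t.function_data    = &my_payload;
//   int r = t.start();
//   if(r != 0) { THREAD::handle_error_en(r, "start"); }
//   t.join();
//
// Lambdas are deliberately not accepted here (lifetime management); THREAD_POOL below takes them.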

#pragma mark - THREAD_POOL

template<typename T = std::function<void()>>
struct THREAD_POOL : REGISTERED_FOR_LONGJMP_WORKSTACK
{
  // Observation. A thread pool with 1 thread [that accepts lambdas] is a synchronous queue; with 2+ threads it is asynchronous. (See the usage sketch after this struct.)
  // Remark. A nice improvement would be to lazily allocate threads: we could then cap an async queue at, say, 1024 threads without actually acquiring that many when they go unused. With a hard limit of, say, 8 threads and 8 long-running jobs pinned, the remaining 100 small jobs would starve instead of flushing.
  std::deque<T> jobs;
  MUTEX_UNTRACKED mutex;
  pthread_cond_t work_avail; // Question. Should we fold this onto MUTEX? Answer. Doesn't seem necessary at this point.
  pthread_cond_t inactivity;
  I active_job_count = 0;
  std::vector<THREAD> threads;
  std::atomic<bool> time_to_destruct = false;

  static void* thread_pool_work_loop(void *v)
  {
    int s;
    THREAD_POOL *pool = (THREAD_POOL*)v;
    bool predicate;

    while(!pool->time_to_destruct)
    {
      pool->mutex.lock();

      while(!(predicate = pool->jobs.size() > 0) && !pool->time_to_destruct)
      {
        s = pthread_cond_wait(&pool->work_avail, &pool->mutex.pmutex);
        if(s != 0)
        {
          THREAD::handle_error_en(s, "pthread_cond_wait");
        }
      }

      if(predicate && !pool->time_to_destruct)
      {
        auto j = std::move(pool->jobs.front());
        pool->jobs.pop_front();
        __sync_fetch_and_add(&pool->active_job_count, 1);
        pool->mutex.unlock();

        j();

        // this second lock is entirely to prevent a race condition for the `.wait()` functionality
        pool->mutex.lock_safe_wrapper([&]{ // holding the lock obviates __sync here
          __sync_fetch_and_sub(&pool->active_job_count, 1);
          s = pthread_cond_signal(&pool->inactivity);
        });
        if (s != 0)
        {
          THREAD::handle_error_en(s, "thread pool pthread_cond_signal error");
        }
      }
      else
      {
        pool->mutex.unlock();
      }
    }

    return nullptr;
  }

  void init_helper(I threadcount = -1)
  {
    int s;

    s = pthread_cond_init(&this->work_avail, nullptr);
    if(s!=0)
    {
      THREAD::handle_error_en(s, "thread pool pthread_cond_init 1 error");
    }

    s = pthread_cond_init(&this->inactivity, nullptr);
    if(s!=0)
    {
      THREAD::handle_error_en(s, "thread pool pthread_cond_init 2 error");
    }

    if (-1 == threadcount)
    {
      threadcount = kerf_count_hardware_threads();
    }

    threads.reserve(threadcount); // reserve up front so the emplace_back below never reallocates while earlier threads are already running

    DO(threadcount,
      threads.emplace_back(THREAD());
      threads[i].detach_state = PTHREAD_CREATE_JOINABLE;
      threads[i].function_to_call = thread_pool_work_loop;
      threads[i].function_data = this;
      int r = threads[i].start();
      if(r != 0)
      {
        THREAD::handle_error_en(r, "thread pool start thread");
      }
    )
  }

  THREAD_POOL(I threadcount = -1)
  {
    init_helper(threadcount);
  }

  THREAD_POOL(THREAD_POOL &) = delete;

  ~THREAD_POOL()
  {
    mutex.lock_safe_wrapper([&]{
      // NB. This lock was necessary to solve a subtle bug: a worker thread can pass the `while`
      // check while time_to_destruct is still false; if it then flips to true and we broadcast
      // here before the worker reaches its wait on work_avail, the worker misses the broadcast
      // and waits forever.
      time_to_destruct = true;
    });
    pthread_cond_broadcast(&this->work_avail);

    // 2021.10.19 Remark. Putting
    //   wait();
    // here solves certain ctrl-c bugs (e.g., you put a thread pool in main() with while(1)
    // looping threads, don't call pool.wait() before exit(), then ctrl-c the abandoned threads),
    // but I haven't played with it much. That a child thread would crash if the POOL was freed
    // before the child finishes seems obvious in retrospect. We probably need to enable this
    // wait() on destruct. What's weird, though, is that we join() below, and that should be
    // sufficient to "wait"; so what's the difference, and what "extra" does joining accomplish
    // versus just waiting? Looking at ->wait(), it might not survive ctrl-c because the counts
    // will cause it to sleep indefinitely (though it will eventually be killed by the ctrl-c).
    // I don't understand this yet. Wrapping the joins in a critical section didn't work. Getting
    // a signal while blocked on a mutex may let you go back to sleep?
    // Note. We *may possibly* WANT the threads to crash if you don't call wait() yourself,
    // depending on our desired model/regime; it might make more sense to require a paired wait()
    // in C++ land to avoid crashes.
    // 2021.10.23 Observation. It's possible this was only screwed up temporarily, while we had
    // the then-undiscovered bug where popped wrapper guards weren't freed, but I haven't gone
    // back to check.
    for(auto &t : threads)
    {
      auto result = t.join();
    }

    pthread_cond_destroy(&this->work_avail);
    pthread_cond_destroy(&this->inactivity);
  }

  void wait()
  {
    int s;
    mutex.lock_safe_wrapper([&]{
      while(jobs.size() > 0 || active_job_count > 0)
      {
        s = pthread_cond_wait(&this->inactivity, &this->mutex.pmutex);
        if(s != 0)
        {
          THREAD::handle_error_en(s, "pthread_cond_wait 2");
        }
      }
    });
  }

  // Idea. (A hedged sketch follows below.)
  // this->pause():  signal the threads via pthread_kill to call `int ::pause(void);` (from <unistd.h>).
  //                 (Have a suite of SIGUSR2 actions: thread_cleanup, pause, etc., selected by a per-thread action-variable enum.)
  // this->resume(): then signal all threads again with a noop action so they escape `pause()`.
  //                 (In the destructor, call resume() after time_to_destruct=true but before pthread_cond_broadcast.)
  // void pause(){}
  // void resume(){}
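
  // A minimal sketch of the idea above, kept in comments because it is not implemented. It assumes
  // a hypothetical per-thread `sigusr2_action` enum consulted by THREAD::handle_SIGUSR2, and that
  // each worker's pthread_t is reachable as `t.thr`; none of that is guaranteed by this header:
  //
  //   void pause()
  //   {
  //     for(auto &t : threads)
  //     {
  //       // hypothetical: tell handle_SIGUSR2 to call ::pause() when it fires
  //       // t.sigusr2_action = SIGUSR2_ACTION_PAUSE;
  //       pthread_kill(t.thr, SIGUSR2);
  //     }
  //   }
  //
  //   void resume()
  //   {
  //     for(auto &t : threads)
  //     {
  //       // hypothetical: a noop action; delivering the signal is enough to interrupt ::pause()
  //       // t.sigusr2_action = SIGUSR2_ACTION_NOOP;
  //       pthread_kill(t.thr, SIGUSR2);
  //     }
  //   }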

#if !THREAD_POOL_SUPPORTS_FUTURES

  template<typename L>
  void add(L job)
  {
    mutex.lock_safe_wrapper([&]{
      jobs.emplace_back(job);
    });

    int s;
    s = pthread_cond_signal(&this->work_avail);
    if (s != 0)
    {
      THREAD::handle_error_en(s, "thread pool pthread_cond_signal error");
    }
  }

#else

  // This enables:
  //   auto result = pool.add([]{return 22;});
  //   auto rg = result.get();
  // NB. std::result_of is deprecated in C++17 and removed in C++20; std::invoke_result is the modern replacement.
  template<typename F, class... Args>
  auto add(F f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>
  {
    using return_type = typename std::result_of<F(Args...)>::type;

    auto job = std::make_shared< std::packaged_task<return_type()> >(
      std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );
    std::future<return_type> res = job->get_future();

    mutex.lock_safe_wrapper([&]{
      jobs.emplace_back([job](){(*job)();});
    });

    int s;
    s = pthread_cond_signal(&this->work_avail);
    if (s != 0)
    {
      THREAD::handle_error_en(s, "thread pool pthread_cond_signal error");
    }

    return res;
  }

#endif
};
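
// A minimal usage sketch for THREAD_POOL (a hedged example, not part of the header's API surface);
// `pool` and the lambdas are illustrative only, and the futures form assumes
// THREAD_POOL_SUPPORTS_FUTURES is enabled:
//
//   THREAD_POOL<> pool(4);   // 4 workers; 1 gives a synchronous queue, -1 uses the hardware thread count
//   pool.add([]{ /* job A */ });
//   pool.add([]{ /* job B */ });
//   pool.wait();             // block until the queue drains and active_job_count returns to 0
//
//   // With THREAD_POOL_SUPPORTS_FUTURES:
//   //   auto fut = pool.add([]{ return 22; });
//   //   int v = fut.get();
//
// The destructor joins the workers, but the remarks in ~THREAD_POOL above suggest calling wait()
// yourself before the pool goes out of scope to avoid crashes from abandoned, still-running jobs.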
} // namespace