-
Notifications
You must be signed in to change notification settings - Fork 0
/
BlockingMessageQueueTests.kt
369 lines (351 loc) · 15.6 KB
/
BlockingMessageQueueTests.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
package pt.isel.pc.problemsets.set1
import org.junit.jupiter.api.RepeatedTest
import org.junit.jupiter.api.Test
import pt.isel.pc.problemsets.utils.ExchangedValue
import pt.isel.pc.problemsets.utils.MultiThreadTestHelper
import pt.isel.pc.problemsets.utils.randomTo
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
internal class BlockingMessageQueueTests {
private val defaultMsg = "message"
// Tests without concurrency stress:
/**
 * Hands a single message from a producer thread to a consumer thread through a queue of
 * capacity one and checks that the consumer receives exactly that message.
 */
@Test
fun `Queue should let a consumer thread retrieve a value gave by a producer thread`() {
    val queueCapacity = 1
    val messageQueue = BlockingMessageQueue<String>(queueCapacity)
    val threads = MultiThreadTestHelper(10.seconds)

    // Producer side: the queue starts empty, so the enqueue is expected to succeed.
    threads.createAndStartThread {
        assertTrue(messageQueue.tryEnqueue(defaultMsg, Duration.INFINITE))
    }

    // Give the producer a head start before the consumer is launched.
    Thread.sleep(1000)

    // Consumer side: asks for as many messages as the queue can hold (one).
    threads.createAndStartThread {
        val dequeued = messageQueue.tryDequeue(queueCapacity, Duration.INFINITE)
        assertNotNull(dequeued)
        assertEquals(queueCapacity, dequeued.size)
        assertEquals(defaultMsg, dequeued.first())
    }

    threads.join()
}
/**
 * Fills the queue up to its capacity from one producer thread and checks that a consumer
 * asking for `capacity` messages receives them in the order they were enqueued.
 */
@Test
fun `Queue should let a consumer thread retrieve all values gave by a producer thread in FIFO order`() {
    val queueCapacity = 10
    val messageQueue = BlockingMessageQueue<String>(queueCapacity)
    val expectedMessages = List(queueCapacity) { index -> "$defaultMsg-$index" }
    val threads = MultiThreadTestHelper(10.seconds)

    // Producer side: enqueue `queueCapacity` distinct messages, in list order.
    threads.createAndStartThread {
        for (message in expectedMessages) {
            assertTrue(messageQueue.tryEnqueue(message, Duration.INFINITE))
        }
    }

    // Give the producer a head start before the consumer is launched.
    Thread.sleep(1000)

    // Consumer side: a single dequeue of the whole batch must match the enqueue order.
    threads.createAndStartThread {
        val dequeued = messageQueue.tryDequeue(queueCapacity, Duration.INFINITE)
        assertNotNull(dequeued)
        assertEquals(queueCapacity, dequeued.size)
        assertEquals(expectedMessages, dequeued)
    }

    threads.join()
}
/** Constructing a queue with a capacity of zero must be rejected with [IllegalArgumentException]. */
@Test
fun `Queue should only operate with a capacity greater than zero`() {
    val invalidCapacity = 0
    assertFailsWith<IllegalArgumentException> { BlockingMessageQueue<String>(invalidCapacity) }
}
/**
 * Requesting more messages than the queue capacity, or zero messages, must be rejected
 * with [IllegalArgumentException].
 */
@Test
fun `Consumer threads should only be able to dequeue nOfMessages between 1 and capacity`() {
    val queueCapacity = 2
    val messageQueue = BlockingMessageQueue<String>(queueCapacity)

    // One value just above the upper bound, and zero (just below the lower bound).
    val outOfRangeRequests = listOf(queueCapacity + 1, 0)
    for (nOfMessages in outOfRangeRequests) {
        assertFailsWith<IllegalArgumentException> {
            messageQueue.tryDequeue(nOfMessages, Duration.INFINITE)
        }
    }
}
// Producer threads related tests
/**
 * With a queue of capacity one, a first producer fills the queue and terminates, and a second
 * producer that enqueues without a deadline must remain blocked.
 *
 * The first producer is joined before the second one is started: starting both at once would
 * let either of them win the single slot, making the per-thread state assertions racy.
 */
@Test
fun `Producer thread should be blocked trying to enqueue a message in a full queue`() {
    val capacity = 1
    val queue = BlockingMessageQueue<String>(capacity)
    val testHelper = MultiThreadTestHelper(10.seconds)
    // This test could not be generic (for any capacity) since having control of the producer thread
    // which is the "last" one to start is required
    val pth1 = testHelper.createAndStartThread {
        queue.tryEnqueue(defaultMsg, Duration.INFINITE)
    }
    // Bounded wait so that a broken queue cannot hang the test thread forever.
    pth1.join(5000)
    assertEquals(Thread.State.TERMINATED, pth1.state)

    val pth2 = testHelper.createAndStartThread {
        try {
            queue.tryEnqueue(defaultMsg, Duration.INFINITE)
        } catch (expected: InterruptedException) {
            // Raised by the cleanup interrupt below; nothing to do.
        }
    }
    try {
        // Wait for the second producer to start and block on the full queue
        Thread.sleep(1000)
        assertEquals(Thread.State.TIMED_WAITING, pth2.state)
    } finally {
        // Release the blocked producer so it does not outlive the test.
        pth2.interrupt()
    }
}
/**
 * A producer that tries to enqueue into a full queue with a finite, non-zero timeout must
 * give up and return `false` once that timeout elapses.
 *
 * The first producer is joined before the second one starts, so the queue is known to be full;
 * otherwise the second producer could win the only slot and the first would block forever.
 * A positive timeout is used so that the timed wait is actually exercised (the zero-timeout
 * case is covered by the "does not want to wait" test).
 */
@Test
fun `Producer thread should return false when timeout expires`() {
    val capacity = 1
    val queue = BlockingMessageQueue<String>(capacity)
    val testHelper = MultiThreadTestHelper(10.seconds)
    // This test could not be generic (for any capacity) since having control of the producer thread,
    // which is the "last" one to start, is required
    val pth1 = testHelper.createAndStartThread {
        val couldEnqueue = queue.tryEnqueue(defaultMsg, Duration.INFINITE)
        assertTrue(couldEnqueue)
    }
    // Bounded wait so that a broken queue cannot hang the test thread forever.
    pth1.join(5000)

    // The queue is full, so the producer thread should time out
    testHelper.createAndStartThread {
        val couldEnqueue = queue.tryEnqueue(defaultMsg, 500.milliseconds)
        assertFalse(couldEnqueue)
    }
    testHelper.join()
}
/**
 * A producer blocked on a full queue must leave with [InterruptedException] when its thread
 * is interrupted.
 *
 * The first producer is joined before the second one starts, so the queue is known to be full;
 * otherwise the second producer could win the only slot, return normally and fail the
 * assertion, while the first producer would stay blocked.
 */
@Test
fun `Producer thread should throw InterruptedException if interruption occurs and could not dequeue in time`() {
    val capacity = 1
    val queue = BlockingMessageQueue<String>(capacity)
    val testHelper = MultiThreadTestHelper(10.seconds)
    // This test could not be generic (for any capacity) since having control of the producer thread,
    // which is the "last" one to start, is required
    val pth1 = testHelper.createAndStartThread {
        val couldEnqueue = queue.tryEnqueue(defaultMsg, Duration.INFINITE)
        assertTrue(couldEnqueue)
    }
    // Bounded wait so that a broken queue cannot hang the test thread forever.
    pth1.join(5000)

    // The queue is full, so this producer blocks until it is interrupted
    val pth2 = testHelper.createAndStartThread {
        assertFailsWith<InterruptedException> {
            queue.tryEnqueue(defaultMsg, Duration.INFINITE)
        }
    }
    // Give the second producer time to block before interrupting it
    Thread.sleep(1000)
    pth2.interrupt()
    testHelper.join()
}
/**
 * A producer using a zero timeout must return `false` straight away when the queue is full.
 *
 * The first producer is joined before the second one starts, so the queue is known to be full;
 * with both started at once the second producer could find the queue empty and succeed.
 * The first enqueue is also asserted, since the second assertion depends on it.
 */
@Test
fun `Producer thread which does not want to wait to enqueue leaves immediatly`() {
    val capacity = 1
    val queue = BlockingMessageQueue<String>(capacity)
    val testHelper = MultiThreadTestHelper(2.seconds)
    val pth1 = testHelper.createAndStartThread {
        // The queue is empty, so no waiting is needed for this enqueue to succeed
        assertTrue(queue.tryEnqueue(defaultMsg, Duration.ZERO))
    }
    // Bounded wait so that a broken queue cannot hang the test thread forever.
    pth1.join(2000)

    testHelper.createAndStartThread {
        assertFalse(queue.tryEnqueue(defaultMsg, Duration.ZERO))
    }
    testHelper.join()
}
// Consumer threads related tests
/**
 * A consumer that dequeues from an empty queue without a deadline must remain blocked.
 *
 * The consumer thread is interrupted and joined once the assertion has run, so that it is not
 * left blocked after the test finishes.
 */
@Test
fun `Consumer thread should be blocked trying to retrieve a message from an empty queue`() {
    val capacity = 10
    val queue = BlockingMessageQueue<String>(capacity)
    val cth = Thread {
        try {
            queue.tryDequeue(1, Duration.INFINITE)
        } catch (expected: InterruptedException) {
            // Raised by the cleanup interrupt below; nothing to do.
        }
    }
    cth.start()
    try {
        // Wait for the consumer thread to start
        Thread.sleep(1000)
        assertEquals(Thread.State.TIMED_WAITING, cth.state)
    } finally {
        // Release the blocked consumer and give it a bounded amount of time to exit.
        cth.interrupt()
        cth.join(1000)
    }
}
/**
 * A consumer that dequeues from an empty queue with a finite, non-zero timeout must give up
 * and return `null` once that timeout elapses.
 *
 * A positive timeout is used so that the timed wait is actually exercised (the zero-timeout
 * case is covered by the "does not want to wait" test).
 */
@Test
fun `Consumer thread should return null when timeout expires`() {
    val capacity = 10
    val queue = BlockingMessageQueue<String>(capacity)
    val testHelper = MultiThreadTestHelper(10.seconds)
    testHelper.createAndStartThread {
        // No producer exists in this test, so the queue stays empty for the whole wait
        val result = queue.tryDequeue(1, 500.milliseconds)
        assertNull(result)
    }
    testHelper.join()
}
/**
 * A consumer waiting on an empty queue must leave with [InterruptedException] when its
 * thread is interrupted.
 */
@Test
fun `Consumer thread should throw InterruptedException if interruption occurs and could not dequeue in time`() {
    val queueCapacity = 1
    val messageQueue = BlockingMessageQueue<String>(queueCapacity)
    val threads = MultiThreadTestHelper(10.seconds)

    // Nothing is ever enqueued, so this dequeue can only end through the interrupt below.
    val consumer = threads.createAndStartThread {
        assertFailsWith<InterruptedException> {
            messageQueue.tryDequeue(queueCapacity, Duration.INFINITE)
        }
    }

    // Let the consumer reach its wait before interrupting it.
    Thread.sleep(1000)
    consumer.interrupt()

    threads.join()
}
/** A consumer using a zero timeout on an empty queue must get `null` back. */
@Test
fun `Consumer thread which does not want to wait to dequeue leaves immediatly`() {
    val queueCapacity = 1
    val messageQueue = BlockingMessageQueue<String>(queueCapacity)
    val threads = MultiThreadTestHelper(2.seconds)

    threads.createAndStartThread {
        val dequeued = messageQueue.tryDequeue(queueCapacity, Duration.ZERO)
        assertNull(dequeued)
    }

    threads.join()
}
// Tests with concurrency stress:
/**
 * Stress test: many producers enqueue with a zero timeout while many consumers dequeue
 * random-sized batches, then the bookkeeping collections are cross-checked:
 * - some enqueues were refused,
 * - no message is both refused and accepted,
 * - what consumers retrieved matches what producers had accepted,
 * - every created message is either refused or accepted.
 */
@RepeatedTest(5)
fun `An arbitrary number of producer and consumer threads should be able to exchange`() {
    val queueCapacity = 20
    val messageQueue = BlockingMessageQueue<ExchangedValue>(queueCapacity)
    val threadsPerRole = 24
    val threads = MultiThreadTestHelper(10.seconds)

    // Bookkeeping shared by all threads
    val created = ConcurrentLinkedQueue<ExchangedValue>()
    val accepted = ConcurrentHashMap<ExchangedValue, Unit>()
    val consumed = ConcurrentLinkedQueue<ExchangedValue>()
    val refused = ConcurrentLinkedQueue<ExchangedValue>()

    // Producers
    threads.createAndStartMultipleThreads(threadsPerRole) { producerId, isTestFinished ->
        // Thread-confined counter: each producer thread runs its own copy of this lambda body
        var nextRepetition = 0
        while (!isTestFinished() && nextRepetition < 10000) {
            val message = ExchangedValue(producerId, nextRepetition++)
            created.add(message)
            if (!messageQueue.tryEnqueue(message, Duration.ZERO)) {
                // No producer is interrupted in this test, so a refusal can only mean
                // that the (zero) timeout expired
                refused.add(message)
                continue
            }
            val alreadyAccepted = accepted.putIfAbsent(message, Unit) != null
            if (alreadyAccepted) {
                throw AssertionError(
                    "The value $message has already been exchanged by another producer thread"
                )
            }
        }
    }

    // Consumers
    threads.createAndStartMultipleThreads(threadsPerRole) { _, isTestFinished ->
        while (!isTestFinished()) {
            val batchSize = 1 randomTo queueCapacity
            val timeout = (100 randomTo 500).milliseconds
            messageQueue.tryDequeue(batchSize, timeout)?.let { batch -> consumed.addAll(batch) }
        }
    }

    threads.join()

    assertTrue(refused.isNotEmpty())
    assertTrue(refused.intersect(accepted.keys).isEmpty())
    assertEquals(consumed.size, accepted.size)
    assertEquals(consumed.toSet(), accepted.keys)
    val accountedFor = refused + accepted.keys
    assertEquals(created.size, accountedFor.size)
    assertEquals(created.toSet(), accountedFor.toSet())
}
/**
 * Stress test in which consumers use a timeout five times shorter than the one used by the
 * waiting producers, and at least one consumer timeout is expected to be recorded.
 * Producers with an even id enqueue with a zero timeout; the others wait up to one second.
 * The same bookkeeping cross-checks as in the plain exchange stress test are applied.
 */
@Test
fun `Check if an arbitrary number of consumer threads is timedout`() {
    val queueCapacity = 20
    val messageQueue = BlockingMessageQueue<ExchangedValue>(queueCapacity)
    val producerCount = 24
    val consumerCount = 10
    val producerTimeout = 1.seconds
    // Consumers wait much less than producers do, so that some consumer calls time out
    val consumerTimeout = producerTimeout / 5
    val threads = MultiThreadTestHelper(10.seconds)

    // Bookkeeping shared by all threads
    val created = ConcurrentLinkedQueue<ExchangedValue>()
    val accepted = ConcurrentHashMap<ExchangedValue, Unit>()
    val consumed = ConcurrentLinkedQueue<ExchangedValue>()
    val refused = ConcurrentLinkedQueue<ExchangedValue>()
    val timedOutConsumers = ConcurrentLinkedQueue<Int>()

    // Producers
    threads.createAndStartMultipleThreads(producerCount) { producerId, isTestFinished ->
        // Thread-confined counter: each producer thread runs its own copy of this lambda body
        var nextRepetition = 0
        while (!isTestFinished() && nextRepetition < 100) {
            val message = ExchangedValue(producerId, nextRepetition++)
            created.add(message)
            val enqueueTimeout = if (producerId % 2 == 0) Duration.ZERO else producerTimeout
            if (!messageQueue.tryEnqueue(message, enqueueTimeout)) {
                // No producer is interrupted in this test, so a refusal can only mean
                // that the timeout expired
                refused.add(message)
                continue
            }
            val alreadyAccepted = accepted.putIfAbsent(message, Unit) != null
            if (alreadyAccepted) {
                throw AssertionError(
                    "The value $message has already been exchanged by another producer thread"
                )
            }
        }
    }

    // Consumers, using the shorter timeout
    threads.createAndStartMultipleThreads(consumerCount) { consumerId, isTestFinished ->
        while (!isTestFinished()) {
            when (val batch = messageQueue.tryDequeue(1 randomTo queueCapacity, consumerTimeout)) {
                // No consumer is interrupted in this test, so null can only mean a timeout
                null -> timedOutConsumers.add(consumerId)
                else -> consumed.addAll(batch)
            }
        }
    }

    threads.join()

    assertTrue(timedOutConsumers.isNotEmpty())
    assertTrue(refused.isNotEmpty())
    assertTrue(refused.intersect(accepted.keys).isEmpty())
    assertEquals(consumed.size, accepted.size)
    assertEquals(consumed.toSet(), accepted.keys)
    val accountedFor = refused + accepted.keys
    assertEquals(created.size, accountedFor.size)
    assertEquals(created.toSet(), accountedFor.toSet())
}
/**
 * Runs many producers against a single consumer and, on the producer side, checks that the
 * repetition counter recorded for each producer only ever increases.
 *
 * NOTE(review): the consumer discards everything it dequeues, so the order in which messages
 * leave the queue is not inspected here — confirm whether a consumer-side check is intended.
 */
@RepeatedTest(5)
fun `Check if FIFO order is preserved when multiple producer and consumer threads exchange messages`() {
    val queueCapacity = 10
    val messageQueue = BlockingMessageQueue<ExchangedValue>(queueCapacity)
    val producerCount = 24
    val threads = MultiThreadTestHelper(10.seconds)

    // Producer id -> last recorded repetition counter, seeded with -1 for every producer
    val lastRecordedByProducer = ConcurrentHashMap<Int, Int>()
    repeat(producerCount) { producerId -> lastRecordedByProducer[producerId] = -1 }

    // Producers
    threads.createAndStartMultipleThreads(producerCount) { producerId, isTestFinished ->
        // Thread-confined counter: each producer thread runs its own copy of this lambda body
        var nextRepetition = 0
        while (!isTestFinished()) {
            val message = ExchangedValue(producerId, nextRepetition++)
            val enqueueTimeout = (500 randomTo 1000).milliseconds
            if (!messageQueue.tryEnqueue(message, enqueueTimeout)) continue

            val previouslyRecorded = requireNotNull(lastRecordedByProducer[producerId])
            if (previouslyRecorded >= nextRepetition) {
                throw AssertionError(
                    "The value $message has already been exchanged by this producer thread"
                )
            }
            lastRecordedByProducer[producerId] = nextRepetition
        }
    }

    // Single consumer that keeps draining the queue without waiting
    threads.createAndStartMultipleThreads(1) { _, isTestFinished ->
        while (!isTestFinished()) {
            messageQueue.tryDequeue(1 randomTo queueCapacity, Duration.ZERO)
        }
    }

    // Wait for all threads to finish
    threads.join()
}
}