Skip to content

Commit 0f61faf

Browse files
committed
A concurrent single-producer/multiple-consumer queue
1 parent e2cdfe2 commit 0f61faf

File tree

3 files changed

+234
-0
lines changed

3 files changed

+234
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- [httpd](httpd/): A multi-threaded web server.
99
- [ringbuffer](ringbuffer/): A lock-less ring buffer.
1010
- [mbus](mbus/): A concurrent message bus.
11+
- [spmc](spmc/): A concurrent single-producer/multiple-consumer queue
1112

1213
## License
1314

spmc/Makefile

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
all:
2+
gcc -Wall -Wextra -o spmc spmc.c -lpthread
3+
4+
clean:
5+
rm -f spmc
6+
7+
indent:
8+
clang-format -i spmc.c

spmc/spmc.c

+225
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/* A concurrent single-producer, multiple-consumer (SPMC) queue using C11
2+
* Atomics. It is lock-free and atomic, allowing one enqueue-caller/producer,
3+
* arbitrary amount of dequeue-callers/consumers.
4+
*
5+
* Known issue: if one has multiple consumers, some of them will be swapped
6+
* off the CPU after grabbing curr_dequeue, and will have dequeued an element
7+
* from a different node, if that node ends up having free space.
8+
*/
9+
10+
#include <assert.h>
11+
#include <limits.h>
12+
#include <stdatomic.h>
13+
#include <stdbool.h>
14+
#include <stddef.h>
15+
#include <stdint.h>
16+
#include <stdlib.h>
17+
18+
typedef struct __spmc_node {
19+
size_t cap; /* One more than the number of slots available */
20+
_Atomic size_t front, back;
21+
struct __spmc_node *_Atomic next;
22+
uintptr_t buf[];
23+
} spmc_node_t;
24+
25+
typedef void (*spmc_destructor_t)(uintptr_t);
26+
struct spmc_base {
27+
/* current node which enqueues/dequeues */
28+
spmc_node_t *_Atomic curr_enqueue, *_Atomic curr_dequeue;
29+
uint8_t last_power;
30+
spmc_destructor_t destructor;
31+
};
32+
typedef struct spmc_base *spmc_ref_t;
33+
34+
#define DEFAULT_INITIAL_POWER 6 /* Initial capacity: 64, as a power of two */
35+
36+
#define SIZE_FROM_CAP(cap, offset) ((cap) * sizeof(uintptr_t) + (offset))
37+
38+
#define MODULO(lhs, rhs) ((lhs) & (rhs - 1)) /* Requires rhs is power of 2 */
39+
#define INDEX_OF(idx, node) (MODULO((idx), (node)->cap))
40+
#define IS_READABLE(idx, node) ((node)->back - (idx) != 0)
41+
#define IS_WRITABLE(idx, node) ((idx) - (node)->front < (node)->cap)
42+
43+
/* The head of the spmc resides contiguously after the spmc_base struct itself.
44+
* Here, two objects are stored in the same block of memory, but are accessed
45+
* separately.
46+
*/
47+
#define HEAD_OF(spmc) ((spmc_node_t *) (void *) ((spmc_ref_t)(spmc) + 1))
48+
49+
static void init_node(spmc_node_t *node, spmc_node_t *next, size_t cap)
50+
{
51+
node->cap = cap;
52+
atomic_init(&node->front, 0), atomic_init(&node->back, 0);
53+
atomic_init(&node->next, next);
54+
}
55+
56+
/* In the event initial_cap is 0, the spmc will select a default capacity.
57+
* Takes capacities as powers of two. i.e., initial_cap argument of 4 =>
58+
* an allocation of ~16 machine words.
59+
*/
60+
spmc_ref_t spmc_new(size_t initial_cap, spmc_destructor_t destructor)
61+
{
62+
assert(initial_cap < sizeof(size_t) * CHAR_BIT);
63+
const uint8_t power = initial_cap ? initial_cap : DEFAULT_INITIAL_POWER;
64+
const size_t cap = 1 << power;
65+
66+
/* Allocate spmc_base and head spmc_node in the same underlying buffer */
67+
spmc_ref_t spmc = malloc(
68+
SIZE_FROM_CAP(cap, sizeof(struct spmc_base) + sizeof(spmc_node_t)));
69+
spmc_node_t *const head = HEAD_OF(spmc);
70+
init_node(head, head, cap);
71+
72+
atomic_init(&spmc->curr_enqueue, head);
73+
atomic_init(&spmc->curr_dequeue, head);
74+
spmc->destructor = destructor;
75+
spmc->last_power = power;
76+
77+
return spmc;
78+
}
79+
80+
/* Destroy the SPMC, freeing all nodes/elements now assoicated with it.
81+
* Assume all users of the channel are done with it.
82+
*/
83+
void spmc_delete(spmc_ref_t spmc)
84+
{
85+
const spmc_node_t *const head = HEAD_OF(spmc);
86+
spmc_node_t *prev;
87+
if (spmc->destructor) {
88+
for (spmc_node_t *node = head->next; node != head;
89+
prev = node, node = node->next, free(prev))
90+
for (size_t i = node->front; IS_READABLE(i, node); ++i)
91+
spmc->destructor(node->buf[i]);
92+
} else {
93+
for (spmc_node_t *node = head->next; node != head;
94+
prev = node, node = node->next, free(prev))
95+
;
96+
}
97+
/* Also frees the head; it resides reside in the same buffer. */
98+
free(spmc);
99+
}
100+
101+
/* Send (enqueue) an item onto the SPMC */
102+
bool spmc_enqueue(spmc_ref_t spmc, uintptr_t element)
103+
{
104+
spmc_node_t *node =
105+
atomic_load_explicit(&spmc->curr_enqueue, memory_order_relaxed);
106+
size_t idx;
107+
retry:
108+
idx = atomic_load_explicit(&node->back, memory_order_consume);
109+
if (!IS_WRITABLE(idx, node)) {
110+
spmc_node_t *const next =
111+
atomic_load_explicit(&node->next, memory_order_relaxed);
112+
/* Never move to write on top of the node that is currently being read;
113+
* In that case, items would be read out of order they were enqueued.
114+
*/
115+
if (next !=
116+
atomic_load_explicit(&spmc->curr_dequeue, memory_order_relaxed)) {
117+
node = next;
118+
goto retry;
119+
}
120+
121+
const uint8_t power = ++spmc->last_power;
122+
assert(power < sizeof(size_t) * CHAR_BIT);
123+
const size_t cap = 1 << power;
124+
spmc_node_t *new_node = malloc(SIZE_FROM_CAP(cap, sizeof(spmc_node_t)));
125+
if (!new_node)
126+
return false;
127+
128+
init_node(new_node, next, cap);
129+
atomic_store_explicit(&node->next, new_node, memory_order_release);
130+
idx = 0;
131+
node = new_node;
132+
}
133+
node->buf[INDEX_OF(idx, node)] = element;
134+
atomic_store_explicit(&spmc->curr_enqueue, node, memory_order_relaxed);
135+
atomic_fetch_add_explicit(&node->back, 1, memory_order_release);
136+
return true;
137+
}
138+
139+
/* Recieve (dequeue) an item from the SPMC */
140+
bool spmc_dequeue(spmc_ref_t spmc, uintptr_t *slot)
141+
{
142+
spmc_node_t *node =
143+
atomic_load_explicit(&spmc->curr_dequeue, memory_order_consume);
144+
size_t idx;
145+
no_increment:
146+
do {
147+
idx = atomic_load_explicit(&node->front, memory_order_consume);
148+
if (!IS_READABLE(idx, node)) {
149+
if (node != spmc->curr_enqueue)
150+
atomic_compare_exchange_strong(
151+
&spmc->curr_dequeue, &node,
152+
atomic_load_explicit(&node->next, memory_order_relaxed));
153+
goto no_increment;
154+
} else
155+
*slot = node->buf[INDEX_OF(idx, node)];
156+
} while (
157+
!atomic_compare_exchange_weak(&node->front, &(size_t){idx}, idx + 1));
158+
return true;
159+
}
160+
161+
#include <pthread.h>
162+
#include <stdio.h>
163+
#include <string.h>
164+
165+
#define N_ITEMS (1024UL * 8)
166+
static void *producer_thread(void *arg)
167+
{
168+
spmc_ref_t spmc = arg;
169+
for (uintptr_t i = 0; i < N_ITEMS; ++i) {
170+
if (!spmc_enqueue(spmc, i))
171+
fprintf(stderr, "Failed to enqueue on %zu.\n", (size_t) i);
172+
}
173+
return NULL;
174+
}
175+
176+
#define N_MC_ITEMS (1024UL * 8)
177+
static _Atomic size_t observed_count[N_MC_ITEMS + 1];
178+
179+
static void *mc_thread(void *arg)
180+
{
181+
spmc_ref_t spmc = arg;
182+
uintptr_t element = 0, greatest = 0;
183+
184+
for (;;) {
185+
greatest = (greatest > element) ? greatest : element;
186+
if (!spmc_dequeue(spmc, &element))
187+
fprintf(stderr, "Failed to dequeue in mc_thread.\n");
188+
else if (observed_count[element]++)
189+
fprintf(stderr, "Consumed twice!\n");
190+
else if (element < greatest)
191+
fprintf(stderr, "%zu after %zu; bad order!\n", (size_t) element,
192+
(size_t) greatest);
193+
printf("Observed %zu.\n", (size_t) element);
194+
195+
/* Test for sentinel signalling termination */
196+
if (element >= (N_MC_ITEMS - 1)) {
197+
spmc_enqueue(spmc, element + 1); /* notify other threads */
198+
break;
199+
}
200+
}
201+
return NULL;
202+
}
203+
204+
#define N_MC_THREADS 16
205+
int main()
206+
{
207+
spmc_ref_t spmc = spmc_new(0, NULL);
208+
pthread_t mc[N_MC_THREADS], producer;
209+
210+
pthread_create(&producer, NULL, producer_thread, spmc);
211+
for (int i = 0; i < N_MC_THREADS; i++)
212+
pthread_create(&mc[i], NULL, mc_thread, spmc);
213+
214+
pthread_join(producer, NULL);
215+
for (int i = 0; i < N_MC_THREADS; i++)
216+
pthread_join(mc[i], NULL);
217+
218+
for (size_t i = 0; i < N_MC_ITEMS; ++i) {
219+
if (observed_count[i] == 1)
220+
continue;
221+
fprintf(stderr, "An item seen %zu times: %zu.\n", observed_count[i], i);
222+
}
223+
spmc_delete(spmc);
224+
return 0;
225+
}

0 commit comments

Comments
 (0)