GPU-Accelerated Cuckoo Filter
CuckooFilterIPC.cuh
Go to the documentation of this file.
1#pragma once
2
3#include <cuda_runtime.h>
4#include <fcntl.h>
5#include <semaphore.h>
6#include <sys/mman.h>
7#include <sys/stat.h>
8#include <unistd.h>
9#include <atomic>
10#include <cstring>
11#include <string>
12#include <thread>
13#include "CuckooFilter.cuh"
14#include "helpers.cuh"
15
16namespace cuckoogpu {
17
// Operation codes carried by a FilterRequest from IPC client to server.
// NOTE(review): extracted listing — the Doxygen comment block preceding this
// enum (source lines 18-20) was dropped by the extraction.
21enum class RequestType {
 22 INSERT = 0,
 23 CONTAINS = 1,
 24 DELETE = 2,
 25 CLEAR = 3,
 26 SHUTDOWN = 4
 27};
28
// NOTE(review): extracted listing — the `struct FilterRequest {` opening and
// the `RequestType type;` member (source lines 29-36; both confirmed by the
// member index at the bottom of this page) are missing from this span.
 37 uint32_t count; // Number of keys in this batch.
 38 cudaIpcMemHandle_t keysHandle; // Optional CUDA IPC handle to device memory holding the keys; all-zero means "none".
 39 cudaIpcMemHandle_t outputHandle; // Optional CUDA IPC handle for per-key results (CONTAINS/DELETE); all-zero means "none".
 40 uint64_t requestId; // Unique id assigned by the client (nextRequestId++).
 41 std::atomic<bool> completed; // Set by the server when the request is finished; the client spin-waits on it.
 42 std::atomic<bool> cancelled; // Set on force shutdown; submitRequest() turns it into an exception.
 43 size_t result; // Occupied-slot count after insert/delete (per the member index); 0 otherwise.
 44};
45
// NOTE(review): extracted listing — the `struct SharedQueue {` opening (source
// lines 46-53) is missing, as are the member declarations for
// `FilterRequest requests[QUEUE_SIZE]`, `sem_t producerSem` and
// `sem_t consumerSem` (source lines 59-62); all are confirmed by the member
// index at the bottom of this page.
 54 static constexpr size_t QUEUE_SIZE = 256;
 // Power-of-two capacity lets indices wrap cheaply with `& (QUEUE_SIZE - 1)`.
 55 static_assert(detail::powerOfTwo(QUEUE_SIZE), "queue size must be a power of two");
 56
 57 std::atomic<uint64_t> head; // Producer index, monotonically increasing.
 58 std::atomic<uint64_t> tail; // Consumer index, monotonically increasing.
 61
 63
 64 std::atomic<bool> initialised; // Set by the server once the queue is ready for clients.
 65 std::atomic<bool> shuttingDown; // Set once shutdown has been initiated.
 66
 // enqueue(): claim a free slot for a new request; returns nullptr when the
 // queue is shutting down or sem_wait fails.
 // NOTE(review): the signature line (source ~74) is missing from the extraction.
 75 // Check if server is shutting down before trying to do anything
 76 if (shuttingDown.load(std::memory_order_acquire)) {
 77 return nullptr;
 78 }
 79
 80 // Wait for available slot
 81 if (sem_wait(&producerSem) != 0) {
 82 return nullptr;
 83 }
 84
 85 // Double-check after acquiring semaphore
 86 if (shuttingDown.load(std::memory_order_acquire)) {
 87 sem_post(&producerSem);
 88 return nullptr;
 89 }
 90
 // Claim the next slot; the fetch_add makes concurrent producers take
 // distinct slots.
 91 uint64_t headIdx = head.fetch_add(1, std::memory_order_acq_rel);
 92 FilterRequest& req = requests[headIdx & (QUEUE_SIZE - 1)];
 93
 94 return &req;
 95 }
 96
 // signalEnqueued(): wake the consumer after the request fields are filled in.
 // NOTE(review): signature line (source ~100) missing from the extraction.
 101 sem_post(&consumerSem);
 102 }
 103
 // dequeue(): block until a request is available; returns nullptr if sem_wait
 // fails. NOTE(review): signature line (source ~111) missing. Both branches of
 // the failure path below return nullptr, so the EINTR test here is redundant;
 // the caller (processRequests) re-inspects errno itself.
 112 // Wait for a request
 113 if (sem_wait(&consumerSem) != 0) {
 114 if (errno == EINTR) {
 115 return nullptr;
 116 }
 117 return nullptr;
 118 }
 119
 120 uint64_t tailIdx = tail.load(std::memory_order_acquire);
 121 FilterRequest& req = requests[tailIdx & (QUEUE_SIZE - 1)];
 122
 123 return &req;
 124 }
 125
 // signalDequeued(): advance the consumer index and release the slot back to
 // producers. NOTE(review): signature line (source ~129) missing.
 130 tail.fetch_add(1, std::memory_order_release);
 131 sem_post(&producerSem);
 132 }
 133
 // Number of requests enqueued but not yet fully processed (head - tail).
 138 [[nodiscard]] size_t pendingRequests() const {
 139 return head.load(std::memory_order_acquire) - tail.load(std::memory_order_acquire);
 140 }
 141
 // initiateShutdown(): make enqueue() reject all further requests.
 // NOTE(review): signature line (source ~145) missing.
 146 shuttingDown.store(true, std::memory_order_release);
 147 }
 148
 // Whether shutdown has been initiated.
 153 [[nodiscard]] bool isShuttingDown() const {
 154 return shuttingDown.load(std::memory_order_acquire);
 155 }
 156
 // cancelPendingRequests(): mark every not-yet-completed slot between tail and
 // head as cancelled + completed so blocked clients unblock; returns how many
 // were cancelled. NOTE(review): signature line (source ~164) missing. There is
 // no lock between the completed-check and the stores, so this can race with a
 // worker finishing the same request — presumably acceptable because it is only
 // used on force shutdown; confirm against the original source.
 165 uint64_t currentTail = tail.load(std::memory_order_acquire);
 166 uint64_t currentHead = head.load(std::memory_order_acquire);
 167 size_t cancelled = 0;
 168
 169 for (uint64_t i = currentTail; i < currentHead; i++) {
 170 FilterRequest& req = requests[i & (QUEUE_SIZE - 1)];
 171 if (!req.completed.load(std::memory_order_acquire)) {
 172 req.cancelled.store(true, std::memory_order_release);
 173 req.completed.store(true, std::memory_order_release);
 174 req.result = 0;
 175 cancelled++;
 176 }
 177 }
 178
 179 return cancelled;
 180 }
 181};
182
// FilterIPCServer: owns the GPU-resident filter, the shared-memory request
// queue, and the worker thread that drains it.
// NOTE(review): extracted listing — the `class FilterIPCServer {` opening
// (source line 193) is missing from this span.
192template <typename Config>
 194 private:
 195 Filter<Config>* filter; // Owned: created in the ctor, deleted in the dtor.
 196 SharedQueue* queue; // mmap'ed shared memory; unmapped in the dtor.
 197 int shmFd; // shm_open() descriptor.
 198 std::string shmName; // "/cuckoo_filter_<name>".
 199 bool running; // True between start() and stop().
 200 std::thread workerThread; // Runs processRequests().
201
 // Worker-thread main loop: dequeues requests and dispatches them until a
 // SHUTDOWN request has been seen AND the queue has drained. SHUTDOWN, CLEAR
 // and cancelled requests are handled inline; everything else goes through
 // processRequest(). Each request is marked completed and its slot released
 // exactly once per iteration.
 202 void processRequests() {
 203 bool shutdownReceived = false;
 204
 205 while (true) {
 206 // Finally break once queue is drained after shutdown
 207 if (shutdownReceived && queue->pendingRequests() == 0) {
 208 break;
 209 }
 210
 211 FilterRequest* req = queue->dequeue();
 212
 213 // check if we should continue
 214 if (!req) {
 215 if (errno == EINTR) {
 216 continue;
 217 }
 218 break;
 219 }
 220
 // Shutdown is latched, not acted on immediately: we keep looping so
 // requests enqueued before the shutdown are still drained.
 221 if (req->type == RequestType::SHUTDOWN) {
 222 shutdownReceived = true;
 223 req->completed.store(true, std::memory_order_release);
 224 queue->signalDequeued();
 225 continue;
 226 }
 227
 228 if (req->type == RequestType::CLEAR) {
 229 filter->clear();
 230 req->result = 0;
 231 req->completed.store(true, std::memory_order_release);
 232 queue->signalDequeued();
 233 continue;
 234 }
 235
 236 // Skip cancelled requests (from force shutdown)
 237 if (req->cancelled.load(std::memory_order_acquire)) {
 238 req->completed.store(true, std::memory_order_release);
 239 queue->signalDequeued();
 240 continue;
 241 }
 242
 // Even on failure the request is completed (result = 0) so the waiting
 // client is never left spinning.
 243 try {
 244 processRequest(req);
 245 } catch (const std::exception& e) {
 246 std::cerr << std::format("Error processing request: {}\n", e.what());
 247 req->result = 0;
 248 }
 249
 250 req->completed.store(true, std::memory_order_release);
 251
 252 queue->signalDequeued();
 253 }
 254 }
255
 // Executes a single INSERT / CONTAINS / DELETE request: opens the client's
 // CUDA IPC handles, runs the corresponding filter operation, then closes the
 // handles again. Results are written into req->result and, for CONTAINS and
 // DELETE, into the client's d_output buffer.
 256 void processRequest(FilterRequest* req) {
 257 using T = typename Config::KeyType;
 258
 259 T* d_keys = nullptr;
 260 bool* d_output = nullptr;
 261
 // An all-zero IPC handle is the client's convention for "no buffer
 // supplied" (see submitRequest(), which memsets the handle to 0).
 262 cudaIpcMemHandle_t zeroHandle = {0};
 263 bool hasKeys = memcmp(&req->keysHandle, &zeroHandle, sizeof(cudaIpcMemHandle_t)) != 0;
 264
 265 if (hasKeys) {
 266 CUDA_CALL(cudaIpcOpenMemHandle(
 267 (void**)&d_keys, req->keysHandle, cudaIpcMemLazyEnablePeerAccess
 268 ));
 269 }
 270
 271 bool hasOutput = false;
 272 if (req->type == RequestType::CONTAINS || req->type == RequestType::DELETE) {
 273 bool handleValid =
 274 memcmp(&req->outputHandle, &zeroHandle, sizeof(cudaIpcMemHandle_t)) != 0;
 275
 276 if (handleValid) {
 277 CUDA_CALL(cudaIpcOpenMemHandle(
 278 (void**)&d_output, req->outputHandle, cudaIpcMemLazyEnablePeerAccess
 279 ));
 280 hasOutput = true;
 281 }
 282 }
 283
 284 switch (req->type) {
 // NOTE(review): extracted listing — the `case RequestType::INSERT:` label
 // (source line 285) is missing from this span.
 286 req->result = filter->insertMany(d_keys, req->count);
 287 break;
 288
 // NOTE(review): the `case RequestType::CONTAINS:` label (source line 289)
 // is missing from this span.
 290 filter->containsMany(d_keys, req->count, d_output);
 291 req->result = 0;
 292 break;
 293
 // NOTE(review): the `case RequestType::DELETE:` label (source line 294)
 // is missing from this span.
 295 req->result = filter->deleteMany(d_keys, req->count, d_output);
 296 break;
 297
 298 default:
 299 req->result = 0;
 300 break;
 301 }
 302
 303 if (hasKeys) {
 304 CUDA_CALL(cudaIpcCloseMemHandle(d_keys));
 305 }
 306 if (hasOutput) {
 307 CUDA_CALL(cudaIpcCloseMemHandle(d_output));
 308 }
 309 }
310
 311 public:
 // Constructs the server: allocates the GPU filter, creates and maps the
 // POSIX shared-memory segment holding the SharedQueue, initialises the
 // indices, process-shared semaphores, and per-slot flags, then publishes
 // readiness via `initialised`. Throws std::runtime_error on shm failures.
 // NOTE(review): sem_init() return values are not checked, and `filter` leaks
 // if a later shm step throws (the dtor never runs for a throwing ctor).
 320 FilterIPCServer(const std::string& name, size_t capacity)
 321 : shmName("/cuckoo_filter_" + name), running(false) {
 322 filter = new Filter<Config>(capacity);
 323
 324 // shared memory for queue
 325 shmFd = shm_open(shmName.c_str(), O_CREAT | O_RDWR, 0666);
 326 if (shmFd == -1) {
 327 throw std::runtime_error("Failed to create shared memory");
 328 }
 329
 330 if (ftruncate(shmFd, sizeof(SharedQueue)) == -1) {
 331 close(shmFd);
 332 throw std::runtime_error("Failed to set shared memory size");
 333 }
 334
 335 queue = static_cast<SharedQueue*>(
 336 mmap(nullptr, sizeof(SharedQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0)
 337 );
 338
 339 if (queue == MAP_FAILED) {
 340 close(shmFd);
 341 throw std::runtime_error("Failed to map shared memory");
 342 }
 343
 344 queue->head.store(0, std::memory_order_release);
 345 queue->tail.store(0, std::memory_order_release);
 346 queue->shuttingDown.store(false, std::memory_order_release);
 347
 // pshared = 1: semaphores live in shared memory and are used across
 // processes. Producer starts with all slots free, consumer with none.
 348 sem_init(&queue->producerSem, 1, SharedQueue::QUEUE_SIZE);
 349 sem_init(&queue->consumerSem, 1, 0);
 350
 351 for (auto& request : queue->requests) {
 352 request.completed.store(false, std::memory_order_release);
 353 request.cancelled.store(false, std::memory_order_release);
 354 }
 355
 // Published last so clients never see a half-initialised queue.
 356 queue->initialised.store(true, std::memory_order_release);
 357 }
358
 // Destructor body: stops the worker, destroys the process-shared semaphores,
 // unmaps and unlinks the shared-memory segment, and deletes the filter.
 // NOTE(review): extracted listing — the `~FilterIPCServer()` signature line
 // (source ~364) is missing from this span.
 365 stop();
 366
 367 if (queue != MAP_FAILED) {
 368 sem_destroy(&queue->producerSem);
 369 sem_destroy(&queue->consumerSem);
 370 munmap(queue, sizeof(SharedQueue));
 371 }
 372
 373 if (shmFd != -1) {
 374 close(shmFd);
 375 shm_unlink(shmName.c_str());
 376 }
 377
 378 delete filter;
 379 }
380
 // Starts the worker thread that processes queued requests; idempotent
 // (a second call while running is a no-op).
 384 void start() {
 385 if (running) {
 386 return;
 387 }
 388
 389 running = true;
 390 workerThread = std::thread(&FilterIPCServer::processRequests, this);
 391 }
392
 // Stops the server. force=false drains pending requests (graceful);
 // force=true cancels anything still pending first. In both cases a SHUTDOWN
 // request is enqueued for the worker, then the shutdown flag is raised and
 // the worker thread is joined.
 399 void stop(bool force = false) {
 400 if (!running) {
 401 return;
 402 }
 403
 404 running = false;
 405
 406 if (force) {
 407 size_t cancelled = queue->cancelPendingRequests();
 408 if (cancelled > 0) {
 409 std::cout << std::format(
 410 "Force shutdown: cancelled {} pending requests\n", cancelled
 411 );
 412 }
 413 } else {
 414 size_t pending = queue->pendingRequests();
 415 if (pending > 0) {
 416 std::cout << std::format(
 417 "Graceful shutdown: draining {} pending requests...\n", pending
 418 );
 419 }
 420 }
 421
 422 // Send shutdown request to the worker thread before
 423 // setting shutdown flag so the enqueue() doesn't reject it
 424 FilterRequest* req = queue->enqueue();
 425 if (req) {
 // NOTE(review): extracted listing — source line 426 is missing here;
 // presumably `req->type = RequestType::SHUTDOWN;`, mirroring
 // requestShutdown() on the client side. Confirm against the original.
 427 req->completed.store(false, std::memory_order_release);
 428 req->cancelled.store(false, std::memory_order_release);
 429
 430 queue->signalEnqueued();
 431 }
 432
 433 queue->initiateShutdown();
 434
 435 if (workerThread.joinable()) {
 436 workerThread.join();
 437 }
 438 }
439
 // Returns the underlying Filter<Config>* (per the member index).
 // NOTE(review): extracted listing — the getFilter() signature line
 // (source ~444) is missing from this span.
 445 return filter;
 446 }
 447};
448
// FilterIPCClient: attaches to a running server's shared-memory queue and
// submits filter operations via CUDA IPC memory handles.
// NOTE(review): extracted listing — the `class FilterIPCClient {` opening
// (source line 459) is missing from this span.
458template <typename Config>
 460 private:
 461 SharedQueue* queue; // mmap'ed view of the server's queue; unmapped in the dtor.
 462 int shmFd; // shm_open() descriptor.
 463 std::string shmName; // "/cuckoo_filter_<name>", must match the server.
 464 uint64_t nextRequestId; // Monotonic id assigned to each submitted request.
 466 public:
 467 using T = typename Config::KeyType;
 468
 // Opens and maps the server's shared-memory queue. Throws
 // std::runtime_error if the segment does not exist (server not running),
 // cannot be mapped, or the server has not yet set `initialised`.
 476 explicit FilterIPCClient(const std::string& name)
 477 : shmName("/cuckoo_filter_" + name), nextRequestId(0) {
 478 shmFd = shm_open(shmName.c_str(), O_RDWR, 0666);
 479 if (shmFd == -1) {
 480 throw std::runtime_error("Failed to open shared memory, is the server running?");
 481 }
 482
 483 queue = static_cast<SharedQueue*>(
 484 mmap(nullptr, sizeof(SharedQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0)
 485 );
 486
 487 if (queue == MAP_FAILED) {
 488 close(shmFd);
 489 throw std::runtime_error("Failed to map shared memory");
 490 }
 491
 // Acquire pairs with the server's release store of `initialised`, so a
 // successfully constructed client sees a fully initialised queue.
 492 if (!queue->initialised.load(std::memory_order_acquire)) {
 493 munmap(queue, sizeof(SharedQueue));
 494 close(shmFd);
 495 throw std::runtime_error("Ring buffer not initialised, server may not be ready");
 496 }
 497 }
498
 // Destructor body: unmaps the queue and closes the shm descriptor. The
 // segment itself is unlinked by the server, not the client.
 // NOTE(review): extracted listing — the `~FilterIPCClient()` signature line
 // (source ~504) is missing from this span.
 505 if (queue != MAP_FAILED) {
 506 munmap(queue, sizeof(SharedQueue));
 507 }
 508 if (shmFd != -1) {
 509 close(shmFd);
 510 }
 511 }
512
 // Inserts `count` device-resident keys; returns the server-reported result
 // (occupied-slot count, per FilterRequest::result).
 520 size_t insertMany(const T* d_keys, size_t count) {
 521 return submitRequest(RequestType::INSERT, d_keys, count, nullptr);
 522 }
 523
 // Membership test for `count` keys; per-key results are written by the
 // server into the device buffer d_output.
 531 void containsMany(const T* d_keys, size_t count, bool* d_output) {
 532 submitRequest(RequestType::CONTAINS, d_keys, count, d_output);
 533 }
 534
 // Deletes `count` keys; d_output (optional) receives per-key success flags.
 543 size_t deleteMany(const T* d_keys, size_t count, bool* d_output = nullptr) {
 544 return submitRequest(RequestType::DELETE, d_keys, count, d_output);
 545 }
 546
 // Asks the server to clear the filter (no key/output buffers involved).
 550 void clear() {
 551 submitRequest(RequestType::CLEAR, nullptr, 0, nullptr);
 552 }
553
 // requestShutdown() body: flags the queue as shutting down, enqueues a final
 // SHUTDOWN request, and waits for the server to acknowledge it.
 // NOTE(review): extracted listing — the signature line (source ~557) and
 // source line 566 are missing here; line 566 presumably is
 // `req->type = RequestType::SHUTDOWN;`. Confirm against the original.
 558 if (queue->isShuttingDown()) {
 559 return;
 560 }
 561
 562 queue->initiateShutdown();
 563
 564 FilterRequest* req = queue->enqueue();
 565 if (req) {
 567 req->completed.store(false, std::memory_order_release);
 568 req->cancelled.store(false, std::memory_order_release);
 569
 570 queue->signalEnqueued();
 571
 // Spin until the worker acknowledges the shutdown request.
 572 while (!req->completed.load(std::memory_order_acquire)) {
 573 std::this_thread::yield();
 574 }
 575 }
 576 }
577
 // Thrust convenience overload: insert from a device_vector.
 583 size_t insertMany(const thrust::device_vector<T>& d_keys) {
 584 return insertMany(thrust::raw_pointer_cast(d_keys.data()), d_keys.size());
 585 }
 586
 // Thrust overload: membership test with a bool output vector (resized to
 // match the key count if needed).
 // NOTE(review): extracted listing — source line 597, the opening
 // `containsMany(` call line, is missing from this span.
 592 void
 593 containsMany(const thrust::device_vector<T>& d_keys, thrust::device_vector<bool>& d_output) {
 594 if (d_output.size() != d_keys.size()) {
 595 d_output.resize(d_keys.size());
 596 }
 598 thrust::raw_pointer_cast(d_keys.data()),
 599 d_keys.size(),
 600 thrust::raw_pointer_cast(d_output.data())
 601 );
 602 }
 603
 // Thrust overload: membership test with a uint8_t output vector, passed to
 // the raw-pointer API via a bool* reinterpret_cast.
 // NOTE(review): extracted listing — source line 614, the opening
 // `containsMany(` call line, is missing from this span.
 609 void
 610 containsMany(const thrust::device_vector<T>& d_keys, thrust::device_vector<uint8_t>& d_output) {
 611 if (d_output.size() != d_keys.size()) {
 612 d_output.resize(d_keys.size());
 613 }
 615 thrust::raw_pointer_cast(d_keys.data()),
 616 d_keys.size(),
 617 reinterpret_cast<bool*>(thrust::raw_pointer_cast(d_output.data()))
 618 );
 619 }
 620
 // Thrust overload: delete with a bool output vector.
 627 size_t
 628 deleteMany(const thrust::device_vector<T>& d_keys, thrust::device_vector<bool>& d_output) {
 629 if (d_output.size() != d_keys.size()) {
 630 d_output.resize(d_keys.size());
 631 }
 632 return deleteMany(
 633 thrust::raw_pointer_cast(d_keys.data()),
 634 d_keys.size(),
 635 thrust::raw_pointer_cast(d_output.data())
 636 );
 637 }
 638
 // Thrust overload: delete with a uint8_t output vector.
 645 size_t
 646 deleteMany(const thrust::device_vector<T>& d_keys, thrust::device_vector<uint8_t>& d_output) {
 647 if (d_output.size() != d_keys.size()) {
 648 d_output.resize(d_keys.size());
 649 }
 650 return deleteMany(
 651 thrust::raw_pointer_cast(d_keys.data()),
 652 d_keys.size(),
 653 reinterpret_cast<bool*>(thrust::raw_pointer_cast(d_output.data()))
 654 );
 655 }
 656
 // Thrust overload: delete without collecting per-key results.
 662 size_t deleteMany(const thrust::device_vector<T>& d_keys) {
 663 return deleteMany(thrust::raw_pointer_cast(d_keys.data()), d_keys.size(), nullptr);
 664 }
665
 666 private:
 // Core request path: claims a queue slot, exports CUDA IPC handles for the
 // key/output device buffers (all-zero handle means "none"), fills in the
 // request, signals the server, then spin-waits (yielding) for completion.
 // Throws if the queue rejects the request or the request was cancelled.
 // NOTE(review): `count` is size_t here but FilterRequest::count is uint32_t,
 // so batches > UINT32_MAX are silently truncated.
 667 size_t submitRequest(RequestType type, const T* d_keys, size_t count, bool* d_output) {
 668 FilterRequest* req = queue->enqueue();
 669 if (!req) {
 670 throw std::runtime_error("Server is shutting down, not accepting new requests");
 671 }
 672
 673 if (d_keys != nullptr) {
 674 CUDA_CALL(cudaIpcGetMemHandle(&req->keysHandle, const_cast<T*>(d_keys)));
 675 } else {
 676 memset(&req->keysHandle, 0, sizeof(cudaIpcMemHandle_t));
 677 }
 678
 679 if (d_output != nullptr) {
 680 CUDA_CALL(cudaIpcGetMemHandle(&req->outputHandle, d_output));
 681 } else {
 682 memset(&req->outputHandle, 0, sizeof(cudaIpcMemHandle_t));
 683 }
 684
 685 req->type = type;
 686 req->count = count;
 687 req->requestId = nextRequestId++;
 688 req->completed.store(false, std::memory_order_release);
 689 req->cancelled.store(false, std::memory_order_release);
 690 req->result = 0;
 691
 692 queue->signalEnqueued();
 693
 // Busy-wait with yield; acquire pairs with the server's release store.
 694 while (!req->completed.load(std::memory_order_acquire)) {
 695 std::this_thread::yield();
 696 }
 697
 698 if (req->cancelled.load(std::memory_order_acquire)) {
 699 throw std::runtime_error("Request cancelled, server is shutting down");
 700 }
 701
 702 return req->result;
 703 }
 704};
705
706} // namespace cuckoogpu
Client implementation for the IPC Cuckoo Filter.
void containsMany(const T *d_keys, size_t count, bool *d_output)
Checks for existence of multiple keys.
size_t insertMany(const thrust::device_vector< T > &d_keys)
Inserts keys from a Thrust device vector.
void containsMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< bool > &d_output)
Checks for existence of keys in a Thrust device vector.
size_t insertMany(const T *d_keys, size_t count)
Inserts multiple keys into the filter.
size_t deleteMany(const thrust::device_vector< T > &d_keys)
Deletes keys in a Thrust device vector without outputting results.
void requestShutdown()
Requests the server to shut down.
FilterIPCClient(const std::string &name)
Constructs a new FilterIPCClient.
size_t deleteMany(const T *d_keys, size_t count, bool *d_output=nullptr)
Deletes multiple keys from the filter.
typename Config::KeyType T
size_t deleteMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< uint8_t > &d_output)
Deletes keys in a Thrust device vector (uint8_t output).
void containsMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< uint8_t > &d_output)
Checks for existence of keys in a Thrust device vector (uint8_t output).
void clear()
Clears the filter.
size_t deleteMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< bool > &d_output)
Deletes keys in a Thrust device vector.
~FilterIPCClient()
Destroys the FilterIPCClient.
Server implementation for the IPC Cuckoo Filter.
void start()
Starts the worker thread to process requests.
~FilterIPCServer()
Destroys the FilterIPCServer.
Filter< Config > * getFilter()
Returns a pointer to the underlying Filter instance.
void stop(bool force=false)
Stops the server.
FilterIPCServer(const std::string &name, size_t capacity)
Constructs a new FilterIPCServer.
A CUDA-accelerated Cuckoo Filter implementation.
void clear()
Clears the filter, removing all items.
size_t deleteMany(const T *d_keys, const size_t n, bool *d_output=nullptr, cudaStream_t stream={})
Tries to remove a set of keys from the filter.
void containsMany(const T *d_keys, const size_t n, bool *d_output, cudaStream_t stream={})
Checks for the existence of a batch of keys.
size_t insertMany(const T *d_keys, const size_t n, bool *d_output=nullptr, cudaStream_t stream={})
Inserts a batch of keys into the filter.
#define CUDA_CALL(err)
Macro for checking CUDA errors.
Definition helpers.cuh:204
constexpr bool powerOfTwo(size_t n)
Checks if a number is a power of two.
Definition helpers.cuh:16
RequestType
Type of request that can be sent to the IPC server.
Structure representing a filter operation request.
size_t result
Updated number of occupied slots after insert/delete.
cudaIpcMemHandle_t keysHandle
Optional handle to device memory containing the keys.
uint32_t count
Number of keys in this batch.
uint64_t requestId
Unique request identifier.
std::atomic< bool > completed
Completion flag.
cudaIpcMemHandle_t outputHandle
Optional handle for results (for lookup/deletion).
std::atomic< bool > cancelled
Cancellation flag (for force shutdown)
RequestType type
Type of request.
A shared memory queue implementation for Inter-Process Communication.
std::atomic< uint64_t > tail
Consumer index.
std::atomic< bool > initialised
Whether the queue has been initialised.
size_t pendingRequests() const
Returns the number of pending requests in the queue.
void initiateShutdown()
Initiates the shutdown process for the queue.
sem_t producerSem
Semaphore for available slots.
FilterRequest * enqueue()
Attempts to acquire a slot in the queue for a new request.
void signalDequeued()
Signals that a request has been processed and the slot is free.
FilterRequest * dequeue()
Dequeues the next pending request.
static constexpr size_t QUEUE_SIZE
void signalEnqueued()
Signals that a new request has been enqueued and is ready for processing.
sem_t consumerSem
Semaphore for pending requests.
FilterRequest requests[QUEUE_SIZE]
bool isShuttingDown() const
Checks if the queue is in shutdown mode.
std::atomic< bool > shuttingDown
Whether the queue is shutting down.
std::atomic< uint64_t > head
Producer index.
size_t cancelPendingRequests()
Cancels all pending requests in the queue.