GPU-Accelerated Cuckoo Filter
Loading...
Searching...
No Matches
Public Member Functions | List of all members
cuckoogpu::FilterIPCServer< Config > Class Template Reference

Server implementation for the IPC Cuckoo Filter. More...

Public Member Functions

 FilterIPCServer (const std::string &name, size_t capacity)
 Constructs a new FilterIPCServer.
 
 ~FilterIPCServer ()
 Destroys the FilterIPCServer.
 
void start ()
 Starts the worker thread to process requests.
 
void stop (bool force=false)
 Stops the server.
 
Filter< Config > * getFilter ()
 Returns a pointer to the underlying Filter instance.
 

Detailed Description

template<typename Config>
class cuckoogpu::FilterIPCServer< Config >

Server implementation for the IPC Cuckoo Filter.

This class manages the shared memory segment and processes requests from clients. It runs a worker thread that polls the shared queue and executes filter operations on the GPU.

Template Parameters
ConfigThe configuration structure for the Cuckoo Filter.

Definition at line 193 of file CuckooFilterIPC.cuh.

Constructor & Destructor Documentation

◆ FilterIPCServer()

template<typename Config >
cuckoogpu::FilterIPCServer< Config >::FilterIPCServer ( const std::string &  name,
size_t  capacity 
)
inline

Constructs a new FilterIPCServer.

Creates the shared memory segment and initializes the shared queue.

Parameters
nameUnique name for the shared memory segment.
capacityCapacity of the Cuckoo Filter.

Definition at line 320 of file CuckooFilterIPC.cuh.

321 : shmName("/cuckoo_filter_" + name), running(false) {
322 filter = new Filter<Config>(capacity);
323
324 // shared memory for queue
325 shmFd = shm_open(shmName.c_str(), O_CREAT | O_RDWR, 0666);
326 if (shmFd == -1) {
327 throw std::runtime_error("Failed to create shared memory");
328 }
329
330 if (ftruncate(shmFd, sizeof(SharedQueue)) == -1) {
331 close(shmFd);
332 throw std::runtime_error("Failed to set shared memory size");
333 }
334
335 queue = static_cast<SharedQueue*>(
336 mmap(nullptr, sizeof(SharedQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0)
337 );
338
339 if (queue == MAP_FAILED) {
340 close(shmFd);
341 throw std::runtime_error("Failed to map shared memory");
342 }
343
344 queue->head.store(0, std::memory_order_release);
345 queue->tail.store(0, std::memory_order_release);
346 queue->shuttingDown.store(false, std::memory_order_release);
347
348 sem_init(&queue->producerSem, 1, SharedQueue::QUEUE_SIZE);
349 sem_init(&queue->consumerSem, 1, 0);
350
351 for (auto& request : queue->requests) {
352 request.completed.store(false, std::memory_order_release);
353 request.cancelled.store(false, std::memory_order_release);
354 }
355
356 queue->initialised.store(true, std::memory_order_release);
357 }
std::atomic< uint64_t > tail
Consumer index.
std::atomic< bool > initialised
Whether the queue has been initialised.
sem_t producerSem
Semaphore for available slots.
static constexpr size_t QUEUE_SIZE
sem_t consumerSem
Semaphore for pending requests.
std::atomic< bool > shuttingDown
Whether the queue is shutting down.
std::atomic< uint64_t > head
Producer index.

◆ ~FilterIPCServer()

template<typename Config >
cuckoogpu::FilterIPCServer< Config >::~FilterIPCServer ( )
inline

Destroys the FilterIPCServer.

Stops the server, cleans up shared memory and resources.

Definition at line 364 of file CuckooFilterIPC.cuh.

364 {
365 stop();
366
367 if (queue != MAP_FAILED) {
368 sem_destroy(&queue->producerSem);
369 sem_destroy(&queue->consumerSem);
370 munmap(queue, sizeof(SharedQueue));
371 }
372
373 if (shmFd != -1) {
374 close(shmFd);
375 shm_unlink(shmName.c_str());
376 }
377
378 delete filter;
379 }
void stop(bool force=false)
Stops the server.
Here is the call graph for this function:

Member Function Documentation

◆ getFilter()

template<typename Config >
Filter< Config > * cuckoogpu::FilterIPCServer< Config >::getFilter ( )
inline

Returns a pointer to the underlying Filter instance.

Returns
Filter<Config>* Pointer to the filter.

Definition at line 444 of file CuckooFilterIPC.cuh.

444 {
445 return filter;
446 }

◆ start()

template<typename Config >
void cuckoogpu::FilterIPCServer< Config >::start ( )
inline

Starts the worker thread to process requests.

Definition at line 384 of file CuckooFilterIPC.cuh.

384 {
385 if (running) {
386 return;
387 }
388
389 running = true;
390 workerThread = std::thread(&FilterIPCServer::processRequests, this);
391 }

◆ stop()

template<typename Config >
void cuckoogpu::FilterIPCServer< Config >::stop ( bool  force = false)
inline

Stops the server.

Parameters
forceIf true, cancels pending requests immediately. If false, waits for pending requests to complete.

Definition at line 399 of file CuckooFilterIPC.cuh.

399 {
400 if (!running) {
401 return;
402 }
403
404 running = false;
405
406 if (force) {
407 size_t cancelled = queue->cancelPendingRequests();
408 if (cancelled > 0) {
409 std::cout << std::format(
410 "Force shutdown: cancelled {} pending requests\n", cancelled
411 );
412 }
413 } else {
414 size_t pending = queue->pendingRequests();
415 if (pending > 0) {
416 std::cout << std::format(
417 "Graceful shutdown: draining {} pending requests...\n", pending
418 );
419 }
420 }
421
422 // Send shutdown request to the worker thread before
423 // setting shutdown flag so the enqueue() doesn't reject it
424 FilterRequest* req = queue->enqueue();
425 if (req) {
427 req->completed.store(false, std::memory_order_release);
428 req->cancelled.store(false, std::memory_order_release);
429
430 queue->signalEnqueued();
431 }
432
433 queue->initiateShutdown();
434
435 if (workerThread.joinable()) {
436 workerThread.join();
437 }
438 }
RequestType type
Type of request.
size_t pendingRequests() const
Returns the number of pending requests in the queue.
void initiateShutdown()
Initiates the shutdown process for the queue.
FilterRequest * enqueue()
Attempts to acquire a slot in the queue for a new request.
void signalEnqueued()
Signals that a new request has been enqueued and is ready for processing.
size_t cancelPendingRequests()
Cancels all pending requests in the queue.
Here is the call graph for this function:
Here is the caller graph for this function:

The documentation for this class was generated from the following file: