GPU-Accelerated Cuckoo Filter
cuckoogpu::SharedQueue Struct Reference

A shared memory queue implementation for Inter-Process Communication.


Public Member Functions

FilterRequest * enqueue ()
 Attempts to acquire a slot in the queue for a new request.
 
void signalEnqueued ()
 Signals that a new request has been enqueued and is ready for processing.
 
FilterRequest * dequeue ()
 Dequeues the next pending request.
 
void signalDequeued ()
 Signals that a request has been processed and the slot is free.
 
size_t pendingRequests () const
 Returns the number of pending requests in the queue.
 
void initiateShutdown ()
 Initiates the shutdown process for the queue.
 
bool isShuttingDown () const
 Checks if the queue is in shutdown mode.
 
size_t cancelPendingRequests ()
 Cancels all pending requests in the queue.
 

Public Attributes

std::atomic< uint64_t > head
 Producer index.
 
std::atomic< uint64_t > tail
 Consumer index.
 
sem_t producerSem
 Semaphore for available slots.
 
sem_t consumerSem
 Semaphore for pending requests.
 
FilterRequest requests [QUEUE_SIZE]
 
std::atomic< bool > initialised
 Whether the queue has been initialised.
 
std::atomic< bool > shuttingDown
 Whether the queue is shutting down.
 

Static Public Attributes

static constexpr size_t QUEUE_SIZE = 256
 

Detailed Description

A shared memory queue implementation for Inter-Process Communication.

This structure manages a ring buffer of requests in shared memory, using semaphores for synchronization between the producer (client) and consumer (server).
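
The protocol described above can be modelled in a few lines. The following is a minimal single-process sketch, not the library's implementation: DemoRequest and DemoQueue are hypothetical names, the capacity is reduced to 4, a single producer and a single consumer are assumed, and the initial semaphore values (QUEUE_SIZE free slots, 0 pending requests) are an assumption because initialisation is not shown on this page. It uses sem_trywait rather than sem_wait so that the demo never blocks.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <semaphore.h>

// Hypothetical stand-in for FilterRequest; the real struct has other fields.
struct DemoRequest {
    uint64_t payload = 0;
};

// Simplified model of the ring buffer, not the library's SharedQueue.
struct DemoQueue {
    static constexpr size_t QUEUE_SIZE = 4;  // power of two, small for the demo

    std::atomic<uint64_t> head{0};  // producer index
    std::atomic<uint64_t> tail{0};  // consumer index
    sem_t producerSem;              // counts free slots
    sem_t consumerSem;              // counts pending requests
    DemoRequest requests[QUEUE_SIZE];

    DemoQueue() {
        // Assumed initial values: every slot free, nothing pending.
        // pshared = 1 marks the semaphores as shareable between processes.
        int rc = sem_init(&producerSem, 1, QUEUE_SIZE);
        assert(rc == 0);
        rc = sem_init(&consumerSem, 1, 0);
        assert(rc == 0);
        (void)rc;
    }

    ~DemoQueue() {
        sem_destroy(&producerSem);
        sem_destroy(&consumerSem);
    }

    // Producer side: claim a free slot, fill it, then publish it.
    bool produce(uint64_t payload) {
        if (sem_trywait(&producerSem) != 0) return false;  // queue is full
        uint64_t idx = head.fetch_add(1, std::memory_order_acq_rel);
        requests[idx & (QUEUE_SIZE - 1)].payload = payload;
        sem_post(&consumerSem);  // one more pending request
        return true;
    }

    // Consumer side: read the oldest request, then hand its slot back.
    bool consume(uint64_t& out) {
        if (sem_trywait(&consumerSem) != 0) return false;  // queue is empty
        uint64_t idx = tail.load(std::memory_order_acquire);
        out = requests[idx & (QUEUE_SIZE - 1)].payload;
        tail.fetch_add(1, std::memory_order_release);
        sem_post(&producerSem);  // one more free slot
        return true;
    }
};
```

In this model produce() corresponds to enqueue() followed by signalEnqueued(), and consume() corresponds to dequeue() followed by signalDequeued(). The two semaphores never hold more than QUEUE_SIZE permits in total, which is what keeps the producer from overwriting a slot the consumer has not released yet.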

Definition at line 53 of file CuckooFilterIPC.cuh.

Member Function Documentation

◆ cancelPendingRequests()

size_t cuckoogpu::SharedQueue::cancelPendingRequests ( )
inline

Cancels all pending requests in the queue.

Marks all uncompleted requests as cancelled and completed.

Returns
size_t Number of requests cancelled.

Definition at line 164 of file CuckooFilterIPC.cuh.

    {
        uint64_t currentTail = tail.load(std::memory_order_acquire);
        uint64_t currentHead = head.load(std::memory_order_acquire);
        size_t cancelled = 0;

        for (uint64_t i = currentTail; i < currentHead; i++) {
            FilterRequest& req = requests[i & (QUEUE_SIZE - 1)];
            if (!req.completed.load(std::memory_order_acquire)) {
                req.cancelled.store(true, std::memory_order_release);
                req.completed.store(true, std::memory_order_release);
                req.result = 0;
                cancelled++;
            }
        }

        return cancelled;
    }
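
To illustrate how a waiting client might tell a cancelled request from a normally completed one, here is a minimal sketch. DemoRequest models only the three FilterRequest fields used in the listing (completed, cancelled, result); cancelRange, poll and Outcome are hypothetical names, and the capacity of 8 is chosen for the demo.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in with only the FilterRequest fields used above.
struct DemoRequest {
    std::atomic<bool> completed{false};
    std::atomic<bool> cancelled{false};
    uint64_t result = 0;
};

constexpr size_t QUEUE_SIZE = 8;  // power of two, small for the demo

// Cancel every uncompleted request in [tail, head); returns how many.
size_t cancelRange(DemoRequest (&requests)[QUEUE_SIZE], uint64_t tail, uint64_t head) {
    assert(tail <= head);
    size_t count = 0;
    for (uint64_t i = tail; i < head; ++i) {
        DemoRequest& req = requests[i & (QUEUE_SIZE - 1)];
        if (!req.completed.load(std::memory_order_acquire)) {
            req.result = 0;  // written before the flags are published
            req.cancelled.store(true, std::memory_order_release);
            req.completed.store(true, std::memory_order_release);
            ++count;
        }
    }
    return count;
}

enum class Outcome { Pending, Done, Cancelled };

// Client-side check: look at cancelled only after completed is observed.
Outcome poll(const DemoRequest& req) {
    if (!req.completed.load(std::memory_order_acquire)) return Outcome::Pending;
    return req.cancelled.load(std::memory_order_acquire) ? Outcome::Cancelled
                                                         : Outcome::Done;
}
```

The sketch writes result before the release store to completed, which differs from the order in the listing above. With the sketch's order, an acquire load that observes completed == true also observes the cleared result.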

◆ dequeue()

FilterRequest * cuckoogpu::SharedQueue::dequeue ( )
inline

Dequeues the next pending request.

This function blocks until a request is available.

Returns
FilterRequest* Pointer to the request to process, or nullptr on error/interrupt.

Definition at line 111 of file CuckooFilterIPC.cuh.

    {
        // Wait for a request
        if (sem_wait(&consumerSem) != 0) {
            if (errno == EINTR) {
                return nullptr;
            }
            return nullptr;
        }

        uint64_t tailIdx = tail.load(std::memory_order_acquire);
        FilterRequest& req = requests[tailIdx & (QUEUE_SIZE - 1)];

        return &req;
    }
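
Because dequeue() returns nullptr both when the wait is interrupted and when it fails, a consumer loop has to treat nullptr as "nothing to process" and re-check the shutdown flag. A minimal sketch of such a loop follows. It is written as a template so that it does not depend on the real SharedQueue; serveUntilShutdown is a hypothetical name, and ScriptedQueue is a test double invented here that simulates one interrupted wait followed by three requests.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical server loop: works with any type that offers
// isShuttingDown(), dequeue() and signalDequeued().
template <typename Queue, typename Handler>
size_t serveUntilShutdown(Queue& q, Handler handle) {
    size_t served = 0;
    while (!q.isShuttingDown()) {
        auto* req = q.dequeue();
        if (req == nullptr) {
            continue;  // interrupted or failed wait: re-check the shutdown flag
        }
        handle(*req);
        q.signalDequeued();  // advance the consumer index and free the slot
        ++served;
    }
    return served;
}

// Test double (not part of the library): the first dequeue() simulates an
// interrupted wait, then three integer "requests" are handed out in order.
struct ScriptedQueue {
    std::vector<int> items{1, 2, 3};
    size_t next = 0;
    bool interruptedOnce = false;
    bool shutdown = false;

    bool isShuttingDown() const { return shutdown; }

    int* dequeue() {
        if (!interruptedOnce) {
            interruptedOnce = true;
            return nullptr;  // simulated EINTR
        }
        return &items[next];
    }

    void signalDequeued() {
        if (++next == items.size()) shutdown = true;  // stop after the last item
    }
};
```

A loop like this spins if the wait keeps failing for a reason other than an interrupt, so a real server would probably want to log or back off in that branch.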

◆ enqueue()

FilterRequest * cuckoogpu::SharedQueue::enqueue ( )
inline

Attempts to acquire a slot in the queue for a new request.

This function blocks until a slot is available or the server shuts down.

Returns
FilterRequest* Pointer to the acquired request slot, or nullptr if shutting down.

Definition at line 74 of file CuckooFilterIPC.cuh.

    {
        // Check if server is shutting down before trying to do anything
        if (shuttingDown.load(std::memory_order_acquire)) {
            return nullptr;
        }

        // Wait for available slot
        if (sem_wait(&producerSem) != 0) {
            return nullptr;
        }

        // Double-check after acquiring semaphore
        if (shuttingDown.load(std::memory_order_acquire)) {
            sem_post(&producerSem);
            return nullptr;
        }

        uint64_t headIdx = head.fetch_add(1, std::memory_order_acq_rel);
        FilterRequest& req = requests[headIdx & (QUEUE_SIZE - 1)];

        return &req;
    }
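
On the producer side, enqueue() only reserves a slot. The request becomes visible to the consumer once signalEnqueued() is called. The following sketch shows that two-step pattern under stated assumptions: submit is a hypothetical helper, it is templated so that it does not depend on the real SharedQueue, and StubQueue is a stand-in invented here with a single integer slot.

```cpp
// Hypothetical client helper: reserve a slot, fill it, then publish it.
template <typename Queue, typename Fill>
bool submit(Queue& q, Fill fill) {
    auto* slot = q.enqueue();
    if (slot == nullptr) {
        return false;  // queue is shutting down or the wait failed
    }
    fill(*slot);         // write the request fields into the reserved slot
    q.signalEnqueued();  // only now can the consumer pick the request up
    return true;
}

// Stand-in queue (not part of the library) with one integer slot.
struct StubQueue {
    int slot = 0;
    bool shuttingDown = false;
    int published = 0;

    int* enqueue() { return shuttingDown ? nullptr : &slot; }
    void signalEnqueued() { ++published; }
};
```

Since a successful enqueue() has already consumed a producerSem permit and advanced head, each non-null return should be paired with exactly one signalEnqueued() call; otherwise the slot is never handed to the consumer.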

◆ initiateShutdown()

void cuckoogpu::SharedQueue::initiateShutdown ( )
inline

Initiates the shutdown process for the queue.

Definition at line 145 of file CuckooFilterIPC.cuh.

    {
        shuttingDown.store(true, std::memory_order_release);
    }

◆ isShuttingDown()

bool cuckoogpu::SharedQueue::isShuttingDown ( ) const
inline

Checks if the queue is in shutdown mode.

Returns
true if shutting down, false otherwise.

Definition at line 153 of file CuckooFilterIPC.cuh.

    {
        return shuttingDown.load(std::memory_order_acquire);
    }

◆ pendingRequests()

size_t cuckoogpu::SharedQueue::pendingRequests ( ) const
inline

Returns the number of pending requests in the queue.

Returns
size_t Number of pending requests.

Definition at line 138 of file CuckooFilterIPC.cuh.

    {
        return head.load(std::memory_order_acquire) - tail.load(std::memory_order_acquire);
    }
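
The subtraction works on unsigned 64-bit indices that only ever increase, so the difference is the number of outstanding slots even if the counters were to wrap around. A minimal sketch of the arithmetic, with pendingCount as a hypothetical free function:

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical free function mirroring the subtraction in pendingRequests().
// Unsigned subtraction is performed modulo 2^64, so the result stays correct
// across a wrap of the indices as long as head is logically ahead of tail.
constexpr size_t pendingCount(uint64_t head, uint64_t tail) {
    return static_cast<size_t>(head - tail);
}

static_assert(pendingCount(10, 7) == 3, "three requests outstanding");
static_assert(pendingCount(5, 5) == 0, "empty queue");
```

In the real queue the two indices are loaded separately, so under concurrent access the value is a momentary estimate rather than a consistent snapshot. It also counts slots that a producer has reserved with enqueue() but not yet published with signalEnqueued(), because head is advanced in enqueue().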

◆ signalDequeued()

void cuckoogpu::SharedQueue::signalDequeued ( )
inline

Signals that a request has been processed and the slot is free.

Definition at line 129 of file CuckooFilterIPC.cuh.

    {
        tail.fetch_add(1, std::memory_order_release);
        sem_post(&producerSem);
    }

◆ signalEnqueued()

void cuckoogpu::SharedQueue::signalEnqueued ( )
inline

Signals that a new request has been enqueued and is ready for processing.

Definition at line 100 of file CuckooFilterIPC.cuh.

    {
        sem_post(&consumerSem);
    }

Member Data Documentation

◆ consumerSem

sem_t cuckoogpu::SharedQueue::consumerSem

Semaphore for pending requests.

Definition at line 60 of file CuckooFilterIPC.cuh.

◆ head

std::atomic<uint64_t> cuckoogpu::SharedQueue::head

Producer index.

Definition at line 57 of file CuckooFilterIPC.cuh.

◆ initialised

std::atomic<bool> cuckoogpu::SharedQueue::initialised

Whether the queue has been initialised.

Definition at line 64 of file CuckooFilterIPC.cuh.

◆ producerSem

sem_t cuckoogpu::SharedQueue::producerSem

Semaphore for available slots.

Definition at line 59 of file CuckooFilterIPC.cuh.

◆ QUEUE_SIZE

constexpr size_t cuckoogpu::SharedQueue::QUEUE_SIZE = 256
static constexpr

Definition at line 54 of file CuckooFilterIPC.cuh.
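
The member functions map an ever-increasing index to a slot with idx & (QUEUE_SIZE - 1). That mask is equivalent to idx % QUEUE_SIZE only when QUEUE_SIZE is a power of two, which holds for 256. A minimal sketch of the check, with slotFor as a hypothetical helper name:

```cpp
#include <cstddef>
#include <cstdint>

constexpr size_t QUEUE_SIZE = 256;

// A power of two has exactly one bit set, so n & (n - 1) is zero.
static_assert(QUEUE_SIZE != 0 && (QUEUE_SIZE & (QUEUE_SIZE - 1)) == 0,
              "QUEUE_SIZE must be a power of two for index masking");

// Hypothetical helper: map a monotonically increasing index to a slot.
constexpr size_t slotFor(uint64_t index) {
    return static_cast<size_t>(index & (QUEUE_SIZE - 1));
}

static_assert(slotFor(255) == 255, "last slot");
static_assert(slotFor(256) == 0, "wraps to the first slot");
```

If the capacity were ever changed to a value that is not a power of two, the mask would skip some slots, so a compile-time check of this kind is one way to guard the assumption.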

◆ requests

FilterRequest cuckoogpu::SharedQueue::requests[QUEUE_SIZE]

Definition at line 62 of file CuckooFilterIPC.cuh.

◆ shuttingDown

std::atomic<bool> cuckoogpu::SharedQueue::shuttingDown

Whether the queue is shutting down.

Definition at line 65 of file CuckooFilterIPC.cuh.

◆ tail

std::atomic<uint64_t> cuckoogpu::SharedQueue::tail

Consumer index.

Definition at line 58 of file CuckooFilterIPC.cuh.


The documentation for this struct was generated from the following file:
CuckooFilterIPC.cuh