3#include <cuda_runtime.h>
57 std::atomic<uint64_t>
head;
58 std::atomic<uint64_t>
tail;
91 uint64_t headIdx =
head.fetch_add(1, std::memory_order_acq_rel);
114 if (errno == EINTR) {
120 uint64_t tailIdx =
tail.load(std::memory_order_acquire);
130 tail.fetch_add(1, std::memory_order_release);
139 return head.load(std::memory_order_acquire) -
tail.load(std::memory_order_acquire);
165 uint64_t currentTail =
tail.load(std::memory_order_acquire);
166 uint64_t currentHead =
head.load(std::memory_order_acquire);
167 size_t cancelled = 0;
169 for (uint64_t i = currentTail; i < currentHead; i++) {
171 if (!req.
completed.load(std::memory_order_acquire)) {
172 req.
cancelled.store(
true, std::memory_order_release);
173 req.
completed.store(
true, std::memory_order_release);
192template <
typename Config>
200 std::thread workerThread;
202 void processRequests() {
203 bool shutdownReceived =
false;
215 if (errno == EINTR) {
222 shutdownReceived =
true;
223 req->
completed.store(
true, std::memory_order_release);
231 req->
completed.store(
true, std::memory_order_release);
237 if (req->
cancelled.load(std::memory_order_acquire)) {
238 req->
completed.store(
true, std::memory_order_release);
245 }
catch (
const std::exception& e) {
246 std::cerr << std::format(
"Error processing request: {}\n", e.what());
250 req->
completed.store(
true, std::memory_order_release);
260 bool* d_output =
nullptr;
262 cudaIpcMemHandle_t zeroHandle = {0};
263 bool hasKeys = memcmp(&req->
keysHandle, &zeroHandle,
sizeof(cudaIpcMemHandle_t)) != 0;
267 (
void**)&d_keys, req->
keysHandle, cudaIpcMemLazyEnablePeerAccess
271 bool hasOutput =
false;
274 memcmp(&req->
outputHandle, &zeroHandle,
sizeof(cudaIpcMemHandle_t)) != 0;
278 (
void**)&d_output, req->
outputHandle, cudaIpcMemLazyEnablePeerAccess
304 CUDA_CALL(cudaIpcCloseMemHandle(d_keys));
307 CUDA_CALL(cudaIpcCloseMemHandle(d_output));
321 : shmName(
"/cuckoo_filter_" + name), running(false) {
325 shmFd = shm_open(shmName.c_str(), O_CREAT | O_RDWR, 0666);
327 throw std::runtime_error(
"Failed to create shared memory");
332 throw std::runtime_error(
"Failed to set shared memory size");
336 mmap(
nullptr,
sizeof(
SharedQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0)
339 if (queue == MAP_FAILED) {
341 throw std::runtime_error(
"Failed to map shared memory");
344 queue->
head.store(0, std::memory_order_release);
345 queue->
tail.store(0, std::memory_order_release);
346 queue->
shuttingDown.store(
false, std::memory_order_release);
351 for (
auto& request : queue->
requests) {
352 request.completed.store(
false, std::memory_order_release);
353 request.cancelled.store(
false, std::memory_order_release);
356 queue->
initialised.store(
true, std::memory_order_release);
367 if (queue != MAP_FAILED) {
375 shm_unlink(shmName.c_str());
390 workerThread = std::thread(&FilterIPCServer::processRequests,
this);
399 void stop(
bool force =
false) {
409 std::cout << std::format(
410 "Force shutdown: cancelled {} pending requests\n", cancelled
416 std::cout << std::format(
417 "Graceful shutdown: draining {} pending requests...\n", pending
427 req->
completed.store(
false, std::memory_order_release);
428 req->
cancelled.store(
false, std::memory_order_release);
435 if (workerThread.joinable()) {
458template <
typename Config>
464 uint64_t nextRequestId;
477 : shmName(
"/cuckoo_filter_" + name), nextRequestId(0) {
478 shmFd = shm_open(shmName.c_str(), O_RDWR, 0666);
480 throw std::runtime_error(
"Failed to open shared memory, is the server running?");
484 mmap(
nullptr,
sizeof(
SharedQueue), PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0)
487 if (queue == MAP_FAILED) {
489 throw std::runtime_error(
"Failed to map shared memory");
492 if (!queue->
initialised.load(std::memory_order_acquire)) {
495 throw std::runtime_error(
"Ring buffer not initialised, server may not be ready");
505 if (queue != MAP_FAILED) {
543 size_t deleteMany(
const T* d_keys,
size_t count,
bool* d_output =
nullptr) {
567 req->
completed.store(
false, std::memory_order_release);
568 req->
cancelled.store(
false, std::memory_order_release);
572 while (!req->
completed.load(std::memory_order_acquire)) {
573 std::this_thread::yield();
584 return insertMany(thrust::raw_pointer_cast(d_keys.data()), d_keys.size());
593 containsMany(
const thrust::device_vector<T>& d_keys, thrust::device_vector<bool>& d_output) {
594 if (d_output.size() != d_keys.size()) {
595 d_output.resize(d_keys.size());
598 thrust::raw_pointer_cast(d_keys.data()),
600 thrust::raw_pointer_cast(d_output.data())
610 containsMany(
const thrust::device_vector<T>& d_keys, thrust::device_vector<uint8_t>& d_output) {
611 if (d_output.size() != d_keys.size()) {
612 d_output.resize(d_keys.size());
615 thrust::raw_pointer_cast(d_keys.data()),
617 reinterpret_cast<bool*
>(thrust::raw_pointer_cast(d_output.data()))
628 deleteMany(
const thrust::device_vector<T>& d_keys, thrust::device_vector<bool>& d_output) {
629 if (d_output.size() != d_keys.size()) {
630 d_output.resize(d_keys.size());
633 thrust::raw_pointer_cast(d_keys.data()),
635 thrust::raw_pointer_cast(d_output.data())
646 deleteMany(
const thrust::device_vector<T>& d_keys, thrust::device_vector<uint8_t>& d_output) {
647 if (d_output.size() != d_keys.size()) {
648 d_output.resize(d_keys.size());
651 thrust::raw_pointer_cast(d_keys.data()),
653 reinterpret_cast<bool*
>(thrust::raw_pointer_cast(d_output.data()))
663 return deleteMany(thrust::raw_pointer_cast(d_keys.data()), d_keys.size(),
nullptr);
667 size_t submitRequest(
RequestType type,
const T* d_keys,
size_t count,
bool* d_output) {
670 throw std::runtime_error(
"Server is shutting down, not accepting new requests");
673 if (d_keys !=
nullptr) {
676 memset(&req->
keysHandle, 0,
sizeof(cudaIpcMemHandle_t));
679 if (d_output !=
nullptr) {
682 memset(&req->
outputHandle, 0,
sizeof(cudaIpcMemHandle_t));
688 req->
completed.store(
false, std::memory_order_release);
689 req->
cancelled.store(
false, std::memory_order_release);
694 while (!req->
completed.load(std::memory_order_acquire)) {
695 std::this_thread::yield();
698 if (req->
cancelled.load(std::memory_order_acquire)) {
699 throw std::runtime_error(
"Request cancelled, server is shutting down");
Client implementation for the IPC Cuckoo Filter.
void containsMany(const T *d_keys, size_t count, bool *d_output)
Checks for existence of multiple keys.
size_t insertMany(const thrust::device_vector< T > &d_keys)
Inserts keys from a Thrust device vector.
void containsMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< bool > &d_output)
Checks for existence of keys in a Thrust device vector.
size_t insertMany(const T *d_keys, size_t count)
Inserts multiple keys into the filter.
size_t deleteMany(const thrust::device_vector< T > &d_keys)
Deletes keys in a Thrust device vector without outputting results.
void requestShutdown()
Requests the server to shut down.
FilterIPCClient(const std::string &name)
Constructs a new FilterIPCClient.
size_t deleteMany(const T *d_keys, size_t count, bool *d_output=nullptr)
Deletes multiple keys from the filter.
typename Config::KeyType T
size_t deleteMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< uint8_t > &d_output)
Deletes keys in a Thrust device vector (uint8_t output).
void containsMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< uint8_t > &d_output)
Checks for existence of keys in a Thrust device vector (uint8_t output).
void clear()
Clears the filter.
size_t deleteMany(const thrust::device_vector< T > &d_keys, thrust::device_vector< bool > &d_output)
Deletes keys in a Thrust device vector.
~FilterIPCClient()
Destroys the FilterIPCClient.
Server implementation for the IPC Cuckoo Filter.
void start()
Starts the worker thread to process requests.
~FilterIPCServer()
Destroys the FilterIPCServer.
Filter< Config > * getFilter()
Returns a pointer to the underlying Filter instance.
void stop(bool force=false)
Stops the server.
FilterIPCServer(const std::string &name, size_t capacity)
Constructs a new FilterIPCServer.
A CUDA-accelerated Cuckoo Filter implementation.
void clear()
Clears the filter, removing all items.
size_t deleteMany(const T *d_keys, const size_t n, bool *d_output=nullptr, cudaStream_t stream={})
Tries to remove a set of keys from the filter.
void containsMany(const T *d_keys, const size_t n, bool *d_output, cudaStream_t stream={})
Checks for the existence of a batch of keys.
size_t insertMany(const T *d_keys, const size_t n, bool *d_output=nullptr, cudaStream_t stream={})
Inserts a batch of keys into the filter.
#define CUDA_CALL(err)
Macro for checking CUDA errors.
constexpr bool powerOfTwo(size_t n)
Checks if a number is a power of two.
RequestType
Type of request that can be sent to the IPC server.
Structure representing a filter operation request.
size_t result
Updated number of occupied slots after insert/delete.
cudaIpcMemHandle_t keysHandle
Optional handle to device memory containing keys.
uint32_t count
Number of keys in this batch.
uint64_t requestId
Unique request identifier.
std::atomic< bool > completed
Completion flag.
cudaIpcMemHandle_t outputHandle
Optional handle for results (for lookup/deletion).
std::atomic< bool > cancelled
Cancellation flag (for force shutdown).
RequestType type
Type of request.
A shared memory queue implementation for Inter-Process Communication.
std::atomic< uint64_t > tail
Consumer index.
std::atomic< bool > initialised
Whether the queue has been initialised.
size_t pendingRequests() const
Returns the number of pending requests in the queue.
void initiateShutdown()
Initiates the shutdown process for the queue.
sem_t producerSem
Semaphore for available slots.
FilterRequest * enqueue()
Attempts to acquire a slot in the queue for a new request.
void signalDequeued()
Signals that a request has been processed and the slot is free.
FilterRequest * dequeue()
Dequeues the next pending request.
static constexpr size_t QUEUE_SIZE
void signalEnqueued()
Signals that a new request has been enqueued and is ready for processing.
sem_t consumerSem
Semaphore for pending requests.
FilterRequest requests[QUEUE_SIZE]
bool isShuttingDown() const
Checks if the queue is in shutdown mode.
std::atomic< bool > shuttingDown
Whether the queue is shutting down.
std::atomic< uint64_t > head
Producer index.
size_t cancelPendingRequests()
Cancels all pending requests in the queue.