GPU-Accelerated Cuckoo Filter
Loading...
Searching...
No Matches
CuckooFilterMultiGPU.cuh
Go to the documentation of this file.
1#pragma once
2
3#include <algorithm>
4#include <atomic>
5#include <cstddef>
6#include <cstdint>
7#include <cstring>
8#include <iostream>
9#include <memory>
10#include <numeric>
11#include <thread>
12#include <type_traits>
13#include <vector>
14#include "CuckooFilter.cuh"
15#include "helpers.cuh"
16
17#include <thrust/device_vector.h>
18#include <thrust/execution_policy.h>
19#include <thrust/host_vector.h>
20#include <thrust/scatter.h>
21
22#include <gossip.cuh>
23#include <plan_parser.hpp>
24
25namespace cuckoogpu {
26
36template <typename Config>
38 public:
39 using T = typename Config::KeyType;
40
    /**
     * @brief Functor that maps a key to the GPU responsible for storing it.
     *
     * Routing is deterministic: the filter's 64-bit hash of the key, reduced
     * modulo the number of GPUs, so the same key always lands on the same GPU.
     */
    struct Partitioner {
        // Number of participating GPUs; modulus for the routing decision.
        size_t numGPUs;

        /// Returns the id of the GPU that owns `key`.
        __host__ __device__ gossip::gpu_id_t operator()(const T& key) const {
            uint64_t hash = Filter<Config>::hash64(key);
            return static_cast<gossip::gpu_id_t>(hash % numGPUs);
        }
    };
55
57 static constexpr float defaultMemoryFactor = 0.8f;
58
59 private:
60 size_t numGPUs;
61 size_t capacityPerGPU;
62 float memoryFactor;
63 std::vector<Filter<Config>*> filters;
64
65 gossip::context_t gossipContext;
66 gossip::multisplit_t multisplit;
67 gossip::all2all_t all2all;
68 gossip::all2all_t all2allResults;
69
70 // Pre-allocated per-GPU buffers for gossip operations
71 std::vector<T*> srcBuffers;
72 std::vector<T*> dstBuffers;
73 std::vector<size_t> bufferCapacities;
74
75 std::vector<bool*> resultSrcBuffers;
76 std::vector<bool*> resultDstBuffers;
77
78 size_t totalBufferCapacity;
79
84 [[nodiscard]] std::vector<size_t> getGpuMemoryInfo() const {
85 std::vector<size_t> freeMem(numGPUs);
86 parallelForGPUs([&](size_t gpuId) {
87 size_t free, total;
88 CUDA_CALL(cudaMemGetInfo(&free, &total));
89 freeMem[gpuId] = free;
90 });
91 return freeMem;
92 }
93
101 void allocateBuffers() {
102 // Bytes per element: 2 key buffers (src + dst) + 2 result buffers (src + dst)
103 const size_t bytesPerKey = 2 * sizeof(T) + 2 * sizeof(bool);
104
105 totalBufferCapacity = 0;
106
107 parallelForGPUs([&](size_t gpuId) {
108 size_t freeMem, totalMem;
109 CUDA_CALL(cudaMemGetInfo(&freeMem, &totalMem));
110
111 // Calculate max keys this GPU can buffer
112 auto availableBytes = static_cast<size_t>(freeMem * memoryFactor);
113 size_t maxKeys = availableBytes / bytesPerKey;
114
115 // Allocate key buffers
116 CUDA_CALL(cudaMalloc(&srcBuffers[gpuId], maxKeys * sizeof(T)));
117 CUDA_CALL(cudaMalloc(&dstBuffers[gpuId], maxKeys * sizeof(T)));
118
119 // Allocate result buffers
120 CUDA_CALL(cudaMalloc(&resultSrcBuffers[gpuId], maxKeys * sizeof(bool)));
121 CUDA_CALL(cudaMalloc(&resultDstBuffers[gpuId], maxKeys * sizeof(bool)));
122
123 bufferCapacities[gpuId] = maxKeys;
124 totalBufferCapacity += maxKeys;
125 });
126 }
127
131 void freeBuffers() {
132 parallelForGPUs([&](size_t gpuId) {
133 if (srcBuffers[gpuId]) {
134 cudaFree(srcBuffers[gpuId]);
135 srcBuffers[gpuId] = nullptr;
136 }
137 if (dstBuffers[gpuId]) {
138 cudaFree(dstBuffers[gpuId]);
139 dstBuffers[gpuId] = nullptr;
140 }
141 if (resultSrcBuffers[gpuId]) {
142 cudaFree(resultSrcBuffers[gpuId]);
143 resultSrcBuffers[gpuId] = nullptr;
144 }
145 if (resultDstBuffers[gpuId]) {
146 cudaFree(resultDstBuffers[gpuId]);
147 resultDstBuffers[gpuId] = nullptr;
148 }
149 bufferCapacities[gpuId] = 0;
150 });
151 }
152
    /**
     * @brief Core driver shared by insertMany / containsMany / deleteMany.
     *
     * Streams the host key array through the pre-allocated per-GPU buffers in
     * chunks of at most `totalBufferCapacity` keys. For each chunk:
     *   1. split the chunk across GPUs proportionally to buffer capacity,
     *   2. copy each GPU's slice host -> srcBuffer,
     *   3. multisplit: partition each GPU's keys by owning GPU (Partitioner),
     *   4. all-to-all: move every key to the GPU that owns it,
     *   5. run `filterOp` on each GPU's local shard,
     *   6. if hasOutput: all-to-all the per-key bool results back (using the
     *      transposed partition table) and copy them into h_output in the
     *      original key order.
     *
     * @tparam returnOccupied When true, returns totalOccupiedSlots() at the end.
     * @tparam hasOutput      When true, gathers per-key results into h_output.
     * @tparam FilterFunc     Callable (Filter*, const T* keys, bool* results,
     *                        size_t count, cudaStream_t) performing the op.
     * @param h_keys   Host pointer to the n input keys.
     * @param n        Number of keys.
     * @param h_output Host pointer to n bools (only read when hasOutput).
     * @param filterOp Per-GPU operation to run on the shuffled shard.
     * @return totalOccupiedSlots() if returnOccupied, otherwise 0.
     */
    template <bool returnOccupied, bool hasOutput, typename FilterFunc>
    size_t executeOperation(const T* h_keys, size_t n, bool* h_output, FilterFunc filterOp) {
        if (n == 0) {
            return returnOccupied ? totalOccupiedSlots() : 0;
        }

        size_t processed = 0;

        while (processed < n) {
            size_t chunkSize = std::min(n - processed, totalBufferCapacity);

            // Distribute chunk proportionally based on each GPU's buffer capacity
            std::vector<size_t> inputLens(numGPUs);
            std::vector<size_t> inputOffsets(numGPUs + 1, 0);

            size_t remaining = chunkSize;
            for (size_t gpu = 0; gpu < numGPUs; ++gpu) {
                if (gpu == numGPUs - 1) {
                    // Last GPU absorbs the rounding remainder of the proportional split.
                    inputLens[gpu] = std::min(remaining, bufferCapacities[gpu]);
                } else {
                    double proportion =
                        static_cast<double>(bufferCapacities[gpu]) / totalBufferCapacity;
                    inputLens[gpu] = std::min(
                        static_cast<size_t>(chunkSize * proportion), bufferCapacities[gpu]
                    );
                    inputLens[gpu] = std::min(inputLens[gpu], remaining);
                }
                remaining -= inputLens[gpu];
                // Prefix sums give each GPU's offset into the chunk.
                inputOffsets[gpu + 1] = inputOffsets[gpu] + inputLens[gpu];
            }

            // Copy input data to source buffers on each GPU
            parallelForGPUs([&](size_t gpuId) {
                if (inputLens[gpuId] > 0) {
                    CUDA_CALL(cudaMemcpy(
                        srcBuffers[gpuId],
                        h_keys + processed + inputOffsets[gpuId],
                        inputLens[gpuId] * sizeof(T),
                        cudaMemcpyHostToDevice
                    ));
                }
            });
            gossipContext.sync_hard();

            // Partition keys by target GPU
            std::vector<std::vector<size_t>> partitionTable(numGPUs, std::vector<size_t>(numGPUs));

            Partitioner partitioner{numGPUs};
            multisplit.execAsync(
                srcBuffers, // source pointers (per GPU)
                inputLens, // source lengths (per GPU)
                dstBuffers, // destination pointers (per GPU)
                bufferCapacities, // destination capacities (per GPU)
                partitionTable, // output: partition counts [src][dst]
                partitioner
            );
            multisplit.sync();

            // Multisplit wrote into dstBuffers; swap so the partitioned data is
            // the source of the all-to-all and dstBuffers is free for receiving.
            std::swap(srcBuffers, dstBuffers);

            // Calculate how many keys each GPU will receive after all2all
            std::vector<size_t> recvCounts(numGPUs, 0);
            for (size_t dst = 0; dst < numGPUs; ++dst) {
                for (size_t src = 0; src < numGPUs; ++src) {
                    recvCounts[dst] += partitionTable[src][dst];
                }
            }

            // Shuffle partitioned keys to correct GPUs
            all2all.execAsync(
                srcBuffers, // partitioned source data
                bufferCapacities, // source buffer capacities
                dstBuffers, // destination for received data
                bufferCapacities, // destination buffer capacities
                partitionTable // partition counts from multisplit
            );
            all2all.sync();

            // If no output is required, execute filter ops and continue
            if constexpr (!hasOutput) {
                parallelForGPUs([&](size_t gpuId) {
                    size_t localCount = recvCounts[gpuId];
                    if (localCount == 0) {
                        return;
                    }
                    auto stream = gossipContext.get_streams(gpuId)[0];
                    // No result buffer needed: pass nullptr for the output slot.
                    filterOp(filters[gpuId], dstBuffers[gpuId], nullptr, localCount, stream);
                });
                gossipContext.sync_all_streams();
            } else {
                // Transpose partitionTable in-place for reverse all-to-all
                // (results flow in the opposite direction of the keys).
                for (size_t i = 0; i < numGPUs; ++i) {
                    for (size_t j = i + 1; j < numGPUs; ++j) {
                        std::swap(partitionTable[i][j], partitionTable[j][i]);
                    }
                }

                // Execute filter operations
                parallelForGPUs([&](size_t gpuId) {
                    size_t localCount = recvCounts[gpuId];
                    if (localCount == 0) {
                        return;
                    }
                    auto stream = gossipContext.get_streams(gpuId)[0];
                    filterOp(
                        filters[gpuId],
                        dstBuffers[gpuId],
                        resultSrcBuffers[gpuId],
                        localCount,
                        stream
                    );
                });
                gossipContext.sync_all_streams();

                // Send each per-key bool result back to the GPU that staged
                // the corresponding input key.
                all2allResults.execAsync(
                    resultSrcBuffers, recvCounts, resultDstBuffers, bufferCapacities, partitionTable
                );
                all2allResults.sync();

                // Copy results back to the host at the chunk's original offsets.
                parallelForGPUs([&](size_t gpuId) {
                    size_t localCount = inputLens[gpuId];
                    if (localCount == 0) {
                        return;
                    }

                    CUDA_CALL(cudaMemcpy(
                        h_output + processed + inputOffsets[gpuId],
                        resultDstBuffers[gpuId],
                        localCount * sizeof(bool),
                        cudaMemcpyDeviceToHost
                    ));
                });
                gossipContext.sync_hard();
            }

            processed += chunkSize;
        }

        return returnOccupied ? totalOccupiedSlots() : 0;
    }
312
313 public:
    /**
     * @brief Constructs a FilterMultiGPU using gossip's default all-to-all plan.
     *
     * @param numGPUs   Number of GPUs to distribute the filter across (>= 1).
     * @param capacity  Total desired capacity; split across GPUs via SDIV
     *                  (ceiling division) with a 2% per-GPU safety margin.
     * @param memFactor Fraction of the free device memory remaining after
     *                  filter allocation to use for the staging buffers.
     *
     * NOTE(review): member-initializer order matters here — gossipContext must
     * be constructed before multisplit/all2all, which take it by reference.
     */
    FilterMultiGPU(size_t numGPUs, size_t capacity, float memFactor = defaultMemoryFactor)
        : numGPUs(numGPUs),
          capacityPerGPU(static_cast<size_t>(SDIV(capacity, numGPUs) * 1.02)),
          memoryFactor(memFactor),
          gossipContext(numGPUs),
          multisplit(gossipContext),
          all2all(gossipContext, gossip::all2all::default_plan(numGPUs)),
          all2allResults(gossipContext, gossip::all2all::default_plan(numGPUs)),
          srcBuffers(numGPUs, nullptr),
          dstBuffers(numGPUs, nullptr),
          bufferCapacities(numGPUs, 0),
          resultSrcBuffers(numGPUs, nullptr),
          resultDstBuffers(numGPUs, nullptr),
          totalBufferCapacity(0) {
        assert(numGPUs > 0 && "Number of GPUs must be at least 1");

        filters.resize(numGPUs);

        for (size_t i = 0; i < numGPUs; ++i) {
            CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
            Filter<Config>* filter;
            // Managed memory + placement new: the Filter object itself is
            // dereferenceable from both host and device code.
            CUDA_CALL(cudaMallocManaged(&filter, sizeof(Filter<Config>)));
            new (filter) Filter<Config>(capacityPerGPU);
            filters[i] = filter;
        }
        gossipContext.sync_hard();

        // Buffers are sized from free memory, so allocate after the filters.
        allocateBuffers();
    }
353
367 size_t numGPUs,
368 size_t capacity,
369 const char* transferPlanPath,
370 float memFactor = defaultMemoryFactor
371 )
372 : numGPUs(numGPUs),
373 capacityPerGPU(static_cast<size_t>(SDIV(capacity, numGPUs) * 1.02)),
374 memoryFactor(memFactor),
375 gossipContext(numGPUs),
376 multisplit(gossipContext),
377 all2all(
378 gossipContext,
379 [&]() {
380 auto plan = parse_plan(transferPlanPath);
381 if (plan.num_gpus() == 0) {
382 return gossip::all2all::default_plan(numGPUs);
383 }
384 return plan;
385 }()
386 ),
387 all2allResults(
388 gossipContext,
389 [&]() {
390 auto plan = parse_plan(transferPlanPath);
391 if (plan.num_gpus() == 0) {
392 return gossip::all2all::default_plan(numGPUs);
393 }
394 return plan;
395 }()
396 ),
397 srcBuffers(numGPUs, nullptr),
398 dstBuffers(numGPUs, nullptr),
399 bufferCapacities(numGPUs, 0),
400 resultSrcBuffers(numGPUs, nullptr),
401 resultDstBuffers(numGPUs, nullptr),
402 totalBufferCapacity(0) {
403 assert(numGPUs > 0 && "Number of GPUs must be at least 1");
404
405 filters.resize(numGPUs);
406
407 for (size_t i = 0; i < numGPUs; ++i) {
408 CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
409 Filter<Config>* filter;
410 CUDA_CALL(cudaMallocManaged(&filter, sizeof(Filter<Config>)));
411 new (filter) Filter<Config>(capacityPerGPU);
412 filters[i] = filter;
413 }
414 gossipContext.sync_hard();
415
416 allocateBuffers();
417 }
418
425 freeBuffers();
426 for (size_t i = 0; i < numGPUs; ++i) {
427 CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
428 filters[i]->~Filter<Config>();
429 CUDA_CALL(cudaFree(filters[i]));
430 }
431 }
432
435
445 size_t insertMany(const T* h_keys, size_t n, bool* h_output = nullptr) {
446 if (h_output) {
447 return executeOperation<true, true>(
448 h_keys,
449 n,
450 h_output,
451 [](Filter<Config>* filter,
452 const T* keys,
453 bool* results,
454 size_t count,
455 cudaStream_t stream) { filter->insertMany(keys, count, results, stream); }
456 );
457 } else {
458 return executeOperation<true, false>(
459 h_keys,
460 n,
461 nullptr,
462 [](Filter<Config>* filter,
463 const T* keys,
464 bool* /*unused results*/,
465 size_t count,
466 cudaStream_t stream) { filter->insertMany(keys, count, nullptr, stream); }
467 );
468 }
469 }
470
477 void containsMany(const T* h_keys, size_t n, bool* h_output) {
478 executeOperation<false, true>(
479 h_keys,
480 n,
481 h_output,
482 [](Filter<Config>* filter,
483 const T* keys,
484 bool* results,
485 size_t count,
486 cudaStream_t stream) { filter->containsMany(keys, count, results, stream); }
487 );
488 }
489
497 size_t deleteMany(const T* h_keys, size_t n, bool* h_output = nullptr) {
498 if (h_output) {
499 return executeOperation<true, true>(
500 h_keys,
501 n,
502 h_output,
503 [](Filter<Config>* filter,
504 const T* keys,
505 bool* results,
506 size_t count,
507 cudaStream_t stream) { filter->deleteMany(keys, count, results, stream); }
508 );
509 } else {
510 return executeOperation<true, false>(
511 h_keys,
512 n,
513 nullptr,
514 [](Filter<Config>* filter,
515 const T* keys,
516 bool* /*unused results*/,
517 size_t count,
518 cudaStream_t stream) { filter->deleteMany(keys, count, nullptr, stream); }
519 );
520 }
521 }
522
527 [[nodiscard]] float loadFactor() const {
528 return static_cast<float>(totalOccupiedSlots()) / static_cast<float>(totalCapacity());
529 }
530
539 template <typename Func>
540 void parallelForGPUs(Func func) const {
541 std::vector<std::thread> threads;
542 for (size_t i = 0; i < numGPUs; ++i) {
543 threads.emplace_back([=, this]() {
544 CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
545 func(i);
546 });
547 }
548
549 for (auto& t : threads) {
550 t.join();
551 }
552 }
553
558 gossipContext.sync_all_streams();
559 }
560
565 [[nodiscard]] size_t totalOccupiedSlots() const {
566 std::atomic<size_t> total(0);
567 parallelForGPUs([&](size_t i) {
568 total.fetch_add(filters[i]->occupiedSlots(), std::memory_order_relaxed);
569 });
570
571 return total.load();
572 }
573
577 void clear() {
578 parallelForGPUs([&](size_t i) { filters[i]->clear(); });
579 }
580
585 [[nodiscard]] size_t totalCapacity() const {
586 std::atomic<size_t> total(0);
587 parallelForGPUs([&](size_t i) {
588 total.fetch_add(filters[i]->capacity(), std::memory_order_relaxed);
589 });
590 return total.load();
591 }
592
593 [[nodiscard]] size_t sizeInBytes() const {
594 std::atomic<size_t> total(0);
595 parallelForGPUs([&](size_t i) {
596 total.fetch_add(filters[i]->sizeInBytes(), std::memory_order_relaxed);
597 });
598 return total.load();
599 }
600
607 size_t insertMany(const thrust::host_vector<T>& h_keys, thrust::host_vector<bool>& h_output) {
608 h_output.resize(h_keys.size());
609 return insertMany(
610 thrust::raw_pointer_cast(h_keys.data()),
611 h_keys.size(),
612 thrust::raw_pointer_cast(h_output.data())
613 );
614 }
615
622 size_t
623 insertMany(const thrust::host_vector<T>& h_keys, thrust::host_vector<uint8_t>& h_output) {
624 h_output.resize(h_keys.size());
625 return insertMany(
626 thrust::raw_pointer_cast(h_keys.data()),
627 h_keys.size(),
628 reinterpret_cast<bool*>(thrust::raw_pointer_cast(h_output.data()))
629 );
630 }
631
637 size_t insertMany(const thrust::host_vector<T>& h_keys) {
638 return insertMany(thrust::raw_pointer_cast(h_keys.data()), h_keys.size(), nullptr);
639 }
640
646 void containsMany(const thrust::host_vector<T>& h_keys, thrust::host_vector<bool>& h_output) {
647 h_output.resize(h_keys.size());
649 thrust::raw_pointer_cast(h_keys.data()),
650 h_keys.size(),
651 thrust::raw_pointer_cast(h_output.data())
652 );
653 }
654
660 void
661 containsMany(const thrust::host_vector<T>& h_keys, thrust::host_vector<uint8_t>& h_output) {
662 h_output.resize(h_keys.size());
664 thrust::raw_pointer_cast(h_keys.data()),
665 h_keys.size(),
666 reinterpret_cast<bool*>(thrust::raw_pointer_cast(h_output.data()))
667 );
668 }
669
676 size_t deleteMany(const thrust::host_vector<T>& h_keys, thrust::host_vector<bool>& h_output) {
677 h_output.resize(h_keys.size());
678 return deleteMany(
679 thrust::raw_pointer_cast(h_keys.data()),
680 h_keys.size(),
681 thrust::raw_pointer_cast(h_output.data())
682 );
683 }
684
691 size_t
692 deleteMany(const thrust::host_vector<T>& h_keys, thrust::host_vector<uint8_t>& h_output) {
693 h_output.resize(h_keys.size());
694 return deleteMany(
695 thrust::raw_pointer_cast(h_keys.data()),
696 h_keys.size(),
697 reinterpret_cast<bool*>(thrust::raw_pointer_cast(h_output.data()))
698 );
699 }
700
706 size_t deleteMany(const thrust::host_vector<T>& h_keys) {
707 return deleteMany(thrust::raw_pointer_cast(h_keys.data()), h_keys.size(), nullptr);
708 }
709};
710
711} // namespace cuckoogpu
A multi-GPU implementation of the Cuckoo Filter.
size_t insertMany(const thrust::host_vector< T > &h_keys, thrust::host_vector< uint8_t > &h_output)
Inserts keys from a Thrust host vector (uint8_t output).
~FilterMultiGPU()
Destroys the FilterMultiGPU.
void containsMany(const thrust::host_vector< T > &h_keys, thrust::host_vector< uint8_t > &h_output)
Checks for existence of keys in a Thrust host vector (uint8_t output).
void containsMany(const T *h_keys, size_t n, bool *h_output)
Checks for the presence of multiple keys in the filter.
FilterMultiGPU(const FilterMultiGPU &)=delete
void parallelForGPUs(Func func) const
Executes a function in parallel across all GPUs.
size_t insertMany(const T *h_keys, size_t n, bool *h_output=nullptr)
Inserts a batch of keys into the distributed filter.
size_t deleteMany(const thrust::host_vector< T > &h_keys, thrust::host_vector< uint8_t > &h_output)
Deletes keys in a Thrust host vector (uint8_t output).
void containsMany(const thrust::host_vector< T > &h_keys, thrust::host_vector< bool > &h_output)
Checks for existence of keys in a Thrust host vector.
size_t deleteMany(const T *h_keys, size_t n, bool *h_output=nullptr)
Deletes multiple keys from the filter.
typename Config::KeyType T
size_t totalCapacity() const
Returns the total capacity of the distributed filter.
size_t deleteMany(const thrust::host_vector< T > &h_keys)
Deletes keys in a Thrust host vector without outputting results.
FilterMultiGPU(size_t numGPUs, size_t capacity, float memFactor=defaultMemoryFactor)
Constructs a new FilterMultiGPU with default transfer plan.
size_t insertMany(const thrust::host_vector< T > &h_keys, thrust::host_vector< bool > &h_output)
Inserts keys from a Thrust host vector.
static constexpr float defaultMemoryFactor
Default fraction of free GPU memory to use for buffers (after filter allocation)
size_t totalOccupiedSlots() const
Returns the total number of occupied slots across all GPUs.
void clear()
Clears all filters on all GPUs.
FilterMultiGPU & operator=(const FilterMultiGPU &)=delete
FilterMultiGPU(size_t numGPUs, size_t capacity, const char *transferPlanPath, float memFactor=defaultMemoryFactor)
Constructs a new FilterMultiGPU with custom transfer plan.
void synchronizeAllGPUs()
Synchronizes all GPU streams used by this filter.
size_t insertMany(const thrust::host_vector< T > &h_keys)
Inserts keys from a Thrust host vector without outputting results.
float loadFactor() const
Calculates the global load factor.
size_t deleteMany(const thrust::host_vector< T > &h_keys, thrust::host_vector< bool > &h_output)
Deletes keys in a Thrust host vector.
A CUDA-accelerated Cuckoo Filter implementation.
static __host__ __device__ uint64_t hash64(const H &key)
size_t deleteMany(const T *d_keys, const size_t n, bool *d_output=nullptr, cudaStream_t stream={})
Tries to remove a set of keys from the filter.
void containsMany(const T *d_keys, const size_t n, bool *d_output, cudaStream_t stream={})
Checks for the existence of a batch of keys.
size_t insertMany(const T *d_keys, const size_t n, bool *d_output=nullptr, cudaStream_t stream={})
Inserts a batch of keys into the filter.
#define SDIV(x, y)
Integer division with rounding up (ceiling).
Definition helpers.cuh:198
#define CUDA_CALL(err)
Macro for checking CUDA errors.
Definition helpers.cuh:204
Configuration structure for the Cuckoo Filter.
Functor for partitioning keys across GPUs.
__host__ __device__ gossip::gpu_id_t operator()(const T &key) const