// Partition functor: routes a key to its owning GPU. Hashing before the
// modulus spreads keys evenly across devices and guarantees that identical
// keys always land on the same GPU.
__host__ __device__ gossip::gpu_id_t operator()(const T& key) const {
    // `hasher(key)` stands in for the hash computation on a line omitted
    // from this excerpt; any device-callable hash works here.
    const auto hash = hasher(key);
    return static_cast<gossip::gpu_id_t>(hash % numGPUs);
}
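// The code below relies on two project-local helpers whose definitions fall
// outside this excerpt. Minimal stand-ins under assumed semantics, guarded so
// the real definitions take precedence if they were included first (these
// sketches need <cstdio> and <cstdlib>):
#ifndef CUDA_CALL
#define CUDA_CALL(call)                                                     \
    do {                                                                    \
        const cudaError_t err_ = (call);                                    \
        if (err_ != cudaSuccess) {                                          \
            std::fprintf(stderr, "CUDA error %s at %s:%d\n",                \
                         cudaGetErrorString(err_), __FILE__, __LINE__);     \
            std::exit(EXIT_FAILURE);                                        \
        }                                                                   \
    } while (0)
#endif

#ifndef SDIV
#define SDIV(x, y) (((x) + (y) - 1) / (y))  // ceiling division
#endif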
size_t numGPUs;           // declared on a line omitted here; used throughout
size_t capacityPerGPU;
double memoryFactor;      // fraction of free device memory the buffers may use

std::vector<Filter<Config>*> filters;

gossip::context_t gossipContext;
gossip::multisplit_t multisplit;
gossip::all2all_t all2all;         // exchanges keys between GPUs
gossip::all2all_t all2allResults;  // routes per-key results back to the source GPU

// Double buffers for the key exchange, one pair per GPU.
std::vector<T*> srcBuffers;
std::vector<T*> dstBuffers;
std::vector<size_t> bufferCapacities;

// Double buffers for the per-key bool results.
std::vector<bool*> resultSrcBuffers;
std::vector<bool*> resultDstBuffers;

size_t totalBufferCapacity;
// Query the currently free memory on every GPU (loop body reconstructed,
// mirroring the pattern in allocateBuffers below).
[[nodiscard]] std::vector<size_t> getGpuMemoryInfo() const {
    std::vector<size_t> freeMem(numGPUs);
    for (size_t gpuId = 0; gpuId < numGPUs; ++gpuId) {
        CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(gpuId)));
        size_t free, total;
        CUDA_CALL(cudaMemGetInfo(&free, &total));
        freeMem[gpuId] = free;
    }
    return freeMem;
}
// Size the staging buffers from the free memory actually available on each
// device at allocation time.
void allocateBuffers() {
    // Every in-flight key occupies a slot in the src and dst key buffers
    // plus a slot in the src and dst result buffers.
    const size_t bytesPerKey = 2 * sizeof(T) + 2 * sizeof(bool);

    totalBufferCapacity = 0;

    for (size_t gpuId = 0; gpuId < numGPUs; ++gpuId) {
        CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(gpuId)));
        size_t freeMem, totalMem;
        CUDA_CALL(cudaMemGetInfo(&freeMem, &totalMem));

        // Claim only a fraction of the free memory, leaving headroom for
        // the filters and other allocations.
        auto availableBytes = static_cast<size_t>(freeMem * memoryFactor);
        size_t maxKeys = availableBytes / bytesPerKey;

        CUDA_CALL(cudaMalloc(&srcBuffers[gpuId], maxKeys * sizeof(T)));
        CUDA_CALL(cudaMalloc(&dstBuffers[gpuId], maxKeys * sizeof(T)));

        CUDA_CALL(cudaMalloc(&resultSrcBuffers[gpuId], maxKeys * sizeof(bool)));
        CUDA_CALL(cudaMalloc(&resultDstBuffers[gpuId], maxKeys * sizeof(bool)));

        bufferCapacities[gpuId] = maxKeys;
        totalBufferCapacity += maxKeys;
    }
}
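// Worked example (illustrative numbers, not from the source): for
// T = uint64_t, bytesPerKey = 2 * 8 + 2 * 1 = 18 bytes. With 10 GiB free
// and a memoryFactor of 0.8, one GPU can stage roughly
// 0.8 * 10 * 2^30 / 18 ≈ 477 million keys per chunk.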
// Release the staging buffers. The function name and loop header fall in an
// omitted span and are reconstructed here.
void freeBuffers() {
    for (size_t gpuId = 0; gpuId < numGPUs; ++gpuId) {
        CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(gpuId)));
        if (srcBuffers[gpuId]) {
            cudaFree(srcBuffers[gpuId]);
            srcBuffers[gpuId] = nullptr;
        }
        if (dstBuffers[gpuId]) {
            cudaFree(dstBuffers[gpuId]);
            dstBuffers[gpuId] = nullptr;
        }
        if (resultSrcBuffers[gpuId]) {
            cudaFree(resultSrcBuffers[gpuId]);
            resultSrcBuffers[gpuId] = nullptr;
        }
        if (resultDstBuffers[gpuId]) {
            cudaFree(resultDstBuffers[gpuId]);
            resultDstBuffers[gpuId] = nullptr;
        }
        bufferCapacities[gpuId] = 0;
    }
    totalBufferCapacity = 0;
}
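// The batch pipeline implemented by executeOperation below, step by step:
//   1. slice the host input across the GPUs and upload each slice,
//   2. multisplit: partition every slice by owning GPU (Partitioner above),
//   3. all2all: ship each key to the GPU that owns it,
//   4. run the per-GPU filter operation (insert / contains / delete),
//   5. if per-key output was requested, all2all the flags back along the
//      transposed routes and download them to the host.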
template <bool returnOccupied, bool hasOutput, typename FilterFunc>
size_t executeOperation(const T* h_keys, size_t n, bool* h_output, FilterFunc filterOp) {
    size_t processed = 0;

    // Process the input in chunks that fit into the staging buffers.
    while (processed < n) {
        size_t chunkSize = std::min(n - processed, totalBufferCapacity);

        // Split the chunk across the GPUs proportionally to their buffer
        // capacities; the last GPU absorbs the rounding remainder.
        std::vector<size_t> inputLens(numGPUs);
        std::vector<size_t> inputOffsets(numGPUs + 1, 0);

        size_t remaining = chunkSize;
        for (size_t gpu = 0; gpu < numGPUs; ++gpu) {
            if (gpu == numGPUs - 1) {
                inputLens[gpu] = std::min(remaining, bufferCapacities[gpu]);
            } else {
                const double proportion =
                    static_cast<double>(bufferCapacities[gpu]) / totalBufferCapacity;
                inputLens[gpu] = std::min(
                    static_cast<size_t>(chunkSize * proportion), bufferCapacities[gpu]
                );
                inputLens[gpu] = std::min(inputLens[gpu], remaining);
            }
            remaining -= inputLens[gpu];
            inputOffsets[gpu + 1] = inputOffsets[gpu] + inputLens[gpu];
        }

        // Upload each GPU's slice of the host input (the memcpy call name
        // falls in an omitted line; cudaMemcpyAsync assumed, matching the
        // sync_hard barrier that follows).
        for (size_t gpuId = 0; gpuId < numGPUs; ++gpuId) {
            if (inputLens[gpuId] > 0) {
                CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(gpuId)));
                CUDA_CALL(cudaMemcpyAsync(
                    srcBuffers[gpuId],
                    h_keys + processed + inputOffsets[gpuId],
                    inputLens[gpuId] * sizeof(T),
                    cudaMemcpyHostToDevice
                ));
            }
        }
        gossipContext.sync_hard();

        // partitionTable[src][dst] = number of keys GPU `src` holds for GPU `dst`.
        std::vector<std::vector<size_t>> partitionTable(numGPUs, std::vector<size_t>(numGPUs));

        Partitioner partitioner{numGPUs};
        // Argument lists for the multisplit and key all2all calls fall in
        // omitted spans; reconstructed to mirror the result exchange below.
        multisplit.execAsync(
            srcBuffers, inputLens, dstBuffers, bufferCapacities,
            partitionTable, partitioner
        );
        multisplit.sync();

        std::swap(srcBuffers, dstBuffers);

        // Exchange the partitioned keys so each GPU receives the keys it owns.
        all2all.execAsync(
            srcBuffers, inputLens, dstBuffers, bufferCapacities, partitionTable
        );
        all2all.sync();

        // How many keys each GPU received: column sums of the table.
        std::vector<size_t> recvCounts(numGPUs, 0);
        for (size_t dst = 0; dst < numGPUs; ++dst) {
            for (size_t src = 0; src < numGPUs; ++src) {
                recvCounts[dst] += partitionTable[src][dst];
            }
        }

        if constexpr (!hasOutput) {
            // No per-key results requested: run the filter op and move on.
            for (size_t gpuId = 0; gpuId < numGPUs; ++gpuId) {
                size_t localCount = recvCounts[gpuId];
                if (localCount == 0) {
                    continue;
                }
                CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(gpuId)));
                auto stream = gossipContext.get_streams(gpuId)[0];
                filterOp(filters[gpuId], dstBuffers[gpuId], nullptr, localCount, stream);
            }
            gossipContext.sync_all_streams();
        } else {
            // Transpose the partition table in place so the result exchange
            // retraces the key exchange in reverse.
            for (size_t i = 0; i < numGPUs; ++i) {
                for (size_t j = i + 1; j < numGPUs; ++j) {
                    std::swap(partitionTable[i][j], partitionTable[j][i]);
                }
            }

            for (size_t gpuId = 0; gpuId < numGPUs; ++gpuId) {
                size_t localCount = recvCounts[gpuId];
                if (localCount == 0) {
                    continue;
                }
                CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(gpuId)));
                auto stream = gossipContext.get_streams(gpuId)[0];
                filterOp(
                    filters[gpuId],
                    dstBuffers[gpuId],
                    resultSrcBuffers[gpuId],
                    localCount,
                    stream
                );
            }
            gossipContext.sync_all_streams();

            all2allResults.execAsync(
                resultSrcBuffers, recvCounts, resultDstBuffers, bufferCapacities, partitionTable
            );
            all2allResults.sync();

            // Download the per-key results back into the caller's output array.
            for (size_t gpuId = 0; gpuId < numGPUs; ++gpuId) {
                size_t localCount = inputLens[gpuId];
                if (localCount == 0) {
                    continue;
                }
                CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(gpuId)));
                CUDA_CALL(cudaMemcpyAsync(
                    h_output + processed + inputOffsets[gpuId],
                    resultDstBuffers[gpuId],
                    localCount * sizeof(bool),
                    cudaMemcpyDeviceToHost
                ));
            }
            gossipContext.sync_hard();
        }

        processed += chunkSize;
    }

    // The tail of this function is omitted in the source; judging from the
    // template parameter name, returnOccupied plausibly selects whether the
    // filter occupancy is reported back (assumption).
    if constexpr (returnOccupied) {
        return occupiedSlots();
    }
    return 0;
}
// Constructor. The signature falls in an omitted span and is reconstructed;
// the class name does not appear in this excerpt, so `MultiGpuFilter` is
// used as a stand-in here and in the second constructor and destructor.
// Capacity is split evenly across the GPUs with 2% headroom per partition.
MultiGpuFilter(size_t capacity, size_t numGPUs, double memFactor)
    : numGPUs(numGPUs),
      capacityPerGPU(static_cast<size_t>(SDIV(capacity, numGPUs) * 1.02)),
      memoryFactor(memFactor),
      gossipContext(numGPUs),
      multisplit(gossipContext),
      all2all(gossipContext, gossip::all2all::default_plan(numGPUs)),
      all2allResults(gossipContext, gossip::all2all::default_plan(numGPUs)),
      srcBuffers(numGPUs, nullptr),
      dstBuffers(numGPUs, nullptr),
      bufferCapacities(numGPUs, 0),
      resultSrcBuffers(numGPUs, nullptr),
      resultDstBuffers(numGPUs, nullptr),
      totalBufferCapacity(0) {
    assert(numGPUs > 0 && "Number of GPUs must be at least 1");

    filters.resize(numGPUs);

    // One filter per GPU; the loop body is reconstructed from the identical
    // pattern in the second constructor below.
    for (size_t i = 0; i < numGPUs; ++i) {
        CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
        Filter<Config>* filter;
        CUDA_CALL(cudaMallocManaged(&filter, sizeof(Filter<Config>)));
        new (filter) Filter<Config>(capacityPerGPU);
        filters[i] = filter;
    }
    gossipContext.sync_hard();

    allocateBuffers();  // placement assumed; the call falls in an omitted span
}
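// Design note: allocating the Filter handles with cudaMallocManaged and
// constructing them with placement new keeps the handle objects in unified
// memory, so the same pointer can be handed to host code and to kernels.
// The destructor below mirrors this with an explicit destructor call
// followed by cudaFree.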
// Constructor variant that loads a gossip transfer plan from a file,
// falling back to the default plan when the file yields no usable plan
// (signature and lambda closers reconstructed as above).
MultiGpuFilter(size_t capacity,
               size_t numGPUs,
               const char* transferPlanPath,
               double memFactor)
    : numGPUs(numGPUs),
      capacityPerGPU(static_cast<size_t>(SDIV(capacity, numGPUs) * 1.02)),
      memoryFactor(memFactor),
      gossipContext(numGPUs),
      multisplit(gossipContext),
      all2all(gossipContext, [&] {
          auto plan = parse_plan(transferPlanPath);
          if (plan.num_gpus() == 0) {
              return gossip::all2all::default_plan(numGPUs);
          }
          return plan;
      }()),
      all2allResults(gossipContext, [&] {
          auto plan = parse_plan(transferPlanPath);
          if (plan.num_gpus() == 0) {
              return gossip::all2all::default_plan(numGPUs);
          }
          return plan;
      }()),
      srcBuffers(numGPUs, nullptr),
      dstBuffers(numGPUs, nullptr),
      bufferCapacities(numGPUs, 0),
      resultSrcBuffers(numGPUs, nullptr),
      resultDstBuffers(numGPUs, nullptr),
      totalBufferCapacity(0) {
    assert(numGPUs > 0 && "Number of GPUs must be at least 1");

    filters.resize(numGPUs);

    for (size_t i = 0; i < numGPUs; ++i) {
        CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
        Filter<Config>* filter;
        CUDA_CALL(cudaMallocManaged(&filter, sizeof(Filter<Config>)));
        new (filter) Filter<Config>(capacityPerGPU);
        filters[i] = filter;
    }
    gossipContext.sync_hard();

    allocateBuffers();  // placement assumed, as in the first constructor
}
// Tear down in reverse order: buffers first, then the filters, which were
// constructed with placement new in managed memory (freeBuffers call and
// cudaFree reconstructed from omitted lines).
~MultiGpuFilter() {
    freeBuffers();
    for (size_t i = 0; i < numGPUs; ++i) {
        CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
        filters[i]->~Filter<Config>();  // explicit call matching placement new
        cudaFree(filters[i]);
    }
}
// Public raw-pointer API. The lambda parameter lists fall in omitted spans
// and are reconstructed from the filterOp call sites in executeOperation.

// Insert keys; per-key success flags are written to h_output if given.
size_t insertMany(const T* h_keys, size_t n, bool* h_output = nullptr) {
    if (h_output != nullptr) {
        return executeOperation<true, true>(
            h_keys, n, h_output,
            [] (Filter<Config>* filter, T* keys, bool* results, size_t count,
                cudaStream_t stream) { filter->insertMany(keys, count, results, stream); }
        );
    }
    return executeOperation<true, false>(
        h_keys, n, nullptr,
        [] (Filter<Config>* filter, T* keys, bool* results, size_t count,
            cudaStream_t stream) { filter->insertMany(keys, count, nullptr, stream); }
    );
}

// Query membership; one flag per key is written to h_output.
void containsMany(const T* h_keys, size_t n, bool* h_output) {
    executeOperation<false, true>(
        h_keys, n, h_output,
        [] (Filter<Config>* filter, T* keys, bool* results, size_t count,
            cudaStream_t stream) { filter->containsMany(keys, count, results, stream); }
    );
}

// Delete keys; per-key success flags are written to h_output if given.
size_t deleteMany(const T* h_keys, size_t n, bool* h_output = nullptr) {
    if (h_output != nullptr) {
        return executeOperation<true, true>(
            h_keys, n, h_output,
            [] (Filter<Config>* filter, T* keys, bool* results, size_t count,
                cudaStream_t stream) { filter->deleteMany(keys, count, results, stream); }
        );
    }
    return executeOperation<true, false>(
        h_keys, n, nullptr,
        [] (Filter<Config>* filter, T* keys, bool* results, size_t count,
            cudaStream_t stream) { filter->deleteMany(keys, count, nullptr, stream); }
    );
}
// Run a functor once per GPU, each on its own host thread with the matching
// device set (function name and functor invocation reconstructed from
// omitted lines).
template <typename Func>
void forEachGpu(Func func) {
    std::vector<std::thread> threads;
    for (size_t i = 0; i < numGPUs; ++i) {
        threads.emplace_back([=, this]() {
            CUDA_CALL(cudaSetDevice(gossipContext.get_device_id(i)));
            func(i);
        });
    }
    for (auto& t : threads) {
        t.join();
    }
}

// Block until all gossip streams on all devices have drained (wrapper name
// assumed; the surrounding signature is omitted in the source).
void sync() {
    gossipContext.sync_all_streams();
}
// Aggregation helpers; wrapper signatures reconstructed around the surviving
// bodies. Relaxed atomics suffice: only the final sum is read, after the
// per-GPU threads have joined.
size_t occupiedSlots() {
    std::atomic<size_t> total(0);
    forEachGpu([&](size_t i) { total.fetch_add(filters[i]->occupiedSlots(), std::memory_order_relaxed); });
    return total.load();
}

size_t capacity() {
    std::atomic<size_t> total(0);
    forEachGpu([&](size_t i) { total.fetch_add(filters[i]->capacity(), std::memory_order_relaxed); });
    return total.load();
}

size_t sizeInBytes() {
    std::atomic<size_t> total(0);
    forEachGpu([&](size_t i) { total.fetch_add(filters[i]->sizeInBytes(), std::memory_order_relaxed); });
    return total.load();
}
// thrust::host_vector convenience overloads; omitted argument lines are
// reconstructed from the surviving raw_pointer_cast calls.

// Insert keys from a host_vector; per-key flags land in h_output.
size_t insertMany(const thrust::host_vector<T>& h_keys,
                  thrust::host_vector<bool>& h_output) {
    h_output.resize(h_keys.size());
    return insertMany(
        thrust::raw_pointer_cast(h_keys.data()),
        h_keys.size(),
        thrust::raw_pointer_cast(h_output.data())
    );
}

// Variant taking a byte vector for the per-key flags.
size_t insertMany(const thrust::host_vector<T>& h_keys,
                  thrust::host_vector<uint8_t>& h_output) {
    h_output.resize(h_keys.size());
    return insertMany(
        thrust::raw_pointer_cast(h_keys.data()),
        h_keys.size(),
        reinterpret_cast<bool*>(thrust::raw_pointer_cast(h_output.data()))
    );
}

// Insert without collecting per-key results.
size_t insertMany(const thrust::host_vector<T>& h_keys) {
    return insertMany(thrust::raw_pointer_cast(h_keys.data()), h_keys.size(), nullptr);
}

void containsMany(const thrust::host_vector<T>& h_keys,
                  thrust::host_vector<bool>& h_output) {
    h_output.resize(h_keys.size());
    containsMany(
        thrust::raw_pointer_cast(h_keys.data()),
        h_keys.size(),
        thrust::raw_pointer_cast(h_output.data())
    );
}

void containsMany(const thrust::host_vector<T>& h_keys,
                  thrust::host_vector<uint8_t>& h_output) {
    h_output.resize(h_keys.size());
    containsMany(
        thrust::raw_pointer_cast(h_keys.data()),
        h_keys.size(),
        reinterpret_cast<bool*>(thrust::raw_pointer_cast(h_output.data()))
    );
}

size_t deleteMany(const thrust::host_vector<T>& h_keys,
                  thrust::host_vector<bool>& h_output) {
    h_output.resize(h_keys.size());
    return deleteMany(
        thrust::raw_pointer_cast(h_keys.data()),
        h_keys.size(),
        thrust::raw_pointer_cast(h_output.data())
    );
}

size_t deleteMany(const thrust::host_vector<T>& h_keys,
                  thrust::host_vector<uint8_t>& h_output) {
    h_output.resize(h_keys.size());
    return deleteMany(
        thrust::raw_pointer_cast(h_keys.data()),
        h_keys.size(),
        reinterpret_cast<bool*>(thrust::raw_pointer_cast(h_output.data()))
    );
}

// Delete without collecting per-key results.
size_t deleteMany(const thrust::host_vector<T>& h_keys) {
    return deleteMany(thrust::raw_pointer_cast(h_keys.data()), h_keys.size(), nullptr);
}
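// Usage sketch (illustrative; `MultiGpuFilter` is the stand-in class name
// used in this excerpt, and key type and constructor arguments are assumed):
//
//   MultiGpuFilter filter(1ULL << 30, /*numGPUs=*/4, /*memFactor=*/0.8);
//   thrust::host_vector<uint64_t> keys = load_keys();  // hypothetical loader
//   thrust::host_vector<bool> flags;
//   filter.insertMany(keys, flags);     // scatter, insert, gather per-key flags
//   filter.containsMany(keys, flags);   // membership queries across all GPUs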