340 struct PreparedRecordRange {
341 uint64_t recordIndex{};
342 uint64_t inputOffset{};
343 uint64_t outputOffset{};
345 uint64_t validKmers{};
347 struct PreparedRecordBatch {
348 std::string sequence;
349 std::vector<PreparedRecordRange> records;
351 struct FastxChunkAssembly {
352 explicit FastxChunkAssembly(
size_t reservedBytes) {
353 sequence.reserve(reservedBytes);
356 void appendRecord(std::string_view recordSequence) {
359 static_cast<uint64_t
>(sequence.size()),
360 static_cast<uint64_t
>(recordSequence.size()),
363 sequence.append(recordSequence);
366 [[nodiscard]]
bool empty()
const {
367 return ranges.empty();
370 [[nodiscard]]
bool reachedTarget(
size_t targetBytes)
const {
371 return sequence.size() >= targetBytes;
374 [[nodiscard]] uint64_t recordCount()
const {
375 return static_cast<uint64_t
>(ranges.size());
383 std::string sequence;
384 std::vector<RecordRange> ranges;
418 template <u
int64_t HashIndex>
454 [&]<uint64_t HashIndex>(std::integral_constant<uint64_t, HashIndex>) {
456 const uint64_t bitPos = sectorizedBitAddress<HashIndex>(baseHash);
457 const uint64_t bit = uint64_t{1} << bitPos;
459 if constexpr (s == 0) mask0 |= bit;
460 else if constexpr (s == 1) mask1 |= bit;
461 else if constexpr (s == 2) mask2 |= bit;
472 "Fused path expects Theta=1 independent query mapping"
476 "Fused path expects horizontal insert mapping across shard words"
487 explicit Filter(uint64_t requestedFilterBits)
492 cuda::ceil_div(requestedFilterBits,
Config::filterBlockBits)
496 filterBits_(numShards_ *
Config::filterBlockBits),
497 d_shards_(numShards_) {
518 [[nodiscard]] uint64_t
519 insertSequence(std::string_view sequence, cuda::stream_ref stream = cudaStream_t{}) {
520 if (recordSymbolCount(sequence.size()) <
Config::k) {
524 const uint64_t totalKmers = recordKmerCount(sequence.size());
525 const auto d_sequence = stagedSequenceView({sequence.data(), sequence.size()}, stream);
526 launchInsertSequence(d_sequence, stream);
543 cuda::stream_ref stream = cudaStream_t{}
545 const uint64_t totalKmers = sequenceKmerCount(d_sequence);
546 if (totalKmers == 0) {
550 launchInsertSequence(d_sequence, stream);
568 [[nodiscard]] FastxInsertReport
570 const PreparedRecordBatch prepared = prepareRecordBatch(batch);
571 FastxInsertReport report;
572 report.recordsIndexed = prepared.records.size();
573 for (
const PreparedRecordRange& record : prepared.records) {
574 report.indexedBases += record.size;
575 report.insertedKmers += record.validKmers;
577 if (!prepared.sequence.empty()) {
597 double fillFraction = 0.7,
598 cuda::stream_ref stream = cudaStream_t{}
600 return insertFastxStream(input,
"<stream>", fillFraction, stream);
609 std::string_view path,
610 double fillFraction = 0.7,
611 cuda::stream_ref stream = cudaStream_t{}
614 return insertFastxStream(*input, path, fillFraction, stream);
630 cuda::stream_ref stream = cudaStream_t{}
632 if (sequenceKmerCount(d_sequence) == 0) {
636 launchContainsSequence(d_sequence, d_output, stream);
650 [[nodiscard]] std::vector<uint8_t>
651 containsSequence(std::string_view sequence, cuda::stream_ref stream = cudaStream_t{})
const {
652 if (recordSymbolCount(sequence.size()) <
Config::k) {
656 std::vector<uint8_t> output(recordKmerCount(sequence.size()));
658 const auto d_sequence = stagedSequenceView({sequence.data(), sequence.size()}, stream);
659 ensureResultCapacity(output.size());
660 launchContainsSequence(
662 device_span<uint8_t>{thrust::raw_pointer_cast(d_resultBuffer_.data()), output.size()},
667 thrust::raw_pointer_cast(d_resultBuffer_.data()),
668 output.size() *
sizeof(uint8_t),
669 cudaMemcpyDeviceToHost,
691 [[nodiscard]] FastxQueryReport
709 template <
typename Consumer>
711 RecordBatchView batch,
713 cuda::stream_ref stream = cudaStream_t{}
715 return queryPreparedRecordBatch(prepareRecordBatch(batch), batch.sequence, consume, stream);
725 double fillFraction = 0.7,
726 cuda::stream_ref stream = cudaStream_t{}
728 return queryFastxStream(input,
"<stream>", fillFraction, stream);
737 std::string_view path,
738 double fillFraction = 0.7,
739 cuda::stream_ref stream = cudaStream_t{}
742 return queryFastxStream(*input, path, fillFraction, stream);
758 template <
typename Consumer>
762 double fillFraction = 0.7,
763 cuda::stream_ref stream = cudaStream_t{}
765 return queryFastxRecordsStream(input,
"<stream>", consume, fillFraction, stream);
773 template <
typename Consumer>
775 std::string_view path,
777 double fillFraction = 0.7,
778 cuda::stream_ref stream = cudaStream_t{}
781 return queryFastxRecordsStream(*input, path, consume, fillFraction, stream);
797 double fillFraction = 0.7,
798 cuda::stream_ref stream = cudaStream_t{}
800 return queryFastxDetailedStream(input,
"<stream>", fillFraction, stream);
810 std::string_view path,
811 double fillFraction = 0.7,
812 cuda::stream_ref stream = cudaStream_t{}
815 return queryFastxDetailedStream(*input, path, fillFraction, stream);
823 void clear(cuda::stream_ref stream = cudaStream_t{}) {
825 thrust::raw_pointer_cast(d_shards_.data()),
827 d_shards_.size() *
sizeof(Shard),
840 const auto* wordsBegin =
841 reinterpret_cast<const uint64_t*
>(thrust::raw_pointer_cast(d_shards_.data()));
843 const uint64_t setBits = thrust::transform_reduce(
846 wordsBegin + totalWords,
847 [] __device__(uint64_t w) -> uint64_t {
return cuda::std::popcount(w); },
849 cuda::std::plus<uint64_t>()
851 return static_cast<float>(setBits) /
static_cast<float>(filterBits_);
865 uint64_t numShards_{};
866 uint64_t filterBits_{};
868 thrust::device_vector<Shard> d_shards_;
869 mutable thrust::device_vector<char> d_sequence_;
870 mutable thrust::device_vector<uint8_t> d_resultBuffer_;
873 [[nodiscard]] uint64_t sizeBytes()
const {
877 [[nodiscard]]
static uint64_t recordSymbolCount(uint64_t bases) {
881 [[nodiscard]]
static uint64_t recordKmerCount(uint64_t bases) {
882 const uint64_t symbols = recordSymbolCount(bases);
886 [[nodiscard]]
static uint64_t validRecordKmerCount(std::string_view sequence) {
887 if (recordSymbolCount(sequence.size()) <
Config::k) {
891 uint64_t invalidSymbols = 0;
892 for (uint64_t i = 0; i <
Config::k; ++i) {
894 Config::Alphabet::invalidSymbol;
897 uint64_t validKmers = invalidSymbols == 0 ? 1 : 0;
898 for (uint64_t start = 1; start < recordKmerCount(sequence.size()); ++start) {
901 Config::Alphabet::invalidSymbol;
902 invalidSymbols += Config::Alphabet::encode(
904 ) == Config::Alphabet::invalidSymbol;
905 validKmers += invalidSymbols == 0;
910 static void appendRecordBoundary(std::string& sequence) {
912 if (remainder != 0) {
920 static void validateRecordBatch(RecordBatchView batch) {
921 uint64_t nextOffset = 0;
922 for (
const RecordRange& record : batch.records) {
923 if (record.sequenceOffset < nextOffset) {
924 throw std::invalid_argument(
925 "record batch ranges must be ordered and non-overlapping"
928 if (record.sequenceOffset > batch.sequence.size() ||
929 record.sequenceBytes > batch.sequence.size() - record.sequenceOffset) {
930 throw std::invalid_argument(
"record batch range exceeds the source sequence");
934 throw std::invalid_argument(
935 "record batch ranges must align to the configured alphabet symbol width"
938 nextOffset = record.sequenceOffset + record.sequenceBytes;
942 static void appendPreparedRecord(
944 std::vector<PreparedRecordRange>& ranges,
945 uint64_t recordIndex,
946 uint64_t inputOffset,
947 std::string_view recordSequence
949 if (!output.empty()) {
950 appendRecordBoundary(output);
952 const uint64_t outputOffset = recordSymbolCount(output.size());
953 output.append(recordSequence);
959 static_cast<uint64_t
>(recordSequence.size()),
960 validRecordKmerCount(recordSequence),
965 [[nodiscard]]
static PreparedRecordBatch prepareRecordBatch(RecordBatchView batch) {
966 validateRecordBatch(batch);
968 PreparedRecordBatch prepared;
969 prepared.sequence.reserve(
972 prepared.records.reserve(batch.records.size());
974 for (uint64_t recordIndex = 0; recordIndex < batch.records.size(); ++recordIndex) {
975 const RecordRange& record = batch.records[recordIndex];
976 appendPreparedRecord(
980 record.sequenceOffset,
981 batch.sequence.substr(record.sequenceOffset, record.sequenceBytes)
987 [[nodiscard]]
static RecordBatchView
988 makeBatchView(
const std::string& sequence,
const std::vector<RecordRange>& ranges) {
989 return RecordBatchView{
991 cuda::std::span<const RecordRange>{ranges.data(), ranges.size()},
995 static void accumulateInsertReport(FastxInsertReport& total,
const FastxInsertReport& chunk) {
996 total.recordsIndexed += chunk.recordsIndexed;
997 total.indexedBases += chunk.indexedBases;
998 total.insertedKmers += chunk.insertedKmers;
1001 static void accumulateQueryReport(FastxQueryReport& total,
const FastxQueryReport& chunk) {
1002 total.recordsQueried += chunk.recordsQueried;
1003 total.queriedBases += chunk.queriedBases;
1004 total.queriedKmers += chunk.queriedKmers;
1005 total.positiveKmers += chunk.positiveKmers;
1008 [[nodiscard]]
static size_t fastxChunkTargetBytes(
double fillFraction) {
1009 size_t freeBytes = 0;
1010 size_t totalBytes = 0;
1012 return static_cast<size_t>(
static_cast<double>(freeBytes) * fillFraction);
1015 template <
typename Consumer>
1016 [[nodiscard]] FastxQueryReport queryPreparedRecordBatch(
1017 const PreparedRecordBatch& batch,
1018 std::string_view inputSequence,
1020 cuda::stream_ref stream
1022 FastxQueryReport report;
1023 report.recordsQueried = batch.records.size();
1024 for (
const PreparedRecordRange& record : batch.records) {
1025 report.queriedBases += record.size;
1026 report.queriedKmers += record.validKmers;
1030 for (
const PreparedRecordRange& record : batch.records) {
1031 const uint64_t kmers = recordKmerCount(record.size);
1032 const auto sequence = inputSequence.substr(
1033 static_cast<size_t>(record.inputOffset),
static_cast<size_t>(record.size)
1043 cuda::std::span<const uint8_t>{},
1049 const auto* hitBegin = hits.data() +
static_cast<ptrdiff_t
>(record.outputOffset);
1050 const auto hitSpan =
1051 cuda::std::span<const uint8_t>{hitBegin,
static_cast<size_t>(kmers)};
1052 const auto positiveKmers =
1053 static_cast<uint64_t
>(std::count(hitSpan.begin(), hitSpan.end(), uint8_t{1}));
1054 report.positiveKmers += positiveKmers;
1070 [[nodiscard]] FastxInsertReport insertFastxStream(
1071 std::istream& input,
1072 std::string_view sourceName,
1073 double fillFraction,
1074 cuda::stream_ref stream
1078 FastxInsertReport report;
1080 const auto chunkTargetBytes = fastxChunkTargetBytes(fillFraction);
1081 FastxChunkAssembly chunk(chunkTargetBytes);
1083 auto flush = [&]() {
1084 if (chunk.empty()) {
1087 accumulateInsertReport(
1093 while (reader.nextRecord(record)) {
1094 chunk.appendRecord(record.sequence);
1095 if (chunk.reachedTarget(chunkTargetBytes)) {
1105 [[nodiscard]] FastxQueryReport queryFastxStream(
1106 std::istream& input,
1107 std::string_view sourceName,
1108 double fillFraction,
1109 cuda::stream_ref stream
1111 return queryFastxRecordsStream(
1112 input, sourceName, [](
const FastxRecordView&) {}, fillFraction, stream
1116 template <
typename Consumer>
1117 [[nodiscard]] FastxQueryReport queryFastxRecordsStream(
1118 std::istream& input,
1119 std::string_view sourceName,
1121 double fillFraction,
1122 cuda::stream_ref stream
1126 FastxQueryReport report;
1128 const auto chunkTargetBytes = fastxChunkTargetBytes(fillFraction);
1129 FastxChunkAssembly chunk(chunkTargetBytes);
1130 std::vector<detail::FastxRecord> records;
1131 uint64_t recordIndexBase = 0;
1133 auto flush = [&]() {
1134 if (chunk.empty()) {
1138 makeBatchView(chunk.sequence, chunk.ranges),
1139 [&](
const RecordQueryView& recordView) {
1140 const detail::FastxRecord& fastxRecord =
1141 records[static_cast<size_t>(recordView.recordIndex)];
1144 recordIndexBase + recordView.recordIndex,
1146 fastxRecord.sequence,
1147 recordView.queriedBases,
1148 recordView.queriedKmers,
1149 recordView.positiveKmers,
1156 accumulateQueryReport(report, chunkReport);
1157 recordIndexBase += chunk.recordCount();
1162 while (reader.nextRecord(record)) {
1163 chunk.appendRecord(record.sequence);
1164 records.push_back(std::move(record));
1165 if (chunk.reachedTarget(chunkTargetBytes)) {
1175 [[nodiscard]] FastxDetailedQueryReport queryFastxDetailedStream(
1176 std::istream& input,
1177 std::string_view sourceName,
1178 double fillFraction,
1179 cuda::stream_ref stream
1181 FastxDetailedQueryReport report;
1182 report.summary = queryFastxRecordsStream(
1185 [&report](
const FastxRecordView& record) {
1186 report.records.push_back(
1187 FastxDetailedQueryRecord{
1189 std::string(record.header),
1190 std::string(record.sequence),
1191 record.queriedBases,
1192 record.queriedKmers,
1193 record.positiveKmers,
1194 std::vector<uint8_t>(record.hits.begin(), record.hits.end()),
1208 void ensureSequenceCapacity(uint64_t bases)
const {
1209 if (bases > d_sequence_.size()) {
1210 d_sequence_.resize(bases);
1218 void ensureResultCapacity(uint64_t kmers)
const {
1219 if (kmers > d_resultBuffer_.size()) {
1220 d_resultBuffer_.resize(kmers);
1229 void stageSequence(cuda::std::span<const char> sequence, cuda::stream_ref stream)
const {
1230 ensureSequenceCapacity(sequence.size());
1232 thrust::raw_pointer_cast(d_sequence_.data()),
1234 sequence.size_bytes(),
1235 cudaMemcpyHostToDevice,
1240 [[nodiscard]]
static uint64_t sequenceKmerCount(device_span<const char> d_sequence) {
1244 [[nodiscard]] device_span<const char>
1245 stagedSequenceView(cuda::std::span<const char> sequence, cuda::stream_ref stream)
const {
1246 stageSequence(sequence, stream);
1247 return device_span<const char>{
1248 thrust::raw_pointer_cast(d_sequence_.data()), sequence.size()
1257 void launchInsertSequence(device_span<const char> d_sequence, cuda::stream_ref stream) {
1259 const uint64_t numKmers = input.kmerCount();
1260 if (numKmers == 0) {
1267 input, device_span<Shard>{thrust::raw_pointer_cast(d_shards_.data()), numShards_}
1278 void launchContainsSequence(
1279 device_span<const char> d_sequence,
1280 device_span<uint8_t> d_output,
1281 cuda::stream_ref stream
1284 const uint64_t numKmers = input.kmerCount();
1285 const uint64_t gridSize =
1291 device_span<const Shard>{thrust::raw_pointer_cast(d_shards_.data()), numShards_},
1503__global__ __launch_bounds__(Config::cudaBlockSize, 6) void containsSequenceKmersKernel(
1510 constexpr uint64_t sequenceTileBases = Config::cudaBlockSize * kStride + Config::k - 1;
1512 __shared__ uint8_t sequenceTile[sequenceTileBases];
1514 const uint64_t numKmers = input.kmerCount();
1515 const uint64_t blockStartKmer =
1516 static_cast<uint64_t
>(blockIdx.x) * Config::cudaBlockSize * kStride;
1517 if (blockStartKmer >= numKmers) {
1521 const uint64_t blockKmers = min(Config::cudaBlockSize * kStride, numKmers - blockStartKmer);
1523 const bool blockAllValid = prepareSequenceHashTiles<Config>(
1524 input.sequence.data(), blockStartKmer, blockKmers, sequenceTile
1527 const uint64_t threadOffset =
static_cast<uint64_t
>(threadIdx.x) * kStride;
1528 if (threadOffset >= blockKmers) {
1533 uint32_t kmerValidMask = 0;
1535 for (uint32_t s = 0; s < kStride; ++s) {
1536 if ((threadOffset + s) < blockKmers) {
1537 kmerValidMask |= (1u << s);
1541 if (!blockAllValid) {
1543 for (uint32_t s = 0; s < kStride; ++s) {
1544 if (!(kmerValidMask & (1u << s))) {
1547 const uint64_t localIdx = threadOffset + s;
1548 if (!kmerIsValid<Config>(sequenceTile, localIdx)) {
1549 kmerValidMask &= ~(1u << s);
1556 uint64_t packedKmer = packKmerFromTile<Config, Config::k>(sequenceTile, threadOffset);
1558 for (uint32_t s = 0; s < kStride; ++s) {
1559 const uint64_t localIdx = threadOffset + s;
1560 if (localIdx >= blockKmers) {
1564 const uint64_t kmerIndex = blockStartKmer + localIdx;
1567 packedKmer = advancePackedKmer<Config, Config::k>(
1568 packedKmer, sequenceTile[localIdx + Config::k - 1]
1572 if (!(kmerValidMask & (1u << s))) {
1573 output[kmerIndex] = 0;
1577 const uint64_t minimizerHash = packedKmerMinimizerHash<Config>(packedKmer);
1580 const auto shardIdx =
static_cast<uint32_t
>(minimizerHash & (shards.size() - 1));
1581 const uint32_t peers = __match_any_sync(0xFFFFFFFFu, shardIdx);
1582 const int leader = __ffs(
static_cast<int>(peers)) - 1;
1585 if (
static_cast<int>(threadIdx.x & 31u) == leader) {
1586 loadShardWords4<Config>(shards.data(), shardIdx, w);
1588 w[0] = __shfl_sync(peers, w[0], leader);
1589 w[1] = __shfl_sync(peers, w[1], leader);
1590 w[2] = __shfl_sync(peers, w[2], leader);
1591 w[3] = __shfl_sync(peers, w[3], leader);
1593 const bool present = sectorizedContainsPackedKmer<Config>(packedKmer, w);
1594 output[kmerIndex] = present;
1614 constexpr uint64_t sequenceTileBases = Config::cudaBlockSize + Config::k - 1;
1615 constexpr uint32_t warpSize = 32;
1616 constexpr uint32_t warpsPerBlock = Config::cudaBlockSize / warpSize;
1618 using WarpReduceWord = cub::WarpReduce<uint64_t>;
1620 __shared__ uint8_t sequenceTile[sequenceTileBases];
1621 __shared__
typename WarpReduceWord::TempStorage reduceStorage[warpsPerBlock][4];
1623 const uint64_t numKmers = input.
kmerCount();
1624 const uint64_t blockStartKmer =
static_cast<uint64_t
>(blockIdx.x) * Config::cudaBlockSize;
1625 if (blockStartKmer >= numKmers) {
1629 const uint64_t blockKmers = min(Config::cudaBlockSize, numKmers - blockStartKmer);
1630 const auto localKmerIndex =
static_cast<uint64_t
>(threadIdx.x);
1631 const bool inRange = localKmerIndex < blockKmers;
1633 const bool blockAllValid = prepareSequenceHashTiles<Config>(
1634 input.
sequence.data(), blockStartKmer, blockKmers, sequenceTile
1639 bool active = inRange;
1641 if (active && !blockAllValid) {
1642 active = kmerIsValid<Config>(sequenceTile, localKmerIndex);
1647 uint64_t minimizerHash = 0;
1648 uint64_t wordMask0 = 0;
1649 uint64_t wordMask1 = 0;
1650 uint64_t wordMask2 = 0;
1651 uint64_t wordMask3 = 0;
1654 const uint64_t packedKmer =
1655 packKmerFromTile<Config, Config::k>(sequenceTile, localKmerIndex);
1656 minimizerHash = packedKmerMinimizerHash<Config>(packedKmer);
1658 uint64_t h_s = packedKmerSmerHash<Config>(packedKmer, 0);
1660 h_s, wordMask0, wordMask1, wordMask2, wordMask3
1663 for (uint64_t smerOffset = 1; smerOffset < Config::findereSpan; ++smerOffset) {
1664 h_s = packedKmerSmerHash<Config>(packedKmer, smerOffset);
1666 h_s, wordMask0, wordMask1, wordMask2, wordMask3
1673 const auto shardIdx =
1674 static_cast<uint32_t
>(active ? (minimizerHash & (shards.size() - 1)) : ~threadIdx.x);
1676 const uint32_t lane = threadIdx.x & (warpSize - 1);
1677 const uint32_t warpIdx = threadIdx.x / warpSize;
1678 const uint32_t prevShardIdx = __shfl_up_sync(0xffffffff, shardIdx, 1);
1679 const bool runHead = (lane == 0) || (shardIdx != prevShardIdx);
1682 wordMask0 = WarpReduceWord(reduceStorage[warpIdx][0])
1683 .HeadSegmentedReduce(wordMask0, runHead, bitwiseOr);
1684 wordMask1 = WarpReduceWord(reduceStorage[warpIdx][1])
1685 .HeadSegmentedReduce(wordMask1, runHead, bitwiseOr);
1686 wordMask2 = WarpReduceWord(reduceStorage[warpIdx][2])
1687 .HeadSegmentedReduce(wordMask2, runHead, bitwiseOr);
1688 wordMask3 = WarpReduceWord(reduceStorage[warpIdx][3])
1689 .HeadSegmentedReduce(wordMask3, runHead, bitwiseOr);
1691 if (runHead && active) {
1692 auto& shard = shards[shardIdx];
1693 if (wordMask0 != 0) {
1696 if (wordMask1 != 0) {
1699 if (wordMask2 != 0) {
1702 if (wordMask3 != 0) {