Skip to content

Commit 45a31e3

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent ca9c46c commit 45a31e3

4 files changed

Lines changed: 248 additions & 21 deletions

File tree

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer3.hpp

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,47 +30,41 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
3030
}
3131
}
3232

33-
ElementType* allocate([[maybe_unused]] const uint8_t writerId) noexcept override
33+
ElementType* allocate(const uint8_t writerIndex) noexcept override
3434
{
35-
static thread_local std::mt19937 gen(std::random_device {}());
36-
static thread_local std::uniform_int_distribution<> dist(0, 31);
37-
static thread_local uint64_t threadQueueIndex = dist(gen);
35+
WriterData& writerData = m_writersData[writerIndex].get();
3836
while (true) {
39-
threadQueueIndex = (threadQueueIndex + 1) % m_queues.size();
40-
// const uint64_t queueIndex = m_nextQueue++ % m_queues.size();
41-
const std::optional<ElementType*> res = m_queues[threadQueueIndex].get().tryPop();
42-
43-
if (res.has_value()) {
44-
return *res;
37+
writerData.queueIndex = (writerData.queueIndex + 1) % m_queues.size();
38+
ElementType* res = m_queues[writerData.queueIndex]->tryPop();
39+
if (res) {
40+
return res;
4541
}
4642
}
4743
}
4844

49-
void deallocate(ElementType* element, [[maybe_unused]] const uint8_t writerId) noexcept override
45+
void deallocate(ElementType* element, const uint8_t writerIndex) noexcept override
5046
{
51-
static thread_local std::mt19937 gen(std::random_device {}());
52-
static thread_local std::uniform_int_distribution<> dist(0, 31);
53-
static thread_local uint64_t threadQueueIndex = dist(gen);
47+
WriterData& writerData = m_writersData[writerIndex].get();
5448
while (true) {
55-
threadQueueIndex = (threadQueueIndex + 1) % m_queues.size();
49+
writerData.queueIndex = (writerData.queueIndex + 1) % m_queues.size();
5650
// const uint64_t queueIndex = m_nextQueue++ % m_queues.size();
57-
if (m_queues[threadQueueIndex].get().tryPush(element)) {
51+
if (m_queues[writerData.queueIndex]->tryPush(element)) {
5852
return;
5953
}
6054
}
6155
}
6256

63-
private:
57+
protected:
6458
class Queue {
6559
public:
66-
std::optional<ElementType*> tryPop() noexcept
60+
ElementType* tryPop() noexcept
6761
{
6862
if (!tryLock()) {
69-
return std::nullopt;
63+
return nullptr;
7064
}
7165
if (pointers.empty()) {
7266
unlock();
73-
return std::nullopt;
67+
return nullptr;
7468
}
7569
ElementType* res = pointers.back();
7670
pointers.pop_back();
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
3+
#include "allocationBuffer3.hpp"
4+
#include "fastRandomGenerator.hpp"
5+
6+
#include <deque>
7+
8+
namespace ipxp::output {
9+
10+
template<typename ElementType>
11+
class AllocationBuffer3RD : public AllocationBuffer3<ElementType> {
12+
public:
13+
explicit AllocationBuffer3RD(const std::size_t capacity, const uint8_t writersCount) noexcept
14+
: AllocationBuffer3<ElementType>(capacity, writersCount)
15+
, m_randomGenerator(1, writersCount)
16+
{
17+
for (const auto _ : std::views::iota(0U, writersCount)) {
18+
m_randomHandlers.emplace_back(this->m_randomGenerator.getHandler());
19+
}
20+
}
21+
22+
ElementType* allocate(const uint8_t writerIndex) noexcept override
23+
{
24+
typename AllocationBuffer3<ElementType>::WriterData& writerData
25+
= this->m_writersData[writerIndex].get();
26+
FastRandomGenerator<>::FastRandomGeneratorHandler& randomHandler
27+
= this->m_randomHandlers[writerIndex].get();
28+
while (true) {
29+
writerData.queueIndex
30+
= (writerData.queueIndex + randomHandler.getValue()) % this->m_queues.size();
31+
ElementType* res = this->m_queues[writerData.queueIndex]->tryPop();
32+
if (res) {
33+
return res;
34+
}
35+
}
36+
}
37+
38+
void deallocate(ElementType* element, const uint8_t writerIndex) noexcept override
39+
{
40+
typename AllocationBuffer3<ElementType>::WriterData& writerData
41+
= this->m_writersData[writerIndex].get();
42+
FastRandomGenerator<>::FastRandomGeneratorHandler& randomHandler
43+
= this->m_randomHandlers[writerIndex].get();
44+
while (true) {
45+
writerData.queueIndex
46+
= (writerData.queueIndex + randomHandler.getValue()) % this->m_queues.size();
47+
if (this->m_queues[writerData.queueIndex]->tryPush(element)) {
48+
return;
49+
}
50+
}
51+
}
52+
53+
private:
54+
FastRandomGenerator<> m_randomGenerator;
55+
std::deque<CacheAlligned<FastRandomGenerator<>::FastRandomGeneratorHandler>> m_randomHandlers;
56+
};
57+
58+
} // namespace ipxp::output
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
#pragma once
2+
3+
#include "allocationBufferBase.hpp"
4+
#include "backoffScheme.hpp"
5+
#include "cacheAlligned.hpp"
6+
#include "fastRandomGenerator.hpp"
7+
8+
#include <algorithm>
9+
#include <atomic>
10+
#include <barrier>
11+
#include <cstddef>
12+
#include <random>
13+
#include <ranges>
14+
#include <vector>
15+
16+
#include <boost/container/static_vector.hpp>
17+
18+
namespace ipxp::output {
19+
20+
template<typename ElementType>
21+
class AllocationBufferB : public AllocationBufferBase<ElementType> {
22+
constexpr static std::size_t BUCKET_SIZE = 64;
23+
constexpr static std::size_t INDEXES_IN_CACHE_LINE = 64 / sizeof(uint16_t);
24+
constexpr static std::size_t WINDOW_SIZE = 4;
25+
26+
public:
27+
explicit AllocationBufferB(const std::size_t capacity, const uint8_t writersCount) noexcept
28+
: m_objectPool(capacity + writersCount * BUCKET_SIZE)
29+
, m_buckets(m_objectPool.size() / BUCKET_SIZE)
30+
{
31+
if (capacity % BUCKET_SIZE != 0) {
32+
throw std::invalid_argument("Capacity must be a multiple of bucket size");
33+
}
34+
if (m_buckets.size() % writersCount != 0) {
35+
throw std::invalid_argument("Number of buckets must be a multiple of writers count");
36+
}
37+
38+
// m_fullBuckets.reserve(m_buckets.size());
39+
// m_emptyBuckets.reserve(m_buckets.size());
40+
for (ElementType& element : m_objectPool) {
41+
const std::size_t elementIndex = &element - m_objectPool.data();
42+
const std::size_t bucketIndex = elementIndex / BUCKET_SIZE;
43+
m_buckets[bucketIndex].storage[elementIndex % BUCKET_SIZE] = &element;
44+
}
45+
for (std::size_t i = 0; i < m_buckets.size(); i++) {
46+
m_fullBuckets[i].store(i);
47+
m_emptyBuckets[i].store(Bucket::PLACEHOLDER);
48+
}
49+
m_writersData.resize(writersCount);
50+
for (uint8_t writerIndex = 0; writerIndex < writersCount; writerIndex++) {
51+
m_writersData[writerIndex]->fullPushRank = writerIndex * INDEXES_IN_CACHE_LINE;
52+
m_writersData[writerIndex]->emptyPushRank = writerIndex * INDEXES_IN_CACHE_LINE;
53+
m_writersData[writerIndex]->currentBucketIndex = writerIndex;
54+
m_writersData[writerIndex]->currentBucketSize = BUCKET_SIZE;
55+
m_fullBuckets[writerIndex].store(Bucket::PLACEHOLDER);
56+
}
57+
}
58+
59+
void unregisterWriter(const uint8_t writerIndex) noexcept override {}
60+
61+
ElementType* allocate(const uint8_t writerIndex) noexcept override
62+
{
63+
WriterData& writerData = m_writersData[writerIndex].get();
64+
if (writerData.currentBucketSize == 0) {
65+
if (writerData.reservedEmptyBucketIndexes.size() == WINDOW_SIZE) {
66+
pushBucket(m_emptyBuckets, writerData.currentBucketIndex, writerData.emptyPushRank);
67+
} else {
68+
writerData.reservedEmptyBucketIndexes.push_back(writerData.currentBucketIndex);
69+
}
70+
if (writerData.reservedFullBucketIndexes.size() == 0) {
71+
while (writerData.reservedFullBucketIndexes.size() < WINDOW_SIZE) {
72+
uint16_t fullBucketIndex = popBucket(m_fullBuckets, writerData.fullPushRank);
73+
writerData.reservedFullBucketIndexes.push_back(fullBucketIndex);
74+
}
75+
}
76+
writerData.currentBucketIndex = writerData.reservedFullBucketIndexes.back();
77+
writerData.reservedFullBucketIndexes.pop_back();
78+
writerData.currentBucketSize = BUCKET_SIZE;
79+
}
80+
Bucket& bucket = m_buckets[writerData.currentBucketIndex];
81+
ElementType* res = bucket.storage[writerData.currentBucketSize - 1];
82+
writerData.currentBucketSize--;
83+
return res;
84+
}
85+
86+
void deallocate(ElementType* element, const uint8_t writerIndex) noexcept override
87+
{
88+
WriterData& writerData = m_writersData[writerIndex].get();
89+
if (writerData.currentBucketSize == BUCKET_SIZE) {
90+
if (writerData.reservedFullBucketIndexes.size() == WINDOW_SIZE) {
91+
pushBucket(m_fullBuckets, writerData.currentBucketIndex, writerData.fullPushRank);
92+
} else {
93+
writerData.reservedFullBucketIndexes.push_back(writerData.currentBucketIndex);
94+
}
95+
if (writerData.reservedEmptyBucketIndexes.size() == 0) {
96+
while (writerData.reservedEmptyBucketIndexes.size() < WINDOW_SIZE) {
97+
uint16_t emptyBucketIndex = popBucket(m_emptyBuckets, writerData.emptyPushRank);
98+
writerData.reservedEmptyBucketIndexes.push_back(emptyBucketIndex);
99+
}
100+
}
101+
writerData.currentBucketIndex = writerData.reservedEmptyBucketIndexes.back();
102+
writerData.reservedEmptyBucketIndexes.pop_back();
103+
writerData.currentBucketSize = 0;
104+
}
105+
Bucket& bucket = m_buckets[writerData.currentBucketIndex];
106+
bucket.storage[writerData.currentBucketSize] = element;
107+
writerData.currentBucketSize++;
108+
}
109+
110+
private:
111+
struct WriterData {
112+
std::size_t fullPushRank;
113+
std::size_t emptyPushRank;
114+
uint16_t currentBucketIndex;
115+
std::size_t currentBucketSize {BUCKET_SIZE};
116+
std::vector<uint16_t> reservedFullBucketIndexes;
117+
std::vector<uint16_t> reservedEmptyBucketIndexes;
118+
};
119+
120+
struct Bucket {
121+
constexpr static uint16_t PLACEHOLDER = std::numeric_limits<uint16_t>::max();
122+
std::array<ElementType*, BUCKET_SIZE> storage;
123+
};
124+
125+
void pushBucket(auto& buckets, const std::size_t bucketIndex, std::size_t& pushRank) noexcept
126+
{
127+
while (true) {
128+
uint16_t expected = buckets[pushRank].load(std::memory_order_acquire);
129+
if (expected != Bucket::PLACEHOLDER) {
130+
pushRank = ((pushRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE)
131+
% m_buckets.size();
132+
continue;
133+
}
134+
if (buckets[pushRank].compare_exchange_weak(
135+
expected,
136+
bucketIndex,
137+
std::memory_order_release,
138+
std::memory_order_acquire)) {
139+
pushRank = (pushRank + 1) % m_buckets.size();
140+
return;
141+
}
142+
}
143+
}
144+
145+
uint16_t popBucket(auto& buckets, std::size_t& popRank) noexcept
146+
{
147+
while (true) {
148+
uint16_t expected = buckets[popRank].load(std::memory_order_acquire);
149+
if (expected == Bucket::PLACEHOLDER) {
150+
popRank = ((popRank / INDEXES_IN_CACHE_LINE + 1) * INDEXES_IN_CACHE_LINE)
151+
% m_buckets.size();
152+
continue;
153+
}
154+
if (buckets[popRank].compare_exchange_weak(
155+
expected,
156+
Bucket::PLACEHOLDER,
157+
std::memory_order_release,
158+
std::memory_order_acquire)) {
159+
popRank = (popRank + 1) % m_buckets.size();
160+
return expected;
161+
}
162+
}
163+
}
164+
165+
std::vector<ElementType> m_objectPool;
166+
std::vector<Bucket> m_buckets;
167+
std::array<std::atomic<uint16_t>, 65536> m_fullBuckets;
168+
std::array<std::atomic<uint16_t>, 65536> m_emptyBuckets;
169+
std::vector<CacheAlligned<WriterData>> m_writersData;
170+
};
171+
172+
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/outputStorageRegistrar.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#pragma once
22

3+
#include "allocationBuffer3.hpp"
4+
#include "allocationBuffer3RD.hpp"
5+
#include "allocationBufferB.hpp"
36
#include "allocationBufferS.hpp"
47
#include "outputStorage.hpp"
58
#include "outputStorageReader.hpp"
@@ -22,7 +25,7 @@ class OutputStorageRegistrar {
2225
m_storages = std::make_shared<std::shared_ptr<OutputStorage<ElementType>>[]>(
2326
OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT);
2427
m_allocationBuffer
25-
= std::make_shared<AllocationBufferS<ReferenceCounter<OutputContainer<ElementType>>>>(
28+
= std::make_shared<AllocationBufferB<ReferenceCounter<OutputContainer<ElementType>>>>(
2629
OutputStorage<ElementType>::STORAGE_CAPACITY
2730
* OutputStorage<ElementType>::MAX_READER_GROUPS_COUNT,
2831
writersCount);

0 commit comments

Comments
 (0)