diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index 21fdae4a57760..96f5b4d190e9f 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -11,6 +11,7 @@ #include "AnalysisCCDBHelpers.h" #include "CCDBFetcherHelper.h" +#include "Framework/ArrowTypes.h" #include "Framework/DataProcessingStats.h" #include "Framework/DeviceSpec.h" #include "Framework/TimingInfo.h" @@ -29,6 +30,8 @@ #include #include #include +#include +#include #include #include #include @@ -109,7 +112,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) auto it = ccdbUrls.find(m.name); fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString()); auto columnName = m.name.substr(strlen("ccdb:")); - fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, fieldMetadata)); + fields.emplace_back(std::make_shared(columnName, soa::asArrowDataType(), false, fieldMetadata)); } schemas.emplace_back(std::make_shared(fields, schemaMetadata)); } @@ -122,6 +125,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) { O2_SIGNPOST_ID_GENERATE(sid, ccdb); O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice); + auto pool = arrow::MemoryPool::CreateDefault(); for (auto& schema : schemas) { std::vector ops; auto inputBinding = *schema->metadata()->Get("sourceTable"); @@ -143,9 +147,23 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) } int outputRouteIndex = bindings.at(outRouteDesc); auto& spec = helper->routes[outputRouteIndex].matcher; - std::vector> builders; - for (auto const& _ : schema->fields()) { - builders.emplace_back(std::make_shared()); + std::vector> builders; + builders.resize(schema->fields().size()); + + for (auto i = 0U; i < schema->fields().size(); ++i) { + auto valueBuilder = std::make_shared(); + builders[i] = std::make_shared(pool.get(), valueBuilder, 2); + } + + auto reserveSize = timestampColumn->length(); + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "* reserving for size: %lld (has: %lld)", + reserveSize, builders[0]->capacity()); + arrow::Status status; + for (auto i = 0U; i < builders.size(); ++i) { + if (builders[i]->capacity() < reserveSize) { + status &= builders[i]->Reserve(reserveSize - builders[i]->capacity()); + } } for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) { @@ -175,11 +193,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size()); } arrow::Status result; + int64_t values[2]; for (size_t bi = 0; bi < responses.size(); bi++) { auto& builder = builders[bi]; + auto* value_builder = static_cast(builder->value_builder()); auto& response = responses[bi]; - char const* address = reinterpret_cast(response.id.value); - result &= builder->Append(std::string_view(address, response.size)); + values[0] = response.id.value; + values[1] = response.size; + result &= builder->Append(); + result &= value_builder->AppendValues(&values[0], 2, nullptr); + LOGP(info, "P: {}; S: {}", values[0], values[1]); } if (!result.ok()) { LOGP(fatal, "Error adding results from CCDB"); @@ -192,6 +215,17 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) arrays.push_back(*builder->Finish()); } auto outTable = arrow::Table::Make(schema, arrays); + + auto mock = std::make_shared(); + int64_t expectedSize = 0; + auto mockWriter = arrow::ipc::MakeStreamWriter(mock.get(), outTable->schema()); + arrow::Status outStatus = mockWriter.ValueOrDie()->WriteTable(*(outTable.get())); + + expectedSize = mock->Tell().ValueOrDie(); + assert(outTable->num_rows() == reserveSize); + O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", + "* sending a table of size: %lld", + expectedSize); auto concrete = DataSpecUtils::asConcreteDataMatcher(spec); allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable); } diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 784a0796f86fe..8e680132cc17d 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -2455,15 +2455,15 @@ consteval static std::string_view namespace_prefix() [[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() } #define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \ - struct _Name_ : o2::soa::Column, _Name_> { \ + struct _Name_ : o2::soa::Column { \ static constexpr const char* mLabel = _Label_; \ static constexpr const char* query = _CCDBQuery_; \ static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \ - using base = o2::soa::Column, _Name_>; \ - using type = std::span; \ + using base = o2::soa::Column; \ + using type = int64_t[2]; \ using column_t = _Name_; \ _Name_(arrow::ChunkedArray const* column) \ - : o2::soa::Column, _Name_>(o2::soa::ColumnIterator>(column)) \ + : o2::soa::Column(o2::soa::ColumnIterator(column)) \ { \ } \ \ @@ -2473,13 +2473,15 @@ consteval static std::string_view namespace_prefix() \ decltype(auto) _Getter_() const \ { \ + auto a = *mColumnIterator; \ + LOGP(info, "P: {}; S: {}", a[0], a[1]); \ + auto span = std::span{reinterpret_cast(a[0]), static_cast(a[1])}; \ if constexpr (std::same_as<_ConcreteType_, std::span>) { \ - return *mColumnIterator; \ + return span; \ } else { \ static std::byte* payload = nullptr; \ static _ConcreteType_* deserialised = nullptr; \ static TClass* c = TClass::GetClass(#_ConcreteType_); \ - auto span = *mColumnIterator; \ if (payload != (std::byte*)span.data()) { \ payload = (std::byte*)span.data(); \ delete deserialised; \ diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index bf2493dd0de19..3170236e18f09 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -667,8 +667,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) } // execute process() if constexpr (requires { &T::process; }) { - constexpr auto phash = o2::framework::TypeIdHelpers::uniqueId(); - auto matchers = std::ranges::find_if(inputInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers; + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); } // execute optional process() @@ -676,8 +676,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { if constexpr (is_process_configurable) { if (x.value == true) { - constexpr auto phash = o2::framework::TypeIdHelpers::uniqueId(); - auto matchers = std::ranges::find_if(inputInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers; + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); return true; } diff --git a/Framework/Core/include/Framework/ArrowTypes.h b/Framework/Core/include/Framework/ArrowTypes.h index 2673472a81152..57dfa02188461 100644 --- a/Framework/Core/include/Framework/ArrowTypes.h +++ b/Framework/Core/include/Framework/ArrowTypes.h @@ -93,6 +93,16 @@ struct arrow_array_for { using type = arrow::FixedSizeListArray; using value_type = int8_t; }; +template +struct arrow_array_for { + using type = arrow::FixedSizeListArray; + using value_type = int64_t; +}; +template +struct arrow_array_for { + using type = arrow::FixedSizeListArray; + using value_type = uint64_t; +}; #define ARROW_VECTOR_FOR(_type_) \ template <> \