Skip to content
19 changes: 10 additions & 9 deletions be/src/runtime_filter/runtime_filter_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "runtime_filter/runtime_filter_consumer.h"

#include "exprs/minmax_predicate.h"
#include "runtime_filter/runtime_filter_selectivity.h"
#include "util/runtime_profile.h"
#include "vec/exprs/vbitmap_predicate.h"
#include "vec/exprs/vbloom_predicate.h"
Expand Down Expand Up @@ -84,11 +85,11 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
auto real_filter_type = _wrapper->get_real_type();
bool null_aware = _wrapper->contain_null();

// Set sampling frequency based on disable_always_true_logic status
// Determine sampling frequency for the always_true optimization.
// This will be propagated to VExprContext in VRuntimeFilterWrapper::open().
int sampling_frequency = _wrapper->disable_always_true_logic()
? RuntimeFilterSelectivity::DISABLE_SAMPLING
: config::runtime_filter_sampling_frequency;
probe_ctx->get_runtime_filter_selectivity().set_sampling_frequency(sampling_frequency);

switch (real_filter_type) {
case RuntimeFilterType::IN_FILTER: {
Expand All @@ -106,7 +107,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
in_pred->add_child(probe_ctx->root());
auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
node, in_pred, get_in_list_ignore_thredhold(_wrapper->hybrid_set()->size()),
null_aware, _wrapper->filter_id());
null_aware, _wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
break;
}
Expand All @@ -124,7 +125,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
DCHECK(null_aware == false) << "only min predicate do not support null aware";
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));
break;
}
case RuntimeFilterType::MAX_FILTER: {
Expand All @@ -141,7 +142,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
DCHECK(null_aware == false) << "only max predicate do not support null aware";
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
Expand All @@ -157,7 +158,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
max_pred->add_child(max_literal);
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
max_pred_node, max_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));

vectorized::VExprContextSPtr new_probe_ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(probe_expr, new_probe_ctx));
Expand All @@ -174,7 +175,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
min_pred->add_child(min_literal);
container.push_back(vectorized::VRuntimeFilterWrapper::create_shared(
min_pred_node, min_pred, get_comparison_ignore_thredhold(), null_aware,
_wrapper->filter_id()));
_wrapper->filter_id(), sampling_frequency));
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
Expand All @@ -191,7 +192,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
bloom_pred->add_child(probe_ctx->root());
auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
node, bloom_pred, get_bloom_filter_ignore_thredhold(), null_aware,
_wrapper->filter_id());
_wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
break;
}
Expand All @@ -209,7 +210,7 @@ Status RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
bitmap_pred->add_child(probe_ctx->root());
DCHECK(null_aware == false) << "bitmap predicate do not support null aware";
auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
node, bitmap_pred, 0, null_aware, _wrapper->filter_id());
node, bitmap_pred, 0, null_aware, _wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
break;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/scan/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Status Scanner::try_append_late_arrival_runtime_filter() {
// avoid conjunct destroy in used by storage layer
_conjuncts.clear();
RETURN_IF_ERROR(_local_state->clone_conjunct_ctxs(_conjuncts));
_applied_rf_num = arrived_rf_num;
return Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/vec/exprs/vruntimefilter_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ class VExprContext;

VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl,
double ignore_thredhold, bool null_aware,
int filter_id)
int filter_id, int sampling_frequency)
: VExpr(node),
_impl(std::move(impl)),
_ignore_thredhold(ignore_thredhold),
_null_aware(null_aware),
_filter_id(filter_id) {}
_filter_id(filter_id),
_sampling_frequency(sampling_frequency) {}

Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
Expand All @@ -76,6 +77,7 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
DCHECK(_prepare_finished);
RETURN_IF_ERROR(_impl->open(state, context, scope));
context->get_runtime_filter_selectivity().set_sampling_frequency(_sampling_frequency);
_open_finished = true;
return Status::OK();
}
Expand Down
7 changes: 5 additions & 2 deletions be/src/vec/exprs/vruntimefilter_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "common/config.h"
#include "common/status.h"
#include "runtime_filter/runtime_filter_selectivity.h"
#include "util/runtime_profile.h"
#include "vec/exprs/function_context.h"
#include "vec/exprs/vexpr.h"
Expand All @@ -51,7 +52,8 @@ class VRuntimeFilterWrapper final : public VExpr {

public:
VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl, double ignore_thredhold,
bool null_aware, int filter_id);
bool null_aware, int filter_id,
int sampling_frequency = RuntimeFilterSelectivity::DISABLE_SAMPLING);
~VRuntimeFilterWrapper() override = default;
Status execute_column(VExprContext* context, const Block* block, Selector* selector,
size_t count, ColumnPtr& result_column) const override;
Expand Down Expand Up @@ -115,9 +117,10 @@ class VRuntimeFilterWrapper final : public VExpr {
double _ignore_thredhold;
bool _null_aware;
int _filter_id;
int _sampling_frequency;
};

using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;

#include "common/compile_check_end.h"
} // namespace doris::vectorized
} // namespace doris::vectorized
46 changes: 46 additions & 0 deletions be/test/runtime_filter/runtime_filter_selectivity_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,50 @@ TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
}
}

// Regression test for the selectivity accumulation bug.
// A RuntimeFilterSelectivity that is never given a sampling frequency keeps the
// default DISABLE_SAMPLING (-1). The check (_judge_counter++) >= -1 then holds on
// every call, so update_judge_counter() resets the judge state each time.
TEST_F(RuntimeFilterSelectivityTest, default_sampling_frequency_always_resets) {
    // Intentionally no set_sampling_frequency() call here: the default (-1) stays in effect.
    RuntimeFilterSelectivity rf_selectivity;

    // Feed a low filter rate, which would normally qualify the filter as always_true.
    rf_selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
    // Sampling is disabled, so the always_true shortcut must not be reported.
    EXPECT_FALSE(rf_selectivity.maybe_always_true_can_ignore());

    // With the -1 default this call resets immediately and discards what was accumulated.
    rf_selectivity.update_judge_counter();

    // Switching to a valid frequency afterwards cannot bring the discarded data back:
    // always_true was already reset to false by the premature reset.
    rf_selectivity.set_sampling_frequency(100);
    EXPECT_FALSE(rf_selectivity.maybe_always_true_can_ignore());
}

// With a real sampling frequency configured, the judge state must survive until the
// counter actually reaches that frequency, and only then be reset.
TEST_F(RuntimeFilterSelectivityTest, proper_sampling_frequency_preserves_accumulation) {
    const int sampling_frequency = 32;

    RuntimeFilterSelectivity rf_selectivity;
    rf_selectivity.set_sampling_frequency(sampling_frequency);

    // A low filter rate marks the filter as always_true.
    rf_selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
    EXPECT_TRUE(rf_selectivity.maybe_always_true_can_ignore());

    // The counter is compared before it is incremented (post-increment), so the first
    // 32 calls compare the values 0..31 against 32 and none of them resets.
    int calls_made = 0;
    while (calls_made < sampling_frequency) {
        rf_selectivity.update_judge_counter();
        ++calls_made;
    }
    // Last comparison was 31 >= 32 (false): the always_true verdict is still in place.
    EXPECT_TRUE(rf_selectivity.maybe_always_true_can_ignore());

    // Call number 33 compares 32 >= 32 (true) and triggers the reset.
    rf_selectivity.update_judge_counter();
    // The verdict is gone until selectivity is judged again.
    EXPECT_FALSE(rf_selectivity.maybe_always_true_can_ignore());
}

} // namespace doris
184 changes: 184 additions & 0 deletions be/test/runtime_filter/vruntimefilter_wrapper_sampling_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "runtime_filter/runtime_filter_selectivity.h"
#include "runtime_filter/runtime_filter_test_utils.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vruntimefilter_wrapper.h"

namespace doris {

// Bare-bones VExpr that serves as the wrapped implementation when exercising
// VRuntimeFilterWrapper on its own. Both execute paths do nothing and report OK.
class StubVExpr : public vectorized::VExpr {
public:
    StubVExpr() : vectorized::VExpr(make_texpr_node()) {}

    // A SLOT_REF is not a constant. Without this override VExpr::is_constant()
    // returns true for a childless (leaf) node, and get_const_col() then
    // DCHECK-fails on the second open() call.
    bool is_constant() const override { return false; }

    // Fixed display name of the stub.
    const std::string& expr_name() const override {
        static const std::string kStubName = "StubVExpr";
        return kStubName;
    }

    // No-op evaluation: always succeeds and leaves the arguments untouched.
    Status execute(vectorized::VExprContext*, vectorized::Block*, int*) const override {
        return Status::OK();
    }

    // No-op column evaluation: always succeeds and leaves the arguments untouched.
    Status execute_column(vectorized::VExprContext*, const vectorized::Block*,
                          vectorized::Selector*, size_t, vectorized::ColumnPtr&) const override {
        return Status::OK();
    }

private:
    // Builds the SLOT_REF expression node (scalar INT type) handed to the VExpr base.
    static TExprNode make_texpr_node() {
        return TExprNodeBuilder(
                       TExprNodeType::SLOT_REF,
                       TTypeDescBuilder()
                               .set_types(TTypeNodeBuilder()
                                                  .set_type(TTypeNodeType::SCALAR)
                                                  .set_scalar_type(TPrimitiveType::INT)
                                                  .build())
                               .build(),
                       0)
                .build();
    }
};

// Fixture for the VRuntimeFilterWrapper sampling tests. It adds no members of its own;
// the tests use _runtime_states, which is presumably provided by RuntimeFilterTest
// (confirm in runtime_filter/runtime_filter_test_utils.h).
class VRuntimeFilterWrapperSamplingTest : public RuntimeFilterTest {};

// VRuntimeFilterWrapper must remember the sampling_frequency it was constructed with
// and hand it to the VExprContext in open(). This is the core of the fix for the bug
// where the frequency was lost once _append_rf_into_conjuncts built a new context
// through VExprContext::create_shared(expr).
TEST_F(VRuntimeFilterWrapperSamplingTest, open_propagates_sampling_frequency) {
    const int sampling_frequency = 32;

    auto impl = std::make_shared<StubVExpr>();
    auto slot_ref_node =
            TExprNodeBuilder(TExprNodeType::SLOT_REF,
                             TTypeDescBuilder()
                                     .set_types(TTypeNodeBuilder()
                                                        .set_type(TTypeNodeType::SCALAR)
                                                        .set_scalar_type(TPrimitiveType::INT)
                                                        .build())
                                     .build(),
                             0)
                    .build();
    auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
            slot_ref_node, impl, 0.4, false, /*filter_id=*/1, sampling_frequency);

    // Mimic the context recreation done in _append_rf_into_conjuncts: a brand new
    // VExprContext starts with sampling_frequency = DISABLE_SAMPLING (-1).
    auto fresh_ctx = std::make_shared<vectorized::VExprContext>(wrapper);
    ASSERT_FALSE(fresh_ctx->get_runtime_filter_selectivity().maybe_always_true_can_ignore());

    RowDescriptor row_desc;
    auto* state = _runtime_states[0].get();
    ASSERT_TRUE(wrapper->prepare(state, row_desc, fresh_ctx.get()).ok());
    ASSERT_TRUE(wrapper->open(state, fresh_ctx.get(), FunctionContext::FRAGMENT_LOCAL).ok());

    // open() should have pushed the frequency into the context. Check it indirectly:
    // low-selectivity input must now be recognised as always_true.
    auto& rf_selectivity = fresh_ctx->get_runtime_filter_selectivity();
    rf_selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
    EXPECT_TRUE(rf_selectivity.maybe_always_true_can_ignore());
}

// A wrapper built without a sampling_frequency argument falls back to DISABLE_SAMPLING,
// which turns the always_true optimization off, the same outcome as when
// disable_always_true_logic is set.
TEST_F(VRuntimeFilterWrapperSamplingTest, default_sampling_frequency_disables_optimization) {
    auto impl = std::make_shared<StubVExpr>();
    auto slot_ref_node =
            TExprNodeBuilder(TExprNodeType::SLOT_REF,
                             TTypeDescBuilder()
                                     .set_types(TTypeNodeBuilder()
                                                        .set_type(TTypeNodeType::SCALAR)
                                                        .set_scalar_type(TPrimitiveType::INT)
                                                        .build())
                                     .build(),
                             0)
                    .build();

    // The trailing sampling_frequency argument is omitted on purpose.
    auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(slot_ref_node, impl, 0.4,
                                                                    false, /*filter_id=*/1);

    auto ctx = std::make_shared<vectorized::VExprContext>(wrapper);
    RowDescriptor row_desc;
    auto* state = _runtime_states[0].get();
    ASSERT_TRUE(wrapper->prepare(state, row_desc, ctx.get()).ok());
    ASSERT_TRUE(wrapper->open(state, ctx.get(), FunctionContext::FRAGMENT_LOCAL).ok());

    // Sampling is disabled, so even low-selectivity input must not be reported
    // as always_true.
    auto& rf_selectivity = ctx->get_runtime_filter_selectivity();
    rf_selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
    EXPECT_FALSE(rf_selectivity.maybe_always_true_can_ignore());
}

// The sampling_frequency has to outlive a VExprContext being thrown away and rebuilt,
// which is exactly the situation that produced the original bug.
TEST_F(VRuntimeFilterWrapperSamplingTest, sampling_frequency_survives_context_recreation) {
    const int sampling_frequency = 32;

    auto impl = std::make_shared<StubVExpr>();
    auto slot_ref_node =
            TExprNodeBuilder(TExprNodeType::SLOT_REF,
                             TTypeDescBuilder()
                                     .set_types(TTypeNodeBuilder()
                                                        .set_type(TTypeNodeType::SCALAR)
                                                        .set_scalar_type(TPrimitiveType::INT)
                                                        .build())
                                     .build(),
                             0)
                    .build();
    auto wrapper = vectorized::VRuntimeFilterWrapper::create_shared(
            slot_ref_node, impl, 0.4, false, /*filter_id=*/1, sampling_frequency);

    RowDescriptor row_desc;
    auto* state = _runtime_states[0].get();

    // Initial context: prepare and open the wrapper directly.
    auto first_ctx = std::make_shared<vectorized::VExprContext>(wrapper);
    ASSERT_TRUE(wrapper->prepare(state, row_desc, first_ctx.get()).ok());
    ASSERT_TRUE(wrapper->open(state, first_ctx.get(), FunctionContext::FRAGMENT_LOCAL).ok());

    // Second, non-clone context around the same wrapper. This mirrors the production
    // path in _append_rf_into_conjuncts, which calls VExprContext::create_shared(expr)
    // followed by conjunct->prepare() and conjunct->open().
    auto second_ctx = std::make_shared<vectorized::VExprContext>(wrapper);
    EXPECT_FALSE(second_ctx->get_runtime_filter_selectivity().maybe_always_true_can_ignore());

    // Go through VExprContext::prepare/open here rather than the wrapper's own methods,
    // so the lifecycle matches _append_rf_into_conjuncts.
    ASSERT_TRUE(second_ctx->prepare(state, row_desc).ok());
    ASSERT_TRUE(second_ctx->open(state).ok());

    // open() should have copied the frequency into the new context. Check the behavior
    // the fix protects: low-selectivity input is detected as always_true.
    auto& rf_selectivity = second_ctx->get_runtime_filter_selectivity();
    rf_selectivity.update_judge_selectivity(1, 2000, 50000, 0.1);
    EXPECT_TRUE(rf_selectivity.maybe_always_true_can_ignore());
}

} // namespace doris
Loading
Loading