Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -494,6 +495,26 @@ public void showCountTimeSeries() throws SQLException {
}
}

@Test
@Ignore
public void showCountTimeSeriesExcludeInternalDatabaseAndIncludeView() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
final long baseVisibleCount = queryCount(statement, "COUNT TIMESERIES root.ln*.**");
statement.execute("CREATE DATABASE root.count_it");
statement.execute(
"CREATE TIMESERIES root.count_it.src.s1 WITH DATATYPE = INT32, ENCODING = PLAIN");
statement.execute(
"CREATE TIMESERIES root.count_it.src.s2 WITH DATATYPE = INT32, ENCODING = PLAIN");
statement.execute("CREATE VIEW root.count_it.dst.v1 AS SELECT s1 FROM root.count_it.src;");

final long localCount = queryCount(statement, "COUNT TIMESERIES root.count_it.**");
assertEquals(3L, localCount);
assertEquals(
baseVisibleCount + localCount, queryCount(statement, "COUNT TIMESERIES root.**"));
}
}

@Test
public void showCountTimeSeriesWithTag() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
Expand Down Expand Up @@ -865,4 +886,11 @@ public void showDeadbandInfo() throws SQLException {
}
}
}

private long queryCount(final Statement statement, final String sql) throws SQLException {
try (ResultSet resultSet = statement.executeQuery(sql)) {
Assert.assertTrue(resultSet.next());
return resultSet.getLong(1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -236,6 +237,39 @@ public void testShowTimeSeries() {
}
}

@Test
@Ignore
public void testCountTimeSeriesWithTimeConditionIncludesView() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("CREATE DATABASE root.view_count");
statement.execute(
"CREATE TIMESERIES root.view_count.src.s1 WITH DATATYPE = INT32, ENCODING = PLAIN");
statement.execute(
"CREATE TIMESERIES root.view_count.src.s2 WITH DATATYPE = INT32, ENCODING = PLAIN");
statement.execute("CREATE VIEW root.view_count.dst.v1 AS SELECT s1 FROM root.view_count.src");

checkResultSet(
statement,
"count timeseries root.view_count.**",
new HashSet<>(Collections.singletonList("3,")));

statement.execute("insert into root.view_count.src(timestamp,s1) values(1,1)");

checkResultSet(
statement,
"count timeseries root.view_count.** where time>0",
new HashSet<>(Collections.singletonList("2,")));
checkResultSet(
statement,
"count timeseries root.view_count.dst.** where time>0",
new HashSet<>(Collections.singletonList("1,")));
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
}

@Test
public void testShowDevices() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import static org.apache.iotdb.db.queryengine.execution.operator.schema.source.TimeSeriesSchemaSource.mapToString;

Expand All @@ -42,8 +45,17 @@

private final String deadband;
private final String deadbandParameters;
private final int activeCountMultiplier;
private final Set<String> activeLogicalViewCountSet;

public TimeseriesContext(IMeasurementSchemaInfo schemaInfo) {
this(schemaInfo, 1, Collections.emptySet());
}

public TimeseriesContext(
IMeasurementSchemaInfo schemaInfo,
int activeCountMultiplier,
Set<String> activeLogicalViewCountSet) {
this.dataType = schemaInfo.getSchema().getType().toString();
this.encoding = schemaInfo.getSchema().getEncodingType().toString();
this.compression = schemaInfo.getSchema().getCompressor().toString();
Expand All @@ -54,6 +66,8 @@
MetaUtils.parseDeadbandInfo(schemaInfo.getSchema().getProps());
this.deadband = deadbandInfo.left;
this.deadbandParameters = deadbandInfo.right;
this.activeCountMultiplier = activeCountMultiplier;
this.activeLogicalViewCountSet = new HashSet<>(activeLogicalViewCountSet);
}

public String getDataType() {
Expand Down Expand Up @@ -88,6 +102,14 @@
return deadband;
}

public int getActiveCountMultiplier() {
return activeCountMultiplier;
}

public Set<String> getActiveLogicalViewCountSet() {
return activeLogicalViewCountSet;
}

public TimeseriesContext(
String dataType,
String alias,
Expand All @@ -97,6 +119,30 @@
String attributes,
String deadband,
String deadbandParameters) {
this(
dataType,
alias,
encoding,
compression,
tags,
attributes,
deadband,
deadbandParameters,
1,
Collections.emptySet());
}

public TimeseriesContext(

Check warning on line 135 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/TimeseriesContext.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Constructor has 10 parameters, which is greater than 7 authorized.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ5esNY1jL8g0JA_ZWXG&open=AZ5esNY1jL8g0JA_ZWXG&pullRequest=17703
String dataType,
String alias,
String encoding,
String compression,
String tags,
String attributes,
String deadband,
String deadbandParameters,
int activeCountMultiplier,
Set<String> activeLogicalViewCountSet) {
this.dataType = dataType;
this.alias = alias;
this.encoding = encoding;
Expand All @@ -105,6 +151,24 @@
this.attributes = attributes;
this.deadband = deadband;
this.deadbandParameters = deadbandParameters;
this.activeCountMultiplier = activeCountMultiplier;
this.activeLogicalViewCountSet = new HashSet<>(activeLogicalViewCountSet);
}

public TimeseriesContext mergeActiveCount(TimeseriesContext that) {
Set<String> mergedActiveLogicalViewCountSet = new HashSet<>(activeLogicalViewCountSet);
mergedActiveLogicalViewCountSet.addAll(that.activeLogicalViewCountSet);
return new TimeseriesContext(
dataType,
alias,
encoding,
compression,
tags,
attributes,
deadband,
deadbandParameters,
activeCountMultiplier + that.activeCountMultiplier,
mergedActiveLogicalViewCountSet);
}

public void serializeAttributes(ByteBuffer byteBuffer) {
Expand All @@ -116,6 +180,11 @@
ReadWriteIOUtils.write(attributes, byteBuffer);
ReadWriteIOUtils.write(deadband, byteBuffer);
ReadWriteIOUtils.write(deadbandParameters, byteBuffer);
ReadWriteIOUtils.write(activeCountMultiplier, byteBuffer);
ReadWriteIOUtils.write(activeLogicalViewCountSet.size(), byteBuffer);
for (String logicalView : activeLogicalViewCountSet) {
ReadWriteIOUtils.write(logicalView, byteBuffer);
}
}

public void serializeAttributes(DataOutputStream stream) throws IOException {
Expand All @@ -127,6 +196,11 @@
ReadWriteIOUtils.write(attributes, stream);
ReadWriteIOUtils.write(deadband, stream);
ReadWriteIOUtils.write(deadbandParameters, stream);
ReadWriteIOUtils.write(activeCountMultiplier, stream);
ReadWriteIOUtils.write(activeLogicalViewCountSet.size(), stream);
for (String logicalView : activeLogicalViewCountSet) {
ReadWriteIOUtils.write(logicalView, stream);
}
}

public static TimeseriesContext deserialize(ByteBuffer buffer) {
Expand All @@ -138,8 +212,23 @@
String attributes = ReadWriteIOUtils.readString(buffer);
String deadband = ReadWriteIOUtils.readString(buffer);
String deadbandParameters = ReadWriteIOUtils.readString(buffer);
int activeCountMultiplier = ReadWriteIOUtils.readInt(buffer);
int activeLogicalViewCountSetSize = ReadWriteIOUtils.readInt(buffer);
Set<String> activeLogicalViewCountSet = new HashSet<>();
for (int i = 0; i < activeLogicalViewCountSetSize; i++) {
activeLogicalViewCountSet.add(ReadWriteIOUtils.readString(buffer));
}
return new TimeseriesContext(
dataType, alias, encoding, compression, tags, attributes, deadband, deadbandParameters);
dataType,
alias,
encoding,
compression,
tags,
attributes,
deadband,
deadbandParameters,
activeCountMultiplier,
activeLogicalViewCountSet);
}

@Override
Expand All @@ -159,13 +248,24 @@
&& Objects.equals(tags, that.tags)
&& Objects.equals(attributes, that.attributes)
&& Objects.equals(deadband, that.deadband)
&& Objects.equals(deadbandParameters, that.deadbandParameters);
&& Objects.equals(deadbandParameters, that.deadbandParameters)
&& activeCountMultiplier == that.activeCountMultiplier
&& Objects.equals(activeLogicalViewCountSet, that.activeLogicalViewCountSet);
return res;
}

@Override
public int hashCode() {
return Objects.hash(
dataType, alias, encoding, compression, tags, attributes, deadband, deadbandParameters);
dataType,
alias,
encoding,
compression,
tags,
attributes,
deadband,
deadbandParameters,
activeCountMultiplier,
activeLogicalViewCountSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.schema.source.ISchemaSource;
import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ISchemaInfo;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.ISchemaReader;

Expand Down Expand Up @@ -95,6 +96,10 @@ public OperatorContext getOperatorContext() {
return operatorContext;
}

private ISchemaRegion getSchemaRegion() {
return ((SchemaDriverContext) operatorContext.getDriverContext()).getSchemaRegion();
}

@Override
public ListenableFuture<?> isBlocked() {
if (isBlocked == null) {
Expand All @@ -109,6 +114,11 @@ public ListenableFuture<?> isBlocked() {
*/
private ListenableFuture<?> tryGetNext() {
if (schemaReader == null) {
if (schemaSource.shouldSkipSchemaRegion(getSchemaRegion())) {
next = null;
isFinished = true;
return NOT_BLOCKED;
}
schemaReader = createTimeSeriesReader();
}
while (true) {
Expand Down Expand Up @@ -172,15 +182,14 @@ public TsBlock next() throws Exception {
@Override
public boolean hasNext() throws Exception {
isBlocked().get(); // wait for the next TsBlock
if (!schemaReader.isSuccess()) {
if (schemaReader != null && !schemaReader.isSuccess()) {
throw new SchemaExecutionException(schemaReader.getFailure());
}
return next != null;
}

public ISchemaReader<T> createTimeSeriesReader() {
return schemaSource.getSchemaReader(
((SchemaDriverContext) operatorContext.getDriverContext()).getSchemaRegion());
return schemaSource.getSchemaReader(getSchemaRegion());
}

private TsBlock constructTsBlockAndClearMap(Map<PartialPath, Long> countMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public ListenableFuture<?> isBlocked() {
*/
private ListenableFuture<?> tryGetNext() {
ISchemaRegion schemaRegion = getSchemaRegion();
if (schemaSource.shouldSkipSchemaRegion(schemaRegion)) {
next = constructTsBlock(0);
return NOT_BLOCKED;
}
if (schemaSource.hasSchemaStatistic(schemaRegion)) {
long statisticCount = schemaSource.getSchemaStatistic(schemaRegion);
// Check if database path itself is counted as a device (bug fix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ void transformToTsBlockColumns(

long getSchemaStatistic(final ISchemaRegion schemaRegion);

default boolean shouldSkipSchemaRegion(final ISchemaRegion schemaRegion) {
return false;
}

default boolean checkRegionDatabaseIncluded(final ISchemaRegion schemaRegion) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaCountSourc
Map<Integer, Template> templateMap,
PathPatternTree scope) {
return new TimeSeriesSchemaSource(
pathPattern, isPrefixMatch, 0, 0, schemaFilter, templateMap, false, scope, null);
pathPattern, isPrefixMatch, 0, 0, schemaFilter, templateMap, false, true, scope, null);
}

// show time series
Expand All @@ -69,6 +69,7 @@ public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaScanSource
schemaFilter,
templateMap,
true,
false,
scope,
timeseriesOrdering);
}
Expand Down
Loading
Loading