Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public PipeConfigNodePluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
}

@Override
protected PipeSourceConstructor createPipeExtractorConstructor(
protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionSourceConstructor();
}
Expand All @@ -44,7 +44,7 @@ protected PipeProcessorConstructor createPipeProcessorConstructor(
}

@Override
protected PipeSinkConstructor createPipeConnectorConstructor(
protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeConfigRegionSinkConstructor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWri
true);
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Successfully transferred config event {}.", pipeConfigRegionWritePlanEvent);
}
LOGGER.info("Successfully transferred config event {}.", pipeConfigRegionWritePlanEvent);
}

private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public PipeDataRegionPluginAgent(DataNodePipePluginMetaKeeper pipePluginMetaKeep
}

@Override
protected PipeSourceConstructor createPipeExtractorConstructor(
protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeDataRegionSourceConstructor((DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
}
Expand All @@ -55,26 +55,26 @@ protected PipeProcessorConstructor createPipeProcessorConstructor(
}

@Override
protected PipeSinkConstructor createPipeConnectorConstructor(
protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeDataRegionSinkConstructor((DataNodePipePluginMetaKeeper) pipePluginMetaKeeper);
}

@Override
public void validate(
String pipeName,
Map<String, String> extractorAttributes,
Map<String, String> sourceAttributes,
Map<String, String> processorAttributes,
Map<String, String> connectorAttributes)
Map<String, String> sinkAttributes)
throws Exception {
PipeExtractor temporaryExtractor = validateExtractor(extractorAttributes);
PipeExtractor temporaryExtractor = validateSource(sourceAttributes);
PipeProcessor temporaryProcessor = validateProcessor(processorAttributes);
PipeConnector temporaryConnector = validateConnector(pipeName, connectorAttributes);
PipeConnector temporaryConnector = validateSink(pipeName, sinkAttributes);

// validate visibility
// TODO: validate visibility for schema region and config region
Visibility pipeVisibility =
VisibilityUtils.calculateFromExtractorParameters(new PipeParameters(extractorAttributes));
VisibilityUtils.calculateFromExtractorParameters(new PipeParameters(sourceAttributes));
Visibility extractorVisibility =
VisibilityUtils.calculateFromPluginClass(temporaryExtractor.getClass());
Visibility processorVisibility =
Expand All @@ -88,13 +88,13 @@ public void validate(
"The visibility of the pipe (%s, %s) is not compatible with the visibility of the extractor (%s, %s, %s), processor (%s, %s, %s), and connector (%s, %s, %s).",
pipeName,
pipeVisibility,
extractorAttributes,
sourceAttributes,
temporaryExtractor.getClass().getName(),
extractorVisibility,
processorAttributes,
temporaryProcessor.getClass().getName(),
processorVisibility,
connectorAttributes,
sinkAttributes,
temporaryConnector.getClass().getName(),
connectorVisibility));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public PipeSchemaRegionPluginAgent(PipePluginMetaKeeper pipePluginMetaKeeper) {
}

@Override
protected PipeSourceConstructor createPipeExtractorConstructor(
protected PipeSourceConstructor createPipeSourceConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionSourceConstructor();
}
Expand All @@ -44,7 +44,7 @@ protected PipeProcessorConstructor createPipeProcessorConstructor(
}

@Override
protected PipeSinkConstructor createPipeConnectorConstructor(
protected PipeSinkConstructor createPipeSinkConstructor(
PipePluginMetaKeeper pipePluginMetaKeeper) {
return new PipeSchemaRegionSinkConstructor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public PipeDataNodeTask build() {
blendUserAndSystemParameters(pipeStaticMeta.getProcessorParameters()),
regionId,
sourceStage.getEventSupplier(),
sinkStage.getPipeConnectorPendingQueue(),
sinkStage.getPipeSinkPendingQueue(),
PROCESSOR_EXECUTOR,
pipeTaskMeta,
pipeStaticMeta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class PipeTaskSinkStage extends PipeTaskStage {

protected final String pipeName;
protected final long creationTime;
protected final PipeParameters pipeConnectorParameters;
protected final PipeParameters pipeSinkParameters;
protected final int regionId;
protected final Supplier<? extends PipeSinkSubtaskExecutor> executor;

Expand All @@ -43,12 +43,12 @@ public class PipeTaskSinkStage extends PipeTaskStage {
public PipeTaskSinkStage(
String pipeName,
long creationTime,
PipeParameters pipeConnectorParameters,
PipeParameters pipeSinkParameters,
int regionId,
Supplier<? extends PipeSinkSubtaskExecutor> executor) {
this.pipeName = pipeName;
this.creationTime = creationTime;
this.pipeConnectorParameters = pipeConnectorParameters;
this.pipeSinkParameters = pipeSinkParameters;
this.regionId = regionId;
this.executor = executor;

Expand All @@ -60,7 +60,7 @@ protected void registerSubtask() {
PipeSinkSubtaskManager.instance()
.register(
executor,
pipeConnectorParameters,
pipeSinkParameters,
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, regionId));
}

Expand All @@ -85,7 +85,7 @@ public void dropSubtask() throws PipeException {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}

public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
return PipeSinkSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
return PipeSinkSubtaskManager.instance().getPipeSinkPendingQueue(connectorSubtaskId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable {
protected int registeredTaskCount;

public PipeSinkSubtaskLifeCycle(
PipeSinkSubtaskExecutor executor,
PipeSinkSubtask subtask,
UnboundedBlockingPendingQueue<Event> pendingQueue) {
final PipeSinkSubtaskExecutor executor,
final PipeSinkSubtask subtask,
final UnboundedBlockingPendingQueue<Event> pendingQueue) {
this.executor = executor;
this.subtask = subtask;
this.pendingQueue = pendingQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public synchronized void stop(final String attributeSortedString) {
}
}

public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue(
final String attributeSortedString) {
if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
throw new PipeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
public class PipeDataNodeRemainingEventAndTimeOperator extends PipeRemainingOperator {

// Calculate from schema region extractors directly for it requires less computation
private final Set<IoTDBSchemaRegionSource> schemaRegionExtractors =
private final Set<IoTDBSchemaRegionSource> schemaRegionSources =
Collections.newSetFromMap(new ConcurrentHashMap<>());

private final AtomicInteger insertNodeEventCount = new AtomicInteger(0);
Expand Down Expand Up @@ -105,7 +105,7 @@ public long getRemainingNonHeartbeatEvents() {
tsfileEventCount.get()
+ rawTabletEventCount.get()
+ insertNodeEventCount.get()
+ schemaRegionExtractors.stream()
+ schemaRegionSources.stream()
.map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
Expand Down Expand Up @@ -157,7 +157,7 @@ public double getRemainingTime() {
}

final long totalSchemaRegionWriteEventCount =
schemaRegionExtractors.stream()
schemaRegionSources.stream()
.map(IoTDBSchemaRegionSource::getUnTransferredEventCount)
.reduce(Long::sum)
.orElse(0L);
Expand Down Expand Up @@ -192,8 +192,8 @@ public double getRemainingTime() {

//////////////////////////// Register & deregister (pipe integration) ////////////////////////////

void register(final IoTDBSchemaRegionSource extractor) {
schemaRegionExtractors.add(extractor);
void register(final IoTDBSchemaRegionSource source) {
schemaRegionSources.add(source);
}

//////////////////////////// Rate ////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ private ClusterConfigTaskExecutorHolder() {}
SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create();
SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException(
new IoTDBException(
"Subscription not enabled, please set config `subscription_enabled` to true.",
"Subscription is not enabled.",
TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
public class SubscriptionTaskSinkStage extends PipeTaskSinkStage {

public SubscriptionTaskSinkStage(
String pipeName,
long creationTime,
PipeParameters pipeConnectorParameters,
int regionId,
PipeSinkSubtaskExecutor executor) {
super(pipeName, creationTime, pipeConnectorParameters, regionId, () -> executor);
final String pipeName,
final long creationTime,
final PipeParameters pipeSinkParameters,
final int regionId,
final PipeSinkSubtaskExecutor executor) {
super(pipeName, creationTime, pipeSinkParameters, regionId, () -> executor);
}

@Override
Expand All @@ -45,7 +45,7 @@ protected void registerSubtask() {
SubscriptionSinkSubtaskManager.instance()
.register(
executor.get(),
pipeConnectorParameters,
pipeSinkParameters,
new PipeTaskSinkRuntimeEnvironment(pipeName, creationTime, regionId));
}

Expand All @@ -70,7 +70,7 @@ public void dropSubtask() throws PipeException {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}

public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
public UnboundedBlockingPendingQueue<Event> getPipeSinkPendingQueue() {
return SubscriptionSinkSubtaskManager.instance()
.getPipeConnectorPendingQueue(connectorSubtaskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public class SubscriptionSinkSubtaskManager {

public synchronized String register(
final PipeSinkSubtaskExecutor executor,
final PipeParameters pipeConnectorParameters,
final PipeParameters pipeSinkParameters,
final PipeTaskSinkRuntimeEnvironment environment) {
final String connectorKey =
pipeConnectorParameters
pipeSinkParameters
.getStringOrDefault(
Arrays.asList(PipeSinkConstant.CONNECTOR_KEY, PipeSinkConstant.SINK_KEY),
BuiltinPipePlugin.IOTDB_THRIFT_CONNECTOR.getPipePluginName())
Expand All @@ -86,13 +86,13 @@ public synchronized String register(
environment.getRegionId(),
connectorKey);

boolean realTimeFirst =
pipeConnectorParameters.getBooleanOrDefault(
final boolean realTimeFirst =
pipeSinkParameters.getBooleanOrDefault(
Arrays.asList(
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
PipeSinkConstant.SINK_REALTIME_FIRST_KEY),
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
String attributeSortedString = generateAttributeSortedString(pipeConnectorParameters);
String attributeSortedString = generateAttributeSortedString(pipeSinkParameters);
attributeSortedString = "__subscription_" + attributeSortedString;

if (!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) {
Expand All @@ -101,36 +101,33 @@ public synchronized String register(
? new PipeRealtimePriorityBlockingQueue()
: new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter());

final PipeConnector pipeConnector =
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeConnectorParameters);
final PipeConnector pipeSink =
PipeDataNodeAgent.plugin().dataRegion().reflectSink(pipeSinkParameters);
// 1. Construct, validate and customize PipeConnector, and then handshake (create connection)
// with the target
try {
pipeConnector.validate(new PipeParameterValidator(pipeConnectorParameters));
pipeConnector.customize(
pipeConnectorParameters, new PipeTaskRuntimeConfiguration(environment));
pipeConnector.handshake();
pipeSink.validate(new PipeParameterValidator(pipeSinkParameters));
pipeSink.customize(pipeSinkParameters, new PipeTaskRuntimeConfiguration(environment));
pipeSink.handshake();
} catch (final Exception e) {
try {
pipeConnector.close();
pipeSink.close();
} catch (final Exception closeException) {
LOGGER.warn(
"Failed to close connector after failed to initialize connector. "
+ "Ignore this exception.",
"Failed to close sink after failed to initialize sink. " + "Ignore this exception.",
closeException);
}
throw new PipeException(
"Failed to construct PipeConnector, because of " + e.getMessage(), e);
throw new PipeException("Failed to construct PipeSink, because of " + e.getMessage(), e);
}

// 2. Fetch topic and consumer group id from connector parameters
final String topicName = pipeConnectorParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
final String topicName = pipeSinkParameters.getString(PipeSinkConstant.SINK_TOPIC_KEY);
final String consumerGroupId =
pipeConnectorParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
pipeSinkParameters.getString(PipeSinkConstant.SINK_CONSUMER_GROUP_KEY);
if (Objects.isNull(topicName) || Objects.isNull(consumerGroupId)) {
throw new SubscriptionException(
String.format(
"Failed to construct subscription connector, because of %s or %s does not exist in pipe connector parameters",
"Failed to construct subscription sink, because of %s or %s does not exist in pipe connector parameters",
PipeSinkConstant.SINK_TOPIC_KEY, PipeSinkConstant.SINK_CONSUMER_GROUP_KEY));
}

Expand All @@ -142,7 +139,7 @@ public synchronized String register(
attributeSortedString,
0,
pendingQueue,
pipeConnector,
pipeSink,
topicName,
consumerGroupId);
final PipeSinkSubtaskLifeCycle pipeSinkSubtaskLifeCycle =
Expand Down
Loading
Loading