diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java index 435182909fb7..c08668e1bafd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/adapter/BeamFlinkDataStreamAdapter.java @@ -228,7 +228,8 @@ FlinkInput.URN, flinkInputTranslator(flinkInputs), DataStream flinkInput = Preconditions.checkStateNotNull( (DataStream) inputMap.get(inputId), - "missing input referenced in proto: " + inputId); + "missing input referenced in proto: %s", + inputId); context.addDataStream( Iterables.getOnlyElement(transform.getOutputsMap().values()), flinkInput.process( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java index 438688015c4f..854b8e300cdb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java @@ -74,7 +74,7 @@ public FlinkSideInputReader( public @Nullable T get(PCollectionView view, BoundedWindow window) { checkNotNull(view, "View passed to sideInput cannot be null"); TupleTag tag = view.getTagInternal(); - checkNotNull(sideInputs.get(tag), "Side input for " + view + " not available."); + checkNotNull(sideInputs.get(tag), "Side input for %s not available.", view); Map sideInputs = runtimeContext.getBroadcastVariableWithInitializer( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java index 0e11531226f7..e30376819f84 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/RangeCoder.java @@ -54,9 +54,9 @@ public void verifyDeterministic() throws NonDeterministicException { @Override public void encode(Range value, OutputStream outStream) throws IOException { Preconditions.checkState( - value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " + value); + value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range %s", value); Preconditions.checkState( - value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + value); + value.upperBoundType().equals(BoundType.OPEN), "unexpected range %s", value); boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, outStream); boundCoder.encode(value.hasUpperBound() ? 
value.upperEndpoint() : null, outStream); } diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java index 54fccbceadab..45b1b1942c46 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ReferenceCountingExecutableStageContextFactory.java @@ -115,7 +115,7 @@ public ExecutableStageContext get(JobInfo jobInfo) { private void scheduleRelease(JobInfo jobInfo) { WrappedContext wrapper = getCache().get(jobInfo.jobId()); Preconditions.checkState( - wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId()); + wrapper != null, "Releasing context for unknown job %s", jobInfo.jobId()); PipelineOptions pipelineOptions = PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()); diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java index b32f03e78e6a..0eef65cfc460 100644 --- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java +++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java @@ -242,7 +242,7 @@ private static String userHome() { } private static String mustGetPropertyAsLowerCase(String name) { - return checkStateNotNull(System.getProperty(name), "System property: " + name + " not set") + return checkStateNotNull(System.getProperty(name), "System property: %s not set", name) .toLowerCase(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java 
index 6cfd6a435dd7..1fa2c5165fdb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BigDecimalCoder.java @@ -55,7 +55,7 @@ public void encode(BigDecimal value, OutputStream outStream) throws IOException, @Override public void encode(BigDecimal value, OutputStream outStream, Context context) throws IOException, CoderException { - checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); + checkNotNull(value, "cannot encode a null BigDecimal"); VAR_INT_CODER.encode(value.scale(), outStream); BIG_INT_CODER.encode(value.unscaledValue(), outStream, context); } @@ -108,7 +108,7 @@ public boolean isRegisterByteSizeObserverCheap(BigDecimal value) { */ @Override protected long getEncodedElementByteSize(BigDecimal value) throws Exception { - checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName())); + checkNotNull(value, "cannot encode a null BigDecimal"); return VAR_INT_CODER.getEncodedElementByteSize(value.scale()) + BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue()); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java index 7016242299ad..49eebe13af58 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/AutoValueSchema.java @@ -79,12 +79,10 @@ private static void validateFieldNumbers(List types) } Preconditions.checkState( number == i, - "Expected field number " - + i - + " for field + " - + type.getName() - + " instead got " - + number); + "Expected field number %s for field %s instead got %s", + i, + type.getName(), + number); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java index 14adf2f6603e..9d5e2fa9c744 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java @@ -81,12 +81,10 @@ private static void validateFieldNumbers(List types) } Preconditions.checkState( number == i, - "Expected field number " - + i - + " for field: " - + type.getName() - + " instead got " - + number); + "Expected field number %s for field %s instead got %s", + i, + type.getName(), + number); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java index 9a8eef2bf2c8..3e0c9c879201 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java @@ -103,12 +103,10 @@ private static void validateFieldNumbers(List types) } Preconditions.checkState( number == i, - "Expected field number " - + i - + " for field + " - + type.getName() - + " instead got " - + number); + "Expected field number %s for field %s instead got %s", + i, + type.getName(), + number); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index fee19810c15c..e2c0825e0274 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -169,7 +169,8 @@ public OutputReceiver getRowReceiver(TupleTag tag) { checkStateNotNull(outputCoder, "No output tag for %s ", tag); checkState( outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + tag); return DoFnOutputReceivers.rowReceiver( context, builderSupplier, 
tag, (SchemaCoder) outputCoder); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java index 467f5ab61a3d..01dc8de1cc2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/InstanceBuilder.java @@ -195,19 +195,16 @@ private T buildFromMethod(Class[] types) { checkState( Modifier.isStatic(method.getModifiers()), - "Factory method must be a static method for " - + factoryClass.getName() - + "#" - + method.getName()); + "Factory method %s#%s must be a static method", + factoryClass.getName(), + method.getName()); checkState( type.isAssignableFrom(method.getReturnType()), - "Return type for " - + factoryClass.getName() - + "#" - + method.getName() - + " must be assignable to " - + type.getSimpleName()); + "Return type for %s#%s must be assignable to %s", + factoryClass.getName(), + method.getName(), + type.getSimpleName()); if (!method.isAccessible()) { method.setAccessible(true); @@ -248,10 +245,9 @@ private T buildFromConstructor(Class[] types) { checkState( type.isAssignableFrom(factoryClass), - "Instance type " - + factoryClass.getName() - + " must be assignable to " - + type.getSimpleName()); + "Instance type %s must be assignable to %s", + factoryClass.getName(), + type.getSimpleName()); if (!constructor.isAccessible()) { constructor.setAccessible(true); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java index 063ca9517a23..f97c7fe75d8c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/BigDecimalCoderTest.java @@ -109,7 +109,6 @@ public void testGetEncodedElementByteSize() throws Exception { @Test public void 
encodeNullThrowsException() throws Exception { thrown.expect(NullPointerException.class); - thrown.expectMessage("cannot encode a null BigDecimal"); CoderUtils.encodeToBase64(TEST_CODER, null); } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 78aceeaab198..052ff324490d 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -2087,9 +2087,8 @@ public OutputBuilder builder(Row value) { public OutputReceiver outputRowReceiver(DoFn doFn) { checkState( mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + mainOutputTag); return mainRowOutputReceiver; } @@ -2130,9 +2129,8 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { if (tag == null || mainOutputTag.equals(tag)) { checkState( mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + mainOutputTag); return mainRowOutputReceiver; } @@ -2140,7 +2138,8 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { checkState(outputCoder != null, "No output tag for %s", tag); checkState( outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + tag); return new OutputReceiver() { private SerializableFunction fromRowFunction = ((SchemaCoder) outputCoder).getFromRowFunction(); @@ -2426,9 +2425,8 @@ public OutputBuilder builder(Row value) { public OutputReceiver outputRowReceiver(DoFn doFn) { checkState( 
mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + mainOutputTag); return mainRowOutputReceiver; } @@ -2466,9 +2464,8 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { if (tag == null || mainOutputTag.equals(tag)) { checkState( mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + mainOutputTag); return mainRowOutputReceiver; } @@ -2476,7 +2473,8 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { checkState(outputCoder != null, "No output tag for %s", tag); checkState( outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + tag); return new OutputReceiver() { private SerializableFunction fromRowFunction = ((SchemaCoder) outputCoder).getFromRowFunction(); @@ -2736,9 +2734,8 @@ public OutputBuilder builder(Row value) { public OutputReceiver outputRowReceiver(DoFn doFn) { checkState( mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + mainOutputTag); return mainRowOutputReceiver; } @@ -2776,9 +2773,8 @@ private OutputReceiver createTaggedRowReceiver(TupleTag tag) { if (tag == null || mainOutputTag.equals(tag)) { checkState( mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + mainOutputTag); return mainRowOutputReceiver; } @@ -2786,7 +2782,8 @@ private OutputReceiver 
createTaggedRowReceiver(TupleTag tag) { checkState(outputCoder != null, "No output tag for %s", tag); checkState( outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); + "Output with tag %s must have a schema in order to call getRowReceiver", + tag); return new OutputReceiver() { private SerializableFunction fromRowFunction = ((SchemaCoder) outputCoder).getFromRowFunction(); diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java index 1429253d1948..28e63aeac581 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java @@ -770,27 +770,20 @@ public Write withSsl(ValueProvider sslOptions) { public void validate(PipelineOptions pipelineOptions) { checkState( hosts() != null, - "CassandraIO." - + getMutationTypeName() - + "() requires a list of hosts to be set via withHosts(hosts)"); + "CassandraIO.%s() requires a list of hosts to be set via withHosts(hosts)", + getMutationTypeName()); checkState( port() != null, - "CassandraIO." - + getMutationTypeName() - + "() requires a " - + "valid port number to be set via withPort(port)"); + "CassandraIO.%s() requires a valid port number to be set via withPort(port)", + getMutationTypeName()); checkState( keyspace() != null, - "CassandraIO." - + getMutationTypeName() - + "() requires a keyspace to be set via " - + "withKeyspace(keyspace)"); + "CassandraIO.%s() requires a keyspace to be set via withKeyspace(keyspace)", + getMutationTypeName()); checkState( entity() != null, - "CassandraIO." 
- + getMutationTypeName() - + "() requires an entity to be set via " - + "withEntity(entity)"); + "CassandraIO.%s() requires an entity to be set via withEntity(entity)", + getMutationTypeName()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index d940ff8dd7fc..a4d20707304e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -227,13 +227,13 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) { TypeName containedTypeName = Preconditions.checkNotNull( elementType.getTypeName(), - "Null type name found in contained type at " + field.getName()); + "Null type name found in contained type at %s", + field.getName()); Preconditions.checkState( !(containedTypeName.isCollectionType() || containedTypeName.isMapType()), - "Nested container types are not supported by BigQuery. Field " - + field.getName() - + " contains a type " - + containedTypeName.name()); + "Nested container types are not supported by BigQuery. 
Field %s contains a type %s", + field.getName(), + containedTypeName.name()); TableFieldSchema elementFieldSchema = fieldDescriptorFromBeamField(Field.of(field.getName(), elementType)); builder = builder.setType(elementFieldSchema.getType()); diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 02d14b745fe8..d247886cc674 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -2132,10 +2132,8 @@ public void processElement(OutputReceiver receiver) { if (logTopicVerification == null || !logTopicVerification) { checkState( partitionInfoList != null && !partitionInfoList.isEmpty(), - "Could not find any partitions info for topic " - + topic - + ". Please check Kafka configuration and make sure " - + "that provided topics exist."); + "Could not find any partitions info for topic %s. Please check Kafka configuration and make sure that provided topics exist.", + topic); } else { LOG.warn( "Could not find any partitions info for topic {}. Please check Kafka configuration " diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java index 37fc6378e2d0..01bbe61ab9ab 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java @@ -129,11 +129,9 @@ public List> split(int desiredNumSplits, PipelineOpti for (Integer p : providedPartitions) { checkState( partitionsForTopic.contains(p), - "Partition " - + p - + " does not exist for topic " - + providedTopic - + ". Please check Kafka configuration."); + "Partition %s does not exist for topic %s. 
Please check Kafka configuration.", + p, + providedTopic); } } else { for (Integer p : providedPartitions) { diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java index 65ece98d618f..490faafb22fa 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java @@ -193,10 +193,8 @@ static List getAllTopicPartitions( List partitionInfoList = kafkaConsumer.partitionsFor(topic); checkState( partitionInfoList != null && !partitionInfoList.isEmpty(), - "Could not find any partitions info for topic " - + topic - + ". Please check Kafka configuration and make sure " - + "that provided topics exist."); + "Could not find any partitions info for topic %s. Please check Kafka configuration and make sure that provided topics exist.", + topic); for (PartitionInfo partition : partitionInfoList) { current.add(new TopicPartition(topic, partition.partition())); }