Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ FlinkInput.URN, flinkInputTranslator(flinkInputs),
DataStream<InputT> flinkInput =
Preconditions.checkStateNotNull(
(DataStream<InputT>) inputMap.get(inputId),
"missing input referenced in proto: " + inputId);
"missing input referenced in proto: %s",
inputId);
context.addDataStream(
Iterables.getOnlyElement(transform.getOutputsMap().values()),
flinkInput.process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public FlinkSideInputReader(
public <T> @Nullable T get(PCollectionView<T> view, BoundedWindow window) {
checkNotNull(view, "View passed to sideInput cannot be null");
TupleTag<?> tag = view.getTagInternal();
checkNotNull(sideInputs.get(tag), "Side input for " + view + " not available.");
checkNotNull(sideInputs.get(tag), "Side input for %s not available.", view);

Map<BoundedWindow, T> sideInputs =
runtimeContext.getBroadcastVariableWithInitializer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public void verifyDeterministic() throws NonDeterministicException {
@Override
public void encode(Range<T> value, OutputStream outStream) throws IOException {
Preconditions.checkState(
value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range " + value);
value.lowerBoundType().equals(BoundType.CLOSED), "unexpected range %s", value);
Preconditions.checkState(
value.upperBoundType().equals(BoundType.OPEN), "unexpected range " + value);
value.upperBoundType().equals(BoundType.OPEN), "unexpected range %s", value);
boundCoder.encode(value.hasLowerBound() ? value.lowerEndpoint() : null, outStream);
boundCoder.encode(value.hasUpperBound() ? value.upperEndpoint() : null, outStream);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public ExecutableStageContext get(JobInfo jobInfo) {
private void scheduleRelease(JobInfo jobInfo) {
WrappedContext wrapper = getCache().get(jobInfo.jobId());
Preconditions.checkState(
wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
wrapper != null, "Releasing context for unknown job %s", jobInfo.jobId());

PipelineOptions pipelineOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private static String userHome() {
}

private static String mustGetPropertyAsLowerCase(String name) {
return checkStateNotNull(System.getProperty(name), "System property: " + name + " not set")
return checkStateNotNull(System.getProperty(name), "System property: %s not set", name)
.toLowerCase();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void encode(BigDecimal value, OutputStream outStream) throws IOException,
@Override
public void encode(BigDecimal value, OutputStream outStream, Context context)
throws IOException, CoderException {
checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
checkNotNull(value);
VAR_INT_CODER.encode(value.scale(), outStream);
BIG_INT_CODER.encode(value.unscaledValue(), outStream, context);
}
Expand Down Expand Up @@ -108,7 +108,7 @@ public boolean isRegisterByteSizeObserverCheap(BigDecimal value) {
*/
@Override
protected long getEncodedElementByteSize(BigDecimal value) throws Exception {
checkNotNull(value, String.format("cannot encode a null %s", BigDecimal.class.getSimpleName()));
checkNotNull(value);
return VAR_INT_CODER.getEncodedElementByteSize(value.scale())
+ BIG_INT_CODER.getEncodedElementByteSize(value.unscaledValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,10 @@ private static void validateFieldNumbers(List<FieldValueTypeInformation> types)
}
Preconditions.checkState(
number == i,
"Expected field number "
+ i
+ " for field + "
+ type.getName()
+ " instead got "
+ number);
"Expected field number %s for field %s instead got %s",
i,
type.getName(),
number);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,10 @@ private static void validateFieldNumbers(List<FieldValueTypeInformation> types)
}
Preconditions.checkState(
number == i,
"Expected field number "
+ i
+ " for field: "
+ type.getName()
+ " instead got "
+ number);
"Expected field number %s for field %s instead got %s",
i,
type.getName(),
number);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,10 @@ private static void validateFieldNumbers(List<FieldValueTypeInformation> types)
}
Preconditions.checkState(
number == i,
"Expected field number "
+ i
+ " for field + "
+ type.getName()
+ " instead got "
+ number);
"Expected field number %s for field %s instead got %s",
i,
type.getName(),
number);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
checkStateNotNull(outputCoder, "No output tag for %s ", tag);
checkState(
outputCoder instanceof SchemaCoder,
"Output with tag " + tag + " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
tag);
return DoFnOutputReceivers.rowReceiver(
context, builderSupplier, tag, (SchemaCoder<T>) outputCoder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,19 +195,16 @@ private T buildFromMethod(Class<?>[] types) {

checkState(
Modifier.isStatic(method.getModifiers()),
"Factory method must be a static method for "
+ factoryClass.getName()
+ "#"
+ method.getName());
"Factory method %s#%s must be a static method",
factoryClass.getName(),
method.getName());

checkState(
type.isAssignableFrom(method.getReturnType()),
"Return type for "
+ factoryClass.getName()
+ "#"
+ method.getName()
+ " must be assignable to "
+ type.getSimpleName());
"Return type for %s#%s must be assignable to %s",
factoryClass.getName(),
method.getName(),
type.getSimpleName());

if (!method.isAccessible()) {
method.setAccessible(true);
Expand Down Expand Up @@ -248,10 +245,9 @@ private T buildFromConstructor(Class<?>[] types) {

checkState(
type.isAssignableFrom(factoryClass),
"Instance type "
+ factoryClass.getName()
+ " must be assignable to "
+ type.getSimpleName());
"Instance type %s must be assignable to %s",
factoryClass.getName(),
type.getSimpleName());

if (!constructor.isAccessible()) {
constructor.setAccessible(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ public void testGetEncodedElementByteSize() throws Exception {
@Test
public void encodeNullThrowsException() throws Exception {
thrown.expect(NullPointerException.class);
thrown.expectMessage("cannot encode a null BigDecimal");

CoderUtils.encodeToBase64(TEST_CODER, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2087,9 +2087,8 @@ public OutputBuilder<Row> builder(Row value) {
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
checkState(
mainOutputSchemaCoder != null,
"Output with tag "
+ mainOutputTag
+ " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
mainOutputTag);
return mainRowOutputReceiver;
}

Expand Down Expand Up @@ -2130,17 +2129,17 @@ private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {
if (tag == null || mainOutputTag.equals(tag)) {
checkState(
mainOutputSchemaCoder != null,
"Output with tag "
+ mainOutputTag
+ " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
mainOutputTag);
return mainRowOutputReceiver;
}

Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag);
checkState(outputCoder != null, "No output tag for %s", tag);
checkState(
outputCoder instanceof SchemaCoder,
"Output with tag " + tag + " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
tag);
return new OutputReceiver<Row>() {
private SerializableFunction<Row, T> fromRowFunction =
((SchemaCoder) outputCoder).getFromRowFunction();
Expand Down Expand Up @@ -2426,9 +2425,8 @@ public OutputBuilder<Row> builder(Row value) {
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
checkState(
mainOutputSchemaCoder != null,
"Output with tag "
+ mainOutputTag
+ " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
mainOutputTag);
return mainRowOutputReceiver;
}

Expand Down Expand Up @@ -2466,17 +2464,17 @@ private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {
if (tag == null || mainOutputTag.equals(tag)) {
checkState(
mainOutputSchemaCoder != null,
"Output with tag "
+ mainOutputTag
+ " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
mainOutputTag);
return mainRowOutputReceiver;
}

Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag);
checkState(outputCoder != null, "No output tag for %s", tag);
checkState(
outputCoder instanceof SchemaCoder,
"Output with tag " + tag + " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
tag);
return new OutputReceiver<Row>() {
private SerializableFunction<Row, T> fromRowFunction =
((SchemaCoder) outputCoder).getFromRowFunction();
Expand Down Expand Up @@ -2736,9 +2734,8 @@ public OutputBuilder<Row> builder(Row value) {
public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
checkState(
mainOutputSchemaCoder != null,
"Output with tag "
+ mainOutputTag
+ " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
mainOutputTag);
return mainRowOutputReceiver;
}

Expand Down Expand Up @@ -2776,17 +2773,17 @@ private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) {
if (tag == null || mainOutputTag.equals(tag)) {
checkState(
mainOutputSchemaCoder != null,
"Output with tag "
+ mainOutputTag
+ " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
mainOutputTag);
return mainRowOutputReceiver;
}

Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag);
checkState(outputCoder != null, "No output tag for %s", tag);
checkState(
outputCoder instanceof SchemaCoder,
"Output with tag " + tag + " must have a schema in order to call getRowReceiver");
"Output with tag %s must have a schema in order to call getRowReceiver",
tag);
return new OutputReceiver<Row>() {
private SerializableFunction<Row, T> fromRowFunction =
((SchemaCoder) outputCoder).getFromRowFunction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,27 +770,20 @@ public Write<T> withSsl(ValueProvider<SSLOptions> sslOptions) {
public void validate(PipelineOptions pipelineOptions) {
checkState(
hosts() != null,
"CassandraIO."
+ getMutationTypeName()
+ "() requires a list of hosts to be set via withHosts(hosts)");
"CassandraIO.%s() requires a list of hosts to be set via withHosts(hosts)",
getMutationTypeName());
checkState(
port() != null,
"CassandraIO."
+ getMutationTypeName()
+ "() requires a "
+ "valid port number to be set via withPort(port)");
"CassandraIO.%s() requires a valid port number to be set via withPort(port)",
getMutationTypeName());
checkState(
keyspace() != null,
"CassandraIO."
+ getMutationTypeName()
+ "() requires a keyspace to be set via "
+ "withKeyspace(keyspace)");
"CassandraIO.%s() requires a keyspace to be set via withKeyspace(keyspace)",
getMutationTypeName());
checkState(
entity() != null,
"CassandraIO."
+ getMutationTypeName()
+ "() requires an entity to be set via "
+ "withEntity(entity)");
"CassandraIO.%s() requires an entity to be set via withEntity(entity)",
getMutationTypeName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ private static TableFieldSchema fieldDescriptorFromBeamField(Field field) {
TypeName containedTypeName =
Preconditions.checkNotNull(
elementType.getTypeName(),
"Null type name found in contained type at " + field.getName());
"Null type name found in contained type at %s",
field.getName());
Preconditions.checkState(
!(containedTypeName.isCollectionType() || containedTypeName.isMapType()),
"Nested container types are not supported by BigQuery. Field "
+ field.getName()
+ " contains a type "
+ containedTypeName.name());
"Nested container types are not supported by BigQuery. Field %s contains a type %s",
field.getName(),
containedTypeName.name());
TableFieldSchema elementFieldSchema =
fieldDescriptorFromBeamField(Field.of(field.getName(), elementType));
builder = builder.setType(elementFieldSchema.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2132,10 +2132,8 @@ public void processElement(OutputReceiver<KafkaSourceDescriptor> receiver) {
if (logTopicVerification == null || !logTopicVerification) {
checkState(
partitionInfoList != null && !partitionInfoList.isEmpty(),
"Could not find any partitions info for topic "
+ topic
+ ". Please check Kafka configuration and make sure "
+ "that provided topics exist.");
"Could not find any partitions info for topic %s. Please check Kafka configuration and make sure that provided topics exist.",
topic);
} else {
LOG.warn(
"Could not find any partitions info for topic {}. Please check Kafka configuration "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,9 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
for (Integer p : providedPartitions) {
checkState(
partitionsForTopic.contains(p),
"Partition "
+ p
+ " does not exist for topic "
+ providedTopic
+ ". Please check Kafka configuration.");
"Partition %s does not exist for topic %s. Please check Kafka configuration.",
p,
providedTopic);
}
} else {
for (Integer p : providedPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,8 @@ static List<TopicPartition> getAllTopicPartitions(
List<PartitionInfo> partitionInfoList = kafkaConsumer.partitionsFor(topic);
checkState(
partitionInfoList != null && !partitionInfoList.isEmpty(),
"Could not find any partitions info for topic "
+ topic
+ ". Please check Kafka configuration and make sure "
+ "that provided topics exist.");
"Could not find any partitions info for topic %s. Please check Kafka configuration and make sure that provided topics exist.",
topic);
for (PartitionInfo partition : partitionInfoList) {
current.add(new TopicPartition(topic, partition.partition()));
}
Expand Down
Loading