Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1579,7 +1579,6 @@ class BeamModulePlugin implements Plugin<Project> {
"DirectInvocationOnMock",
"Finalize",
"JUnitIncompatibleType",
"LongDoubleConversion",
"MockNotUsedInProduction",
"NarrowCalculation",
"NullableTypeParameter",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private void testRead() throws IOException {
region,
launchInfo.jobId(),
getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
assertEquals(configuration.numRecords, numRecords, 0.5);
assertEquals((double) configuration.numRecords, numRecords, 0.5);

// export metrics
MetricsConfiguration metricsConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void testSpannerWriteAndRead() throws IOException {
region,
readInfo.jobId(),
getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
assertEquals(configuration.numRecords, numRecords, 0.5);
assertEquals((double) configuration.numRecords, numRecords, 0.5);

// export metrics
MetricsConfiguration metricsConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public void testTextIOWriteThenRead() throws IOException {
readInfo.jobId(),
getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));

assertEquals(configuration.numRecords, numRecords, 0.5);
assertEquals((double) configuration.numRecords, numRecords, 0.5);

// export metrics
MetricsConfiguration metricsConfig =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private int calculateShards(long totalRecords) {
return (int) totalRecords;
}
// 100mil records before >7 output files
int floorLogRecs = (int) Math.log10(totalRecords);
int floorLogRecs = (int) Math.log10((double) totalRecords);
return Math.max(floorLogRecs, MIN_SHARDS_FOR_LOG) + extraShards;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ public void close() throws IOException {
@VisibleForTesting
static @Nullable ReportedParallelism longToParallelism(long value) {
if (value >= 0) {
return new ReportedParallelism().setValue(Double.valueOf(value));
return new ReportedParallelism().setValue(Double.valueOf((double) value));
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public final CounterDistribution addValue(long value) {
long count = this.getCount() + 1;
long sum = this.getSum() + value;
// TODO: Replace sum-of-squares with statistics for a better stddev algorithm.
double sumOfSquares = this.getSumOfSquares() + Math.pow(value, 2);
double sumOfSquares = this.getSumOfSquares() + Math.pow((double) value, 2);

int bucketIndex = calculateBucket(value);
List<Long> buckets = incrementBucket(bucketIndex);
Expand Down
4 changes: 2 additions & 2 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public void splitRestriction(
desiredChunkSize = 64 << 20; // 64mb
} else {
// 1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k shards
desiredChunkSize = Math.max(1 << 20, (long) (1000 * Math.sqrt(estimatedSize)));
desiredChunkSize = Math.max(1 << 20, (long) (1000 * Math.sqrt((double) estimatedSize)));
}
List<BoundedSourceT> splits =
(List<BoundedSourceT>) restriction.split(desiredChunkSize, pipelineOptions);
Expand Down Expand Up @@ -1079,7 +1079,7 @@ private Progress tryGetProgressOrThrow() throws IOException {
if (size != UnboundedReader.BACKLOG_UNKNOWN) {
// The UnboundedSource/UnboundedReader API has no way of reporting how much work
// has been completed so runners can only see the work remaining changing.
return Progress.from(0, size);
return Progress.from(0, (double) size);
}

// TODO: Support "global" backlog reporting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public ApproximateQuantilesCombineFn<T, ComparatorT> withEpsilon(double epsilon)
* of {@code maxNumElements}.
*/
public ApproximateQuantilesCombineFn<T, ComparatorT> withMaxInputSize(long maxNumElements) {
return create(numQuantiles, compareFn, maxNumElements, maxNumElements);
return create(numQuantiles, compareFn, maxNumElements, (double) maxNumElements);
}

/**
Expand Down Expand Up @@ -484,7 +484,8 @@ private QuantileBuffer<T> collapse(Iterable<QuantileBuffer<T>> buffers) {
newLevel = Math.max(newLevel, buffer.level + 1);
newWeight += buffer.weight;
}
List<T> newElements = interpolate(buffers, bufferSize, newWeight, offset(newWeight));
List<T> newElements =
interpolate(buffers, bufferSize, (double) newWeight, (double) offset(newWeight));
return new QuantileBuffer<>(newLevel, newWeight, newElements);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public long nextBackOffMillis() {
Math.min(
backoffConfig.initialBackoff.getMillis()
* Math.pow(backoffConfig.exponent, currentRetry),
backoffConfig.maxBackoff.getMillis());
(double) backoffConfig.maxBackoff.getMillis());
double randomOffset =
(Math.random() * 2 - 1) * DEFAULT_RANDOMIZATION_FACTOR * currentIntervalMillis;
long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testProgressAfterFinished() throws Exception {
tracker.checkDone();
simpleEstimator.setEstimateRangeEnd(0L);
Progress currentProgress = tracker.getProgress();
assertEquals(Long.MAX_VALUE - 10L, currentProgress.getWorkCompleted(), 0);
assertEquals((double) (Long.MAX_VALUE - 10L), currentProgress.getWorkCompleted(), 0);
assertEquals(0, currentProgress.getWorkRemaining(), 0.001);
}

Expand All @@ -197,23 +197,23 @@ public void testProgress() throws Exception {

simpleEstimator.setEstimateRangeEnd(5L);
Progress currentProgress = tracker.getProgress();
assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
assertEquals((double) (cur - start), currentProgress.getWorkCompleted(), 0.001);
assertEquals(0, currentProgress.getWorkRemaining(), 0.001);

simpleEstimator.setEstimateRangeEnd(35L);
currentProgress = tracker.getProgress();
assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
assertEquals(35L - cur, currentProgress.getWorkRemaining(), 0.001);
assertEquals((double) (cur - start), currentProgress.getWorkCompleted(), 0.001);
assertEquals((double) (35L - cur), currentProgress.getWorkRemaining(), 0.001);

simpleEstimator.setEstimateRangeEnd(25L);
currentProgress = tracker.getProgress();
assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
assertEquals(25L - cur, currentProgress.getWorkRemaining(), 0.001);
assertEquals((double) (cur - start), currentProgress.getWorkCompleted(), 0.001);
assertEquals((double) (25L - cur), currentProgress.getWorkRemaining(), 0.001);

simpleEstimator.setEstimateRangeEnd(Long.MAX_VALUE);
currentProgress = tracker.getProgress();
assertEquals(cur - start, currentProgress.getWorkCompleted(), 0.001);
assertEquals(Long.MAX_VALUE - cur, currentProgress.getWorkRemaining(), 0.001);
assertEquals((double) (cur - start), currentProgress.getWorkCompleted(), 0.001);
assertEquals((double) (Long.MAX_VALUE - cur), currentProgress.getWorkRemaining(), 0.001);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public LoggingInterceptor(Histogram requestLatencies) {
@Override
public void interceptResponse(HttpResponse response) throws IOException {
long timeToResponse = System.currentTimeMillis() - startTime;
requestLatencies.update(timeToResponse);
requestLatencies.update((double) timeToResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(
suppliers.add(
reader -> {
long fieldsRead = reader.getCounterMetric(FIELDS_READ_METRIC);
return NamedTestResult.create(uuid, timestamp, FIELDS_READ_METRIC, fieldsRead);
return NamedTestResult.create(uuid, timestamp, FIELDS_READ_METRIC, (double) fieldsRead);
});
return suppliers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,8 @@ public void processElement(

private long getDesiredChunkSize(
PipelineOptions options, BigQueryStorageTableSource<T> output) throws Exception {
return Math.max(1 << 20, (long) (1000 * Math.sqrt(output.getEstimatedSizeBytes(options))));
return Math.max(
1 << 20, (long) (1000 * Math.sqrt((double) output.getEstimatedSizeBytes(options))));
}
}

Expand Down Expand Up @@ -1755,7 +1756,8 @@ private PCollection<T> expandForDirectRead(
desiredChunkSize = 64 << 20; // 64mb
} else {
// 1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k shards
desiredChunkSize = Math.max(1 << 20, (long) (1000 * Math.sqrt(estimatedSize)));
desiredChunkSize =
Math.max(1 << 20, (long) (1000 * Math.sqrt((double) estimatedSize)));
}
sources = source.split(desiredChunkSize, bqOptions);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private static void updateRpcLatencyMetric(@Nonnull Context<?> c, RpcMethod meth
}
long timeElapsed = java.time.Duration.between(operationStartTime, operationEndTime).toMillis();
if (timeElapsed > 0) {
BigQuerySinkMetrics.createRPCLatencyHistogram(method).update(timeElapsed);
BigQuerySinkMetrics.createRPCLatencyHistogram(method).update((double) timeElapsed);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void updateSuccessfulRpcMetrics(Instant start, Instant end) {
/** Record rpc latency histogram metrics. */
private void recordRpcLatencyMetrics() {
for (Duration d : rpcLatencies()) {
LATENCY_HISTOGRAM.update(d.toMillis());
LATENCY_HISTOGRAM.update((double) d.toMillis());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ private double calcMaxRequestBudget(Instant instant, Instant first) {
long calculatedGrowth =
(durationSinceFirst.getStandardMinutes() - rampUpIntervalMinutes) / rampUpIntervalMinutes;
long growth = Math.max(0, calculatedGrowth);
return baseBatchBudget * Math.pow(1.5, growth);
return baseBatchBudget * Math.pow(1.5, (double) growth);
}

void recordWriteCount(Instant instant, int numWrites) {
Expand Down Expand Up @@ -801,7 +801,8 @@ private BackoffResult doBackoff() {

double currentIntervalMillis =
Math.min(
initialBackoff.getMillis() * Math.pow(1.5, attempt - 1), MAX_BACKOFF.getMillis());
initialBackoff.getMillis() * Math.pow(1.5, attempt - 1),
(double) MAX_BACKOFF.getMillis());
double randomOffset =
(rand.nextDouble() * 2 - 1) * RANDOMIZATION_FACTOR * currentIntervalMillis;
long nextBackoffMillis = Math.round(currentIntervalMillis + randomOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void testProcessElementAndGetSize() throws IOException, InterruptedExcept
testToken, tenSecondsAgo, BigDecimal.valueOf(20), Instant.now(), false));
// we should have output 2 100B mutations in the past 10s
long bytesPerSecond = (mutationSize * 2) / 10;
assertEquals(sizeEstimate, bytesPerSecond * watermarkLag, 10);
assertEquals(sizeEstimate, (double) (bytesPerSecond * watermarkLag), 10);
verify(receiver, times(2)).outputWithTimestamp(KV.of(rowKey, mockMutation), Instant.EPOCH);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public class NullSizeEstimatorTest {
@Test
public void alwaysReturns0AsEstimatedThroughput() {
final NullSizeEstimator<byte[]> estimator = new NullSizeEstimator<>();
assertEquals(estimator.sizeOf(new byte[40]), 0D, DELTA);
assertEquals(estimator.sizeOf(new byte[20]), 0D, DELTA);
assertEquals(estimator.sizeOf(new byte[10]), 0D, DELTA);
assertEquals((double) estimator.sizeOf(new byte[40]), 0D, DELTA);
assertEquals((double) estimator.sizeOf(new byte[20]), 0D, DELTA);
assertEquals((double) estimator.sizeOf(new byte[10]), 0D, DELTA);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,9 @@ public void testGetProgressWorkCompletedAndWorkRemaining(
tracker.tryClaim(position);
final Progress progress = tracker.getProgress();

assertEquals(position.getSeconds(), progress.getWorkCompleted(), DELTA);
assertEquals(to.getSeconds() - position.getSeconds(), progress.getWorkRemaining(), DELTA);
assertEquals((double) position.getSeconds(), progress.getWorkCompleted(), DELTA);
assertEquals(
(double) (to.getSeconds() - position.getSeconds()), progress.getWorkRemaining(), DELTA);
}

@Test
Expand All @@ -249,7 +250,7 @@ public void testGetProgressReturnsWorkRemainingAsWholeRangeWhenNoClaimWasAttempt

final Progress progress = tracker.getProgress();
assertEquals(0D, progress.getWorkCompleted(), DELTA);
assertEquals(to.getSeconds(), progress.getWorkRemaining(), DELTA);
assertEquals((double) to.getSeconds(), progress.getWorkRemaining(), DELTA);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ private static Value recordValueFrom(
.putFields(
"number_of_records_in_transaction",
Value.newBuilder()
.setNumberValue(record.getNumberOfRecordsInTransaction())
.setNumberValue((double) record.getNumberOfRecordsInTransaction())
.build())
.putFields(
"number_of_partitions_in_transaction",
Value.newBuilder()
.setNumberValue(record.getNumberOfPartitionsInTransaction())
.setNumberValue((double) record.getNumberOfPartitionsInTransaction())
.build())
.putFields(
"transaction_tag",
Expand Down Expand Up @@ -149,7 +149,9 @@ private static Value columnTypeValueFrom(ColumnType columnType)
Value.newBuilder().setBoolValue(columnType.isPrimaryKey()).build())
.putFields(
"ordinal_position",
Value.newBuilder().setNumberValue(columnType.getOrdinalPosition()).build())
Value.newBuilder()
.setNumberValue((double) columnType.getOrdinalPosition())
.build())
.build())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void recordRpcLatencyMetrics() {
// Update all the latencies
for (Duration d : topicLatencies.getValue()) {
Preconditions.checkArgumentNotNull(topicHistogram);
topicHistogram.update(d.toMillis());
topicHistogram.update((double) d.toMillis());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,7 +666,7 @@ public ProcessContinuation processElement(
}
} finally {
if (rawSizesCount > 0L) {
avgRecordSize.update(rawSizesSum, rawSizesCount);
avgRecordSize.update((double) rawSizesSum, rawSizesCount);
rawSizes.update(rawSizesSum, rawSizesCount, rawSizesMin, rawSizesMax);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private List<NamedTestResult> readMetrics(
testId,
timestamp.toString(),
buildMetric("total_bytes_count"),
reader.getCounterMetric("totalBytes.count"));
(double) reader.getCounterMetric("totalBytes.count"));

return Arrays.asList(runtime, totalBytes);
}
Expand Down
Loading