diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index b81634626246..783c136cf1bc 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.beam.examples.common.ExampleUtils; import org.apache.beam.examples.complete.game.utils.GameConstants; +import org.apache.beam.examples.complete.game.utils.WriteToBigQuery; import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; @@ -189,18 +190,17 @@ public interface Options extends LeaderBoard.Options { * Create a map of information that describes how to write pipeline output to BigQuery. This map * is used to write information about team score sums. */ - protected static Map>> + protected static Map>> configureWindowedWrite() { - Map>> tableConfigure = - new HashMap<>(); + Map>> tableConfigure = new HashMap<>(); tableConfigure.put( - "team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); + "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); tableConfigure.put( "total_score", - new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); + new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); tableConfigure.put( "window_start", - new WriteWindowedToBigQuery.FieldInfo<>( + new WriteToBigQuery.FieldInfo<>( "STRING", (c, w) -> { IntervalWindow window = (IntervalWindow) w; @@ -208,7 +208,7 @@ public interface Options extends LeaderBoard.Options { })); tableConfigure.put( "processing_time", - new WriteWindowedToBigQuery.FieldInfo<>( + new WriteToBigQuery.FieldInfo<>( "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); return tableConfigure; } @@ -217,20 +217,19 @@ public interface Options extends LeaderBoard.Options { * Create a map of information that describes how to write pipeline output to BigQuery. This map * is used to write information about mean user session time. */ - protected static Map> - configureSessionWindowWrite() { + protected static Map> configureSessionWindowWrite() { - Map> tableConfigure = new HashMap<>(); + Map> tableConfigure = new HashMap<>(); tableConfigure.put( "window_start", - new WriteWindowedToBigQuery.FieldInfo<>( + new WriteToBigQuery.FieldInfo<>( "STRING", (c, w) -> { IntervalWindow window = (IntervalWindow) w; return GameConstants.DATE_TIME_FORMATTER.print(window.start()); })); tableConfigure.put( - "mean_duration", new WriteWindowedToBigQuery.FieldInfo<>("FLOAT", (c, w) -> c.element())); + "mean_duration", new WriteToBigQuery.FieldInfo<>("FLOAT", (c, w) -> c.element())); return tableConfigure; } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index b8d81056a3e8..832c0ad79e76 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -130,19 +130,18 @@ public interface Options extends ExampleOptions, StreamingOptions { * Create a map of information that describes how to write pipeline output to BigQuery. This map * is used to write team score sums and includes event timing information. */ - protected static Map>> + protected static Map>> configureWindowedTableWrite() { - Map>> tableConfigure = - new HashMap<>(); + Map>> tableConfigure = new HashMap<>(); tableConfigure.put( - "team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); + "team", new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); tableConfigure.put( "total_score", - new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); + new WriteToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); tableConfigure.put( "window_start", - new WriteWindowedToBigQuery.FieldInfo<>( + new WriteToBigQuery.FieldInfo<>( "STRING", (c, w) -> { IntervalWindow window = (IntervalWindow) w; @@ -150,12 +149,11 @@ public interface Options extends ExampleOptions, StreamingOptions { })); tableConfigure.put( "processing_time", - new WriteWindowedToBigQuery.FieldInfo<>( + new WriteToBigQuery.FieldInfo<>( "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); tableConfigure.put( "timing", - new WriteWindowedToBigQuery.FieldInfo<>( - "STRING", (c, w) -> c.pane().getTiming().toString())); + new WriteToBigQuery.FieldInfo<>("STRING", (c, w) -> c.pane().getTiming().toString())); return tableConfigure; } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java index 2315f1b811b6..b28db261ab0e 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/game/StatefulTeamScore.java @@ -100,16 +100,12 @@ public interface Options extends LeaderBoard.Options { */ private static Map>> configureCompleteWindowedTableWrite() { - Map>> tableConfigure = - new HashMap<>(); - tableConfigure.put( - "team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); - tableConfigure.put( - "total_score", - new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); + Map>> tableConfigure = new HashMap<>(); + tableConfigure.put("team", new FieldInfo<>("STRING", (c, w) -> c.element().getKey())); + tableConfigure.put("total_score", new FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); tableConfigure.put( "processing_time", - new WriteWindowedToBigQuery.FieldInfo<>( + new FieldInfo<>( "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); return tableConfigure; } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java index c822f3c77d42..be88fd9b4ca3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.joda.time.Duration; @@ -68,8 +69,9 @@ public void after() { private void add(long... instants) { for (final long instant : instants) { System.out.println("ADD " + instant); - Sessions.AssignContext context = - windowFn.new AssignContext() { + WindowFn wf = windowFn; + WindowFn.AssignContext context = + wf.new AssignContext() { @Override public Object element() { return (Object) instant; diff --git a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index e8929b84593a..647297394f52 100644 --- a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -284,10 +284,7 @@ public FlinkPortablePipelineTranslator.Executor translate( return context; } - private void urnNotFound( - String id, - RunnerApi.Pipeline pipeline, - FlinkStreamingPortablePipelineTranslator.TranslationContext context) { + private void urnNotFound(String id, RunnerApi.Pipeline pipeline, TranslationContext context) { throw new IllegalArgumentException( String.format( "Unknown type of URN %s for PTransform with id %s.", diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java index 8f0ecf3efbda..7d9263678a28 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.jobsubmission.JobInvocation; import org.apache.beam.runners.jobsubmission.JobInvoker; +import org.apache.beam.runners.jobsubmission.JobServerDriver; import org.apache.beam.runners.jobsubmission.PortablePipelineResult; import org.apache.beam.runners.jobsubmission.PortablePipelineRunner; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; @@ -210,7 +211,7 @@ private void stopJobService() throws InterruptedException { } } - private class DetachedJobInvokerFactory implements FlinkJobServerDriver.JobInvokerFactory { + private class DetachedJobInvokerFactory implements JobServerDriver.JobInvokerFactory { private CountDownLatch latch = new CountDownLatch(1); private volatile PortablePipelineRunner actualPipelineRunner; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java index caa5a1788c86..77dc4d795167 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java @@ -284,10 +284,7 @@ public FlinkPortablePipelineTranslator.Executor translate( return context; } - private void urnNotFound( - String id, - RunnerApi.Pipeline pipeline, - FlinkStreamingPortablePipelineTranslator.TranslationContext context) { + private void urnNotFound(String id, RunnerApi.Pipeline pipeline, TranslationContext context) { throw new IllegalArgumentException( String.format( "Unknown type of URN %s for PTransform with id %s.", diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java index 663af0540852..52c0db92c39d 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainerTest.java @@ -99,8 +99,8 @@ public void testCounter() { @Test public void testGauge() { - FlinkMetricContainer.FlinkGauge flinkGauge = - new FlinkMetricContainer.FlinkGauge(GaugeResult.empty()); + FlinkMetricContainerBase.FlinkGauge flinkGauge = + new FlinkMetricContainerBase.FlinkGauge(GaugeResult.empty()); when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge); MetricsContainer step = container.getMetricsContainer("step"); @@ -249,8 +249,8 @@ public boolean matches(FlinkDistributionGauge argument) { @Test public void testDistribution() { - FlinkMetricContainer.FlinkDistributionGauge flinkGauge = - new FlinkMetricContainer.FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT); + FlinkDistributionGauge flinkGauge = + new FlinkDistributionGauge(DistributionResult.IDENTITY_ELEMENT); when(metricGroup.gauge(eq("namespace.name"), any())).thenReturn(flinkGauge); MetricsContainer step = container.getMetricsContainer("step"); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java index c1db7efdf088..b00c1494e2ff 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/common/worker/CachingShuffleBatchReaderTest.java @@ -82,7 +82,7 @@ public void readerShouldRereadEvictedBatches() throws IOException, ExecutionExce verify(base, times(1)).read(null, null); CachingShuffleBatchReader.BatchRange range = new CachingShuffleBatchReader.BatchRange(null, null); - CachingShuffleBatchReader.Batch batch = reader.cache.get(range); + ShuffleBatchReader.Batch batch = reader.cache.get(range); assertThat(batch, notNullValue()); reader.cache.invalidateAll(); read = reader.read(null, null); diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java index 8ff29ecf3838..09b51349aa49 100644 --- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java +++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java @@ -50,7 +50,7 @@ public final class ClassicBundleManagerTest { private FutureCollector mockFutureCollector; private ClassicBundleManager bundleManager; - private ClassicBundleManager.BundleProgressListener bundleProgressListener; + private BundleManager.BundleProgressListener bundleProgressListener; private Scheduler> mockScheduler; @Before diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java index 205cba38c11d..10d159400375 100644 --- a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java @@ -21,7 +21,7 @@ import com.google.protobuf.Message; import org.checkerframework.checker.nullness.qual.Nullable; -public class DummyRateLimitPolicy implements GoogleAdsV23.RateLimitPolicy { +public class DummyRateLimitPolicy implements GoogleAdsIO.RateLimitPolicy { @Override public void onBeforeRequest(@Nullable String developerToken, String customerId, Message request) throws InterruptedException {} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index c2e891145acd..e55eef5cd6b6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -58,7 +58,6 @@ import org.apache.beam.sdk.transforms.errorhandling.BadRecord; import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter; import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler; -import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler; import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; @@ -355,7 +354,7 @@ public TypedRead fromConfigRow(Row configRow, PipelineOptions options) { // from older Beam versions. // See https://github.com/apache/beam/issues/30534. builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); - builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>()); + builder.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>()); } else { byte[] badRecordRouter = configRow.getBytes("bad_record_router"); builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); @@ -923,7 +922,7 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { // from older Beam versions. // See https://github.com/apache/beam/issues/30534. builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); - builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>()); + builder.setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>()); } else { byte[] badRecordRouter = configRow.getBytes("bad_record_router"); builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java index c258ce4ab7fb..c9df1e4dad02 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeBigQueryServices.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ListCoder; @@ -128,7 +129,7 @@ public static KV> decodeQueryResult(String queryResult) th // Longs tend to get converted back to Integers due to JSON serialization. Convert them back. public static TableRow convertNumbers(TableRow tableRow) { - for (TableRow.Entry entry : tableRow.entrySet()) { + for (Map.Entry entry : tableRow.entrySet()) { if (entry.getValue() instanceof Integer) { entry.setValue(Long.valueOf((Integer) entry.getValue())); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index e9b1e25a7afc..a401ff978bd6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -19,8 +19,8 @@ import static org.junit.Assert.assertEquals; -import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponseException; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.gax.grpc.GrpcStatusCode; @@ -810,7 +810,7 @@ Map> getInsertErrors() { void throwNotFound(@FormatString String format, Object... args) throws IOException { throw new IOException( String.format(format, args), - new GoogleJsonResponseException.Builder(404, String.format(format, args), new HttpHeaders()) + new HttpResponseException.Builder(404, String.format(format, args), new HttpHeaders()) .build()); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index 10932162d30c..a0177c2cb484 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -1065,7 +1065,7 @@ public void testBatchableMutationFilterFn_cells() { BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 10000000, 3 * CELLS_PER_KEY, 1000); - BatchableMutationFilterFn.ProcessContext mockProcessContext = + DoFn.ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class); when(mockProcessContext.sideInput(any())).thenReturn(getSchema()); @@ -1195,7 +1195,7 @@ public void testBatchableMutationFilterFn_rows() { BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 1000, 1000, 3); - BatchableMutationFilterFn.ProcessContext mockProcessContext = + DoFn.ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class); when(mockProcessContext.sideInput(any())).thenReturn(getSchema()); @@ -1246,7 +1246,7 @@ public void testBatchableMutationFilterFn_batchingDisabled() { BatchableMutationFilterFn testFn = new BatchableMutationFilterFn(null, null, 0, 0, 0); - BatchableMutationFilterFn.ProcessContext mockProcessContext = + DoFn.ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class); when(mockProcessContext.sideInput(any())).thenReturn(getSchema()); @@ -1280,9 +1280,9 @@ public void testGatherSortAndBatchFn() throws Exception { 100, // groupingFactor null); - GatherSortCreateBatchesFn.ProcessContext mockProcessContext = + DoFn>.ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class); - GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext = + DoFn>.FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class); when(mockProcessContext.sideInput(any())).thenReturn(getSchema()); @@ -1355,9 +1355,9 @@ public void testGatherBundleAndSortFn_flushOversizedBundle() throws Exception { 3, // groupingFactor null); - GatherSortCreateBatchesFn.ProcessContext mockProcessContext = + DoFn>.ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class); - GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext = + DoFn>.FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class); when(mockProcessContext.sideInput(any())).thenReturn(getSchema()); OutputReceiver> mockOutputReceiver = mock(OutputReceiver.class); @@ -1485,9 +1485,9 @@ public void testBatchFn_rows() throws Exception { } private void testAndVerifyBatches(GatherSortCreateBatchesFn testFn) throws Exception { - GatherSortCreateBatchesFn.ProcessContext mockProcessContext = + DoFn>.ProcessContext mockProcessContext = Mockito.mock(ProcessContext.class); - GatherSortCreateBatchesFn.FinishBundleContext mockFinishBundleContext = + DoFn>.FinishBundleContext mockFinishBundleContext = Mockito.mock(FinishBundleContext.class); when(mockProcessContext.sideInput(any())).thenReturn(getSchema());