diff --git a/flink-formats/flink-avro/pom.xml b/flink-formats/flink-avro/pom.xml
index 262d5c03cf823..831a0aac2e3da 100644
--- a/flink-formats/flink-avro/pom.xml
+++ b/flink-formats/flink-avro/pom.xml
@@ -257,6 +257,7 @@ under the License.
${project.basedir}/src/test/resources/avro
${project.basedir}/target/generated-test-sources/
+ true
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index 9c63c56c4dcba..83c82b054c679 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -29,8 +29,10 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.apache.avro.generic.GenericFixed;
@@ -128,12 +130,13 @@ private static AvroToRowDataConverter createConverter(
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITHOUT_TIME_ZONE:
- return AvroToRowDataConverters::convertToTimestamp;
+ return createTimestampConverter(((TimestampType) type).getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (legacyTimestampMapping) {
throw new UnsupportedOperationException("Unsupported type: " + type);
} else {
- return AvroToRowDataConverters::convertToTimestamp;
+ return createTimestampConverter(
+ ((LocalZonedTimestampType) type).getPrecision());
}
case CHAR:
case VARCHAR:
@@ -211,24 +214,37 @@ private static AvroToRowDataConverter createMapConverter(
};
}
- private static TimestampData convertToTimestamp(Object object) {
- final long millis;
- if (object instanceof Long) {
- millis = (Long) object;
- } else if (object instanceof Instant) {
- millis = ((Instant) object).toEpochMilli();
- } else if (object instanceof LocalDateTime) {
- return TimestampData.fromLocalDateTime((LocalDateTime) object);
- } else {
- JodaConverter jodaConverter = JodaConverter.getConverter();
- if (jodaConverter != null) {
- millis = jodaConverter.convertTimestamp(object);
+ private static AvroToRowDataConverter createTimestampConverter(int precision) {
+ return object -> {
+ if (object instanceof Long) {
+ long val = (Long) object;
+ if (precision <= 3) {
+ return TimestampData.fromEpochMillis(val);
+ } else if (precision <= 6) {
+ long millis = Math.floorDiv(val, 1000L);
+ int nanosOfMilli = (int) (Math.floorMod(val, 1000L) * 1000);
+ return TimestampData.fromEpochMillis(millis, nanosOfMilli);
+ } else {
+ long millis = Math.floorDiv(val, 1000_000L);
+ int nanosOfMilli = (int) Math.floorMod(val, 1000_000L);
+ return TimestampData.fromEpochMillis(millis, nanosOfMilli);
+ }
+ } else if (object instanceof Instant) {
+ return TimestampData.fromInstant((Instant) object);
+ } else if (object instanceof LocalDateTime) {
+ return TimestampData.fromLocalDateTime((LocalDateTime) object);
} else {
- throw new IllegalArgumentException(
- "Unexpected object type for TIMESTAMP logical type. Received: " + object);
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ long millis = jodaConverter.convertTimestamp(object);
+ return TimestampData.fromEpochMillis(millis);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIMESTAMP logical type. Received: "
+ + object);
+ }
}
- }
- return TimestampData.fromEpochMillis(millis);
+ };
}
private static int convertToDate(Object object) {
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
index af7a936b270c9..82853cac3b42f 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java
@@ -25,8 +25,10 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.CollectionUtil;
import org.apache.avro.Schema;
@@ -156,6 +158,7 @@ public Object convert(Schema schema, Object object) {
};
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
+ final int timestampPrecision = ((TimestampType) type).getPrecision();
if (legacyTimestampMapping) {
converter =
new RowDataToAvroConverter() {
@@ -173,15 +176,26 @@ public Object convert(Schema schema, Object object) {
@Override
public Object convert(Schema schema, Object object) {
- return ((TimestampData) object)
- .toLocalDateTime()
- .toInstant(ZoneOffset.UTC)
- .toEpochMilli();
+ java.time.Instant instant =
+ ((TimestampData) object)
+ .toLocalDateTime()
+ .toInstant(ZoneOffset.UTC);
+ if (timestampPrecision <= 3) {
+ return instant.toEpochMilli();
+ } else if (timestampPrecision <= 6) {
+ return instant.getEpochSecond() * 1_000_000L
+ + instant.getNano() / 1000L;
+ } else {
+ return instant.getEpochSecond() * 1_000_000_000L
+ + instant.getNano();
+ }
}
};
}
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ final int localZonedTimestampPrecision =
+ ((LocalZonedTimestampType) type).getPrecision();
if (legacyTimestampMapping) {
throw new UnsupportedOperationException("Unsupported type: " + type);
} else {
@@ -191,7 +205,17 @@ public Object convert(Schema schema, Object object) {
@Override
public Object convert(Schema schema, Object object) {
- return ((TimestampData) object).toInstant().toEpochMilli();
+ java.time.Instant instant =
+ ((TimestampData) object).toInstant();
+ if (localZonedTimestampPrecision <= 3) {
+ return instant.toEpochMilli();
+ } else if (localZonedTimestampPrecision <= 6) {
+ return instant.getEpochSecond() * 1_000_000L
+ + instant.getNano() / 1000L;
+ } else {
+ return instant.getEpochSecond() * 1_000_000_000L
+ + instant.getNano();
+ }
}
};
}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
index 8bf5cbe2c07f7..2dcf6e930f07e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -194,7 +194,8 @@ private static TypeInformation> convertToTypeInfo(
case LONG:
if (legacyTimestampMapping) {
if (schema.getLogicalType() == LogicalTypes.timestampMillis()
- || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+ || schema.getLogicalType() == LogicalTypes.timestampMicros()
+ || schema.getLogicalType() == LogicalTypes.timestampNanos()) {
return Types.SQL_TIMESTAMP;
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()
|| schema.getLogicalType() == LogicalTypes.timeMillis()) {
@@ -203,10 +204,12 @@ private static TypeInformation> convertToTypeInfo(
} else {
// Avro logical timestamp types to Flink DataStream timestamp types
if (schema.getLogicalType() == LogicalTypes.timestampMillis()
- || schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+ || schema.getLogicalType() == LogicalTypes.timestampMicros()
+ || schema.getLogicalType() == LogicalTypes.timestampNanos()) {
return Types.INSTANT;
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()
- || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
+ || schema.getLogicalType() == LogicalTypes.localTimestampMicros()
+ || schema.getLogicalType() == LogicalTypes.localTimestampNanos()) {
return Types.LOCAL_DATE_TIME;
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()
|| schema.getLogicalType() == LogicalTypes.timeMillis()) {
@@ -350,6 +353,8 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping)
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.timestampNanos()) {
+ return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
@@ -358,6 +363,8 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping)
return DataTypes.TIMESTAMP(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
return DataTypes.TIMESTAMP(6).notNull();
+ } else if (schema.getLogicalType() == LogicalTypes.localTimestampNanos()) {
+ return DataTypes.TIMESTAMP(9).notNull();
}
}
@@ -479,12 +486,14 @@ public static Schema convertToSchema(
avroLogicalType = LogicalTypes.localTimestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.localTimestampMicros();
+ } else if (precision <= 9) {
+ avroLogicalType = LogicalTypes.localTimestampNanos();
} else {
throw new IllegalArgumentException(
"Avro does not support LOCAL TIMESTAMP type "
+ "with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only supports precision less than 9.");
}
}
Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
@@ -501,12 +510,14 @@ public static Schema convertToSchema(
avroLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.timestampMicros();
+ } else if (precision <= 9) {
+ avroLogicalType = LogicalTypes.timestampNanos();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type "
+ "with precision: "
+ precision
- + ", it only supports precision less than 6.");
+ + ", it only supports precision less than 9.");
}
timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
index 1347366cf8551..5dbca399504ee 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatITCase.java
@@ -25,7 +25,6 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat.Codec;
import org.apache.flink.formats.avro.generated.Colors;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -185,12 +184,11 @@ public User map(Tuple3 value) {
user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+ user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
// 20.00
- user.setTypeDecimalBytes(
- ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
- user.setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));
return user;
}
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
index 92f43ac7d6562..217c0c12fa0fe 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroOutputFormatTest.java
@@ -23,7 +23,6 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.generated.Colors;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.mock.Whitebox;
@@ -198,13 +197,12 @@ public int getAttemptNumber() {
user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+ user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
// 20.00
- user.setTypeDecimalBytes(
- ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
- user.setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));
outputFormat.writeRecord(user);
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
index a3e2e6c0914d3..0f5fa12990a25 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRecordInputFormatTest.java
@@ -31,7 +31,6 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;
@@ -139,12 +138,11 @@ public static void writeTestFile(File testFile) throws IOException {
user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+ user1.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
// 20.00
- user1.setTypeDecimalBytes(
- ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user1.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
- user1.setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user1.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));
// Construct via builder
User user2 =
@@ -179,14 +177,12 @@ public static void writeTestFile(File testFile) throws IOException {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampNanos(
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
// 20.00
- .setTypeDecimalBytes(
- ByteBuffer.wrap(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
// 20.00
- .setTypeDecimalFixed(
- new Fixed2(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
DatumWriter userDatumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter dataFileWriter = new DataFileWriter<>(userDatumWriter);
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
index 84f9994f6665c..5349409823279 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
@@ -117,6 +117,7 @@ void testSerializeDeserialize(AvroEncoding encoding) throws Exception {
FIELD("date", DATE()),
FIELD("timestamp3", TIMESTAMP(3)),
FIELD("timestamp3_2", TIMESTAMP(3)),
+ FIELD("timestamp9", TIMESTAMP(9)),
FIELD("map", MAP(STRING(), BIGINT())),
FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))),
FIELD("map2array", MAP(STRING(), ARRAY(INT()))),
@@ -124,7 +125,7 @@ void testSerializeDeserialize(AvroEncoding encoding) throws Exception {
.notNull();
final RowType rowType = (RowType) dataType.getLogicalType();
- final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
+ final Schema schema = AvroSchemaConverter.convertToSchema(rowType, false);
final GenericRecord record = new GenericData.Record(schema);
record.put(0, true);
record.put(1, (int) Byte.MAX_VALUE);
@@ -149,34 +150,35 @@ void testSerializeDeserialize(AvroEncoding encoding) throws Exception {
record.put(12, 10087);
record.put(13, 1589530213123L);
record.put(14, 1589530213122L);
+ record.put(15, 1589530213123456789L);
Map map = new HashMap<>();
map.put("flink", 12L);
map.put("avro", 23L);
- record.put(15, map);
+ record.put(16, map);
Map> map2map = new HashMap<>();
Map innerMap = new HashMap<>();
innerMap.put("inner_key1", 123);
innerMap.put("inner_key2", 234);
map2map.put("outer_key", innerMap);
- record.put(16, map2map);
+ record.put(17, map2map);
List list1 = Arrays.asList(1, 2, 3, 4, 5, 6);
List list2 = Arrays.asList(11, 22, 33, 44, 55);
Map> map2list = new HashMap<>();
map2list.put("list1", list1);
map2list.put("list2", list2);
- record.put(17, map2list);
+ record.put(18, map2list);
Map map2 = new HashMap<>();
map2.put("key1", null);
- record.put(18, map2);
+ record.put(19, map2);
AvroRowDataSerializationSchema serializationSchema =
- createSerializationSchema(dataType, encoding, true);
+ createSerializationSchema(dataType, encoding, false);
AvroRowDataDeserializationSchema deserializationSchema =
- createDeserializationSchema(dataType, encoding, true);
+ createDeserializationSchema(dataType, encoding, false);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GenericDatumWriter datumWriter = new GenericDatumWriter<>(schema);
@@ -358,6 +360,10 @@ void testTimestampTypeLegacyMapping() throws Exception {
.isEqualTo(new AtomicDataType(new BigIntType(false)));
assertThat(dataType.getChildren().get(3))
.isEqualTo(new AtomicDataType(new BigIntType(false)));
+ assertThat(dataType.getChildren().get(4))
+ .isEqualTo(new AtomicDataType(new BigIntType(false)));
+ assertThat(dataType.getChildren().get(5))
+ .isEqualTo(new AtomicDataType(new BigIntType(false)));
assertThatThrownBy(() -> createSerializationSchema(dataType, AvroEncoding.BINARY, true))
.isInstanceOf(IllegalArgumentException.class)
@@ -396,10 +402,14 @@ void testTimestampTypeNewMapping() throws Exception {
RowData rowData2 = deserializationSchema.deserialize(output);
assertThat(rowData2).isEqualTo(rowData);
- assertThat(rowData.getTimestamp(2, 3).toLocalDateTime().toString())
+ assertThat(rowData.getTimestamp(2, 9).toInstant().toString())
+ .isEqualTo("1970-01-01T00:00:00.123456789Z");
+ assertThat(rowData.getTimestamp(3, 3).toLocalDateTime().toString())
.isEqualTo("2014-03-01T12:12:12.321");
- assertThat(rowData.getTimestamp(3, 6).toLocalDateTime().toString())
- .isEqualTo("1970-01-01T00:02:03.456");
+ assertThat(rowData.getTimestamp(4, 6).toLocalDateTime().toString())
+ .isEqualTo("1970-01-01T00:00:00.123456");
+ assertThat(rowData.getTimestamp(5, 9).toLocalDateTime().toString())
+ .isEqualTo("1970-01-01T00:00:00.123456789");
}
private AvroRowDataSerializationSchema createSerializationSchema(
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
index f6fe06342c2bb..f082262306c38 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroSplittableInputFormatTest.java
@@ -25,7 +25,6 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.avro.file.DataFileWriter;
@@ -119,12 +118,11 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException {
user1.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user1.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user1.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+ user1.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
// 20.00
- user1.setTypeDecimalBytes(
- ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user1.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
- user1.setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user1.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));
// Construct via builder
User user2 =
@@ -159,14 +157,12 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampNanos(
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
// 20.00
- .setTypeDecimalBytes(
- ByteBuffer.wrap(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
// 20.00
- .setTypeDecimalFixed(
- new Fixed2(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
DatumWriter userDatumWriter = new SpecificDatumWriter<>(User.class);
DataFileWriter dataFileWriter = new DataFileWriter<>(userDatumWriter);
@@ -198,12 +194,11 @@ void createFiles(@TempDir java.nio.file.Path tempDir) throws IOException {
user.setTypeTimeMicros(LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS));
user.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"));
user.setTypeTimestampMicros(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+ user.setTypeTimestampNanos(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
// 20.00
- user.setTypeDecimalBytes(
- ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user.setTypeDecimalBytes(BigDecimal.valueOf(2000, 2));
// 20.00
- user.setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ user.setTypeDecimalFixed(BigDecimal.valueOf(2000, 2));
dataFileWriter.append(user);
}
@@ -234,7 +229,7 @@ void testSplittedIF() throws IOException {
format.close();
}
- assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990);
+ assertThat(elementsPerSplit).containsExactly(1564, 1173, 1173, 1090);
assertThat(elements).isEqualTo(NUM_RECORDS);
format.close();
}
@@ -280,7 +275,7 @@ void testAvroRecoveryWithFailureAtStart() throws Exception {
format.close();
}
- assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990);
+ assertThat(elementsPerSplit).containsExactly(1564, 1173, 1173, 1090);
assertThat(elements).isEqualTo(NUM_RECORDS);
format.close();
}
@@ -326,7 +321,7 @@ void testAvroRecovery() throws Exception {
format.close();
}
- assertThat(elementsPerSplit).containsExactly(1604, 1203, 1203, 990);
+ assertThat(elementsPerSplit).containsExactly(1564, 1173, 1173, 1090);
assertThat(elements).isEqualTo(NUM_RECORDS);
format.close();
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
index 65b470f0b5fd8..67ea9ea01a047 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/EncoderDecoderTest.java
@@ -21,7 +21,6 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.utils.DataInputDecoder;
import org.apache.flink.formats.avro.utils.DataOutputEncoder;
@@ -299,12 +298,9 @@ void testGeneratedObjectWithNullableFields() {
LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
Instant.parse("2014-03-01T12:12:12.321Z"),
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
- ByteBuffer.wrap(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()), // 20.00
- new Fixed2(
- BigDecimal.valueOf(2000, 2)
- .unscaledValue()
- .toByteArray())); // 20.00
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS),
+ BigDecimal.valueOf(2000, 2),
+ BigDecimal.valueOf(2000, 2));
testObjectSerialization(user);
}
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
index 0cf0033341c3b..9f2eda069fd97 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
@@ -610,6 +610,7 @@ private void validateUserSchema(TypeInformation> actual) {
"type_time_micros",
"type_timestamp_millis",
"type_timestamp_micros",
+ "type_timestamp_nanos",
"type_decimal_bytes",
"type_decimal_fixed"
},
@@ -634,6 +635,7 @@ private void validateUserSchema(TypeInformation> actual) {
Types.SQL_TIME,
Types.SQL_TIMESTAMP,
Types.SQL_TIMESTAMP,
+ Types.SQL_TIMESTAMP,
Types.BIG_DEC,
Types.BIG_DEC);
@@ -649,11 +651,15 @@ private void validateTimestampsSchema(TypeInformation> actual) {
new String[] {
"type_timestamp_millis",
"type_timestamp_micros",
+ "type_timestamp_nanos",
"type_local_timestamp_millis",
- "type_local_timestamp_micros"
+ "type_local_timestamp_micros",
+ "type_local_timestamp_nanos"
},
Types.INSTANT,
Types.INSTANT,
+ Types.INSTANT,
+ Types.LOCAL_DATE_TIME,
Types.LOCAL_DATE_TIME,
Types.LOCAL_DATE_TIME);
final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps;
@@ -666,11 +672,15 @@ private void validateLegacyTimestampsSchema(TypeInformation> actual) {
new String[] {
"type_timestamp_millis",
"type_timestamp_micros",
+ "type_timestamp_nanos",
"type_local_timestamp_millis",
- "type_local_timestamp_micros"
+ "type_local_timestamp_micros",
+ "type_local_timestamp_nanos"
},
Types.SQL_TIMESTAMP,
Types.SQL_TIMESTAMP,
+ Types.SQL_TIMESTAMP,
+ Types.LONG,
Types.LONG,
Types.LONG);
final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps;
@@ -684,12 +694,16 @@ private void validateLegacyTimestampsSchema(DataType actual) {
"type_timestamp_millis", DataTypes.TIMESTAMP(3).notNull()),
DataTypes.FIELD(
"type_timestamp_micros", DataTypes.TIMESTAMP(6).notNull()),
+ DataTypes.FIELD(
+ "type_timestamp_nanos", DataTypes.TIMESTAMP(9).notNull()),
DataTypes.FIELD(
"type_local_timestamp_millis",
DataTypes.BIGINT().notNull()),
DataTypes.FIELD(
"type_local_timestamp_micros",
- DataTypes.BIGINT().notNull()))
+ DataTypes.BIGINT().notNull()),
+ DataTypes.FIELD(
+ "type_local_timestamp_nanos", DataTypes.BIGINT().notNull()))
.notNull();
assertThat(actual).isEqualTo(timestamps);
@@ -704,12 +718,18 @@ private void validateTimestampsSchema(DataType actual) {
DataTypes.FIELD(
"type_timestamp_micros",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull()),
+ DataTypes.FIELD(
+ "type_timestamp_nanos",
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull()),
DataTypes.FIELD(
"type_local_timestamp_millis",
DataTypes.TIMESTAMP(3).notNull()),
DataTypes.FIELD(
"type_local_timestamp_micros",
- DataTypes.TIMESTAMP(6).notNull()))
+ DataTypes.TIMESTAMP(6).notNull()),
+ DataTypes.FIELD(
+ "type_local_timestamp_nanos",
+ DataTypes.TIMESTAMP(9).notNull()))
.notNull();
assertThat(actual).isEqualTo(timestamps);
@@ -765,6 +785,8 @@ private void validateUserSchema(DataType actual) {
"type_timestamp_millis", DataTypes.TIMESTAMP(3).notNull()),
DataTypes.FIELD(
"type_timestamp_micros", DataTypes.TIMESTAMP(6).notNull()),
+ DataTypes.FIELD(
+ "type_timestamp_nanos", DataTypes.BIGINT().notNull()),
DataTypes.FIELD(
"type_decimal_bytes", DataTypes.DECIMAL(4, 2).notNull()),
DataTypes.FIELD(
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
index 6975e7c882ec8..16b09406a422e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroTypeExtractionTest.java
@@ -102,14 +102,15 @@ void testSimpleAvroRead(boolean useMiniCluster, @InjectMiniCluster MiniCluster m
+ "\"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, "
+ "\"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], "
+ "\"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", "
- + "\"type_map\": {\"KEY 2\": 17554, \"KEY 1\": 8546456}, \"type_fixed\": null, \"type_union\": null, "
+ + "\"type_map\": {\"KEY 1\": 8546456, \"KEY 2\": 17554}, \"type_fixed\": null, \"type_union\": null, "
+ "\"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", "
+ "\"state\": \"London\", \"zip\": \"NW1 6XE\"}, "
+ "\"type_bytes\": \"\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\\u0000\", "
+ "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
+ "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", "
+ "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", "
- + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n"
+ + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", "
+ + "\"type_decimal_bytes\": 20.00, \"type_decimal_fixed\": 20.00}\n"
+ "{\"name\": \"Charlie\", \"favorite_number\": null, "
+ "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, "
+ "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], "
@@ -121,8 +122,9 @@ void testSimpleAvroRead(boolean useMiniCluster, @InjectMiniCluster MiniCluster m
+ "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
+ "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", "
+ "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", "
- + "\"type_decimal_bytes\": \"\\u0007Ð\", "
- + "\"type_decimal_fixed\": [7, -48]}\n";
+ + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", "
+ + "\"type_decimal_bytes\": 20.00, "
+ + "\"type_decimal_fixed\": 20.00}\n";
}
@ParameterizedTest
@@ -163,7 +165,8 @@ void testSerializeWithAvro(boolean useMiniCluster, @InjectMiniCluster MiniCluste
+ "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
+ "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", "
+ "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", "
- + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n"
+ + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", "
+ + "\"type_decimal_bytes\": 20.00, \"type_decimal_fixed\": 20.00}\n"
+ "{\"name\": \"Charlie\", \"favorite_number\": null, "
+ "\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, "
+ "\"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], "
@@ -175,7 +178,8 @@ void testSerializeWithAvro(boolean useMiniCluster, @InjectMiniCluster MiniCluste
+ "\"type_date\": \"2014-03-01\", \"type_time_millis\": \"12:12:12\", \"type_time_micros\": \"00:00:00.123456\", "
+ "\"type_timestamp_millis\": \"2014-03-01T12:12:12.321Z\", "
+ "\"type_timestamp_micros\": \"1970-01-01T00:00:00.123456Z\", "
- + "\"type_decimal_bytes\": \"\\u0007Ð\", \"type_decimal_fixed\": [7, -48]}\n";
+ + "\"type_timestamp_nanos\": \"1970-01-01T00:00:00.123456789Z\", "
+ + "\"type_decimal_bytes\": 20.00, \"type_decimal_fixed\": 20.00}\n";
}
@ParameterizedTest
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index b4e12f8ca10ee..b2fb563e61e89 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -24,7 +24,6 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.Timestamps;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.formats.avro.typeutils.AvroSerializerLargeGenericRecordTest;
@@ -107,20 +106,15 @@ public final class AvroTestUtils {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampNanos(
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
// byte array must contain the two's-complement representation of the
// unscaled integer value in big-endian byte order
- .setTypeDecimalBytes(
- ByteBuffer.wrap(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
- // array of length n can store at most
- // Math.floor(Math.log10(Math.pow(2, 8 * n - 1) - 1))
- // base-10 digits of precision
- .setTypeDecimalFixed(
- new Fixed2(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
+ .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
- final Row rowUser = new Row(23);
+ final Row rowUser = new Row(24);
rowUser.setField(0, "Charlie");
rowUser.setField(1, null);
rowUser.setField(2, "blue");
@@ -144,8 +138,10 @@ public final class AvroTestUtils {
rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
rowUser.setField(
20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
- rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+ rowUser.setField(
+ 21, Timestamp.from(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)));
rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+ rowUser.setField(23, BigDecimal.valueOf(2000, 2));
final Tuple3, SpecificRecord, Row> t = new Tuple3<>();
t.f0 = User.class;
@@ -176,7 +172,8 @@ public static Tuple3 getGenericTestData() {
+ "{\"name\":\"type_time_millis\",\"type\":{\"type\":\"int\",\"logicalType\":\"time-millis\"}},{\"name\":\"type_time_micros\","
+ "\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}},{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
+ "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
- + "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
+ + "\"logicalType\":\"timestamp-micros\"}},{\"name\":\"type_timestamp_nanos\",\"type\":{\"type\":\"long\","
+ + "\"logicalType\":\"timestamp-nanos\"}},{\"name\":\"type_decimal_bytes\",\"type\":{\"type\":\"bytes\","
+ "\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}},{\"name\":\"type_decimal_fixed\",\"type\":{\"type\":\"fixed\","
+ "\"name\":\"Fixed2\",\"size\":2,\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}";
final Schema schema = new Schema.Parser().parse(schemaString);
@@ -222,6 +219,9 @@ public static Tuple3 getGenericTestData() {
user.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z"));
user.put(
"type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+ user.put(
+ "type_timestamp_nanos",
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
user.put(
"type_decimal_bytes",
ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
@@ -231,7 +231,7 @@ public static Tuple3 getGenericTestData() {
schema.getField("type_decimal_fixed").schema(),
BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
- final Row rowUser = new Row(23);
+ final Row rowUser = new Row(24);
rowUser.setField(0, "Charlie");
rowUser.setField(1, null);
rowUser.setField(2, "blue");
@@ -255,8 +255,10 @@ public static Tuple3 getGenericTestData() {
rowUser.setField(19, Timestamp.valueOf("2014-03-01 12:12:12.321"));
rowUser.setField(
20, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
- rowUser.setField(21, BigDecimal.valueOf(2000, 2));
+ rowUser.setField(
+ 21, Timestamp.from(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)));
rowUser.setField(22, BigDecimal.valueOf(2000, 2));
+ rowUser.setField(23, BigDecimal.valueOf(2000, 2));
final Tuple3 t = new Tuple3<>();
t.f0 = user;
@@ -274,34 +276,49 @@ public static Tuple3 getGenericTestData() {
+ "\"fields\": [{\"name\":\"type_timestamp_millis\",\"type\":{\"type\":\"long\","
+ "\"logicalType\":\"timestamp-millis\"}},{\"name\":\"type_timestamp_micros\",\"type\":{\"type\":\"long\","
+ "\"logicalType\":\"timestamp-micros\"}},{\"name\": \"type_local_timestamp_millis\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-millis\"}},"
- + "{\"name\": \"type_local_timestamp_micros\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}}]}";
+ + "{\"name\": \"type_local_timestamp_micros\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-micros\"}},"
+ + "{\"name\": \"type_timestamp_nanos\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}},"
+ + "{\"name\": \"type_local_timestamp_nanos\", \"type\": {\"type\": \"long\", \"logicalType\": \"local-timestamp-nanos\"}}]}";
final Schema schema = new Schema.Parser().parse(schemaString);
final GenericRecord timestampRecord = new GenericData.Record(schema);
timestampRecord.put("type_timestamp_millis", Instant.parse("2014-03-01T12:12:12.321Z"));
timestampRecord.put(
"type_timestamp_micros", Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS));
+ timestampRecord.put(
+ "type_timestamp_nanos",
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS));
timestampRecord.put(
"type_local_timestamp_millis", LocalDateTime.parse("2014-03-01T12:12:12.321"));
timestampRecord.put(
"type_local_timestamp_micros", LocalDateTime.parse("1970-01-01T00:00:00.123456"));
+ timestampRecord.put(
+ "type_local_timestamp_nanos", LocalDateTime.parse("1970-01-01T00:00:00.123456789"));
final Timestamps timestamps =
Timestamps.newBuilder()
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
+ .setTypeTimestampNanos(
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
.setTypeLocalTimestampMillis(LocalDateTime.parse("2014-03-01T12:12:12.321"))
.setTypeLocalTimestampMicros(
LocalDateTime.parse("1970-01-01T00:00:00.123456"))
+ .setTypeLocalTimestampNanos(
+ LocalDateTime.parse("1970-01-01T00:00:00.123456789"))
.build();
- final Row timestampRow = new Row(4);
+ final Row timestampRow = new Row(6);
timestampRow.setField(0, Timestamp.valueOf("2014-03-01 12:12:12.321"));
timestampRow.setField(
1, Timestamp.from(Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS)));
timestampRow.setField(2, Timestamp.valueOf(LocalDateTime.parse("2014-03-01T12:12:12.321")));
timestampRow.setField(
3, Timestamp.valueOf(LocalDateTime.parse("1970-01-01T00:00:00.123456")));
+ timestampRow.setField(
+ 4, Timestamp.from(Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS)));
+ timestampRow.setField(
+ 5, Timestamp.valueOf(LocalDateTime.parse("1970-01-01T00:00:00.123456789")));
final Tuple4, SpecificRecord, GenericRecord, Row> t =
new Tuple4<>();
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index 13686bb2b5f23..01b63aeaee44e 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -21,7 +21,6 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.SimpleUser;
import org.apache.flink.formats.avro.generated.User;
@@ -62,8 +61,9 @@ public static User generateRandomUser(Random rnd) {
LocalTime.ofSecondOfDay(0).plus(123456L, ChronoUnit.MICROS),
Instant.parse("2014-03-01T12:12:12.321Z"),
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS),
- ByteBuffer.wrap(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()),
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()));
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS),
+ BigDecimal.valueOf(2000, 2),
+ BigDecimal.valueOf(2000, 2));
}
public static SimpleUser generateRandomSimpleUser(Random rnd) {
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
index 3c97e2269804c..72924a6753b3a 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/table/runtime/batch/AvroTypesITCase.java
@@ -21,7 +21,6 @@
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.generated.Colors;
import org.apache.flink.formats.avro.generated.Fixed16;
-import org.apache.flink.formats.avro.generated.Fixed2;
import org.apache.flink.formats.avro.generated.User;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -84,11 +83,10 @@ class AvroTypesITCase extends AbstractTestBase {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
- .setTypeDecimalBytes(
- ByteBuffer.wrap(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
- .setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeTimestampNanos(
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
+ .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
+ .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
private static final User USER_2 =
@@ -114,11 +112,10 @@ class AvroTypesITCase extends AbstractTestBase {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
- .setTypeDecimalBytes(
- ByteBuffer.wrap(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
- .setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeTimestampNanos(
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
+ .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
+ .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
private static final User USER_3 =
@@ -144,11 +141,10 @@ class AvroTypesITCase extends AbstractTestBase {
.setTypeTimestampMillis(Instant.parse("2014-03-01T12:12:12.321Z"))
.setTypeTimestampMicros(
Instant.ofEpochSecond(0).plus(123456L, ChronoUnit.MICROS))
- .setTypeDecimalBytes(
- ByteBuffer.wrap(
- BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
- .setTypeDecimalFixed(
- new Fixed2(BigDecimal.valueOf(2000, 2).unscaledValue().toByteArray()))
+ .setTypeTimestampNanos(
+ Instant.ofEpochSecond(0).plus(123456789L, ChronoUnit.NANOS))
+ .setTypeDecimalBytes(BigDecimal.valueOf(2000, 2))
+ .setTypeDecimalFixed(BigDecimal.valueOf(2000, 2))
.build();
@Test
@@ -168,15 +164,15 @@ void testAvroToRow() throws Exception {
"+I[Charlie, null, blue, 1337, 1.337, null, false, [], [], null, RED, {}, null, null, "
+ "{\"num\": 42, \"street\": \"Bakerstreet\", \"city\": \"Berlin\", \"state\": \"Berlin\", \"zip\": \"12049\"}, "
+ "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12.345, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
- + "1970-01-01T00:00:00.123456Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n"
+ + "1970-01-01T00:00:00.123456Z, 1970-01-01T00:00:00.123456789Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n"
+ "+I[Whatever, null, black, 42, 0.0, null, true, [hello], [true], null, GREEN, {}, "
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], null, null, "
+ "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
- + "1970-01-01T00:00:00.123456Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n"
+ + "1970-01-01T00:00:00.123456Z, 1970-01-01T00:00:00.123456789Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]\n"
+ "+I[Terminator, null, yellow, 1, 0.0, null, false, [world], [false], null, GREEN, {}, "
+ "[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], null, null, "
+ "java.nio.HeapByteBuffer[pos=0 lim=10 cap=10], 2014-03-01, 12:12:12, 00:00:00.123456, 2014-03-01T12:12:12.321Z, "
- + "1970-01-01T00:00:00.123456Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]";
+ + "1970-01-01T00:00:00.123456Z, 1970-01-01T00:00:00.123456789Z, java.nio.HeapByteBuffer[pos=0 lim=2 cap=2], [7, -48]]";
TestBaseUtils.compareResultAsText(results, expected);
}
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index 0462ec12539e4..f1b3f02b0e172 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -44,6 +44,7 @@
{"name": "type_time_micros", "type": {"type": "long", "logicalType": "time-micros"}},
{"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}},
+ {"name": "type_timestamp_nanos", "type": {"type": "long", "logicalType": "timestamp-nanos"}},
{"name": "type_decimal_bytes", "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}},
{"name": "type_decimal_fixed", "type": {"name": "Fixed2", "size": 2, "type": "fixed", "logicalType": "decimal", "precision": 4, "scale": 2}}
]
@@ -122,8 +123,10 @@
"fields": [
{"name": "type_timestamp_millis", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "type_timestamp_micros", "type": {"type": "long", "logicalType": "timestamp-micros"}},
+ {"name": "type_timestamp_nanos", "type": {"type": "long", "logicalType": "timestamp-nanos"}},
{"name": "type_local_timestamp_millis", "type": {"type": "long", "logicalType": "local-timestamp-millis"}},
- {"name": "type_local_timestamp_micros", "type": {"type": "long", "logicalType": "local-timestamp-micros"}}
+ {"name": "type_local_timestamp_micros", "type": {"type": "long", "logicalType": "local-timestamp-micros"}},
+ {"name": "type_local_timestamp_nanos", "type": {"type": "long", "logicalType": "local-timestamp-nanos"}}
]
}
]
diff --git a/pom.xml b/pom.xml
index 2bd47d01bdccb..9c665dec6c852 100644
--- a/pom.xml
+++ b/pom.xml
@@ -148,7 +148,7 @@ under the License.
5.4.0
- 1.11.4
+ 1.12.1
1.9.14.jdk17-redhat-00001
2.20.1