Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:extensions:ml")
implementation project(":sdks:java:managed")
implementation library.java.avro
implementation library.java.bigdataoss_util
implementation library.java.google_api_client
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.snippets.transforms.io.iceberg;

// [START iceberg_schema_and_row]

import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;

/**
 * Snippet showing a Beam {@link Schema} that mixes primitive, logical, collection and nested
 * field types, together with a {@link Row} that supplies one value for every declared field.
 */
public class IcebergBeamSchemaAndRow {
  /** Schema of the nested row stored in {@code struct_field}. */
  Schema nestedSchema =
      Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build();

  /** Top-level schema; each field name indicates the kind of value it holds. */
  Schema beamSchema =
      Schema.builder()
          .addBooleanField("boolean_field")
          .addInt32Field("int_field")
          .addInt64Field("long_field")
          .addFloatField("float_field")
          .addDoubleField("double_field")
          .addDecimalField("numeric_field")
          .addByteArrayField("bytes_field")
          .addStringField("string_field")
          .addLogicalTypeField("time_field", SqlTypes.TIME)
          .addLogicalTypeField("date_field", SqlTypes.DATE)
          .addLogicalTypeField("timestamp_field", Timestamp.MICROS)
          .addDateTimeField("timestamptz_field")
          .addArrayField("array_field", Schema.FieldType.INT32)
          .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32)
          .addRowField("struct_field", nestedSchema)
          .build();

  /** Example row conforming to {@link #beamSchema}, keyed by field name. */
  Row beamRow =
      Row.withSchema(beamSchema)
          .withFieldValues(
              ImmutableMap.<String, Object>builder()
                  .put("boolean_field", true)
                  .put("int_field", 1)
                  .put("long_field", 2L)
                  .put("float_field", 3.4f)
                  .put("double_field", 4.5d)
                  .put("numeric_field", new BigDecimal(67))
                  .put("bytes_field", new byte[] {1, 2, 3})
                  .put("string_field", "value")
                  .put("time_field", LocalTime.now())
                  .put("date_field", LocalDate.now())
                  // The field is declared with microsecond precision, while Instant.now() may
                  // carry finer digits on some JDKs/platforms. Truncate so the value always fits.
                  // NOTE(review): Timestamp.MICROS is expected to reject sub-microsecond values;
                  // confirm against the logical type. The truncation is harmless either way.
                  .put("timestamp_field", Instant.now().truncatedTo(ChronoUnit.MICROS))
                  .put("timestamptz_field", DateTime.now())
                  .put("array_field", Arrays.asList(1, 2, 3))
                  .put("map_field", ImmutableMap.of("a", 1, "b", 2))
                  .put(
                      "struct_field",
                      Row.withSchema(nestedSchema).addValues("nested_value", 123).build())
                  .build())
          .build();
}
// [END iceberg_schema_and_row]
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.snippets.transforms.io.iceberg;

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

/**
 * Code snippets for a Managed Iceberg quickstart: catalog properties, the managed transform
 * configuration, and small write and read pipelines.
 *
 * <p>The {@code [START ...]} / {@code [END ...]} comments delimit named snippet regions; keep each
 * pair intact when editing.
 */
public class Quickstart {
  /**
   * Builds catalog properties for a Hadoop catalog whose warehouse is a local directory.
   *
   * <p>The warehouse URI uses three slashes ({@code file:///tmp/...}): an empty authority followed
   * by an absolute path. With only two slashes, {@code tmp} would be parsed as the URI authority
   * and the path would resolve outside {@code /tmp}.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    // [START hadoop_catalog_props]
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "hadoop",
            "warehouse", "file:///tmp/beam-iceberg-local-quickstart");
    // [END hadoop_catalog_props]
  }

  /**
   * Builds REST catalog properties and a managed Iceberg configuration, then writes three rows to
   * the configured table and reads them back, printing each row.
   */
  public static void other() {
    // [START biglake_catalog_props]
    Map<String, String> catalogProps =
        ImmutableMap.of(
            "type", "rest",
            "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog",
            "warehouse", "gs://biglake-public-nyc-taxi-iceberg",
            "header.x-goog-user-project", "$PROJECT_ID",
            "rest.auth.type", "google",
            "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO",
            "header.X-Iceberg-Access-Delegation", "vended-credentials");
    // [END biglake_catalog_props]

    // [START managed_iceberg_config]
    Map<String, Object> managedConfig =
        ImmutableMap.of("table", "my_db.my_table", "catalog_properties", catalogProps);
    // Note: The table will get created when inserting data (see below)
    // [END managed_iceberg_config]

    // [START managed_iceberg_insert]
    Schema inputSchema =
        Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build();

    Pipeline p = Pipeline.create();
    p.apply(
            Create.of(
                // "id" is an INT64 field, so its values must be long literals.
                Row.withSchema(inputSchema).addValues(1L, "Mark", 34).build(),
                Row.withSchema(inputSchema).addValues(2L, "Omar", 24).build(),
                Row.withSchema(inputSchema).addValues(3L, "Rachel", 27).build()))
        .apply(Managed.write("iceberg").withConfig(managedConfig));

    // Wait for the write to complete so the read below sees the inserted rows.
    p.run().waitUntilFinish();
    // [END managed_iceberg_insert]

    // [START managed_iceberg_read]
    Pipeline q = Pipeline.create();
    PCollection<Row> rows =
        q.apply(Managed.read("iceberg").withConfig(managedConfig)).getSinglePCollection();

    rows.apply(
        MapElements.into(TypeDescriptors.voids())
            .via(
                row -> {
                  System.out.println(row);
                  return null;
                }));

    q.run().waitUntilFinish();
    // [END managed_iceberg_read]
  }
}
68 changes: 68 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,74 @@ def model_bigqueryio_xlang(
# [END model_bigqueryio_write_with_storage_write_api]


def model_managed_iceberg():
  """Examples for Managed Iceberg sources and sinks.

  The ``[START ...]`` / ``[END ...]`` comments delimit named snippet regions;
  keep each pair intact when editing.
  """
  # The data-types snippet must come first: its function-scope
  # `import apache_beam as beam` makes `beam` a local name for the whole
  # function, so any use of `beam` before the import would raise
  # UnboundLocalError.
  # [START model_managed_iceberg_data_types]
  from decimal import Decimal
  import apache_beam as beam
  from apache_beam.utils.timestamp import Timestamp

  row = beam.Row(
      boolean_field=True,
      int_field=1,
      float_field=2.3,
      numeric_field=Decimal('34'),
      bytes_field=b'value',
      string_field="value",
      timestamptz_field=Timestamp(4, 5),
      array_field=[1, 2, 3],
      map_field={
          "a": 1, "b": 2
      },
      struct_field=beam.Row(nested_field="nested_value", nested_field_2=123))
  # [END model_managed_iceberg_data_types]

  # [START hadoop_catalog_config]
  # Three slashes: empty URI authority followed by the absolute path /tmp/...
  hadoop_catalog_props = {
      'type': 'hadoop',
      'warehouse': 'file:///tmp/beam-iceberg-local-quickstart'
  }
  # [END hadoop_catalog_config]

  # [START managed_iceberg_config]
  managed_config = {
      'table': 'my_db.my_table', 'catalog_properties': hadoop_catalog_props
  }
  # Note: The table will get created when inserting data (see below)
  # [END managed_iceberg_config]

  # [START managed_iceberg_insert]
  with beam.Pipeline() as p:
    (
        p
        | beam.Create([
            beam.Row(id=1, name="Mark", age=32),
            beam.Row(id=2, name="Omar", age=24),
            beam.Row(id=3, name="Rachel", age=27)
        ])
        | beam.managed.Write("iceberg", config=managed_config))
  # [END managed_iceberg_insert]

  # [START managed_iceberg_read]
  with beam.Pipeline() as p:
    (
        p
        | beam.managed.Read("iceberg", config=managed_config)
        | beam.LogElements())
  # [END managed_iceberg_read]

  # [START biglake_catalog_config]
  biglake_catalog_config = {
      'type': 'rest',
      'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
      'warehouse': 'gs://biglake-public-nyc-taxi-iceberg',
      'header.x-goog-user-project': '$PROJECT_ID',
      'rest.auth.type': 'google',
      'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO',
      'header.X-Iceberg-Access-Delegation': 'vended-credentials'
  }
  # [END biglake_catalog_config]


def model_composite_transform_example(contents, output_path):
"""Example of a composite transform.

Expand Down
2 changes: 2 additions & 0 deletions website/www/site/assets/js/language-switch-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ $(document).ready(function() {
"valueToTabTitle": function (value) {
switch (value) {
case 'py': return 'Python';
case 'sql': return 'SQL';
case 'scio': return 'SCIO';
case 'typescript': return 'TypeScript';
}
Expand Down Expand Up @@ -287,6 +288,7 @@ $(document).ready(function() {
}).render();

Switcher({"name": "runner", "default": "direct"}).render();
Switcher({"name": "tab"}).render();
Switcher({"name": "shell", "default": "unix"}).render();
Switcher({"name": "version"}).render();
});
3 changes: 3 additions & 0 deletions website/www/site/assets/scss/_syntax-highlighting.scss
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ pre {
}
}
}
// The tab switcher reuses the runner switcher's rules via @extend.
.tab-switcher {
@extend .runner-switcher;
}
.shell-switcher {
ul.nav-tabs {
padding-left: 0;
Expand Down
Loading
Loading