[bigdata@bigdata185 software]$ tar -zxvf iceberg-apache-iceberg-0.11.1.tar.gz -C /opt/module/
[bigdata@bigdata185 software]$ cd /opt/module/iceberg-apache-iceberg-0.11.1/
4.1.3 Set the component versions
We build against the most stable version combination: Hadoop 2.7.7, Hive 2.3.9, Flink 1.11.6, Spark 3.0.3. Set the corresponding versions as follows:
org.apache.flink:* = 1.11.6
org.apache.hadoop:* = 2.7.7
org.apache.hive:hive-metastore = 2.3.9
org.apache.hive:hive-serde = 2.3.9
org.apache.spark:spark-hive_2.12 = 3.0.3
4.1.4 Edit build.gradle and add a domestic mirror repository
(1) Add the following to the repositories block inside buildscript:
maven { url 'https://mirrors.huaweicloud.com/repository/maven/'; }
After the change it looks like this:
buildscript {
  repositories {
    jcenter()
    gradlePluginPortal()
    maven { url 'https://mirrors.huaweicloud.com/repository/maven/'; }
  }
  dependencies {
    classpath 'com.github.jengelman.gradle.plugins:shadow:5.0.0'
    classpath 'com.palantir.baseline:gradle-baseline-java:3.36.2'
    classpath 'com.palantir.gradle.gitversion:gradle-git-version:0.12.3'
    classpath 'com.diffplug.spotless:spotless-plugin-gradle:3.14.0'
    classpath 'gradle.plugin.org.inferred:gradle-processors:2.1.0'
    classpath 'me.champeau.gradle:jmh-gradle-plugin:0.4.8'
  }
}
(2) Add the same repository to allprojects:
maven { url 'https://mirrors.huaweicloud.com/repository/maven/'; }
After the change it looks like this:
allprojects {
  group = "org.apache.iceberg"
  version = getProjectVersion()
  repositories {
    maven { url 'https://mirrors.huaweicloud.com/repository/maven/'; }
    mavenCentral()
    mavenLocal()
  }
}
4.1.5 Download the dependencies (optional)
Go to the project root directory and run:
[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ ./gradlew dependencies
(1) Build the whole project:
[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ ./gradlew build
(2) The command above also runs the project's unit tests. If you do not need them, run the following instead:
[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ ./gradlew build -x test -x scalaStyle
Version combination: Hadoop 2.7.7 | Spark 3.0.3 | Iceberg 0.11.1
[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ cp spark3-extensions/build/libs/iceberg-spark3-extensions-0.11.1.jar /opt/module/spark-3.0.3-bin-hadoop2.7/jars/
[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ cp spark3-runtime/build/libs/iceberg-spark3-runtime-0.11.1.jar /opt/module/spark-3.0.3-bin-hadoop2.7/jars/
cp /opt/module/hadoop-2.7.7/etc/hadoop/core-site.xml /opt/module/spark-3.0.3-bin-hadoop2.7/conf/
cp /opt/module/hadoop-2.7.7/etc/hadoop/hdfs-site.xml /opt/module/spark-3.0.3-bin-hadoop2.7/conf/
cp /opt/module/hive-2.3.9/conf/hive-site.xml /opt/module/spark-3.0.3-bin-hadoop2.7/conf/
1.4.1 Hive mode
spark.sql.catalog.hive_catalog = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_catalog.type = hive
spark.sql.catalog.hive_catalog.uri = thrift://bigdata185:9083
1.4.2 Hadoop mode
spark.sql.catalog.hadoop_catalog = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_catalog.type = hadoop
spark.sql.catalog.hadoop_catalog.warehouse = hdfs://bigdata185:9000/dw/iceberg
spark.sql.catalog.catalog-name.type = hadoop
spark.sql.catalog.catalog-name.default-namespace = db
spark.sql.catalog.catalog-name.uri = thrift://bigdata185:9083
spark.sql.catalog.catalog-name.warehouse = hdfs://bigdata185:9000/dw/iceberg
1.4.3 Edit spark-defaults.conf
Configure the Spark SQL catalogs here. Both approaches, Hive-based and Hadoop-based, can be configured side by side; in this guide we mainly use the Hadoop catalog and switch to it explicitly with USE hadoop_catalog.
[bigdata@bigdata185 spark-3.0.3-bin-hadoop2.7]$ vi conf/spark-defaults.conf
spark.sql.catalog.hive_catalog = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_catalog.type = hive
spark.sql.catalog.hive_catalog.uri = thrift://bigdata185:9083
spark.sql.catalog.hadoop_catalog = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_catalog.type = hadoop
spark.sql.catalog.hadoop_catalog.warehouse = hdfs://bigdata185:9000/dw/iceberg
spark.sql.catalog.catalog-name.type = hadoop
spark.sql.catalog.catalog-name.default-namespace = db
spark.sql.catalog.catalog-name.uri = thrift://bigdata185:9083
spark.sql.catalog.catalog-name.warehouse = hdfs://bigdata185:9000/dw/iceberg
Part 2 Hands-on Spark SQL with Iceberg
A data lake without SQL support has no soul. This part walks through how to use Spark SQL with Iceberg.
[bigdata@bigdata185 spark-3.0.3-bin-hadoop2.7]$ bin/spark-sql
spark-sql> show databases;
namespace
default
Time taken: 0.108 seconds, Fetched 1 row(s)
spark-sql>
2.1 DDL operations
2.1.1 Create an Iceberg table
Only Iceberg managed (internal) tables can be created; external tables are not supported. When a table is dropped, its data files are automatically deleted from HDFS.
spark-sql> use hadoop_catalog;
spark-sql> create database db;
spark-sql> use db;
spark-sql> CREATE TABLE sensordata(
  sensor_id STRING,
  ts BIGINT,
  temperature DOUBLE,
  dt STRING)
USING iceberg
PARTITIONED BY(dt)
TBLPROPERTIES ('read.split.target-size'='134217728', 'read.split.metadata-target-size'='33554432');
spark-sql> DESC sensordata;
spark-sql> DESC FORMATTED sensordata;
(1) Unset a table property
ALTER TABLE sensordata UNSET TBLPROPERTIES ('read.split.target-size');
(2) Set a table property
ALTER TABLE sensordata SET TBLPROPERTIES ('read.split.target-size'='134217728');
(3) Add a column
ALTER TABLE sensordata ADD COLUMNS ( new_column string comment 'new_column docs');
(4) Drop a column
ALTER TABLE sensordata DROP COLUMN new_column;
2.1.4 Create a table with CTAS
CREATE TABLE hadoop_catalog.db.sensordata_like USING iceberg AS SELECT * FROM sensordata;
2.1.5 Drop tables
DROP TABLE hadoop_catalog.db.sensordata_like;
DROP TABLE sensordata;
2.2 DML operations
2.2.1 Insert data
(1) Insert one record into the sensordata table:
spark-sql> CREATE TABLE sensordata(
  sensor_id STRING,
  ts BIGINT,
  temperature DOUBLE,
  dt STRING)
USING iceberg
PARTITIONED BY(dt);
spark-sql> INSERT INTO sensordata VALUES('sensor_01',1635743301,-12.1,'2021-12-01');
(2) Query the data:
spark-sql> SELECT * FROM sensordata;
spark-sql> INSERT OVERWRITE sensordata VALUES('sensor_02',1635743301,23.6,'2021-12-01');
spark-sql> SELECT * FROM sensordata;
[bigdata@bigdata185 conf]$ vi spark-defaults.conf
spark.sql.sources.partitionOverwriteMode=dynamic
CREATE TABLE sensordata_01(
  sensor_id STRING,
  ts BIGINT,
  temperature DOUBLE,
  dt STRING)
USING iceberg
PARTITIONED BY(dt);
spark-sql> INSERT INTO sensordata VALUES('sensor_02',1638421701,-22.2,'2021-12-02');
spark-sql> SELECT * FROM sensordata;
spark-sql> INSERT OVERWRITE sensordata_01 SELECT * FROM sensordata;
spark-sql> SELECT * FROM sensordata_01;
spark-sql> INSERT OVERWRITE sensordata_01 PARTITION(dt='2021-12-31') SELECT sensor_id,ts,temperature FROM sensordata;
spark-sql> SELECT * FROM sensordata_01;
spark-sql> DELETE FROM sensordata_01 WHERE dt>='2021-12-01';
spark-sql> SELECT * FROM sensordata_01;
spark-sql> UPDATE sensordata SET ts=1631349225,sensor_id='sensor_27' WHERE dt='2021-12-01';
spark-sql> SELECT * FROM hadoop_catalog.db.sensordata_01;
SELECT * FROM hadoop_catalog.db.sensordata_01.history;
SELECT * FROM hadoop_catalog.db.sensordata_01.files;
SELECT * FROM hadoop_catalog.db.sensordata_01.snapshots;
2.3.1 Snapshot history
2.3.1.1 History table
Every Iceberg table has a corresponding history table, named after the table itself with the suffix .history. Note that the history table must be queried by its fully qualified name; you cannot switch to the hadoop_catalog.db database first and then query it by its short name. The rows in the history table show how many times the table has been changed.
SELECT * FROM hadoop_catalog.db.sensordata_01.history;
SELECT * FROM hadoop_catalog.db.sensordata.history;
SELECT * FROM hadoop_catalog.db.sensordata_01.snapshots;
SELECT h.made_current_at,
       s.operation,
       h.snapshot_id,
       h.is_current_ancestor,
       s.summary['spark.app.id']
FROM hadoop_catalog.db.sensordata.history h
JOIN hadoop_catalog.db.sensordata.snapshots s
  ON h.snapshot_id = s.snapshot_id
ORDER BY made_current_at;
scala> :pastespark.read.option("snapshot-id","8109378507001106084").format("iceberg").load("/dw/iceberg/db/sensordata_01").show
SELECT * FROM hadoop_catalog.db.sensordata.files;
SELECT * FROM hadoop_catalog.db.sensordata_01.manifests;
CREATE TABLE sensordata_days(
  sensor_id STRING,
  ts TIMESTAMP,
  temperature DOUBLE)
USING iceberg
PARTITIONED BY(days(ts));
Insert rows with different dates into the table.
INSERT INTO sensordata_days VALUES('sensor_01',CAST(1639816497 AS TIMESTAMP),10.2),('sensor_01',CAST(1639730097 AS TIMESTAMP),12.3);
After the insert succeeds, query the table again.
spark-sql> SELECT * FROM sensordata_days;
CREATE TABLE sensordata_years(
  sensor_id STRING,
  ts TIMESTAMP,
  temperature DOUBLE)
USING iceberg
PARTITIONED BY(years(ts));
Likewise, insert rows from different years into the table.
INSERT INTO sensordata_years VALUES('sensor_01',CAST(1608194097 AS TIMESTAMP),10.2),('sensor_01',CAST(1639730097 AS TIMESTAMP),12.3);
After the insert succeeds, query the table again.
SELECT * FROM sensordata_years;
CREATE TABLE sensordata_months(
  sensor_id STRING,
  ts TIMESTAMP,
  temperature DOUBLE)
USING iceberg
PARTITIONED BY(months(ts));
Likewise, insert rows from different months into the table.
INSERT INTO sensordata_months VALUES('sensor_01',CAST(1637138097 AS TIMESTAMP),10.2),('sensor_01',CAST(1639730097 AS TIMESTAMP),12.3);
After the insert succeeds, query the table again.
spark-sql> SELECT * FROM sensordata_months;
Therefore, when a SELECT queries an Iceberg table, the query engine first goes to the Iceberg catalog, looks up the location of the table's current metadata file, and then opens that file.
3.4 Metadata layer
The metadata management layer itself can be further divided into three levels: metadata files, manifest lists, and manifest files.
.
└── sensordata
    ├── data
    │   ├── dt=2021-12-01
    │   │   ├── 00000-0-275a936f-4d21-4a82-9346-bceac4381e6c-00001.parquet
    │   │   └── 00000-2-1189ac19-b488-4956-8de8-8dd96cd5920a-00001.parquet
    │   └── dt=2021-12-02
    │       └── 00000-1-cc4f552a-28eb-4ff3-a4fa-6c28ce6e5f79-00001.parquet
    └── metadata
        ├── 0dafa6f3-2dbd-4728-ba9b-af31a3416700-m0.avro
        ├── 2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m0.avro
        ├── 2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m1.avro
        ├── ad4cd65e-7351-4ad3-baaf-5e5bd99dc257-m0.avro
        ├── snap-232980156660427676-1-0dafa6f3-2dbd-4728-ba9b-af31a3416700.avro
        ├── snap-4599216928086762873-1-ad4cd65e-7351-4ad3-baaf-5e5bd99dc257.avro
        ├── snap-5874199297651146296-1-2b1fbd5a-6241-4f7d-a4a6-3121019b9afb.avro
        ├── v1.metadata.json
        ├── v2.metadata.json
        ├── v3.metadata.json
        ├── v4.metadata.json
        └── version-hint.text
The metadata directory holds the files of the metadata management layer (metadata files, manifest lists, and manifest files).
Below is the complete content of one metadata file, v4.metadata.json:
{
  // Table format version; currently version 1. Version 2, which supports row-level deletes and other features, is still in development.
  "format-version" : 1,
  "table-uuid" : "3e0d4750-bf7d-4ace-95a3-881732103f86",
  // hadoopTable location
  "location" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata",
  // Creation time of the latest snapshot
  "last-updated-ms" : 1642474805225,
  "last-column-id" : 4,
  // Iceberg schema
  "schema" : {
    "type" : "struct",
    "fields" : [ { "id" : 1, "name" : "sensor_id", "required" : false, "type" : "string" },
                 { "id" : 2, "name" : "ts", "required" : false, "type" : "long" },
                 { "id" : 3, "name" : "temperature", "required" : false, "type" : "double" },
                 { "id" : 4, "name" : "dt", "required" : false, "type" : "string" } ]
  },
  "partition-spec" : [ { "name" : "dt", "transform" : "identity", "source-id" : 4, "field-id" : 1000 } ],
  "default-spec-id" : 0,
  // Partition specs
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "dt",
      // Transform type: identity, year, bucket, etc. are currently supported
      "transform" : "identity",
      // Refers to the corresponding dt field in schema.fields
      "source-id" : 4,
      "field-id" : 1000
    } ]
  } ],
  "default-sort-order-id" : 0,
  "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ],
  // Iceberg table properties
  "properties" : { "owner" : "bigdata" },
  // Current snapshot id
  "current-snapshot-id" : 4599216928086762873,
  // Snapshot list
  "snapshots" : [ {
    "snapshot-id" : 232980156660427676,
    // Snapshot creation time
    "timestamp-ms" : 1642474364978,
    "summary" : {
      // Spark write mode; overwrite and append are currently supported
      "operation" : "append",
      "spark.app.id" : "local-1642474286024",
      // Number of data files added by this snapshot
      "added-data-files" : "1",
      // Number of records added by this snapshot
      "added-records" : "1",
      // Size of the files added by this snapshot
      "added-files-size" : "1247",
      // Number of partitions changed by this snapshot
      "changed-partition-count" : "1",
      // Total records in this snapshot = lastSnapshotTotalRecord - currentSnapshotDeleteRecord + currentSnapshotAddRecord
      "total-records" : "1",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    // Manifest list path of this snapshot
    "manifest-list" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/snap-232980156660427676-1-0dafa6f3-2dbd-4728-ba9b-af31a3416700.avro"
  }, {
    "snapshot-id" : 5874199297651146296,
    // Parent snapshot id
    "parent-snapshot-id" : 232980156660427676,
    "timestamp-ms" : 1642474492951,
    "summary" : {
      "operation" : "overwrite",
      "spark.app.id" : "local-1642474286024",
      "added-data-files" : "1",
      "deleted-data-files" : "1",
      "added-records" : "1",
      "deleted-records" : "1",
      "added-files-size" : "1247",
      "removed-files-size" : "1247",
      "changed-partition-count" : "1",
      "total-records" : "1",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/snap-5874199297651146296-1-2b1fbd5a-6241-4f7d-a4a6-3121019b9afb.avro"
  }, {
    "snapshot-id" : 4599216928086762873,
    "parent-snapshot-id" : 5874199297651146296,
    "timestamp-ms" : 1642474805225,
    "summary" : {
      "operation" : "append",
      "spark.app.id" : "local-1642474751012",
      "added-data-files" : "1",
      "added-records" : "1",
      "added-files-size" : "1246",
      "changed-partition-count" : "1",
      "total-records" : "2",
      "total-data-files" : "2",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/snap-4599216928086762873-1-ad4cd65e-7351-4ad3-baaf-5e5bd99dc257.avro"
  } ],
  // Snapshot log
  "snapshot-log" : [ { "timestamp-ms" : 1642474364978, "snapshot-id" : 232980156660427676 },
                     { "timestamp-ms" : 1642474492951, "snapshot-id" : 5874199297651146296 },
                     { "timestamp-ms" : 1642474805225, "snapshot-id" : 4599216928086762873 } ],
  // Metadata log
  "metadata-log" : [ { "timestamp-ms" : 1642474310215, "metadata-file" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/v1.metadata.json" },
                     { "timestamp-ms" : 1642474364978, "metadata-file" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/v2.metadata.json" },
                     { "timestamp-ms" : 1642474492951, "metadata-file" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/v3.metadata.json" } ]
}
The listing above is the content of v4.metadata.json. The file stores the Iceberg table schema, partition spec, and snapshot information. The transform recorded in the partition spec is what lets Iceberg apply hidden partitioning to a column, without declaring explicit partition columns the way Hive does. Because the metadata records the id and creation time of every snapshot, we can query the data of a specific snapshot either by snapshot id or by time, which is what enables Time Travel.
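As a minimal sketch of timestamp-based Time Travel from the Spark shell: besides "snapshot-id", the Iceberg Spark reader accepts an "as-of-timestamp" option (milliseconds since the epoch); the value below is simply the commit time of the overwrite snapshot from the metadata above.
scala> :paste
// Read the table as it was at a point in time; 1642474492951 is the
// timestamp-ms of the overwrite snapshot shown in v4.metadata.json.
spark.read
  .option("as-of-timestamp", "1642474492951")
  .format("iceberg")
  .load("/dw/iceberg/db/sensordata")
  .show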
When a SELECT queries an Iceberg table, after obtaining the table's location from the catalog entry and opening the current metadata file, the query engine first reads current-snapshot-id. It then uses that value to find the corresponding entry in the snapshots array, reads that snapshot's manifest-list value, and opens the manifest list file at that location.
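This lookup can be observed directly from the snapshots metadata table, which exposes each snapshot's manifest list path. A small Spark-shell sketch, assuming the hadoop_catalog.db.sensordata table created earlier:
scala> :paste
// List every snapshot together with the manifest list file it points to.
spark.sql(
  "SELECT snapshot_id, operation, manifest_list " +
  "FROM hadoop_catalog.db.sensordata.snapshots"
).show(false)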
3.4.2 Manifest list files
A manifest list file is also referred to as a snapshot file. One snapshot contains multiple manifest entries, and each manifest entry describes one manifest file. The field to pay particular attention to is partitions, which records the partition value range covered by each manifest; when a filter is applied, this range can be used to prune manifests up front and avoid useless reads.
java -jar /opt/sources/avro-tools-1.11.0.jar tojson snap-4599216928086762873-1-ad4cd65e-7351-4ad3-baaf-5e5bd99dc257.avro
The converted JSON content is as follows:
{ "manifest_path": "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/ad4cd65e-7351-4ad3-baaf-5e5bd99dc257-m0.avro", "manifest_length": 5932, "partition_spec_id": 0, // 该manifest entry所属的snapshot "added_snapshot_id": { "long": 4599216928086762873 }, // 该manifest中添加的文件数量 "added_data_files_count": { "int": 1 }, // 创建该manifest时已经存在且没有被这次创建操作删除的文件数量 "existing_data_files_count": { "int": 0 }, // 创建manifest时删除的数据文件数量 "deleted_data_files_count": { "int": 0 }, // 该manifest中partition字段的范围 "partitions": { "array": [{ "contains_null": false, "lower_bound": { "bytes": "2021-12-02" }, "upper_bound": { "bytes": "2021-12-02" } }] }, "added_rows_count": { "long": 1 }, "existing_rows_count": { "long": 0 }, "deleted_rows_count": { "long": 0 }} // manifest entry{ "manifest_path": "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m1.avro", "manifest_length": 5933, "partition_spec_id": 0, "added_snapshot_id": { "long": 5874199297651146296 }, "added_data_files_count": { "int": 1 }, "existing_data_files_count": { "int": 0 }, "deleted_data_files_count": { "int": 0 }, "partitions": { "array": [{ "contains_null": false, "lower_bound": { "bytes": "2021-12-01" }, "upper_bound": { "bytes": "2021-12-01" } }] }, "added_rows_count": { "long": 1 }, "existing_rows_count": { "long": 0 }, "deleted_rows_count": { "long": 0 }}
After the query engine has obtained the snapshot's manifest list location from the metadata file and opened the manifest list, it reads each manifest_path entry and opens the corresponding manifest file.
3.4.3 Manifest files
java -jar /opt/sources/avro-tools-1.11.0.jar tojson 2b1fbd5a-6241-4f7d-a4a6-3121019b9afb-m1.avro
The converted JSON content is as follows; file_path points to the data file (the ts and temperature bounds are binary-encoded values that render as garbled text here):
{
  // Status of the corresponding data file: 0 = EXISTING, 1 = ADDED, 2 = DELETED
  "status": 1,
  "snapshot_id": { "long": 5874199297651146296 },
  "data_file": {
    "file_path": "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/data/dt=2021-12-01/00000-2-1189ac19-b488-4956-8de8-8dd96cd5920a-00001.parquet",
    "file_format": "PARQUET",
    // Partition value of this file
    "partition": { "dt": { "string": "2021-12-01" } },
    // Number of records in the file
    "record_count": 1,
    "file_size_in_bytes": 1247,
    "block_size_in_bytes": 67108864,
    // On-disk size of each column
    "column_sizes": { "array": [{ "key": 1, "value": 56 }, { "key": 2, "value": 49 }, { "key": 3, "value": 48 }, { "key": 4, "value": 57 }] },
    // Number of values per column
    "value_counts": { "array": [{ "key": 1, "value": 1 }, { "key": 2, "value": 1 }, { "key": 3, "value": 1 }, { "key": 4, "value": 1 }] },
    // Number of null values per column
    "null_value_counts": { "array": [{ "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 }] },
    "nan_value_counts": { "array": [{ "key": 3, "value": 0 }] },
    // Value ranges (lower/upper bounds) of each column
    "lower_bounds": { "array": [{ "key": 1, "value": "sensor_02" }, { "key": 2, "value": "Evau0000u0000u0000u0000" }, { "key": 3, "value": "7@" }, { "key": 4, "value": "2021-12-01" }] },
    "upper_bounds": { "array": [{ "key": 1, "value": "sensor_02" }, { "key": 2, "value": "Evau0000u0000u0000u0000" }, { "key": 3, "value": "7@" }, { "key": 4, "value": "2021-12-01" }] },
    "key_metadata": null,
    "split_offsets": { "array": [4] }
  }
}
A manifest tracks multiple data files: each DataFileEntry corresponds to one data file and records its partition, value bounds, and other statistics. value_counts and null_value_counts can be used to prune columns that are entirely null, and the lower/upper value bounds can be used to skip files whose ranges cannot match a filter, which speeds up queries.
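As a minimal sketch of this pruning from the Spark shell, reusing the sensordata table and values inserted earlier: the dt predicate is checked against the partition ranges in the manifest list, and the sensor_id predicate against the per-file lower/upper bounds, so files that cannot match are never opened.
scala> :paste
// Only data files whose partition range and sensor_id bounds can
// contain the predicate values are actually read.
spark.read
  .format("iceberg")
  .load("/dw/iceberg/db/sensordata")
  .where("dt = '2021-12-01' AND sensor_id = 'sensor_02'")
  .show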
Part 4 Summary
To prepare for the read/write analysis that follows, drop the existing sensordata table and recreate it cleanly:
spark-sql> use hadoop_catalog.db;
spark-sql> show tables;
namespace   tableName
db          sensordata
db          sensordata_hours
db          sensordata_years
db          sensordata_days
db          sensordata_01
db          sensordata_months
Time taken: 1.031 seconds, Fetched 6 row(s)
spark-sql> drop table sensordata;
spark-sql> CREATE TABLE sensordata(
  sensor_id STRING,
  ts BIGINT,
  temperature DOUBLE,
  dt STRING)
USING iceberg
PARTITIONED BY(dt);
INSERT INTO sensordata VALUES('sensor_01',1638351932,-0.2,'2021-12-01');
2.2.2 Inspect the HDFS directory
At this point only the sensordata table has started to generate data files.
INSERT INTO sensordata VALUES('sensor_03',1638351932,-23.2,'2021-12-01'),('sensor_02',1638351625,-5.2,'2021-12-02');
2.3.1 Run a query
SELECT * FROM sensordata;
2.3.2 Analysis of the query
spark-sql> delete from sensordata;
spark-sql> select * from sensordata;
spark-sql> SELECT h.made_current_at,
       s.operation,
       h.snapshot_id,
       h.is_current_ancestor,
       s.summary['spark.app.id']
FROM hadoop_catalog.db.sensordata.history h
JOIN hadoop_catalog.db.sensordata.snapshots s
  ON h.snapshot_id = s.snapshot_id
ORDER BY made_current_at;
scala> :pastespark.read.option("snapshot-id","8585349266450764220").format("iceberg").load("/dw/iceberg/db/sensordata").show
[bigdata@bigdata185 flink-1.11.6]$ vi bin/config.sh
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
1.1.2 Copy the Iceberg jars
Copy the compiled Iceberg Flink runtime jar, together with the Flink SQL Hive connector jar, into Flink's lib directory:
[bigdata@bigdata185 iceberg-apache-iceberg-0.11.1]$ cp flink-runtime/build/libs/iceberg-flink-runtime-0.11.1.jar /opt/module/flink-1.11.6/lib/
[bigdata@bigdata185 software]$ cp flink-sql-connector-hive-2.3.6_2.12-1.11.6.jar /opt/module/flink-1.11.6/lib/
Note: also copy the Hadoop and Hive configuration files into Flink's conf directory:
cp /opt/module/hadoop-2.7.7/etc/hadoop/core-site.xml /opt/module/flink-1.11.6/conf/
cp /opt/module/hadoop-2.7.7/etc/hadoop/hdfs-site.xml /opt/module/flink-1.11.6/conf/
cp /opt/module/hive-2.3.9/conf/hive-site.xml /opt/module/flink-1.11.6/conf/
[bigdata@bigdata185 ~]$ cd /opt/module/flink-1.11.6/
[bigdata@bigdata185 flink-1.11.6]$ bin/start-cluster.sh
1.2.2 Start the Flink SQL Client
For details, refer to the Flink SQL Client documentation.
[bigdata@bigdata185 flink-1.11.6]$ bin/sql-client.sh embedded shell
Once the Flink SQL Client starts successfully, the Flink squirrel banner appears.
-- Create a Hive catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://bigdata185:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://bigdata185:9000/dw/iceberg'
);
-- Use the Hive catalog
USE CATALOG hive_catalog;
-- Create a Hadoop catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://bigdata185:9000/dw/iceberg',
  'property-version'='1'
);
-- Use the Hadoop catalog
USE CATALOG hadoop_catalog;
[bigdata@bigdata185 flink-1.11.6]$ vi conf/sql-client-defaults.yaml
catalogs:
  - name: hadoop_catalog
    type: iceberg
    catalog-type: hadoop
    warehouse: hdfs://bigdata185:9000/dw/iceberg/
Flink SQL> use catalog hadoop_catalog;
Once hadoop_catalog has been created, a directory for it is generated on HDFS.
Flink SQL> SHOW CATALOGS;
(1) Start the Flink SQL Client and switch to hadoop_catalog:
[bigdata@bigdata185 flink-1.11.6]$ bin/sql-client.sh embedded shell
Flink SQL> USE CATALOG hadoop_catalog;
(2) You can use the default database or create your own. By default Iceberg uses Flink's default database. If you do not want to create tables under default, create a separate database as follows.
Flink SQL> SHOW DATABASES;
default
Flink SQL> CREATE DATABASE db;
Flink SQL> use db;
2.2 Create an Iceberg table
We create a partitioned table directly. At present, Flink's Iceberg integration cannot use Iceberg's hidden partitioning feature.
CREATE TABLE sensordata ( sensorid STRING, ts BIGINT, temperature DOUBLE, dt STRING) PARTITIONED BY (dt);
CREATE TABLE sensordata_01 LIKE sensordata;
INSERT INTO sensordata VALUES('sensor_01',1638351932,-0.2,'2021-12-01'),('sensor_02',1638351625,-5.2,'2021-12-01');
Flink SQL> SELECT * FROM sensordata;
INSERT INTO sensordata VALUES ('sensor_03', 1638533291, 32.1, '2021-12-02');
Flink SQL> INSERT OVERWRITE sensordata SELECT 'sensor_04', 1638619691, 31.7, '2021-12-01';
(2) Flink inserts data in streaming mode by default, and a streaming job does not support INSERT OVERWRITE.
Flink SQL> SET execution.type = batch;
Flink SQL> INSERT OVERWRITE sensordata SELECT 'sensor_04', 1638619691, 31.7, '2021-12-01';
Flink SQL> SELECT * FROM sensordata;
[bigdata@bigdata185 ~]$ HDFS-2.7.7 start
3.2 Add the Maven dependencies
We use the Flink 1.11.6 + Hadoop 2.7.7 + Iceberg 0.11.1 version combination.
<!-- Manage component versions through properties -->
<properties>
  <flink.version>1.11.6</flink.version>
  <hadoop.version>2.7.7</hadoop.version>
  <iceberg.version>0.11.1</iceberg.version>
</properties>
<dependencies>
  <!-- Flink dependencies -->
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency>
  <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency>
  <!-- Iceberg dependencies -->
  <dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>${iceberg.version}</version></dependency>
  <dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink</artifactId><version>${iceberg.version}</version></dependency>
  <!-- Hadoop client dependency -->
  <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency>
  <!-- Logging dependency -->
  <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency>
</dependencies>
<build>
  <plugins>
    <!-- Compile everything against JDK 1.8 -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.6.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <!-- Packaging plugin for the project -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.0.0</version>
      <configuration>
        <archive>
          <manifest>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
3.3 Add a log4j.properties file
log4j.rootLogger=WARN, stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c]- %m%n
3.4 Reading data with the DataStream API
Iceberg supports both streaming and batch reads through its Java API.
3.4.1 Batch read
Read the table in batch mode:
package com.yunclass.iceberg.flinksql;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class TableOperations {
    public static void main(String[] args) throws Exception {
        // Set the user for HDFS operations to avoid permission problems
        System.setProperty("HADOOP_USER_NAME", "bigdata");
        // 1 Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2 Load the table from its HDFS path with TableLoader
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://bigdata185:9000/dw/iceberg/db/sensordata");
        // 3 Choose the operation
        batchReadTable(env, tableLoader);
        // 4 Run the job
        env.execute("Flink batch operations on an Iceberg table");
    }

    /**
     * 1. Batch-read an Iceberg table
     */
    private static void batchReadTable(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> rowData = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        // Convert DataStream<RowData> into DataStream<String>
        DataStream<String> outputData = rowData.map(new MapFunction<RowData, String>() {
            @Override
            public String map(RowData rowData) throws Exception {
                String sensorId = rowData.getString(0).toString();
                long ts = rowData.getLong(1);
                double temperature = rowData.getDouble(2);
                String dt = rowData.getString(3).toString();
                return sensorId + "," + ts + "," + temperature + "," + dt;
            }
        });
        // Print the result
        outputData.print();
    }
}
3.4.2 Streaming read
Before the streaming read, check the table's snapshot history:
SELECT h.made_current_at,
       s.operation,
       h.snapshot_id,
       h.is_current_ancestor,
       s.summary['spark.app.id']
FROM hadoop_catalog.db.sensordata.history h
JOIN hadoop_catalog.db.sensordata.snapshots s
  ON h.snapshot_id = s.snapshot_id
ORDER BY made_current_at;
(1) Read the data in streaming mode:
    // 2. Read the Iceberg table in streaming mode
    private static void streamingReadTable(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> streamingData = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                // Stream-read the Iceberg table
                .streaming(true)
                .build();
        // Print the output using a lambda expression
        streamingData.map(item ->
                item.getString(0) + "," + item.getLong(1) + "," + item.getDouble(2) + "," + item.getString(3)
        ).print();
    }
(2) After it starts, the program does not exit immediately; it keeps running and emits newly committed data as it arrives.
INSERT INTO sensordata VALUES('sensor_05', 1638706091, 21.5, '2021-12-01');
INSERT INTO sensordata VALUES('sensor_06', 1638706091, 21.6, '2021-12-01');
INSERT INTO sensordata VALUES('sensor_07', 1638706091, 22.7, '2021-12-02');
    /**
     * 3. Custom source: write generated rows into an Iceberg table
     * @param env
     */
    private static void appendingTable(StreamExecutionEnvironment env) {
        // Helpers for generating random test data
        Random random = new Random();
        DecimalFormat df = new DecimalFormat("0.0");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        // Custom source that produces a DataStream<RowData>
        DataStream<RowData> inputData = env.addSource(new SourceFunction<RowData>() {
            boolean flag = true;

            @Override
            public void run(SourceContext<RowData> ctx) throws Exception {
                GenericRowData rowData = new GenericRowData(4);
                while (flag) {
                    // Generate 5 records
                    for (int i = 0; i < 5; i++) {
                        long ts = System.currentTimeMillis();
                        String sensorId = "sensor_" + random.nextInt(10);
                        double temperature = Double.parseDouble(df.format(random.nextDouble() * 10));
                        String dt = sdf.format(new Timestamp(ts));
                        rowData.setField(0, StringData.fromString(sensorId));
                        rowData.setField(1, ts);
                        rowData.setField(2, temperature);
                        rowData.setField(3, StringData.fromString(dt));
                        ctx.collect(rowData);
                    }
                    cancel();
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });
        inputData.print();
        // Load the target table
        TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://bigdata185:9000/dw/iceberg/db/sensordata_01");
        // Write the generated rows into the target table
        FlinkSink.forRowData(inputData)
                .tableLoader(tableLoader)
                .overwrite(true)
                .build();
    }
3.5.1.2 INSERT INTO SELECT
(1) The example below reads the sensordata table and appends its rows into sensordata_01, similar to INSERT INTO target_table SELECT * FROM source_table. Run the code twice.
    // 3. Append data into the sensordata_01 table
    private static void appendingTable(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batchData = FlinkSource.forRowData()
                .env(env)
                .streaming(false)
                .tableLoader(tableLoader)
                .build();
        // Load the target table
        TableLoader sinkTable = TableLoader.fromHadoopTable("hdfs://bigdata185:9000/dw/iceberg/db/sensordata_01");
        // Append-write into sensordata_01
        FlinkSink.forRowData(batchData)
                .tableLoader(sinkTable)
                .build();
        System.out.println("ok");
    }
Flink SQL> select * from sensordata_01;
FlinkSink.forRowData(inputData) .tableLoader(tableLoader) .build();
3.5.2.2 INSERT OVERWRITE SELECT
(1) With overwrite set to true, the data in sensordata_01 is overwritten automatically, partition by partition.
    // 4. Overwrite data
    public static void overWriteData(StreamExecutionEnvironment env, TableLoader tableLoader) {
        DataStream<RowData> batchData = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();
        // Load the target table
        TableLoader tableSensordata = TableLoader.fromHadoopTable("hdfs://bigdata185:9000/dw/iceberg/db/sensordata_01");
        FlinkSink.forRowData(batchData)
                .tableLoader(tableSensordata)
                .overwrite(true)
                .build();
    }
(2) Query sensordata_01 to see the effect of the overwrite: the data has been replaced partition by partition.
Flink SQL> select * from sensordata_01;
Flink SQL> DROP TABLE sensordata;
Flink SQL> CREATE TABLE sensordata (
  sensorid STRING,
  ts BIGINT,
  temperature DOUBLE,
  dt STRING) PARTITIONED BY (dt);
4.1.2 Inspect the HDFS directory
HDFS contains only the metadata directory; there is no data directory yet.
hadoop fs -cat /dw/iceberg/db/sensordata/metadata/version-hint.text
1
4.1.3.2 metadata
As the name suggests, metadata files store metadata about the table: its schema, partition information, snapshots, and which snapshot is the current one. Snapshot S0 (current-snapshot-id) does not point to any manifest list yet, because the table contains no data.
Sample metadata file (v1.metadata.json):
{ "format-version" : 1, "table-uuid" : "7cd325fb-c9af-45f5-ba24-ea7c62624878", "location" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata", "last-updated-ms" : 1641807202347, "last-column-id" : 4, "schema" : { "type" : "struct", "fields" : [ { "id" : 1, "name" : "sensorid", "required" : false, "type" : "string" }, { "id" : 2, "name" : "ts", "required" : false, "type" : "long" }, { "id" : 3, "name" : "temperature", "required" : false, "type" : "double" }, { "id" : 4, "name" : "dt", "required" : false, "type" : "string" } ] }, "partition-spec" : [ { "name" : "dt", "transform" : "identity", "source-id" : 4, "field-id" : 1000 } ], "default-spec-id" : 0, "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "dt", "transform" : "identity", "source-id" : 4, "field-id" : 1000 } ] } ], "default-sort-order-id" : 0, "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ], "properties" : { }, "current-snapshot-id" : -1, "snapshots" : [ ], "snapshot-log" : [ ], "metadata-log" : [ ]}
4.2 Analysis of the write path
4.2.1 Insert data
First insert two rows into the sensordata table to make the write logic easier to follow.
INSERT INTO sensordata VALUES('sensor_01',1638351932,-0.2,'2021-12-01'),('sensor_02',1638351625,-5.2,'2021-12-01');
4.2.2 Analysis of the write path
4.2.2.1 Inspect the manifest file
java -jar /opt/sources/avro-tools-1.11.0.jar tojson 1b5f0cc0-a34d-4e9e-ad81-3f3e5bd7bcd3-m0.avro
The converted JSON content is as follows; file_path points to the data file (the ts and temperature bounds are binary-encoded values that render as garbled text here):
{
  "status": 1,
  "snapshot_id": { "long": 4271853720390397954 },
  "data_file": {
    "file_path": "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/data/dt=2021-12-01/00000-0-a15bdcbb-f90d-4f89-bbaf-b8ea30187704-00001.parquet",
    "file_format": "PARQUET",
    "partition": { "dt": { "string": "2021-12-01" } },
    "record_count": 2,
    "file_size_in_bytes": 1319,
    "block_size_in_bytes": 67108864,
    "column_sizes": { "array": [{ "key": 1, "value": 63 }, { "key": 2, "value": 59 }, { "key": 3, "value": 61 }, { "key": 4, "value": 104 }] },
    "value_counts": { "array": [{ "key": 1, "value": 2 }, { "key": 2, "value": 2 }, { "key": 3, "value": 2 }, { "key": 4, "value": 2 }] },
    "null_value_counts": { "array": [{ "key": 1, "value": 0 }, { "key": 2, "value": 0 }, { "key": 3, "value": 0 }, { "key": 4, "value": 0 }] },
    "nan_value_counts": { "array": [{ "key": 3, "value": 0 }] },
    "lower_bounds": { "array": [{ "key": 1, "value": "sensor_01" }, { "key": 2, "value": "tC§au0000u0000u0000u0000" }, { "key": 3, "value": "ÍÌÌÌÌÌu0014À" }, { "key": 4, "value": "2021-12-01" }] },
    "upper_bounds": { "array": [{ "key": 1, "value": "sensor_02" }, { "key": 2, "value": "<D§au0000u0000u0000u0000" }, { "key": 3, "value": "É¿" }, { "key": 4, "value": "2021-12-01" }] },
    "key_metadata": null,
    "split_offsets": { "array": [4] }
  }
}
4.2.2.2 Inspect the manifest list file
java -jar /opt/sources/avro-tools-1.11.0.jar tojson snap-4271853720390397954-1-1b5f0cc0-a34d-4e9e-ad81-3f3e5bd7bcd3.avro
The converted JSON content is as follows; manifest_path points to the manifest file:
{
  "manifest_path": "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/1b5f0cc0-a34d-4e9e-ad81-3f3e5bd7bcd3-m0.avro",
  "manifest_length": 5949,
  "partition_spec_id": 0,
  "added_snapshot_id": { "long": 4271853720390397954 },
  "added_data_files_count": { "int": 1 },
  "existing_data_files_count": { "int": 0 },
  "deleted_data_files_count": { "int": 0 },
  "partitions": { "array": [{ "contains_null": false, "lower_bound": { "bytes": "2021-12-01" }, "upper_bound": { "bytes": "2021-12-01" } }] },
  "added_rows_count": { "long": 2 },
  "existing_rows_count": { "long": 0 },
  "deleted_rows_count": { "long": 0 }
}
4.2.2.3 Inspect the metadata file (v2.metadata.json)
The current metadata file is shown below; its manifest-list field points to the manifest list file above:
{
  "format-version" : 1,
  "table-uuid" : "46590bb6-5f0b-4d1b-878f-1b043f1faeb1",
  "location" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata",
  "last-updated-ms" : 1641807779230,
  "last-column-id" : 4,
  "schema" : {
    "type" : "struct",
    "fields" : [ { "id" : 1, "name" : "sensorid", "required" : false, "type" : "string" },
                 { "id" : 2, "name" : "ts", "required" : false, "type" : "long" },
                 { "id" : 3, "name" : "temperature", "required" : false, "type" : "double" },
                 { "id" : 4, "name" : "dt", "required" : false, "type" : "string" } ]
  },
  "partition-spec" : [ { "name" : "dt", "transform" : "identity", "source-id" : 4, "field-id" : 1000 } ],
  "default-spec-id" : 0,
  "partition-specs" : [ { "spec-id" : 0, "fields" : [ { "name" : "dt", "transform" : "identity", "source-id" : 4, "field-id" : 1000 } ] } ],
  "default-sort-order-id" : 0,
  "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ],
  "properties" : { },
  "current-snapshot-id" : 4271853720390397954,
  "snapshots" : [ {
    "snapshot-id" : 4271853720390397954,
    "timestamp-ms" : 1641807779230,
    "summary" : {
      "operation" : "append",
      "flink.job-id" : "1cff09f16bf701552a7af2153de0412a",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "1",
      "added-records" : "2",
      "added-files-size" : "1319",
      "changed-partition-count" : "1",
      "total-records" : "2",
      "total-data-files" : "1",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/snap-4271853720390397954-1-1b5f0cc0-a34d-4e9e-ad81-3f3e5bd7bcd3.avro"
  } ],
  "snapshot-log" : [ { "timestamp-ms" : 1641807779230, "snapshot-id" : 4271853720390397954 } ],
  "metadata-log" : [ { "timestamp-ms" : 1641807740051, "metadata-file" : "hdfs://bigdata185:9000/dw/iceberg/db/sensordata/metadata/v1.metadata.json" } ]
}
4.2.2.4 The metadata pointer file (version-hint.text)
At this point the number in the file is 2, which points to the v2.metadata.json metadata file.
2
4.3 Summary of the write path
To use Iceberg from Hive, copy the Hive runtime jar into Hive's lib directory (or add it in the Hive session):
[bigdata@bigdata185 libs]$ cp iceberg-hive-runtime-0.11.1.jar /opt/module/hive-2.3.9/lib/
add jar /opt/jars/iceberg-hive-runtime-0.11.1.jar
1.2 Catalog modes
1.2.1 Hive Catalog mode
Manage and operate Iceberg tables through a Hive catalog.
SET iceberg.catalog.hive_catalog.type=hive;
SET iceberg.catalog.hive_catalog.uri=thrift://bigdata185:9083;
SET iceberg.catalog.hive_catalog.clients=10;
SET iceberg.catalog.hive_catalog.warehouse=hdfs://bigdata185:9000/dw/iceberg;
1.2.2 Hadoop Catalog mode
Manage and operate Iceberg tables through a Hadoop catalog.
SET iceberg.catalog.hadoop_catalog.type=hadoop;
SET iceberg.catalog.hadoop_catalog.warehouse=hdfs://bigdata185:9000/dw/iceberg;
1.3 Table operations
(1) Create an Iceberg table
CREATE DATABASE hive_db;
USE hive_db;
CREATE TABLE sensordata_hive (
  sensor_id STRING,
  ts BIGINT,
  temperature DOUBLE
) PARTITIONED BY (
  dt STRING
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://bigdata185:9000/dw/iceberg/hive_db/sensordata_hive'
TBLPROPERTIES ('iceberg.catalog'='hadoop_catalog');
(2) Write data into the Iceberg table
INSERT INTO sensordata_hive VALUES('sensor_02',1638421701,-22.2,'2021-12-02');
SELECT * FROM sensordata_hive;
vi conf/spark-defaults.conf
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
2.1 Version management
2.1.1 Prepare the environment
(1) To demonstrate Iceberg's version management features in SQL, first prepare data as follows.
-- Create the Iceberg table
CREATE TABLE sensordata(
  sensor_id STRING,
  ts BIGINT,
  temperature DOUBLE,
  dt STRING)
USING iceberg
PARTITIONED BY(dt);
-- Append one row
INSERT INTO sensordata VALUES('sensor_01',1635743301,-12.1,'2021-12-01');
-- Overwrite with one row
INSERT OVERWRITE sensordata VALUES('sensor_02',1635743301,23.6,'2021-12-01');
-- Append one row
INSERT INTO sensordata VALUES('sensor_02',1638421701,-22.2,'2021-12-02');
-- Delete the data for 2021-12-02
DELETE FROM sensordata WHERE dt = '2021-12-02';
-- Append one row
INSERT INTO sensordata VALUES('sensor_03',1638421701,-22.2,'2021-12-03');
SELECT s.committed_at,
       s.operation,
       h.snapshot_id,
       h.is_current_ancestor,
       s.summary['spark.app.id']
FROM hadoop_catalog.db.sensordata.history h
JOIN hadoop_catalog.db.sensordata.snapshots s
  ON h.snapshot_id = s.snapshot_id
ORDER BY committed_at;
committed_at            operation  snapshot_id          is_current_ancestor  summary[spark.app.id]
2022-01-26 13:26:27.475 append     4815667141509177681  true                 local-1643174116549
2022-01-26 13:26:34.404 overwrite  5163041208696993750  true                 local-1643174116549
2022-01-26 13:26:40.707 append     2479499780166474991  true                 local-1643174116549
2022-01-26 13:26:46.831 delete     6907478957764082176  true                 local-1643174116549
2022-01-26 13:26:52.259 append     4719601493924525234  true                 local-1643174116549
spark-sql> SELECT * FROM sensordata;
CALL hadoop_catalog.system.rollback_to_snapshot('db.sensordata', 6907478957764082176);
spark-sql> SELECT * FROM sensordata;
CALL hadoop_catalog.system.rollback_to_timestamp('db.sensordata', TIMESTAMP '2022-01-26 13:26:46.831')
SELECT * FROM sensordata;
CALL hadoop_catalog.system.set_current_snapshot('db.sensordata', 4719601493924525234)
SELECT * FROM sensordata;
CALL hadoop_catalog.system.expire_snapshots('db.sensordata', TIMESTAMP '2022-01-26 13:26:40.707',3);
SELECT s.committed_at,
       s.operation,
       h.snapshot_id,
       h.is_current_ancestor,
       s.summary['spark.app.id']
FROM hadoop_catalog.db.sensordata.history h
JOIN hadoop_catalog.db.sensordata.snapshots s
  ON h.snapshot_id = s.snapshot_id
ORDER BY committed_at;
2.2.2 rewrite_manifests
Rewriting manifest files can optimize query planning.
CALL hadoop_catalog.system.rewrite_manifests('db.sensordata');
CALL hadoop_catalog.system.rewrite_manifests('db.sensordata', false);
(1) Create Hive managed tables
-- Create test table 1
create table ods_sensordata1(
  sensor_id STRING,
  ts BIGINT,
  temperature double)
stored as PARQUET
location '/dw/hive/ods_sensordata1';
-- Create test table 2
create table ods_sensordata2(
  sensor_id STRING,
  ts BIGINT,
  temperature double)
stored as PARQUET
location '/dw/hive/ods_sensordata2';
-- Write data
insert into ods_sensordata1 select 'sensor_20', 1631349207, -23.2;
insert into ods_sensordata1 select 'sensor_21', 1631349207, -33.3;
insert into ods_sensordata2 select 'sensor_01', 1631349389, -23.4;
insert into ods_sensordata2 select 'sensor_02', 1631349390, -33.5;
(2) Create Hive external tables
-- Create test table 1
create external table ods_ext_sensordata1(
  sensor_id STRING,
  ts BIGINT,
  temperature double)
stored as PARQUET
location '/dw/hive/external/ods_ext_sensordata1';
-- Create test table 2
create external table ods_ext_sensordata2(
  sensor_id STRING,
  ts BIGINT,
  temperature double)
stored as PARQUET
location '/dw/hive/external/ods_ext_sensordata2';
-- Write data
insert into ods_ext_sensordata1 select 'sensor_20', 1631349207, -23.2;
insert into ods_ext_sensordata1 select 'sensor_21', 1631349207, -33.3;
insert into ods_ext_sensordata2 select 'sensor_01', 1631349389, -23.4;
insert into ods_ext_sensordata2 select 'sensor_02', 1631349390, -33.5;
3.2 Migrate to Iceberg tables
(1) A Hive table can be replaced in place with an Iceberg table: the table structure, partitions, properties, and location are copied from the source table. Avro, Parquet, and ORC source formats are currently supported.
vi conf/spark-defaults.conf
spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type = hive
3.2.1 Migrate a Hive managed table
(1) Migrate the Hive managed table with the following command:
CALL hadoop_catalog.system.migrate('spark_catalog.hive_db.ods_sensordata1');
spark-sql> select * from spark_catalog.hive_db.ods_sensordata1;
3.2.2 Migrate a Hive external table
CALL hadoop_catalog.system.migrate('spark_catalog.hive_db.ods_ext_sensordata1');
spark-sql> SELECT * FROM spark_catalog.hive_db.ods_ext_sensordata1;