Chinaunix首页 | 论坛 | 博客
  • 博客访问: 1085177
  • 博文数量: 165
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 1351
  • 用 户 组: 普通用户
  • 注册时间: 2016-03-11 14:13
个人简介

狂甩酷拽吊炸天

文章分类

全部博文(165)

文章存档

2024年(1)

2023年(1)

2022年(3)

2021年(4)

2020年(17)

2019年(37)

2018年(17)

2017年(35)

2016年(50)

分类: LINUX

2020-03-24 17:57:21

1. 什么是Apache Hudi
一个spark 库
大数据更新解决方案,大数据中没有传统意义的更新,只有append和重写(Hudi就是采用重写方式)


使用Hudi的优点
使用Bloomfilter机制+二次查找,可快速确定记录是更新还是新增
更新范围小,是文件级别,不是表级别
文件大小与hdfs的Blocksize保持一致
数据文件使用parquet格式,充分利用列存的优势(dremal论文实现)
提供了可扩展的大数据更新框架
并发度由spark控制
hudi详细介绍见hudi官网


2. Hudi编译
git clone https://github.com/apache/incubator-hudi.git && cd incubator-hudi
mvn clean package -DskipTests -DskipITs
注意: 本文编译hudi使用的linux环境,window环境一定要加上-DskipITs,不然会编译docker 文件启动服务运行linux命令导致报错,如果是linux环境且需要用docker进行测试可以考虑去掉其参数。


3. 前置环境安装准备
所有版本选择均是查看当前master分支pom 中所依赖的 spark,hive ,hadoop,presto版本。(hudi-0.5.2-SNAPSHOT)
**注意:**小版本不一样不影响使用,如果运行spark任务报错不兼容排下依赖包就好。


4. Hive和Presto集成
4.1 hive
hive 查询hudi 数据主要是在hive中建立外部表数据路径指向hdfs 路径,同时hudi 重写了inputformat 和outpurtformat。因为hudi 在读的数据的时候会读元数据来决定我要加载那些parquet文件,而在写的时候会写入新的元数据信息到hdfs路径下。所以hive 要集成hudi 查询要把编译的jar 包放到HIVE-HOME/lib 下面。否则查询时找不到inputformat和outputformat的类。
hive 外表数据结构如下:


CREATE EXTERNAL TABLE `test_partition`(
  `_hoodie_commit_time` string, 
  `_hoodie_commit_seqno` string, 
  `_hoodie_record_key` string, 
  `_hoodie_file_name` string, 
  `id` string, 
  `oid` string, 
  `name` string, 
  `dt` string, 
  `isdeleted` string, 
  `lastupdatedttm` string, 
  `rowkey` string)
PARTITIONED BY ( 
  `_hoodie_partition_path` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  'hdfs://hj:9000/tmp/hudi'
TBLPROPERTIES (
  'transient_lastDdlTime'='1582111004')

hive集成hudi方法:将hudi jar复制到hive lib下
cp ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar  $HIVE_HOME/lib

4.2 Presto
presto 集成hudi 是基于hive catalog 同样是访问hive 外表进行查询,如果要集成需要把hudi 包copy 到presto hive-hadoop2插件下面。
presto集成hudi方法: 将hudi jar复制到 presto hive-hadoop2下
cp  ./packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.5.2-SNAPSHOT.jar  $PRESTO_HOME/plugin/hive-hadoop2/ 


5. Hudi代码实战
5.1 Copy_on_Write 模式操作(默认模式)
5.1.1 insert操作(初始化插入数据)
// 不带分区写入
  @Test
  def insert(): Unit = {
    val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val insertData = spark.read.parquet("/tmp/1563959377698.parquet")
    insertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // table name 设置
      .option(HoodieWriteConfig.TABLE_NAME, "test")
      .mode(SaveMode.Overwrite)
      // 写入路径设置
      .save("/tmp/hudi")
  }


// 带分区写入
  @Test
  def insertPartition(): Unit = {
    val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    // 读取文本文件转换为df
    val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
    insertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
      // 设置分区列
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi")
  }



5.1.2 upsert操作(数据存在时修改,不存在时新增)
// 不带分区upsert
  @Test
  def upsert(): Unit = {


    val spark = SparkSession.builder.appName("hudi upsert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val insertData = spark.read.parquet("/tmp/1563959377699.parquet")


    insertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
      // 表名称设置
      .option(HoodieWriteConfig.TABLE_NAME, "test")
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      // 写入路径设置
      .save("/tmp/hudi");
  }


// 带分区upsert
  @Test
  def upsertPartition(): Unit = {


    val spark = SparkSession.builder.appName("upsert partition").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val upsertData = Util.readFromTxtByLineToDf(spark, "/home/xxx/soft/git/experiment/hudi-test/src/main/resources/test_update_data.txt")


    upsertData.write.format("org.apache.hudi").option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
       // 分区列设置
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save("/tmp/hudi");
  }


5.1.3 delete操作(删除数据)
  @Test
  def delete(): Unit = {
    val spark = SparkSession.builder.appName("delta insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val deleteData = spark.read.parquet("/tmp/1563959377698.parquet")
    deleteData.write.format("com.uber.hoodie")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
      // 表名称设置
      .option(HoodieWriteConfig.TABLE_NAME, "test")
      // 硬删除配置
      .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
  }



删除操作分为软删除和硬删除配置在这里查看:%E5%88%A0%E9%99%A4%E6%95%B0%E6%8D%AE


5.1.4 query操作(查询数据)
  @Test
  def query(): Unit = {
    val basePath = "/tmp/hudi"
    val spark = SparkSession.builder.appName("query insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val tripsSnapshotDF = spark.
      read.
      format("org.apache.hudi").
      load(basePath + "/*/*")


    tripsSnapshotDF.show()
  }



5.1.5 同步至Hive
  @Test
  def hiveSync(): Unit = {
    val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val upsertData = Util.readFromTxtByLineToDf(spark, "/home/xxx/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")


    upsertData.write.format("org.apache.hudi")
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
      // 分区列设置
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      // 设置要同步的hive库名
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
      // 设置要同步的hive表名
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition")
      // 设置数据集注册并同步到hive
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      // 设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      // 设置要同步的分区列名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      // 设置jdbc 连接同步
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
      // hudi表名称设置
      .option(HoodieWriteConfig.TABLE_NAME, "test_partition")
      // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save("/tmp/hudi");
  }


  @Test
  def hiveSyncMergeOnReadByUtil(): Unit = {
    val args: Array[String] = Array("--jdbc-url", "jdbc:hive2://hj:10000", "--partition-value-extractor", "org.apache.hudi.hive.MultiPartKeysValueExtractor", "--user", "hive", "--pass", "hive", "--partitioned-by", "dt", "--base-path", "/tmp/hudi_merge_on_read", "--database", "hj_repl", "--table", "test_partition_merge_on_read")
    HiveSyncTool.main(args)
  }



这里可以选择使用spark 或者hudi-hive包中的hiveSynTool进行同步,hiveSynTool类其实就是run_sync_tool.sh运行时调用的。hudi 和hive同步时保证hive目标表不存在,同步其实就是建立外表的过程。


5.1.6 Hive查询读优化视图和增量视图
  @Test
  def hiveViewRead(): Unit = {
    // 目标表
    val sourceTable = "test_partition"
    // 增量视图开始时间点
    val fromCommitTime = "20200220094506"
    // 获取当前增量视图后几个提交批次
    val maxCommits = "2"


    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val prop = new Properties()
    prop.put("user", "hive")
    prop.put("password", "hive")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
    val stmt = conn.createStatement
    // 这里设置增量视图参数
    stmt.execute("set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat")
    // Allow queries without partition predicate
    stmt.execute("set hive.strict.checks.large.query=false")
    // Dont gather stats for the table created
    stmt.execute("set hive.stats.autogather=false")
    // Set the hoodie modie
    stmt.execute("set hoodie." + sourceTable + ".consume.mode=INCREMENTAL")
    // Set the from commit time
    stmt.execute("set hoodie." + sourceTable + ".consume.start.timestamp=" + fromCommitTime)
    // Set number of commits to pull
    stmt.execute("set hoodie." + sourceTable + ".consume.max.commits=" + maxCommits)


    val rs = stmt.executeQuery("select * from " + sourceTable)
    val metaData = rs.getMetaData
    val count = metaData.getColumnCount




    while (rs.next()) {
      for (i <- 1 to count) {
        println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
      }
      println("-----------------------------------------------------------")
    }


    rs.close()
    stmt.close()
    conn.close()


  }



读优化视图即去掉增量视图参数即可。


5.1.7 Presto查询读优化视图(暂不支持增量视图)
  @Test
  def prestoViewRead(): Unit = {
    // 目标表
    val sourceTable = "test_partition"
    Class.forName("com.facebook.presto.jdbc.PrestoDriver")
    val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
    val stmt = conn.createStatement
    val rs = stmt.executeQuery("select * from  " + sourceTable)
    val metaData = rs.getMetaData
    val count = metaData.getColumnCount


    while (rs.next()) {
      for (i <- 1 to count) {
        println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
      }
      println("-----------------------------------------------------------")
    }


    rs.close()
    stmt.close()
    conn.close()
  }



5.2 Merge_On_Read 模式操作
5.2.1 insert操作(插入数据)
  @Test
  def insertPartitionMergeOnRead(): Unit = {
    val spark = SparkSession.builder.appName("hudi insert").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    // 读取文本文件转换为df
    val insertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/test_insert_data.txt")
    insertData.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
      // 设置分区列
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      // 设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
      .mode(SaveMode.Overwrite)
      .save("/tmp/hudi_merge_on_read")
  }



merge on read 主要是要是加入option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)参数,其他修改删除操作和copy on write 类似,这里不一一列举。


5.2.2 同步至Hive
  @Test
  def hiveSyncMergeOnRead(): Unit = {
    val spark = SparkSession.builder.appName("delta hiveSync").config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[3]").getOrCreate()
    val upsertData = Util.readFromTxtByLineToDf(spark, "/home/huangjing/soft/git/experiment/hudi-test/src/main/resources/hive_sync.txt")


    upsertData.write.format("org.apache.hudi")
      // 配置读时合并
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      // 设置主键列名
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "rowkey")
      // 设置数据更新时间的列名
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "lastupdatedttm")
      // 分区列设置
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
      // 设置要同步的hive库名
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "hj_repl")
      // 设置要同步的hive表名
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "test_partition_merge_on_read")
      // 设置数据集注册并同步到hive
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      // 设置当分区变更时,当前数据的分区目录是否变更
      .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
      // 设置要同步的分区列名
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
      // 设置jdbc 连接同步
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://localhost:10000")
      // hudi表名称设置
      .option(HoodieWriteConfig.TABLE_NAME, "test_partition_merge_on_read")
      // 用于将分区字段值提取到Hive分区列中的类,这里我选择使用当前分区的值同步
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
      // 设置索引类型目前有HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引 为了保证分区变更后能找到必须设置全局GLOBAL_BLOOM
      .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
      // 并行度参数设置
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save("/tmp/hudi_merge_on_read");
  }



与copy on write 操作一样,不同的是merge on read 会生成两个表后缀为_ro和_rt的外表。_ro为读优化视图,_rt为实时视图。


5.2.3 Hive查询读优化视图(后缀_ro)和实时视图查询 (后缀_rt)
/**
   * merge on read 实时视图查询
   */
  @Test
  def mergeOnReadRealtimeViewByHive(): Unit = {
    // 目标表
    val sourceTable = "test_partition_merge_on_read_rt"


    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val prop = new Properties()
    prop.put("user", "hive")
    prop.put("password", "hive")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
    val stmt = conn.createStatement


    val rs = stmt.executeQuery("select * from " + sourceTable)
    val metaData = rs.getMetaData
    val count = metaData.getColumnCount




    while (rs.next()) {
      for (i <- 1 to count) {
        println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
      }
      println("-----------------------------------------------------------")
    }


    rs.close()
    stmt.close()
    conn.close()
  }




  /**
   * merge on read 读优化视图查询
   */
  @Test
  def mergeOnReadReadoptimizedViewByHive(): Unit = {
    // 目标表
    val sourceTable = "test_partition_merge_on_read_ro"


    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val prop = new Properties()
    prop.put("user", "hive")
    prop.put("password", "hive")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/hj_repl", prop)
    val stmt = conn.createStatement


    val rs = stmt.executeQuery("select * from " + sourceTable)
    val metaData = rs.getMetaData
    val count = metaData.getColumnCount




    while (rs.next()) {
      for (i <- 1 to count) {
        println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
      }
      println("-----------------------------------------------------------")
    }


    rs.close()
    stmt.close()
    conn.close()
  }



5.2.4 Presto查询读优化视图(后缀_ro)和实时视图查询 (后缀_rt)
/**
   * presto merge on read 实时视图查询
   */
  @Test
  def mergeOnReadRealtimeViewByPresto(): Unit = {
    // 目标表
    val sourceTable = "test_partition_merge_on_read_rt"
    Class.forName("com.facebook.presto.jdbc.PrestoDriver")
    val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
    val stmt = conn.createStatement
    val rs = stmt.executeQuery("select * from  " + sourceTable)
    val metaData = rs.getMetaData
    val count = metaData.getColumnCount


    while (rs.next()) {
      for (i <- 1 to count) {
        println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
      }
      println("-----------------------------------------------------------")
    }


    rs.close()
    stmt.close()
    conn.close()
  }




  /**
   * presto merge on read 读优化视图查询
   */
  @Test
  def mergeOnReadReadoptimizedViewByPresto(): Unit = {
    // 目标表
    val sourceTable = "test_partition_merge_on_read_ro"
    Class.forName("com.facebook.presto.jdbc.PrestoDriver")
    val conn = DriverManager.getConnection("jdbc:presto://hj:7670/hive/hj_repl", "hive", null)
    val stmt = conn.createStatement
    val rs = stmt.executeQuery("select * from  " + sourceTable)
    val metaData = rs.getMetaData
    val count = metaData.getColumnCount


    while (rs.next()) {
      for (i <- 1 to count) {
        println(metaData.getColumnName(i) + ":" + rs.getObject(i).toString)
      }
      println("-----------------------------------------------------------")
    }


    rs.close()
    stmt.close()
    conn.close()
  }



6. 问题整理
1. merg on read 问题
merge on read 要配置option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)才会生效,配置为option(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME, HoodieTableType.MERGE_ON_READ.name())将不会生效。

阅读(3909) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~