import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    // Silence the noisy akka and org loggers
    Logger.getLogger("akka").setLevel(Level.OFF)
    Logger.getLogger("org").setLevel(Level.OFF)
    // Create a SparkSession with Hive support, pointing at the remote metastore
    val spark = SparkSession.builder()
      .appName("aaa")
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://192.168.152.82:9083")
      .getOrCreate()
    // Set up the MySQL connection properties
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
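    // Optional (an assumption, not in the original): pin the JDBC driver class
    // in case Spark cannot infer it from the URL; requires the MySQL
    // Connector/J jar on the classpath (use "com.mysql.cj.jdbc.Driver" for
    // Connector/J 8.x instead).
    properties.setProperty("driver", "com.mysql.jdbc.Driver")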
    // Read the order_master table from MySQL over JDBC
    val df01 = spark.read.jdbc("jdbc:mysql://192.168.152.82:3306/ds_db01", "order_master", properties)
    df01.show(5, false)
    df01.createOrReplaceTempView("t_df01")
    // Read the same table from the Hive ODS layer
    val df02 = spark.sql("select * from ods.order_master")
    df02.show(5)
    df02.createOrReplaceTempView("t_df02")
    // Keep only the MySQL rows whose modified_time does not yet exist in Hive
    // (the increment), tagging them with an etl_date; note that "not in"
    // yields no rows at all if t_df02.modified_time contains nulls
    val df03 = spark.sql("select *, '20231222' as etl_date from t_df01 where modified_time not in (select modified_time from t_df02)")
    df03.show(5)
    df03.createOrReplaceTempView("t_df03")
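    // A minimal sketch of the likely next step (not in the original): append
    // the increment back into the Hive table. Assumes ods.order_master is
    // partitioned by etl_date and that etl_date is its last column, matching
    // df03's column order, so insertInto can resolve the partition dynamically.
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    df03.write.mode("append").insertInto("ods.order_master")

    spark.stop()
  }
}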