import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    // Silence framework logging so the job's own output stays readable.
    Logger.getLogger("akka").setLevel(Level.OFF)
    Logger.getLogger("org").setLevel(Level.OFF)
    val spark = SparkSession.builder()
      .appName("aaa")
      .enableHiveSupport()
      .config("hive.metastore.uris", "thrift://192.168.152.82:9083")
      .getOrCreate()
    // JDBC credentials for the source MySQL instance.
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
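    // Assumption: if the MySQL driver class is not auto-discovered from the
    // classpath, Spark's standard JDBC "driver" option can name it explicitly.
    // Connector/J 8.x uses com.mysql.cj.jdbc.Driver; 5.x builds use
    // com.mysql.jdbc.Driver.
    // properties.setProperty("driver", "com.mysql.cj.jdbc.Driver")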
    val df01 = spark.read.jdbc("jdbc:mysql://192.168.152.82:3306/ds_db01", "coupon_use", properties)
    df01.show(5, false)
    df01.createOrReplaceTempView("t_df01")
    // Handling NULL values is especially important here: one careless slip
    // loses the whole game, so study the data carefully before writing.
    // Returns the largest of three time values, treating NULL as 0; the toInt
    // comparison assumes the times are numeric strings.
    def check01(a: String, b: String, c: String): String = {
      // Guard both real nulls and the literal string "NULL".
      val a1 = if (a == null || a == "NULL") "0" else a
      val b1 = if (b == null || b == "NULL") "0" else b
      val c1 = if (c == null || c == "NULL") "0" else c
      var result = if (a1.toInt > b1.toInt) a1 else b1
      if (result.toInt < c1.toInt) result = c1
      result
    }
    spark.udf.register("check01", check01 _)
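    // An alternative sketch without a UDF, assuming (as check01 does) that the
    // three time columns are numeric strings: coalesce() maps NULL to '0' and
    // the built-in greatest() picks the largest value. Note this yields an int
    // column where check01 returns a string.
    val df02Alt = spark.sql(
      """select *,
        |       greatest(cast(coalesce(get_time,  '0') as int),
        |                cast(coalesce(used_time, '0') as int),
        |                cast(coalesce(pay_time,  '0') as int)) as max_time
        |from t_df01""".stripMargin)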
    // Compute the same max_time key over the existing ODS copy in Hive.
    val df02 = spark.sql("select *, check01(get_time, used_time, pay_time) as max_time from ods.coupon_use")
    df02.show(5)
    df02.createOrReplaceTempView("t_df02")
    // Tag the fresh MySQL snapshot with a constant etl_date and the same key.
    val df03 = spark.sql("select *, '20231222' as etl_date, check01(get_time, used_time, pay_time) as max_time from t_df01")
    df03.show(5)
    df03.createOrReplaceTempView("t_df03")
    // Incremental rows: records from the fresh snapshot whose max_time does
    // not yet appear in the ODS copy.
    val df04 = spark.sql("select * from t_df03 where max_time not in (select max_time from t_df02)")
    df04.show(5)
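    // Caveat on the NOT IN above: if any max_time in t_df02 were NULL, SQL's
    // NOT IN would filter out every row. check01 never returns NULL here, but
    // a LEFT ANTI JOIN expresses the same "no match exists" intent and is
    // immune to that trap; a sketch of the equivalent query:
    val df04Alt = spark.sql(
      """select a.*
        |from t_df03 a
        |left anti join t_df02 b on a.max_time = b.max_time""".stripMargin)
    df04Alt.show(5)
  }
}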