模块B模板代码
做题之前的技术流程:
- 自行备份mysql数据库的ds01为ds02,,防止误操作:
1 2 3 4
| [root@bigdata1 ~] [root@bigdata1 ~] [root@bigdata1 ~] mysql> source /root/ds_db01.sql;
|
可以在workbench中看到在mysql中有一个ds_db02,和ds_db01一模一样
- hive的本地文件下载
教师已经将原始hive备份到hdfs的bbb中了
1 2
| [root@bigdata1 ~] [root@bigdata1 ~]
|
- 启动所有环境程序(看虚拟机快照注释)
然后把bbb文件夹拷贝在工程目录下即可查看hive数据文件
任务一
- 抽取
ds_db01
库中 table01
的增量数据进入 Hive 的 ods
库中表 table01
。根据 ods .table01
表中 modified_time
作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为 etl_date
,类型为 String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用 hive cli执行 show partitions ods.table01
命令,将执行结果截图粘贴至客户端桌面【Release\模块B提交结果.docx】中对应的任务序号下;
代码示例(可根据题目要求更改变量,适用于模块B任务一离线数据采集的同类型题型):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession
import java.util.Properties
object Demo01 { def main(args: Array[String]): Unit = { Logger.getLogger("akka").setLevel(Level.OFF) Logger.getLogger("org").setLevel(Level.OFF)
val spark_link = "192.168.152.82:9083" val mysql_link = "192.168.152.82:3306/ds_db01" val need_table = "table01" val need_date = "20240526" val need_change = "etl_table" val need_value = "modified_time"
val spark = SparkSession.builder().appName("aaa") .enableHiveSupport() .config("hive.metastore.uris", s"thrift://$spark_link") .getOrCreate()
val pro = new Properties() pro.setProperty("user", "root") pro.setProperty("password", "123456")
val df01 = spark.read.jdbc(s"jdbc:mysql://$mysql_link", s"$need_table", pro) df01.show(5, false) df01.createOrReplaceTempView("t_df01")
val df02 = spark.sql(s"select * from ods.$need_table") df02.show(5) df02.createOrReplaceTempView("t_df02")
val df03 = spark.sql( s"select *, '$need_date' as $need_change from t_df01 " + s"where $need_value not in " + s"(select $need_value from t_df02)" ) df03.show(5) df03.createOrReplaceTempView("t_df03")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict") df03.write.insertInto(s"ods.$need_table") } }
|
使用 IDEA maven package打包后上传到 bigdata1 执行:
1 2 3 4
| spark-submit --master yarn --class 方法名 文件路径
spark-submit --master yarn --class org.apache.spark.examples.SparkPi /opt/module/spark/examples/jars/spark-examples_2.12-3.1.1.jar
|
答案:
1 2 3 4
| hive> show partitions ods.table01; OK etl_date=20231222 Time taken: 0.1 seconds, Fetched: 1 row(s)
|