更新于:

模块B 1.1

模块B第1小题

抽取 ds_db01 库中 order_master 的增量数据进入 Hive 的 ods 库中表 order_master 。根据 ods .order_master 表中 modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为 etl_date ,类型为 String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用 hive cli执行 show partitions ods .order_master 命令,将执行结果截图粘贴至客户端桌面【Release\模块B提交结果.docx】中对应的任务序号下;

做题之前的技术流程:

  1. 自行备份mysql数据库的ds01为ds02,,防止误操作:
1
2
3
4
[root@bigdata1 ~]# mysqldump -h 192.168.152.82 -u root -p --databases ds_db01 > /root/ds_db01.sql
[root@bigdata1 ~]# sed -i 's/ds_db01/ds_db02/g' ds_db01.sql
[root@bigdata1 ~]# mysql -u root -p
mysql> source /root/ds_db01.sql;

可以在workbench中看到在mysql中有一个ds_db02,和ds_db01一模一样

  1. hive的本地文件下载 教师已经将原始hive备份到hdfs的bbb中了
1
2
3
[root@bigdata1 ~]# hadoop fs -get /bbb ./

[root@bigdata1 ~]# scp -r ./bbb ubuntu@192.168.152.136:/home/ubuntu
  1. 启动所有环境程序(看虚拟机快照注释)

然后把bbb文件夹拷贝在工程目录下即可查看hive数据文件

任务一

第1小题

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object test {
def main(args: Array[String]): Unit = {
// 关闭akka和org的日志
Logger.getLogger("akka").setLevel(Level.OFF)
Logger.getLogger("org").setLevel(Level.OFF)

// 创建SparkSession
val spark=SparkSession.builder().appName("aaa")
.enableHiveSupport()
.config("hive.metastore.uris","thrift://192.168.152.82:9083")
.getOrCreate()

// 设置数据库连接属性
val properties=new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123456")

// 从mysql中读取数据
val df01=spark.read.jdbc("jdbc:mysql://192.168.152.82:3306/ds_db01","order_master",properties)
df01.show(5,false)
df01.createOrReplaceTempView("t_df01")

// 从hive中读取数据
val df02=spark.sql("select * from ods.order_master")
df02.show(5)
df02.createOrReplaceTempView("t_df02")

// 筛选出需要插入的数据
val df03=spark.sql("select *,'20231222' as etl_date from t_df01 where modified_time not in (select modified_time from t_df02)")
df03.show(5)
df03.createOrReplaceTempView("t_df03")

// 设置hive的分区模式
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
// 将数据插入到hive中
df03.write.insertInto("ods.order_master")


}

}

使用 IDEA maven package打包后上传到 bigdata1 执行:

1
2
3
4
spark-submit --class 方法名 --master yarn 文件路径

# 例:
spark-submit --class org.apache.spark.examples.SparkPi --master yarn /opt/module/spark/examples/jars/spark-examples_2.12-3.1.1.jar

答案:

1
2
3
4
hive> show partitions ods.order_master;
OK
etl_date=20231222
Time taken: 0.1 seconds, Fetched: 1 row(s)