更新于:

模块B模板代码

做题之前的技术流程:

  1. 自行备份mysql数据库的ds01为ds02,,防止误操作:
1
2
3
4
[root@bigdata1 ~]# mysqldump -h 192.168.152.82 -u root -p --databases ds_db01 > /root/ds_db01.sql
[root@bigdata1 ~]# sed -i 's/ds_db01/ds_db02/g' ds_db01.sql
[root@bigdata1 ~]# mysql -u root -p
mysql> source /root/ds_db01.sql;

可以在workbench中看到在mysql中有一个ds_db02,和ds_db01一模一样

  1. hive的本地文件下载 教师已经将原始hive备份到hdfs的bbb中了
1
2
[root@bigdata1 ~]# hadoop fs -get /bbb ./
[root@bigdata1 ~]# scp -r ./bbb ubuntu@192.168.152.136:/home/ubuntu
  1. 启动所有环境程序(看虚拟机快照注释)

然后把bbb文件夹拷贝在工程目录下即可查看hive数据文件

任务一

  1. 抽取 ds_db01 库中 table01 的增量数据进入 Hive 的 ods 库中表 table01 。根据 ods .table01 表中 modified_time 作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为 etl_date ,类型为 String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用 hive cli执行 show partitions ods.table01 命令,将执行结果截图粘贴至客户端桌面【Release\模块B提交结果.docx】中对应的任务序号下;

代码示例(可根据题目要求更改变量,适用于模块B任务一离线数据采集的同类型题型):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

import java.util.Properties

object Demo01 {
def main(args: Array[String]): Unit = {
// 关闭akka和org的日志
Logger.getLogger("akka").setLevel(Level.OFF)
Logger.getLogger("org").setLevel(Level.OFF)

// 创建题目所需变量
val spark_link = "192.168.152.82:9083"
val mysql_link = "192.168.152.82:3306/ds_db01"
val need_table = "table01" //题目要求表名
val need_date = "20240526" // 题目要求日期
val need_change = "etl_table" // 题目要求更改字段名
val need_value = "modified_time" // 题目要求筛选字段名

// 创建SparkSession
val spark = SparkSession.builder().appName("aaa")
.enableHiveSupport()
.config("hive.metastore.uris", s"thrift://$spark_link")
.getOrCreate()

// 设置数据库连接属性
val pro = new Properties()
pro.setProperty("user", "root")
pro.setProperty("password", "123456")

// 从mysql中读取数据
val df01 = spark.read.jdbc(s"jdbc:mysql://$mysql_link",
s"$need_table", pro)
df01.show(5, false)
df01.createOrReplaceTempView("t_df01")

// 从hive中读取数据
val df02 = spark.sql(s"select * from ods.$need_table")
df02.show(5)
df02.createOrReplaceTempView("t_df02")

// 筛选出需要插入的数据
val df03 = spark.sql(
s"select *, '$need_date' as $need_change from t_df01 " +
s"where $need_value not in " +
s"(select $need_value from t_df02)"
)
df03.show(5)
df03.createOrReplaceTempView("t_df03")

// 设置hive的分区模式 如果执行报错会提示设置,无需背过
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
// 将数据插入到hive中
df03.write.insertInto(s"ods.$need_table")
}
}

使用 IDEA maven package打包后上传到 bigdata1 执行:

1
2
3
4
spark-submit --master yarn --class 方法名 文件路径

# 例:
spark-submit --master yarn --class org.apache.spark.examples.SparkPi /opt/module/spark/examples/jars/spark-examples_2.12-3.1.1.jar

答案:

1
2
3
4
hive> show partitions ods.table01;
OK
etl_date=20231222
Time taken: 0.1 seconds, Fetched: 1 row(s)