头歌-Spark SQL 多数据源操作(Scala)_spark sql 多数据源操作(scala)
第1关:加载与保存操作
编程要求
打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,完善程序。读取本地文件 file:///data/bigfiles/demo.json,根据年龄字段 age 设置降序,输出结果。
demo.json 文件内容如下所示:
{\"name\": \"zhangsan\", \"age\": 20, \"sex\": \"m\"},
{\"name\": \"lisi\", \"age\": 21, \"sex\": \"m\"},
{\"name\": \"tiantian\", \"age\": 22, \"sex\": \"f\"},
{\"name\": \"lihua\", \"age\": 23, \"sex\": \"f\"},
{\"name\": \"zhaoliu\", \"age\": 24, \"sex\": \"m\"},
{\"name\": \"liguanqing\", \"age\": 25, \"sex\": \"f\"},
{\"name\": \"zhangqi\", \"age\": 26, \"sex\": \"m\"},
{\"name\": \"zhaoai\", \"age\": 27, \"sex\": \"m\"},
{\"name\": \"wangjiu\", \"age\": 28, \"sex\": \"f\"}
开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object First_Question { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(\"First_Question\") .master(\"local[*]\") .getOrCreate() /******************* Begin *******************/ val df: DataFrame = spark.read.json(\"file:///data/bigfiles/demo.json\") val sortedDf = df.orderBy(df(\"age\").desc) sortedDf.show() /******************* End *******************/ spark.stop() }}
第2关:Parquet 格式文件
编程要求
打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,根据下列要求,完善程序。
读取本地文件 file:///data/bigfiles/demo.json,使用 Parquet 完成分区,列名为 student=1,保存到本地路径file:///result/下。
读取本地文件 file:///data/bigfiles/demo2.json,使用 Parquet 完成分区,列名为 student=2,保存到本地路径file:///result/下。
demo.json 文件内容如下所示:
{\"name\": \"zhangsan\", \"age\": 20, \"sex\": \"m\"},
{\"name\": \"lisi\", \"age\": 21, \"sex\": \"m\"},
{\"name\": \"tiantian\", \"age\": 22, \"sex\": \"f\"},
{\"name\": \"lihua\", \"age\": 23, \"sex\": \"f\"},
{\"name\": \"zhaoliu\", \"age\": 24, \"sex\": \"m\"},
{\"name\": \"liguanqing\", \"age\": 25, \"sex\": \"f\"},
{\"name\": \"zhangqi\", \"age\": 26, \"sex\": \"m\"},
{\"name\": \"zhaoai\", \"age\": 27, \"sex\": \"m\"},
{\"name\": \"wangjiu\", \"age\": 28, \"sex\": \"f\"}
demo2.json 文件内容如下所示:
{\"name\": \"hongkong\", \"age\": 20, \"sex\": \"m\"},
{\"name\": \"kulu\", \"age\": 21, \"sex\": \"m\"},
{\"name\": \"huxiaotian\", \"age\": 22, \"sex\": \"f\"},
{\"name\": \"yueming\", \"age\": 23, \"sex\": \"f\"},
{\"name\": \"wangsan\", \"age\": 24, \"sex\": \"m\"},
{\"name\": \"zhaojiu\", \"age\": 25, \"sex\": \"f\"},
{\"name\": \"wangqiqi\", \"age\": 26, \"sex\": \"m\"},
{\"name\": \"wangxiantian\", \"age\": 27, \"sex\": \"m\"},
{\"name\": \"zhaoba\", \"age\": 28, \"sex\": \"f\"}
开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Second_Question { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(\"Second_Question\") .master(\"local[*]\") .getOrCreate() /******************* Begin *******************/ spark.read.json(\"file:///data/bigfiles/demo.json\").write.parquet(\"file:///result/student=1\") spark.read.json(\"file:///data/bigfiles/demo2.json\").write.parquet(\"file:///result/student=2\") /******************* End *******************/ spark.stop() }}
第3关:ORC 格式文件
编程要求
根据下列要求,完善程序。
创建 Orc 格式的 Hive 数据表 student,添加字段id(int),name(string),age(int),class(string)。
按顺序插入如下数据:
1001,\"王刚\",19,\"大数据一班\"
1002,\"李虹\",18,\"大数据一班\"
1003,\"张子萱\",20,\"大数据一班\"
1004,\"赵云\",18,\"大数据一班\"
1005,\"李晓玲\",19,\"大数据一班\"
1006,\"张惠\",18,\"大数据二班\"
1007,\"秦散\",19,\"大数据二班\"
1008,\"王丽\",18,\"大数据二班\"
1009,\"田忌\",20,\"大数据二班\"
1010,\"张花\",18,\"大数据二班\"
打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,编写 spark sql 程序,读取创建的 student 表并按字段 id 升序输出。
开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Third_Question { def main(args: Array[String]): Unit = { // 创建 SparkSession,并启用 Hive 支持 val spark: SparkSession = SparkSession .builder() .appName(\"Third_Question\") .master(\"local[*]\") .enableHiveSupport() // 启用 Hive 支持 .getOrCreate() /******************* Begin *******************/ // 创建 Hive 数据表 student spark.sql( \"\"\" |CREATE TABLE IF NOT EXISTS student ( | id INT, | name STRING, | age INT, | class STRING |) |STORED AS ORC \"\"\".stripMargin) // 插入数据到 student 表 spark.sql( \"\"\" |INSERT INTO student VALUES |(1001, \'王刚\', 19, \'大数据一班\'), |(1002, \'李虹\', 18, \'大数据一班\'), |(1003, \'张子萱\', 20, \'大数据一班\'), |(1004, \'赵云\', 18, \'大数据一班\'), |(1005, \'李晓玲\', 19, \'大数据一班\'), |(1006, \'张惠\', 18, \'大数据二班\'), |(1007, \'秦散\', 19, \'大数据二班\'), |(1008, \'王丽\', 18, \'大数据二班\'), |(1009, \'田忌\', 20, \'大数据二班\'), |(1010, \'张花\', 18, \'大数据二班\') \"\"\".stripMargin) // 查询并按 id 字段升序输出 student 表数据 val studentDF: DataFrame = spark.sql(\"SELECT * FROM student ORDER BY id ASC\") studentDF.show() /******************* End *******************/ spark.stop() }}
第4关:JSON 格式文件
编程要求
打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,完善程序。读取本地文件 file:///data/bigfiles/test.json,不改变原数据排列顺序进行输出。
test.json 文件内容如下所示:
{\"id\":1001,\"name\":\"王刚\",\"age\":19,\"class\":\"大数据一班\"},
{\"id\":1002,\"name\":\"李虹\",\"age\":18,\"class\":\"大数据一班\"},
{\"id\":1003,\"name\":\"张子萱\",\"age\":20,\"class\":\"大数据一班\"},
{\"id\":1004,\"name\":\"赵云\",\"age\":18,\"class\":\"大数据一班\"},
{\"id\":1005,\"name\":\"李晓玲\",\"age\":19,\"class\":\"大数据一班\"},
{\"id\":1006,\"name\":\"张惠\",\"age\":18,\"class\":\"大数据二班\"},
{\"id\":1007,\"name\":\"秦散\",\"age\":19,\"class\":\"大数据二班\"},
{\"id\":1008,\"name\":\"王丽\",\"age\":18,\"class\":\"大数据二班\"},
{\"id\":1009,\"name\":\"田忌\",\"age\":20,\"class\":\"大数据二班\"},
{\"id\":1010,\"name\":\"张花\",\"age\":18,\"class\":\"大数据二班\"}
开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Forth_Question { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(\"Forth_Question\") .master(\"local[*]\") .getOrCreate() /******************* Begin *******************/ // 读取 JSON 文件 val df = spark.read.json(\"file:///data/bigfiles/test.json\") df.select(\"id\", \"name\", \"age\", \"class\").show() /******************* End *******************/ spark.stop() }}
第5关:JDBC 操作数据库
编程要求
打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,完善程序。读取本地 csv 文件 file:///data/bigfiles/job58_data.csv(有表头),将加载的数据以覆盖的方式保存到本地 Mysql 数据库的 work.job_data 表中,数据库连接信息如下:
账号:root
密码:123123
端口:3306
注意设置 useSSL=false。
开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Fifth_Question { def main(args: Array[String]): Unit = { val spark: SparkSession = SparkSession .builder() .appName(\"Fifth_Question\") .master(\"local[*]\") .getOrCreate() /******************* Begin *******************/ // 1. 读取本地 CSV 文件 val df: DataFrame = spark.read .option(\"header\", \"true\") .csv(\"file:///data/bigfiles/job58_data.csv\") // 2. MySQL 连接信息 val jdbcUrl = \"jdbc:mysql://localhost:3306/work?useSSL=false\" val dbProperties = new java.util.Properties() dbProperties.setProperty(\"user\", \"root\") dbProperties.setProperty(\"password\", \"123123\") // 3. 将数据写入 MySQL 数据库的 work.job_data 表中,覆盖原有数据 df.write .mode(SaveMode.Overwrite) .jdbc(jdbcUrl, \"job_data\", dbProperties) /******************* End *******************/ spark.stop() }}
第6关:Hive 表操作
编程要求
打开右侧代码文件窗口,在 Begin 至 End 区域补充代码,根据下列要求,完善程序。
在 Hive 中创建数据表 employee,添加字段eid(string),ename(string),age(int),part(string)。
插入如下数据:
\"A568952\",\"王晓\",25,\"财务部\"
\"B256412\",\"张天\",28,\"人事部\"
\"C125754\",\"田笑笑\",23,\"销售部\"
\"D265412\",\"赵云\",24,\"研发部\"
\"F256875\",\"李姿姿\",26,\"后勤部\"
编写 spark sql 程序,直接采用 Spark on Hive 的方式读取创建的 employee 表并按字段 eid 升序输出。
开始任务前,注意先启动 Hadoop 与 Hive 环境:start-all.sh、nohup hive --service metastore &
import org.apache.spark.sql.{DataFrame, SparkSession}object Sixth_Question { def main(args: Array[String]): Unit = { /******************* Begin *******************/ try { // 创建SparkSession,启用Hive支持 val spark = SparkSession.builder() .appName(\"ReadHiveTable\") .enableHiveSupport() .getOrCreate() import spark.implicits._ import spark.sql // 如果表不存在则创建 if (!spark.catalog.tableExists(\"employee\")) { sql(\"\"\" CREATE TABLE employee ( eid STRING, ename STRING, age INT, part STRING ) \"\"\") // 准备数据 val employeeData = Seq( (\"A568952\",\"王晓\",25,\"财务部\"), (\"B256412\",\"张天\",28,\"人事部\"), (\"C125754\",\"田笑笑\",23,\"销售部\"), (\"D265412\",\"赵云\",24,\"研发部\"), (\"F256875\",\"李姿姿\",26,\"后勤部\") ) // 转换为DataFrame并写入Hive表 employeeData.toDF(\"eid\", \"ename\", \"age\", \"part\") .write.mode(\"append\").saveAsTable(\"employee\") } // 读取Hive表并按eid升序排序 val employeeDF: DataFrame = sql(\"SELECT * FROM employee ORDER BY eid ASC\") // 显示结果 println(\"\") employeeDF.show() // 停止SparkSession spark.stop() } catch { case e: Exception => println(s\"程序执行出错: ${e.getMessage}\") e.printStackTrace() } /******************* End *******************/ }}