> 技术文档 > Spark SQL 使用及进阶详解

Spark SQL 使用及进阶详解


一、Spark SQL 简介

Spark SQL 是 Apache Spark 中的一个模块,它提供了一种统一的方式来处理结构化和半结构化数据。Spark SQL 允许用户使用 SQL 语句或者编程接口(如 Scala、Java、Python 和 R)来查询数据,同时支持多种数据源,包括 Hive 表、JSON 文件、Parquet 文件等。它将 SQL 查询与 Spark 的分布式计算能力相结合,能够高效地处理大规模数据。

二、在 Linux 上使用 Spark SQL

(一)环境准备

1. 安装 Java

Spark 基于 Java 开发,因此需要先安装 Java。以 CentOS 系统为例,可以使用以下命令安装 OpenJDK 1.8:

bash

sudo yum install -y java-1.8.0-openjdk-devel

安装完成后,配置 JAVA_HOME 环境变量,编辑 /etc/profile 文件:

bash

sudo vim /etc/profile

在文件末尾添加以下内容:

plaintext

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.xxx.x86_64 # 根据实际安装路径修改export PATH=$PATH:$JAVA_HOME/bin

使配置生效:

bash

source /etc/profile
2. 安装 Spark

从 Apache Spark 官方网站(https://spark.apache.org/downloads.html)下载适合的版本,以 Spark 3.3.2 为例:

bash

wget https://downloads.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgztar -zxvf spark-3.3.2-bin-hadoop3.tgzmv spark-3.3.2-bin-hadoop3 /opt/spark

配置 SPARK_HOME 环境变量,编辑 /etc/profile 文件:

bash

sudo vim /etc/profile

在文件末尾添加以下内容:

plaintext

export SPARK_HOME=/opt/sparkexport PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

使配置生效:

bash

source /etc/profile

(二)在 Linux 客户端使用 spark-sql 命令的常见方式

1. 交互式模式

直接输入 spark-sql 命令进入交互式模式,在该模式下可以逐行输入 SQL 语句并立即执行,适用于临时的查询和调试。例如:

bash

spark-sql

进入交互式界面后,输入 SQL 语句,如查询系统内置函数:

sql

SHOW FUNCTIONS;

输入完成后按回车键执行,结果会立即显示。若要退出交互式模式,输入 :q 并回车。

2. 执行 SQL 文件

如果有一系列的 SQL 语句存储在一个文件中,可以使用 -f 参数指定文件路径来执行该文件中的所有 SQL 语句。
假设存在一个名为 queries.sql 的文件,内容如下:

sql

-- queries.sqlCREATE TEMPORARY VIEW test_view AS SELECT 1 AS col1, \'test\' AS col2;SELECT * FROM test_view;

在 Linux 终端中使用以下命令执行该文件:

bash

spark-sql -f queries.sql

spark-sql 会按顺序执行文件中的每一条 SQL 语句,并将执行结果输出到终端。

3. 传递参数执行 SQL

使用 -e 参数可以直接在命令行中指定要执行的 SQL 语句。例如,要查询一个已经存在的表 people 中的所有记录,可以使用以下命令:

bash

spark-sql -e \"SELECT * FROM people;\"

此方式适合快速执行单条或简单的 SQL 语句,无需进入交互式模式。

4. 指定配置参数

在执行 spark-sql 命令时,可以通过 --conf 参数指定 Spark 或 Spark SQL 的配置参数。例如,要增加内存分配,可以设置 spark.driver.memory 和 spark.executor.memory

bash

spark-sql --conf spark.driver.memory=2g --conf spark.executor.memory=4g -e \"SELECT COUNT(*) FROM large_table;\"

这样在执行查询大表的操作时,有更多的内存资源可用,有助于提高执行效率。

5. 连接到远程集群

如果 Spark 集群是远程部署的,可以使用 --master 参数指定集群的主节点地址。例如,连接到一个 YARN 集群:

bash

spark-sql --master yarn -e \"SELECT * FROM distributed_table;\"

或者连接到一个独立的 Spark 集群:

bash

spark-sql --master spark://:7077 -e \"SELECT * FROM distributed_table;\"

(三)基本操作示例

1. 加载数据

假设我们有一个 CSV 文件 data.csv,内容如下:

plaintext

id,name,age1,Alice,252,Bob,303,Charlie,35

在 Spark SQL 客户端中,使用以下命令加载数据:

sql

CREATE TEMPORARY VIEW peopleUSING csvOPTIONS (path \'/path/to/data.csv\', header \'true\');
2. 查询数据

查询 people 视图中的所有数据:

sql

SELECT * FROM people;

查询年龄大于 30 的人员信息:

sql

SELECT * FROM people WHERE age > 30;
3. 保存查询结果

将查询结果保存为 Parquet 文件:

sql

CREATE TABLE resultUSING parquetAS SELECT * FROM people WHERE age > 30;

三、Spark SQL 基础使用

(一)创建 SparkSession

在编写 Spark SQL 代码时,首先需要创建一个 SparkSession 对象,它是与 Spark SQL 交互的入口点。

Python 示例

python

from pyspark.sql import SparkSession# 创建 SparkSessionspark = SparkSession.builder \\ .appName(\"SparkSQLExample\") \\ .getOrCreate()
Scala 示例

scala

import org.apache.spark.sql.SparkSession// 创建 SparkSessionval spark = SparkSession.builder() .appName(\"SparkSQLExample\") .getOrCreate()

(二)数据加载

Spark SQL 支持从多种数据源加载数据,以下是一些常见的示例。

1. 从 CSV 文件加载数据

python

# Python 示例df = spark.read.csv(\"path/to/your/file.csv\", header=True, inferSchema=True)

scala

// Scala 示例val df = spark.read .option(\"header\", \"true\") .option(\"inferSchema\", \"true\") .csv(\"path/to/your/file.csv\")
2. 从 JSON 文件加载数据

python

# Python 示例df = spark.read.json(\"path/to/your/file.json\")

scala

// Scala 示例val df = spark.read.json(\"path/to/your/file.json\")
3. 从 Parquet 文件加载数据

python

# Python 示例df = spark.read.parquet(\"path/to/your/file.parquet\")

scala

// Scala 示例val df = spark.read.parquet(\"path/to/your/file.parquet\")

(三)基本查询操作

1. 显示数据

python

# Python 示例df.show()

scala

// Scala 示例df.show()
2. 选择列

python

# Python 示例selected_df = df.select(\"column1\", \"column2\")selected_df.show()

scala

// Scala 示例val selectedDF = df.select(\"column1\", \"column2\")selectedDF.show()
3. 过滤数据

python

# Python 示例filtered_df = df.filter(df[\"column1\"] > 10)filtered_df.show()

scala

// Scala 示例val filteredDF = df.filter($\"column1\" > 10)filteredDF.show()
4. 分组聚合

python

# Python 示例from pyspark.sql.functions import avggrouped_df = df.groupBy(\"column1\").agg(avg(\"column2\"))grouped_df.show()

scala

// Scala 示例import org.apache.spark.sql.functions._val groupedDF = df.groupBy(\"column1\").agg(avg(\"column2\"))groupedDF.show()

(四)使用 SQL 语句查询

可以将 DataFrame 注册为临时视图,然后使用 SQL 语句进行查询。

python

# Python 示例df.createOrReplaceTempView(\"my_table\")result = spark.sql(\"SELECT * FROM my_table WHERE column1 > 10\")result.show()

scala

// Scala 示例df.createOrReplaceTempView(\"my_table\")val result = spark.sql(\"SELECT * FROM my_table WHERE column1 > 10\")result.show()

四、Spark SQL 进阶使用

(一)复杂查询

1. 连接查询

python

# Python 示例from pyspark.sql import SparkSessionspark = SparkSession.builder.appName(\"JoinExample\").getOrCreate()df1 = spark.createDataFrame([(1, \"Alice\"), (2, \"Bob\")], [\"id\", \"name\"])df2 = spark.createDataFrame([(1, 25), (2, 30)], [\"id\", \"age\"])joined_df = df1.join(df2, on=\"id\", how=\"inner\")joined_df.show()

scala

// Scala 示例import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().appName(\"JoinExample\").getOrCreate()val df1 = spark.createDataFrame(Seq((1, \"Alice\"), (2, \"Bob\"))).toDF(\"id\", \"name\")val df2 = spark.createDataFrame(Seq((1, 25), (2, 30))).toDF(\"id\", \"age\")val joinedDF = df1.join(df2, Seq(\"id\"), \"inner\")joinedDF.show()
2. 子查询

python

# Python 示例df.createOrReplaceTempView(\"my_table\")subquery = spark.sql(\"SELECT * FROM my_table WHERE column1 IN (SELECT column1 FROM my_table WHERE column2 > 20)\")subquery.show()

scala

// Scala 示例df.createOrReplaceTempView(\"my_table\")val subquery = spark.sql(\"SELECT * FROM my_table WHERE column1 IN (SELECT column1 FROM my_table WHERE column2 > 20)\")subquery.show()

(二)函数使用

1. 内置函数

Spark SQL 提供了丰富的内置函数,如字符串函数、数学函数、日期函数等。

python

# Python 示例from pyspark.sql.functions import upperdf_with_upper_name = df.withColumn(\"upper_name\", upper(df[\"name\"]))df_with_upper_name.show()

scala

// Scala 示例import org.apache.spark.sql.functions._val dfWithUpperName = df.withColumn(\"upper_name\", upper($\"name\"))dfWithUpperName.show()
2. 自定义函数(UDF)

可以创建自定义函数来满足特定的业务需求。

python

# Python 示例from pyspark.sql.functions import udffrom pyspark.sql.types import IntegerTypedef square(x): return x * xsquare_udf = udf(square, IntegerType())df_with_square = df.withColumn(\"square_column\", square_udf(df[\"column1\"]))df_with_square.show()

scala

// Scala 示例import org.apache.spark.sql.functions.udfval square = udf((x: Int) => x * x)val dfWithSquare = df.withColumn(\"square_column\", square($\"column1\"))dfWithSquare.show()

(三)数据写入

Spark SQL 支持将数据写入多种数据源,以下是一些常见的示例。

1. 写入 CSV 文件

python

# Python 示例df.write.csv(\"path/to/output.csv\", header=True)

scala

// Scala 示例df.write .option(\"header\", \"true\") .csv(\"path/to/output.csv\")
2. 写入 Parquet 文件

python

# Python 示例df.write.parquet(\"path/to/output.parquet\")

scala

// Scala 示例df.write.parquet(\"path/to/output.parquet\")

(四)窗口函数

窗口函数用于在查询结果的特定窗口内进行计算,常见的窗口函数有 row_number()rank()dense_rank() 等。

python

# Python 示例from pyspark.sql.window import Windowfrom pyspark.sql.functions import row_numberwindow_spec = Window.partitionBy(\"column1\").orderBy(\"column2\")df_with_row_number = df.withColumn(\"row_number\", row_number().over(window_spec))df_with_row_number.show()

scala

// Scala 示例import org.apache.spark.sql.expressions.Windowimport org.apache.spark.sql.functions.row_numberval windowSpec = Window.partitionBy(\"column1\").orderBy(\"column2\")val dfWithRowNumber = df.withColumn(\"row_number\", row_number().over(windowSpec))dfWithRowNumber.show()

五、性能优化

(一)数据缓存

使用 cache() 或 persist() 方法将经常使用的 DataFrame 缓存到内存中,避免重复计算。

python

# Python 示例df.cache()

scala

// Scala 示例df.cache()

(二)分区和排序

合理对数据进行分区和排序,以提高数据处理效率。例如,在进行连接操作时,确保连接键在两个 DataFrame 中具有相同的分区和排序方式。

(三)广播变量

对于小表,可以使用广播变量将其广播到每个节点,减少数据传输开销。

python

# Python 示例from pyspark.sql.functions import broadcastsmall_df = spark.createDataFrame([(1, \"A\"), (2, \"B\")], [\"id\", \"value\"])big_df = spark.createDataFrame([(1, 100), (2, 200)], [\"id\", \"amount\"])joined_df = big_df.join(broadcast(small_df), on=\"id\", how=\"inner\")

scala

// Scala 示例import org.apache.spark.sql.functions.broadcastval smallDF = spark.createDataFrame(Seq((1, \"A\"), (2, \"B\"))).toDF(\"id\", \"value\")val bigDF = spark.createDataFrame(Seq((1, 100), (2, 200))).toDF(\"id\", \"amount\")val joinedDF = bigDF.join(broadcast(smallDF), Seq(\"id\"), \"inner\")

(四)调整并行度

根据集群资源和数据量,调整 spark.sql.shuffle.partitions 参数,以优化数据洗牌操作的并行度。

南京中国旅行社