Hive 入门学习
一、Hive概述
Apache Hive是一款分布式SQL计算的工具,其主要功能是:将SQL语句翻译成MapReduce程序运行,处理位于HDFS上的结构化数据。
基于MapReduce构建分布式SQL执行引擎,主要需要两个功能组件:元数据管理和SQL解析器。
元数据管理:记录数据存储位置,数据结构以及数据描述等。
SQL解析器:其功能包括SQL分析,SQL到MapReduce程序的转换,提交MapReduce程序运行并收集执行结果。
二、Hive 基础架构
Hive提供了 Metastore 服务进程提供元数据管理功能。通常是存储在关系数据库如 mysql 中。Hive 中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等。
Hive Driver驱动程序(包括语法解析器、计划编译器、优化器、执行器)即SQL解析器,其主要完成 HQL 查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的査询计划存储在 HDFS 中,并在随后有执行引擎调用执行。这部分内容不是具体的服务进程,而是封装在Hive所依赖的Jar文件即Java代码中。
用户接口(包括CLI、JDBC/ODBC、WebGUl),其中CLl(command line interface)为shell命令行,Hive中的 Thrift 服务器允许外部客户端通过网络与Hive进行交互,类似于JDBC或ODBC协议。WebGUl是通过浏览器访问Hive。
三、Hive 部署
Hive是单机工具,只需部署在一台服务器即可。Hive虽然是单机的,但是它可以提交分布式运行的MapReduce程序运行。
Hive需要使用元数据服务,即需要提供一个关系型数据库。
1、安装MySQL数据库
2、配置Hadoop
3、下载并解压Hive
4、提供MySQL Driver包
5、配置Hive
6、初始化元数据库
7、启动Hive(使用Hadoop用户)
metastore服务
metastore服务的作用是为Hive CLI 或者 Hiveserver2提供元数据访问接口。
metastore有两种运行模式,分别为嵌入式模式和独立服务模式。
嵌入式模式:
独立服务模式:
生产环境中,不推荐使用嵌入式模式。因为其存在以下两个问题:
(1)嵌入式模式下,每个 Hive CLI 都需要直接连接元数据库,当 Hive CLI较多时,数据库压力会比较大。
(2)每个客户端都需要用户元数据库的读写权限,元数据库的安全得不到很好的保证
四、Hive 客户端
HiveServer2 服务
用户说明
问题:使用HiveServer2代理访问Hadoop集群的用户身份是谁?是HiveServer2的启动用户?还是客户端的登录用户?
解答:这是由hive.server2.enable.doAs 参数决定,该参数的含义是:是否启用 Hiveserver2 用户模拟的功能。若启用,则 Hiveserver2 会模拟成客户端的登录用户去访问 Hadoop 集群的数据,不启用,则 Hivesever2 会直接使用启动用户访问 Hadoop 集群数据。模拟用户的功能,默认是开启的。
未启用用户模拟功能:
开启用户模拟功能(生产环境,推荐开启用户模拟功能,因为开启后才能保证各用户之间的权限隔离):
启动HiveServer2 服务(提供的端口默认为10000)
通过客户端连接HiveServer2服务,可以操作Hive
比如,通过Hive自带的Beeline客户端连接
五、Hive 字段类型
Hive 中的字段类型可分为基本数据类型和复杂数据类型。
常用的基本数据类型
常用的复杂数据类型
类型转换
Hive 的基本数据类型可以做类型转换,转换的方式包括隐式转换以及显示转换。
隐式转换规则
(1)任何整数类型都可以隐式地转换为一个范围更广的类型,如 tinyint 可以转换成 int,int
可以转换成 bigint。
(2)所有整数类型、float 和 string 类型都可以隐式地转换成 double
(3)tinyint、smallint、int 都可以转换为 float。
(4)boolean 类型不可以转换为任何其它的类型
两个不同类型做计算,会将两者转为其共同能转的最小类型。
显式转换规则:
借助cast 函数完成显式的类型转换
语法: cast(expr as ) 例如:cast(\'1\' as int) + 2
六、Hive 使用
Hive 的HQL语法与SQL语法类似,具体的操作可以参考这篇博文:
Hive必须了解的技能有哪些?万字博客带你掌握Hive_bi 转hive需要掌握什么技能-CSDN博客
6.1 Hive 使用技巧
6.1.1 Hive 常用交互命令
hive -H 查看 hive命令的帮助信息
hive -e \"sql语句\" 可以非交互式执行sql
hive -f 文件名 可以读取文件中的sql执行
6.1.2 Hive 参数配置方式
hive>set; 查看当前所有的配置信息
参数配置的三种方式:
(1)配置文件方式,默认配置文件:hive-default.xml;用户自定义配置文件:hive-site.xml
注意:用户自定义配置会覆盖默认配置。另外,Hive 也会读入 Hadoop 的配置,因为Hive 是作为 Hadoop 的客户端启动的,Hive 的配置会覆盖 Hadoop 的配置。配置文件的设定对本机启动的所有 Hive 进程都有效。
(2)命令行参数方式
启动 Hive 时,可以在命令行添加-hiveconf param = value 来设定参数(仅对本次hive启动有效)
(3)参数声明方式(仅对当前进程有效)
可以在 HQL 中使用 SET 关键字设定参数,例如:
hive(default)> set mapreduce.job.reduces = 10;(不加值,可以查看对应的参数值)
上述三种设定方式的优先级依次递增。即配置文件 < 命令行参数 < 参数声明
6.1.3 Hive 常见属性配置
(1)hive 客户端显示当前库和表头
(2)hive 运行日志路径配置
hive 的log 默认存放在 /tmp/用户名/hive.log 路径的文件中
(3)hive 的JVM 堆内存设置
新版本的 Hive 启动的时候,默认申请的JVM 堆内存大小为 256M,JVM 堆内存申请的太小,导致后期开启本地模式,执行复杂的SQL时经常会报错。
6.2 数据库操作
数据库本质上就是在HDFS之上的文件夹
默认数据库的存放路径是HDFS的 /user/hive/warehouse(可以在配置文件中修改)内
查看数据库
show databases; 查看所以数据库
可以使用通配符查找数据库,例如: show databases like \"db*\"
* 表示任意个任意字符,| 表示或的关系
describe database extended 查看数据库更多信息
修改数据库
alter database set ;
alter database databse_name set location hdfs_path; 修改数据库存放的位置
需要注意的是:修改数据库 location不会改变当前己有表的路径信息,而只是改变后续创建的新表的默认的父目录。
6.3 数据表操作
创建表
直接建表法:
Hive中的数据表本质上就是在HDFS之上的文件夹
查询建表法(该语法允许用户利用 select 查询语句返回的结果,直接建表,表的结构和查询语句的结构保持一致,且保证包含 select 查询语句放回的内容,只能创建内部表):
create table NewTableBySelect as select * from test_create_table;
like 建表法(该语法允许用户复刻一张已经存在的表结构,与上述查询建表法的语法不同,该语法创建出来的表中不包含数据)
create table like_table like exisit_table;
内部表和外部表
外部表和数据是相互独立的
可以先有表,然后把数据移动到表指定的 location 中;
也可以先有数据,然后创建表,通过 location 指向数据
查看表类型:desc formatted [表名];
内外部表相互转换:
数据的导入和导出
使用 LOAD 语法,可以从外部将数据加载到Hive内,语法如下:
不使用 OVERWRITE 则会追加写入数据
使用 LOCAL 本质上是将本地文件上传到Hive表的存储目录里面
不使用LOCAL本质上是将HDFS的文件移动到Hive表的存储目录里面
除了使用LOAD 加载外部数据外,也可以通过SQL语句,从其他表中加载数据
insert overwrite 可以将Hive表中数据导出
Export & Import
Export 导出语句可将表的数据和元数据信息一并到处的 HDFS 路径,Import 可将 Export导出的内容导入 Hive,表的数据和元数据信息都会恢复。Export 和Import 可用于两个 Hive实例之间的数据迁移。
#导出export table tablename to \'export_target_path\'#导入import [external] table new_or_original tablename from \'source_path\' [location \'import_target_path\']
使用Hive脚本导出数据
分区表
分区表:将大文件表按照字段(时间,如年月日等)切分成多个小文件,这样hive操作小文件就容易的多。
每一个分区,是一个文件夹(没有分区的表就是一个文件夹,分区的表就是在原有表文件夹下划分多个子文件夹),比如按月分区,则相同月份的数据存到同一个子文件夹下。
hive也支持多个字段作为分区,多分区有层级关系,如图所示:
加载数据时,有几个分区列,都要一一对应。
分区表基本操作
查看所有分区信息 show partitions table_name
增加分区(创建单个分区)
alter table tb_name partition add partition (day = \'yyyymmdd\')
增加分区(同时创建多个分区,分区间不能有逗号)
alter table tb_name partition add partition (day = \'yyyymmdd1\') partition (day = \'yyyymmdd2\')
删除分区(单个分区)
alter table tb_name partition drop partition (day = \'yyyymmdd\')
删除分区(多个分区,分区之间必须有逗号)
alter table tb_name partition drop partition (day = \'yyyymmdd\'), partition (day = \'yyyymmdd\')
修复分区表
若分区元数据和 HDFS的分区路径不一致,还可使用msck命令进行修复,以下是该命令的用法说明。
msck repair table table_name [add/drop/sync partitions];
说明:
msck repair table table_name add partitions:该命令会增加 HDFS 路径存在但元数据缺失的分区信息。
msck repair table table_name drop partitions:该命令会删除 HDFS 路径已经删除但元数据仍然存在的分区信息。
msck repair table table_name sync partitions:该命令会同步 HDFS 路径和元数据分区信息,相当于同时执行上述的两个命令。
msck repair table table_name:等价于msck repair table table_name add partitions 命令
二级分区
二级分区建表语句
create table dept_partition2(deptno int, --部门编号部门名称dname string, --部门名称loc string --部门位置)partitioned by(day string, hour string)row format delimited fields terminated by \\t\';
数据装载语句
load data local inpath \'/opt/module/hive/datas/dept_20220401.log\'into table dept_partition2partition(day=\'20220401\',hour=\'12\');
查询分区数据
select *from dept_partition2where day=\'20220401\'and hour=\'12\';
动态分区
动态分区是指向分区表insert 数据时,被写往的分区不由用户指定,而是由每行数据的最后一个字段的值来动态的决定。使用动态分区,可只用一个insert语句将数据写入多个分区。
动态分区功能总开关(默认 true,开启)
set hive.exec.dynamic.partition = true
动态分区的模式,默认 strict(严格模式),要求必须指定至少一个分区为静态分区(在有多个分区的情况下),nonstrict(非严格模式)允许所有的分区字段都使用动态分区。
set hive.exec.dynamic.partition.mode = nonstrict
示例:
分桶表
分桶表的创建:
clustered by(表内字段) sorted by(表内字段) into 分桶数 buckets
sorted by 在每个桶内做排序(排序字段与分桶字段可以不一致,排序字段也可以有多个)
分桶表的数据加载:
老版的 hive 由于load data(在数据导入导出部分介绍过其原理,只是简单的数据移动过程)不会触发MapReduce,也就是没有计算过程(无法执行Hash算法),只是简单的移动数据而已所以无法用于分桶表数据插入。
Hive 3.x新版本 load 数据可以直接跑 MapReduce,从而可以进行分桶表的数据加载。
如果说分区表的性能提升是:在指定分区列的前提下,减少被操作的数据量,从而提升性能
分桶表的性能提升就是:基于分桶列的特定操作,如:过滤、JOIN、分组,均可带来性能提升。
查看表
展示所有表 show table [in database_name] like [\'identifier_with_wildcards\']
查看表信息
describe [extended | formatted] [da_name.] table_name
extended:展示详细信息
formatted:对详细信息进行格式化的展示
修改表操作
修改分区值会修改元数据记录,HDFS的实体文件夹不会改名但是在元数据记录中是改名了的
删除分区,只会删除元数据,HDFS中的数据依然存在。
修改列名时必须保持修改列的类型一致,否则会报错
复杂类型操作
数组类型(array)
数组在查询中使用:
数组[数字序号],可以取出指定需要元素(从0开始)
size(数组),可以统计数组元素个数
array contains(数组,数据),可以查看指定数据是否在数组中存在
map 类型
map在查询中使用
map[key]来获取指定key的值
map keys(map)取到全部的key作为array返回,map values(map)取到全部values
size(map)可以统计K-V对的个数
array_contains(map_values(map),数据) 可以统计map是否包含指定数据
Struct 类型
在查询中使用
struct.key 即可取得对应的value
6.3 数据查询
基本查询
RLINK 正则匹配
正则表达式是一种规则集合,通过特定的规则字符描述,来判断字符串是否符合规则
rlike可以基于正则表达式,对数据内容进行匹配
UNION 联合查询
UNION 默认会对返回的结果进行去重,加上 ALL 关键字则不会进行去重,直接返回全部结果
Join 查询
内连接:只有进行连接的两个表中都存在与连接条件相匹配的数据才会被保留下来。
select * from a inner join b on a.id = b.id
左外连接:left join 操作符左边表中符合 where 子句的所有记录将会被返回
右外连接:right join 操作符右边表中符合 where 子句的所有记录将会被返回
满外连接:full join 将会返回所有表中符合 where 语句条件的所有记录。如果任一表的指定字段没有符合条件的值的话,那么就使用 null 值替代。
多表连接:
select * from a join b on a.id = b.id join c on b.col = c.col
笛卡尔积(a表的每一个行与b表的每一行进行关联)
产生笛卡尔的条件:
(1)省略连接条件
(2)连接条件无效
(3)所有表中的所有行互相连接
select * from a join b
排序
全局排序(order by),可能会对reduce程序造成压力,慎用
每个reduce 内部排序(sort by),对于大规模的数据集 order by 的效率非常低。在很多情况下,并不需要全局排序,此时可以使用 Sort by。
Sort by 为每个 reduce 产生一个排序文件。每个 Reduce 内部进行排序,对全局结果集来说不是排序。
分区(distribute by)指定分区字段,控制某个特定行应该到哪个reducer ,通常是为了进行后续的聚集操作,可以结合sort by使用。
分区排序(cluster by),当distribute by与sort by字段相同时,可以使用cluster by方式,但是排序只能是生序,不能指定排序规则
Sampling采样
colname基于某个列的值进行划分,比较适合分桶表,不需要从新分桶,节省时间
普通表使用rand()速度比较快
虚拟列(Vritual Columns)
虚拟列的作用:
查看行级别的数据详细参数
可以用于WHERE、GROUP BY等各类统计计算中
可以协助进行错误排查工作
6.4 函数
Hive的函数共计有上百种,详细的函数使用可以参阅官方文档:
LanguageManual UDF - Apache Hive - Apache Software Foundation
单行函数
单行函数的特点是:输入一行输出一行
条件函数-全部
聚合函数
特点:多行传入,一行输出,常见的count/sum
collect_list 收集形成list集合,结果不去重。
collect_set 收集并形成set集合,结果去重。
炸裂函数(UDTF)
特点:接受一行数据,输出一行或多行数据
explode(array a) 将传入的数组元素分解为多行。
explode(map m)
posexplode(array a) 除了返回元素,还会返回每个元素的位置
inline(array<struct> a)
Latera View(侧视图)
Latera View 通常与UDTF配合使用。Lateral View可以将UDTF应用到源表的每行数据,将每行数据转换为一行或多行,并将源表中每行的输出结果与该行连接起来,形成一个虚拟表。
窗口函数
窗口函数,能为每行数据划分一个窗口,然后对窗口范围内的数据进行计算,最后将计算结果返回给该行数据。
窗口函数的语法中主要包括“窗口”和“函数”两部分。其中“窗口”用于定义计算范围,“函数”用于定义计算逻辑。
基本语法如下:
绝大多数的聚合函数都可以配合窗口使用,例如max(),min(),sum(),count(),avg()等。
窗口范围的定义分为两种类型,一种是基于行的,一种是基于值的。
基于行示例:要求每行数据的窗口为上一行到当前行
基于值示例:要求每行数据的窗口为,值位于当前值减1,到当前值
一定要声明order by字段,窗口内数据才会一致,否则因为map分片和shuffle操作会使窗口内的数据不一致。
基于行的示例:(窗口含义:截至到当前行订单为止的所有订单总金额)
基于值的示例:(窗口含义:到当前行的日期为止的所有订单总金额)
窗口分区
定义窗口范围时,可以指定分区字段,每个分区单独划分窗口。
窗口分区示例:(窗口含义:每个用户截止到当前行订单的所有订单总金额)
窗口函数—跨行取值函数
lead和lag函数(不支持自定义窗口)
功能:获取当前行的下边(lead)/上边(lag)某行、某个字段的值
示例(含义:获取每个用户当前行订单的上一次和下一次 下单日期):
first_value和last_value
功能:获取窗口内某一列的第一个值/最后一个值
示例(含义:获取每个用户截至到当前行订单为止的第一个和最后一个下单日期):
窗口函数—排名函数
rank、dense_rank、row_number(三者在存在并列的情况下,会有不同)
功能:计算排名
自定义函数
用户自定义函数分为以下三种:
UDF,输入一行输出一行
UDAF,输入多行,输出一行
UDTF,输入一行,输出多行
自定义UDF函数步骤:
(1)创建一个类,继承GenericUDF并重写其方法
(2)创建临时函数
1)将上一步创建的类打成 jar包上传到服务器
2)将jar包添加到hive的classpath
add jar ,
3)创建临时函数与开发好的类关联
create temporary function my_fun as \"com.hive.udf.MYUDF\"
4)在hql中使用自定义的临时函数
创建永久函数:
注意:因为 add jar 本身也是临时生效,所以在创建永久函数的时候,需要制定路径(并且因为元数据的原因,这个路径还得是 HDFS 上的路径)。
create function my_fun as \"com.atquiqu.hive.udf.MyUDF\"using jar \"hdfs://hadoop102:8020/udf/myudf.jar\";
删除永久函数:drop function my_fun
注意:永久函数跟会话没有关系,创建函数的会话断了以后,其他会话也可以使用。
永久函数创建的时候,在函数名之前需要自己加上库名,如果不指定库名的话,会默认把当前库的库名给加上。
永久函数使用的时候,需要在指定的库里面操作,或者在其他库里面使用的话加上,库名.函数名。
6.5 案例学习
基于Hadoop和Hive实现聊天数据统计分析,构建聊天数据分析报表
建库建表
将数据文件上传到HDFS中,再通过load将数据加载到数据表中
ETL数据清洗
数据问题1:当前数据中,有一些数据的字段为空,不是合法数据。
数据问题2:需求中,需要统计每天、每个小时的消息量,但是数据中没有天和小时字段,只有整体时间字段
数据问题3:需求中,需要对经度和维度构建地区的可视化地图,但是数据中GPS经纬度为一个字段
指标计算
可以基于查询结果建表(通过这种方式对每个需求的查询结果建表)
可视化展示
FineBI 是帆软软件有限公司推出的一款商业智能(Business Intelligence)产品。FineBI 是定位于自助大数据分析的 BI 工具,能够帮助企业的业务人员和数据分析师,开展以问题导向的探索式分析。案例中选择FineBI进行可视化展示
1、将FineBI连接Hive
将Hive的连接驱动(5个)全部放到FineBI的安装目录中,FineBI6.0 >webapps > webroot >WEB-INF >lib
2、配置连接Hive的隔离插件
在FineBI界面的管理系统 > 插件管理 > 从本地安装,导入事先准备的zip插件包
3、重启 FineBI
4、管理系统 > 数据连接 > 数据连接管理 > 新建数据连接 > 填写连接信息
5、将数据导入到FineBI 公共数据 > 新建文件夹 > 新建数据集 > 选择数据库表 > 选择hive中创建的结果分析表 > 更新数据
6、我的分析 > 新建文件夹 > 新建分析主题 > 选择数据 > 创建组件(选择图表类型 > 导入数据)> 添加仪表板(一个空白页面,可以容纳多个组件)
7、对每个分析需求创建一个组件,并将其放到仪表板中
成果展示:
七、Hive 文件格式和压缩
为 Hive 表中的数据选择一个合适的文件格式,对提高查询性能的提高是十分有益的。Hive 表数据的存储格式,可以选择 text file、orc、parguet、sequence file 等。
Text File
text file 是Hive默认使用的文件格式,文本文件中的一行内容,就对应Hive表中的一行记录。
可通过以下建表语句指定文件格式为文本文件:
create table textfile_table(column specs)stored as textfile;
ORC 文件
ORC(Optimized Row Columnar) file format 是Hive 0.11 版里引入的一种列式存储的文件格式。ORC 文件能够提高 Hive 读写数据和处理数据的性能。
ORC文件基本格式
建表语句
create table orc_table (column specs)stored as orctblproperties (property_name=property_value)
Parquet 文件
Parquet 文件是Hadoop生态中的一个通用的文件格式,它也是一个列式存储的文件格式。
Parquet 文件基本格式
建表语句:
create table parquet_table(column specs)stored as parquettblproperties(property_name=property_value)
压缩
Hive表数据进行压缩
(1)TextFile
若一张表的文件类型为 TextFile,若需要对该表中的数据进行压缩,多数情况下,无需在建表语句做出声明。直接将压缩后的文件导入到该表即可,Hive在查询表中数据时,可自动识别其压缩格式,进行解压。
在执行往表中导入数据的 SQL语句时,用户需设置以下参数,来保证写入表中的数据是被压缩的。
(2)ORC
(3)Parquet
计算过程中使用压缩
(1)单个MR的中间结果进行压缩
单个 MR 的中间结果是指 Mapper 输出的数据,对其进行压缩可降低 shuffle 阶段的网络IO,可通过以下参数进行配置:
(2)单条SQL语句的中间结果进行压缩
单条 SQL语句的中间结果是指,两个MR(一条SQL语句可能需要通过 MR 进行计算)之间的临时数据,可通过以下参数进行配置:
八、企业级调优
8.1 计算资源配置(计算环境为 hive on MR)
Yarn资源配置
需要调整的 Yarn 参数均与 CPU、内存等资源有关,核心配置参数如下:
(1)yarn.nodemanager.resource.memory-mb
该参数的含义是,一个 NodeManager 节点分配给 Container 使用的内存。该参数的配置,取决于 NodeManager 所在节点的总内存容量和该节点运行的其他服务的数量。
varn.nodemanager.resource.memory-mb 65536
(2)yarn.nodemanager.resource.cpu-vcores
该参数的含义是,一个 NodeManager 节点分配给 Container 使用的 CPU 核数。该参数的配置,同样取决于 NodeManager 所在节点的总 CPU 核数和该节点运行的其他服务。
yarn.nodemanager.resource.cpu-vcores 16
(3)yarn.scheduler.maximum-allocation-mb
该参数的含义是,单个Container能够使用的最大内存。
yarn.scheduler.maximum-allocation-mb 16384
(4)yarn.schedulerminimum-allocation-mb
该参数的含义是,单个Container能够使用的最小内存。
yarn.scheduler.minimum-allocation-mb 512
MapReduce 资源配置
MapReduce 资源配置主要包括 Map Task 的内存和 CPU 核数,以及 Reduce Task 的内存和 CPU 核数。核心配置参数如下:
(1)mapreduce.map.memory.mb
该参数的含义是,单个 Map Task 申请的 container 容器内存大小,其默认值为 1024。该值不能超出和yarn.scheduler.maximum-allocation-mb 和 yarn.scheduler.minimum-allocation-mb 规定的范围。
该参数需要根据不同的计算任务单独进行配置,在 hive 中,可直接使用如下方式为每个 SQL 语句单独进行配置:
set mapreduce.map.memory.mb = 2048;
(2)mapreduce.map.cpu.vcores
该参数的含义是,单个 Map Task 申请的 container 容器 cpu 核数,其默认值为 1。该值般无需调整。
(3)mapreduce.reduce.memory.mb
该参数的含义是,单个 Reduce Task 申请的 container 容器内存大小,其默认值为 1024该值同样不能超出 yarn.scheduler.maximum-allocation-mb 和 yarn.scheduler.minimum-allocation-mb 规定的范围。
该参数需要根据不同的计算任务单独进行配置,在 hive 中,可直接使用如下方式为每个 SQL 语句单独进行配置:
set mapreduce.reduce.memory.mb = 2048;
(4)mapreduce.reauce.cp.vcores
该参数的含义是,单个 Reduce Task 申请的 container 容器 cpu 核数,其默认值为 1。该值一般无需调整。
8.2 HQL 语法优化
分组聚合优化
Hive 对分组聚合的优化主要围绕着减少 Shuffle 数据量进行,具体做法是 map-side 聚合。所谓 map-side 聚合,就是在 map 端维护一个 hash table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区,发送至reduce 端,完成最终的聚合。map-side聚合能有效减少 shuffle 的数据量,提高分组聚合运算的效率。
map-side 聚合相关的参数如下:
Join优化
Hive 拥有多种 join算法,包括 Common Join,Map Join,Bucket Map Join,SortMerge Buckt Map Join等。
(1)Common Join
Common Join 是Hive 中最稳定的 join 算法,其通过一个 MapReduce Job 完成一个join 操作。Map 端负责读取 join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce 端,相同key 的数据在 Reduce 端完成最终的Join 操作。如下图所示:
需要注意的是,sql语句中的 join操作和执行计划中的 Common Join 任务并非一对的关系,一个 sql语句中的相邻的且关联字段相同的多个 join 操作可以合并为一个Commmon Join 任务。
(2)Map Join
Map Join 算法可以通过两个只有 map 阶段的 Job 完成一个 join 操作。其适用场景为大表 join 小表。若某 join操作满足要求,则第一个 Job 会读取小表数据,将其制作为hash table,并上传至 Hadoop 分布式缓存(本质上是上传至HDFS)。第二个 Job 会先从分布式缓存中读取小表数据,并缓存在 Map Task 的内存中,然后扫描大表数据,这样在map 端即可完成关联操作。如下图所示:
Map Join 有两种触发方式:一种是用户在、SQL语句中增加 hint 提示(已过时,不推荐),另外一种是Hive 优化器根据参与 join 表的数据量大小,自动触发。
自动触发
Hive会根据每个Common Join任务所需表的大小判断该Common Join 任务是否能够转换为Map Join 任务,若满足要求便将 Common Join 任务自动转换为Map Join 任务。
但有些Comon Join任务所需的表大小,在SQL的编译阶段是未知的(例如对子查询进行 join 操作),所以这种 Common Join任务是否能转换成Map Join 任务在编译阶是无法确定的。针对这种情况,Hive 会在编译阶段生成一个条件任务(Conditional Task),其下会包含一个计划列表,计划列表中包含转换后的Map Join任务以及原有的Common Join 任务。最终具体采用哪个计划,是在运行时决定的。
(3)Bucket Map Join
Bucket Map Join 是对 Map Join 算法的改进,其打破了 Map Join 只适用于大表 join小表的限制,可用于大表 join 大表的场景。
Bucket Map Join的核心思想是:若能保证参与 ioin 的表均为分桶表,且关联字段为分桶字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍,就能保证参与join的两张表的分桶之间具有明确的关联关系,所以就可以在两表的分桶间进行Map Join 操作了。这样一来,第二个Job的 Map 端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶即可。其原理如图所示:
(4)Sort Merge Bucket Map Join
Sort Merge Bucket Map Join(简称SMB Map Join)基于Bucket Map Join。 SMB Map Join 要求,参与 join的表均为分桶表,且需保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段为相同字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍。
SMB Map Join 同 Bucket Join 一样,同样是利用两表各分桶之间的关联关系,在分桶之间进行 join 操作,不同的是,分桶之间的 join操作的实现原理。Bucket Map Join,两个分桶之间的 join 实现原理为Hash Join算法;而 SMB Map Join,两个分桶之间的 join实现原理为 Sort Merge Join 算法。
Hash Join和 Sort Merge Join 均为关系型数据库中常见的 Join 实现算法。Hash Join 的原理相对简单,就是对参与 join 的一张表构建 hash table,然后扫描另外一张表然后进行逐行匹配。Sort Merge Join需要在两张按照关联字段排好序的表中进行。
SMB Map Join与Bucket Map Join 相比,在进行 Join 操作时,Map 端是无需对整个Bucket 构建 hash table,也无需在 Map端缓存整个Bucket 数据的,每个Mapper 只需按顺字逐个 key 读取两个分桶的数据进行join 即可。
数据倾斜优化
数据倾斜问题通常是指参与计算的数据分布不均,即某个 key 或者某些 key 的数据量远超其他 key,导致在 shuffle 阶段,大量相同 key 的数据被发往同一个 Reduce,进而导致该 Reduce 所需的时间远超其他 Reduce,成为整个任务的瓶颈。
分组聚合导致的数据倾斜
Hive 中未经优化的分组聚合是通过个MapReduce Job 实现的。Map各组数据端负责读取数据,并按照分组字段分区,通过shuffle,将数据发往 Reduce 端,在 Reduce 端完成最终的聚合运算。如果 group by 分组字段的值分布不均,就可能导致大量相同的key 进入同一 Reduce,从而导致数据倾斜问题。
解决思路有以下两种:
(1)Map-Side 聚合
开启 Map-side 聚合后,数据会现在 Map 端完成部分聚合工作。这样一来即便原始数据是倾斜的,经过 Map 端的初步聚合后,发往 Reduce 的数据也就不再倾斜了。最佳状态下,Map 端聚合能完全屏蔽数据倾斜问题。具体见分组聚合优化。
(2)Skew-GroupBy优化
Skew-GroupBy 的原理是启动两个MR 任务,第一个 MR 按照随机数分区,将数据分散发送到 Reduce,完成部分聚合,第二个 MR 按照分组字段分区,完成最终聚合。
启用分组聚合数据倾斜优化:
set hive.groupby.skewwindata = true;
Join 导致的数据倾斜
未经优化的 join 操作,默认是使用 common join 算法,也就是通过一个MapReduce Job 完成计算。Map 端负责读取 join 操作所需表的数据,并按照关联空段进行分区,通过 shuffle,将其发送到 Reduce 端,相同的数据在 Reduce 端完成最终的 Join操作。
如果关联字段的值分布不均,就可能导致大量相同的key 进入同一 Beduce,从而导致数据倾斜问题。
解决思路有以下三种:
(1)Map Join
使用 map join 算法,join 操作仅在 map 端就能完成,没有 shuffle 操作,没有 reduce阶段,自然不会产生 reduce 端的数据倾斜。该方案适用于大表 join 小表时发生数据倾斜的场景。
(2)Skew Join
skew join 的原理是,为倾斜的大 key 单独启动一个 map join 任务进行计算,其余 key 进行正常的 common join。原理图如下:
相关参数:
--启用 skew join 优化
set hive.optimize.skewjoin = true;
--触发 skew join 的阈值,若某个 key 的行数超过该参数值,则触发
set hive.skewjoin.key = 100000;
(3)调整 SQL 语句
假设原始 SQL 语句如下:A,B两表均为大表,且其中一张表的数据是倾斜的。
select * from A join Bon A.id = B.id
调整 SQL 语句如下(利用随机数打散倾斜key):
Select *from( select --打散操作 concat(id, \'_\', cast(rand()*2 as int))id, value from A)tajoin( select --扩容操作 concat(id, \'_\', 0) id, value from B union all select concat(id, \'_\', 1)id, value from B)tbon ta.id=tb.id;
并行度优化
Map 端并行度
一般情况下,Map 端的并行度,也就是 Map 的个数。是由输入文件的切片数决定的。Map 端的并行度无需手动调整。
以下特殊情况可考虑调整 map 端并行度:
(1)查询的表中存在大量小文件
按照 Hadoop 默认的切片策略,一个小文件会单独启动一个 map task 负责计算。若查询的表中存在大量小文件,则会启动大量 map task,造成计算资源的浪费。这种情况下,可以使用 Hive 提供的 CombineHiveInputFormat,多个小文件合并为一个切片,从而控制 map task 个数。相关参数如下(默认已设置):
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
(2)Map 端有复杂的查询逻辑
若 SQL 语句中有正则替换、json 解析等复杂耗时的查询逻辑时,map 端的计算会相对慢一些。若想加快计算速度,在计算资源充足的情况下,可考虑增大 map端的并行度,令map task 多一些,每个 map task 计算的数据少一些。相关参数如下(默认为256M):
set mapreduce.input.fileinputformat.split.maxsize=256000000;(一个切片的最大值)
Reduce 端并行度
Reduce 端的并行度,可由用户自己指定,也可由 Hive 自行根据该 MR Job 输入的文件大小进行估算。
Reduce 端并行度的相关参数如下:
——指定 Reduce 端并行度,默认值为-1,表示用户未指定
set mapreduce,job.reduces;
——Reduce 端并行度最大值
set hive.exec.reducers.max;
——单个Reduce Task 计算的数据量,用于估算 Reduce 并行度
set hive.exec.reducers.bytes.per.reducer;
小文件合并
合并 Reduce端输出的小文件,是指将多个小文件合并成大文件。目的是减少 HDFS 小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。
相关参数为:
--开启合并 map only 任务输出的小文件
set hive.merge.mapfiles = true;
--开启合并 map reduce 任务输出的小文件
set hive.merge.mapredfiles = true;
--合并后的文件大小
set hive.merge.size.per.task = 256000000;
--触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
set hive.merge.smallfiles.avgsize=16000000;
CBO 优化
CBO 是指 Cost based optimizer,即基于计算成本的优化。在 Hive 中,计算成本模型考虑到了:数据的行数、CPU、本地 IO、HDFS IO、网络 IO等方面。Hive 会计算同一 SQL 语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前 CBO 在 hive 的 MR 引擎下主要用于 join 的优化,例如多表 join 的 join 顺序。
相关参数为:
--是否启用 cbo 优化
set hive.cbo.enable=true;
谓词下推
谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量。
相关参数为:
--是否启动谓词下推(predicate pushdown)优化
set hive.optimize.ppd = true;
需要注意的是:CBO 优化也会完成一部分的谓词下推优化工作,因为在执行计划中,谓词越靠前,整个计划的计算成本就会越低。
失量化查询
Hive 的矢量化查询,可以极大的提高一些典型查询场景(例如 scans,filters,aggregates,and joins)下的 CPU 使用效率
相关参数如下(开启矢量化计算,默认开启):
set hive.vectorized.execution.enabled = true;
本地模式
大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过,有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
并行执行
Hive 会将一个 SQL 语句转化成一个或者多个 stage,每个 stage 对应一个 MR Job。默认情况下,Hive 同时只会执行一个 stage。但是某 SQL 语句可能会包含多个 stage,但这多个 stage 可能并非完全互相依赖,也就是说有些 stage 是可以并行执行的。此处提到的并行执行就是指这些 stage 的并行执行。相关参数如下:
严格模式
Hive 可以通过设置某些参数防止危险操作
(1)分区表不使用分区过滤
将 hive.strict.checks.no.partition.filter 设置为 true 时,对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围,否则不允许执行。
(2)使用 order by 没有 limit 过滤
将 hive.strict.checks.orderby.no.limit 设置为 true 时,对于使用了 order by 语句的查询,要求必须使用 limit 语句。
(3)笛卡尔积
将 hive.strict.checks.cartesian.product 设置为 true 时,会限制笛卡尔积的査询。