
China Telecom Big Data Offline Data Warehouse

Table of Contents

  • Telecom Data Warehouse
  • Chapter 1
    • Project Development Process
    • Project Overview
    • Business Overview
      • Information Domain Overview
      • Market Operations Domain (BSS)
      • Enterprise Management Domain (MSS)
      • Network Operations Domain (OSS)
    • Generic Data Warehouse Layering
    • Overall Architecture
    • Data Warehouse
    • Permission Division
    • Data Warehouse Data Architecture
    • Layer Naming Conventions
  • Chapter 2
  • Tourism Big Data Overall Framework and Processing Flow
    • 1. Overall Flow Diagram
    • 2. Parameter Fields
      • 1. Data Sources
        • 1.1 OIDD Data
        • 1.2 WCDR Data
        • 1.3 Mobile DPI Data
        • 1.4 Mobile DDR Data
      • 2. Integration and Storage Layer
      • 2.1 Fusion Files
        • 2.1.1 Position Table
        • 2.1.2 Region Table
      • 2.2 Stay Points
      • 2.3 Home and Work Locations
        • 2.3.1 Residence Tracking Table
        • 2.3.2 Workplace Tracking Table
        • 2.3.3 Current Residence Table
        • 2.3.4 Historical Residence Table
        • 2.3.5 Current Workplace Table
        • 2.3.6 Historical Workplace Table
        • 2.3.7 Grid Label Table
        • 2.3.8 Tourism Home/Work Summary Table
      • 2.4 Cross-Region Movement
        • 2.4.1 Latest City Cross-Region Table
        • 2.4.2 Historical City Cross-Region Table
        • 2.4.3 Latest County Cross-Region Table
        • 2.4.4 Historical County Cross-Region Table
        • 2.4.5 City OD Table
        • 2.4.6 County OD Table
      • 2.5 Quasi-Real-Time
        • 2.5.1 Quasi-Real-Time Position Field Table
        • 2.5.2 Quasi-Real-Time Region Field Table
    • 3. Analysis Layer
      • 3.1 Provincial/City Destination Tourist Analysis Logical Table - City Level
      • 3.2 Provincial/City Destination Tourist Analysis Logical Table - Province Level
      • 3.3 County-Level Destination Tourist Analysis Logical Table
      • 3.4 Scenic Area Destination Tourist Analysis Logical Table
      • 3.5 Trip Base Table
      • 3.6 Rural Tour Table
      • 3.7 Tourist Travel History Table
      • 3.8 User Profile Table
      • 3.9 User Tag Table
    • 4. Application Layer
      • 4.1 Real-Time Query Metrics
      • 4.2 Scenic Area Metrics
      • 4.3 Administrative Region Metrics
  • Chapter 3
  • 1. Building the Data Warehouse
    • 1.1 Enable HDFS permission checking and ACLs
      • Edit hdfs-site.xml to turn on permission checking
    • Fixing insufficient permissions on the hive tmp directory
      • Restart hadoop
      • Verify that http://master:50070 is reachable
    • 1.2 Create a user for each layer
      • Commands for granting permissions
    • 1.3 Create a hive database for each layer
    • 1.4 Create an HDFS directory for each layer
    • 1.5 Set the local hive directory's permissions to 777
  • 2. Data Collection -- performed as the ods user
    • 2.1 Collecting oidd data
      • 1. Create the table in hive
      • 2. Upload the data to the server
      • 3. Use flume to watch the base-station log directory and collect data
      • 4. Start flume
      • 5. Check the results
      • 6. Add partitions
    • 2.2 Collecting data from the CRM system
      • 1. Create the crm database in mysql
      • 2. Create the tables
      • 3. Create the tables in the hive ods layer first
      • 4. Collect the data with datax
  • Chapter 4
  • Position Fusion Table
    • Create a Maven project in IDEA
      • Create the ods module
      • Create the dwi module
      • Create the common shared module
      • Upload the local ods scripts to the ods resources directory
      • Import the spark dependencies in the outer pom file (matching the company's spark version)
      • Import the ctyun scala dependencies in the dwi module
    • Create the DwiResRegnMergelocationMskDay class in the dwi module
      • Import implicit conversions
    • Calculating the distance between two coordinates
      • Create the Geography java class in common
      • Add the common dependency to the dwi module
      • Clustering along the timeline
        • Create the merge test table in root's hive
        • Time-clustering test data
        • Upload the data
        • Test with spark-sql
      • Overspeed handling
      • Check read permission on ods_oidd
        • Delete previous run results
      • Position data fusion table
      • Write the DwiResRegnMergelocationMskDay code
      • Package, upload, and run (dwi)
      • Check the run on the master:4040 and master:8088 web UIs
      • Log in to hive as dwi and inspect the results
      • Encrypt the sensitive phone-number data
  • Chapter 4
    • User Profile Table
      • Create the dim module
      • Add dependencies
      • Create the package
      • Create the DimUserTagDay class
      • Log in to hive as ods and check whether the user profile table exists
        • Exporting a hive table to a local directory
      • Building utilities
      • Add the spark dependencies to common
      • Create and write the SparkTool utility class
        • Testing
          • Create the Test object
      • Write the telecom warehouse SparkTool utility
      • Position fusion table code using SparkTool
      • Adjust ods permissions to view the data
      • Write the run scripts
        • Create and write the dim-usertag-day.sh script in the dim module
      • Package and upload to the dim user
      • Run the user profile table script
      • Check the user profile table results
      • Create and write the dwi-res-regn-mergelocation-msk-day.sh script in the dwi module
      • Run the position fusion table script
      • Create the administrative region config table in dim's hive
      • Create and write the dim-admincode.sh script in the dim module
      • Run the dim-admincode.sh script
      • Check the dim-admincode.sh results
      • Create the scenic area config table in the dim layer
      • Create and write the dim-scenic-boundary.sh script in the dim module
      • Upload and run the dim-scenic-boundary.sh script
      • Check the dim-scenic-boundary.sh results
      • Write the ods layer scripts
      • Write the datax-crm-admin-code-mysql-to-hive.sh script
      • Write the datax-crm-scenic_boundary-mysql-to-hive.sh script
      • Write the datax-crm-usertag-mysql-to-hive.sh script
      • Write the flume-oss-oidd-to-hdfs.sh script
  • Chapter 5
  • Spatiotemporal Companions
    • Determining whether someone is a tourist at a scenic area
      • Create the intimate table in MySQL (close contacts)
      • Insert data into the intimate table
        • Enter hive as the dwi user
        • Copy the queried phone numbers into Visual Studio Code for processing
      • Create the confirmed table in MySQL (confirmed cases)
      • Insert data into the confirmed table
    • Computing spatiotemporal companions
      • Create the dws module
      • Create the com.ctyun.dws package
      • Import the spark dependencies
      • Create the SpacetimeAdjoint class
        • Create and edit DateUtil in common
      • Code optimization
      • Edit the SpacetimeAdjoint class (optimized)
      • Write the spacetime-adjoint.sh script in the dws layer
      • Grant permissions in the dwi layer
      • Upload the jars and run spacetime-adjoint.sh
      • Check the spacetime-adjoint.sh results
      • Optimizing again
        • Add a code block to SparkTool
        • Optimized SpacetimeAdjoint code
      • Package, upload, and run spacetime-adjoint.sh
      • Check the run status at master:8088 and master:4040
      • Check the optimized results
  • Chapter 6
  • Azkaban Scheduled Jobs
    • Install Azkaban as root
      • 1. Upload and unpack
      • 2. Edit the config files
      • 3. Start azkaban
      • 4. Open azkaban
      • 5. Configure the mail service
      • 6. Restart, stop, start
    • Create a jobs directory under ctyun in IDEA
      • Write basic.flow
  • City Tourist Table
    • Determining whether someone is a tourist at a scenic area
      • Create the dws_city_tourist_msk_d tourist table in the dws layer
      • Create and write the DwsCityTouristMskDay class
      • Additions to SparkTool
      • Additions to Geography
      • Create and write Grid in common
      • Create and write GridLevel.java in common
      • Create and write GISUtil.java in common
      • Create an entity directory in common and write Const.java, Gps.java, GridEntity.java
      • Create and write Constants.java in common
      • Package, upload, and run in the dws layer
      • Check the run status at master:8088
      • Check the results in the dws layer
      • Write the Azkaban script in the jobs directory (new)
    • Scenic Area Tourist Table
      • Fetch the Hefei scenic area data from Baidu Maps into the ctyun directory as a.json
      • Create the dws_scenic_tourist_msk_d table in dws's hive
      • Create and write DwsScenicTouristMskDay.scala in the dws layer
      • Create a poly directory in common and write Circle.java, CommonUtil.java, DataFormatUtil.java, Line.java, StringUtil.java, Polygon.java
      • Write the dws-scenic-tourist-msk-day.sh script
      • Adjust permissions in the dim layer
      • Package, upload, and run the script
      • Check the run status at master:8088
      • Check the results in dws's hive
    • Building the Scenic Area Grid Table
      • Upload the dim_scenic_grid data
      • Create the dim.dim_scenic_grid table in the dim layer
      • Create the dim.dim_geotag_grid table in the dim layer
      • Create and write DimScenicGrid.scala in the dim layer
      • Create and write the DimScenicGrid script in the dim layer
      • Package, upload, and run in the dim layer
      • Check the results
  • Chapter 7
  • Metric Statistics
    • Tourist metrics
      • 1. Daily visitor volume [city, visitor count]
      • 2. Daily by gender [city, gender, visitor count]
      • 3. Daily by age [city, age, visitor count]
      • 4. Daily by gender and age [city, gender, age, visitor count]
      • 5. Daily by stay duration [city, stay duration, visitor count]
    • Wide Table
      • Create the dal module under the ctyun project in IDEA
        • Import the spark maven dependencies
      • Create and write DalCityTouristMskWideDay.scala in the dal layer
      • Create the dal_city_tourist_msk_wide_d table in the dal user's hive
      • Grant dal permissions via the dws user
      • Grant dal permissions via the dim user
      • Verify the sql runs in spark-sql as the dal user
      • Write the dal-city-tourist-msk-wide-day.sh script in the dal layer
      • Package, upload, and run dal-city-tourist-msk-wide-day.sh
      • Check the run at master:8088
      • Check the dal-city-tourist-msk-wide-day.sh results
  • Chapter 8
  • Finebi
    • Finebi website: [https://www.finebi.com/](https://www.finebi.com/)
      • Notes
      • Access
      • Start the hive service for JDBC connections (as the dal user)
      • Fixing the `0: jdbc:hive2://master:10000 (closed)>` problem
      • Uninstall (root)
      • If you hit this problem, don't panic; install it in VMware
      • Unpack and upload the downloaded driver
      • Restart finebi
      • Create the driver
      • Create the connection
      • Data preparation
      • Visualization results
      • Offline telecom warehouse project summary

Telecom Data Warehouse


Chapter 1

Project Development Process


Project Overview

As a company providing big-data services, data capability is the company's core asset. With the rapid growth of external business and the continual enrichment of internal data, that capability needs to better support the company's key platform products: the consumer finance platform, the device industry platform, the commercial real-estate platform, the base and industry tag platform, and so on. Because each project built its existing data capability to its own requirements, with no unified planning or management, the current data warehouse suffers from problems such as the following:


Business Overview


Information Domain Overview

In telecom-industry enterprises, the IT support system is divided into three major domains: the market operations domain (BSS: Business Support System), the network operations domain (OSS: Operation Support System), and the enterprise management domain (MSS: Management Support System). These are the three pillar areas that anchor telecom IT strategic planning. The IT support system comprises the BSS, OSS, and MSS subsystems; each carries different responsibilities within the overall system while remaining interrelated with the others.


Market Operations Domain (BSS)

The business support system (BSS) mainly manages telecom services, tariffs, and marketing, together with customer management and service processes. Its main systems include the billing, customer service, accounting, settlement, and business analysis systems.


Enterprise Management Domain (MSS)

The management support system (MSS) covers all the non-core business processes needed to run the enterprise, including corporate strategy and direction setting, enterprise risk management, audit management, public relations and image management, finance and asset management, human resources, knowledge and R&D management, shareholder and external relations, procurement, enterprise performance evaluation, and government policy and legal affairs.


Network Operations Domain (OSS)

The operations support system (OSS) is the back-end system oriented toward resources (networks, equipment, computing systems). It includes specialized network management, integrated network management, resource management, service provisioning, and service assurance systems, providing the means to keep the network reliable, secure, and stable.


Generic Data Warehouse Layering


Overall Architecture


Data Warehouse


  • Data ingestion layer: loads data from source systems into the warehouse so that later processing can take full advantage of the warehouse platform's own capacity. Ingestion covers traditional offline ETL, real-time collection, web crawling and parsing, third-party data collection, and more.
  • Data integration layer: integrates the data. This covers both standardization and normalization (storage, naming, and data standards) and basic data tidying (deduplication, merging, tagging). On that standardized, tidied base, subject-oriented data is built up, forming the basic data capability layer.
  • Data aggregation layer: feeds the fused data into analysis engines such as data mining and deep learning and, according to company needs and application demand, builds the metric models and data capabilities that let different internal and external applications be supported quickly and simply.
  • Data access layer: mainly separates reads from writes, decoupling internal and external query and data-retrieval workloads from the warehouse's compute workload. It holds to the principles of minimal external permissions and of internal processing never affecting external applications.
  • Data application layer: divides data into application data groups by application type and need, providing exactly as much as is needed and no more, keeping internal changes from affecting external applications, and minimizing external permissions. Management platform: the vertical slice that manages and operates the warehouse; it spans all the layers and implements unified management of the data warehouse.

Permission Division

Based on the warehouse's data layers and each layer's function, warehouse permissions follow three principles:

  • separate accounts for production and for ad-hoc support
  • minimal internal permissions
  • fixed external permissions

In short: minimal permissions, as few accounts as possible, intelligent management. Permissions on the big-data platform warehouse are divided by data layer; the accounts and their relationships are shown in the figure below:


Data Warehouse Data Architecture


Layer Naming Conventions

Naming follows the platform's layered architecture: dimension layer (DIM), integration layer (DWI), aggregation layer (DWS), and access layer (DAL). Each table carries its layer as a prefix; for example, the position fusion table built later lives in the DWI layer as dwi.dwi_res_regn_mergelocation_msk_d.


Chapter 2

Tourism Big Data Overall Framework and Processing Flow


1. Overall Flow Diagram



2. Parameter Fields


1. Data Sources


1.1 OIDD Data


OIDD is signaling data collected from the A interface, including a handset's position whenever it performs any activity. OIDD signaling records fall into three categories: call records, SMS records, and location-update records.


Data quality:

OIDD data has been aggregated from 30 provinces (all except Shandong).


Data fields:


1.2 WCDR Data


WCDR collects data from the Abis interface in the network and determines a user's position by triangulation, based on measurements from three sectors taken while a service is in progress.

Data quality:

At present only 17 provinces upload WCDR data to the production zone, and the reported data is incomplete.

Data fields:


1.3 Mobile DPI Data


Mobile DPI data is collected from the interface between the mobile core network and the PDSN while users are on mobile data.

Data quality:

Mobile DPI covers 3G and 4G. The 3G records contain a BSID location field, while the 4G records currently carry only SAI and TAI, from which a position cannot be determined.

Data fields:


1.4 Mobile DDR Data


Currently only the mobile-data detail records in DDR provide a base-station identifier; voice, SMS, value-added, and other services carry no location information and are not used as base data for fusion.

Data quality:

Provincial branches upload DDR data one day later than the other sources, so DDR fusion also runs one day behind; the other sources are cleaned and fused first, on the following day.

Data fields:


2. Integration and Storage Layer


2.1 Fusion Files


Minute-granularity position data is extracted from OIDD, mobile DPI, and WCDR; records with duplicate time points are cleaned a second time and written to the merged file store. Noise is then removed according to cleaning rules such as ping-pong handover and overspeed, and the result is written to the fusion file store. Mobile DDR data is processed one day after it is uploaded.

After the raw signaling is fused, 12 fields are retained according to application needs and future scenarios, including phone number, city code, service start time, longitude and latitude, BSID, peer number, data source, and service type.

| No. | Field | Name | Physical meaning | Type | Notes |
| --- | --- | --- | --- | --- | --- |
| 1 | mdn | Phone number | The user's phone number | String | |
| 2 | city_code | City code | Area code of the user's city | String | City where the activity occurred |
| 3 | StartTime | Service start time | Start time of the service flow, formatted yyyymmddhhmmss (24-hour). If intermediate-record mode is on, every record carries the same start time. | String | |
| 4 | lat | Latitude | The user's latitude | String | |
| 5 | lon | Longitude | The user's longitude | String | |
| 6 | BSID | Base station ID | SID (4 octets) + NID (4 octets) + CI (4 octets); the high 12 bits of CI are the cell ID, the low 4 bits the sector ID | String | BSID does not exist in OIDD; it is recommended that OIDD supply it later, and for now it is back-filled into OIDD records |
| 7 | gridID | Grid number | The user's grid number | String | |
| 8 | service_type | Service type | Voice 1, SMS 2, location update 3, data 4 | INTEGER | OIDD values unchanged; DDR and DPI are all 4; for WCDR, 1 stays 1, 2 (data) becomes 4, 3 (SMS) becomes 2 |
| 9 | event_type | Event type | Voice: calling (11), called (12). SMS: sending (21), receiving (22). Location update: periodic 310, power-on 311, area update 312, power-off 313; 4G: periodic 320, power-on 321, area update 322, power-off 323. Data: protocol type | INTEGER | Reserved for now; caller/callee values will be filled in once later tables carry them, and location-update subtypes once the signaling carries the detailed classification. For data services, the protocol ID from DPI (protocolid) is prefixed with "4", i.e. "4" + the DPI protocol ID |
| 10 | data_source | Data source | OIDD (01), DPI (02), DDR (03), WCDR (04), ... | INTEGER | New source tag; currently 4 telecom sources, numbers increase as sources are added |
| 11 | RESERVER1 | Reserved 1 | | String | |
| 12 | RESERVER2 | Reserved 2 | | String | |
| 13 | RESERVER3 | Reserved 3 | | String | |
| 14 | RESERVER4 | Reserved 4 | | String | |
| 15 | RESERVER5 | Reserved 5 | | String | |

2.1.1 Position Table


The position table holds users' location information, with the date as a column name and a timestamp detailing the time of each coordinate.

The position table's fields are:


2.1.2 Region Table


The region table records which users were present in each grid during each hour.

The region table's fields are:


2.2 Stay Points


Stay point: any dwell within a single telecom grid counts as a stay point. In practice the multiple points recorded inside a grid are collapsed into two points, or one if there was only one to begin with; that is the stay point. (Grid: China Telecom's internally defined 500 m x 500 m grid.) A minimal sketch of this collapsing follows.
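A minimal sketch of the collapsing step, assuming the ods_oidd columns defined later (mdn, start_time, grid_id); the session logic is the same lag-and-running-sum trick used for the time clustering in Chapter 4, not the production stay-point job itself:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object StayPointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("StayPointSketch").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val w = Window.partitionBy($"mdn").orderBy($"start_time")
    spark.table("ods.ods_oidd")
      // mark a new segment whenever the grid changes along the timeline
      .withColumn("flag", when($"grid_id" === lag($"grid_id", 1).over(w), 0).otherwise(1))
      // a running sum of the flags gives every same-grid run its own id
      .withColumn("segment", sum($"flag").over(w))
      // collapse each run to its first and last record: the stay point
      .groupBy($"mdn", $"grid_id", $"segment")
      .agg(min($"start_time") as "arrive_time", max($"start_time") as "leave_time")
      .show()
  }
}
```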

The stay-point table's fields are:


2.3 Home and Work Locations


2.3.1 Residence Tracking Table


Using the positions reported by handset signaling, the areas where a user regularly appears during rest hours are tracked and monitored over the long term; this document calls them the user's residence. Tracking runs weekly and yields that week's residence.

The residence tracking table's fields are:


2.3.2 Workplace Tracking Table


Using the positions reported by handset signaling, the areas where a user regularly appears during working hours are tracked and monitored over the long term; this document calls them the user's workplace. Tracking runs weekly and yields that week's workplace.

The workplace tracking table's fields are:


2.3.3 Current Residence Table


Using the positions reported by handset signaling, the areas where a user regularly appears during rest hours are tracked over the long term; the user's residence is derived by comparing and analyzing the tracking tables.

The current residence table's fields are:


2.3.4 Historical Residence Table


The historical residence table records every residence where a user lived for at least one week.

The historical residence table's fields are:


2.3.5 Current Workplace Table


Using the positions reported by handset signaling, the areas where a user regularly appears during working hours are tracked over the long term; the user's workplace is derived by comparing and analyzing the tracking tables.

The current workplace table's fields are:


2.3.6 Historical Workplace Table

The historical workplace table records every workplace where a user worked for at least one week.

The historical workplace table's fields are:


2.3.7 Grid Label Table


The grid label table provides grid-level home/work information to the real-time interface.

The grid label table's fields are:


2.3.8 Tourism Home/Work Summary Table


The tourism home/work table aggregates the information from the individual home and work tables.

The tourism home/work table's fields are:


2.4 Cross-Region Movement


2.4.1 Latest City Cross-Region Table


City-level cross-region data mainly serves analyses of inter-city population flow, inter-city transport, and the like. In the latest-city table the position status is "not yet departed", and each user has exactly one latest record.

The latest city cross-region table's fields are:


2.4.2 Historical City Cross-Region Table


City-level cross-region data mainly serves analyses of inter-city population flow, inter-city transport, and the like. The city cross-region table is required to capture the history of cross-city position changes.

The historical city cross-region table's fields are:


2.4.3 Latest County Cross-Region Table

County-level cross-region tables reflect users' movements between counties, derived from the county field of the stay table. In the latest-county table the position status is "not yet departed", and each user has exactly one latest record.

The latest county cross-region table's fields are:


2.4.4 Historical County Cross-Region Table


County-level cross-region tables reflect users' movements between counties, derived from the county field of the stay table. The county cross-region table is required to capture the history of cross-county position changes.

The historical county cross-region table's fields are:


2.4.5 City OD Table


Records users' origin-destination (OD) trips between cities, excluding cities passed through en route.

The city OD table's fields are:


2.4.6 County OD Table


Records users' origin-destination (OD) trips between counties, excluding counties passed through en route.

The county OD table's fields are:


2.5 Quasi-Real-Time


2.5.1 Quasi-Real-Time Position Field Table


Quasi-real-time position data is one of the foundations of the location service; it fuses the telecom OIDD signaling data with the wireless WCDR data to provide users' near-real-time positions.

The quasi-real-time position table's structure is:


2.5.2 Quasi-Real-Time Region Field Table


From the positions uploaded in real time by WCDR and OIDD, the user, time, and position are extracted, matched to grids, and the user's status determined. The quasi-real-time region table mainly serves queries such as the population inside a region at the latest point in time and same-day historical stays in a region.

The quasi-real-time region table's structure is:


3. Analysis Layer


3.1 Provincial/City Destination Tourist Analysis Logical Table - City Level

Based on the province/city tourist definition, an initial screening over the underlying region fusion, stay, home/work, and OD tables produces this table. It feeds the upstream province/city tourist analysis and is the base table for destination tourist analysis.

The table's fields are:


3.2 Provincial/City Destination Tourist Analysis Logical Table - Province Level


Based on the province/city tourist definition, an initial screening over the underlying region fusion, stay, home/work, and OD tables produces this table. It feeds the upstream province/city tourist analysis and is the base table for destination tourist analysis.

The table's fields are:


3.3 County-Level Destination Tourist Analysis Logical Table


Based on the tourist definition, an initial screening over the underlying region fusion, stay, home/work, and OD tables produces this table. It feeds the upstream tourist analysis and is the base table for destination tourist analysis.

The table's fields are:


3.4 Scenic Area Destination Tourist Analysis Logical Table


Based on the tourist definition, an initial analysis over the underlying region tables produces this table, which feeds the upstream scenic-area tourist analysis.

The table's fields are:


3.5 Trip Base Table


Moving more than ten kilometers away from both the residence and the workplace and later returning to them counts as one trip; the whole journey is a single trip record.

The table's fields are:


3.6 Rural Tour Table


Trips whose total duration is under six hours, and whose remaining stay time, after subtracting stays at 4A-and-above scenic areas, railway stations, airports, and urban core areas, exceeds two hours, are classified as rural tours (a rough sketch of this rule follows the field list).
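A minimal sketch of the screening rule, assuming a hypothetical dws.trips table and hypothetical column names (total trip duration and per-category stay durations, all in hours), since the real schema is not shown in this document:

```scala
import org.apache.spark.sql.SparkSession

object RuralTourSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RuralTourSketch").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val trips = spark.table("dws.trips") // hypothetical trip base table, one row per trip
    val ruralTours = trips.filter(
      $"total_dur_h" < 6 &&                       // total trip duration under 6 hours
      ($"stay_dur_h"                              // total stay duration ...
        - $"stay_4a_h"                            // ... minus stays at 4A+ scenic areas,
        - $"stay_station_h" - $"stay_airport_h"   //     railway stations and airports,
        - $"stay_core_h") > 2                     //     and urban cores, leaves over 2 hours
    )
    ruralTours.show()
  }
}
```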

The table's fields are:


3.7 Tourist Travel History Table


Starting from the earliest data the company holds, tourists' travel preferences are compiled from actual, position-derived travel behavior, recording the basic travel habits of tourists nationwide.

The table's fields are:


3.8 User Profile Table

Records users' basic information, including gender, age, ID number, plan, handset price, and so on.

The table's fields are:


3.9 User Tag Table


Records users' basic information, including gender, age, ID number, plan, handset price, and so on.

The table's fields are:


4. Application Layer


4.1 Real-Time Query Metrics


Provides a standardized real-time query interface for authorized callers.

The fields are:


4.2 Scenic Area Metrics

Provides a standardized scenic-area interface for authorized callers.

The fields are:


4.3 Administrative Region Metrics


Provides a standardized administrative-region interface for authorized callers.

The fields are:


Chapter 3

1. Building the Data Warehouse


1.1 Enable HDFS permission checking and ACLs

1. Ordinary permission checking only distinguishes the current user, the current user's group, and everyone else; it cannot target specific individual users.

2. ACLs can control permissions per user.


Edit hdfs-site.xml to turn on permission checking

vim /usr/local/soft/hadoop-2.7.6/etc/hadoop/hdfs-site.xml

<property>
  <name>dfs.permissions</name>
  <value>true</value>
</property>
<property>
  <name>dfs.namenode.acls.enabled</name>
  <value>true</value>
</property>


Fixing insufficient permissions on the hive tmp directory

1. Edit the hive-site.xml file

Delete the value of these three settings:

hive.exec.local.scratchdir

hive.downloaded.resources.dir

hive.querylog.location

2. Replace the hive-site.xml file in spark

cp /usr/local/soft/hive-1.2.1/conf/hive-site.xml /usr/local/soft/spark-2.4.5/conf/


Restart hadoop

# restart hadoop
stop-all.sh
start-all.sh
# start the hive metastore service
nohup hive --service metastore >> metastore.log 2>&1 &


Verify that http://master:50070 is reachable


1.2 Create a user for each layer

useradd ods
passwd ods
123456
123456
useradd dwi
passwd dwi
123456
123456
useradd dws
passwd dws
123456
123456
useradd dim
passwd dim
123456
123456
useradd dal
passwd dal
123456
123456


Commands for granting permissions

  • These are unrelated to the project; they are just for testing.

# grant permissions
hadoop dfs -chmod 755 /user
# 755: other users can read but not modify
# grant an acl permission
hdfs dfs -setfacl -R -m user:ods:r-x /flume/data/dir1
# remove an acl permission
hdfs dfs -setfacl -R -x user:ods /flume/data/dir1
# inspect permissions
hdfs dfs -getfacl /flume/data/dir1

1.3 Create a hive database for each layer

create database ods;
create database dwi;
create database dws;
create database dim;
create database dal;

1.4 Create an HDFS directory for each layer

Give each directory to its layer's user.

hadoop dfs -mkdir -p /daas/motl/ods
hadoop dfs -mkdir -p /daas/motl/dwi
hadoop dfs -mkdir -p /daas/motl/dws
hadoop dfs -mkdir -p /daas/motl/dim
hadoop dfs -mkdir -p /daas/motl/dal
# change ownership
hadoop dfs -chown ods:ods /daas/motl/ods
hadoop dfs -chown dwi:dwi /daas/motl/dwi
hadoop dfs -chown dws:dws /daas/motl/dws
hadoop dfs -chown dim:dim /daas/motl/dim
hadoop dfs -chown dal:dal /daas/motl/dal
hadoop dfs -chmod 750 /daas/motl/ods
hadoop dfs -chmod 750 /daas/motl/dwi
hadoop dfs -chmod 750 /daas/motl/dws
hadoop dfs -chmod 750 /daas/motl/dim
hadoop dfs -chmod 750 /daas/motl/dal


1.5 Set the local hive directory's permissions to 777

chmod 777 /usr/local/soft/hive-1.2.1/
# every time another user enters hive, the tmp directory must first be removed as root
# set the tmp directory's permissions
chmod 777 tmp
# test by entering hive as the ods user
create table ods.student(
    id string,
    name string,
    age int,
    gender string,
    clazz string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS textfile
location '/daas/motl/ods/student/';
# upload the data
hadoop dfs -put students.txt /daas/motl/ods/student
# set permissions on the hdfs /tmp directory
hadoop dfs -chmod 777 /tmp
# when another user needs this table, grant it access
hdfs dfs -setfacl -m user:dwi:r-x /daas/motl/ods
hdfs dfs -setfacl -R -m user:dwi:r-x /daas/motl/ods/student/

2. Data Collection -- performed as the ods user


2.1 Collecting oidd data

Flume watches the base stations' back-end log directory and collects the data into hdfs in real time, landing it in the ods layer.

1. Create the table in hive

CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_oidd(
    mdn string comment 'phone number'
    ,start_time string comment 'service time'
    ,county_id string comment 'county code'
    ,longi string comment 'longitude'
    ,lati string comment 'latitude'
    ,bsid string comment 'base station id'
    ,grid_id string comment 'grid number'
    ,biz_type string comment 'service type'
    ,event_type string comment 'event type'
    ,data_source string comment 'data source'
) comment 'oidd'
PARTITIONED BY (
    day_id string comment 'day partition'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/ods/ods_oidd';

2. Upload the data to the server

Upload as the ods user,

to /home/ods.


3. Use flume to watch the base-station log directory and collect data

# root
cd /home/ods
chmod 777 oidd
# ods
cd /home/ods

# file name: flume-oss-oidd-to-hdfs.properties
# a is the agent's name
# name the source r1
a.sources = r1
# name the sink k1
a.sinks = k1
# name the channel c1
a.channels = c1
# spooldir source properties
a.sources.r1.type = spooldir
a.sources.r1.spoolDir = /home/ods/oidd
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
# sink type
a.sinks.k1.type = hdfs
a.sinks.k1.hdfs.path = /daas/motl/ods/ods_oidd/day_id=%Y%m%d
# file name prefix
a.sinks.k1.hdfs.filePrefix = oidd
# roll the file after this many bytes
a.sinks.k1.hdfs.rollSize = 70240000
# roll the file after this many events
a.sinks.k1.hdfs.rollCount = 600000
# file type: stream, written out as-is
a.sinks.k1.hdfs.fileType = DataStream
# output format: text
a.sinks.k1.hdfs.writeFormat = text
# file name suffix
a.sinks.k1.hdfs.fileSuffix = .txt
# channel
a.channels.c1.type = memory
a.channels.c1.capacity = 10000
# how many events the sink takes from the channel per transaction
a.channels.c1.transactionCapacity = 1000
# wire them together
a.sources.r1.channels = c1
a.sinks.k1.channel = c1

4. Start flume

nohup flume-ng agent -n a -f ./flume-oss-oidd-to-hdfs.properties -Dflume.root.logger=DEBUG,console &

5. Check the results

# check whether flume ran successfully
tail -f nohup.out
hadoop dfs -du -h /daas/motl/ods/ods_oidd/day_id=20220406
hadoop dfs -du -h /daas/motl/ods/ods_oidd



6. Add partitions

# add one partition
# alter table ods.ods_oidd add if not exists partition(day_id='20220407');
# repair partitions: scans the hdfs directories and adds every partition automatically
MSCK REPAIR TABLE ods.ods_oidd;

2.2 Collecting data from the CRM system


1. Create the crm database in mysql


create database crm default character set utf8 default collate utf8_general_ci;

2. Create the tables


User profile table

CREATE TABLE `usertag` (
    mdn varchar(255)
    ,name varchar(255)
    ,gender varchar(255)
    ,age int(10)
    ,id_number varchar(255)
    ,number_attr varchar(255)
    ,trmnl_brand varchar(255)
    ,trmnl_price varchar(255)
    ,packg varchar(255)
    ,conpot varchar(255)
    ,resi_grid_id varchar(255)
    ,resi_county_id varchar(255)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# load the data
LOAD DATA LOCAL INFILE 'usertag.txt' INTO TABLE usertag FIELDS TERMINATED BY ',';


Scenic area config table

CREATE TABLE scenic_boundary (
    scenic_id varchar(255)
    ,scenic_name varchar(255)
    ,boundary text
);

# load the data
LOAD DATA LOCAL INFILE 'scenic_boundary.txt' INTO TABLE scenic_boundary FIELDS TERMINATED BY '|';


Administrative region config table

CREATE TABLE admin_code (
    prov_id varchar(255)
    ,prov_name varchar(255)
    ,city_id varchar(255)
    ,city_name varchar(255)
    ,county_id varchar(255)
    ,county_name varchar(255)
    ,city_level varchar(255)
    ,economic_belt varchar(255)
    ,city_feature1 varchar(255)
);

# load the data
LOAD DATA LOCAL INFILE 'ssxdx.txt' INTO TABLE admin_code FIELDS TERMINATED BY ',';


3. Create the tables in the hive ods layer first


User profile table

CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_usertag_d (
    mdn string comment 'phone number, uppercase MD5'
    ,name string comment 'name'
    ,gender string comment 'gender, 1 male 2 female'
    ,age string comment 'age'
    ,id_number string comment 'id number'
    ,number_attr string comment 'number home location'
    ,trmnl_brand string comment 'handset brand'
    ,trmnl_price string comment 'handset price'
    ,packg string comment 'plan'
    ,conpot string comment 'consumption potential'
    ,resi_grid_id string comment 'home grid'
    ,resi_county_id string comment 'home county'
) comment 'user profile table'
PARTITIONED BY (
    day_id string comment 'day partition'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/ods/ods_usertag_d';

# add a partition
alter table ods.ods_usertag_d add if not exists partition(day_id='20220409');

Scenic area config table

CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_scenic_boundary (
    scenic_id string comment 'scenic area id'
    ,scenic_name string comment 'scenic area name'
    ,boundary string comment 'scenic area boundary'
) comment 'scenic area config table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/ods/ods_scenic_boundary';

Administrative region config table

CREATE EXTERNAL TABLE IF NOT EXISTS ods.ods_admincode (
    prov_id string comment 'province id'
    ,prov_name string comment 'province name'
    ,city_id string comment 'city id'
    ,city_name string comment 'city name'
    ,county_id string comment 'county id'
    ,county_name string comment 'county name'
    ,city_level string comment 'city tier: 1 for tier one, 2 for tier two, and so on'
    ,economic_belt string comment 'BJ capital, ZSJ Pearl River Delta, CSJ Yangtze River Delta, DB Northeast, HZ Central, HB North, HD East, HN South, XB Northwest, XN Southwest economic belt'
    ,city_feature1 string comment 'NL inland, YH coastal'
) comment 'administrative region config table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/ods/ods_admincode';

4. Collect the data with datax


User profile table

Write the datax script

datax-crm-usertag-mysql-to-hive.json

{    "job": { "content": [     {  "reader": {      "name": "mysqlreader",      "parameter": {   "connection": [{    "jdbcUrl": [ "jdbc:mysql://master:3306/crm"    ],    "table": [ "usertag"    ],}   ],   "column": ["*"],   "password": "123456",   "username": "root"      }  },  "writer": {      "name": "hdfswriter",      "parameter": {   "defaultFS": "hdfs://master:9000",   "fileType": "text",   "path": "/daas/motl/ods/ods_usertag_d/day_id=${day_id}",   "fileName": "data",   "column": [{    "name": "mdn",    "type": "STRING"},{    "name": "name",    "type": "STRING"},{    "name": "gender",    "type": "STRING"},{    "name": "age",    "type": "INT"},{    "name": "id_number",    "type": "STRING"},{    "name": "number_attr",    "type": "STRING"},{    "name": "trmnl_brand",    "type": "STRING"},{    "name": "trmnl_price",    "type": "STRING"},{    "name": "packg",    "type": "STRING"},{    "name": "conpot",    "type": "STRING"},{    "name": "resi_grid_id",    "type": "STRING"},{    "name": "resi_county_id",    "type": "STRING"}   ],   "writeMode": "append",   "fieldDelimiter": "\t"      }  }     } ], "setting": {     "errorLimit": {  "percentage": 0,  "record": 0     },     "speed": {  "channel": 4,  "record": 1000     } }    }}

Run the script

datax.py -p "-Dday_id=20220406" datax-crm-usertag-mysql-to-hive.json


Administrative region config table

Write the script

datax-crm-admin-code-mysql-to-hive.json

{    "job": { "content": [     {  "reader": {      "name": "mysqlreader",      "parameter": {   "connection": [{    "jdbcUrl": [ "jdbc:mysql://master:3306/crm"    ],    "table": [ "admin_code"    ],}   ],   "column": ["*"],   "password": "123456",   "username": "root"      }  },  "writer": {      "name": "hdfswriter",      "parameter": {   "defaultFS": "hdfs://master:9000",   "fileType": "text",   "path": "/daas/motl/ods/ods_admincode",   "fileName": "data",   "column": [{    "name": "prov_id",    "type": "STRING"},{    "name": "prov_name",    "type": "STRING"},{    "name": "city_id",    "type": "STRING"},{    "name": "city_name",    "type": "STRING"},{    "name": "county_id",    "type": "STRING"},{    "name": "county_name",    "type": "STRING"},{    "name": "city_level",    "type": "STRING"},{    "name": "economic_belt",    "type": "STRING"},{    "name": "city_feature1",    "type": "STRING"}   ],   "writeMode": "append",   "fieldDelimiter": "\t"      }  }     } ], "setting": {     "errorLimit": {  "percentage": 0,  "record": 0     },     "speed": {  "channel": 1,  "record": 1000     } }    }}

Run the script

datax.py datax-crm-admin-code-mysql-to-hive.json


Scenic area config table

Write the script

datax-crm-scenic_boundary-mysql-to-hive.json

{    "job": { "content": [     {  "reader": {      "name": "mysqlreader",      "parameter": {   "connection": [{    "jdbcUrl": [ "jdbc:mysql://master:3306/crm"    ],    "table": [ "scenic_boundary"    ],}   ],   "column": ["*"],   "password": "123456",   "username": "root"      }  },  "writer": {      "name": "hdfswriter",      "parameter": {   "defaultFS": "hdfs://master:9000",   "fileType": "text",   "path": "/daas/motl/ods/ods_scenic_boundary",   "fileName": "data",   "column": [{    "name": "scenic_id",    "type": "STRING"},{    "name": "scenic_name",    "type": "STRING"},{    "name": "boundary",    "type": "STRING"}   ],   "writeMode": "append",   "fieldDelimiter": "\t"      }  }     } ], "setting": {     "errorLimit": {  "percentage": 0,  "record": 0     },     "speed": {  "channel": 1,  "record": 1000     } }    }}

Run the script

datax.py datax-crm-scenic_boundary-mysql-to-hive.json 


Chapter 4

Position Fusion Table



Create a Maven project in IDEA



Create the ods module


Create the dwi module



Create the common shared module



Upload the local ods scripts to the ods resources directory




Import the spark dependencies in the outer pom file (matching the company's spark version)


<properties>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.5</spark.version>
    <scala.library.version>2.11</scala.library.version>
    <java.version>1.8</java.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.library.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>
</dependencyManagement>
<build>
    <pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </pluginManagement>
</build>

Import the ctyun scala dependencies in the dwi module


<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.library.version}</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
        </plugin>
    </plugins>
</build>

After importing the dependencies, create a small spark file to verify that the build environment works.


Create the DwiResRegnMergelocationMskDay class in the dwi module

new package
com.ctyun.dwi

Build the position fusion table by extracting the users' position information from the base data:

  1. Overspeed handling
  2. Data masking

package com.ctyun.dwi

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Build the position fusion table by extracting the users' position information from the base data.
 * 1. Overspeed handling
 * 2. Data masking
 */
object DwiResRegnMergelocationMskDay {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName("DwiResRegnMergelocationMskDay")
      .enableHiveSupport() // enable hive metastore support
      .getOrCreate()

    /**
     * Read the oidd table from hive
     */
    val oiddDF: DataFrame = spark.table("ods.ods_oidd")
    oiddDF.show()
  }
}

This code cannot run locally, because the hive configuration files and dependencies are not available there; write the code or sql, then package it and run it on the cluster.

Import implicit conversions


import spark.implicits._
import org.apache.spark.sql.functions._

Calculating the distance between two coordinates


Create the Geography java class in common

package com.ctyun.utils;

public class Geography {
    /**
     * Earth radius in meters
     */
    private static final double EARTH_RADIUS = 6378137;

    /**
     * Compute the distance between two coordinates.
     *
     * @param longi1 longitude 1
     * @param lati1  latitude 1
     * @param longi2 longitude 2
     * @param lati2  latitude 2
     * @return distance in meters
     */
    public static double calculateLength(double longi1, double lati1, double longi2, double lati2) {
        double lat21 = lati1 * Math.PI / 180.0;
        double lat22 = lati2 * Math.PI / 180.0;
        double a = lat21 - lat22;
        double b = (longi1 - longi2) * Math.PI / 180.0;
        double sa2 = Math.sin(a / 2.0);
        double sb2 = Math.sin(b / 2.0);
        double d = 2 * EARTH_RADIUS * Math.asin(Math.sqrt(sa2 * sa2 + Math.cos(lat21) * Math.cos(lat22) * sb2 * sb2));
        return Math.abs(d);
    }
}
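This is the standard haversine great-circle distance; written as one formula, with latitude φ and longitude λ in radians and R the 6378137 m earth radius used above:

$$
d = 2R\,\arcsin\!\sqrt{\sin^2\frac{\varphi_1-\varphi_2}{2}+\cos\varphi_1\cos\varphi_2\,\sin^2\frac{\lambda_1-\lambda_2}{2}}
$$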

Add the common dependency to the dwi module

  • outer pom

<dependency>
    <groupId>com.ctyun</groupId>
    <artifactId>common</artifactId>
    <version>1.0</version>
</dependency>

  • dwi module

<dependency>
    <groupId>com.ctyun</groupId>
    <artifactId>common</artifactId>
</dependency>

  • DwiResRegnMergelocationMskDay class


Clustering along the timeline


For example, a commute from home to the office and back ends in the same place it started, so simply subtracting the start position from the end position will not work; consecutive records have to be grouped along the timeline instead.


Create the merge test table in root's hive

CREATE EXTERNAL TABLE IF NOT EXISTS merge (
    mdn string
    ,sdate string
    ,grid string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/data/merge';

Time-clustering test data

17600195923,212111201532,19560032075040
17600195923,212111201533,19560032075040
17600195923,212111201534,19560032075041
17600195923,212111201535,19560032075042
17600195923,212111201536,19560032075043
17600195923,212111201537,19560032075040
17600195923,212111201538,19560032075040
17600195923,212111201539,19560032075040

Upload the data

hadoop dfs -put a.txt /data/merge

Test with spark-sql


hive executes on MapReduce underneath; spark-sql executes on spark and is faster.

1. Fetch the previous record.
2. If the current position matches the previous record's position, mark 0; otherwise mark 1.
3. Take a running sum of the flags; the sum assigns every run of identical positions its own group.

select mdn, grid, clazz, min(sdate) as start_date, max(sdate) as end_date
from (
    select *, sum(flag) over (partition by mdn order by sdate) as clazz
    from (
        select *, case when grid = last_grid then 0 else 1 end as flag
        from (
            select *, lag(grid, 1) over (partition by mdn order by sdate) as last_grid
            from merge
        ) as a
    ) as b
) as c
group by mdn, grid, clazz;
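Worked through on the test data above (shared prefixes elided), the inner lag produces last_grid, the case expression produces flag, and the running sum produces clazz:

| sdate | grid | last_grid | flag | clazz |
| --- | --- | --- | --- | --- |
| ...532 | ...040 | null | 1 | 1 |
| ...533 | ...040 | ...040 | 0 | 1 |
| ...534 | ...041 | ...040 | 1 | 2 |
| ...535 | ...042 | ...041 | 1 | 3 |
| ...536 | ...043 | ...042 | 1 | 4 |
| ...537 | ...040 | ...043 | 1 | 5 |
| ...538 | ...040 | ...040 | 0 | 5 |
| ...539 | ...040 | ...040 | 0 | 5 |

Grid ...040 ends up in two groups (clazz 1 and clazz 5); this is exactly why grouping by grid alone would merge the two separate visits, and why the running-sum trick is needed.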


Overspeed handling

  1. Walking: 0.5-2 m/s
  2. Cycling: 2-5
  3. Driving: 10-30
  4. High-speed rail: 60-100
  5. Airplane: 100-340
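The fusion job below uses these ranges to drop impossible hops: each record's speed is estimated against the previous record, and anything above the airplane ceiling is discarded:

$$
v=\frac{d}{\Delta t}=\frac{\text{distance to previous point (m)}}{\text{seconds since previous point}},\qquad \text{keep the record only if } v \le 340\ \text{m/s}
$$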

Check read permission on ods_oidd

hdfs dfs -getfacl /daas/motl/ods/ods_oidd
hadoop dfs -du -h /daas/motl/dwi/dwi_res_regn_mergelocation_msk_d
hdfs dfs -setfacl -R -m user:dwi:r-x /daas/motl/ods/ods_oidd/day_id=20220407

Delete previous run results

  • grant permissions on the trash directory as root
  • or bypass the trash mechanism

hadoop dfs -chmod 777 /user
hadoop dfs -rmr /daas/motl/dwi/dwi_res_regn_mergelocation_msk_d

Position data fusion table

  • Create the position data fusion table in the dwi user's hive

CREATE EXTERNAL TABLE IF NOT EXISTS dwi.dwi_res_regn_mergelocation_msk_d (
    mdn string comment 'phone number'
    ,start_date string comment 'start time'
    ,end_date string comment 'end time'
    ,county_id string comment 'county code'
    ,longi string comment 'longitude'
    ,lati string comment 'latitude'
    ,bsid string comment 'base station id'
    ,grid_id string comment 'grid number'
) comment 'position data fusion table'
PARTITIONED BY (
    day_id string comment 'day partition'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dwi/dwi_res_regn_mergelocation_msk_d';


Write the DwiResRegnMergelocationMskDay code

  • ods_oidd table structure

package com.ctyun.dwi

import com.ctyun.utils.Geography
import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * Build the position fusion table by extracting the users' position information from the base data.
 * 1. Overspeed handling
 * 2. Data masking
 */
object DwiResRegnMergelocationMskDay {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("Please pass in a date argument")
      return
    }
    // date argument
    val day_id: String = args.head

    val spark: SparkSession = SparkSession
      .builder()
      .appName("DwiResRegnMergelocationMskDay")
      .enableHiveSupport() // enable hive metastore support
      .getOrCreate()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    /**
     * A spark user-defined function
     */
    val calculateLength: UserDefinedFunction = udf((longi: String, lati: String, last_longi: String, last_lati: String) => {
      // compute the distance
      Geography.calculateLength(longi.toDouble, lati.toDouble, last_longi.toDouble, last_lati.toDouble)
    })

    /**
     * Read the oidd table from hive
     */
    val oiddDF: DataFrame = spark.table("ods.ods_oidd")

    oiddDF
      // filter by date: take one day's data
      .filter($"day_id" === day_id)
      .select($"mdn", $"start_time", $"county_id", $"longi", $"lati", $"bsid", $"grid_id")
      // deduplicate
      .distinct()
      // start time
      .withColumn("start_date", split($"start_time", ",")(1))
      // end time
      .withColumn("end_date", split($"start_time", ",")(0))
      // compute time and distance deltas
      // 1. fetch the previous record's position
      .withColumn("last_grid", lag($"grid_id", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      // 2. mark 0 if the position matches the previous record, 1 otherwise
      .withColumn("flag", when($"grid_id" === $"last_grid", 0).otherwise(1))
      // 3. assign group ids
      .withColumn("clazz", sum("flag") over Window.partitionBy($"mdn").orderBy($"start_date"))
      // 4. group by phone number and take each group's start and end time
      .groupBy($"mdn", $"county_id", $"grid_id", $"longi", $"bsid", $"lati", $"clazz")
      // first point's start time, last point's end time
      .agg(min($"start_date") as "start_date", max($"end_date") as "end_date")
      // fetch the previous record's time
      .withColumn("last_date", lag($"start_date", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      // time delta in seconds
      .withColumn("diff_time", when($"last_date".isNull, 1).otherwise(unix_timestamp($"start_date", "yyyyMMddHHmmss") - unix_timestamp($"last_date", "yyyyMMddHHmmss")))
      // fetch the previous record's coordinates
      .withColumn("last_longi", lag($"longi", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      .withColumn("last_lati", lag($"lati", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      // distance, via the user-defined function
      .withColumn("distance", when($"last_lati".isNull, 1).otherwise(calculateLength($"longi", $"lati", $"last_longi", $"last_lati")))
      // speed
      .withColumn("speed", round($"distance" / $"diff_time", 3))
      // drop overspeed records
      .filter($"speed" <= 340)
      // keep the needed columns
      .select($"mdn", $"start_date", $"end_date", $"county_id", $"longi", $"lati", $"bsid", $"grid_id")
      .write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .save(s"/daas/motl/dwi/dwi_res_regn_mergelocation_msk_d/day_id=$day_id")

    /**
     * spark-submit --master local --class com.ctyun.dwi.DwiResRegnMergelocationMskDay --jars common-1.0.jar dwi-1.0.jar
     */

    /**
     * Add the partition
     */
    spark.sql(
      s"""
         |alter table dwi.dwi_res_regn_mergelocation_msk_d add if not exists partition(day_id='$day_id')
         |""".stripMargin)
  }
}

Package, upload, and run (dwi)

cd /home/dwi

spark-submit --master local --class com.ctyun.dwi.DwiResRegnMergelocationMskDay --jars common-1.0.jar dwi-1.0.jar 20220409

Check the run on the master:4040 and master:8088 web UIs


Log in to hive as dwi and inspect the results


use dwi;
select * from dwi_res_regn_mergelocation_msk_d limit 10;


Encrypt the sensitive phone-number data

Mask the phone number with MD5: concatenate a salt string onto it first, then hash it and convert the digest to uppercase.

.select(
    upper(md5(concat($"mdn", expr("'liangzai'")))) as "mdn", // mask the phone number
    $"start_date",
    $"end_date",
    $"county_id",
    $"longi",
    $"lati",
    $"bsid",
    $"grid_id"
)
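A quick way to sanity-check the masking in spark-shell (a throwaway snippet; the literal phone number is made up, and 'liangzai' is the salt used above):

```scala
// the same expression the job applies to the mdn column, on one literal value
spark.sql("select upper(md5(concat('17600195923', 'liangzai'))) as mdn").show(false)
```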

Chapter 4

User Profile Table


Create the dim module

Add dependencies

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.library.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.ctyun</groupId>
        <artifactId>common</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
        </plugin>
    </plugins>
</build>

<modules>
    <module>common</module>
    <module>ods</module>
    <module>dwi</module>
    <module>dim</module>
</modules>

Create the package

com.ctyun.dim

Create the DimUserTagDay class


Log in to hive as ods and check whether the user profile table exists

# log in as ods
ssh ods@master
# enter the password 123456
# start hive
hive
# switch to the ods database
use ods;
# query the user profile table
select * from ods_usertag_d limit 10;


Exporting a hive table to a local directory

hive -e "select * from db_name.table_name" >> local_file_name.csv

Building utilities


Add the spark dependencies to common

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.library.version}</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
        </plugin>
    </plugins>
</build>

Create and write the SparkTool utility class

Testing

package com.ctyun.utils

abstract class SparkTool {
  def main(args: Array[String]): Unit = {
    println("SparkTool")
    this.run()
  }

  def run()
}

This cannot be run directly: SparkTool is abstract, so its main method only becomes runnable through an object that extends it.

Create the Test object

package com.ctyun.utils

object Test extends SparkTool {
  override def run(): Unit = {
    println("test.run")
  }
}

Run Test:


Write the telecom warehouse SparkTool utility

package com.ctyun.utils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

/**
 * Spark utility: common code pulled up into a parent class.
 *
 * Extends Logging so subclasses can print logs via log.
 */
abstract class SparkTool extends Logging {
  var day_id: String = _

  def main(args: Array[String]): Unit = {
    // 1. read the date argument
    if (args.length == 0) {
      log.error("Please pass in a date argument")
      return
    }
    day_id = args.head

    // get the class name
    val simpleName: String = this.getClass.getSimpleName.replace("$", "")

    // 2. create the spark session
    val spark: SparkSession = SparkSession
      .builder()
      .appName(simpleName)
      // enable hive metastore support
      .enableHiveSupport()
      .getOrCreate()

    // call the subclass's method, passing spark along
    this.run(spark)
  }

  def run(spark: SparkSession)
}

Position fusion table code using SparkTool

package com.ctyun.dwi

import com.ctyun.utils.{Geography, SparkTool}
import org.apache.spark.sql.expressions.{UserDefinedFunction, Window}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * Build the position fusion table by extracting the users' position information from the base data.
 * 1. Overspeed handling
 * 2. Data masking
 */
object DwiResRegnMergelocationMskDay extends SparkTool {
  override def run(spark: SparkSession): Unit = {
    import spark.implicits._
    import org.apache.spark.sql.functions._

    /**
     * A spark user-defined function
     */
    val calculateLength: UserDefinedFunction = udf((longi: String, lati: String, last_longi: String, last_lati: String) => {
      // compute the distance
      Geography.calculateLength(longi.toDouble, lati.toDouble, last_longi.toDouble, last_lati.toDouble)
    })

    /**
     * Read the oidd table from hive
     */
    val oiddDF: DataFrame = spark.table("ods.ods_oidd")

    val mergeDF: DataFrame = oiddDF
      // filter by date: take one day's data
      .filter($"day_id" === day_id)
      .select($"mdn", $"start_time", $"county_id", $"longi", $"lati", $"bsid", $"grid_id")
      // deduplicate
      .distinct()
      // start time
      .withColumn("start_date", split($"start_time", ",")(1))
      // end time
      .withColumn("end_date", split($"start_time", ",")(0))
      // compute time and distance deltas
      // 1. fetch the previous record's position
      .withColumn("last_grid", lag($"grid_id", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      // 2. mark 0 if the position matches the previous record, 1 otherwise
      .withColumn("flag", when($"grid_id" === $"last_grid", 0).otherwise(1))
      // 3. assign group ids
      .withColumn("clazz", sum("flag") over Window.partitionBy($"mdn").orderBy($"start_date"))
      // 4. group by phone number and take each group's start and end time
      .groupBy($"mdn", $"county_id", $"grid_id", $"longi", $"bsid", $"lati", $"clazz")
      // first point's start time, last point's end time
      .agg(min($"start_date") as "start_date", max($"end_date") as "end_date")
      // fetch the previous record's time
      .withColumn("last_date", lag($"start_date", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      // time delta in seconds
      .withColumn("diff_time", when($"last_date".isNull, 1).otherwise(unix_timestamp($"start_date", "yyyyMMddHHmmss") - unix_timestamp($"last_date", "yyyyMMddHHmmss")))
      // fetch the previous record's coordinates
      .withColumn("last_longi", lag($"longi", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      .withColumn("last_lati", lag($"lati", 1) over Window.partitionBy($"mdn").orderBy($"start_date"))
      // distance, via the user-defined function
      .withColumn("distance", when($"last_lati".isNull, 1).otherwise(calculateLength($"longi", $"lati", $"last_longi", $"last_lati")))
      // speed
      .withColumn("speed", round($"distance" / $"diff_time", 3))
      // drop overspeed records
      .filter($"speed" <= 340)
      // keep the needed columns
      .select(
        upper(md5(concat($"mdn", expr("'liangzai'")))) as "mdn", // mask the phone number
        $"start_date",
        $"end_date",
        $"county_id",
        $"longi",
        $"lati",
        $"bsid",
        $"grid_id"
      )

    mergeDF.write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .save(s"/daas/motl/dwi/dwi_res_regn_mergelocation_msk_d/day_id=$day_id")

    /**
     * spark-submit --master local --class com.ctyun.dwi.DwiResRegnMergelocationMskDay --jars common-1.0.jar dwi-1.0.jar 20220406
     */

    /**
     * Add the partition
     */
    spark.sql(
      s"""
         |alter table dwi.dwi_res_regn_mergelocation_msk_d add if not exists partition(day_id='$day_id')
         |""".stripMargin)
  }
}

Adjust ods permissions to view the data

# inspect permissions
hdfs dfs -getfacl /daas/motl/ods/ods_usertag_d
# grant access
hdfs dfs -setfacl -R -m user:dim:r-x /daas/motl/ods/ods_usertag_d
# list
hadoop dfs -ls /daas/motl/ods/ods_usertag_d
# grant access on ods itself
hdfs dfs -setfacl -R -m user:dim:r-x /daas/motl/ods
# view the data
hadoop dfs -cat /daas/motl/ods/ods_usertag_d/day_id=20220409/*
# run
spark-submit --master yarn-client --class <full.class.name> <jar> 20220409

Write the run scripts


Create and write the dim-usertag-day.sh script in the dim module

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        dim-usertag-day.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       user profile table
# **  Output:      user profile table
# **
# **  Description: user profile table
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

# date argument
day_id=$1

spark-submit \
--master yarn-client \
--class com.ctyun.dim.DimUserTagDay \
--jars common-1.0.jar \
dim-1.0.jar $day_id

Note: the script's line endings must be LF.

In IDEA, change the default line separator to Unix and macOS (\n).


Package and upload to the dim user

Upload the common jar, the dim jar, and the script together.


Run the user profile table script

sh dim-usertag-day.sh 20220409

If it errors, grant the permission from the root account:

hdfs dfs -chmod 777 /user

Run the script again.

If it errors again because the table is missing, create the hive dim.dim_usertag_msk_d table:

CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_usertag_msk_d (
    mdn string comment 'phone number, uppercase MD5'
    ,name string comment 'name'
    ,gender string comment 'gender, 1 male 2 female'
    ,age string comment 'age'
    ,id_number string comment 'id number'
    ,number_attr string comment 'number home location'
    ,trmnl_brand string comment 'handset brand'
    ,trmnl_price string comment 'handset price'
    ,packg string comment 'plan'
    ,conpot string comment 'consumption potential'
    ,resi_grid_id string comment 'home grid'
    ,resi_county_id string comment 'home county'
) comment 'user profile table'
PARTITIONED BY (
    day_id string comment 'day partition'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dim/dim_usertag_msk_d';

alter table dim.dim_usertag_msk_d add if not exists partition(day_id='20220409');

Check the user profile table results

hive
use dim;
select * from dim_usertag_msk_d limit 10;


Create and write the dwi-res-regn-mergelocation-msk-day.sh script in the dwi module

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        dwi-res-regn-mergelocation-msk-day.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       oidd
# **  Output:      position fusion table
# **
# **  Description: position fusion table
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

# date argument
day_id=$1

spark-submit \
--master yarn-client \
--class com.ctyun.dwi.DwiResRegnMergelocationMskDay \
--conf spark.sql.shuffle.partitions=10 \
--jars common-1.0.jar \
dwi-1.0.jar $day_id

Run the position fusion table script

sh dwi-res-regn-mergelocation-msk-day.sh 20220409

Create the administrative region config table in dim's hive


CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_admincode (
    prov_id string comment 'province id'
    ,prov_name string comment 'province name'
    ,city_id string comment 'city id'
    ,city_name string comment 'city name'
    ,county_id string comment 'county id'
    ,county_name string comment 'county name'
    ,city_level string comment 'city tier: 1 for tier one, 2 for tier two, and so on'
    ,economic_belt string comment 'BJ capital, ZSJ Pearl River Delta, CSJ Yangtze River Delta, DB Northeast, HZ Central, HB North, HD East, HN South, XB Northwest, XN Southwest economic belt'
    ,city_feature1 string comment 'NL inland, YH coastal'
) comment 'administrative region config table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dim/dim_admincode';

Create and write the dim-admincode.sh script in the dim module

The spark utility class requires a date argument and admincode does not need one, so no spark code is written here; a script is enough. It loads the ods_admincode data into the dim_admincode table.

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        dim-admincode.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       administrative region config table
# **  Output:      administrative region config table
# **
# **  Description: administrative region config table
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

spark-sql \
--master yarn-client \
-e "insert into dim.dim_admincode select * from ods.ods_admincode"

No packaging needed; just upload the script and run it.


Run the dim-admincode.sh script

sh dim-admincode.sh

Check the dim-admincode.sh results


Create the scenic area config table in the dim layer

CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_scenic_boundary (
    scenic_id string comment 'scenic area id'
    ,scenic_name string comment 'scenic area name'
    ,boundary string comment 'scenic area boundary'
) comment 'scenic area config table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dim/dim_scenic_boundary';

Create and write the dim-scenic-boundary.sh script in the dim module

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        dim-scenic-boundary.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       scenic area config table
# **  Output:      scenic area config table
# **
# **  Description: scenic area config table
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

spark-sql \
--master yarn-client \
-e "insert into dim.dim_scenic_boundary select * from ods.ods_scenic_boundary"

Upload and run the dim-scenic-boundary.sh script

sh dim-scenic-boundary.sh

Check the dim-scenic-boundary.sh results

select * from dim.dim_scenic_boundary limit 10;


Write the ods layer scripts


Write the datax-crm-admin-code-mysql-to-hive.sh script

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        datax-crm-admin-code-mysql-to-hive.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       crm admin_code table
# **  Output:      ods_admincode table
# **
# **  Description: administrative region config table collection
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

datax.py datax-crm-admin-code-mysql-to-hive.json

Write the datax-crm-scenic_boundary-mysql-to-hive.sh script

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        datax-crm-scenic_boundary-mysql-to-hive.sh
# **  Created:     2022-04-10
# **  Author:      qinxiao
# **  Input:       crm scenic_boundary table
# **  Output:      ods_scenic_boundary table
# **
# **  Description: scenic area config table collection
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

datax.py datax-crm-scenic_boundary-mysql-to-hive.json

Write the datax-crm-usertag-mysql-to-hive.sh script

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        datax-crm-usertag-mysql-to-hive.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       crm usertag table
# **  Output:      ods_usertag_d table
# **
# **  Description: user profile table collection
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

day_id=$1

hive -e "alter table ods.ods_usertag_d add if not exists partition(day_id='$day_id');"
datax.py -p "-Dday_id=$day_id" datax-crm-usertag-mysql-to-hive.json

Write the flume-oss-oidd-to-hdfs.sh script

#!/usr/bin/env bash
#***********************************************************************************
# **  File:        flume-oss-oidd-to-hdfs.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       flume data collection
# **  Output:      flume data collection
# **
# **  Description: flume data collection
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Modified date==|===Modified by===|===============================================|#
#***********************************************************************************

flume-ng agent -n a -f ./flume-oss-oidd-to-hdfs.properties -Dflume.root.logger=DEBUG,console

Chapter 5

Spatiotemporal Companions

Determining whether someone is a tourist at a scenic area

  1. Time filtering
  2. Space filtering (a sketch combining both filters follows)
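A minimal sketch of the companion join under those two filters, not the SpacetimeAdjoint implementation from this chapter: it pairs every fused position record with the confirmed cases' records in the same grid whose time ranges overlap. The columns follow dwi_res_regn_mergelocation_msk_d; the confirmed mdn list is assumed to have been loaded from the MySQL confirmed table created below, and the single sample value is taken from the insert statements in this section.

```scala
import org.apache.spark.sql.SparkSession

object SpacetimeAdjointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SpacetimeAdjointSketch").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val loc = spark.table("dwi.dwi_res_regn_mergelocation_msk_d")
      .select($"mdn", $"grid_id", $"start_date", $"end_date")

    // confirmed cases; in the real job these mdns would come from the MySQL confirmed table
    val confirmedMdns = Seq("CEDC133FD2AB2E9E1B66F419BE4A27FB").toDF("c_mdn")
    val confirmedLoc = loc.join(confirmedMdns, $"mdn" === $"c_mdn")
      .select($"c_mdn", $"grid_id" as "c_grid", $"start_date" as "c_start", $"end_date" as "c_end")

    // space filter: same grid; time filter: overlapping stay intervals
    val companions = loc
      .join(confirmedLoc, $"grid_id" === $"c_grid" && $"start_date" <= $"c_end" && $"end_date" >= $"c_start")
      .filter($"mdn" =!= $"c_mdn")
      .select($"mdn").distinct()

    companions.show()
  }
}
```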


Create the intimate table in MySQL (close contacts)



Insert data into the intimate table

Enter hive as the dwi user

select mdn from dwi.dwi_res_regn_mergelocation_msk_d limit 100;

Copy the queried phone numbers into Visual Studio Code for processing

insert into intimate (mdn, intimate_time) values
("0BAC7E88A70B87ADF1CDD0EA01641E0A","2018-05-03"),
("0BAC7E88A70B87ADF1CDD0EA01641E0A","2018-05-03"),
("0BAC7E88A70B87ADF1CDD0EA01641E0A","2018-05-03"),
("AAE7163B751D6F90DB0E6320EACF8536","2018-05-03"),
("AAE7163B751D6F90DB0E6320EACF8536","2018-05-03"),
("AAE7163B751D6F90DB0E6320EACF8536","2018-05-03"),
("AAE7163B751D6F90DB0E6320EACF8536","2018-05-03"),
("AAE7163B751D6F90DB0E6320EACF8536","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("7A5545007F0E6BB5D19C0A43EDAFF70F","2018-05-03"),
("55CA3CD9EB19AFCF5CD60C58ADC1F54C","2018-05-03"),
("55CA3CD9EB19AFCF5CD60C58ADC1F54C","2018-05-03"),
("1DD1B3AF16748E457E3BA0580573808A","2018-05-03"),
("235079F5122B12043FC054DF48F6756C","2018-05-03"),
("235079F5122B12043FC054DF48F6756C","2018-05-03"),
("235079F5122B12043FC054DF48F6756C","2018-05-03"),
("235079F5122B12043FC054DF48F6756C","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("9B8CD0FA615838679892E56050E1DB51","2018-05-03"),
("EC9430192862D2E7382BDE4D326E05E9","2018-05-03"),
("EC9430192862D2E7382BDE4D326E05E9","2018-05-03"),
("EC9430192862D2E7382BDE4D326E05E9","2018-05-03"),
("C549BD718C3D15BFCA21F24811869245","2018-05-03"),
("501E03F7F43D92D7B0BD99ADAEE375B2","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("DFFC9F211109F4F46F75E052AEFF812F","2018-05-03"),
("5183424C72FB8853C60E946FA79FA9BB","2018-05-03"),
("5183424C72FB8853C60E946FA79FA9BB","2018-05-03"),
("5183424C72FB8853C60E946FA79FA9BB","2018-05-03"),
("B6C09D637B26DFC39D75D462669FBB4F","2018-05-03"),
("CC3FB0A6EB61421CA35870465C53D578","2018-05-03"),
("CC3FB0A6EB61421CA35870465C53D578","2018-05-03"),
("CC3FB0A6EB61421CA35870465C53D578","2018-05-03"),
("4BFB2FE788D22D33CA36CEFF87C05C43","2018-05-03"),
("4BFB2FE788D22D33CA36CEFF87C05C43","2018-05-03"),
("4BFB2FE788D22D33CA36CEFF87C05C43","2018-05-03"),
("4BFB2FE788D22D33CA36CEFF87C05C43","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("DE0C5768F2549C3D803F4DB6E2A8378E","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("D2C75DD8D4B2ED05C4BA3CE09E2DD996","2018-05-03"),
("BF965887C70EB829977EE31B3310D1BA","2018-05-03"),
("95DA579000B57B59B1E9AB1748BFC817","2018-05-03"),
("95DA579000B57B59B1E9AB1748BFC817","2018-05-03"),
("17E67FB53C07E07FDBDAF7917CEAE6F9","2018-05-03"),
("33FE139A0FECB13E118DE4CC66DFC410","2018-05-03"),
("33FE139A0FECB13E118DE4CC66DFC410","2018-05-03"),
("33FE139A0FECB13E118DE4CC66DFC410","2018-05-03"),
("33FE139A0FECB13E118DE4CC66DFC410","2018-05-03"),
("F0EE065223D8AA495916559DB2BAD4DE","2018-05-03"),
("F0EE065223D8AA495916559DB2BAD4DE","2018-05-03"),
("F0EE065223D8AA495916559DB2BAD4DE","2018-05-03"),
("F0EE065223D8AA495916559DB2BAD4DE","2018-05-03"),
("F0EE065223D8AA495916559DB2BAD4DE","2018-05-03"),
("CEDC133FD2AB2E9E1B66F419BE4A27FB","2018-05-03");


Create the confirmed table in MySQL (confirmed cases)
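The CREATE TABLE statement itself was not captured here. Below is a minimal sketch that matches the columns used by the INSERT statements that follow; the column names are taken from the inserts, while the types are assumptions.

use crm;
-- minimal sketch; column types are assumptions
CREATE TABLE IF NOT EXISTS confirmed (
    mdn varchar(64) COMMENT 'phone number, upper-case MD5',
    confirmed_time varchar(32) COMMENT 'confirmation date'
);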



Insert data into the confirmed table

insert into confirmed (mdn,confirmed_time) values("CEDC133FD2AB2E9E1B66F419BE4A27FB","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("56A54270CB18C9FDC7F8331513526B3F","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("63D1926013BE71837415DD735C5DE59C","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("B27CF1A7FED8650F601A6DB3708A5F10","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("3B725237D68D29843A036E5E14ABF129","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("92F5DFD7BBC8C9A691FB78F590C0CC49","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("8985508EC4F1125B9255C3C266BCF92E","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("9A756E56B7861780D8180A1D508A84F3","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("C7D5730F0F655987C02A0E8281F1BA29","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("B8BF953999758B9E4D92BA8F5C6DC899","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("F4B24D0D2CB102F72B93D69D68DBC8C6","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("217487EA19A42DC0A150E4275F8897B9","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("F5223784F4FB35D129ABBD8570C487B3","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("F3AA00F1D22668EE461B21A61A2A45E3","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("A89D32C5629CA1B038539D7B9A1BCE7F","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("A9EFA156D6CB1C217AAC87F3A70DB0F7","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("06C01770754A2AF2B796CCF158BA0049","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("252B7A074A59FDEF26E72484F02AA915","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("1F085F951C3B4E783C1373431E09FD0F","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("D5C8CBEB8762FEB815E7709B32EB5E82","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("C309E7CFBEE0635CB081C56A25A3B816","2018-05-03");
insert into confirmed (mdn,confirmed_time) values("C33E6AB1DA0E28E02380DFD05D486ADA","2018-05-03");


Computing spatio-temporal companions


Create a dws module



Create the com.ctyun.dws package



Import the Spark dependencies


    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.library.version}</artifactId>
        </dependency>
        <dependency>
            <groupId>com.ctyun</groupId>
            <artifactId>common</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Create the SpacetimeAdjoint class


See the Spark documentation for the JDBC data source, and add the MySQL JDBC driver to the project.
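Spark does not ship the MySQL driver itself, so the connector has to be on the classpath. A typical Maven coordinate looks like the following; the version here is an assumption, pick one matching your MySQL server:

<!-- MySQL JDBC driver; the version is an assumption -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>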


Create DateUtil in common and edit it

package com.ctyun.utils

import java.text.SimpleDateFormat

object DateUtil {
  /** Absolute difference between two "yyyyMMddHHmmss" timestamps, in seconds. */
  def diff(date1: String, date2: String): Long = {
    // parse the timestamp strings into epoch milliseconds
    val format = new SimpleDateFormat("yyyyMMddHHmmss")
    val time1: Long = format.parse(date1).getTime
    val time2: Long = format.parse(date2).getTime
    Math.abs(time1 - time2) / 1000
  }

  def main(args: Array[String]): Unit = {
    println(diff("20180503100029", "20180503100050")) // 21
  }
}

Code optimization


  The code here effectively computes a Cartesian product (every user record against every confirmed-case record) and needs optimization.

# if the job runs too slowly, kill it:
yarn application -kill application_1649504042939_0009

This run did finish, but it still needs further tuning.


Edit the SpacetimeAdjoint class (optimized)

package com.ctyun.dws

import com.ctyun.utils.{DateUtil, Geography, SparkTool}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel

object SpacetimeAdjoint extends SparkTool {
  override def run(spark: SparkSession): Unit = {
    import spark.implicits._

    // 1. read the merged-location table
    val mergeLocation: DataFrame = spark
      .table("dwi.dwi_res_regn_mergelocation_msk_d")
      .where($"day_id" === day_id)

    // 2. read the confirmed-case and close-contact lists from MySQL via JDBC
    // confirmed cases
    val confirmed: DataFrame = spark
      .read
      .format("jdbc")
      .option("url", "jdbc:mysql://master:3306")
      .option("dbtable", "crm.confirmed")
      .option("user", "root")
      .option("password", "123456")
      .load()

    // close contacts
    val intimate: DataFrame = spark
      .read
      .format("jdbc")
      .option("url", "jdbc:mysql://master:3306")
      .option("dbtable", "crm.intimate")
      .option("user", "root")
      .option("password", "123456")
      .load()

    /**
      * 1. get the movement tracks of confirmed cases
      */
    val confirmedLocation: DataFrame = mergeLocation.join(confirmed, "mdn")

    // track list of confirmed cases
    val confirmedLocationArray: Array[(String, String, String, String)] = confirmedLocation
      .select($"mdn", $"start_date", $"longi", $"lati")
      .as[(String, String, String, String)]
      .collect()

    // broadcast the confirmed tracks to all executors
    val confirmedLocationArraybro: Broadcast[Array[(String, String, String, String)]] =
      spark.sparkContext.broadcast(confirmedLocationArray)

    // phone numbers of confirmed cases
    val confirmedMdns: Array[String] = confirmed.select($"mdn").as[String].collect()

    // filter out people who are already confirmed
    val filterMergeLocation: DataFrame = mergeLocation
      .select($"mdn", $"start_date", $"longi", $"lati")
      .filter(row => {
        val mdn: String = row.getAs[String]("mdn")
        !confirmedMdns.contains(mdn)
      })

    /**
      * A spatio-temporal companion must, for at least one record,
      * 1. be within one hour of a confirmed case's record, and
      * 2. be within 500 metres of it
      */
    // turn the location table into an RDD keyed by phone number
    val mergeRDD: RDD[(String, (String, String, String))] = filterMergeLocation.rdd.map {
      case Row(mdn: String, start_date: String, longi: String, lati: String) =>
        (mdn, (start_date, longi, lati))
    }

    val groupByRDD: RDD[(String, Iterable[(String, String, String)])] = mergeRDD.groupByKey(10)

    val filterRDD: RDD[(String, Iterable[(String, String, String)])] = groupByRDD.filter {
      case (mdn: String, datas: Iterable[(String, String, String)]) =>
        // one person's locations for one day
        val dataList: List[(String, String, String)] = datas.toList
        var flag = false
        var j = 0
        while (!flag && j < dataList.length) {
          val (start_date, longi, lati) = dataList(j)
          val confirmedLocationArray: Array[(String, String, String, String)] = confirmedLocationArraybro.value
          var i = 0
          while (!flag && i < confirmedLocationArray.length) {
            val (confirmedMdn, confirmedStart_date, confirmedlLongi, confirmedLati) = confirmedLocationArray(i)
            // 1. time difference in seconds
            val diff: Long = DateUtil.diff(start_date, confirmedStart_date)
            if (diff < 3600) {
              // 2. distance in metres
              val length: Double = Geography.calculateLength(longi.toDouble, lati.toDouble, confirmedlLongi.toDouble, confirmedLati.toDouble)
              if (length < 500) {
                flag = true
              }
            }
            i += 1
          }
          j += 1
        }
        // keep only people who actually matched a confirmed case
        flag
    }

    // flatten back into one row per location record
    val resultrRDD: RDD[(String, String, String, String)] = filterRDD.flatMap {
      case (mdn: String, datas: Iterable[(String, String, String)]) =>
        datas.map {
          case (start_date, longi, lati) =>
            (mdn, start_date, longi, lati)
        }
    }

    val filterDF: DataFrame = resultrRDD.toDF("mdn", "start_date", "longi", "lati")
    //filterDF.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // save the close-contact records
    filterDF
      .write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .save(s"/daas/motl/dws/spacetime_adjoint/day_id=$day_id")

    // extract the close-contact phone numbers
    //    filterDF
    //      .select("mdn")
    //      .distinct()
    //      .write
    //      .format("csv")
    //      .mode(SaveMode.Overwrite)
    //      .option("sep", "\t")
    //      .save(s"/daas/motl/dws/spacetime_adjoint_mdns/day_id=$day_id")
  }
}

Write the spacetime-adjoint.sh script at the dws layer

#!/usr/bin/env bash
#***********************************************************************************
# **  File name:   spacetime-adjoint.sh
# **  Created:     2022-04-10
# **  Author:      liangzai
# **  Input:       oidd
# **  Output:      spatio-temporal companions
# **
# **  Description: spatio-temporal companion computation
# **  Process:
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#***********************************************************************************
#==Date======|===Author=====|======================================================|#
#***********************************************************************************

# date parameter
day_id=$1

spark-submit \
--master yarn-client \
--num-executors 1 \
--executor-cores 4 \
--executor-memory 6G \
--conf spark.sql.shuffle.partitions=10 \
--class com.ctyun.dws.SpacetimeAdjoint \
--jars common-1.0.jar \
dws-1.0.jar $day_id

Grant read permission at the dwi layer

hdfs dfs -setfacl -R -m user:dws:r-x /daas/motl/dwi/
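To verify that the ACL took effect, it can be listed back with the standard HDFS command:

hdfs dfs -getfacl /daas/motl/dwi/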

Upload the jars and run the spacetime-adjoint.sh script

sh spacetime-adjoint.sh 20220409

Check the output of the spacetime-adjoint.sh script


hadoop dfs -cat /daas/motl/dws/spacetime_adjoint/day_id=20220409/part-00000-a7bb9404-4d01-4410-a583-e07b0cce2e68-c000.csv


Further optimization

Add a code block to SparkTool

  import org.apache.spark.sql.expressions.UserDefinedFunction
  import org.apache.spark.sql.functions.udf

  /**
    * Custom Spark SQL functions
    */
  // distance between two lon/lat points, in metres
  val calculateLength: UserDefinedFunction = udf((longi: String, lati: String, last_longi: String, last_lati: String) => {
    Geography.calculateLength(longi.toDouble, lati.toDouble, last_longi.toDouble, last_lati.toDouble)
  })

SpacetimeAdjoint after further optimization


package com.ctyun.dws

import com.ctyun.utils.{DateUtil, Geography, SparkTool}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.storage.StorageLevel

object SpacetimeAdjoint extends SparkTool {
  override def run(spark: SparkSession): Unit = {
    import spark.implicits._
    import org.apache.spark.sql.functions._

    // 1. read the merged-location table
    val mergeLocation: DataFrame = spark
      .table("dwi.dwi_res_regn_mergelocation_msk_d")
      .where($"day_id" === day_id)

    // 2. read the confirmed-case list from MySQL via JDBC
    val confirmed: DataFrame = spark
      .read
      .format("jdbc")
      .option("url", "jdbc:mysql://master:3306")
      .option("dbtable", "crm.confirmed")
      .option("user", "root")
      .option("password", "123456")
      .load()

    /**
     * 1. get the movement tracks of confirmed cases
     */
    val confirmedLocation: DataFrame = mergeLocation
      .join(confirmed.hint("broadcast"), "mdn")
      .select($"mdn" as "confirmed_mdn", $"start_date" as "confirmed_start_date", $"longi" as "confirmed_longi", $"lati" as "confirmed_lati", $"county_id")

    /**
     * A spatio-temporal companion must, for at least one record,
     * 1. be within one hour of a confirmed case's record, and
     * 2. be within 500 metres of it
     */
    // join within the same county
    val filterDF: DataFrame = mergeLocation
      .join(confirmedLocation.hint("broadcast"), "county_id")
      // exclude confirmed cases themselves
      .where($"confirmed_mdn" =!= $"mdn")
      // 1. time difference between the user's record and the confirmed case's record
      .withColumn("diff", abs(unix_timestamp($"confirmed_start_date", "yyyyMMddHHmmss") - unix_timestamp($"start_date", "yyyyMMddHHmmss")))
      .where($"diff" < 3600)
      // 2. distance between the two records
      .withColumn("length", calculateLength($"confirmed_longi", $"confirmed_lati", $"longi", $"lati"))
      .where($"length" < 500)
      .select($"mdn", $"start_date", $"end_date", $"county_id", $"longi", $"lati", $"bsid", $"grid_id")

    filterDF.persist(StorageLevel.MEMORY_AND_DISK_SER)

    // save the close-contact records
    filterDF
      .write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .save(s"/daas/motl/dws/spacetime_adjoint/day_id=$day_id")

    // extract the close-contact phone numbers
    filterDF
      .select("mdn")
      .distinct()
      .write
      .format("csv")
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .save(s"/daas/motl/dws/spacetime_adjoint_mdns/day_id=$day_id")
  }
}

Package, upload, and run the spacetime-adjoint.sh script

sh spacetime-adjoint.sh 20220409

Visit master:8088 and master:4040 to check the job status

The optimized version is clearly faster: it runs a broadcast join instead of a Cartesian product.

Check the optimized results

hadoop dfs -du -h /daas/motl/dws/spacetime_adjoint
hadoop dfs -du -h /daas/motl/dws/spacetime_adjoint_mdns
hadoop dfs -cat /daas/motl/dws/spacetime_adjoint_mdns/day_id=20220409/part-00001-a4aa9fe8-5f8e-4a0d-b5a3-7ba98a677e47-c000.csv

  • Close-contact phone numbers

As the output shows, the optimized code greatly reduces data redundancy.


Chapter 6

Scheduling with Azkaban


Install and set up Azkaban as root


1. Upload and unzip

yum install unzip
unzip azkaban-solo-server.zip

2. Edit the configuration file

cd /usr/local/soft/azkaban-solo-server/conf
vim azkaban.properties

# set the time zone
default.timezone.id=Asia/Shanghai


3. Start Azkaban

cd /usr/local/soft/azkaban-solo-server
# start
./bin/start-solo.sh
jps


4. Open Azkaban

http://master:8081 (username / password: azkaban / azkaban)

  • Download the Azkaban example and look at its configuration files

---
config:
  failure.emails: noreply@foo.com

nodes:
  - name: jobC
    type: noop
    # jobC depends on jobA and jobB
    dependsOn:
      - jobA
      - jobB

  - name: jobA
    type: command
    config:
      command: echo "This is an echoed text."

  - name: jobB
    type: command
    config:
      command: pwd
  • Azkaban scheduling policy (run once a day at 1 AM)
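For reference, Azkaban's flexible scheduler takes Quartz-style cron expressions (seconds, minutes, hours, day-of-month, month, day-of-week), entered in the Schedule dialog of the web UI; the exact expression below is my sketch, not from the original. A daily 1 AM trigger would be:

0 0 1 ? * *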


5. Configure the mail service

vim conf/azkaban.properties

# mail.sender    sender address
# mail.host      mail server address
# mail.user      user name
# mail.password  authorization code

# add the following:
mail.sender=987262086@qq.com
mail.host=smtp.qq.com
mail.user=987262086@qq.com
mail.password=aaaaa

6. Restart: stop, then start

# restart azkaban
cd /usr/local/soft/azkaban-solo-server
# stop
./bin/shutdown-solo.sh
# start
./bin/start-solo.sh

Create a jobs directory under the ctyun project in IDEA

  • Unzip the downloaded Azkaban example files into the jobs directory



Write basic.flow

---
config:
  failure.emails: noreply@foo.com
  day_id: $(new("org.joda.time.DateTime").minusDays(1).toString("yyyyMMdd"))

nodes:
  - name: dwi-res-regn-mergelocation-msk-day
    type: command
    config:
      command: su - dwi -c "sh /home/dwi/dwi-res-regn-mergelocation-msk-day.sh ${day_id}"

  - name: spacetime-adjoint
    type: command
    config:
      command: su - dws -c "sh /home/dws/spacetime-adjoint.sh ${day_id}"
    dependsOn:
      - dwi-res-regn-mergelocation-msk-day

  - name: datax-crm-usertag-mysql-to-hive
    type: command
    config:
      command: su - ods -c "sh /home/ods/datax-crm-usertag-mysql-to-hive.sh ${day_id}"

  - name: dim-usertag-day
    type: command
    config:
      command: su - dim -c "sh /home/dim/dim-usertag-day.sh ${day_id}"
    dependsOn:
      - datax-crm-usertag-mysql-to-hive

Each command here has to switch to the owning user with su; otherwise every job runs as root and the other users cannot read the output.
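A quick way to see what su - user -c does (a toy check, not part of the pipeline):

# as root: run a command as the dwi user, with dwi's login environment
su - dwi -c "whoami"    # prints: dwi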

  • In the file manager, compress the two files under the jobs directory into jobs.zip and upload it to Azkaban

  • Check the run logs

No errors.

  • Execution succeeded


City tourist table


Determine whether a person is a tourist in a city


Create the dws_city_tourist_msk_d tourist table at the dws layer

CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_city_tourist_msk_d (
    mdn string comment 'phone number, upper-case MD5'
    ,source_county_id string comment 'county the tourist comes from'
    ,d_city_id string comment 'destination city code'
    ,d_stay_time double comment 'time the tourist stayed in the city (hours)'
    ,d_max_distance double comment 'maximum distance of this trip'
) comment 'tourism topic data, city level - daily'
PARTITIONED BY (
    day_id string comment 'day partition'
)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dws/dws_city_tourist_msk_d';
alter table dws.dws_city_tourist_msk_d add if not exists partition(day_id='20220409') ;
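As an optional sanity check, the new partition can be listed back in Hive:

show partitions dws.dws_city_tourist_msk_d;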

Create and write the DwsCityTouristMskDay class

package com.ctyun.dws

import com.ctyun.utils.SparkTool
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

object DwsCityTouristMskDay extends SparkTool {
  override def run(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // 1. read the merged-location table
    val mergeLocation: Dataset[Row] = spark
      .table("dwi.dwi_res_regn_mergelocation_msk_d")
      .where($"day_id" === day_id)

    // 2. read the user-profile table
    val usertag: Dataset[Row] = spark
      .table("dim.dim_usertag_msk_d")
      .where($"day_id" === day_id)

    // administrative-region configuration table
    val adminCode: DataFrame = spark
      .table("dim.dim_admincode")

    /**
      * A tourist is someone who
      * 1. stayed longer than 3 hours, and
      * 2. travelled more than 10 km from home
      */
    val resultDF: Dataset[Row] = mergeLocation
      // join the admin-region table to get the city id of each stay point
      .join(adminCode.hint("broadcast"), "county_id")
      // stay time per (person, city):
      // the first and last timestamps of that person in that city
      .withColumn("min_date", min($"start_date") over Window.partitionBy($"mdn", $"city_id"))
      .withColumn("max_date", max($"end_date") over Window.partitionBy($"mdn", $"city_id"))
      .withColumn("d_stay_time", unix_timestamp($"max_date", "yyyyMMddHHmmss") - unix_timestamp($"min_date", "yyyyMMddHHmmss"))
      // 1. stay time greater than 3 hours
      .where($"d_stay_time" > 60 * 60 * 3)
      // join to get the user's home grid
      .join(usertag, "mdn")
      // distance from the home grid to each stay point
      .withColumn("distance", calculateLengthByGrid($"resi_grid_id", $"grid_id"))
      // maximum trip distance
      .withColumn("d_max_distance", max($"distance") over Window.partitionBy($"mdn", $"city_id"))
      // 2. trip distance greater than 10 km
      .where($"d_max_distance" > 10000)
      .select($"mdn", $"resi_county_id", $"city_id", round($"d_stay_time" / 3600, 2), round($"d_max_distance" / 1000, 2))
      .distinct()

    resultDF.write
      .format("csv")
      .option("sep", "\t")
      .mode(SaveMode.Overwrite)
      .save(s"/daas/motl/dws/dws_city_tourist_msk_d/day_id=$day_id")

    spark.sql(
      s"""
         |alter table dws.dws_city_tourist_msk_d add if not exists partition(day_id='$day_id')
         |""".stripMargin)
  }
}

Add to SparkTool

  // distance between two grid cells, in metres
  val calculateLengthByGrid: UserDefinedFunction = udf((grid1: String, grid2: String) => {
    // distance between the two grid centres
    Geography.calculateLength(grid1.toLong, grid2.toLong)
  })

Add to Geography

    /**
     * Distance between two grid cells
     *
     * @param p1 first grid id
     * @param p2 second grid id
     * @return distance between the two grid centres
     */
    public static double calculateLength(Long p1, Long p2) {
        Point2D.Double point1 = Grid.getCenter(p1);
        Point2D.Double point2 = Grid.getCenter(p2);
        return calculateLength(point1.x, point1.y, point2.x, point2.y);
    }

Create Grid in common and write it

/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret  or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils;

import java.awt.geom.Point2D;
import java.math.BigDecimal;

public class Grid {

    /**
     * Centre point of a grid cell
     * @param gridId grid id
     * @return centre coordinates
     */
    public static Point2D.Double getCenter(Long gridId) {
        double[] bound = Grid.getBound(gridId);
        double x = (bound[0] + bound[1]) / 2;
        double y = (bound[2] + bound[3]) / 2;
        return new Point2D.Double(x, y);
    }

    /**
     * Bounding box of a grid id. BigDecimal is used to guarantee precision at
     * the cost of some speed. To look up the grid id for a lon/lat, use the
     * getGridID family of methods.
     *
     * @param gridID grid id
     * @return [xmin, xmax, ymin, ymax], accurate to four decimal places
     */
    public static double[] getBound(long gridID) {
        double[] bound = new double[4];
        int z = (int) (gridID % 100) / 10;
        GridLevel zLevel = GridLevel.levelOfZ(z);
        // BigDecimal could be replaced by double, but precision would suffer
        BigDecimal length = BigDecimal.valueOf(zLevel.length());
        gridID /= 100;
        // ymin = y
        bound[2] = (gridID % 1000000) / 10000.00;
        // ymax = y + length
        bound[3] = length.add(BigDecimal.valueOf(bound[2])).doubleValue();
        gridID /= 1000000;
        // xmin = x
        bound[0] = gridID / 10000.00;
        // xmax = x + length
        bound[1] = length.add(BigDecimal.valueOf(bound[0])).doubleValue();
        return bound;
    }
}
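To make the grid-id encoding concrete, here is a quick sanity check with a hypothetical id built by reversing getBound's arithmetic: 7 digits of lon x 10000, then 6 digits of lat x 10000, then the two-digit z suffix (z = 1 is the 100 m level). The id below is an illustrative value, not from the original data.

// hypothetical grid id: lon 117.123, lat 31.845, 100 m level
// 1171230 | 318450 | 10  ->  117123031845010
val center = Grid.getCenter(117123031845010L)
println(center) // ~ (117.1235, 31.8455), the cell centre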

Create GridLevel.java in common and write it

/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret  or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils;

import java.util.ArrayList;
import java.util.List;

/**
 * Basic grid levels (longitude/latitude grids).
 *
 * Z code table (Z is one digit of the grid id and identifies the level):
 *
 *   code | level    | top-right offset (deg) | notes
 *   -----+----------+------------------------+----------------------
 *   0    | reserved |                        |
 *   1    | 100m     | (+0.001, +0.001)       | basic unit
 *   2    | 200m     | (+0.002, +0.002)       | aggregable from 100m
 *   3    | reserved |                        |
 *   4    | 500m     | (+0.005, +0.005)       |
 *   5    | 1000m    | (+0.01,  +0.01)        |
 *   6    | 2000m    | (+0.02,  +0.02)        | aggregable from 1000m
 *   7    | reserved |                        |
 *   8    | 5000m    | (+0.05,  +0.05)        |
 *   9    | 10000m   | (+0.1,   +0.1)         |
 *
 * @author Lin Dong
 */
public enum GridLevel {

    /** 0.1 degree (~10 km) */
    GRID_0_1,
    /** 0.05 degree (~5 km) */
    GRID_0_05,
    /** 0.02 degree (~2 km) */
    GRID_0_02,
    /** 0.01 degree (~1 km) */
    GRID_0_01,
    /** 0.005 degree (~500 m) */
    GRID_0_005,
    /** 0.002 degree (~200 m) */
    GRID_0_002,
    /** 0.001 degree (~100 m) */
    GRID_0_001;

    /**
     * Height/width of the grid (in degrees).
     *
     * @param level grid level
     * @return grid height/width
     */
    private static double value(GridLevel level) {
        switch (level) {
        case GRID_0_001:
            return 0.00100;
        case GRID_0_002:
            return 0.00200;
        case GRID_0_005:
            return 0.00500;
        case GRID_0_01:
            return 0.01000;
        case GRID_0_02:
            return 0.02000;
        case GRID_0_05:
            return 0.05000;
        case GRID_0_1:
            return 0.10000;
        default:
            // should never happen
            return Double.NaN;
        }
    }

    /** Grid size (height/width, in degrees). */
    public double length() {
        return this.value();
    }

    /** Height/width of this grid (in degrees). */
    public double value() {
        return GridLevel.value(this);
    }

    /**
     * Approximate height/width of the grid in metres (1 degree ~ 100 km).
     */
    private static double lengthAlias(GridLevel level) {
        return level.value() * 100000;
    }

    /** Approximate height/width in metres. */
    public double lengthAlias() {
        return lengthAlias(this);
    }

    /**
     * Exact grid width in metres. The width varies with latitude, even
     * slightly within one city, but in most scenarios that is negligible.
     *
     * @param level grid level
     * @param y latitude of the bottom-left corner
     * @return grid width in metres
     */
    private static double lengthX(GridLevel level, double y) {
        double delta = level.value();
        return GISUtil.getDistance(0, y, 0 + delta, y);
    }

    /** Exact grid width in metres at latitude y. */
    public double lengthX(double y) {
        return lengthX(this, y);
    }

    /**
     * Exact grid height in metres.
     */
    private static double lengthY(GridLevel level) {
        return level.value() * Constants.M_PER_DEGREE_Y;
    }

    /** Exact grid height in metres. */
    public double lengthY() {
        return lengthY(this);
    }

    /**
     * Grid level for an approximate size in metres. Only 10000, 5000, 2000,
     * 1000, 500, 200 and 100 are supported.
     *
     * @return matching level, or {@code null} if unsupported
     */
    public static GridLevel levelOfAlias(int length) {
        switch (length) {
        case 10000:
            return GRID_0_1;
        case 5000:
            return GRID_0_05;
        case 2000:
            return GRID_0_02;
        case 1000:
            return GRID_0_01;
        case 500:
            return GRID_0_005;
        case 200:
            return GRID_0_002;
        case 100:
            return GRID_0_001;
        default:
            // unsupported grid width
            return null;
        }
    }

    /**
     * Z code of a level (see the table in the class comment).
     */
    private static int z(GridLevel level) {
        switch (level) {
        case GRID_0_001:
            return 1;
        case GRID_0_002:
            return 2;
        case GRID_0_005:
            return 4;
        case GRID_0_01:
            return 5;
        case GRID_0_02:
            return 6;
        case GRID_0_05:
            return 8;
        case GRID_0_1:
            return 9;
        default:
            // should never happen
            return -1;
        }
    }

    /** Z code of this level (see the table in the class comment). */
    public int z() {
        return z(this);
    }

    /**
     * Level for a Z code (see the table in the class comment).
     *
     * @return matching level, or {@code null} if the code is reserved/unknown
     */
    public static GridLevel levelOfZ(int z) {
        switch (z) {
        case 0:
            return null; // reserved
        case 1:
            return GRID_0_001;
        case 2:
            return GRID_0_002;
        case 3:
            return null; // reserved
        case 4:
            return GRID_0_005;
        case 5:
            return GRID_0_01;
        case 6:
            return GRID_0_02;
        case 7:
            return null; // reserved
        case 8:
            return GRID_0_05;
        case 9:
            return GRID_0_1;
        default:
            return null;
        }
    }

    /**
     * Adjacent (smaller) level.
     *
     * @return next level down, or {@code null} if there is none
     */
    public GridLevel nextLevel() {
        switch (this) {
        // case GRID_0_001:
        case GRID_0_002:
            return GRID_0_001;
        case GRID_0_005:
            return GRID_0_002;
        case GRID_0_01:
            return GRID_0_005;
        case GRID_0_02:
            return GRID_0_01;
        case GRID_0_05:
            return GRID_0_02;
        case GRID_0_1:
            return GRID_0_05;
        default:
            return null;
        }
    }

    /**
     * Adjacent (larger) level.
     *
     * @return next level up, or {@code null} if there is none
     */
    public GridLevel lastLevel() {
        switch (this) {
        case GRID_0_001:
            return GRID_0_002;
        case GRID_0_002:
            return GRID_0_005;
        case GRID_0_005:
            return GRID_0_01;
        case GRID_0_01:
            return GRID_0_02;
        case GRID_0_02:
            return GRID_0_05;
        case GRID_0_05:
            return GRID_0_1;
        // case GRID_0_1:
        default:
            return null;
        }
    }

    /**
     * Neighbouring levels (several levels up and down, including this one).
     *
     * @param distance range of level differences
     * @return neighbouring grid levels
     */
    public List<GridLevel> neighbourLevel(int distance) {
        List<GridLevel> neighbours = new ArrayList<GridLevel>();
        if (distance <= 0) {
            return null;
        }
        neighbours.add(this);
        GridLevel last = this.lastLevel();
        GridLevel next = this.nextLevel();
        if (last != null) {
            neighbours.add(0, last);
        }
        if (next != null) {
            neighbours.add(neighbours.size(), next);
        }
        for (int i = 1; i < distance; i++) {
            if (last != null) {
                last = last.lastLevel();
                if (last != null) {
                    neighbours.add(0, last);
                }
            }
            if (next != null) {
                next = next.nextLevel();
                if (next != null) {
                    neighbours.add(neighbours.size(), next);
                }
            }
        }
        return neighbours;
    }

    /**
     * Neighbouring levels (one up and one down, including this one).
     *
     * @return neighbouring grid levels, possibly 2 or 1 extra elements
     */
    public List<GridLevel> neighbourLevel() {
        return neighbourLevel(1);
    }

    /*
    public static void main(String[] args) {
        System.out.println(GridLevel.GRID_0_002.neighbourLevel(2));
    }
    */
}

Create GISUtil.java in common and write it

/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret  or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils;

import com.ctyun.utils.entity.Gps;

public class GISUtil {

    public static final int r = 6371000;
    // private static final double PI = 3.141592653589793D;
    // private static final double R = 6371229.0D;
    public static final String BAIDU_LBS_TYPE = "bd09ll";

    public static double getDistance(double longt1, double lat1, double longt2, double lat2) {
        double x = (longt2 - longt1) * 3.141592653589793D * 6371229.0D
                * Math.cos((lat1 + lat2) / 2.0D * 3.141592653589793D / 180.0D) / 180.0D;
        double y = (lat2 - lat1) * 3.141592653589793D * 6371229.0D / 180.0D;
        double distance = Math.hypot(x, y);
        return distance;
    }

    public static double getLongt(double longt1, double lat1, double distance) {
        double a = 180.0D * distance
                / (2.0015806220738243E7D * Math.cos(lat1 * 3.141592653589793D / 180.0D));
        return a;
    }

    public static double getLat(double longt1, double lat1, double distance) {
        double a = 180.0D * distance
                / (2.0015806220738243E7D * Math.cos(lat1 * 3.141592653589793D / 180.0D));
        return a;
    }

    public static double pi = 3.141592653589793D;
    public static double a = 6378245.0D;
    public static double ee = 0.006693421622965943D;

    public static Gps gps84_To_Gcj02(double lat, double lon) {
        if (outOfChina(lat, lon)) {
            return null;
        }
        double dLat = transformLat(lon - 105.0D, lat - 35.0D);
        double dLon = transformLon(lon - 105.0D, lat - 35.0D);
        double radLat = lat / 180.0D * pi;
        double magic = Math.sin(radLat);
        magic = 1.0D - ee * magic * magic;
        double sqrtMagic = Math.sqrt(magic);
        dLat = dLat * 180.0D / (a * (1.0D - ee) / (magic * sqrtMagic) * pi);
        dLon = dLon * 180.0D / (a / sqrtMagic * Math.cos(radLat) * pi);
        double mgLat = lat + dLat;
        double mgLon = lon + dLon;
        return new Gps(mgLat, mgLon);
    }

    public static Gps gcj_To_Gps84(double lat, double lon) {
        Gps gps = transform(lat, lon);
        double lontitude = lon * 2.0D - gps.getWgLon();
        double latitude = lat * 2.0D - gps.getWgLat();
        return new Gps(latitude, lontitude);
    }

    public static Gps gcj02_To_Bd09(double gg_lat, double gg_lon) {
        double x = gg_lon;
        double y = gg_lat;
        double z = Math.sqrt(x * x + y * y) + 2.0E-5D * Math.sin(y * pi);
        double theta = Math.atan2(y, x) + 3.0E-6D * Math.cos(x * pi);
        double bd_lon = z * Math.cos(theta) + 0.0065D;
        double bd_lat = z * Math.sin(theta) + 0.006D;
        return new Gps(bd_lat, bd_lon);
    }

    public static Gps bd09_To_Gcj02(double bd_lat, double bd_lon) {
        double x = bd_lon - 0.0065D;
        double y = bd_lat - 0.006D;
        double z = Math.sqrt(x * x + y * y) - 2.0E-5D * Math.sin(y * pi);
        double theta = Math.atan2(y, x) - 3.0E-6D * Math.cos(x * pi);
        double gg_lon = z * Math.cos(theta);
        double gg_lat = z * Math.sin(theta);
        return new Gps(gg_lat, gg_lon);
    }

    public static Gps bd09_To_Gps84(double bd_lat, double bd_lon) {
        Gps gcj02 = bd09_To_Gcj02(bd_lat, bd_lon);
        Gps map84 = gcj_To_Gps84(gcj02.getWgLat(), gcj02.getWgLon());
        return map84;
    }

    public static boolean outOfChina(double lat, double lon) {
        if ((lon < 72.004D) || (lon > 137.8347D))
            return true;
        if ((lat < 0.8293D) || (lat > 55.8271D))
            return true;
        return false;
    }

    public static Gps transform(double lat, double lon) {
        if (outOfChina(lat, lon)) {
            return new Gps(lat, lon);
        }
        double dLat = transformLat(lon - 105.0D, lat - 35.0D);
        double dLon = transformLon(lon - 105.0D, lat - 35.0D);
        double radLat = lat / 180.0D * pi;
        double magic = Math.sin(radLat);
        magic = 1.0D - ee * magic * magic;
        double sqrtMagic = Math.sqrt(magic);
        dLat = dLat * 180.0D / (a * (1.0D - ee) / (magic * sqrtMagic) * pi);
        dLon = dLon * 180.0D / (a / sqrtMagic * Math.cos(radLat) * pi);
        double mgLat = lat + dLat;
        double mgLon = lon + dLon;
        return new Gps(mgLat, mgLon);
    }

    public static double transformLat(double x, double y) {
        double ret = -100.0D + 2.0D * x + 3.0D * y + 0.2D * y * y + 0.1D * x * y
                + 0.2D * Math.sqrt(Math.abs(x));
        ret += (20.0D * Math.sin(6.0D * x * pi) + 20.0D * Math.sin(2.0D * x * pi)) * 2.0D / 3.0D;
        ret += (20.0D * Math.sin(y * pi) + 40.0D * Math.sin(y / 3.0D * pi)) * 2.0D / 3.0D;
        ret += (160.0D * Math.sin(y / 12.0D * pi) + 320.0D * Math.sin(y * pi / 30.0D)) * 2.0D / 3.0D;
        return ret;
    }

    public static double transformLon(double x, double y) {
        double ret = 300.0D + x + 2.0D * y + 0.1D * x * x + 0.1D * x * y
                + 0.1D * Math.sqrt(Math.abs(x));
        ret += (20.0D * Math.sin(6.0D * x * pi) + 20.0D * Math.sin(2.0D * x * pi)) * 2.0D / 3.0D;
        ret += (20.0D * Math.sin(x * pi) + 40.0D * Math.sin(x / 3.0D * pi)) * 2.0D / 3.0D;
        ret = ret + (150.0D * Math.sin(x / 12.0D * pi) + 300.0D * Math.sin(x / 30.0D * pi)) * 2.0D / 3.0D;
        return ret;
    }

    public static int getCellID(int xGridNum, int yGridNum) {
        return xGridNum * 10000 + yGridNum;
    }

    public static void main(String[] args) {
        Gps gps = new Gps(31.426896D, 119.496145D);
        System.out.println("gps :" + gps);
        Gps gcj = gps84_To_Gcj02(gps.getWgLat(), gps.getWgLon());
        System.out.println("gcj :" + gcj);
        Gps star = gcj_To_Gps84(gcj.getWgLat(), gcj.getWgLon());
        System.out.println("star:" + star);
        Gps bd = gcj02_To_Bd09(gcj.getWgLat(), gcj.getWgLon());
        System.out.println("bd  :" + bd);
        Gps gcj2 = bd09_To_Gcj02(bd.getWgLat(), bd.getWgLon());
        System.out.println("gcj :" + gcj2);
    }
}

Create an entity directory under common and write Const.java, Gps.java, and GridEntity.java

  • Const.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret  or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.entity;

public class Const {
    public static final double START_LONGITUDE = 73.497522D;
    public static final double START_LATITUDE = 3.77849D;
    public static final double END_LONGITUDE = 135.032012D;
    public static final double END_LATITUDE = 53.52909D;
}
  • Gps.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret  or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.entity;

import java.text.DecimalFormat;

public class Gps {

    private double wgLat;
    private double wgLon;

    public Gps(double wgLat, double wgLon) {
        DecimalFormat df = new DecimalFormat("0.000000");
        setWgLat(Double.parseDouble(df.format(wgLat)));
        setWgLon(Double.parseDouble(df.format(wgLon)));
    }

    public double getWgLat() {
        return this.wgLat;
    }

    public void setWgLat(double wgLat) {
        this.wgLat = wgLat;
    }

    public double getWgLon() {
        return this.wgLon;
    }

    public void setWgLon(double wgLon) {
        this.wgLon = wgLon;
    }

    public String toString() {
        return this.wgLat + "," + this.wgLon;
    }
}
  • GridEntity.java

Create Constants.java in common and write it

/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret  or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils;

/**
 * Grid-related constants
 * @author Lin Dong
 */
class Constants {
    /**
     * distance per degree of latitude (metres)
     */
    final static double M_PER_DEGREE_Y = GISUtil.getDistance(0, 0, 0, 1);
}

Package, upload, and run at the dws layer

sh dws-city-tourist-msk-day.sh 20220409
  • If the run fails with a permission error, grant access at the dim layer

hadoop dfs -setfacl -m user:dws:r-x /daas/motl/dim
hadoop dfs -setfacl -R -m user:dws:r-x /daas/motl/dim/dim_admincode

Visit master:8088 to check the job status


Check the results at the dws layer

hive
use dws;
select * from dws_city_tourist_msk_d limit 10;


Add to the Azkaban flow in the jobs directory

  - name: dws-city-tourist-msk-day
    type: command
    config:
      command: su - dws -c "sh /home/dws/dws-city-tourist-msk-day.sh ${day_id}"
    dependsOn:
      - dwi-res-regn-mergelocation-msk-day
      - dim-usertag-day

Once the project is complete, everything is scheduled together.


Scenic-area tourist table


Pull the Hefei scenic-area data from the online map, put it in the ctyun directory, and name it a.json

{  "status": "1",  "searchOpt": {    "city": "340104",    "pageSize": "20",    "pageIndex": "1",    "ac": "false",    "keyword": "蜀山森林公园",    "type": "3"  },  "data": {    "busline_list": [],    "bus_list": [],    "codepoint": 0,    "code": "1",    "suggestion": {},    "busline_count": "0",    "timestamp": "1649494052.53",    "bounds": "117.170908;31.841590;117.170908;31.841590",    "lqii": {      "cache_directive": { "cache_all": {   "flag": "1",   "expires": "24" }      },      "general_flag": "",      "specialclassify": "0",      "view_region": "117.140629300,31.853891300,117.199653700,31.828723700",      "smartspot": "0",      "suggestcontent": "",      "pdheatmap": "",      "render_name_flag": "1",      "is_current_city": "2",      "expand_range_tip": "",      "suggest_query": { "data": [], "col": "", "row": ""      },      "change_query_tip": "",      "is_tupu_sug": "0",      "suggestionview": "1",      "show_hand_drawing": "0",      "change_query_type": "",      "business": "",      "car_icon_flag": "0",      "slayer_type": "none",      "show_pic": "",      "is_view_city": "1",      "querytype": "5",      "self_navigation": "",      "showaoi": "",      "utd_sceneid": "1000",      "distance_info": "",      "slayer": "0",      "target_view_city": "合肥市",      "call_taxi": "0",      "preload_next_page": "",      "magic_box": {}    },    "is_general_search": "0",    "version": "2.0-3.0.2096.0949",    "result": "true",    "magicbox_data": {},    "keywords": "",    "message": "Successful.",    "total": "1",    "poi_list": [      { "rating": "4.6", "tel": "0551-65324811;0551-65313400", "typecode": "110101", "areacode": "0551", "address": "玉兰大道8号", "cityname": "合肥市", "display_brand": "", "shape_region": "117.146,31.8518,117.195,31.8308", "longitude": "117.170908", "review_total": "152", "cinemazuo_flag": "0", "diner_flag": "0", "id": "B02270086A", "name": "蜀山森林公园", "group_flag": "0", "distance": "0", "entrances": [   {     "latitude": "31.848936",     "longitude": "117.181698"   } ], "recommend_flag": "0", "exits": [], "adcode": "340104", "domain_list": [   {     "type": "html",     "id": "1015",     "name": "poiclosed"   },   {     "type": "img",     "id": "1003",     "name": "icon"   },   {     "type": "html",     "id": "1011",     "name": "traffic"   },   {     "type": "text",     "id": "1014",     "name": "roadaoi"   },   {     "type": "button",     "id": "1012",     "name": "ext_btn"   },   {     "type": "webimg",     "id": "1009",     "value": "http://store.is.autonavi.com/showpic/296545250cc551cba902965d72a6c599",     "name": "pic_info"   },   {     "type": "img",     "id": "1008",     "name": "overbooked"   },   {     "type": "img",     "id": "1007",     "name": "moreservice"   },   {     "type": "html",     "id": "1006",     "name": "tag"   },   {     "type": "text",     "id": "1005",     "value": "查看出入口、停车场",     "name": "parent_info"   },   {     "pys": "31.832007+++31.848312+++31.839897+++31.849348+++31.848289",     "poiids": "B0FFLG3RBY+++B0FFK8ERLS+++B0FFKEG4G2+++B0FFHTTTNX+++B0FFGAPMM5",     "name": "parent_other_rel",     "pxs": "117.179490+++117.183098+++117.183640+++117.18166+++117.183341",     "childtype": "31+++31+++31+++41+++41",     "poiname": "蜀山森林公园(南出入口)+++蜀山森林公园(东出入口)+++大蜀山国家森林公园(东门)+++蜀山森林公园-东出入口西停车场+++蜀山森林公园东出入口东停车场",     "address": "安徽经济管理学院对面+++进山路与环山路交叉口(大蜀山地铁站C口步行450米)+++安徽经济管理学院斜对面+++环山路大蜀山国家森林公园+++玉兰大道8号蜀山森林公园",     "shortname": "南出入口+++东出入口+++东门+++东出入口西停车场+++东出入口东停车场",     "type": "array",     "id": "1004"   },   {     "type": "text",     "id": "1010",     
"value": "蜀山区",     "name": "business_area"   },   {     "type": "html",     "id": "1002",     "value": "合肥标志性建筑物",     "name": "deepinfo"   },   {     "type": "html",     "id": "1001",     "name": "price"   },   {     "type": "text",     "id": "1013",     "value": "117.145604,31.850797_117.149268,31.850836_117.152334,31.850827_117.152366,31.850828_117.152382,31.850826_117.15239,31.850815_117.152397,31.850742_117.152404,31.850731_117.152427,31.850729_117.15266,31.850729_117.152679,31.850731_117.152687,31.850737_117.152688,31.850746_117.152683,31.850824_117.152683,31.850835_117.152692,31.850841_117.152707,31.850847_117.153785,31.850883_117.154725,31.85092_117.156694,31.850969_117.158136,31.85096_117.159095,31.850936_117.159569,31.850939_117.160849,31.850947_117.161796,31.850936_117.166558,31.851005_117.166575,31.851005_117.166594,31.850997_117.166597,31.850989_117.166602,31.85092_117.16661,31.85091_117.166625,31.850906_117.166864,31.850907_117.166884,31.850916_117.166896,31.850931_117.166897,31.851001_117.166903,31.85101_117.166919,31.851018_117.167204,31.851021_117.167222,31.851014_117.167227,31.851008_117.167228,31.850997_117.167257,31.849867_117.168465,31.849901_117.168519,31.849887_117.16854,31.849862_117.16854,31.84983_117.168575,31.849249_117.16858,31.849229_117.168599,31.849217_117.168618,31.849217_117.168645,31.849229_117.169047,31.849463_117.169157,31.849525_117.169294,31.84958_117.169404,31.849616_117.169927,31.84978_117.170997,31.850049_117.171013,31.850058_117.171032,31.850074_117.171043,31.850104_117.171144,31.850976_117.171166,31.851008_117.171195,31.851029_117.171241,31.851045_117.17133,31.851054_117.171721,31.85106_117.171949,31.851033_117.173046,31.851065_117.173108,31.851067_117.173277,31.851047_117.173816,31.850988_117.174575,31.85096_117.175114,31.850962_117.17553,31.85099_117.176021,31.851092_117.176163,31.851111_117.176305,31.85112_117.176933,31.851104_117.176965,31.851086_117.176981,31.851058_117.176986,31.851026_117.176994,31.850614_117.177,31.850584_117.177021,31.850566_117.178923,31.849981_117.179081,31.849933_117.179186,31.849889_117.179277,31.849846_117.179357,31.849794_117.179454,31.84973_117.179854,31.849472_117.179934,31.849438_117.180135,31.849359_117.180913,31.849076_117.180953,31.849065_117.180986,31.849062_117.181002,31.849067_117.181023,31.849083_117.181045,31.849103_117.181309,31.849479_117.181625,31.849837_117.181655,31.849857_117.181683,31.849867_117.181734,31.849872_117.181879,31.849873_117.181901,31.84987_117.181923,31.849859_117.181939,31.849838_117.181951,31.849807_117.182111,31.849232_117.18218,31.848969_117.182219,31.848778_117.182247,31.84874_117.182776,31.848591_117.183035,31.848616_117.184625,31.850564_117.184816,31.850826_117.184703,31.851497_117.184704,31.851512_117.184725,31.851525_117.184758,31.851534_117.185233,31.851611_117.185941,31.85169_117.186597,31.851794_117.186628,31.851794_117.186641,31.85179_117.186657,31.851782_117.1867,31.851749_117.186721,31.851712_117.187037,31.850902_117.187318,31.850348_117.187391,31.850148_117.187466,31.849879_117.187519,31.849599_117.18753,31.849476_117.187535,31.849332_117.187588,31.848076_117.187596,31.848052_117.187608,31.848039_117.187623,31.848029_117.187643,31.848022_117.187682,31.848012_117.187731,31.848005_117.1881,31.848001_117.190289,31.847994_117.191343,31.847985_117.193017,31.847981_117.1936,31.847972_117.193753,31.847898_117.194693,31.847876_117.194706,31.84787_117.19471,31.847863_117.194712,31.847853_117.194712,31.845362_117.19472,31.84308_117.194735,31.840517_117.194673,31.836184_117.194673
,31.83617_117.194662,31.83615_117.194641,31.836136_117.194511,31.836122_117.194492,31.836116_117.194461,31.836096_117.194383,31.836028_117.194259,31.835857_117.194216,31.835744_117.194177,31.835164_117.194175,31.835143_117.194169,31.835132_117.194162,31.835126_117.194149,31.83512_117.194129,31.835115_117.19409,31.835111_117.193981,31.83511_117.193034,31.835176_117.192534,31.835211_117.192306,31.835214_117.192279,31.835217_117.192262,31.83522_117.192247,31.835224_117.19224,31.83523_117.192225,31.835244_117.192216,31.835266_117.192211,31.835291_117.192195,31.835682_117.192183,31.835791_117.192173,31.835807_117.19216,31.835818_117.192136,31.835825_117.190491,31.83597_117.190466,31.835971_117.190447,31.835971_117.190422,31.835964_117.190403,31.835949_117.190388,31.835925_117.19038,31.835895_117.190364,31.835411_117.190403,31.834796_117.190461,31.834229_117.190487,31.833936_117.190487,31.833915_117.190474,31.833899_117.190455,31.83389_117.190026,31.833849_117.189218,31.833435_117.188135,31.83307_117.187349,31.832815_117.187268,31.832774_117.187236,31.832737_117.187183,31.832641_117.187158,31.832612_117.187121,31.832589_117.187073,31.832573_117.185587,31.832559_117.185538,31.832559_117.18552,31.832555_117.18549,31.832528_117.185482,31.832493_117.185367,31.831999_117.185351,31.831965_117.185329,31.831949_117.185305,31.831942_117.185257,31.83194_117.182497,31.831935_117.180035,31.831951_117.178393,31.831944_117.177787,31.831983_117.177162,31.831972_117.175625,31.831928_117.174527,31.831919_117.174531,31.831552_117.174604,31.831437_117.174631,31.831256_117.174645,31.831235_117.174657,31.831225_117.174673,31.83122_117.174704,31.831216_117.175461,31.831206_117.175485,31.831199_117.175495,31.831189_117.1755,31.831168_117.175502,31.831134_117.175504,31.830844_117.175492,31.830829_117.175477,31.830822_117.175439,31.830821_117.174545,31.830838_117.174521,31.830858_117.174497,31.831278_117.174478,31.8313_117.174468,31.831305_117.174445,31.831307_117.17437,31.831305_117.174363,31.831305_117.17435,31.831308_117.174344,31.831313_117.174342,31.831325_117.174338,31.831891_117.174336,31.831901_117.174327,31.831913_117.174311,31.831923_117.174297,31.831924_117.171847,31.831941_117.170326,31.83196_117.169793,31.832013_117.168189,31.832019_117.167003,31.831982_117.164771,31.831818_117.163449,31.83172_117.156054,31.831173_117.155968,31.831164_117.155915,31.831166_117.155858,31.831182_117.155821,31.831205_117.155767,31.831259_117.153396,31.834249_117.151712,31.836738_117.151165,31.837658_117.149673,31.840265_117.148257,31.843236_117.146809,31.847_117.146031,31.84937_117.145548,31.850732_117.145548,31.85075_117.145556,31.850766_117.145572,31.850785_117.145604,31.850797",     "name": "aoi"   } ], "newtype": "110101", "disp_name": "蜀山森林公园", "cpdata": "", "latitude": "31.84159", "discount_flag": "0"      }    ]  }}

Create the dws_scenic_tourist_msk_d table in the dws layer's Hive

CREATE EXTERNAL TABLE IF NOT EXISTS dws.dws_scenic_tourist_msk_d (
    mdn string comment '游客手机号码'
    ,source_city_id string comment '游客来源城市'
    ,d_scenic_id string comment '旅游目的地景区代码'
    ,d_scenic_name string comment '旅游目的地景区名'
    ,d_arrive_time string comment '游客进入景区的时间'
    ,d_stay_time double comment '游客在该景区停留的时间长度(小时)'
) comment '旅游应用专题数据景区级别-天'
PARTITIONED BY (
    day_id string comment '日分区'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dws/dws_scenic_tourist_msk_d';

alter table dws.dws_scenic_tourist_msk_d add if not exists partition(day_id='20220409');

Create and edit the DwsScenicTouristMskDay.scala class in the dws layer

package com.ctyun.dws

import com.ctyun.utils.SparkTool
import com.ctyun.utils.poly.Polygon
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}

object DwsScenicTouristMskDay extends SparkTool {
  override def run(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // UDF: is the given longitude/latitude inside the boundary?
    val polygonContains: UserDefinedFunction = udf((lon: String, lat: String, boundary: String) => {
      val polygon = new Polygon(boundary)
      polygon.contains(lon.toDouble, lat.toDouble)
    })

    // UDF: is the given grid OUTSIDE the boundary? (note the negation;
    // it is used to exclude users whose home grid lies inside the scenic area)
    val polygonContainsByGrid: UserDefinedFunction = udf((grid: String, boundary: String) => {
      val polygon = new Polygon(boundary)
      !polygon.contains(grid.toLong)
    })

    // 1. read the merged-location table
    val mergeLocation: Dataset[Row] = spark
      .table("dwi.dwi_res_regn_mergelocation_msk_d")
      .where($"day_id" === day_id)

    // 2. read the user-profile table
    val usertag: Dataset[Row] = spark
      .table("dim.dim_usertag_msk_d")
      .where($"day_id" === day_id)

    // scenic-area boundary table
    val scenicBoundary: Dataset[Row] = spark
      .table("dim.dim_scenic_boundary")

    /**
      * A user counts as a tourist of a scenic area when:
      * 1. the home location is not inside the scenic area
      * 2. the destination is inside the scenic area, with a stay longer than 30 minutes
      */
    val resultDF: Dataset[Row] = mergeLocation
      // cartesian product with the (small) boundary table
      .crossJoin(scenicBoundary)
      // destination must be inside the scenic boundary
      .where(polygonContains($"longi", $"lati", $"boundary"))
      // join the user-profile table to get the home grid
      .join(usertag, "mdn")
      // home grid must be outside the scenic boundary
      .where(polygonContainsByGrid($"resi_grid_id", $"boundary"))
      .select($"mdn", $"scenic_id", $"scenic_name")

    resultDF
      .write
      .format("csv")
      .option("sep", "\t")
      .mode(SaveMode.Overwrite)
      .save(s"/daas/motl/dws/dws_scenic_tourist_msk_d/day_id=$day_id")
  }
}
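The comment block above lists a second condition, a stay longer than 30 minutes, which the code does not yet enforce. A minimal sketch of that filter, assuming hypothetical start_time and end_time columns in yyyyMMddHHmmss format on the merged-location rows (substitute the real column names of dwi_res_regn_mergelocation_msk_d):

// hedged sketch, not the project's actual code: keep only stays longer than 30 minutes
// start_time / end_time are ASSUMED column names in yyyyMMddHHmmss format;
// the $-syntax needs spark.implicits._ in scope, as in run() above
import org.apache.spark.sql.functions.unix_timestamp

val longStays = mergeLocation
  .withColumn("stay_minutes",
    (unix_timestamp($"end_time", "yyyyMMddHHmmss") -
      unix_timestamp($"start_time", "yyyyMMddHHmmss")) / 60)
  .where($"stay_minutes" > 30)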

Create the poly package under common and write Circle.java, CommonUtil.java, DataFormatUtil.java, Line.java, StringUtil.java, and Polygon.java

  • Circle.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.poly;

import com.ctyun.utils.Geography;

import java.awt.geom.Point2D.Double;
import java.util.Random;

/**
 * 数学模型 圆
 * @author dingjingbo
 */
public class Circle {

    private Double p;
    private double r;

    /**
     * 构造方法
     * @param p 圆心
     * @param r 半径 米
     */
    public Circle(Double p, double r) {
        super();
        this.p = p;
        this.r = r;
    }

    /**
     * 构造方法
     * @param x 圆心 x坐标
     * @param y 圆心 y坐标
     * @param r 半径 米
     */
    public Circle(double x, double y, double r) {
        this(new Double(x, y), r);
    }

    public Double getP() { return p; }
    public void setP(Double p) { this.p = p; }
    public double getR() { return r; }
    public void setR(double r) { this.r = r; }

    /**
     * 判断圆是否包含某个点
     * @param point 检测点
     * @return true 包含 false 不包含
     */
    public boolean contains(Double point) {
        return Geography.calculateLength(point.x, point.y, this.p.x, this.p.y) <= r;
    }

    /**
     * 判断圆是否包含某个多边形
     * @return true 包含 false 不包含
     */
    public boolean contains(Polygon polygon) {
        for (Double point : polygon.getBoundary()) {
            if (!contains(point)) {
                return false;
            }
        }
        return true;
    }

    /**
     * 判断圆是否与某个多边形相交
     * @return true 相交 false 不相交
     */
    public boolean intersect(Polygon polygon) {
        if (!contains(polygon)) {
            for (Double point : polygon.getBoundary()) {
                if (contains(point)) {
                    return true;
                }
            }
            return false;
        } else {
            return false;
        }
    }

    /**
     * 生成圆内的随机点
     * @return 圆内的随机点
     */
    public Double randomPoint() {
        //找出包含多边形的矩形的四个端点
        double maxLati = this.p.getY() + Geography.lengthLati(r);
        double minLati = this.p.getY() - Geography.lengthLati(r);
        double maxLongi = this.p.getX() + Geography.lengthLongi(this.p.getY(), r);
        double minLongi = this.p.getX() - Geography.lengthLongi(this.p.getY(), r);
        //随机一个在里面的点
        while (true) {
            Double randomPoint = nextRandomPoint(maxLongi, minLongi, maxLati, minLati);
            if (contains(randomPoint)) {
                return randomPoint;
            }
        }
    }

    /**
     * 随机生成一个矩形内的点
     * @param maxLongi 最大经度
     * @param minLongi 最小经度
     * @param maxLati  最大纬度
     * @param minLati  最小纬度
     * @return 矩形内的随机点
     */
    private Double nextRandomPoint(double maxLongi, double minLongi, double maxLati, double minLati) {
        int a = 1000000000;
        int r1 = (int) ((maxLongi - minLongi) * a);
        int r2 = (int) ((maxLati - minLati) * a);
        Random randomLongi = new Random();
        Random randomLati = new Random();
        double nextLongi = (double) (randomLongi.nextInt(r1)) / a;
        double nextLati = (double) (randomLati.nextInt(r2)) / a;
        return new Double(minLongi + nextLongi, minLati + nextLati);
    }
}
  • CommonUtil.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.poly;

import java.io.FileInputStream;
import java.io.InputStream;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * 通用工具类
 * @author dingjingbo
 */
public class CommonUtil {

    private CommonUtil() {}

    /**
     * 判断一个对象是否为空
     * @param obj 判断对象
     * @return 是否为空
     */
    public static boolean isBlank(Object obj) {
        if (obj instanceof String) {
            String str = (String) obj;
            return !(str != null && !"".equals(str));
        } else {
            return obj == null;
        }
    }

    /**
     * 日期格式转换
     * @param date 日期
     * @param pattern 模型
     * @return 转换后的结果
     */
    public static String convertDate2String(Date date, String pattern) {
        if (date == null) {
            return "";
        }
        SimpleDateFormat sf = new SimpleDateFormat(pattern);
        return sf.format(date);
    }

    /**
     * 日期格式转换 默认yyyy-MM-dd
     * @param date 日期
     * @return 转换后的结果
     */
    public static String convertDate2String(Date date) {
        return convertDate2String(date, "yyyy-MM-dd");
    }

    /**
     * 字符串转化成日期
     * @param date 字符串日期
     * @return 转换后的结果
     */
    public static Date convertString2Date(String date) {
        try {
            SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
            return sf.parse(date);
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 字符串连接
     * @param strs 字符串集合
     * @param joinStr 连接字符
     * @return 连接后的字符
     */
    public static String join(List<String> strs, String joinStr) {
        if (strs == null) {
            return null;
        }
        int i = 0;
        StringBuilder sb = new StringBuilder();
        for (String str : strs) {
            if (i == 0) {
                sb.append(str);
            } else {
                sb.append(joinStr);
                sb.append(str);
            }
            i++;
        }
        return sb.toString();
    }

    /**
     * 字符串连接
     * @param strs 字符串数组
     * @param joinStr 连接字符
     * @return 连接后的字符
     */
    public static String join(String[] strs, String joinStr) {
        if (strs == null) {
            return null;
        }
        int i = 0;
        StringBuilder sb = new StringBuilder();
        for (String str : strs) {
            if (i == 0) {
                sb.append(str);
            } else {
                sb.append(joinStr);
                sb.append(str);
            }
            i++;
        }
        return sb.toString();
    }

    /**
     * 加载properties配置文件,文件根目录为classpath
     * @param path 配置文件路径
     * @return 参数集合
     */
    public static Properties loadProperties(String path, boolean classPath) {
        try {
            InputStream in = null;
            if (classPath) {
                in = CommonUtil.class.getClassLoader().getResourceAsStream(path);
            } else {
                in = new FileInputStream(path);
            }
            Properties prop = new Properties();
            prop.load(in);
            return prop;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 找出头N条
     * @param count 统计数据
     * @param n top n
     * @return top n
     */
    public static List<String> findTopN(Map<String, Integer> count, int n) {
        Set<String> aaa = new TreeSet<String>();
        for (Map.Entry<String, Integer> entry : count.entrySet()) {
            String des = StringUtil.addAtFirst2Fixlength(entry.getValue() + "", "0", 10);
            aaa.add(des + "_" + entry.getKey());
        }
        List<String> bbb = new ArrayList<String>();
        int i = 0;
        for (String str : aaa) {
            i++;
            if (i > aaa.size() - n - 1) {
                bbb.add(str.split("_")[1]);
            }
        }
        Collections.reverse(bbb);
        return bbb;
    }

    /**
     * 判断n是否在s和e的范围内
     * @param n 判断值
     * @param s 端点值1
     * @param e 端点值2
     * @return 是否在该范围内
     */
    public static boolean between(double n, double s, double e) {
        if (s < e) {
            return n >= s && n <= e;
        } else if (s > e) {
            return n >= e && n <= s;
        } else {
            return Double.doubleToRawLongBits(n) == Double.doubleToRawLongBits(s);
        }
    }

    /**
     * 保留小数位数
     * @param num
     * @param scale
     * @return
     */
    public static double decimal(double num, int scale) {
        return BigDecimal.valueOf(num).divide(BigDecimal.valueOf(1), scale, BigDecimal.ROUND_HALF_UP).doubleValue();
    }
}
  • DataFormatUtil.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.poly;

import java.awt.geom.Point2D;
import java.util.ArrayList;
import java.util.List;

public class DataFormatUtil {

    private DataFormatUtil() {}

    /**
     * 字符串边界转成list
     * @param boundary
     * @return
     */
    public static List<Point2D.Double> stringBoundary2List(String boundary) {
        try {
            List<Point2D.Double> points = new ArrayList<Point2D.Double>();
            for (String pp : boundary.split(",")) {
                String[] split = pp.split(" ");
                try {
                    points.add(new Point2D.Double(Double.parseDouble(split[0]), Double.parseDouble(split[1])));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return points;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * list边界转成String
     * @param boundary
     * @return
     */
    public static String listBoundary2String(List<Point2D.Double> boundary) {
        StringBuilder sb = new StringBuilder();
        for (Point2D.Double p : boundary) {
            sb.append(p.getX() + " " + p.getY() + ",");
        }
        return sb.deleteCharAt(sb.length() - 1).toString();
    }

    /**
     * 坐标抽稀
     * @param boundary
     * @param m 最多保留点数
     * @return
     */
    public static String vacuate(String boundary, int m) {
        List<Point2D.Double> stringBoundary2List = DataFormatUtil.stringBoundary2List(boundary);
        List<Point2D.Double> newstringBoundary2List = new ArrayList<Point2D.Double>();
        if (stringBoundary2List.size() < m) {
            for (int i = 0; i < stringBoundary2List.size(); i++) {
                newstringBoundary2List.add(new Point2D.Double(
                        CommonUtil.decimal(stringBoundary2List.get(i).x, 5),
                        CommonUtil.decimal(stringBoundary2List.get(i).y, 5)));
            }
        } else {
            int n = stringBoundary2List.size() / m;
            for (int i = 0; i < stringBoundary2List.size(); i++) {
                if (i % n == 0) {
                    newstringBoundary2List.add(new Point2D.Double(
                            CommonUtil.decimal(stringBoundary2List.get(i).x, 5),
                            CommonUtil.decimal(stringBoundary2List.get(i).y, 5)));
                }
            }
        }
        return DataFormatUtil.listBoundary2String(newstringBoundary2List);
    }
}
  • Line.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.poly;

import java.awt.geom.Point2D;

/**
 * 数学模型 直线
 * @author dingjingbo
 */
public class Line {

    private double a;
    private double b;
    private double x1;

    /**
     * 构造方法
     * @param a 斜率
     * @param b 常量
     */
    public Line(double a, double b) {
        super();
        this.a = a;
        this.b = b;
    }

    /**
     * 构造方法
     * @param x1 点1横坐标
     * @param y1 点1纵坐标
     * @param x2 点2横坐标
     * @param y2 点2纵坐标
     */
    public Line(double x1, double y1, double x2, double y2) {
        this.x1 = x1;
        this.a = (y1 - y2) / (x1 - x2); //斜率
        this.b = y1 - a * x1;           //常数
    }

    /**
     * 构造方法
     * @param point1 点1
     * @param point2 点2
     */
    public Line(Point2D.Double point1, Point2D.Double point2) {
        this(point1.x, point1.y, point2.x, point2.y);
    }

    public double getA() { return a; }
    public void setA(double a) { this.a = a; }
    public double getB() { return b; }
    public void setB(double b) { this.b = b; }

    /**
     * 根据横坐标获取纵坐标
     * @param x 横坐标
     * @return 纵坐标
     */
    public double getY(double x) {
        return a * x + b;
    }

    /**
     * 根据纵坐标获取横坐标
     * @param y 纵坐标
     * @return 横坐标
     */
    public double getX(double y) {
        return (y - b) / a;
    }

    /**
     * 求点到直线的最短距离
     * @param x 点横坐标
     * @param y 点纵坐标
     * @return 最短距离
     */
    public double getMinLength(double x, double y) {
        Point2D.Double point = getMinPoint(x, y);
        double sqrt = Math.sqrt((y - point.y) * (y - point.y) + (x - point.x) * (x - point.x));
        return Math.abs(sqrt);
    }

    /**
     * 点到直线的垂直点
     * @param x 点横坐标
     * @param y 点纵坐标
     * @return 垂足点
     */
    public Point2D.Double getMinPoint(double x, double y) {
        if (Double.doubleToRawLongBits(this.a) == Double.doubleToRawLongBits(Double.NEGATIVE_INFINITY)
                || Double.doubleToRawLongBits(this.a) == Double.doubleToRawLongBits(Double.POSITIVE_INFINITY)) {
            return new Point2D.Double(x1, y);
        }
        if (Double.doubleToRawLongBits(this.getA()) == Double.doubleToRawLongBits(0d)) {
            return new Point2D.Double(x, b);
        }
        return new Point2D.Double((x + a * y - a * b) / (a - 1), a * (x + a * y - a * b) / (a - 1) + b);
    }

    /**
     * 求两条直线的交点
     * @param line 相交直线
     * @return 交点
     */
    public Point2D.Double getIntersectPoint(Line line) {
        if (Double.doubleToRawLongBits(this.a) == Double.doubleToRawLongBits(line.a)) {
            return null;
        }
        if ((Double.doubleToRawLongBits(line.getA()) == Double.doubleToRawLongBits(Double.NEGATIVE_INFINITY)
                || Double.doubleToRawLongBits(line.getA()) == Double.doubleToRawLongBits(Double.POSITIVE_INFINITY))
                && (Double.doubleToRawLongBits(this.getA()) == Double.doubleToRawLongBits(Double.NEGATIVE_INFINITY)
                || Double.doubleToRawLongBits(this.getA()) == Double.doubleToRawLongBits(Double.POSITIVE_INFINITY))) {
            return null;
        }
        double x = 0;
        double y = 0;
        if (Double.doubleToRawLongBits(line.getA()) == Double.doubleToRawLongBits(Double.NEGATIVE_INFINITY)
                || Double.doubleToRawLongBits(line.getA()) == Double.doubleToRawLongBits(Double.POSITIVE_INFINITY)) {
            x = line.x1;
            y = getY(x);
        } else if (Double.doubleToRawLongBits(this.getA()) == Double.doubleToRawLongBits(Double.NEGATIVE_INFINITY)
                || Double.doubleToRawLongBits(this.getA()) == Double.doubleToRawLongBits(Double.POSITIVE_INFINITY)) {
            x = this.x1;
            y = line.getY(x);
        } else {
            x = (line.b - this.b) / (this.a - line.a);
            y = this.a * (line.b - this.b) / (this.a - line.a) + this.b;
        }
        return new Point2D.Double(x, y);
    }

    @Override
    public String toString() {
        return Double.doubleToRawLongBits(this.getA()) == Double.doubleToRawLongBits(Double.NEGATIVE_INFINITY)
                || Double.doubleToRawLongBits(this.getA()) == Double.doubleToRawLongBits(Double.POSITIVE_INFINITY)
                ? "x=" + x1
                : "y=" + a + "x+" + b;
    }
}
  • StringUtil.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.poly;

public class StringUtil {

    private StringUtil() {}

    /**
     * 获取一个字符在某个字符串中出现的次数
     * @param str 字符串
     * @param c 字符
     * @return 出现的次数
     */
    public static int numOfChar(String str, char c) {
        int num = 0;
        char[] chars = str.toCharArray();
        for (int i = 0; i < chars.length; i++) {
            if (chars[i] == c) {
                num++;
            }
        }
        return num;
    }

    /**
     * 找出字符串中某个字符出现第n次的索引
     * @param str
     * @param c
     * @param n
     * @return
     */
    public static int indexOf(String str, char c, int n) {
        int num = 0;
        char[] chars = str.toCharArray();
        for (int i = 0; i < chars.length; i++) {
            if (chars[i] == c) {
                num++;
                if (num == n) {
                    return i;
                }
            }
        }
        return -1;
    }

    /**
     * 在某个字符串的某索引处添加字符串,若指定索引大于原字符串索引则返回原字符串
     * @param str 原字符串
     * @param index 索引
     * @param s 要添加的字符串
     * @return
     */
    public static String insertStr(String str, int index, String s) {
        if (index > str.length() - 1) {
            return str;
        } else {
            String s1 = str.substring(0, index);
            String s2 = str.substring(index);
            return s1 + s + s2;
        }
    }

    /**
     * 在某个字符串前面添加某个字符到指定长度
     * @param src 源字符串
     * @param addstr 要添加的字符
     * @param fixlength 添加到的长度
     * @return
     */
    public static String addAtFirst2Fixlength(String src, String addstr, int fixlength) {
        int length = src.length();   //原字符串长度
        int bc = fixlength - length; //需要补的长度
        String bbb = "";
        for (int i = 0; i < bc; i++) {
            bbb = bbb + addstr;
        }
        return bbb + src;
    }

    /**
     * 比较两个字符串的相似度
     * @param str1 比较字符串
     * @param str2 被比较字符串
     * @return 相似度
     */
    public static double similar(String str1, String str2) {
        char[] charArray1 = str1.toCharArray();
        char[] charArray2 = str2.toCharArray();
        int count = 0;
        for (char c1 : charArray1) {
            for (char c2 : charArray2) {
                if (c1 == c2) {
                    count++;
                    break;
                }
            }
        }
        return Double.parseDouble(count + "") / str1.length();
    }
}
  • Polygon.java
/*********************************************************************
 *
 * CHINA TELECOM CORPORATION CONFIDENTIAL
 * ______________________________________________________________
 *
 *  [2015] - [2020] China Telecom Corporation Limited,
 *  All Rights Reserved.
 *
 * NOTICE:  All information contained herein is, and remains
 * the property of China Telecom Corporation and its suppliers,
 * if any. The intellectual and technical concepts contained
 * herein are proprietary to China Telecom Corporation and its
 * suppliers and may be covered by China and Foreign Patents,
 * patents in process, and are protected by trade secret or
 * copyright law. Dissemination of this information or
 * reproduction of this material is strictly forbidden unless prior
 * written permission is obtained from China Telecom Corporation.
 **********************************************************************/
package com.ctyun.utils.poly;

import com.ctyun.utils.Geography;
import com.ctyun.utils.Grid;

import java.awt.geom.GeneralPath;
import java.awt.geom.Point2D;
import java.util.*;

/**
 * 几何模型 多边形
 *
 * @author dingjingbo
 */
public class Polygon {

    private Point2D.Double peakPoint;
    private GeneralPath generalPath;
    private List<Point2D.Double> boundary;
    private List<Point2D.Double> boundaryConvert;

    /**
     * 构造方法
     */
    public Polygon() {
        super();
    }

    /**
     * 构造方法
     * @param boundary 边界点(经纬度)
     */
    public Polygon(List<Point2D.Double> boundary) {
        super();
        init(boundary);
    }

    /**
     * 构造方法
     * @param boundary 边界点(经纬度)
     */
    public Polygon(String boundary) {
        this(DataFormatUtil.stringBoundary2List(boundary));
    }

    private void init(List<Point2D.Double> boundary) {
        this.boundary = boundary;
        List<Point2D.Double> points = new ArrayList<Point2D.Double>();
        int i = 0;
        for (Point2D.Double point : boundary) {
            i++;
            if (i == 1) {
                this.peakPoint = point;
            }
            points.add(Geography.longilati2Decare(peakPoint, point));
        }
        this.boundaryConvert = points;
        drawMyself();
    }

    public Polygon(Circle circle) {
        List<Point2D.Double> points = new ArrayList<Point2D.Double>();
        for (int i = 0; i < 360; i++) {
            double val = 111319.55 * Math.cos(circle.getP().y * Math.PI / 180);
            double lon = circle.getP().x + (circle.getR() * Math.sin(i * Math.PI / 180)) / val;
            double lat = circle.getP().y + (circle.getR() * Math.cos(i * Math.PI / 180)) / 111133.33;
            points.add(new Point2D.Double(lon, lat));
        }
        init(points);
    }

    public void setPeakPoint(Point2D.Double peakPoint) { this.peakPoint = peakPoint; }
    public Point2D.Double getPeakPoint() { return peakPoint; }
    public GeneralPath getGeneralPath() { return generalPath; }
    public List<Point2D.Double> getBoundary() { return boundary; }

    /**
     * 设置多边形边界点
     * @param boundary 边界点(经纬度)
     */
    public void setBoundary(List<Point2D.Double> boundary) { init(boundary); }

    public List<Point2D.Double> getBoundaryConvert() { return boundaryConvert; }

    /**
     * 构建多边形
     */
    public void drawMyself() {
        GeneralPath p = new GeneralPath();
        Point2D.Double first = boundaryConvert.get(0);
        p.moveTo(first.x, first.y);
        for (int i = 1; i < boundaryConvert.size(); i++) {
            p.lineTo(boundaryConvert.get(i).x, boundaryConvert.get(i).y);
        }
        p.lineTo(first.x, first.y);
        p.closePath();
        this.generalPath = p;
    }

    /**
     * 计算面积
     * @return 面积(平方米)
     */
    public double getArea() {
        return Math.abs(getSignedArea());
    }

    /**
     * 计算有向面积
     * @return 面积(平方米)
     */
    public double getSignedArea() {
        //S = 0.5 * ( (x0*y1-x1*y0) + (x1*y2-x2*y1) + ... + (xn*y0-x0*yn) )
        double area = 0.00;
        for (int i = 0; i < boundaryConvert.size(); i++) {
            if (i < boundaryConvert.size() - 1) {
                Point2D.Double p1 = boundaryConvert.get(i);
                Point2D.Double p2 = boundaryConvert.get(i + 1);
                area += p1.getX() * p2.getY() - p2.getX() * p1.getY();
            } else {
                Point2D.Double pn = boundaryConvert.get(i);
                Point2D.Double p0 = boundaryConvert.get(0);
                area += pn.getX() * p0.getY() - p0.getX() * pn.getY();
            }
        }
        area = area / 2.00;
        return area;
    }

    /**
     * 内部随机点
     * @return 随机点
     */
    public Point2D.Double randomPoint() {
        //找出包含多边形的矩形的四个端点
        double maxLongi = 0d;
        double minLongi = Double.MAX_VALUE;
        double maxLati = 0d;
        double minLati = Double.MAX_VALUE;
        for (Point2D.Double point : boundary) {
            if (point.getX() < minLongi) { minLongi = point.getX(); }
            if (point.getX() > maxLongi) { maxLongi = point.getX(); }
            if (point.getY() < minLati) { minLati = point.getY(); }
            if (point.getY() > maxLati) { maxLati = point.getY(); }
        }
        //随机一个在里面的点
        while (true) {
            Point2D.Double randomPoint = nextRandomPoint(maxLongi, minLongi, maxLati, minLati);
            if (randomPoint != null && contains(randomPoint)) {
                return randomPoint;
            }
        }
    }

    /**
     * 内部随机点
     * @return 随机点
     */
    public Point2D.Double randomPoint(double seedLongi, double seedLati) {
        //找出包含多边形的矩形的四个端点
        double maxLongi = 0d;
        double minLongi = Double.MAX_VALUE;
        double maxLati = 0d;
        double minLati = Double.MAX_VALUE;
        for (Point2D.Double point : boundary) {
            if (point.getX() < minLongi) { minLongi = point.getX(); }
            if (point.getX() > maxLongi) { maxLongi = point.getX(); }
            if (point.getY() < minLati) { minLati = point.getY(); }
            if (point.getY() > maxLati) { maxLati = point.getY(); }
        }
        //随机一个在里面的点
        while (true) {
            Point2D.Double randomPoint = nextRandomPoint(seedLongi, seedLati, maxLongi, minLongi, maxLati, minLati);
            if (randomPoint != null && contains(randomPoint)) {
                return randomPoint;
            }
        }
    }

    /**
     * 获取内切圆向外扩大指定范围的圆
     * @param length 扩大范围,单位m
     * @return
     */
    public Circle enlarge(double length) {
        double maxLongi = 0d;
        double minLongi = Double.MAX_VALUE;
        double maxLati = 0d;
        double minLati = Double.MAX_VALUE;
        for (Point2D.Double point : boundary) {
            if (point.getX() < minLongi) { minLongi = point.getX(); }
            if (point.getX() > maxLongi) { maxLongi = point.getX(); }
            if (point.getY() < minLati) { minLati = point.getY(); }
            if (point.getY() > maxLati) { maxLati = point.getY(); }
        }
        double centerLongi = (minLongi + maxLongi) / 2;
        double centerLati = (minLati + maxLati) / 2;
        double r = Geography.calculateLength(minLongi, maxLati, maxLongi, minLati) / 2;
        return new Circle(centerLongi, centerLati, r + length);
    }

    /**
     * 根据重心等比例扩大指定范围
     * @return
     */
    public Polygon enlargeM(double len) {
        List<Point2D.Double> points = new ArrayList<Point2D.Double>();
        Point2D.Double gravityPoint = getGravityPoint();
        Point2D.Double gravityPointDecare = Geography.longilati2Decare(this.getPeakPoint(), gravityPoint);
        double maxlength = 0d;
        for (int i = 0; i < this.boundary.size(); i++) {
            double length = Geography.calculateLength(gravityPoint.x, gravityPoint.y, this.boundary.get(i).x, this.boundary.get(i).y);
            if (length > maxlength) {
                maxlength = length;
            }
        }
        double minMultiple = len / maxlength;
        for (int i = 0; i < this.boundary.size(); i++) {
            Geography.longilati2Decare(getGravityPoint(), this.boundary.get(i));
            Point2D.Double pointDecare = Geography.longilati2Decare(this.getPeakPoint(), this.boundary.get(i));
            Line line = new Line(gravityPointDecare, pointDecare);
            double x = minMultiple * (pointDecare.x - gravityPointDecare.x) + pointDecare.x;
            if (CommonUtil.between(x, gravityPointDecare.x, pointDecare.x)) {
                x = -minMultiple * (pointDecare.x - gravityPointDecare.x) + pointDecare.x;
            }
            double y = line.getY(x);
            Point2D.Double decare2Longilati = Geography.decare2Longilati(this.getPeakPoint(), new Point2D.Double(x, y));
            points.add(decare2Longilati);
        }
        return new Polygon(points);
    }

    /**
     * 根据重心等距离扩大指定范围
     * @return
     */
    public Polygon enlargeN(double len) {
        List<Point2D.Double> points = new ArrayList<Point2D.Double>();
        Point2D.Double gravityPoint = getGravityPoint();
        Point2D.Double gravityPointDecare = Geography.longilati2Decare(this.getPeakPoint(), gravityPoint);
        for (int i = 0; i < this.boundary.size(); i++) {
            double length = Geography.calculateLength(gravityPoint.x, gravityPoint.y, this.boundary.get(i).x, this.boundary.get(i).y);
            double minMultiple = len / length;
            Geography.longilati2Decare(getGravityPoint(), this.boundary.get(i));
            Point2D.Double pointDecare = Geography.longilati2Decare(this.getPeakPoint(), this.boundary.get(i));
            Line line = new Line(gravityPointDecare, pointDecare);
            double x = minMultiple * (pointDecare.x - gravityPointDecare.x) + pointDecare.x;
            if (CommonUtil.between(x, gravityPointDecare.x, pointDecare.x)) {
                x = -minMultiple * (pointDecare.x - gravityPointDecare.x) + pointDecare.x;
            }
            double y = line.getY(x);
            Point2D.Double decare2Longilati = Geography.decare2Longilati(this.getPeakPoint(), new Point2D.Double(x, y));
            points.add(decare2Longilati);
        }
        return new Polygon(points);
    }

    /**
     * 获取多边形重心点
     * @return
     */
    public Point2D.Double getGravityPoint() {
        double area = 0.0;           //多边形面积
        double Gx = 0.0, Gy = 0.0;   //重心的x、y
        for (int i = 1; i <= this.boundary.size(); i++) {
            double iLat = this.boundary.get(i % this.boundary.size()).y;
            double iLng = this.boundary.get(i % this.boundary.size()).x;
            double nextLat = this.boundary.get(i - 1).y;
            double nextLng = this.boundary.get(i - 1).x;
            double temp = (iLat * nextLng - iLng * nextLat) / 2.0;
            area += temp;
            Gx += temp * (iLat + nextLat) / 3.0;
            Gy += temp * (iLng + nextLng) / 3.0;
        }
        Gx = Gx / area;
        Gy = Gy / area;
        if (area - 0 < 0.0000001) {
            return getCenterPointX();
        }
        return new Point2D.Double(Gy, Gx);
    }

    /**
     * 根据输入的地点坐标计算中心点
     * @return
     */
    public Point2D.Double getCenterPointX() {
        int total = this.boundary.size();
        double X = 0, Y = 0, Z = 0;
        for (Point2D.Double g : this.boundary) {
            double lat, lon, x, y, z;
            lat = g.y * Math.PI / 180;
            lon = g.x * Math.PI / 180;
            x = Math.cos(lat) * Math.cos(lon);
            y = Math.cos(lat) * Math.sin(lon);
            z = Math.sin(lat);
            X += x;
            Y += y;
            Z += z;
        }
        X = X / total;
        Y = Y / total;
        Z = Z / total;
        double Lon = Math.atan2(Y, X);
        double Hyp = Math.sqrt(X * X + Y * Y);
        double Lat = Math.atan2(Z, Hyp);
        return new Point2D.Double(Lon * 180 / Math.PI, Lat * 180 / Math.PI);
    }

    /**
     * 根据输入的地点坐标计算中心点(适用于400km以下的场合)
     * @return
     */
    public Point2D.Double getCenterPointS() {
        //以下为简化方法(400km以内)
        int total = this.boundary.size();
        double lat = 0, lon = 0;
        for (Point2D.Double g : this.boundary) {
            lat += g.y * Math.PI / 180;
            lon += g.x * Math.PI / 180;
        }
        lat /= total;
        lon /= total;
        return new Point2D.Double(lon * 180 / Math.PI, lat * 180 / Math.PI);
    }

    /**
     * 随机生成一个矩形内的点
     * @param maxLongi 最大经度
     * @param minLongi 最小经度
     * @param maxLati  最大纬度
     * @param minLati  最小纬度
     * @return 矩形内的随机点
     */
    private Point2D.Double nextRandomPoint(double maxLongi, double minLongi, double maxLati, double minLati) {
        int a = 100000000;
        int r1 = Math.abs((int) ((maxLongi - minLongi) * a));
        int r2 = Math.abs((int) ((maxLati - minLati) * a));
        Random randomLongi = new Random();
        Random randomLati = new Random();
        int nextInt1 = randomLongi.nextInt(r1);
        int nextInt2 = randomLati.nextInt(r2);
        double nextLongi = Double.parseDouble(nextInt1 + ".0") / a;
        double nextLati = Double.parseDouble(nextInt2 + ".0") / a;
        return new Point2D.Double(minLongi + nextLongi, minLati + nextLati);
    }

    /**
     * 随机生成一个矩形内的点 靠近种子点分布
     * @param seedLongi 种子点经度
     * @param seedLati  种子点纬度
     * @param maxLongi  最大经度
     * @param minLongi  最小经度
     * @param maxLati   最大纬度
     * @param minLati   最小纬度
     * @return
     */
    public Point2D.Double nextRandomPoint(double seedLongi, double seedLati, double maxLongi, double minLongi, double maxLati, double minLati) {
        Point2D.Double point = nextRandomPoint(maxLongi, minLongi, maxLati, minLati);
        double maxLength = Geography.calculateLength(maxLongi, maxLati, minLongi, minLati);
        double length = Geography.calculateLength(seedLongi, seedLati, point.x, point.y);
        double df = 1 / maxLength; //权重衰减因子,每米点的密度衰减
        Random random = new Random();
        int nextInt = random.nextInt(10);
        if (nextInt < (1 - length * df) * 10) {
            return point;
        } else {
            return null;
        }
    }

    /**
     * 是否包含一个点
     * @param point 点
     * @return 是否包含
     */
    public boolean contains(Point2D.Double point) {
        Point2D.Double convertCoord = Geography.longilati2Decare(peakPoint, point);
        return this.getGeneralPath().contains(convertCoord);
    }

    /**
     * 是否包含一个点
     * @param x 经度
     * @param y 纬度
     * @return 是否包含
     */
    public boolean contains(double x, double y) {
        Point2D.Double convertCoord = Geography.longilati2Decare(peakPoint, new Point2D.Double(x, y));
        return this.getGeneralPath().contains(convertCoord.x, convertCoord.y);
    }

    /**
     * 是否包含一个网格(以网格中心点判断)
     * @return 是否包含
     */
    public boolean contains(Long grid) {
        Point2D.Double point1 = Grid.getCenter(grid);
        Point2D.Double convertCoord = Geography.longilati2Decare(peakPoint, new Point2D.Double(point1.x, point1.y));
        return this.getGeneralPath().contains(convertCoord.x, convertCoord.y);
    }

    /**
     * 判断一个多边形是否包含另一个多边形
     * @param polygon
     * @return
     */
    public boolean contains(Polygon polygon) {
        for (Point2D.Double p : polygon.getBoundary()) {
            if (!this.contains(p)) {
                return false;
            }
        }
        return true;
    }

    public boolean contains(Circle circle) {
        return !circle.intersect(this) && this.contains(circle.getP());
    }

    /**
     * 是否与另外一个多边形相交
     * @param polygon 多边形
     * @return 是否相交
     */
    public boolean intersect(Polygon polygon) {
        if (!contains(polygon) && !polygon.contains(this)) {
            for (Point2D.Double p : polygon.getBoundary()) {
                if (this.contains(p)) {
                    return true;
                }
            }
            for (Point2D.Double p : this.getBoundary()) {
                if (polygon.contains(p)) {
                    return true;
                }
            }
            for (int m = 0; m < polygon.getBoundary().size() - 1; m++) {
                Point2D.Double points = polygon.getBoundary().get(m);
                Point2D.Double pointe = polygon.getBoundary().get(m + 1);
                Set<Point2D.Double> intersectPoints = intersectPoints(points, pointe);
                if (!intersectPoints.isEmpty()) {
                    return true;
                }
            }
            return false;
        } else {
            return false;
        }
    }

    /**
     * 计算一个多边形与另一个多边形相交面积
     * @param polygon
     * @return
     */
    public double intersectArea(Polygon polygon) {
        Polygon intersectPolygon = intersectPolygon(polygon);
        return intersectPolygon == null ? 0 : intersectPolygon.getArea();
    }

    /**
     * 计算一个多边形与另一个多边形的相交多边形
     * @param polygon
     * @return
     */
    public Polygon intersectPolygon(Polygon polygon) {
        if (contains(polygon)) {
            return polygon;
        } else if (polygon.contains(this)) {
            return this;
        } else if (!intersect(polygon)) {
            return null;
        }
        Set<Point2D.Double> intersectPoints = new HashSet<Point2D.Double>();
        for (int m = 0; m < polygon.getBoundary().size() - 1; m++) {
            Point2D.Double points = polygon.getBoundary().get(m);
            Point2D.Double pointe = polygon.getBoundary().get(m + 1);
            intersectPoints.addAll(intersectPoints(points, pointe));
        }
        Point2D.Double points = polygon.getBoundary().get(polygon.getBoundary().size() - 1);
        Point2D.Double pointe = polygon.getBoundary().get(0);
        intersectPoints.addAll(intersectPoints(points, pointe));
        for (Point2D.Double p : this.getBoundary()) {
            if (polygon.contains(p)) {
                intersectPoints.add(p);
            }
        }
        for (Point2D.Double p : polygon.getBoundary()) {
            if (this.contains(p)) {
                intersectPoints.add(p);
            }
        }
        for (Point2D.Double p : intersectPoints) {
            Polygon pc = new Polygon(new Circle(p, 1));
            List<Point2D.Double> inpoints = new ArrayList<Point2D.Double>();
            for (Point2D.Double p1 : pc.getBoundary()) {
                if (this.contains(p1) && polygon.contains(p1)) {
                    inpoints.add(p1);
                }
            }
            if (inpoints.size() > 2) {
                Point2D.Double selectp = inpoints.get(inpoints.size() / 2);
                return sort(selectp, intersectPoints);
            }
        }
        return null;
    }

    private Polygon sort(Point2D.Double pi, Set<Point2D.Double> intersectPoints) {
        Map<String, Point2D.Double> treeMap = new TreeMap<String, Point2D.Double>();
        for (Point2D.Double p : intersectPoints) {
            if (p.equals(pi)) {
                continue;
            }
            double calculateAngle = Geography.calculateAngle(pi.x, pi.y, p.x, p.y);
            String[] split = (calculateAngle + "").split("\\.");
            String fixlength = StringUtil.addAtFirst2Fixlength(split[0], "0", 30) + "." + split[1];
            treeMap.put(fixlength, p);
        }
        List<Point2D.Double> a = new ArrayList<Point2D.Double>();
        for (Map.Entry<String, Point2D.Double> entry : treeMap.entrySet()) {
            a.add(entry.getValue());
        }
        return new Polygon(a);
    }

    /**
     * 求多边形与某条线段的交点
     * @param point1
     * @param point2
     * @return
     */
    public Set<Point2D.Double> intersectPoints(Point2D.Double point1, Point2D.Double point2) {
        Set<Point2D.Double> ps = new HashSet<Point2D.Double>(); //相交点
        Point2D.Double pointls = Geography.longilati2Decare(this.getPeakPoint(), point1);
        Point2D.Double pointle = Geography.longilati2Decare(this.getPeakPoint(), point2);
        Line line = new Line(pointls, pointle);
        for (int m = 0; m < this.getBoundaryConvert().size() - 1; m++) {
            Point2D.Double points = this.getBoundaryConvert().get(m);
            Point2D.Double pointe = this.getBoundaryConvert().get(m + 1);
            Line line2 = new Line(points, pointe);
            Point2D.Double intersectionPoint = line.getIntersectPoint(line2);
            if ((intersectionPoint != null)
                    && (CommonUtil.between(intersectionPoint.x, points.x, pointe.x))
                    && (CommonUtil.between(intersectionPoint.y, points.y, pointe.y))) {
                if ((CommonUtil.between(intersectionPoint.x, pointls.x, pointle.x))
                        && (CommonUtil.between(intersectionPoint.y, pointls.y, pointle.y))
                        && (CommonUtil.between(intersectionPoint.x, points.x, pointe.x))
                        && (CommonUtil.between(intersectionPoint.y, points.y, pointe.y))) {
                    ps.add(Geography.decare2Longilati(this.getPeakPoint(), intersectionPoint));
                }
            }
        }
        Point2D.Double points = this.getBoundaryConvert().get(this.getBoundaryConvert().size() - 1);
        Point2D.Double pointe = this.getBoundaryConvert().get(0);
        Line line2 = new Line(points, pointe);
        Point2D.Double intersectionPoint = line.getIntersectPoint(line2);
        if ((intersectionPoint != null)
                && (CommonUtil.between(intersectionPoint.x, points.x, pointe.x))
                && (CommonUtil.between(intersectionPoint.y, points.y, pointe.y))) {
            if ((CommonUtil.between(intersectionPoint.x, pointls.x, pointle.x))
                    && (CommonUtil.between(intersectionPoint.y, pointls.y, pointle.y))
                    && (CommonUtil.between(intersectionPoint.x, points.x, pointe.x))
                    && (CommonUtil.between(intersectionPoint.y, points.y, pointe.y))) {
                ps.add(Geography.decare2Longilati(this.getPeakPoint(), intersectionPoint));
            }
        }
        return ps;
    }

    @Deprecated
    public Point2D.Double getCenterPoint() {
        Point2D.Double point0 = getBoundaryConvert().get(0);
        Point2D.Double point1 = getBoundaryConvert().get(1);
        Point2D.Double point2 = getBoundaryConvert().get(getBoundaryConvert().size() - 1);
        Line line1 = new Line(point0, point1);
        Line line2 = new Line(point0, point2);
        Line line3 = new Line(point1, point2);
        double a = (line1.getA() + line2.getA()) / (1.0D - line1.getA() * line2.getA());
        double b = point0.y - a * point0.x;
        Line line4 = new Line(a, b);
        Point2D.Double intersectionPoint = line4.getIntersectPoint(line3);
        return Geography.decare2Longilati(getBoundary().get(0), intersectionPoint);
    }

    public static void main(String[] args) {
        String boundary = "117.18294 31.848502,117.183571 31.848315,117.185324 31.847719,117.185799 31.8474,117.187003 31.846552,117.187482 31.846443,117.187654 31.846443,117.1877 31.846153,117.187741 31.845951,117.188478 31.843394,117.188152 31.843348,117.187203 31.843327,117.18688 31.843258,117.186203 31.843029,117.185563 31.842814,117.185444 31.842739,117.185347 31.842647,117.185365 31.842395,117.185802 31.841367,117.186252 31.840176,117.18922 31.840389,117.18966 31.839601,117.189852 31.83855,117.189857 31.838151,117.189802 31.837929,117.189749 31.837721,117.18967 31.837522,117.189554 31.837306,117.189328 31.837054,117.189029 31.836728,117.187515 31.835252,117.186092 31.833766,117.185817 31.833364,117.185698 31.833025,117.185587 31.832613,117.185549 31.832352,117.1855 31.831892,117.183536 31.831911,117.181442 31.831921,117.179594 31.831922,117.179543 31.831937,117.179513 31.831984,117.179447 31.832904,117.179365 31.833851,117.179359 31.834226,117.179377 31.834357,117.179423 31.834583,117.179441 31.834818,117.179539 31.835264,117.179518 31.835285,117.179482 31.835286,117.179177 31.83521,117.17905 31.835145,117.178926 31.835042,117.178826 31.834964,117.178692 31.834873,117.178573 31.834826,117.178433 31.834806,117.17812 31.83483,117.177907 31.83485,117.177689 31.834853,117.176599 31.834723,117.175869 31.834604,117.175743 31.834585,117.175596 31.834579,117.175455 31.83459,117.174738 31.834783,117.174319 31.83493,117.174001 31.835057,117.173307 31.835207,117.17306 31.835283,117.172288 31.835696,117.17186 31.835921,117.171486 31.836145,117.171458 31.836187,117.171446 31.836235,117.171443 31.836499,117.171424 31.836642,117.171328 31.836802,117.170525 31.83756,117.170002 31.838087,117.16975 31.838502,117.169349 31.838871,117.169232 31.839137,117.169105 31.839321,117.168831 31.839918,117.168711 31.840646,117.168718 31.840781,117.168743 31.840878,117.168803 31.840972,117.168849 31.841042,117.167447 31.841288,117.167357 31.841272,117.167307 31.841206,117.167118 31.840858,117.167035 31.840808,117.166945 31.840801,117.166316 31.840941,117.165382 31.840924,117.163677 31.841549,117.164266 31.843439,117.164404 31.843638,117.16484 31.844064,117.165635 31.844817,117.166243 31.845405,117.166744 31.846003,117.167006 31.846454,117.167124 31.846723,117.167577 31.847837,117.167838 31.848189,117.168456 31.848893,117.168673 31.849098,117.168925 31.8493,117.169225 31.849466,117.1696 31.849624,117.17101 31.850006,117.17191 31.850212,117.174397 31.850708,117.174608 31.850733,117.174976 31.850738,117.176838 31.850647,117.177117 31.850588,117.1775 31.850484,117.179331 31.849864,117.179775 31.849573,117.179986 31.849464,117.181193 31.849009,117.18294 31.848502";
        Polygon polygon = new Polygon(boundary);
        long start = System.currentTimeMillis();
        boolean contains = polygon.contains(117.176846, 31.844419);
        long end = System.currentTimeMillis();
        System.out.println(end - start);
        System.out.println(contains);
    }
}

Write the dws-scenic-tourist-msk-day.sh script

#!/usr/bin/env bash
#***********************************************************************************
# **  File name:    dws-scenic-tourist-msk-day.sh
# **  Created:      2022-04-08
# **  Author:       liangz
# **  Input:        oidd
# **  Output:       scenic-area tourist table
# **
# **  Description:  build the daily scenic-area tourist table
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#==Modified date==|===Modified by===|==================================================|
#***********************************************************************************

# date parameter
day_id=$1

spark-submit \
--master yarn-client \
--num-executors 1 \
--executor-cores 4 \
--executor-memory 6G \
--conf spark.sql.shuffle.partitions=10 \
--class com.ctyun.dws.DwsScenicTouristMskDay \
--jars common-1.0.jar \
dws-1.0.jar $day_id

Grant the dws user access in the dim layer

hadoop dfs -setfacl -R -m user:dws:r-x /daas/motl/dim/dim_scenic_boundary

Package, upload, and run the script

sh dws-scenic-tourist-msk-day.sh 20220409

Visit master:8088 to check the job status


Check the result in the dws layer's Hive

hive
use dws;
select * from dws_scenic_tourist_msk_d limit 100;

Because the partition was specified as 20220409 while the sample data belongs to 20180503, the query returns no rows; rerun with day_id=20180503 to see data.


Build the scenic-area grid table


Upload the dim_geotag_grid data

hadoop dfs -put dim_geotag_grid /daas/motl/dim
  • View the uploaded data
select * from dim_geotag_grid  limit 10;

Create the dim.dim_scenic_grid table in the dim layer

CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_scenic_grid (
    scenic_id string comment '景区id'
    ,scenic_name string comment '景区名称'
    ,county_id string comment '区县id'
    ,grids string comment '景区网格列表'
) comment '景区配置表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dim/dim_scenic_grid';
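Since grids stores every grid of a scenic area as a single comma-separated string, downstream logic usually wants one row per (scenic area, grid). A hedged sketch of that flattening, using only the columns defined above (run inside a SparkTool job like the other classes, so that spark is in scope):

// hedged sketch: explode dim.dim_scenic_grid into one row per grid id
import org.apache.spark.sql.functions.{explode, split}
import spark.implicits._

val scenicGrid = spark.table("dim.dim_scenic_grid")
  .withColumn("grid_id", explode(split($"grids", ",")))
  .select($"scenic_id", $"scenic_name", $"grid_id")
scenicGrid.show(10)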

Create the dim.dim_geotag_grid table in the dim layer

CREATE EXTERNAL TABLE IF NOT EXISTS dim.dim_geotag_grid (
    grid_id string comment '网格ID,500米级别'
    ,center_longi string comment '中心点经度'
    ,center_lati string comment '中心点纬度'
    ,county_id string comment '区县id'
    ,county_type string comment '区县类型,0郊区,1城区'
    ,grid_type string comment '网格类型,详见网格类型码表'
) comment 'gis网格配置表'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS PARQUET
location '/daas/motl/dim/dim_geotag_grid';

Create and edit the DimScenicGrid.scala class in the dim layer

package com.ctyun.dim

import com.ctyun.utils.SparkTool
import com.ctyun.utils.poly.Polygon
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object DimScenicGrid extends SparkTool {
  override def run(spark: SparkSession): Unit = {
    import org.apache.spark.sql.functions._
    import spark.implicits._

    // UDF: is the given longitude/latitude inside the boundary?
    val polygonContains: UserDefinedFunction = udf((lon: String, lat: String, boundary: String) => {
      val polygon = new Polygon(boundary)
      polygon.contains(lon.toDouble, lat.toDouble)
    })

    // 1. read the scenic-area boundary table
    val scenic: DataFrame = spark.table("dim.dim_scenic_boundary")

    // 2. read the grid table
    val geotag: DataFrame = spark.table("dim.dim_geotag_grid")

    // For every (scenic area, grid) pair, keep the grids whose
    // center point falls inside the scenic boundary.
    val resultDF: DataFrame = scenic
      .crossJoin(geotag)
      .where(polygonContains($"center_longi", $"center_lati", $"boundary"))
      .select($"scenic_id", $"scenic_name", $"county_id", $"grid_id")
      .groupBy($"scenic_id", $"scenic_name", $"county_id")
      // merge all grids of one scenic area into a single row
      .agg(collect_set($"grid_id") as "grids")
      .select($"scenic_id", $"scenic_name", $"county_id", concat_ws(",", $"grids") as "grids")

    resultDF.write
      .format("csv")
      .option("sep", "\t")
      .mode(SaveMode.Overwrite)
      .save("/daas/motl/dim/dim_scenic_grid")
  }
}
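dim_scenic_boundary holds only a handful of rows, so the crossJoin above stays cheap as long as Spark broadcasts the small side; making that explicit avoids shuffling the full grid table. A hedged variant of the pairing step, with the same column and variable names as the class above:

// hedged sketch: broadcast the small boundary table in the cross join
import org.apache.spark.sql.functions.broadcast

val paired = geotag
  .crossJoin(broadcast(scenic))
  .where(polygonContains($"center_longi", $"center_lati", $"boundary"))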

Create and edit the dim-scenic-grid.sh script in the dim layer

#!/usr/bin/env bash
#***********************************************************************************
# **  File name:    dim-scenic-grid.sh
# **  Created:      2022-04-10
# **  Author:       liangzai
# **  Input:        scenic boundary table, grid table
# **  Output:       scenic-area grid table
# **
# **  Description:  build the scenic-area grid table
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#==Modified date==|===Modified by===|==================================================|
#***********************************************************************************

spark-submit \
--master yarn-client \
--class com.ctyun.dim.DimScenicGrid \
--jars common-1.0.jar \
dim-1.0.jar 20220409

Package, upload, and run in the dim layer

sh dim-scenic-grid.sh

Check the result

hadoop dfs  -cat /daas/motl/dim/dim_scenic_grid/*



Chapter 7

Indicator Statistics


Tourist indicators

Indicators to compute:

  • Tourist volume per day [city, volume]
  • Gender per day [city, gender, volume]
  • Age per day [city, age, volume]
  • Home city per day [city, home city, volume]
  • Number attribution per day [city, attribution city, volume]
  • Handset model per day [city, handset model, volume]
  • Consumption level per day [city, consumption level, volume]
  • Stay duration per day [city, stay duration, volume]
  • Travel distance per day [city, travel distance, volume]
  • Gender and age per day [city, gender, age, volume]
  • Stay duration and age per day [city, stay duration, age, volume]
  • Average stay time [city, average stay time]
  • Average stay time by gender [city, gender, average stay time]

Running these with spark-sql is more efficient.


1. Tourist volume per day [city, volume]

select d_city_id,count(mdn) from dws.dws_city_tourist_msk_d group by d_city_id;



2. Gender per day [city, gender, volume]

select d_city_id, gender, count(1)
from dws.dws_city_tourist_msk_d as a
join dim.dim_usertag_msk_d as b
on a.mdn = b.mdn
group by d_city_id, gender;



3. Age per day [city, age, volume]

select d_city_id, age, count(1)
from dws.dws_city_tourist_msk_d as a
join dim.dim_usertag_msk_d as b
on a.mdn = b.mdn
where d_city_id = '83401'
group by d_city_id, age;



4. Gender and age per day [city, gender, age, volume]

select d_city_id, gender, age, count(1)
from dws.dws_city_tourist_msk_d as a
join dim.dim_usertag_msk_d as b
on a.mdn = b.mdn
where d_city_id = '83401'
group by d_city_id, gender, age;


5. Stay duration per day [city, stay duration, volume]

select d_city_id,
       case when d_stay_time >= 3  and d_stay_time < 6  then '[3-6)'
            when d_stay_time >= 6  and d_stay_time < 12 then '[6-12)'
            when d_stay_time >= 12 and d_stay_time < 24 then '[12-24)'
            else '[24-)' end as d_stay_time,
       count(mdn)
from dws.dws_city_tourist_msk_d
where d_city_id = '83401'
group by d_city_id,
         case when d_stay_time >= 3  and d_stay_time < 6  then '[3-6)'
              when d_stay_time >= 6  and d_stay_time < 12 then '[6-12)'
              when d_stay_time >= 12 and d_stay_time < 24 then '[12-24)'
              else '[24-)' end;
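The indicator list above also calls for average stay time, which has no worked example in this chapter. A hedged sketch in the spark.sql style the dal job uses later, runnable in spark-shell or inside a SparkTool job:

// hedged sketch: average stay time per city, overall and by gender
spark.sql(
  """
    |select d_city_id, avg(d_stay_time) as avg_stay_time
    |from dws.dws_city_tourist_msk_d
    |group by d_city_id
  """.stripMargin).show()

spark.sql(
  """
    |select d_city_id, b.gender, avg(a.d_stay_time) as avg_stay_time
    |from dws.dws_city_tourist_msk_d as a
    |join dim.dim_usertag_msk_d as b on a.mdn = b.mdn
    |group by d_city_id, b.gender
  """.stripMargin).show()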


Wide table

  • Wide table: put all fields needed later into one table; this removes repeated joins at the cost of redundant data

Trading space for time.

Create a new dal module under the ctyun project in IDEA

com.ctyun.dal

Import the Spark Maven dependencies

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.library.version}</artifactId>
    </dependency>
    <dependency>
        <groupId>com.ctyun</groupId>
        <artifactId>common</artifactId>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
        </plugin>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
        </plugin>
    </plugins>
</build>

Create and edit the DalCityTouristMskWideDay.scala class in the dal layer

package com.ctyun.dal

import com.ctyun.utils.SparkTool
import org.apache.spark.sql.SparkSession

object DalCityTouristMskWideDay extends SparkTool {
  override def run(spark: SparkSession): Unit = {
    spark.sql(
      s"""
         |insert overwrite table dal.dal_city_tourist_msk_wide_d partition(day_id='$day_id')
         |select
         |/*+broadcast(c,d) */
         |a.mdn,
         |c.county_name as source_county_name,
         |c.city_name as source_city_name,
         |c.prov_name as source_province_name,
         |d.city_name as d_city_name,
         |case when a.d_stay_time >= 3  and a.d_stay_time < 6  then '[3-6)'
         |     when a.d_stay_time >= 6  and a.d_stay_time < 9  then '[6-9)'
         |     when a.d_stay_time >= 9  and a.d_stay_time < 12 then '[9-12)'
         |     when a.d_stay_time >= 12 and a.d_stay_time < 15 then '[12-15)'
         |     else '[15~' end as d_stay_time,
         |case when a.d_max_distance >= 10  and a.d_max_distance < 50  then '[10-50)'
         |     when a.d_max_distance >= 50  and a.d_max_distance < 80  then '[50-80)'
         |     when a.d_max_distance >= 80  and a.d_max_distance < 120 then '[80-120)'
         |     when a.d_max_distance >= 120 and a.d_max_distance < 200 then '[120-200)'
         |     when a.d_max_distance >= 200 and a.d_max_distance < 400 then '[200-400)'
         |     when a.d_max_distance >= 400 and a.d_max_distance < 800 then '[400-800)'
         |     else '[800~' end as d_distance,
         |case when b.gender = '1' then '男' else '女' end as gender,
         |case when b.age > 0   and b.age < 20 then '[0-20)'
         |     when b.age >= 20 and b.age < 25 then '[20-25)'
         |     when b.age >= 25 and b.age < 30 then '[25-30)'
         |     when b.age >= 30 and b.age < 35 then '[30-35)'
         |     when b.age >= 35 and b.age < 40 then '[35-40)'
         |     when b.age >= 40 and b.age < 45 then '[40-45)'
         |     when b.age >= 45 and b.age < 50 then '[45-50)'
         |     when b.age >= 50 and b.age < 55 then '[50-55)'
         |     when b.age >= 55 and b.age < 60 then '[55-60)'
         |     when b.age >= 60 and b.age < 65 then '[60-65)'
         |     else '[65~' end as age,
         |b.number_attr,
         |trmnl_brand,
         |trmnl_price,
         |packg,
         |conpot
         |from
         |(select * from dws.dws_city_tourist_msk_d where day_id='$day_id') as a
         |join
         |(select * from dim.dim_usertag_msk_d where day_id='$day_id') as b
         |on a.mdn = b.mdn
         |join
         |dim.dim_admincode as c
         |on a.source_county_id = c.county_id
         |join
         |(select distinct city_id, city_name from dim.dim_admincode) as d
         |on a.d_city_id = d.city_id
      """.stripMargin)
  }
}

Create the dal_city_tourist_msk_wide_d table in the dal user's Hive

CREATE EXTERNAL TABLE IF NOT EXISTS dal.dal_city_tourist_msk_wide_d (
    mdn string comment '手机号大写MD5加密'
    ,source_county_name string comment '游客来源区县'
    ,source_city_name string comment '游客来源城市'
    ,source_province_name string comment '游客来源省'
    ,d_city_name string comment '旅游目的地市名'
    ,d_stay_time string comment '游客在该市停留的时间长度(小时),分段处理'
    ,d_distance string comment '游客本次出游距离,分段处理'
    ,gender string comment '性别'
    ,age string comment '年龄,分段处理'
    ,number_attr string comment '号码归属地'
    ,trmnl_brand string comment '终端品牌'
    ,trmnl_price string comment '终端价格'
    ,packg string comment '套餐'
    ,conpot string comment '消费潜力'
) comment '旅游应用专题数据城市级别-天'
PARTITIONED BY (
    day_id string comment '日分区'
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
location '/daas/motl/dal/dal_city_tourist_msk_wide_d';

Grant the dal user access via the dws user

hadoop dfs -setfacl -m user:dal:r-x /daas/motl/dws
hadoop dfs -setfacl -R -m user:dal:r-x /daas/motl/dws/dws_city_tourist_msk_d

Grant the dal user access via the dim user

hadoop dfs -setfacl -m user:dal:r-x /daas/motl/dim
hadoop dfs -setfacl -R -m user:dal:r-x /daas/motl/dim/dim_usertag_msk_d
hadoop dfs -setfacl -R -m user:dal:r-x /daas/motl/dim/dim_admincode

As the dal user, verify the SQL runs in spark-sql

insert overwrite table dal.dal_city_tourist_msk_wide_d partition(day_id='20220409')
select
/*+broadcast(c,d) */
a.mdn,
c.county_name as source_county_name,
c.city_name as source_city_name,
c.prov_name as source_province_name,
d.city_name as d_city_name,
case when a.d_stay_time >= 3  and a.d_stay_time < 6  then '[3-6)'
     when a.d_stay_time >= 6  and a.d_stay_time < 9  then '[6-9)'
     when a.d_stay_time >= 9  and a.d_stay_time < 12 then '[9-12)'
     when a.d_stay_time >= 12 and a.d_stay_time < 15 then '[12-15)'
     else '[15~' end as d_stay_time,
case when a.d_max_distance >= 10  and a.d_max_distance < 50  then '[10-50)'
     when a.d_max_distance >= 50  and a.d_max_distance < 80  then '[50-80)'
     when a.d_max_distance >= 80  and a.d_max_distance < 120 then '[80-120)'
     when a.d_max_distance >= 120 and a.d_max_distance < 200 then '[120-200)'
     when a.d_max_distance >= 200 and a.d_max_distance < 400 then '[200-400)'
     when a.d_max_distance >= 400 and a.d_max_distance < 800 then '[400-800)'
     else '[800~' end as d_distance,
case when b.gender = '1' then '男' else '女' end as gender,
case when b.age > 0   and b.age < 20 then '[0-20)'
     when b.age >= 20 and b.age < 25 then '[20-25)'
     when b.age >= 25 and b.age < 30 then '[25-30)'
     when b.age >= 30 and b.age < 35 then '[30-35)'
     when b.age >= 35 and b.age < 40 then '[35-40)'
     when b.age >= 40 and b.age < 45 then '[40-45)'
     when b.age >= 45 and b.age < 50 then '[45-50)'
     when b.age >= 50 and b.age < 55 then '[50-55)'
     when b.age >= 55 and b.age < 60 then '[55-60)'
     when b.age >= 60 and b.age < 65 then '[60-65)'
     else '[65~' end as age,
b.number_attr,
trmnl_brand,
trmnl_price,
packg,
conpot
from
(select * from dws.dws_city_tourist_msk_d where day_id='20220409') as a
join
(select * from dim.dim_usertag_msk_d where day_id='20220409') as b
on a.mdn = b.mdn
join
dim.dim_admincode as c
on a.source_county_id = c.county_id
join
(select distinct city_id, city_name from dim.dim_admincode) as d
on a.d_city_id = d.city_id;
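Once the wide table is loaded, the chapter-7 indicators become single-table group-bys with no joins, which is exactly the space-for-time trade described above. A hedged example in the same spark.sql style, using the d_city_name column as defined above:

// hedged sketch: the chapter-7 gender indicator, now join-free on the wide table
spark.sql(
  """
    |select d_city_name, gender, count(1) as tourist_cnt
    |from dal.dal_city_tourist_msk_wide_d
    |where day_id = '20220409'
    |group by d_city_name, gender
  """.stripMargin).show()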

Write the dal-city-tourist-msk-wide-day.sh script in the dal layer

#!/usr/bin/env bash
#***********************************************************************************
# **  File name:    dal-city-tourist-msk-wide-day.sh
# **  Created:      2022-04-10
# **  Author:       liangzai
# **  Input:        dws tourist table, dim tables
# **  Output:       city-level tourist wide table
# **
# **  Description:  build the daily city-level tourist wide table
# **  Copyright(c) 2016 TianYi Cloud Technologies (China), Inc.
# **  All Rights Reserved.
#***********************************************************************************
#==Modified date==|===Modified by===|==================================================|
#***********************************************************************************

# date parameter
day_id=$1

spark-submit \
--master yarn-client \
--num-executors 1 \
--executor-cores 4 \
--executor-memory 6G \
--conf spark.sql.shuffle.partitions=10 \
--class com.ctyun.dal.DalCityTouristMskWideDay \
--jars common-1.0.jar \
dal-1.0.jar $day_id

Package, upload, and run the dal-city-tourist-msk-wide-day.sh script

sh dal-city-tourist-msk-wide-day.sh 20220409

Visit master:8088 to check the job status


Check the output of the dal-city-tourist-msk-wide-day.sh script

hadoop dfs -cat /daas/motl/dal/dal_city_tourist_msk_wide_d/day_id=20220409/part-00000-c9e9bd4f-dbb3-495d-8ff6-21b0bb56e910-c000

select * from dal_city_tourist_msk_wide_d limit 20;


Chapter 8

FineBI

FineBI official site: https://www.finebi.com/

The FineBI installation tutorial is under "Installation and Upgrade" on the official site.

Note

Be sure to pick the right path and install into an empty folder; otherwise the installer deletes every file in the chosen directory (irreversible).

Access

http://master:37799/webroot/decision/login/initialization
Set the administrator account and password on the initialization page.


(As the dal user) start the Hive service to allow JDBC connections

hive --service hiveserver2 &
  • Open a new terminal window on master
cd /usr/local/soft/hive-1.2.1/bin
./beeline
!connect jdbc:hive2://master:10000
# to quit
!exit

Fixing the "0: jdbc:hive2://master:10000 (closed)>" problem

Kill all RunJar processes, then restart the Hive service and the pseudo-distributed cluster.
show tables;

Unzip the downloaded driver package and put it in /usr/local/soft/finebi/webapps/webroot/WEB-INF/lib under the root user

In the end it still did not work, so download the newer version!

Uninstall (as root)

cd /usr/local/soft/finebi/
sh uninstall
rm -rf linux_unix_FineBI5_1-CN.sh

If you hit this problem, don't panic; just install it in a VMware virtual machine instead.

Unzip and upload the downloaded driver


Restart FineBI

cd /usr/local/soft/finebi/bin
ps -aux | grep finebi
kill -9 <PID>
nohup ./finebi &

Create a new driver

Unzip both downloaded archives and upload them

Create a new connection

Data preparation

A MySQL connection is recommended; Hive is an offline data warehouse, so reads are slow.


Visualization results


Telecom offline warehouse project summary


That's the end! I poured a lot of effort into writing all of this: feel free to ask questions in the comments below (most problems are already solved in this blog).
Give me a follow!
