> 技术文档 > 91-基于Spark的空气质量数据分析可视化系统_spark 大数据空气质量检测系统

91-基于Spark的空气质量数据分析可视化系统_spark 大数据空气质量检测系统


基于Spark的空气质量数据分析可视化系统设计与实现

项目概述

本项目是一个基于Apache Spark的大数据分析和可视化系统,专门用于空气质量数据的采集、分析、预测和可视化展示。系统采用分布式计算架构,结合机器学习算法,实现了对全国12个主要城市空气质量数据的全面分析和预测功能。

项目背景与意义

随着城市化进程的加快和工业化的快速发展,空气质量问题日益成为公众关注的焦点。传统的空气质量监测方式存在数据分散、分析效率低、可视化效果差等问题。本项目旨在构建一个完整的大数据分析和可视化平台,为空气质量监测提供科学、高效的技术解决方案。

项目目标

  1. 数据采集自动化:实现多城市空气质量数据的自动采集和更新
  2. 大数据分析:利用Spark分布式计算能力,处理大规模空气质量数据
  3. 智能预测:基于机器学习算法,预测空气质量变化趋势
  4. 可视化展示:提供直观、交互式的数据可视化界面
  5. 系统集成:构建完整的数据科学流程,从采集到展示

项目特色

  • 技术先进性:采用最新的Spark 3.x版本和机器学习技术
  • 架构完整性:涵盖数据采集、存储、分析、预测、可视化的完整流程
  • 扩展性强:支持新城市、新指标的快速接入
  • 用户友好:提供直观的Web界面和丰富的交互功能

项目演示

在这里插入图片描述

首页

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

项目演示视频

91-基于Spark的空气质量数据分析预测系统的设计与实现

技术架构

核心技术栈

后端框架
  • Django 3.1.14 - 成熟的Python Web应用框架,提供完整的MVT架构
  • Apache Spark 3.x - 分布式大数据处理引擎,支持内存计算和流处理
  • PySpark - Spark的Python API接口,提供DataFrame和SQL操作
数据库系统
  • MySQL 8.0 - 主数据库,存储分析结果、用户数据和系统配置
  • Apache Hive - 数据仓库,存储原始空气质量数据,支持SQL查询
  • SQLite - 本地开发数据库,轻量级,便于开发和测试
数据科学与机器学习
  • scikit-learn 1.3.2 - 机器学习算法库,提供分类、回归、聚类等算法
  • pandas 1.4.3 - 数据处理和分析库,提供DataFrame操作
  • numpy 1.23.1 - 数值计算库,提供高效的数组操作
  • matplotlib 3.5.2 - 数据可视化库,支持多种图表类型
数据采集技术
  • requests 2.31.0 - HTTP请求库,支持GET/POST请求和会话管理
  • BeautifulSoup4 4.12.3 - 网页解析库,支持HTML/XML解析
  • lxml 4.9.3 - XML/HTML解析器,提供高性能的解析能力
前端技术
  • Bootstrap 4.x - 响应式CSS框架,提供移动优先的设计
  • ECharts 5.x - 数据可视化图表库,支持多种交互式图表
  • jQuery 3.x - JavaScript库,简化DOM操作和AJAX请求
  • Material Design Icons - 现代化图标系统,提供丰富的图标资源
自然语言处理
  • jieba 0.42.1 - 中文分词库,支持精确模式和全模式分词
  • wordcloud 1.8.2.2 - 词云生成库,支持自定义形状和颜色
  • snownlp 0.12.3 - 中文自然语言处理库,提供情感分析等功能
大数据处理
  • PyHive 0.7.0 - Hive连接器,支持Python连接Hive数据库
  • thrift 0.21.0 - 跨语言RPC框架,用于Hive连接
  • mysqlclient 2.2.4 - MySQL数据库驱动,提供高性能的数据库连接

系统架构设计

整体架构图
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│ 数据采集层 │ │ 数据存储层 │ │ 数据处理层 ││  │ │  │ │  ││ • 网络爬虫 │───▶│ • MySQL │───▶│ • Apache Spark ││ • 数据清洗 │ │ • Hive │ │ • PySpark ││ • 实时更新 │ │ • SQLite │ │ • 分布式计算 ││ • 数据验证 │ │ • 数据备份 │ │ • 内存计算 │└─────────────────┘ └─────────────────┘ └─────────────────┘ │┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐│ 机器学习层 │◀───│ 数据分析层 │◀───│ 数据展示层 ││  │ │  │ │  ││ • 线性回归 │ │ • 统计分析 │ │ • Web界面 ││ • 模型训练 │ │ • 趋势分析 │ │ • 图表可视化 ││ • 预测算法 │ │ • 关联分析 │ │ • 交互式展示 ││ • 模型评估 │ │ • 聚类分析 │ │ • 响应式设计 │└─────────────────┘ └─────────────────┘ └─────────────────┘
数据流向图
数据源网站 ──→ 爬虫程序 ──→ 数据清洗 ──→ Hive数据仓库 │  │  │  │ ▼  ▼  ▼  ▼ 原始数据 结构化数据 清洗后数据 存储数据 │  │  │  │ ▼  ▼  ▼  ▼Spark分析 ──→ 分析结果 ──→ MySQL存储 ──→ Web展示 │  │  │  │ ▼  ▼  ▼  ▼机器学习 ──→ 预测模型 ──→ 预测结果 ──→ 可视化图表
技术架构特点

1. 分层架构设计

  • 表现层:Django Web框架,提供用户界面和交互
  • 业务逻辑层:Spark分布式计算,处理大规模数据分析
  • 数据访问层:多数据库支持,实现数据的高效存储和查询
  • 基础设施层:Hadoop生态系统,提供分布式存储和计算能力

2. 微服务架构思想

  • 数据采集服务:独立的爬虫服务,支持多城市并行采集
  • 数据分析服务:Spark集群服务,提供分布式计算能力
  • 机器学习服务:独立的模型训练和预测服务
  • Web展示服务:Django应用服务,提供用户界面

3. 数据流设计

  • 实时数据流:支持实时数据采集和处理
  • 批量数据流:支持大规模历史数据的批量分析
  • 流式处理:支持数据流的实时处理和响应

核心功能模块

1. 数据采集模块

功能特点
  • 多城市支持:支持12个主要城市的数据采集:北京、天津、上海、重庆、广州、深圳、杭州、成都、沈阳、南京、长沙、南昌
  • 历史数据获取:自动爬取历史空气质量数据,支持指定年份和月份的数据采集
  • 完整指标采集:采集完整的空气质量指标:AQI、PM2.5、PM10、SO2、NO2、CO、O3
  • 数据质量控制:实现数据去重、清洗和格式标准化,确保数据质量
  • 反爬虫策略:采用随机延时、User-Agent轮换等策略,避免被目标网站封禁
  • 错误处理机制:完善的异常处理和重试机制,确保数据采集的稳定性
技术实现细节

1. 爬虫架构设计

class AqiSpider: def __init__(self, cityname, realname): self.cityname = cityname self.realname = realname # 配置请求头,模拟真实浏览器 self.headers = { \"User-Agent\": \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36\", \"Accept\": \"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8\", \"Accept-Language\": \"zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3\", \"Accept-Encoding\": \"gzip, deflate\", \"Connection\": \"keep-alive\", \"Upgrade-Insecure-Requests\": \"1\" } # 配置CSV文件写入器 self.f = open(f\'data.csv\', \'a\', encoding=\'utf-8-sig\', newline=\'\') self.writer = csv.DictWriter(self.f, fieldnames=[ \'city\', \'date\', \'airQuality\', \'AQI\', \'rank\', \'PM2.5\', \'PM10\', \'So2\', \'No2\', \'Co\', \'O3\' ])

2. 数据解析策略

def parse_response(self, response): soup = BeautifulSoup(response, \'html.parser\') tr = soup.find_all(\'tr\') for j in tr[1:]: # 跳过表头 td = j.find_all(\'td\') if len(td) < 10: # 数据完整性检查 continue  # 数据提取和清洗 Date = td[0].get_text().strip() Quality_level = td[1].get_text().strip() AQI = td[2].get_text().strip() AQI_rank = td[3].get_text().strip() PM25 = td[4].get_text().strip() PM10 = td[5].get_text().strip() SO2 = td[6].get_text().strip() NO2 = td[7].get_text().strip() CO = td[8].get_text().strip() O3 = td[9].get_text().strip() # 数据验证和转换 data_dict = self.validate_and_convert_data({ \'city\': self.realname, \'date\': Date, \'airQuality\': Quality_level, \'AQI\': AQI, \'rank\': AQI_rank, \'PM2.5\': PM25, \'PM10\': PM10, \'So2\': SO2, \'No2\': NO2, \'Co\': CO, \'O3\': O3, }) if data_dict: # 只保存有效数据 self.save_data(data_dict)

3. 数据验证和清洗

def validate_and_convert_data(self, data_dict): \"\"\"数据验证和类型转换\"\"\" try: # 验证日期格式 if not self.is_valid_date(data_dict[\'date\']): return None  # 转换数值类型 numeric_fields = [\'AQI\', \'PM2.5\', \'PM10\', \'So2\', \'No2\', \'Co\', \'O3\'] for field in numeric_fields: value = data_dict[field] if value and value != \'—\': try:  data_dict[field] = float(value) except ValueError:  data_dict[field] = 0.0 else: data_dict[field] = 0.0 # 验证AQI范围 if data_dict[\'AQI\'] < 0 or data_dict[\'AQI\'] > 500: return None  return data_dict except Exception as e: print(f\"数据验证失败: {e}\") return Nonedef is_valid_date(self, date_str): \"\"\"验证日期格式\"\"\" try: from datetime import datetime datetime.strptime(date_str, \'%Y-%m-%d\') return True except ValueError: return False

4. 错误处理和重试机制

def send_request_with_retry(self, year, month, max_retries=3): \"\"\"带重试机制的请求发送\"\"\" for attempt in range(max_retries): try: url = f\"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html\" response = requests.get(url, headers=self.headers, timeout=60) if response.status_code == 200: return self.parse_response(response.text) else: print(f\"请求失败,状态码: {response.status_code}\") except requests.exceptions.RequestException as e: print(f\"请求异常 (尝试 {attempt + 1}/{max_retries}): {e}\") if attempt < max_retries - 1: time.sleep(2 ** attempt) # 指数退避 else: print(f\"请求失败,已重试{max_retries}次\") except Exception as e: print(f\"未知错误: {e}\") break

技术实现

import csvimport timeimport requestsfrom bs4 import BeautifulSoupclass AqiSpider: def __init__(self, cityname, realname): self.cityname = cityname self.realname = realname self.headers = { \"User-Agent\": \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.0.0 Safari/537.36\" } self.f = open(f\'data.csv\', \'a\', encoding=\'utf-8-sig\', newline=\'\') self.writer = csv.DictWriter(self.f, fieldnames=[ \'city\', \'date\', \'airQuality\', \'AQI\', \'rank\', \'PM2.5\', \'PM10\', \'So2\', \'No2\', \'Co\', \'O3\' ]) def send_request(self, year, month): url = f\"http://www.tianqihoubao.com/aqi/{self.cityname}-{year}{month:02d}.html\" response = requests.get(url, headers=self.headers, timeout=60) time.sleep(2) # 避免请求过于频繁 print(f\"响应状态码:{response.status_code}\") return self.parse_response(response.text) def parse_response(self, response): soup = BeautifulSoup(response, \'html.parser\') tr = soup.find_all(\'tr\') for j in tr[1:]: # 跳过表头 td = j.find_all(\'td\') Date = td[0].get_text().strip() # 日期 Quality_level = td[1].get_text().strip() # 空气质量等级 AQI = td[2].get_text().strip() AQI_rank = td[3].get_text().strip() PM25 = td[4].get_text().strip() PM10 = td[5].get_text().strip() SO2 = td[6].get_text().strip() NO2 = td[7].get_text().strip() CO = td[8].get_text().strip() O3 = td[9].get_text().strip() data_dict = { \'city\': self.realname, \'date\': Date, \'airQuality\': Quality_level, \'AQI\': AQI, \'rank\': AQI_rank, \'PM2.5\': PM25, \'PM10\': PM10, \'So2\': SO2, \'No2\': NO2, \'Co\': CO, \'O3\': O3, } self.save_data(data_dict) def save_data(self, data_dict): self.writer.writerow(data_dict) def run(self): for month in range(1, 6): print(f\"正在爬取2025年{month}月的数据\") self.send_request(2025, month)if __name__ == \'__main__\': cityList = [\'beijing\', \'tianjin\', \'shanghai\', \'chongqing\', \'guangzhou\',  \'shenzhen\', \'hangzhou\', \'chengdu\', \'shenyang\', \'nanjing\', \'changsha\', \'nanchang\'] nameList = [\'北京\', \'天津\', \'上海\', \'重庆\', \'广州\', \'深圳\', \'杭州\',  \'成都\', \'沈阳\', \'南京\', \'长沙\', \'南昌\'] city_dict = dict(zip(cityList, nameList)) for k, v in city_dict.items(): AS = AqiSpider(k, v) AS.run()

2. Spark大数据分析模块

分析维度与业务价值

1. 城市平均AQI分析

  • 业务价值:识别空气质量最佳和最差的城市,为城市环境治理提供数据支撑
  • 分析指标:计算各城市平均空气质量指数,进行城市排名
  • 应用场景:城市环境评估、政策制定参考、公众健康指导

2. 六项污染物分析

  • 业务价值:全面了解各污染物的分布特征和贡献度
  • 分析指标:PM、PM10、SO2、NO2、CO、O3的统计分析和相关性研究
  • 应用场景:污染物来源分析、治理效果评估、健康风险评估

3. 时间序列分析

  • 业务价值:发现空气质量的时间变化规律和季节性特征
  • 分析指标:按年月分析AQI的最大值、最小值变化趋势
  • 应用场景:季节性污染预测、治理措施效果评估、公众出行指导

4. 污染物分布分析

  • 业务价值:了解污染物浓度的分布特征和超标情况
  • 分析指标:统计不同浓度区间的污染物分布情况
  • 应用场景:污染等级评估、预警阈值设定、治理目标制定
技术实现架构

1. Spark集群配置

# Spark会话配置spark = SparkSession.builder.appName(\"sparkSQL\").master(\"local[*]\").\\ config(\"spark.sql.shuffle.partitions\", 2). \\ config(\"spark.sql.warehouse.dir\", \"hdfs://node1:8020/user/hive/warehouse\"). \\ config(\"hive.metastore.uris\", \"thrift://node1:9083\"). \\ config(\"spark.sql.adaptive.enabled\", \"true\"). \\ config(\"spark.sql.adaptive.coalescePartitions.enabled\", \"true\"). \\ config(\"spark.sql.adaptive.skewJoin.enabled\", \"true\"). \\ enableHiveSupport().\\ getOrCreate()# 性能优化配置spark.conf.set(\"spark.sql.adaptive.enabled\", \"true\")spark.conf.set(\"spark.sql.adaptive.coalescePartitions.enabled\", \"true\")spark.conf.set(\"spark.sql.adaptive.skewJoin.enabled\", \"true\")

2. 数据预处理和清洗

def preprocess_air_data(spark): \"\"\"数据预处理和清洗\"\"\" # 读取原始数据 airdata = spark.read.table(\"airdata\") # 数据类型转换 airdata = airdata.withColumn(\"date\", airdata[\"date\"].cast(\"date\")) airdata = airdata.withColumn(\"AQI\", airdata[\"AQI\"].cast(\"double\")) airdata = airdata.withColumn(\"PM\", airdata[\"PM\"].cast(\"double\")) airdata = airdata.withColumn(\"PM10\", airdata[\"PM10\"].cast(\"double\")) airdata = airdata.withColumn(\"So2\", airdata[\"So2\"].cast(\"double\")) airdata = airdata.withColumn(\"No2\", airdata[\"No2\"].cast(\"double\")) airdata = airdata.withColumn(\"Co\", airdata[\"Co\"].cast(\"double\")) airdata = airdata.withColumn(\"O3\", airdata[\"O3\"].cast(\"double\")) # 数据清洗:处理缺失值和异常值 airdata = airdata.na.fill(0, subset=[\"AQI\", \"PM\", \"PM10\", \"So2\", \"No2\", \"Co\", \"O3\"]) # 异常值处理:AQI范围检查 airdata = airdata.filter((col(\"AQI\") >= 0) & (col(\"AQI\") <= 500)) return airdata

3. 高级分析功能

def advanced_analysis(airdata): \"\"\"高级分析功能\"\"\" # 1. 空气质量等级分析 airdata = airdata.withColumn( \"air_quality_level\", when(col(\"AQI\") <= 50, \"优\") .when(col(\"AQI\") <= 100, \"良\") .when(col(\"AQI\") <= 150, \"轻度污染\") .when(col(\"AQI\") <= 200, \"中度污染\") .when(col(\"AQI\") <= 300, \"重度污染\") .otherwise(\"严重污染\") ) # 2. 污染物相关性分析 correlation_analysis = airdata.select( corr(\"AQI\", \"PM\").alias(\"AQI_PM_corr\"), corr(\"AQI\", \"PM10\").alias(\"AQI_PM10_corr\"), corr(\"AQI\", \"So2\").alias(\"AQI_So2_corr\"), corr(\"AQI\", \"No2\").alias(\"AQI_No2_corr\"), corr(\"AQI\", \"Co\").alias(\"AQI_Co_corr\"), corr(\"AQI\", \"O3\").alias(\"AQI_O3_corr\") ) # 3. 季节性分析 seasonal_analysis = airdata.groupby( year(\"date\").alias(\"year\"), month(\"date\").alias(\"month\") ).agg( avg(\"AQI\").alias(\"avg_aqi\"), stddev(\"AQI\").alias(\"std_aqi\"), count(\"*\").alias(\"data_count\") ).orderBy(\"year\", \"month\") # 4. 城市聚类分析 city_clustering = airdata.groupby(\"city\").agg( avg(\"AQI\").alias(\"avg_aqi\"), avg(\"PM\").alias(\"avg_pm\"), avg(\"PM10\").alias(\"avg_pm10\"), avg(\"So2\").alias(\"avg_so2\"), avg(\"No2\").alias(\"avg_no2\"), avg(\"Co\").alias(\"avg_co\"), avg(\"O3\").alias(\"avg_o3\") ) return correlation_analysis, seasonal_analysis, city_clustering

4. 性能优化策略

def optimize_spark_performance(spark): \"\"\"Spark性能优化配置\"\"\" # 1. 内存优化 spark.conf.set(\"spark.sql.adaptive.enabled\", \"true\") spark.conf.set(\"spark.sql.adaptive.coalescePartitions.enabled\", \"true\") spark.conf.set(\"spark.sql.adaptive.skewJoin.enabled\", \"true\") # 2. 缓存策略 spark.conf.set(\"spark.sql.crossJoin.enabled\", \"true\") spark.conf.set(\"spark.sql.adaptive.localShuffleReader.enabled\", \"true\") # 3. 并行度优化 spark.conf.set(\"spark.sql.shuffle.partitions\", \"200\") spark.conf.set(\"spark.sql.adaptive.advisoryPartitionSizeInBytes\", \"128m\") # 4. 数据倾斜处理 spark.conf.set(\"spark.sql.adaptive.skewJoin.enabled\", \"true\") spark.conf.set(\"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes\", \"256m\")

5. 结果存储和导出

def save_analysis_results(results, spark): \"\"\"保存分析结果到不同存储系统\"\"\" # 1. 保存到MySQL for table_name, data in results.items(): data.write.mode(\"overwrite\"). \\ format(\"jdbc\"). \\ option(\"url\", \"jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8\"). \\ option(\"dbtable\", table_name). \\ option(\"user\", \"root\"). \\ option(\"password\", \"root\"). \\ option(\"encoding\", \"utf-8\"). \\ save() # 2. 保存到Hive for table_name, data in results.items(): data.write.mode(\"overwrite\").saveAsTable(table_name, \"parquet\") # 3. 导出为CSV文件 for table_name, data in results.items(): data.toPandas().to_csv(f\"results/{table_name}.csv\", index=False, encoding=\'utf-8-sig\')

Spark SQL实现示例

#coding:utf8from pyspark.sql import SparkSessionfrom pyspark.sql.functions import monotonically_increasing_idfrom pyspark.sql.functions import count, mean, col, sum, when, year, month, max, min, avgfrom pyspark.sql import functions as Ffrom pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatTypeif __name__ == \'__main__\': # 构建Spark会话 spark = SparkSession.builder.appName(\"sparkSQL\").master(\"local[*]\").\\ config(\"spark.sql.shuffle.partitions\", 2). \\ config(\"spark.sql.warehouse.dir\", \"hdfs://node1:8020/user/hive/warehouse\"). \\ config(\"hive.metastore.uris\", \"thrift://node1:9083\"). \\ enableHiveSupport().\\ getOrCreate() sc = spark.sparkContext # 读取数据 airdata = spark.read.table(\"airdata\") # 需求一:城市平均AQI分析 result1 = airdata.groupby(\"city\")\\ .agg(mean(\"AQI\").alias(\"avg_AQI\"))\\ .orderBy(\"avg_AQI\", ascending=False) # 需求二:六项污染物分析 result2 = airdata.groupby(\"city\") \\ .agg( mean(\"PM\").alias(\"avg_PM\"), mean(\"PM10\").alias(\"avg_PM10\"), mean(\"So2\").alias(\"avg_So2\"), mean(\"No2\").alias(\"avg_No2\"), mean(\"Co\").alias(\"avg_Co\"), mean(\"O3\").alias(\"avg_O3\") ) # 需求三:年度空气质量趋势分析 airdata = airdata.withColumn(\"date\", airdata[\"date\"].cast(\"date\")) result3 = airdata.groupby(\"city\", year(\"date\").alias(\"year\"), month(\"date\").alias(\"month\"))\\ .agg( max(\"AQI\").alias(\"max_AQI\"), min(\"AQI\").alias(\"min_AQI\") ) # 需求四:月度PM污染物分析 result4 = airdata.groupby(\"city\", year(\"date\").alias(\"year\"), month(\"date\").alias(\"month\")) \\ .agg( avg(\"PM\").alias(\"max_PM\"), avg(\"PM10\").alias(\"min_PM10\") ) # 需求五:优质空气天数统计 result5 = airdata.groupby(\"city\", year(\"date\").alias(\"year\"), month(\"date\").alias(\"month\"))\\ .agg( count(when(airdata[\"AQI\"] < 50, True)).alias(\"greatAirCount\") ) # 需求六:污染物最大值分析 result6 = airdata.groupby(\"city\")\\ .agg( max(\"So2\").alias(\"max_So2\"), max(\"No2\").alias(\"max_No2\") ) # 需求七:CO浓度分级统计 airdata = airdata.withColumn( \"Co_category\", when((col(\"Co\") >= 0) & (col(\"Co\") < 0.25), \'0-0.25\') .when((col(\"Co\") >= 0.25) & (col(\"Co\") < 0.5), \'0.25-0.5\') .when((col(\"Co\") >= 0.5) & (col(\"Co\") < 0.75), \'0.5-0.75\') .when((col(\"Co\") >= 0.75) & (col(\"Co\") < 1.0), \'0.75-1\') .otherwise(\"1以上\") ) result7 = airdata.groupby(\"Co_category\").agg(count(\'*\').alias(\'Co_count\')) # 需求八:O3浓度分级统计 airdata = airdata.withColumn( \"O3_category\", when((col(\"O3\") >= 0) & (col(\"O3\") < 25), \'0-25\') .when((col(\"O3\") >= 0.25) & (col(\"O3\") < 50), \'25-50\') .when((col(\"O3\") >= 50) & (col(\"O3\") < 75), \'50-75\') .when((col(\"O3\") >= 75) & (col(\"O3\") < 100), \'75-100\') .otherwise(\"100以上\") ) result8 = airdata.groupby(\"O3_category\").agg(count(\'*\').alias(\'O3_count\')) # 保存结果到MySQL result1.write.mode(\"overwrite\"). \\ format(\"jdbc\"). \\ option(\"url\", \"jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true&charset=utf8\"). \\ option(\"dbtable\", \"avgCityAqi\"). \\ option(\"user\", \"root\"). \\ option(\"password\", \"root\"). \\ option(\"encoding\", \"utf-8\"). \\ save()

3. 机器学习预测模块

预测模型设计与业务价值

1. 模型选择与理论基础

  • 线性回归模型:基于污染物浓度与AQI的线性关系,适合快速预测
  • 理论基础:AQI计算公式中各项污染物都有对应的权重系数
  • 业务价值:为环境监测和公众健康提供实时预测服务
  • 应用场景:空气质量预警、出行建议、健康防护指导

2. 特征工程与数据预处理

  • 特征选择:PM2.5、SO2、NO2、O3作为主要预测特征
  • 数据清洗:处理缺失值、异常值和数据标准化
  • 特征重要性:通过模型分析各污染物对AQI的贡献度
  • 数据质量:确保训练数据的准确性和完整性

3. 模型评估与优化

  • 评估指标:R²、MAE、RMSE等指标评估模型性能
  • 交叉验证:使用K折交叉验证确保模型泛化能力
  • 超参数调优:通过网格搜索优化模型参数
  • 模型监控:实时监控模型性能,及时更新模型
技术实现细节

1. 数据预处理与特征工程

import numpy as npimport pandas as pdfrom sklearn.preprocessing import StandardScaler, LabelEncoderfrom sklearn.model_selection import train_test_split, cross_val_score, GridSearchCVfrom sklearn.linear_model import LinearRegression, Ridge, Lassofrom sklearn.metrics import mean_squared_error, mean_absolute_error, r2_scorefrom sklearn.ensemble import RandomForestRegressorimport joblibclass AirQualityPredictor: def __init__(self): self.scaler = StandardScaler() self.model = None self.feature_names = [\'PM2_5\', \'SO2\', \'NO2\', \'O3\'] def preprocess_data(self, data): \"\"\"数据预处理和特征工程\"\"\" # 1. 数据清洗 data = data.dropna(subset=[\'AQI\', \'PM2_5\', \'SO2\', \'NO2\', \'O3\']) # 2. 异常值处理 data = data[(data[\'AQI\'] >= 0) & (data[\'AQI\'] <= 500)] data = data[(data[\'PM2_5\'] >= 0) & (data[\'PM2_5\'] <= 500)] data = data[(data[\'SO2\'] >= 0) & (data[\'SO2\'] <= 1000)] data = data[(data[\'NO2\'] >= 0) & (data[\'NO2\'] <= 400)] data = data[(data[\'O3\'] >= 0) & (data[\'O3\'] <= 300)] # 3. 特征工程 # 添加时间特征 data[\'date\'] = pd.to_datetime(data[\'date\']) data[\'year\'] = data[\'date\'].dt.year data[\'month\'] = data[\'date\'].dt.month data[\'day\'] = data[\'date\'].dt.day data[\'day_of_week\'] = data[\'date\'].dt.dayofweek # 添加空气质量等级特征 level_map = {\'优\': 1, \'良\': 2, \'轻度污染\': 3, \'中度污染\': 4, \'重度污染\': 5, \'严重污染\': 6} data[\'level_numeric\'] = data[\'level\'].map(level_map).fillna(0) # 添加污染物交互特征 data[\'PM_SO2_ratio\'] = data[\'PM2_5\'] / (data[\'SO2\'] + 1) data[\'NO2_O3_ratio\'] = data[\'NO2\'] / (data[\'O3\'] + 1) return data def feature_selection(self, data): \"\"\"特征选择\"\"\" # 基础特征 base_features = [\'PM2_5\', \'SO2\', \'NO2\', \'O3\'] # 时间特征 time_features = [\'year\', \'month\', \'day\', \'day_of_week\'] # 交互特征 interaction_features = [\'PM_SO2_ratio\', \'NO2_O3_ratio\'] # 组合所有特征 all_features = base_features + time_features + interaction_features return data[all_features], all_features

2. 模型训练与优化

def train_model(self, X_train, y_train): \"\"\"模型训练与优化\"\"\" # 1. 数据标准化 X_train_scaled = self.scaler.fit_transform(X_train) # 2. 基础线性回归 lr_model = LinearRegression() lr_model.fit(X_train_scaled, y_train) # 3. 正则化模型 ridge_model = Ridge(alpha=1.0) lasso_model = Lasso(alpha=0.1) # 4. 随机森林模型 rf_model = RandomForestRegressor(n_estimators=100, random_state=42) # 5. 模型评估 models = { \'LinearRegression\': lr_model, \'Ridge\': ridge_model, \'Lasso\': lasso_model, \'RandomForest\': rf_model } best_model = None best_score = -float(\'inf\') for name, model in models.items(): # 交叉验证 cv_scores = cross_val_score(model, X_train_scaled, y_train, cv=5, scoring=\'r2\') avg_score = cv_scores.mean() print(f\"{name}: CV R² Score = {avg_score:.4f} (+/- {cv_scores.std() * 2:.4f})\") if avg_score > best_score: best_score = avg_score best_model = model # 训练最佳模型 best_model.fit(X_train_scaled, y_train) self.model = best_model return best_model, best_scoredef hyperparameter_tuning(self, X_train, y_train): \"\"\"超参数调优\"\"\" # 定义参数网格 param_grid = { \'alpha\': [0.001, 0.01, 0.1, 1.0, 10.0], \'max_iter\': [1000, 2000, 3000] } # 网格搜索 grid_search = GridSearchCV( Ridge(), param_grid, cv=5, scoring=\'r2\', n_jobs=-1 ) X_train_scaled = self.scaler.transform(X_train) grid_search.fit(X_train_scaled, y_train) print(f\"最佳参数: {grid_search.best_params_}\") print(f\"最佳得分: {grid_search.best_score_:.4f}\") return grid_search.best_estimator_

3. 模型评估与预测

def evaluate_model(self, X_test, y_test): \"\"\"模型评估\"\"\" X_test_scaled = self.scaler.transform(X_test) y_pred = self.model.predict(X_test_scaled) # 计算评估指标 mse = mean_squared_error(y_test, y_pred) mae = mean_absolute_error(y_test, y_pred) r2 = r2_score(y_test, y_pred) rmse = np.sqrt(mse) print(f\"模型评估结果:\") print(f\"R² Score: {r2:.4f}\") print(f\"MAE: {mae:.4f}\") print(f\"RMSE: {rmse:.4f}\") print(f\"MSE: {mse:.4f}\") # 特征重要性分析 if hasattr(self.model, \'feature_importances_\'): feature_importance = pd.DataFrame({ \'feature\': self.feature_names, \'importance\': self.model.feature_importances_ }).sort_values(\'importance\', ascending=False) print(f\"\\n特征重要性:\") print(feature_importance) return { \'r2\': r2, \'mae\': mae, \'rmse\': rmse, \'mse\': mse, \'predictions\': y_pred }def predict_aqi(self, pm25, so2, no2, o3): \"\"\"实时AQI预测\"\"\" # 构建输入数据 input_data = np.array([[pm25, so2, no2, o3]]) # 数据标准化 input_scaled = self.scaler.transform(input_data) # 预测 prediction = self.model.predict(input_scaled)[0] # 预测结果解释 aqi_level = self.get_aqi_level(prediction) return { \'predicted_aqi\': round(prediction, 2), \'aqi_level\': aqi_level, \'confidence\': self.get_prediction_confidence(input_scaled) }def get_aqi_level(self, aqi): \"\"\"获取AQI等级\"\"\" if aqi <= 50: return \"优\" elif aqi <= 100: return \"良\" elif aqi <= 150: return \"轻度污染\" elif aqi <= 200: return \"中度污染\" elif aqi <= 300: return \"重度污染\" else: return \"严重污染\"def get_prediction_confidence(self, input_data): \"\"\"获取预测置信度\"\"\" # 基于模型的不确定性估计 if hasattr(self.model, \'predict_proba\'): confidence = np.max(self.model.predict_proba(input_data)) else: # 对于回归模型,使用预测值的合理性作为置信度 prediction = self.model.predict(input_data)[0] confidence = max(0, min(1, 1 - abs(prediction - 100) / 100)) return round(confidence, 3)

4. 模型持久化与部署

def save_model(self, model_path): \"\"\"保存模型\"\"\" model_data = { \'model\': self.model, \'scaler\': self.scaler, \'feature_names\': self.feature_names } joblib.dump(model_data, model_path) print(f\"模型已保存到: {model_path}\")def load_model(self, model_path): \"\"\"加载模型\"\"\" model_data = joblib.load(model_path) self.model = model_data[\'model\'] self.scaler = model_data[\'scaler\'] self.feature_names = model_data[\'feature_names\'] print(f\"模型已从 {model_path} 加载\")def batch_predict(self, data_batch): \"\"\"批量预测\"\"\" predictions = [] for data in data_batch: pred = self.predict_aqi( data[\'PM2_5\'], data[\'SO2\'], data[\'NO2\'], data[\'O3\'] ) predictions.append(pred) return predictions

模型实现

import numpy as npimport pandas as pdfrom sklearn.preprocessing import LabelEncoderfrom sklearn.model_selection import train_test_splitfrom sklearn.linear_model import LinearRegressionfrom utils.query import querysdef getData(): # 从数据库获取数据 data = querys(\'select * from airData\', [], \'select\') print(\"数据库返回的列数:\", len(data[0]) if data else 0) print(\"第一行数据:\", data[0] if data else None) # 转换为DataFrame,根据实际数据调整列名 products = pd.DataFrame(data, columns=[\'city\', \'date\', \'level\', \'AQI\', \'PM2_5\', \'PM10\', \'SO2\', \'NO2\', \'CO\', \'O3\', \'predict\', \'extra\']) # 处理空气质量等级,将文本转换为数值 level_map = {\'优\': 1, \'良\': 2, \'轻度污染\': 3, \'中度污染\': 4, \'重度污染\': 5, \'严重污染\': 6} products[\'level\'] = products[\'level\'].map(level_map).fillna(0).astype(\'int\') # 确保数值列为数值类型 numeric_columns = [\'AQI\', \'PM2_5\', \'PM10\', \'SO2\', \'NO2\', \'CO\', \'O3\'] for col in numeric_columns: products[col] = pd.to_numeric(products[col], errors=\'coerce\').fillna(0).astype(\'float\') return productsdef model_train(data): # 特征选择:PM2.5、SO2、NO2、O3 x_train, x_test, y_train, y_test = train_test_split( data[[\"PM2_5\", \"SO2\", \"NO2\", \"O3\"]], data[\'AQI\'], test_size=0.25, random_state=1 ) model = LinearRegression() model.fit(x_train, y_train) return modeldef model_predict(model, data): return model.predict(data)def pred(model, *args): # 将输入参数转换为numpy数组 data = np.array([args]).reshape(1, -1) pred = model.predict(data) return predif __name__ == \'__main__\': trainData = getData() model = model_train(trainData) print(pred(model, 10, 2, 18, 30))

4. 数据可视化模块

图表类型

  • 年度空气质量分析图表:展示年度AQI变化趋势和季节性规律
  • 月度分析图表:按月展示空气质量变化,识别污染高峰期
  • 气体分析图表:六项污染物的对比分析,展示污染物相关性
  • 城市分布图表:各城市空气质量对比,识别区域差异
  • 数据词云图:空气质量相关词汇的可视化展示

图表数据处理

def getIndexData(defaultCity): avgCityAqiList = list(getavgCityAqi()) avgCitySixList = list(getavgCitySix()) realSixList = querys(\'select * from avgCitySix where city = %s\', [defaultCity], \'select\')[0] yLine = list(realSixList[1:]) yLine = [round(x, 1) for x in yLine] xLine = [\'PM2.5\', \'PM10\', \'So2\', \'No2\', \'Co\', \'O3\'] xBar = [x[0] for x in avgCityAqiList] yBar = [round(x[1], 0) for x in avgCityAqiList] return xBar, yBar, xLine, yLinedef getYearChartData(city): maxCityAqiList = querys(\'select * from maxCityAqi where city = %s\', [city], \'select\') y1DataM = [x[3] if x[3] is not None else 0 for x in maxCityAqiList] y1DataN = [x[4] if x[4] is not None else 0 for x in maxCityAqiList] xData = [] for i in range(1, 13): xData.append(str(i) + \'月\') avgCityPMList = querys(\'select * from avgCityPM where city = %s\', [city], \'select\') y2Data2 = [round(x[3], 1) if x[3] is not None else 0 for x in avgCityPMList] y2Data10 = [round(x[4], 1) if x[4] is not None else 0 for x in avgCityPMList] return xData, y1DataM, y1DataN, y2Data2, y2Data10def getAirMonthData(city, month): airDataList = querys(\'select * from airData where city = %s and SUBSTRING(date,6,2) = %s\', [city, month], \'select\') # 按日期排序,确保日期格式正确 airDataList = sorted(airDataList, key=lambda x: int(x[1].split(\'-\')[2])) y1Data = [x[3] if x[3] is not None else 0 for x in airDataList] y2Data = [int(x[4]) if x[4] is not None else 0 for x in airDataList] dateList = [x[1] for x in airDataList] xData = [] for date in dateList: year, month, day = date.split(\'-\') xData.append(day) greatAirList = querys(\'select * from greatAir where city = %s\', [city], \'select\') funnelData = [] for i in greatAirList: funnelData.append({ \'name\': str(i[2]) + \'月\', \'value\': i[3] if i[3] is not None else 0 }) return xData, y1Data, y2Data, funnelData

ECharts实现示例

// 年度AQI趋势图var option = { title: { text: \'年度空气质量趋势分析\' }, tooltip: { trigger: \'axis\' }, legend: {}, toolbox: { show: true, feature: { dataZoom: { yAxisIndex: \'none\' }, dataView: { readOnly: false }, magicType: { type: [\'line\', \'bar\'] }, restore: {}, saveAsImage: {} } }, xAxis: { type: \'category\', boundaryGap: false, data: {{ xData | safe }} }, yAxis: { type: \'value\', axisLabel: { formatter: \'{value} \' } }, series: [ { name: \'So2\', type: \'line\', data: {{ y1Data }}, markPoint: { data: [  { type: \'max\', name: \'Max\' },  { type: \'min\', name: \'Min\' } ] }, markLine: { data: [{ type: \'average\', name: \'Avg\' }] } }, { name: \'No2\', type: \'line\', data: {{ y2Data }}, markPoint: { data: [{ name: \'周最低\', value: -2, xAxis: 1, yAxis: -1.5 }] }, markLine: { data: [  { type: \'average\', name: \'Avg\' },  [ { symbol: \'none\', x: \'90%\', yAxis: \'max\' }, { symbol: \'circle\', label: { position: \'start\', formatter: \'Max\' }, type: \'max\', name: \'最高点\' }  ] ] } } ]};

5. Web用户界面模块

界面功能

  • 用户注册登录系统
  • 个人信息管理
  • 数据总览表格展示
  • 交互式日期选择器
  • 响应式设计,支持多设备访问

数据模型设计

from django.db import modelsclass User(models.Model): id = models.AutoField(\"id\", primary_key=True) username = models.CharField(\"用户名\", max_length=255, default=\'\') password = models.CharField(\"密码\", max_length=255, default=\'\') creteTime = models.DateField(\"创建时间\", auto_now_add=True) class Meta: db_table = \'user\'

数据库查询工具

from pyhive import hiveimport timefrom thrift.Thrift import TApplicationExceptiondef get_connection(): try: return hive.Connection(host=\'node1\', port=\'10000\', username=\'hadoop\') except Exception as e: print(f\"连接Hive失败: {str(e)}\") return Nonedef querys(sql, params, type=\'no_select\', max_retries=3): retry_count = 0 while retry_count < max_retries: try: conn = get_connection() if not conn: raise Exception(\"无法建立Hive连接\") cursor = conn.cursor() params = tuple(params) cursor.execute(sql, params) if type != \'no_select\': data_list = cursor.fetchall() conn.commit() return data_list else: conn.commit() return \'数据库语句执行成功\' except TApplicationException as e: print(f\"查询执行失败 (尝试 {retry_count + 1}/{max_retries}): {str(e)}\") retry_count += 1 if retry_count == max_retries: raise Exception(f\"查询执行失败,已重试{max_retries}次: {str(e)}\") time.sleep(2) # 等待2秒后重试  except Exception as e: print(f\"发生错误: {str(e)}\") raise  finally: try: if \'cursor\' in locals():  cursor.close() if \'conn\' in locals():  conn.close() except: pass

Django视图实现

def index(request): uname = request.session.get(\'username\') userInfo = User.objects.get(username=uname) airdataList = list(getairdata()) dateList = list(set([x[1] for x in airdataList])) # 提取年月日数据 yearList = [] monthList = [] dayList = [] for date in dateList: year, month, day = date.split(\'-\') yearList.append(year) monthList.append(month) dayList.append(day) yearList = sorted(set(yearList)) monthList = sorted(set(monthList)) dayList = sorted(set(dayList)) defaultYear = \'2023\' defaultMonth = \'01\' defaultDay = \'01\' cityList = list(set(x[0] for x in airdataList)) if request.method == \'POST\': defaultYear = request.POST.get(\'defaultYear\') defaultMonth = request.POST.get(\'defaultMonth\') defaultDay = request.POST.get(\'defaultDay\') defaultCity = request.session.get(\'defaultCity\') currentDate = defaultYear + \'-\' + defaultMonth + \'-\' + defaultDay # 获取图表数据 xBar, yBar, xLine, yLine = getIndexData(defaultCity) currentData = querys(\'select * from airdata where city = %s and date = %s\', [defaultCity, currentDate], \'select\')[0] currentData = convert_none_to_null(currentData) return render(request, \'index.html\', { \'userInfo\': userInfo, \'yearList\': yearList, \'monthList\': monthList, \'dayList\': dayList, \'defaultYear\': defaultYear, \'defaultMonth\': defaultMonth, \'defaultDay\': defaultDay, \'cityList\': cityList, \'defaultCity\': defaultCity, \'currentData\': currentData, \'xBar\': convert_none_to_null(xBar), \'yBar\': convert_none_to_null(yBar), \'xLine\': convert_none_to_null(xLine), \'yLine\': convert_none_to_null(yLine) })

系统特色功能

1. 分布式数据处理

  • 利用Spark的分布式计算能力,处理大规模空气质量数据
  • 支持实时数据流处理和批量数据分析
  • 实现数据并行计算,提高处理效率

2. 智能预测分析

  • 基于历史数据训练机器学习模型
  • 支持多维度特征工程和模型优化
  • 提供预测结果的可信度评估

3. 多维度数据可视化

  • 支持多种图表类型:折线图、柱状图、散点图、热力图等
  • 实现交互式数据探索和动态筛选
  • 提供图表导出和分享功能

4. 实时数据更新

  • 定时任务自动更新空气质量数据
  • 支持增量数据同步和全量数据更新
  • 确保数据的时效性和准确性

项目部署与运维

环境配置

# 安装Python依赖pip install -r requirements.txt# 配置数据库mysql -u root -p < design_91_airdata.sql# 启动Spark集群spark-submit --master local[*] sparks/sparkAna.py# 启动Django应用python manage.py runserver

系统要求

  • Python 3.8+
  • Apache Spark 3.0+
  • MySQL 8.0+
  • 内存:8GB+
  • 存储:100GB+

性能优化

1. 数据处理优化

  • 使用Spark的缓存机制减少重复计算
  • 优化SQL查询,使用索引提高查询效率
  • 实现数据分区,提高并行处理能力

2. 前端性能优化

  • 使用CDN加速静态资源加载
  • 实现图表懒加载和分页显示
  • 优化JavaScript代码,减少页面加载时间

3. 数据库优化

  • 建立合适的索引提高查询速度
  • 使用连接池管理数据库连接
  • 定期清理历史数据,保持数据库性能

项目成果

1. 技术成果

  • 成功构建了完整的大数据分析和可视化系统
  • 实现了从数据采集到预测分析的完整流程
  • 建立了可扩展的系统架构,支持功能扩展

2. 业务价值

  • 为空气质量监测提供了科学的数据分析工具
  • 支持多城市空气质量对比和趋势分析
  • 为环境决策提供了数据支撑

3. 用户体验

  • 提供了直观友好的用户界面
  • 支持多维度数据探索和可视化
  • 实现了响应式设计,适配多种设备

未来展望

1. 功能扩展

  • 增加更多城市的空气质量数据
  • 支持更多污染物指标的监测
  • 集成更多机器学习算法

2. 技术升级

  • 升级到Spark 3.x版本,利用新特性
  • 引入深度学习模型提高预测精度
  • 实现实时数据流处理

3. 用户体验优化

  • 开发移动端APP
  • 增加个性化推荐功能
  • 提供API接口供第三方调用

总结

本项目成功构建了一个基于Spark的空气质量数据分析可视化系统,实现了从数据采集、存储、分析、预测到可视化的完整数据科学流程。系统采用现代化的技术栈,具备良好的可扩展性和维护性,为空气质量监测和分析提供了强有力的技术支撑。

通过本项目的开发,不仅掌握了大数据处理、机器学习、Web开发等多项技术,更重要的是理解了如何将复杂的技术栈整合成一个完整的业务系统,为用户提供有价值的服务。


联系方式:如需源码等情况,欢迎主页联系

许可证:MIT License