
Spark DataFrame and Data Source Interaction


This article uses Spark to carry out read, clean, and write operations against a data source.

A raw log record is quite long, so the line below is only a sample for reference; the main goal is to learn the overall approach.

2018-09-04T20:27:31+08:00    http://datacenter.bdqn.cn/logs/user?actionBegin=1536150451540&actionClient=Mozilla%2F5.0+%28Windows+NT+10.0%3B+WOW64%29+AppleWebKit%2F537.36+%28KHTML%2C+like+Gecko%29+Chrome%2F58.0.3029.110+Safari%2F537.36+SE+2.X+MetaSr+1.0&actionEnd=1536150451668&actionName=startEval&actionTest=0&actionType=3&actionValue=272090&clientType=001_kgc&examType=001&ifEquipment=web&isFromContinue=false&skillIdCount=0&skillLevel=0&testType=jineng&userSID=B842B843AE317425D53D0C567A903EF7.exam-tomcat-node3.exam-tomcat-node3&userUID=272090&userUIP=1.180.18.157    GET    200    192.168.168.64    -    -    Apache-HttpClient/4.1.2 (java 1.5)

Overall workflow

Read the log file and convert it to an RDD[Row]:
- split each line on Tab
- filter out records with fewer than 8 fields

Clean the data:
- deduplicate on the first and second columns (event_time and url)
- filter out records whose status code is not 200
- filter out records with an empty event_time
- split the url on "&" and "=" (a standalone sketch of this split follows the list)

Save the data:
- write the result into a MySQL table
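The only non-obvious step is splitting the url on "&" and "=". Below is a minimal standalone sketch of that split; the QueryStringSketch object and the shortened sample URL are made up for illustration, while the real job does the same thing inside a Dataset map further down.

object QueryStringSketch {
  // Turn the query string of a request url into a key/value map
  def parse(url: String): Map[String, String] = {
    val parts = url.split("\\?")               // "path" and "query string"
    if (parts.length != 2) return Map.empty    // no query string -> empty map
    parts(1)
      .split("&")                              // one entry per parameter
      .map(_.split("="))                       // split each entry into key and value
      .filter(_.length == 2)                   // drop malformed entries
      .map(kv => (kv(0), kv(1)))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // shortened version of the sample log url above
    val url = "http://datacenter.bdqn.cn/logs/user?actionName=startEval&userUID=272090&actionTest=0"
    println(parse(url)("actionName"))   // startEval
    println(parse(url)("userUID"))      // 272090
  }
}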

Using a DataFrame to write data to MySQL, Parquet, and Hive, and to read data back from MySQL

// createDataFrame requires an RDD[Row], so the log is first turned into an RDD[Row] and then combined with a schema to form the DataFrame
package org.Test1

import org.apache.commons.lang.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object Etl {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("etl")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val sc: SparkContext = ss.sparkContext
    import ss.implicits._

    // Read the raw log, split each line on Tab and drop records with fewer than 8 fields
    val RddLog = sc.textFile("in/test.log")
    val rowRDD = RddLog.map(x => x.split("\t"))
      .filter(x => x.length >= 8)
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

    // Combine the RDD[Row] with a schema to form the DataFrame
    val logsSchema: StructType = StructType(
      Array(
        StructField("event_time", StringType),
        StructField("url", StringType),
        StructField("method", StringType),
        StructField("status", StringType),
        StructField("sip", StringType),
        StructField("user_uip", StringType),
        StructField("action_prepend", StringType),
        StructField("action_client", StringType)
      )
    )
    val dataFrame = ss.createDataFrame(rowRDD, logsSchema)

    // Deduplicate on event_time and url, keep only status 200 rows with a non-empty event_time
    val rowsRDD = dataFrame.dropDuplicates("event_time", "url")
      .filter(x => x(3) == "200")
      .filter(x => StringUtils.isNotEmpty(x(0).toString))

    // Split the url on "?", then break the query string on "&" and "=" into a parameter map
    val full_log_df = rowsRDD.map(line => {
      val str = line.getAs[String]("url")
      val paramsArray: Array[String] = str.split("\\?")
      // default to an empty map so the lookups below are safe when the url has no query string
      var paramsMap: Map[String, String] = Map.empty
      if (paramsArray.length == 2) {
        paramsMap = paramsArray(1)
          .split("&")
          .map(x => x.split("="))
          .filter(x => x.length == 2)
          .map(x => (x(0), x(1)))
          .toMap
      }
      (
        line.getAs[String]("event_time"),
        line.getAs[String]("method"),
        line.getAs[String]("status"),
        line.getAs[String]("sip"),
        line.getAs[String]("user_uip"),
        line.getAs[String]("action_prepend"),
        line.getAs[String]("action_client"),
        paramsMap.getOrElse[String]("userUID", ""),
        paramsMap.getOrElse[String]("userSID", ""),
        paramsMap.getOrElse[String]("actionBegin", ""),
        paramsMap.getOrElse[String]("actionEnd", ""),
        paramsMap.getOrElse[String]("actionType", ""),
        paramsMap.getOrElse[String]("actionName", ""),
        paramsMap.getOrElse[String]("actionValue", ""),
        paramsMap.getOrElse[String]("actionTest", ""),
        paramsMap.getOrElse[String]("ifEquipment", "")
      )
    }).toDF("event_time", "method", "status", "sip", "user_uip", "action_prepend", "action_client",
      "userUID", "userSID", "actionBegin", "actionEnd", "actionType", "actionName",
      "actionValue", "actionTest", "ifEquipment")

    // Write the cleaned log into MySQL (1 = overwrite)
    JdbcUtils.dataFrameToMySql(full_log_df, JdbcUtils.tb_full_log, 1)
    ss.close()
  }
}
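For reference, running this ETL locally needs Spark SQL, commons-lang, and the MySQL JDBC driver on the classpath. A build.sbt sketch follows; the version numbers are assumptions and should be matched to your own cluster.

// build.sbt sketch -- version numbers are assumptions, align them with your environment
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.3.4",
  "org.apache.spark" %% "spark-hive"           % "2.3.4",    // only needed for the Hive helpers
  "commons-lang"      % "commons-lang"         % "2.6",      // org.apache.commons.lang.StringUtils
  "mysql"             % "mysql-connector-java" % "5.1.47"    // com.mysql.jdbc.Driver
)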

JdbcUtils: database connection and read/write logic

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import java.util.Properties

object JdbcUtils {
  val url = "jdbc:mysql://hd002:3306/etclog"
  val driver = "com.mysql.jdbc.Driver"
  val user = "root"
  val password = "ok"
  val tb_full_log = "tb_full_log"

  private val prop = new Properties()
  prop.setProperty("user", user)
  prop.setProperty("password", password)
  prop.setProperty("driver", driver)

  /**
   * Write a DataFrame into a MySQL table.
   * @param df    the Spark DataFrame
   * @param table the MySQL table name
   * @param op    write mode, 0: Append, 1: Overwrite
   */
  def dataFrameToMySql(df: DataFrame, table: String, op: Int = 1): Unit = {
    if (op == 0) {
      df.write.mode(SaveMode.Append).jdbc(url, table, prop)
    } else if (op == 1) {
      df.write.mode(SaveMode.Overwrite).jdbc(url, table, prop)
    }
  }

  /**
   * Write a DataFrame to Parquet files.
   * @param df      the Spark DataFrame
   * @param outpath the output path
   * @param op      write mode, 0: Append, 1: Overwrite
   */
  def dataFrameToParquet(df: DataFrame, outpath: String, op: Int = 1): Unit = {
    if (op == 0) {
      df.write.mode(SaveMode.Append).parquet(outpath)
    } else if (op == 1) {
      df.write.mode(SaveMode.Overwrite).parquet(outpath)
    }
  }

  /**
   * Write a DataFrame into a Hive table.
   * @param df    the Spark DataFrame
   * @param table the Hive table name
   * @param op    write mode, 0: Append, 1: Overwrite
   */
  def dataFrameToHive(df: DataFrame, table: String, op: Int = 1): Unit = {
    if (op == 0) {
      df.write.mode(SaveMode.Append).saveAsTable(table)
    } else if (op == 1) {
      df.write.mode(SaveMode.Overwrite).saveAsTable(table)
    }
  }

  /**
   * Read a table from MySQL.
   * @param spark the SparkSession
   * @param table the MySQL table name
   * @return the resulting DataFrame
   */
  def getDataFrameFromMySQL(spark: SparkSession, table: String): DataFrame = {
    spark.read.jdbc(url, table, prop)
  }

  def getDataFrameFromParquet(spark: SparkSession, path: String): DataFrame = {
    spark.read.parquet(path)
  }

  def getDataFrameFromHive(spark: SparkSession, table: String): DataFrame = {
    spark.sql("select * from " + table)
  }
}
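The Parquet and Hive helpers are called the same way as dataFrameToMySql. Below is a usage sketch under stated assumptions: the output path "out/full_log" and the Hive table name "tb_full_log_hive" are hypothetical examples, and the SparkSession is built with enableHiveSupport() so that saveAsTable targets the Hive metastore.

import org.apache.spark.sql.SparkSession

object WriteReadSketch {
  def main(args: Array[String]): Unit = {
    // enableHiveSupport is only required for the Hive helpers
    val ss = SparkSession.builder()
      .master("local[*]")
      .appName("writeReadSketch")
      .enableHiveSupport()
      .getOrCreate()

    // Start from the table written by Etl
    val fullLog = JdbcUtils.getDataFrameFromMySQL(ss, JdbcUtils.tb_full_log)

    // "out/full_log" and "tb_full_log_hive" are hypothetical names for illustration
    JdbcUtils.dataFrameToParquet(fullLog, "out/full_log", 1)    // overwrite Parquet files
    JdbcUtils.dataFrameToHive(fullLog, "tb_full_log_hive", 1)   // overwrite a Hive table

    // Read both back through the matching getters
    JdbcUtils.getDataFrameFromParquet(ss, "out/full_log").show(5)
    JdbcUtils.getDataFrameFromHive(ss, "tb_full_log_hive").show(5)

    ss.close()
  }
}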

Reading data back from the data source

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object Retention {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Retention").setMaster("local[*]")
    val ss = SparkSession.builder().config(conf).getOrCreate()
    val sc = ss.sparkContext
    import ss.implicits._

    // Read the full log table back from MySQL and cache it in memory
    val logDF: DataFrame = JdbcUtils.getDataFrameFromMySQL(ss, "tb_full_log")
      .persist(StorageLevel.MEMORY_ONLY)

    // filter returns a new DataFrame, so keep the registration events in their own value
    val registeredDF = logDF.filter($"actionName" === "Registered")

    logDF.printSchema()
    logDF.show()
    registeredDF.show()
  }
}