
Putting Large-Scale Time Series Anomaly Detection into Production with Spark

I have been busy putting the anomaly detection project into production and have not had time to update this blog. The system has now been running stably against large-scale time series for more than a month, so here is a brief summary.

What we achieved: Spark predicts for 30,000 servers, each with 5 metrics, and each metric corresponds to one time series, giving 150,000 models in total. A full training pass over all models takes about 3 hours on 21 cores, and the prediction job runs comfortably on 12 cores. The whole system has been running smoothly for a month, and we estimate the same architecture could scale to training and predicting on the order of a hundred million models concurrently.

Technologies involved: Spark, Kafka, HBase, OpenTSDB, Redis, ES, ZK, YARN and HDFS. Spark does the training and prediction, Kafka streams in the data to be analyzed in real time, OpenTSDB stores the time series used for training, Redis stores the trained models, and ES stores the anomalous points detected during prediction.

The project consists of two processes, training and prediction:

1). The training process uses OpenTSDB (an open-source time series database built on HBase) as the source of training data and Spark for model training. Trained models are saved to Redis; earlier versions saved them to HDFS, but loading models from HDFS in the prediction job was too slow, so we switched to Redis.

Two algorithms are used for training: exponential moving average and Holt-Winters. Models are trained once a day, starting at midnight; once training finishes, anomaly detection runs against the newly trained models, covering both point prediction and point-level anomaly detection. In addition, every 4 hours the job checks for newly added models and, if any exist, trains them and saves them to Redis. Training one model requires 7 days of its time series, i.e. 60 * 24 * 7, roughly 10,000 points.
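To make the exponential-moving-average side concrete, here is a minimal sketch of the idea. EwmaModel, the alpha value and the 3-sigma band below are illustrative assumptions, not the project's actual ModelTrainer/TimeSeries API:

// Minimal EWMA sketch (illustrative only, not the project's ModelTrainer API).
// It keeps an exponentially weighted mean and variance and flags a point as
// anomalous when it falls outside mean +/- 3 * stddev.
case class EwmaModel(alpha: Double = 0.1, mean: Double = 0.0, variance: Double = 0.0, n: Long = 0) {
  def update(x: Double): EwmaModel =
    if (n == 0) copy(mean = x, n = 1)
    else {
      val diff    = x - mean
      val newMean = mean + alpha * diff
      val newVar  = (1 - alpha) * (variance + alpha * diff * diff)
      copy(mean = newMean, variance = newVar, n = n + 1)
    }

  def isAnomaly(x: Double): Boolean =
    n > 0 && math.abs(x - mean) > 3 * math.sqrt(variance)
}

object EwmaExample {
  def main(args: Array[String]): Unit = {
    // Train on roughly one week of minute-level points (60 * 24 * 7 = 10,080 values).
    val history = Seq.fill(10080)(50.0 + scala.util.Random.nextGaussian())
    val model   = history.foldLeft(EwmaModel())(_ update _)
    println(model.isAnomaly(51.0)) // false: inside the band
    println(model.isAnomaly(95.0)) // true: far outside the band
  }
}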

2). The prediction process never stops. Every 4 hours it queries the MySQL database, and once a day there is a full model refresh. Models that need an incremental update are replaced with the latest version directly in Executor memory, and models that have been set to the stopped status are removed from memory. This is built on Spark Streaming's mapWithState operator, which gives real-time prediction together with real-time model updates.
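The core of this design is keeping per-key models inside mapWithState's state. The following stripped-down sketch shows the shape of that pattern; here the per-key "model" is just an exponentially weighted mean (a Double) and the detection rule is a placeholder, whereas the production job keeps a map of full per-metric models (see the prediction source at the end of the post):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

// Stripped-down sketch of the mapWithState pattern used by the prediction job.
object MapWithStateSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("MapWithStateSketch"), Seconds(60))
    ssc.checkpoint("/tmp/sketch-checkpoint") // mapWithState requires a checkpoint directory

    // (serverKey, metricValue) pairs; the real job reads these from Kafka.
    val input = ssc.socketTextStream("localhost", 9999)
      .map(_.split(","))
      .map(fields => (fields(0), fields(1).toDouble))

    def mapping(key: String, value: Option[Double], state: State[Double]): (String, Boolean) = {
      val prev      = state.getOption().getOrElse(value.getOrElse(0.0))
      val x         = value.getOrElse(prev)
      val anomalous = math.abs(x - prev) > 3.0 // placeholder detection rule
      state.update(0.9 * prev + 0.1 * x)       // incremental model update, entirely in memory
      (key, anomalous)
    }

    input.mapWithState(StateSpec.function(mapping _)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}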

Kafka is set up to carry the per-minute data to be scored, and Spark Streaming predicts in real time: 150,000 points have to be scored every minute, and the detected anomalies are written to ES. We built two versions of the prediction job: (1) the Driver loads the models from Redis with multiple threads; (2) each Executor loads only the models it needs. Performance testing showed the second version to be faster; a sketch of it follows.
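A minimal sketch of the faster, executor-side variant: each partition opens its own Redis connection and fetches only the models for the keys it actually sees, instead of the driver loading everything up front. Jedis, redisHost/redisPort and the byte[]-serialized model format are assumptions made for illustration; the post does not name the Redis client or serialization format:

import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis

object ExecutorModelLoader {
  def withModels(keyed: DStream[(String, Double)], redisHost: String, redisPort: Int)
  : DStream[(String, Double, Array[Byte])] =
    keyed.mapPartitions { partition =>
      val jedis = new Jedis(redisHost, redisPort) // one connection per partition
      try {
        partition.map { case (modelKey, value) =>
          val modelBytes = jedis.get(modelKey.getBytes("UTF-8")) // only this partition's keys
          (modelKey, value, modelBytes)
        }.toList.iterator // materialize before the connection is closed
      } finally {
        jedis.close()
      }
    }
}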

Both the training and the prediction jobs track each model's status over time in a MySQL database, using the following status constants:

val MODEL_STATUS_LOADED          = 0
val MODEL_STATUS_TRAINED         = 100
val MODEL_STATUS_STILL_AVAILABLE = 101
val MODEL_STATUS_NEW_ADD         = 200
val MODEL_STATUS_ABNORMAL        = 300
val MODEL_STATUS_LACK_DATA       = 1000
val MODEL_STATUS_INACTIVE        = 10000

The corresponding configuration table is defined as follows:

 CREATE TABLE `anomaly_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `appid` int(11) NOT NULL,
  `insid` int(11) NOT NULL,
  `metric` varchar(30) NOT NULL,
  `algorithm` varchar(30) NOT NULL,
  `window_width` int(11) NOT NULL DEFAULT '7',
  `status` int(11) DEFAULT '10000' COMMENT '0:Loaded; 100:Trained, need loading; 101:Training failed, still available; 200:New item, need training; 300:Abnormal, unavailable; 1000:No data; 10000:Inactive',
  `last_model_time` timestamp NOT NULL DEFAULT '2000-01-01 00:00:00',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `idx_config_lcxtest_0` (`appid`,`insid`)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 
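As an illustration of how these statuses drive the 4-hourly check, here is a minimal JDBC sketch that pulls configurations still waiting for training (status 200, "New item, need training") out of anomaly_config. The JDBC URL, credentials and PendingConfigLoader are placeholders; the project's ConfManager wraps a similar query whose exact SQL the post does not show:

import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

object PendingConfigLoader {
  def loadPending(jdbcUrl: String, user: String, password: String): List[String] = {
    val conn = DriverManager.getConnection(jdbcUrl, user, password)
    try {
      val stmt = conn.prepareStatement(
        "SELECT appid, insid, metric, algorithm, window_width FROM anomaly_config WHERE status = 200")
      val rs  = stmt.executeQuery()
      val buf = ListBuffer[String]()
      while (rs.next()) {
        // Same "#"-separated shape the training job later splits on.
        buf += s"${rs.getInt("appid")}#${rs.getInt("insid")}#${rs.getString("metric")}#" +
               s"${rs.getInt("window_width")}#${rs.getString("algorithm")},0"
      }
      buf.toList
    } finally {
      conn.close()
    }
  }
}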

The prediction job uses Spark Streaming to pull data from Kafka in real time via the direct approach, managing the consumed offsets itself and saving them to a ZooKeeper node after each batch. The first prediction for a key loads its models from Redis, which adds roughly 2 minutes of latency; after that, both prediction and model updates go through the mapWithState operator, which keeps the incrementally updated models in an in-memory state. Working directly in memory is fast and avoids unnecessary network traffic; Redis only has to be read once, when a model is first loaded.
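The offset handling is encapsulated in the project's KafkaManager (see updateZKOffsets in the prediction source below). As a rough sketch of the idea, the batch's offset ranges can be read from the direct stream's RDD and written to ZooKeeper; the use of Curator and the ZK path layout here are illustrative assumptions, not the project's actual implementation:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges

object OffsetCommitter {
  def commitToZk(rdd: RDD[(String, Array[Byte])], zkQuorum: String, groupId: String): Unit = {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // ranges consumed by this batch
    val client = CuratorFrameworkFactory.newClient(zkQuorum, new ExponentialBackoffRetry(1000, 3))
    client.start()
    try {
      offsetRanges.foreach { range =>
        val path = s"/consumers/$groupId/offsets/${range.topic}/${range.partition}"
        val data = range.untilOffset.toString.getBytes("UTF-8")
        if (client.checkExists().forPath(path) == null)
          client.create().creatingParentsIfNeeded().forPath(path, data)
        else
          client.setData().forPath(path, data)
      }
    } finally {
      client.close()
    }
  }
}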

During training we traced a performance problem to reading data from OpenTSDB: each model needs one week of data, around 10,000 points, per training run. Our initial HBase tuning was fairly crude, so we adjusted a number of parameters until the requirements were basically met. The main parameters are listed below, followed after the list by a client-side sketch of the scan settings.
hbase.hregion.max.filesize: the region split size, lowered to 512 MB (536870912); smaller regions make it easier to keep data balanced across region servers.
hbase.regionserver.handler.count: set to CPU cores * 3 = 32; a value like 500 would probably cause too much CPU context switching.
hbase.client.write.buffer: raised to 8 MB (for reference: 2097152 = 2 MB, 67108864 = 64 MB, 134217728 = 128 MB, 268435456 = 256 MB).
hbase.client.scanner.caching: 180 rows; the default is Integer.MAX_VALUE, and since even 50,000 rows stay under 2 MB, a scan could pull tens of thousands of rows per request although we actually need only 24 * 7 = 168 rows.
hbase.regionserver.lease.period: in production, if jobs such as MapReduce scans fail with lease timeout errors, this value needs to be raised; the 60-second default was enough for us.
Server-side memory consumption is roughly hbase.client.write.buffer * hbase.regionserver.handler.count (here 8 MB * 32 = 256 MB).

From the server's point of view:
hbase.client.max.total.tasks = 200: the maximum number of concurrent tasks a single HTable instance will send to the cluster (default 100).
hbase.client.max.perserver.tasks = 30: the maximum number of concurrent tasks a single HTable instance will send to a single region server (default 2).
From the client's point of view:
hbase.client.max.perregion.tasks: default 1, worth raising (we used 30); once that many writes are in flight to a region, new puts won't be sent to this region until some writes finish.
hbase.storescanner.parallel.seek.enable: default false; enables StoreFileScanner parallel seeking in StoreScanner.
hbase.master.balancer.maxRitPercent: if set to 0.01, at most 1% of regions are in transition during balancing, so cluster availability stays at least 99% while balancing.
hbase.client.scanner.max.result.size: default 2 MB.
hbase.server.scanner.max.result.size: default 100 MB.
hbase.ipc.server.callqueue.read.ratio: by default reads and writes share the same call queues; setting it to 0.7 splits them into read and write queues at a 7:3 ratio.
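The scan-related settings can also be applied from the client side. The following sketch shows the idea with the standard HBase 1.x client API; the table name "tsdb" and the concrete values are illustrative, and the project's actual reads go through OpenTSDB rather than a hand-written scanner:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import scala.collection.JavaConverters._

object TsdbScanExample {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.setInt("hbase.client.scanner.caching", 180)                       // ~1 week of hourly OpenTSDB rows
    conf.setLong("hbase.client.scanner.max.result.size", 2L * 1024 * 1024) // 2 MB per scan RPC

    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("tsdb"))
    try {
      val scan = new Scan()
      scan.setCaching(180) // per-scan override of the caching hint
      val scanner = table.getScanner(scan)
      try {
        scanner.asScala.foreach(result => println(result)) // placeholder: decode OpenTSDB rows here
      } finally {
        scanner.close()
      }
    } finally {
      table.close()
      connection.close()
    }
  }
}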

To optimize read performance further, I set up a local debugging environment for the OpenTSDB source code and traced the configuration parameters through the code, from the startup class all the way to where HBaseClient reads the data out of HBase.

OpenTSDB parameter tuning:

tsd.storage.hbase.scanner.maxNumRows = 180: OpenTSDB stores data in HBase at hourly granularity, with one hour's points stored as one row by default, so 180 rows * 60 points = 10,800 points, slightly more than one week of minute-level data (10,080 points).

tsd.storage.hbase.prefetch_meta = true: enable this parameter.

tsd.core.preload_uid_cache = true: enable this parameter.

Launch script for the prediction job:

source /root/.bashrc
#export JAVA_HOME=/opt/jdk1.8.0_60
nohup /home/hadoop/.versions/spark-2.0.1/bin/spark-submit      \
--class  com.tingyun.tadsml.AnomalyDetectionManager      \
--conf "spark.yarn.driver.memoryOverhead=2048" \
--conf "spark.yarn.executor.memoryOverhead=2048"      \
--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"       \
--conf "spark.streaming.kafka.maxRatePerPartition=100"      \
--conf "spark.default.parallelism=12" \
--files /root/ML/lcxtest/log4j.properties          \
--master yarn          \

Training source code:

package com.tingyun.tadsml

import java.util.{Calendar, Date, Timer, TimerTask}
import com.tingyun.tads.forecast.timeseries.TimeSeries
import com.tingyun.tadsml.model.{ConfManager, Configuration, InfluxManager}
import grizzled.slf4j.Logger
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

object TrainTimeSerialsManager {
  private val logger = Logger("com.tingyun.tadsml.TrainTimeSerialsManager")

  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      logger.error("Please input the path of configuration!")
      System.exit(1)
    }
    ConfManager.conf = new Configuration().load(args(0))
    val sc = new SparkContext(new SparkConf()
      .setAppName("TrainTimeSerialsManager")
    )
    val globalConf = sc.broadcast(ConfManager.conf)

    // hourly timer
    new Timer().schedule(new TimerTask {
      override def run(): Unit = {
        if (Calendar.getInstance().get(Calendar.HOUR_OF_DAY) / ConfManager.conf.TRAINING_ABNORMAL_INTERVAL == 0) {
          val conf = ConfManager.getNormalTrainingConfiguration
          if (conf != null)
            task(conf)
          else
            logger.warn("Daily training got NO CONFIGURATION!")
        }
        val conf = ConfManager.getAbnormalTrainingConfiguration
        if (conf != null)
          task(conf)
        else
          logger.warn("Hourly training got NO CONFIGURATION!")
      }
    }, 0, ConfManager.conf.TRAINING_ABNORMAL_INTERVAL * 3600 * 1000)

    def task(taskConf: List[String]): Unit = {
      sc.parallelize(taskConf)
        .zipWithIndex()
        .map(line => (line._2, line._1))
        .partitionBy(new modelInstPartitioner(sc.defaultParallelism))
        .foreachPartition(partition => {
          ConfManager.conf = globalConf.value
          InfluxManager.init()
          partition.foreach(line => {
            val fields = line._2.split("#")
            val appid = fields(0)
            val insid = fields(1)
            val metric = fields(2)
            val windowWidth = fields(3).toInt
            val details = fields(4)
            val context = new model.TrainingContext(appid, insid, metric)
            context.evaluateTimeWithWindow(windowWidth)
            val timeSeriesOrigin = ModelTrainer.genPredictedTimeSeriesFromInflux(context)
            if (timeSeriesOrigin != null && timeSeriesOrigin.size > (windowWidth - ConfManager.conf.DATA_LACK_CEILING) * 1440)
              details.split(";").foreach(detail => {
                val subStr = detail.split(",")
                context.algorithm = subStr(0)
                context.lastModelTime = subStr(1).toLong
                val ts = new TimeSeries
                ts.addAll(timeSeriesOrigin)
                try {
                  ModelTrainer.train(ts, context)
                  ConfManager.updateModelStatus(context, ConfManager.conf.MODEL_STATUS_TRAINED)
                } catch {
                  case ex: Exception =>
                    ex.printStackTrace()
                    val status =
                      if (new Date().getTime - context.lastModelTime < ConfManager.conf.MODEL_EXPIRE * 86400 * 1000)
                        ConfManager.conf.MODEL_STATUS_STILL_AVAILABLE
                      else
                        ConfManager.conf.MODEL_STATUS_ABNORMAL
                    ConfManager.updateModelStatus(context, status)
                }
              })
            else {
              if (timeSeriesOrigin != null)
                logger.warn(context.toString + s" LACK DATA with data length:[${timeSeriesOrigin.size}]!")
              else
                logger.warn(context.toString + " NO DATA!")
              ConfManager.updateModelStatus(context, ConfManager.conf.MODEL_STATUS_LACK_DATA)
            }
          })
        })
    }
  }
}

class modelInstPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[Long]
    k.toInt % numPartitions
  }
}

Prediction source code:

package com.tingyun.tadsml

import java.util.{Calendar, Date, Timer, TimerTask}
import com.networkbench.avro.cache.ZookeeperAvroSchemaPersister
import com.networkbench.avro.serialize.AvroMessageDecoder
import com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own.MonitorWrappedMessage
import com.tingyun.mltrain.train.PredictModel
import com.tingyun.tads.adm.control.AnomalyDetector
import com.tingyun.tads.adm.models.MetricMeta
import com.tingyun.tads.forecast.automatic.TimeSeriesPredictorAdapter
import com.tingyun.tads.forecast.models.PersistentModel
import com.tingyun.tads.forecast.timeseries.DataPoint
import com.tingyun.tadsml.model._
import grizzled.slf4j.Logger
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import scala.collection.mutable
import scalaj.http.{Http, HttpOptions}

object AnomalyDetectionManager {
  val logger: Logger = Logger("com.tingyun.tadsml.AnomalyDetectionManager")

  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      logger.error("Invalid parameters, please input the right path of configuration.")
      System.exit(1)
    }
    ConfManager.conf = new Configuration().load(args(0))
    val sc = new SparkConf()
      .setAppName("AnomalyDetectionManager")
      .set("spark.streaming.stopGracefullyOnShutdown", "true") // graceful shutdown
      .set("spark.streaming.backpressure.enabled", "true") // enable backpressure
      .set("spark.shuffle.file.buffer", "96k") // default 32k
      .set("spark.shuffle.consolidateFiles", "true") // reduceByKeyAndWindow uses HashPartitioner
      .set("spark.shuffle.io.maxRetries", "20") // default 3
      .set("spark.shuffle.io.retryWait", "30s") // default 5s; GC pauses may be the cause
      .set("spark.reducer.maxSizeInFlight", "96m") // default 48m
      .set("spark.kryoserializer.buffer.max", "512m") // default 64m is too small
    val ssc = new StreamingContext(sc, Seconds(ConfManager.conf.PREDICTION_BATCH_DURATION))
    val globalConf = ssc.sparkContext.broadcast(ConfManager.conf)

    // hourly timer
    var predictConf: Broadcast[mutable.Map[String, mutable.Set[String]]] = null
    new Timer().schedule(new TimerTask {
      override def run(): Unit = {
        val conf = ConfManager.getAvailablePridictionConf
        if (conf != null)
          predictConf = ssc.sparkContext.broadcast(conf)
        else
          logger.warn("Prediction got NO CONFIGURATION!")
      }
    }, 0, ConfManager.conf.PREDICTION_CONF_UPDATE_INTERVAL * 3600 * 1000)

    ssc.checkpoint(ConfManager.conf.PREDICTION_CHECKPOINT_ADDR)
    val kafkaTopics = ConfManager.conf.TOPICNAME.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> ConfManager.conf.BROKERLIST,
      "auto.offset.reset" -> "smallest",
      "group.id" -> ConfManager.conf.PREDICTION_GROUPID
    )
    val km = new KafkaManager(kafkaParams)
    val kafkaDirectStream = km.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, kafkaTopics).cache

    kafkaDirectStream.mapPartitions(
      partitions => {
        ConfManager.conf = globalConf.value
        InfluxManager.init()
        val zkAvroSchemaPersister = new ZookeeperAvroSchemaPersister
        zkAvroSchemaPersister.setServers(ConfManager.conf.ZKAVRO)
        zkAvroSchemaPersister.setConnectionTimeout(10000)
        zkAvroSchemaPersister.init()
        val avroMessageDecoder = new AvroMessageDecoder
        avroMessageDecoder.setAvroMessageEntityPackageToScan("com.networkbench.newlens.datacollector.backend.aggregate.wrappedmessage.own")
        avroMessageDecoder.setAvroSchemaPersister(zkAvroSchemaPersister)
        try {
          partitions.map(line => avroMessageDecoder.decode(line._2).asInstanceOf[MonitorWrappedMessage]).toList.toIterator
        } finally {
          zkAvroSchemaPersister.destroy() // close the ZK connection
        }
      }
    ).map(mmm => (mmm.getApplicationId + "_" + mmm.getApplicationInstanceId, mmm))
      .mapWithState(StateSpec.function(mapping _))
      .foreachRDD(rdd => rdd.foreach(println))

    // update the offsets stored in ZK
    kafkaDirectStream.foreachRDD(rdd => {
      if (!rdd.isEmpty) km.updateZKOffsets(rdd)
    })

    ssc.start
    ssc.awaitTermination

    def mapping(word: String, message: Option[MonitorWrappedMessage], state: State[mutable.Map[String, PredictModel]])
    : Option[(String, mutable.Map[String, PredictModel])] = {
      predictConf.value.get(word) match {
        case Some(conf) => conf.foreach(k => {
          val fields = k.split("_")
          // Load model from Redis
          val redisModel =
            try {
              RedisManager.loadModel(word + "_" + k)
            } catch {
              case ex: Exception => ex.printStackTrace(); null
            }
          if (redisModel != null) {
            val predictionModel = new TimeSeriesPredictorAdapter(redisModel).getPredictor
            val predictModel = new PredictModel
            predictModel.setPredictionModel(predictionModel)
            predictModel.setLastTimeStap(predictionModel.lastTimestamp)
            // Update model status
            ConfManager.updateModelStatus(word.split("_")(0).toInt, word.split("_")(1).toInt, fields(0), fields(1),
              ConfManager.conf.MODEL_STATUS_LOADED)
            // Update state
            state.getOption match {
              case Some(v) =>
                v += (k -> predictModel)
                state.update(v)
              case None =>
                val models = mutable.Map[String, PredictModel]()
                models += (k -> predictModel)
                state.update(models)
            }
            predictConf.value -= word
          } else
            logger.error(s"Cannot get key:[${word}_$k] from Redis!")
        })
        case None => logger.debug(s"There is no fresh model for instance:[$word].")
      }
      val modelAdapter = state.getOption match {
        case Some(v) => v
        case _ => null
      }
      if (modelAdapter != null) {
        val ctx = new PredictContext(message.get)
        modelAdapter.keys.foreach(k => {
          ctx.metric = k.split("_")(0)
          ctx.algorithm = k.split("_")(1)
          ctx.pModel = modelAdapter(k)
          predict(ctx)
        })
      }
      Some(word, modelAdapter)
    }

    def predict(ctx: PredictContext): Unit = {
      val persistentModel: PersistentModel = null
      val tsModel = new TimeSeriesPredictorAdapter(persistentModel)
      tsModel.setPredictor(ctx.pModel.getPredictionModel)
      logger.info(ctx.toString + s"predictor:[${tsModel.getPredictor}] before prediction.")
      val updateStartTime = ctx.pModel.getLastTimeStap + 60
      if (updateStartTime < ctx.timestamp) {
        val updateEndTime = ctx.timestamp - 60
        val trainCtx = new model.TrainingContext(ctx.appid, ctx.insid, ctx.metric)
        trainCtx.algorithm = ctx.algorithm
        trainCtx.startTime = updateStartTime
        trainCtx.endTime = updateEndTime
        val timeSeries = ModelTrainer.genPredictedTimeSeriesFromInflux(trainCtx)
        if (timeSeries != null) {
          try {
            tsModel.update(timeSeries)
            logger.info(ctx.toString + s" model update succeeded before prediction.")
          } catch {
            case ex: Throwable =>
              ex.printStackTrace()
              logger.error(ctx.toString + s" model update exception before prediction!")
          }
        }
        val dp = new DataPoint(ctx.metricValues(ctx.metric), ctx.timestamp)
        var actual = dp
        val predicted = tsModel.predict(dp)
        logger.info(ctx.toString + s"predicted with value:[${predicted.value}]")
        val ad = new AnomalyDetector(new MetricMeta(), null)
        val anomaly = ad.detect(tsModel, dp, predicted)
        if (anomaly.isAnomaly) {
          val insidTransformed: String = if (ctx.insid == "-1") "999999" else ctx.insid
          val json = compact(render(
            ("app_id" -> ctx.appid) ~ ("ins_id" -> insidTransformed) ~ ("mmetric" -> ctx.metric) ~ ("modelName" -> ctx.algorithm) ~
              ("metricTime" -> ctx.timestamp) ~ ("anomaly" -> parse(anomaly.toJson, useBigDecimalForDouble = true))
          ))
          val result = Http("http://" + ConfManager.conf.ELASTIC_SERVER_HOST + ":" + ConfManager.conf.ELASTIC_SERVER_PORT + ConfManager.conf.ES_ANOMALY)
            .postData(json)
            .header("Content-Type", "application/json")
            .header("Charset", "UTF-8")
            .option(HttpOptions.readTimeout(10000)).asString
          logger.info(ctx.toString + s"transferred anomaly data:[$json] to ElasticSearch with result:[$result].")
          actual = anomaly.getNormalValue
        }
        // Update prediction records every x minutes: stagger the updates of the start/end timestamps
        // in Redis, which control how the training side corrects the time series
        if (ctx.appid.toInt % ConfManager.conf.RECORDS_UPDATE_FREQ ==
          Calendar.getInstance().get(Calendar.MINUTE) % ConfManager.conf.RECORDS_UPDATE_FREQ) {
          val redisKey = ctx.appid + "_" + ctx.insid + "_" + ctx.metric + "_" + ctx.algorithm
          try {
            val record = RedisManager.getPredictionRecord(redisKey)
            if (record == null || new Date().getTime / 1000 - record._2 > ConfManager.conf.RECORDS_GAP_CEILING * 3600)
              RedisManager.updatePredictionRecord(redisKey, new Date().getTime / 1000, new Date().getTime / 1000)
            else
              RedisManager.updatePredictionRecord(redisKey, record._1, new Date().getTime / 1000)
          } catch {
            case ex: Exception =>
              ex.printStackTrace()
              logger.error(ctx.toString + "update prediction record exception!")
          }
        }
        try {
          tsModel.update(actual)
          logger.info(ctx.toString + s"predictor:[${tsModel.getPredictor}] model update succeeded after prediction.")
        } catch {
          case ex: Throwable =>
            ex.printStackTrace()
            logger.error(ctx.toString + s"predictor:[${tsModel.getPredictor}] model update exception after prediction!")
        }
      }
    }

    class PredictContext(msg: MonitorWrappedMessage) {
      val appid: String = msg.getApplicationId.toString
      val insid: String = msg.getApplicationInstanceId.toString
      val metricValues: Map[String, Double] = Map[String, Double](
        ConfManager.conf.CPU_METRIC -> msg.getCpuUsageRatio,
        ConfManager.conf.MEM_METRIC -> msg.getMemoryUsageMean,
        ConfManager.conf.REPS_METRIC -> msg.getResponseTimeMean,
        ConfManager.conf.ERRO_METRIC -> msg.getErrRatio,
        ConfManager.conf.APDEX_METRIC -> msg.getApdex
      )
      val timestamp: Long = msg.getTimestamp
      var algorithm: String = _
      var metric: String = _
      var pModel: PredictModel = _

      override def toString: String = s"appid:[$appid], insid:[$insid], metric:[$metric], algorithm:[$algorithm], value:[${metricValues(metric)}], " +
        s"timestamp:[$timestamp], "
    }
  }
}