Flink Exception Summary
A Summary of Common Flink Exceptions and Errors
- Background
- Exception Summary
- Exception 1: local class incompatible
- Exception 2: Failure opening selector
- Exception 3: The TaskExecutor is shutting down.
- Exception 4: Cannot instantiate user function.
- Exception 5: The RemoteEnvironment cannot be instantiated when running in a pre-defined context
- Exception 6: Deadlock found when trying to get lock; try restarting transaction
- Exception 7: Timeout of 60000ms expired before the position for partition XXX-1 could be determined
- Exception 8: Exceeded checkpoint tolerable failure threshold
- Exception 9: Task did not exit gracefully within 600 + seconds.
- Exception 10: The last packet sent successfully to the server was XXX,XXX milliseconds ago.
- Exception 11: java.lang.RuntimeException: Request cannot be executed; I/O reactor status: STOPPED
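Background
I have recently been working on Flink-based processing, so I collected the Flink exceptions I ran into. The goal is to save others the time and effort of troubleshooting the same problems. I hope it helps, and if it overlaps with what others have written, I am flattered.
Exception Summary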
Exception 1: local class incompatible
Description
The Flink cluster reports the following error:
Flink Web App Exceptions
java.io.InvalidClassException: PublishInfMsg; local class incompatible: stream classdesc serialVersionUID = xxx, local class serialVersionUID = -xxx
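Cause
The serialVersionUID of the class on the local side differs from that of the class on the remote side.
Possible reasons:
- The jar uploaded to the Flink cluster does not match the local program code.
- The cluster runs a newer Flink version than the Flink dependency used by the program.
Solution
Check that the jar path is correct and that the classes in the jar match the local program code. If the cluster runs a different Flink version, align the program's Flink dependency version with the cluster's.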
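For your own serializable classes, a common safeguard is to declare serialVersionUID explicitly. Recompiling with compatible changes then does not alter the UID. The following is a minimal JDK-only sketch. PublishInfMsg here is a hypothetical stand-in with made-up fields. An explicit UID does not help if the jar on the cluster really contains a different class. It also cannot fix mismatches inside Flink's own classes, where the versions have to be aligned.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionDemo {

    /**
     * Hypothetical stand-in for PublishInfMsg. The explicit serialVersionUID stays the same
     * across recompilations, so compatible changes (e.g. adding a field) keep old and new
     * builds able to exchange objects.
     */
    static class PublishInfMsg implements Serializable {
        private static final long serialVersionUID = 1L;

        String topic;
        String payload;

        PublishInfMsg(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /** Serializes and deserializes an object, similar to shipping it between JVMs. */
    static Object roundTrip(Serializable obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        PublishInfMsg copy = (PublishInfMsg) roundTrip(new PublishInfMsg("topic-a", "hello"));
        System.out.println(copy.topic + " / " + copy.payload);
        // The UID that sender and receiver must agree on.
        System.out.println(ObjectStreamClass.lookup(PublishInfMsg.class).getSerialVersionUID());
    }
}
```

Without the explicit field, the JVM derives the UID from the class structure. That is why two builds of "the same" class can end up incompatible.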
Exception 2: Failure opening selector
Description
The Flink cluster reports the following error:
Flink Web App Exceptions
2020-10-28 15:49:27
java.lang.IllegalStateException: org.apache.http.nio.reactor.IOReactorException: Failure opening selector
    at org.apache.http.impl.nio.client.IOReactorUtils.create(IOReactorUtils.java:45)
    at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:667)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:219)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:191)
    at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:283)
    at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:275)
    at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
    at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.http.nio.reactor.IOReactorException: Failure opening selector
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.<init>(AbstractMultiworkerIOReactor.java:144)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.<init>(DefaultConnectingIOReactor.java:82)
    at org.apache.http.impl.nio.client.IOReactorUtils.create(IOReactorUtils.java:43)
    ... 21 more
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
    at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at java.nio.channels.Selector.open(Selector.java:227)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.<init>(AbstractMultiworkerIOReactor.java:142)
    ... 23 more
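Cause
The job exceeded the limit on open file handles (Too many open files). The trace shows the failure happening while the Elasticsearch sink creates its REST client. That client opens NIO selectors, each backed by file descriptors, and every parallel sink instance creates its own client.
Solution
See this solution on Jianshu: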
https://www.jianshu.com/p/4aba355b1a3d
Two possible fixes:
- Lower the operator parallelism:
env.setParallelism(parallelism);
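- Cap the number of threads used by the Elasticsearch REST client at a fixed value, by passing a custom RestClientFactory (ESRestClientFactory here) to the sink builder: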
esSinkBuilder.setRestClientFactory(new ESRestClientFactory(threadPoolCount));
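To confirm that file handles really are the limiting factor, you can check how many file descriptors a JVM has open compared with its limit. The following is a minimal sketch. It relies on the HotSpot-specific com.sun.management.UnixOperatingSystemMXBean, so it only reports values on Unix-like JVMs. The 80% warning threshold is an arbitrary assumption. To inspect a TaskManager, it would have to run inside the TaskManager JVM. On Linux, counting the entries in /proc/<pid>/fd of the TaskManager process gives similar information from a shell.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class FdUsageCheck {

    /** Returns {open, max} file-descriptor counts for this JVM, or null if not available. */
    static long[] fdUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        // Implemented by HotSpot on Linux/macOS; absent on Windows.
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {unix.getOpenFileDescriptorCount(), unix.getMaxFileDescriptorCount()};
        }
        return null;
    }

    public static void main(String[] args) {
        long[] usage = fdUsage();
        if (usage == null) {
            System.out.println("File-descriptor counts are not available on this JVM/OS");
            return;
        }
        double ratio = (double) usage[0] / usage[1];
        System.out.printf("open fds: %d, limit: %d (%.1f%%)%n", usage[0], usage[1], ratio * 100);
        // Hypothetical warning threshold; tune it for your environment.
        if (ratio > 0.8) {
            System.out.println("WARNING: close to the file-descriptor limit");
        }
    }
}
```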
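Exception 3: The TaskExecutor is shutting down.
Description
I wrote a simple Flink program that only prints its data source. When it ran on jdk1.8.0_31, the following error appeared after a few seconds:
Console output of the local program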
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (1/4) (4387dc71ef547750206e8f85f4024996) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:301)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job my stream (52dcd2c2bdcf71c99eea341724ec2737) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:301)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Closing the SlotManager.
[flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Suspending the SlotManager.
[flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager - Unregister TaskManager 2dae47ecb3d46b4bf76e73b7ae4d9e6e from the SlotManager.
[ForkJoinPool.commonPool-worker-1] INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-web-ui
[flink-akka.actor.default-dispatcher-5] INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 5e54b4ca1c22736125ba4e4a45d897bb: ResourceManager leader changed to new address null.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (ea8a2962fb074ca77685c9de99ad1d52) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (2/4) (4b9478b6a596e9693a97f932afb61f3b) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (3/4) (e9cb2f9d9681598429acfdda9bdeafbf) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (4/4) (c442536600835d379a79412c55b7ba48) switched from RUNNING to CANCELING.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory C:\Users\ADMINI~1\AppData\Local\Temp\flink-io-b9a09532-72cf-4017-92d2-ff0bf47f827d
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (ea8a2962fb074ca77685c9de99ad1d52) switched from CANCELING to CANCELED.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.io.network.NetworkEnvironment - Shutting down the network environment and its components.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (4/4) (c442536600835d379a79412c55b7ba48) switched from CANCELING to CANCELED.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (2/4) (4b9478b6a596e9693a97f932afb61f3b) switched from CANCELING to CANCELED.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: Print to Std. Out (3/4) (e9cb2f9d9681598429acfdda9bdeafbf) switched from CANCELING to CANCELED.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Try to restart or fail the job my stream (52dcd2c2bdcf71c99eea341724ec2737) if no longer possible.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job my stream (52dcd2c2bdcf71c99eea341724ec2737) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:301)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Could not restart the job my stream (52dcd2c2bdcf71c99eea341724ec2737) because the restart strategy prevented it.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:301)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
    at akka.actor.ActorCell.terminate(ActorCell.scala:374)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job 52dcd2c2bdcf71c99eea341724ec2737.
[flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Stop job leader service.
[flink-akka.actor.default-dispatcher-2] INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
[Time Trigger for Source: Custom Source (1/1)] WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker.
java.lang.RuntimeException: segment has been freed
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:147)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:683)
    at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:151)
    at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: segment has been freed
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:228)
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:381)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:85)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:97)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:125)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:93)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:144)
    ... 10 more
[Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source (1/1) (ea8a2962fb074ca77685c9de99ad1d52) switched from RUNNING to FAILED.
java.lang.RuntimeException: segment has been freed
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at source.JsonGeneratorSource.run(JsonGeneratorSource.java:28)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: segment has been freed
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:228)
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:381)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:85)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:97)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 12 more
[Source: Custom Source (1/1)] INFO org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Source: Custom Source (1/1) (ea8a2962fb074ca77685c9de99ad1d52).
[Source: Custom Source (1/1)] ERROR org.apache.flink.runtime.taskmanager.Task - FATAL - exception in resource cleanup of task Source: Custom Source (1/1) (ea8a2962fb074ca77685c9de99ad1d52).
java.lang.IllegalStateException: Memory manager has been shut down.
    at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:479)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:836)
    at java.lang.Thread.run(Thread.java:745)
[Source: Custom Source (1/1)] ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - FATAL - exception in resource cleanup of task Source: Custom Source (1/1) (ea8a2962fb074ca77685c9de99ad1d52).
java.lang.IllegalStateException: Memory manager has been shut down.
    at org.apache.flink.runtime.memory.MemoryManager.releaseAll(MemoryManager.java:479)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:836)
    at java.lang.Thread.run(Thread.java:745)
Cause
The JDK version. Most likely this Flink version has requirements on the JDK update it runs on.
Solution
After switching to jdk1.8.0_77, the error disappeared.
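If you want a job to warn early when it starts on an older JDK 8 update, you can parse the running version. The following is a minimal sketch. The threshold of update 77 only reflects the experience above (update 31 failed, update 77 worked). It is a hypothetical rule, not a documented Flink requirement.

```java
public class JdkVersionCheck {

    /**
     * Extracts the update number from a Java 8 version string such as "1.8.0_77".
     * Returns -1 for strings not in the 1.8.0_NN form (e.g. "11.0.2").
     */
    static int java8Update(String version) {
        String prefix = "1.8.0_";
        if (version == null || !version.startsWith(prefix)) {
            return -1;
        }
        String rest = version.substring(prefix.length());
        // Defensive: keep only the leading digits in case a suffix such as "-ea" follows.
        int end = 0;
        while (end < rest.length() && Character.isDigit(rest.charAt(end))) {
            end++;
        }
        return end == 0 ? -1 : Integer.parseInt(rest.substring(0, end));
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        int update = java8Update(version);
        // Hypothetical threshold based on this article's observation only.
        if (update >= 0 && update < 77) {
            System.err.println("Running on JDK " + version + "; consider 1.8.0_77 or later");
        } else {
            System.out.println("Running on JDK " + version);
        }
    }
}
```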
Exception 4: Cannot instantiate user function.
Description
When I implemented a map function with a lambda expression, the following exception occurred:
Flink Web App Exceptions
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1682)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1254)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:276)
    ... 8 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    at sun.reflect.GeneratedMethodAccessor36.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248)
    ... 24 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.lenovo.eventanalysis.app.JhistAnalysisStreamApp
    ... 32 more
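Cause
The map function was implemented as a lambda expression. The last Caused by in the trace shows the mechanism. A serialized lambda is restored through SerializedLambda.readResolve, which calls back into the class that defined the lambda (here JhistAnalysisStreamApp). That class must therefore be initialized on the TaskManager. "Could not initialize class" means its static initialization failed there (or had already failed).
Solution
After rewriting the map function as an anonymous inner class, the error disappeared. An anonymous inner class is deserialized as a class of its own. That is likely why it avoids initializing the application class.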
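The difference between the two forms shows up in the serialized bytes. The following is a minimal JDK-only sketch. SerializableMapper is a hypothetical stand-in for Flink's MapFunction, which is also Serializable. The sketch serializes a lambda and an anonymous inner class and checks whether the stream goes through java.lang.invoke.SerializedLambda.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class LambdaVsAnonymous {

    /** Hypothetical stand-in for Flink's MapFunction<T, O> (also Serializable). */
    interface SerializableMapper<T, O> extends Serializable {
        O map(T value) throws Exception;
    }

    // Lambda: written as java.lang.invoke.SerializedLambda, which refers back to the capturing
    // class (LambdaVsAnonymous) and is resolved through that class on deserialization.
    static final SerializableMapper<String, Integer> LAMBDA = s -> s.length();

    // Anonymous inner class: written as its own class (e.g. LambdaVsAnonymous$1).
    static final SerializableMapper<String, Integer> ANONYMOUS =
            new SerializableMapper<String, Integer>() {
                @Override
                public Integer map(String s) {
                    return s.length();
                }
            };

    /** Serializes obj and returns the bytes as text so class names can be searched. */
    static String serializedForm(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        // Class names are plain ASCII here, so ISO-8859-1 keeps them readable.
        return new String(bytes.toByteArray(), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws Exception {
        String marker = "java.lang.invoke.SerializedLambda";
        System.out.println("lambda via SerializedLambda: " + serializedForm(LAMBDA).contains(marker));
        System.out.println("anonymous via SerializedLambda: " + serializedForm(ANONYMOUS).contains(marker));
    }
}
```

In a Flink job, the anonymous-class form corresponds to stream.map(new MapFunction<String, Integer>() { @Override public Integer map(String value) { return value.length(); } }).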
Exception 5: The RemoteEnvironment cannot be instantiated when running in a pre-defined context
Description
When I submitted the Flink application to the cluster, the following exception occurred:
Flink Web App Exceptions
org.apache.flink.api.common.InvalidProgramException: The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
    at org.apache.flink.api.java.RemoteEnvironmentConfigUtils.validate(RemoteEnvironmentConfigUtils.java:52)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.validateAndGetEffectiveConfiguration(RemoteStreamEnvironment.java:178)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:158)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:144)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:113)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:85)
    at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.<init>(RemoteStreamEnvironment.java:62)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.createRemoteEnvironment(StreamExecutionEnvironment.java:2035)
    at com.lenovo.monitor.iface.Driver.driveRunner(Driver.java:81)
    at com.lenovo.monitor.iface.Driver.run(Driver.java:65)
    at com.lenovo.monitor.app.MyTestApp.main(MyTestApp.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
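Cause
The application submitted to the cluster created its StreamExecutionEnvironment with StreamExecutionEnvironment.createRemoteEnvironment(...). That method is meant for submitting code to a remote cluster from a standalone client program. When the job is launched through the flink command line client (CliFrontend in the trace), the program already runs in a pre-defined context, and Flink refuses to create a RemoteEnvironment there.
Solution
Create the environment with StreamExecutionEnvironment.getExecutionEnvironment() instead. It returns the environment of the current execution context.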
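Exception 6: Deadlock found when trying to get lock; try restarting transaction
Description
A Flink RichSink read from and wrote to a MySQL database. It first ran TRUNCATE on a table and then INSERT into the same table. With parallelism > 1, several threads truncated and inserted into the same table concurrently, and this error occurred.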
Flink Web App Exceptions
2020-12-22 16:59:40 [Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, DependencyWindow) -> Sink: Unnamed (2/4)] ERROR com.lenovo.monitor.function.sink.rich.DependencyRichSink - Dependency--Failed to operate db.
com.mysql.cj.jdbc.exceptions.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:123)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:955)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1094)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1042)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1345)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdate(ClientPreparedStatement.java:1027)
    at com.lenovo.monitor.util.DbUtils.addDeleteModify(DbUtils.java:115)
    at com.lenovo.monitor.function.sink.rich.DependencyRichSink.invoke(DependencyRichSink.java:37)
    at com.lenovo.monitor.function.sink.rich.DependencyRichSink.invoke(DependencyRichSink.java:16)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at com.lenovo.monitor.function.window.DependencyWindow.apply(DependencyWindow.java:189)
    at com.lenovo.monitor.function.window.DependencyWindow.apply(DependencyWindow.java:21)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$13(StreamTask.java:1172)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:282)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:190)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
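Cause
Multiple threads ran TRUNCATE and INSERT on the same table at the same time.
Solution
Replace this logic with an approach in which parallel sink instances do not truncate and rewrite the same table. For example, run that sink with parallelism 1, or update rows in place instead of truncating the table first.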
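The MySQL message itself suggests restarting the transaction. A retry wrapper can absorb occasional deadlocks, but it does not fix concurrent TRUNCATE + INSERT. One subtask's truncate can still wipe another subtask's rows, so the logic change above is still needed. The following is a minimal JDK-only sketch. SqlAction and withDeadlockRetry are hypothetical helpers. The check relies on MySQL Connector/J's MySQLTransactionRollbackException extending java.sql.SQLTransactionRollbackException, with SQLState class 40 (transaction rollback) as a fallback.

```java
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;

public class DeadlockRetry {

    /** A database action that may throw SQLException (hypothetical helper type). */
    @FunctionalInterface
    interface SqlAction<T> {
        T run() throws SQLException;
    }

    /** True for rollback-type errors such as MySQL deadlocks (SQLState 40001). */
    static boolean isRetryable(SQLException e) {
        return e instanceof SQLTransactionRollbackException
                || (e.getSQLState() != null && e.getSQLState().startsWith("40"));
    }

    /** Runs the action, retrying up to maxAttempts times with a linear backoff on rollback errors. */
    static <T> T withDeadlockRetry(SqlAction<T> action, int maxAttempts, long backoffMillis)
            throws SQLException, InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                return action.run();
            } catch (SQLException e) {
                if (!isRetryable(e) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoffMillis * attempt);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated statement that deadlocks twice before succeeding.
        int rows = withDeadlockRetry(() -> {
            if (++calls[0] < 3) {
                throw new SQLTransactionRollbackException("Deadlock found", "40001", 1213);
            }
            return 1;
        }, 5, 10);
        System.out.println("rows=" + rows + " after " + calls[0] + " attempts");
    }
}
```

Running main prints rows=1 after 3 attempts. In a real sink, the action would execute one whole transaction, so that a retry replays all of its statements.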
Exception 7: Timeout of 60000ms expired before the position for partition XXX-1 could be determined
Description
Deploying the Flink program to the production cluster failed with the following error:
Flink Web App Exceptions
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition job_monitor-0 could be determined
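Cause
The TaskManager log contained java.net.UnknownHostException: the hostnames of the production Kafka servers could not be resolved.
Solution
I added the production Kafka hostnames to the hosts file and reran the job, and the error disappeared.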
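Before deploying, you can check that a node resolves every broker. The following minimal sketch tries to resolve each host in a bootstrap.servers-style list with InetAddress.getByName. The default "localhost:9092" is only a placeholder. Run it on each TaskManager host with the real broker list as the argument. IPv6 literals are not handled.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class BrokerDnsCheck {

    /** Returns the hosts from a "host:port,host:port" string that cannot be resolved here. */
    static List<String> unresolvableHosts(String bootstrapServers) {
        List<String> failed = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            if (hostPort.isEmpty()) {
                continue;
            }
            // Strip the ":port" suffix (IPv6 literals are not handled in this sketch).
            int colon = hostPort.lastIndexOf(':');
            String host = colon > 0 ? hostPort.substring(0, colon) : hostPort;
            try {
                InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                failed.add(host);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        // Placeholder default; pass the real bootstrap.servers value as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        List<String> failed = unresolvableHosts(servers);
        if (failed.isEmpty()) {
            System.out.println("All broker hosts resolve on this machine");
        } else {
            System.out.println("Cannot resolve " + failed + "; check DNS or the hosts file");
        }
    }
}
```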
Exception 8: Exceeded checkpoint tolerable failure threshold
Description
After a large backlog of data for the Flink job built up in Kafka, the Exceptions page showed the error Exceeded checkpoint tolerable failure threshold.
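Cause
The checkpoint history in Flink showed that every checkpoint failed to complete within the configured checkpoint timeout, as shown in the figure below.
Failed checkpoints
Further inspection of the job in the web UI showed high backpressure on some operators.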
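Backpressure of the Filter operator
Backpressure of the Window operator
The data was probably flowing too slowly for the checkpoint barriers to pass through the whole job within the configured timeout (5 minutes). As a result, no checkpoint snapshot could be completed. Next, I looked at the amount of data flowing through each operator, as shown in the figure below.
Data volume per operator
The figure shows that the final Sink operator received far less data than the preceding Window operator. This means the Sink's consumption rate is the bottleneck of the whole pipeline.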
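Solution
There are several possible fixes:
The brute-force fix: increase the checkpoint timeout so that snapshots have enough time to complete, via env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout).
The better fix: raise the pipeline's throughput. Find the operators' performance bottlenecks (database access, I/O, complex business logic, and so on), so that barriers can pass through the job and complete the checkpoint in a short time.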
Exception 9: Task did not exit gracefully within 600 + seconds.
Description
After running for some time, the TaskManager JVM process died, and the logs showed heartbeat timeouts for the affected node.
TaskManager log
2021-02-10 00:40:23,805 WARN org.apache.flink.runtime.taskmanager.Task [] - Task 'DependencyWindow -> Sink: DependencyRichSink (2/3)#4' did not react to cancelling signal for 30 seconds, but is stuck in method:
    java.util.Arrays.copyOf(Arrays.java:3181)
    java.util.concurrent.CopyOnWriteArrayList.addIfAbsent(CopyOnWriteArrayList.java:637)
    java.util.concurrent.CopyOnWriteArrayList.addIfAbsent(CopyOnWriteArrayList.java:615)
    com.mysql.cj.jdbc.ConnectionImpl.registerStatement(ConnectionImpl.java:1811)
    com.mysql.cj.jdbc.StatementImpl.<init>(StatementImpl.java:249)
    com.mysql.cj.jdbc.ClientPreparedStatement.<init>(ClientPreparedStatement.java:175)
    com.mysql.cj.jdbc.ClientPreparedStatement.<init>(ClientPreparedStatement.java:213)
    com.mysql.cj.jdbc.ClientPreparedStatement.<init>(ClientPreparedStatement.java:194)
    com.mysql.cj.jdbc.ClientPreparedStatement.getInstance(ClientPreparedStatement.java:136)
    com.mysql.cj.jdbc.ConnectionImpl.clientPrepareStatement(ConnectionImpl.java:677)
    com.mysql.cj.jdbc.ConnectionImpl.prepareStatement(ConnectionImpl.java:1670)
    com.mysql.cj.jdbc.ConnectionImpl.prepareStatement(ConnectionImpl.java:1590)
    com.lenovo.monitor.util.DbUtils.select(DbUtils.java:121)
    com.lenovo.monitor.function.sink.rich.DependencyRichSink.invoke(DependencyRichSink.java:40)
    com.lenovo.monitor.function.sink.rich.DependencyRichSink.invoke(DependencyRichSink.java:16)
    org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
    org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
    org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
    org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    com.lenovo.monitor.function.window.DependencyWindow.apply(DependencyWindow.java:28)
    com.lenovo.monitor.function.window.DependencyWindow.apply(DependencyWindow.java:18)
    org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
    org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
    org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:547)
    org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:501)
    org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
    org.apache.flink.streaming.api.operators.InternalTimerServiceImpl$$Lambda$643/1062678580.onProcessingTime(Unknown Source)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
    org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
    org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$351/90193008.run(Unknown Source)
    org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:283)
    org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:184)
    org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    java.lang.Thread.run(Thread.java:748)
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 600 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1573) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
2021-02-10 00:40:23,807 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 600 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 600 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1573) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
2021-02-10 00:40:23,807 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Task did not exit gracefully within 600 + seconds.
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 600 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1573) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
2021-02-10 00:40:23,835 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 600 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1573) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
2021-02-10 00:40:23,835 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 600 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1573) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
2021-02-10 00:40:23,835 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner [] - Fatal error occurred while executing the TaskManager. Shutting it down...
org.apache.flink.util.FlinkRuntimeException: Task did not exit gracefully within 600 + seconds.
    at org.apache.flink.runtime.taskmanager.Task$TaskCancelerWatchDog.run(Task.java:1573) [flink-dist_2.11-1.12.0.jar:1.12.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]
Flink Web App Exceptions
2021-02-10 00:29:22
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 10.122.84.153:45797-c5e7f7 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1239)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:404)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:197)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Cause
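The second TaskManager log excerpt shows "Shutting it down", which confirms that the TaskManager really did go down. The reason is that a Task failed to exit gracefully within 10 minutes (600 seconds). So why did it not exit?
Working backwards to the first TaskManager log excerpt, we learn that the Task did not respond to the cancel signal for 30 seconds. The key word in that excerpt is "stuck", meaning the task thread was blocked. Blocked where? The stack trace shows ==com.lenovo.monitor.util.DbUtils.select(DbUtils.java:121)==: the user code blocked while querying the database. Most developers have run into this kind of problem before; it is usually caused either by an exhausted connection pool or by a table deadlock.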
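Solution
Adjust the program logic to fit your own situation. In my case the connection pool had run out of database connections, so I reduced how often the code acquires connections and increased the maximum number of connections in the pool.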
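A minimal sketch of the "acquire less, release promptly" advice, assuming the sink gets its connections from a pooled javax.sql.DataSource. SafeDbUtils and selectFirstColumn are hypothetical names, not the author's DbUtils. The Connection, PreparedStatement and ResultSet are all opened in try-with-resources, so the connection goes back to the pool even when the query throws. Closing statements may also matter here: the frame the task was stuck in is ConnectionImpl.registerStatement, where Connector/J adds each new statement to the connection's list of open statements (a CopyOnWriteArrayList, hence the Arrays.copyOf), which is consistent with, though not proof of, statements piling up on a connection.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;

/** Hypothetical query helper; it only illustrates resource handling. */
final class SafeDbUtils {
    private SafeDbUtils() {}

    /**
     * Runs a parameterised query and returns the first column of every row.
     * The connection is borrowed from the pool and handed back before the method returns.
     */
    static List<Object> selectFirstColumn(DataSource ds, String sql, Object... params)
            throws SQLException {
        List<Object> values = new ArrayList<>();
        // Closed in reverse order: ResultSet, PreparedStatement, Connection.
        // For a pooled DataSource, Connection.close() returns the connection to the pool.
        try (Connection conn = ds.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < params.length; i++) {
                ps.setObject(i + 1, params[i]); // JDBC parameter indexes are 1-based
            }
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    values.add(rs.getObject(1));
                }
            }
        }
        return values;
    }
}
```

Because nothing is held between calls, the number of connections in use at any moment is bounded by the number of concurrent invocations rather than by the number of records processed.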
Exception 10: The last packet sent successfully to the server was XXX,XXX milliseconds ago.
Description
Some time after the Flink application started, it failed with the following error and the job was terminated.
Flink Web App Exceptions
java.lang.Exception: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last packet successfully received from the server was 413,155 milliseconds ago. The last packet sent successfully to the server was 413,162 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.
    at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:955)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1005)
    at com.lenovo.monitor.util.DbUtils.queryForList(DbUtils.java:80)
    at com.lenovo.monitor.function.sink.rich.ConsistentRichSink.invoke(ConsistentRichSink.java:62)
    at com.lenovo.monitor.function.sink.rich.ConsistentRichSink.invoke(ConsistentRichSink.java:39)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: The last packet successfully received from the server was 413,155 milliseconds ago. The last packet sent successfully to the server was 413,162 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
    at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
    at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
    at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:562)
    at com.mysql.cj.protocol.a.NativeProtocol.checkErrorMessage(NativeProtocol.java:732)
    at com.mysql.cj.protocol.a.NativeProtocol.sendCommand(NativeProtocol.java:671)
    at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:986)
    at com.mysql.cj.NativeSession.execSQL(NativeSession.java:1168)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:932)
    ... 16 more
Caused by: java.io.EOFException: Can not read response from server. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
    at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:67)
    at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:63)
    at com.mysql.cj.protocol.a.SimplePacketReader.readHeader(SimplePacketReader.java:45)
    at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:52)
    at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readHeader(TimeTrackingPacketReader.java:41)
    at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:54)
    at com.mysql.cj.protocol.a.MultiPacketReader.readHeader(MultiPacketReader.java:44)
    at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:556)
    ... 21 more
    at com.lenovo.monitor.util.DbUtils.queryForList(DbUtils.java:99)
    at com.lenovo.monitor.function.sink.rich.ConsistentRichSink.invoke(ConsistentRichSink.java:62)
    at com.lenovo.monitor.function.sink.rich.ConsistentRichSink.invoke(ConsistentRichSink.java:39)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
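Cause
The log fragment ==is longer than the server configured value of 'wait_timeout'== shows that the connection had been sitting idle in the pool (about 413 seconds, according to the log) for longer than the idle timeout configured on the database server. The pool still considered the connection valid, but the server had already closed it, so the next query on that connection failed.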
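Solution
We would rather not change the timeout on the database side, so the fix is mainly made in the connection pool:
- Enable idle-connection testing on the pool by calling setTestWhileIdle(true), so that idle connections are checked for validity and broken ones are discarded.
- Set the pool's minEvictableIdleTimeMillis (minimum idle time): idle connections that have been idle this long become eligible for eviction. Keep it below the server's wait_timeout so the pool retires a connection before MySQL does. In pools such as Commons DBCP, both settings only take effect when the idle evictor is enabled via timeBetweenEvictionRunsMillis.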
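To make the two settings concrete, here is a deliberately tiny, single-threaded pool sketch in plain JDBC. It is not Commons DBCP or Druid (the article does not name the pool it used), and IdleAwarePool, borrow, release and maxIdleMillis are hypothetical names. A connection idle for at least maxIdleMillis is closed instead of reused (the role of minEvictableIdleTimeMillis), and any other idle connection is checked with Connection.isValid before it is handed out. Real pools usually run these checks on a background evictor thread (testWhileIdle); checking at borrow time is a simplification. The constructor rejects an idle limit that is not below the server's wait_timeout, since otherwise the server can drop the connection first, which is exactly the failure above.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

/** Hypothetical single-threaded pool sketch: idle eviction plus validation before reuse. */
final class IdleAwarePool {
    /** Opens a new physical connection, e.g. () -> DriverManager.getConnection(url, user, pw). */
    interface ConnectionFactory {
        Connection open() throws SQLException;
    }

    private static final class IdleEntry {
        final Connection conn;
        final long idleSinceMillis;

        IdleEntry(Connection conn, long idleSinceMillis) {
            this.conn = conn;
            this.idleSinceMillis = idleSinceMillis;
        }
    }

    private final Deque<IdleEntry> idle = new ArrayDeque<>();
    private final ConnectionFactory factory;
    private final long maxIdleMillis;
    private final LongSupplier clock; // injectable clock, e.g. System::currentTimeMillis

    IdleAwarePool(ConnectionFactory factory, long maxIdleMillis, long serverWaitTimeoutSeconds,
                  LongSupplier clock) {
        // Evict strictly before MySQL's wait_timeout would close the connection server-side.
        if (maxIdleMillis >= serverWaitTimeoutSeconds * 1000L) {
            throw new IllegalArgumentException("maxIdleMillis must be below wait_timeout");
        }
        this.factory = factory;
        this.maxIdleMillis = maxIdleMillis;
        this.clock = clock;
    }

    Connection borrow() throws SQLException {
        IdleEntry e;
        while ((e = idle.pollFirst()) != null) {
            boolean tooOld = clock.getAsLong() - e.idleSinceMillis >= maxIdleMillis;
            // isValid(1): let the driver check the connection, waiting at most 1 second
            if (!tooOld && e.conn.isValid(1)) {
                return e.conn;
            }
            closeQuietly(e.conn); // evicted: idle too long, or already dropped by the server
        }
        return factory.open();
    }

    /** Returns a connection to the pool instead of closing it. */
    void release(Connection conn) {
        idle.addFirst(new IdleEntry(conn, clock.getAsLong()));
    }

    private static void closeQuietly(Connection conn) {
        try {
            conn.close();
        } catch (SQLException ignored) {
            // the connection is being discarded anyway
        }
    }
}
```

Injecting the clock keeps the eviction rule testable without sleeping; a real pool would use System::currentTimeMillis and guard the deque for concurrent use.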
Exception 11: java.lang.RuntimeException: Request cannot be executed; I/O reactor status: STOPPED
Description
After the Flink application had been running for a while, the following error appeared under Exceptions in the web UI.
Flink Web App Exceptions
2021-06-02 11:24:40
java.lang.RuntimeException: Request cannot be executed; I/O reactor status: STOPPED
    at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:831)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1609)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1579)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1549)
    at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:529)
    at com.lenovo.monitor.function.sink.rich.ConsistentRichSink.invoke(ConsistentRichSink.java:66)
    at com.lenovo.monitor.function.sink.rich.ConsistentRichSink.invoke(ConsistentRichSink.java:39)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Request cannot be executed; I/O reactor status: STOPPED
    at org.apache.http.util.Asserts.check(Asserts.java:46)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.ensureRunning(CloseableHttpAsyncClientBase.java:90)
    at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:123)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:244)
    ... 19 more
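Cause
The message "Request cannot be executed; I/O reactor status: STOPPED" in the web UI shows that the Elasticsearch RestHighLevelClient had already been closed (its I/O reactor had stopped) by the time its methods were called. When I resubmitted the application on the server, the following error appeared: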
Error occurred during initialization of VM
java.lang.OutOfMemoryError: unable to create new native thread
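The submission failed. Searching for this error shows that Linux limits the number of processes per user (max_user_processes, which on Linux also counts threads). ulimit -a showed that max_user_processes on the server was 4096, and ps -ef | wc -l showed that current usage was already close to that limit, so no more threads could be created. (Strictly speaking, ps -ef | wc -l counts processes; ps -eLf | wc -l counts individual threads.)
Solution
- Raise the max_user_processes limit; for instructions, see the article "linux下修改max_user_processes和open_file的最大值" (changing the max_user_processes and open-file limits on Linux).
- Check whether the program creates many unnecessary threads without shutting them down in time, so that the thread count keeps growing.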
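For the second point, one common way to keep the thread count bounded is to create threads (and thread-owning clients) once per task in open(), reuse them for every record, and release them in close(), instead of starting a new thread or client per record. The same reasoning applies to RestHighLevelClient: each instance wraps an Apache async HTTP client with its own I/O reactor threads, so one client per sink instance, closed in close(), is preferable to clients that are created repeatedly and never closed. The sketch below uses only the JDK; PooledWorker is a hypothetical stand-in for a sink's open()/invoke()/close() lifecycle, not Flink's API.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical stand-in for a sink's open()/invoke()/close() lifecycle (no Flink dependency):
 * a fixed number of threads is created once, reused for every record, and shut down on close.
 */
final class PooledWorker {
    private final int poolSize;
    private ExecutorService executor;
    /** Names of the threads that did work; stays bounded by poolSize. */
    final Set<String> workerThreads = ConcurrentHashMap.newKeySet();

    PooledWorker(int poolSize) {
        this.poolSize = poolSize;
    }

    void open() {
        // Create the threads once per task instance, not once per record.
        executor = Executors.newFixedThreadPool(poolSize);
    }

    void invoke(String value) throws Exception {
        // The leaky alternative is `new Thread(...).start()` for every record.
        // A real sink would write `value` somewhere; this sketch only records the thread used.
        Callable<Boolean> task = () -> workerThreads.add(Thread.currentThread().getName());
        Future<Boolean> done = executor.submit(task);
        done.get(10, TimeUnit.SECONDS); // a stuck call surfaces as a TimeoutException
    }

    void close() throws InterruptedException {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow(); // interrupt anything still running
        }
    }

    boolean isTerminated() {
        return executor != null && executor.isTerminated();
    }

    /** Live threads in this JVM; other processes of the same user also count toward ulimit -u. */
    static int liveThreads() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }
}
```

However many records pass through invoke(), at most poolSize worker threads exist, and they are gone after close(); logging liveThreads() periodically is a cheap way to spot a leak before the operating-system limit is hit.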