flink定制jvm的metaspace等监控指标
起因
项目中使用了Flink计算框架去针对实时数据做分析. 我们基于目前最新的Flink1.10.1版本进行开发, 定制自己的业务处理算子逻辑. 开发中我们多次遇到一个问题:java.lang.OutOfMemoryError:Metaspace
. 其发生的场景很不规律, 经过多次观察日志发现, 似乎集中发生在频繁重新提交新的flink执行任务到task manager并且停止旧的任务的场景下.
具体原因很有可能就是flink框架对同一个task manager执行了任务重新分配时, jvm并不是重新启动的. 由于一个task manager的jvm每次都会通过Janino 动态重新加载job manager发过来的任务代码解析执行, 此时如果代码编写不当或者框架本身出现内存泄漏,那么将迟早遇见这个metaspace的oom错误.
关于框架可能出现的问题, 在https://issues.apache.org/jira/browse/FLINK-16225 和 https://issues.apache.org/jira/browse/FLINK-11205 中有详细的讨论.
ps: 最新的进展是这个问题声称被修复了: https://issues.apache.org/jira/browse/FLINK-16408, 修复的发布版本是1.11.0, 期待ing…
虽然框架层面声称解决了问题, 但是代码层面是我们自己写的啊, 你无法保证一个错误的写法不会导致 mataspace的 oom. 为此我们的需求就很简单, 持续监控各个task manager的jvm内存信息, 通过prometheus监控系统把指标放到grafana中观察, 结合日志等信息去定位oom前后都发生了什么.
flink提供了很全面的metrics指标系统, 但是却偏偏没有jvm的metaspace信息指标, 只有nonheap和heap的指标. 虽然这个指标也包含了metaspace信息,但是有点笼统了, 我就想知道确切的metaspace内存用量.
思路
同事说, 既然没有办法获取, 那我们就利用jmx自己去收集, 再搞的jmx exporter 发送给prometheus不就行了吗? 诚然, 这样做没毛病. 但是我们要搞出好几个依赖来,维护成本有点大, 另外, 既然flink提供了heap等jvm信息, 那么暴露metaspace信息不也应该是顺手的事情吗? 想到这里, 那我们看看官方源码先:
flink的源码中对于监控指标暴露设计的很好, flink-metrics
目录下定义了多种metrics收集信息的转换逻辑, 而产生metrics的定义不在这里. 在flink-runtime
下src/main/java/org.apache.flink.runtime
的metircs
目录中定义了监控指标如何生成的.
我们这里关心的jvm信息收集定义在util/MetricUtils.java
中.
1 | public static void instantiateStatusMetrics( |
在instantiateStatusMetrics
中调用了内存信息收集方法instantiateMemoryMetrics(jvm.addGroup("Memory"));
, 跳到其中可见第1,2行调用了获取nonheap和heap信息的方法:
1 | private static void instantiateMemoryMetrics(MetricGroup metrics) { |
这两个函数定义是:
1 |
|
@VisibleForTesting
?当前的内存信息暴露只是测试版?好吧…
两者都去调用了instantiateMemoryUsageMetrics
以暴露used,committed,max
信息
1 | private static void instantiateMemoryUsageMetrics(final MetricGroup metricGroup, final Supplier<MemoryUsage> memoryUsageSupplier) { |
看到这里我就发现, 问题好办了, 用了ManagementFactory
是吧? matasapce也可以轻松拿到啊.开撸吧.
改造
首先构造一个获取各种jvm内存信息的方法, 有关used,committed,max
的信息生成也通过instantiateMemoryUsageMetrics
1 |
|
再在instantiateMemoryMetrics
中添加我们定义的方法调用即可
1 | private static void instantiateMemoryMetrics(MetricGroup metrics) { |
编译
完成了我们就编译一下,上线部署即可了. 但是编译的时候却碰到了一些坑.
IDEA中导入flink依赖出现
Cannot resolve io.confluent:kafka-schema-registry-client:3.3.1
错误.解决办法:
1
2$ wget http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar`
$ mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=./kafka-schema-registry-client-3.3.1.jarflink编译中会执行语法检查, 对于
import xxx.*
这种语法会报错拒绝编译.执行
mvn clean package -DskipTests
出现语法不规范的错误提示
这个问题其实不是坑,而是我们代码不规范! 可见代码规范很重要, 越大的项目越是在乎, 专门通过mvn做了检查. 比如:for (final GarbageCollectorMXBean garbageCollector: garbageCollectors){
这个for
和(
之间必须有个空格; import依赖必须按照字典顺序排序等.
ps: 此处吐槽一下, 每次编译十多分钟, 苦等半天你告诉我就因为代码不规范…我吐血哦…
FQA
Q: 有人问, 我只做了生成metrics, 那么怎么暴露的, 怎么转为prometheus的?
A: 这个就要看flink-metrics
中代码了, 其实我们什么也不用做, 其会根据配置使用对应的监控数据格式将所有metrics转换后暴露出去. 接口设计的很nice.
Q: 有做好的grafana看板吗?我想做个伸手党.
A: 有, 我也是站在巨人的肩膀上, 改进了下. 我想你也可以, 等我有空了整理一套prometheus的各种看板一起贴出.