0%

flink定制jvm监控metrics

flink定制jvm的metaspace等监控指标

起因

项目中使用了Flink计算框架去针对实时数据做分析. 我们基于目前最新的Flink1.10.1版本进行开发, 定制自己的业务处理算子逻辑. 开发中我们多次遇到一个问题:java.lang.OutOfMemoryError:Metaspace. 其发生的场景很不规律, 经过多次观察日志发现, 似乎集中发生在频繁重新提交新的flink执行任务到task manager并且停止旧的任务的场景下.

具体原因很有可能就是flink框架对同一个task manager执行了任务重新分配时, jvm并不是重新启动的. 由于一个task manager的jvm每次都会通过Janino 动态重新加载job manager发过来的任务代码解析执行, 此时如果代码编写不当或者框架本身出现内存泄漏,那么将迟早遇见这个metaspace的oom错误.

关于框架可能出现的问题, 在https://issues.apache.org/jira/browse/FLINK-16225https://issues.apache.org/jira/browse/FLINK-11205 中有详细的讨论.

ps: 最新的进展是这个问题声称被修复了: https://issues.apache.org/jira/browse/FLINK-16408, 修复的发布版本是1.11.0, 期待ing…

虽然框架层面声称解决了问题, 但是代码层面是我们自己写的啊, 你无法保证一个错误的写法不会导致 mataspace的 oom. 为此我们的需求就很简单, 持续监控各个task manager的jvm内存信息, 通过prometheus监控系统把指标放到grafana中观察, 结合日志等信息去定位oom前后都发生了什么.

flink提供了很全面的metrics指标系统, 但是却偏偏没有jvm的metaspace信息指标, 只有nonheap和heap的指标. 虽然这个指标也包含了metaspace信息,但是有点笼统了, 我就想知道确切的metaspace内存用量.

思路

同事说, 既然没有办法获取, 那我们就利用jmx自己去收集, 再搞的jmx exporter 发送给prometheus不就行了吗? 诚然, 这样做没毛病. 但是我们要搞出好几个依赖来,维护成本有点大, 另外, 既然flink提供了heap等jvm信息, 那么暴露metaspace信息不也应该是顺手的事情吗? 想到这里, 那我们看看官方源码先:

flink的源码中对于监控指标暴露设计的很好, flink-metrics目录下定义了多种metrics收集信息的转换逻辑, 而产生metrics的定义不在这里. 在flink-runtimesrc/main/java/org.apache.flink.runtimemetircs目录中定义了监控指标如何生成的.

我们这里关心的jvm信息收集定义在util/MetricUtils.java中.

1
2
3
4
5
6
7
8
9
10
public static void instantiateStatusMetrics(
MetricGroup metricGroup) {
MetricGroup jvm = metricGroup.addGroup("JVM");

instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"));
instantiateGarbageCollectorMetrics(jvm.addGroup("GarbageCollector"));
instantiateMemoryMetrics(jvm.addGroup("Memory"));
instantiateThreadMetrics(jvm.addGroup("Threads"));
instantiateCPUMetrics(jvm.addGroup("CPU"));
}

instantiateStatusMetrics中调用了内存信息收集方法instantiateMemoryMetrics(jvm.addGroup("Memory"));, 跳到其中可见第1,2行调用了获取nonheap和heap信息的方法:

1
2
3
4
5
6
7
8
9
private static void instantiateMemoryMetrics(MetricGroup metrics) {
instantiateHeapMemoryMetrics(metrics.addGroup(METRIC_GROUP_HEAP_NAME));
instantiateNonHeapMemoryMetrics(metrics.addGroup(METRIC_GROUP_NONHEAP_NAME));

final MBeanServer con = ManagementFactory.getPlatformMBeanServer();

final String directBufferPoolName = "java.nio:type=BufferPool,name=direct";

...

这两个函数定义是:

1
2
3
4
5
6
7
8
9
@VisibleForTesting
static void instantiateHeapMemoryMetrics(final MetricGroup metricGroup) {
instantiateMemoryUsageMetrics(metricGroup, () -> ManagementFactory.getMemoryMXBean().getHeapMemoryUsage());
}

@VisibleForTesting
static void instantiateNonHeapMemoryMetrics(final MetricGroup metricGroup) {
instantiateMemoryUsageMetrics(metricGroup, () -> ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage());
}

@VisibleForTesting?当前的内存信息暴露只是测试版?好吧…

两者都去调用了instantiateMemoryUsageMetrics以暴露used,committed,max信息

1
2
3
4
5
private static void instantiateMemoryUsageMetrics(final MetricGroup metricGroup, final Supplier<MemoryUsage> memoryUsageSupplier) {
metricGroup.<Long, Gauge<Long>>gauge(MetricNames.MEMORY_USED, () -> memoryUsageSupplier.get().getUsed());
metricGroup.<Long, Gauge<Long>>gauge(MetricNames.MEMORY_COMMITTED, () -> memoryUsageSupplier.get().getCommitted());
metricGroup.<Long, Gauge<Long>>gauge(MetricNames.MEMORY_MAX, () -> memoryUsageSupplier.get().getMax());
}

看到这里我就发现, 问题好办了, 用了ManagementFactory是吧? matasapce也可以轻松拿到啊.开撸吧.

改造

首先构造一个获取各种jvm内存信息的方法, 有关used,committed,max的信息生成也通过instantiateMemoryUsageMetrics

1
2
3
4
5
6
7
@VisibleForTesting
private static void instantiateMemoryDetailMetrics(MetricGroup metrics) {
List<MemoryPoolMXBean> pools = ManagementFactory.getMemoryPoolMXBeans();
for (final MemoryPoolMXBean pool: pools) {
instantiateMemoryUsageMetrics(metrics.addGroup(pool.getName()), pool::getUsage);
}
}

再在instantiateMemoryMetrics中添加我们定义的方法调用即可

1
2
3
4
5
6
private static void instantiateMemoryMetrics(MetricGroup metrics) {
instantiateHeapMemoryMetrics(metrics.addGroup(METRIC_GROUP_HEAP_NAME));
instantiateNonHeapMemoryMetrics(metrics.addGroup(METRIC_GROUP_NONHEAP_NAME));

instantiateMemoryDetailMetrics(metrics);
...

编译

完成了我们就编译一下,上线部署即可了. 但是编译的时候却碰到了一些坑.

  1. IDEA中导入flink依赖出现
    Cannot resolve io.confluent:kafka-schema-registry-client:3.3.1错误.

    解决办法:

    1
    2
    $ wget http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar`
    $ mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=./kafka-schema-registry-client-3.3.1.jar

    flink编译中会执行语法检查, 对于import xxx.*这种语法会报错拒绝编译.

  2. 执行mvn clean package -DskipTests出现语法不规范的错误提示
    这个问题其实不是坑,而是我们代码不规范! 可见代码规范很重要, 越大的项目越是在乎, 专门通过mvn做了检查. 比如: for (final GarbageCollectorMXBean garbageCollector: garbageCollectors){ 这个for(之间必须有个空格; import依赖必须按照字典顺序排序等.
    ps: 此处吐槽一下, 每次编译十多分钟, 苦等半天你告诉我就因为代码不规范…我吐血哦…

FQA
Q: 有人问, 我只做了生成metrics, 那么怎么暴露的, 怎么转为prometheus的?
A: 这个就要看flink-metrics中代码了, 其实我们什么也不用做, 其会根据配置使用对应的监控数据格式将所有metrics转换后暴露出去. 接口设计的很nice.

Q: 有做好的grafana看板吗?我想做个伸手党.
A: 有, 我也是站在巨人的肩膀上, 改进了下. 我想你也可以, 等我有空了整理一套prometheus的各种看板一起贴出.