irpas技术客

Flink中使用Kryo序列化器的注意事项_悟初境_flink kryo序列化

大大的周 6838

你以为我要说的是在Flink中使用Kryo序列化吗?不是的,还记得上一篇关于Kryo序列化的问题的文章:Kryo序列化:Class Not Found的可能原因.

里面介绍了因为在Spark环境下由于类加载器原因导致Kryo反序列化时找不到类的问题。

没错,还有续集。这次是在Flink下,也出现了同样的问题。

问题复现

见如下代码,是Flink提交给YARN的主函数类,里面反序列化一个 StreamParam的参数类。这个类就在提交的jar包里。 (KryoSerializer是我们自己封装了下Kryo,里面还是Kryo实例。)

object MyFlinkDriver { def main(args: Array[String]): Unit = { Assert.paramMiss(args.length > 0, "StreamParam JsonString") val param = KryoSerializer.deserialize( EncodeUtil.base64DecodeBytes(args(0)) ).asInstanceOf[StreamParam] val res = param.streamResource val env = getStreamExecutionEnv(res) new StreamGraphExecutor(param.streamGraph, param.config, env).execute() env.execute(res.getTaskId) } }

在本地单测运行是没问题的,在服务器会发现类找不到。

Caused by: java.lang.ClassNotFoundException: com.jimo.sdk.core.analyze.stream.StreamParam at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[?:1.8.0_66] at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_66] at org.springframework.boot.loader.LaunchedURLClassLoader.loadClass(LaunchedURLClassLoader.java:151) ~[just-cmc.jar:2.2.3-SNAPSHOT] at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_66] at java.lang.Class.forName0(Native Method) ~[?:1.8.0_66] at java.lang.Class.forName(Class.java:348) ~[?:1.8.0_66] at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ~[kryo-2.24.0.jar!/:?] at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) ~[kryo-2.24.0.jar!/:?] at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[kryo-2.24.0.jar!/:?] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[kryo-2.24.0.jar!/:?] at com.jimo.sdk.core.serialize.KryoSerializer.deserialize(KryoSerializer.java:59) ~[?:?] at com.jimo.executor.stream.MyFlinkDriver$.main(MyFlinkDriver.scala:19) ~[?:?]

同一个地方,同一个报错,应该是同一个原因吧。

设置类加载器

所以我们就设置类加载器吧。

KryoSerializer.setClassLoader(MyFlinkDriver.getClass.getClassLoader) val param = KryoSerializer.deserialize( EncodeUtil.base64DecodeBytes(args(0)) ).asInstanceOf[StreamParam]

当然,问题解决了,就这么简单。不过事情还没结束。

用的什么类加载器?

用的是Flink的 FlinkUserCodeClassLoader。Flink的自定义类加载器分为 parent-first和child-first,可以通过配置文件配置。 在我们这个场景下,需要配成 parent-first,不然jar包里的类是Flink的类加载器加载的,而序列化时用的是 AppClassLoader,会导致反序列化的实例不是同一个,因为是不同的类加载器加载的。

总结 将flink的flink-conf.yaml的classloader.resolve-order改为parent-first,不然类加载器导致反序列化不是同一个类同时要将Kryo的ClassLoader设置成Flink的类加载器,否则找不到类


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #kryo序列化 #not #没错还有续集