flink消费kafka,使用自定义实现kafka的消息反序列化,报错

org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:484)
    at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:190)
    at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:638)
    at com.liujun.flink.demo.QR_LOAD_Kafka_TO_ORACLE.main(QR_LOAD_Kafka_TO_ORACLE.java:65)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'interface org.apache.flink.streaming.api.functions.source.ParallelSourceFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:827)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:747)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:713)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2347)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1713)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1701)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1663)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1645)
    at com.liujun.flink.demo.QR_LOAD_Kafka_TO_ORACLE.main(QR_LOAD_Kafka_TO_ORACLE.java:60)

解决办法:以下方法默认null改为:BasicTypeInfo.STRING_TYPE_INFO

@Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

 

原文地址:http://www.cnblogs.com/qsds/p/16784626.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性