
Flink Kudu Connector: Dynamic Table Source/Sink and Range Partitioning Support

Points required to download: 50 | 162KB | Updated 2024-12-24 | 64 views | 4 comments | 14 downloads | 2 bookmarks
flink-connector-kudu is a Flink connector derived from the Kudu connector in the Apache Bahir project, reworked to meet in-house requirements for connecting to and operating on Kudu. It supports the dynamic table source and sink (DynamicTableSource/DynamicTableSink) interfaces of Flink 1.11.x and adds range partitioning, and it lets users define the number of hash buckets to optimize data storage and access. Beyond extending the original functionality, some of the improvements have been contributed back to the Apache Flink community.

Using a Kudu connector in Apache Flink enables efficient data persistence and processing. Kudu is an open-source distributed columnar storage system built for fast, predictable analytics, particularly OLAP (online analytical processing) workloads: it is designed for fast inserts and updates while still supporting efficient scans and aggregations over the same data set. Flink is an open-source stream-processing framework for high-throughput data streams, with stateful computation and exactly-once state consistency.

The key points in detail:

1. Background of the rework: The connector was adapted to solve specific needs that arose when integrating Flink with Kudu in-house, above all support for dynamic table sources and more flexible partitioning strategies. The rework builds on the Kudu connector in Apache Bahir and adds enterprise-oriented features such as range partitioning and a configurable hash bucket count.

2. Supported Flink version and features: The connector targets Flink 1.11.x, one of the community's stable release lines. This version introduced the DynamicTableSource/DynamicTableSink interfaces, under which the planner instantiates table sources and sinks from catalog metadata and connector options at planning time rather than from hand-wired factories, making table-based programs considerably more flexible and extensible. flink-connector-kudu implements these interfaces, giving users flexible and powerful data-processing capabilities.

3. Range partitioning: Range partitioning is a common partitioning strategy in which rows are assigned to partitions according to the value ranges of one or more columns. The range-partitioning feature in this connector lets users group data by business logic or access pattern, improving query efficiency and storage layout (an illustrative DDL sketch appears at the end of this article).

4. Hash buckets: Hash bucketing distributes rows by computing a hash over selected columns and assigning each row to a bucket by its hash value. In this connector users can define the number of buckets, which helps with load balancing and query optimization in a distributed environment; in Kudu, hash bucketing can also be combined with range partitioning on the same table.

5. Usage and configuration: To use the connector in a Flink project:
- Clone the repository.
- Edit the project coordinates in pom.xml so the dependency resolves against the company's private repository.
- In Java code, set up the Kudu catalog and environment: create a StreamExecutionEnvironment, configure a KuduCatalog instance, and register it with Flink's TableEnvironment (a minimal sketch follows below).

6. Tags:
- kudu: the Apache Kudu storage engine.
- flink: the Apache Flink stream-processing framework.
- datastream: Flink's streaming (DataStream) processing model.
- flink-sql: Flink's SQL API for running SQL queries.
- Java: the language the connector is written in and the main language for implementing business logic.

Taken together, these points show where flink-connector-kudu fits in the Apache Flink ecosystem and the role it plays in data processing, storage, and querying. Contributing the work back benefits the community and enriches the data-processing ecosystem for a wider set of users.
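To make the catalog setup in point 5 concrete, here is a minimal Java sketch. It assumes the fork keeps the stock Bahir class and package layout (org.apache.flink.connectors.kudu.table.KuduCatalog, constructed from a Kudu master address string); the master addresses and catalog name below are placeholders.

    import org.apache.flink.connectors.kudu.table.KuduCatalog;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class KuduCatalogSetup {
        public static void main(String[] args) {
            // Standard Flink 1.11 streaming + Table API bootstrap.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Placeholder master addresses; point these at your Kudu masters.
            KuduCatalog catalog = new KuduCatalog("kudu-master-1:7051,kudu-master-2:7051");

            // Register the catalog and make it current, so existing Kudu tables
            // can be referenced directly from Flink SQL.
            tableEnv.registerCatalog("kudu", catalog);
            tableEnv.useCatalog("kudu");

            // Kudu tables are now visible to the planner, e.g.:
            tableEnv.executeSql("SHOW TABLES").print();
        }
    }

Once the catalog is current, table names resolve against Kudu directly, so queries and INSERT INTO statements need no per-table DDL.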

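For the range-partitioning and hash-bucket features (points 3 and 4), the exact WITH-option keys belong to this fork and are not documented above, so the sketch below is illustrative only: 'kudu.masters', 'kudu.table', 'kudu.primary-key-columns', and 'kudu.hash-columns' loosely follow the stock Bahir option names, while 'kudu.hash-partition-nums' and 'kudu.range-partition-rule' are hypothetical stand-ins for whatever keys the fork actually defines.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class KuduRangePartitionSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Hypothetical DDL: the last two option keys illustrate declaring a
            // hash bucket count plus a range-partition rule; check the fork's
            // README for the real key names and range syntax.
            tableEnv.executeSql(
                "CREATE TABLE user_events (\n"
                + "  user_id BIGINT,\n"
                + "  event_time TIMESTAMP(3),\n"
                + "  payload STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'kudu',\n"
                + "  'kudu.masters' = 'kudu-master-1:7051',\n"
                + "  'kudu.table' = 'user_events',\n"
                + "  'kudu.primary-key-columns' = 'user_id,event_time',\n"
                + "  'kudu.hash-columns' = 'user_id',\n"
                + "  'kudu.hash-partition-nums' = '8',\n"          // hypothetical key: hash bucket count
                + "  'kudu.range-partition-rule' = 'event_time'\n" // hypothetical key: range column
                + ")");
        }
    }

In Kudu itself the two strategies compose: rows are first hashed into buckets, and each bucket is further split by range, which is why a tunable bucket count matters for balancing writes across tablet servers.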
Related recommendations


2025-03-28 14:36:35,936 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: tran_data_log[1] -> DropUpdateBefore[2] -> ConstraintEnforcer[3] -> kafka_sink[3]: Writer -> kafka_sink[3]: Committer (1/1)#5 (f5c075ff51859f30eba349e73094fdff_cbc357ccb763df2852fee8c4fc7d55f2_0_5) switched from INITIALIZING to FAILED with failure cause:
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:473) ~[kafka-clients-3.7.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:294) ~[kafka-clients-3.7.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:321) ~[kafka-clients-3.7.0.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:306) ~[kafka-clients-3.7.0.jar:?]
    at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55) ~[flink-connector-kafka-1.17.2.jar:1.17.2]
    at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:182) ~[flink-connector-kafka-1.17.2.jar:1.17.2]
    at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111) ~[flink-connector-kafka-1.17.2.jar:1.17.2]
    at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57) ~[flink-connector-kafka-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apac


Starting Job Manager
[ERROR] The execution result is empty.
[ERROR] Could not get JVM parameters and dynamic configurations properly.
[ERROR] Raw output from BashJavaUtils:
INFO  [] - Using standard YAML parser to load flink configuration file from /opt/flink/conf/config.yaml.
ERROR [] - Failed to parse YAML configuration
org.snakeyaml.engine.v2.exceptions.YamlEngineException: expected '<document start>', but found '<scalar>' in reader, line 1, column 9
    at org.snakeyaml.engine.v2.parser.ParserImpl$ParseDocumentStart.produce(ParserImpl.java:493) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.snakeyaml.engine.v2.parser.ParserImpl.lambda$produce$1(ParserImpl.java:232) ~[flink-dist-2.0.0.jar:2.0.0]
    at java.util.Optional.ifPresent(Unknown Source) ~[?:?]
    at org.snakeyaml.engine.v2.parser.ParserImpl.produce(ParserImpl.java:232) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.snakeyaml.engine.v2.parser.ParserImpl.peekEvent(ParserImpl.java:206) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.snakeyaml.engine.v2.parser.ParserImpl.checkEvent(ParserImpl.java:198) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.snakeyaml.engine.v2.composer.Composer.getSingleNode(Composer.java:131) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.snakeyaml.engine.v2.api.Load.loadOne(Load.java:110) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.snakeyaml.engine.v2.api.Load.loadFromInputStream(Load.java:123) ~[flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.configuration.YamlParserUtils.loadYamlFile(YamlParserUtils.java:100) [flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:252) [flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:150) [flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.runtime.util.ConfigurationParserUtils.loadCommonConfiguration(ConfigurationParserUtils.java:153) [flink-dist-2.0.0.jar:2.0.0]
    at org.apache.flink.runtime.util.bash.FlinkConfigLoader.loadConfiguration(FlinkConfigLoader.java:41) [flink-dist-2.0.0.jar:2.24.1]
    at org.apache.flink.runtime.util.bash.BashJavaUtils.runCommand(BashJavaUtils.java:66) [bash-java-utils.jar:2.24.1]
    at org.apache.flink.runtime.util.bash.BashJavaUtils.main(BashJavaUtils.java:54) [bash-java-utils.jar:2.24.1]
Exception in thread "main" java.lang.RuntimeException: Error parsing YAML configuration.
    at org.apache.flink.configuration.GlobalConfiguration.loadYAMLResource(GlobalConfiguration.java:257)
    at org.apache.flink.configuration.GlobalConfiguration.loadConfiguration(GlobalConfiguration.java:150)
    at org.apache.flink.runtime.util.ConfigurationParserUtils.loadCommonConfiguration(ConfigurationParserUtils.java:153)
    at org.apache.flink.runtime.util.bash.FlinkConfigLoader.loadConfiguration(FlinkConfigLoader.java:41)

---
services:
  kafka-0:
    image: apache/kafka:3.9.1
    container_name: kafka-0
    ports:
      - "19092:9092"
      - "19093:9093"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:19093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092,CONTROLLER://localhost:19093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    volumes:
      - ./kafka/conf/log4j.properties:/opt/kafka/config/log4j.properties
      - ./kafka/0/conf/kraft/server.properties:/opt/kafka/config/kraft/server.properties
      - ./kafka/0/data:/var/lib/kafka/data
    command:
      - sh
      - -c
      - >
        if [ ! -f /var/lib/kafka/data/meta.properties ]; then
          # Generate a random UUID and format the storage (-c points at the config file)
          /opt/kafka/bin/kafka-storage.sh format \
            -t $(/opt/kafka/bin/kafka-storage.sh random-uuid) \
            -c /opt/kafka/config/kraft/server.properties
        fi
        exec /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
    healthcheck:
      test:
        - CMD
        - kafka-broker-api-versions.sh
        - --bootstrap-server
        - localhost:19092
      interval: 10s
      timeout: 10s
      retries: 5
    networks:
      - datacamp-net
  flink-jobmanager-0:
    image: flink:2.0.0-java17
    container_name: flink-jobmanager-0
    ports:
      - "18081:8081"
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager-0
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000
        rest.flamegraph.enabled: true
        web.upload.dir: /opt/flink/usrlib
    volumes:
      - ./flink/jobmanager/conf:/opt/flink/conf
      - ./flink/jobmanager/0/flink-checkpoints:/tmp/flink-checkpoints
      - ./flink/jobmanager/0/usrlib:/opt/flink/usrlib
    command: jobmanager
    healthcheck:
      test:
        - CMD
        - curl
        - -f
        - http://localhost:8081
      interval: 15s
      timeout: 5s
      retries: 10
    networks:
      - datacamp-net
  flink-taskmanager-0:
    image: flink:2.0.0-java17
    container_name: flink-taskmanager-0
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager-0
        taskmanager.numberOfTaskSlots: 2
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000
    volumes:
      - ./flink/taskmanager/conf:/opt/flink/conf
      - ./flink/taskmanager/0/flink-checkpoints:/tmp/flink-checkpoints
      - ./flink/taskmanager/0/usrlib:/opt/flink/usrlib
    command: taskmanager
    depends_on:
      flink-jobmanager-0:
        condition: service_healthy
    networks:
      - datacamp-net
  flink-taskmanager-1:
    image: flink:2.0.0-java17
    container_name: flink-taskmanager-1
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager-0
        taskmanager.numberOfTaskSlots: 2
        state.backend: filesystem
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000
    volumes:
      - ./flink/taskmanager/conf:/opt/flink/conf
      - ./flink/taskmanager/1/flink-checkpoints:/tmp/flink-checkpoints
      - ./flink/taskmanager/1/usrlib:/opt/flink/usrlib
    command: taskmanager
    depends_on:
      flink-jobmanager-0:
        condition: service_healthy
    networks:
      - datacamp-net
networks:
  datacamp-net:
    driver: bridge


2023-07-13 09:15:56,872 WARN org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [] - Unhandled exception
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:1.8.0_372]
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:1.8.0_372]
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[?:1.8.0_372]
    at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[?:1.8.0_372]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[?:1.8.0_372]
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) [flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [flink-dist-1.15.3.jar:1.15.3]
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist-1.15.3.jar:1.15.3]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]


My Flink job consumes Kafka without problems when run locally from IDEA, but on the YARN cluster it fails with the following error:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:97) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:185) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:259) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:148) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:315) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.SourceOperator.handleAddSplitsEvent(SourceOperator.java:626) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:596) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) ~[bz-sport-realtime-1.0-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.Regula


A flink-doris-connector ingestion job fails with the following error:
2025-07-21 18:02:13,730 ERROR io.debezium.pipeline.ErrorHandler [] - Producer failure
io.debezium.DebeziumException: java.lang.NullPointerException
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85) ~[flink-sql-connector-mysql-cdc-2.4.2.jar:2.4.2]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:155) ~[flink-sql-connector-mysql-cdc-2.4.2.jar:2.4.2]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:137) ~[flink-sql-connector-mysql-cdc-2.4.2.jar:2.4.2]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109) ~[flink-sql-connector-mysql-cdc-2.4.2.jar:2.4.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_212]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
Caused by: java.lang.NullPointerException
    at io.debezium.connector.oracle.OracleSnapshotChangeEventSource.createSchemaChangeEventsForTables(OracleSnapshotChangeEventSource.java:230) ~[flink-sql-connector-oracle-cdc-2.4.2.jar:2.4.2]
    at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:121) ~[flink-sql-connector-mysql-cdc-2.4.2.jar:2.4.2]
    at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76) ~[flink-sql-connector-mysql-cdc-2.4.2.jar:2.4.2]
    ... 8 more
2025-07-21 18:02:13,736 INFO io.debezium.pipeline.ChangeEventSourceCoordinator [] - Connected metrics set to 'false'
2025-07-21 18:02:14,138 INF


org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not deploy Yarn job cluster.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366) ~[flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219) ~[flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:842) ~[flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1084) ~[flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at com.huawei.bigdata.job.action.FlinkClient.lambda$flinkClientSubmit$0(FlinkClient.java:64) ~[executor-job-flink-1.0.jar:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_372]
    at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_372]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761) [flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at com.huawei.bigdata.job.action.FlinkClient.flinkClientSubmit(FlinkClient.java:64) [executor-job-flink-1.0.jar:?]
    at com.huawei.bigdata.job.action.FlinkMain.runJob(FlinkMain.java:200) [executor-job-flink-1.0.jar:?]
    at com.huawei.bigdata.job.action.LauncherMain.submit(LauncherMain.java:93) [executor-job-core-1.0.jar:?]
    at com.huawei.bigdata.job.action.LauncherMain.run(LauncherMain.java:49) [executor-job-core-1.0.jar:?]
    at com.huawei.bigdata.job.action.FlinkMain.main(FlinkMain.java:108) [executor-job-flink-1.0.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_372]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_372]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_372]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_372]
    at com.huawei.bigdata.job.LauncherAM.run(LauncherAM.java:126) [executor-job-core-1.0.jar:?]
    at com.huawei.bigdata.job.LauncherAM$1.run(LauncherAM.java:105) [executor-job-core-1.0.jar:?]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_372]
    at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_372]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1761) [flink-dist_2.11-1.12.2-hw-ei-312091.jar:1.12.2-hw-ei-312091]
    at com.huawei.bigdata.job.LauncherAM.main(LauncherAM.java:101) [executor-job-core-1.0.jar:?]
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.


C:\Users\admin\.jdks\ms-11.0.27\bin\java.exe "-javaagent:D:\软件\Jetb\IntelliJ IDEA 2025.1.2\lib\idea_rt.jar=59847" -Dfile.encoding=UTF-8 -classpath D:\软件\Jetb\tanshanxueyuan\tanshanxueyuan\target\classes;C:\Users\admin\.m2\repository\com\google\code\findbugs\jsr305\1.3.9\jsr305-1.3.9.jar;C:\Users\admin\.m2\repository\org\apache\flink\force-shading\1.13.6\force-shading-1.13.6.jar;C:\Users\admin\.m2\repository\org\xerial\snappy\snappy-java\1.1.8.3\snappy-java-1.1.8.3.jar;C:\Users\admin\.m2\repository\org\lz4\lz4-java\1.6.0\lz4-java-1.6.0.jar;C:\Users\admin\.m2\repository\org\apache\flink\flink-connector-kafka_2.12\1.13.6\flink-connector-kafka_2.12-1.13.6.jar;C:\Users\admin\.m2\repository\org\apache\kafka\kafka-clients\2.4.1\kafka-clients-2.4.1.jar;C:\Users\admin\.m2\repository\com\github\luben\zstd-jni\1.4.3-1\zstd-jni-1.4.3-1.jar;C:\Users\admin\.m2\repository\org\apache\flink\flink-connector-base\1.13.6\flink-connector-base-1.13.6.jar;C:\Users\admin\.m2\repository\org\apache\flink\flink-statebackend-rocksdb_2.12\1.13.6\flink-statebackend-rocksdb_2.12-1.13.6.jar;C:\Users\admin\.m2\repository\com\ververica\frocksdbjni\5.17.2-ververica-2.1\frocksdbjni-5.17.2-ververica-2.1.jar;C:\Users\admin\.m2\repository\mysql\mysql-connector-java\8.0.20\mysql-connector-java-8.0.20.jar;C:\Users\admin\.m2\repository\com\google\protobuf\protobuf-java\3.6.1\protobuf-java-3.6.1.jar;C:\Users\admin\.m2\repository\commons-dbcp\commons-dbcp\1.4\commons-dbcp-1.4.jar;C:\Users\admin\.m2\repository\commons-pool\commons-pool\1.5.4\commons-pool-1.5.4.jar;C:\Users\admin\.m2\repository\com\alibaba\fastjson\1.2.83\fastjson-1.2.83.jar;C:\Users\admin\.m2\repository\org\projectlombok\lombok\1.18.14\lombok-1.18.14.jar;C:\Users\admin\.m2\repository\org\apache\commons\commons-math3\3.6.1\commons-math3-3.6.1.jar;C:\Users\admin\.m2\repository\org\apache\flink\flink-connector-jdbc_2.11\1.13.6\flink-connector-jdbc_2.11-1.13.6.jar;C:\Users\admin\.m2\repository\com\oceanbase\oceanbase-client\2.4.8\oceanbase-client-2.4.8.jar;C:\Users\admin\.m2\repository\org\apache\commons\commons-csv\1.10.0\commons-csv-1.10.0.jar org.example.WordCount
Error: Unable to initialize main class org.example.WordCount
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/api/common/functions/FlatMapFunction


org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:89)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.sintay.Functions.PaimonFlatMapFunction.flatMap(PaimonFlatMapFunction.java:76)
    at com.sintay.Functions.PaimonFlatMapFunction.flatMap(PaimonFlatMapFunction.java:39)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException
This is the error message I am getting.

Reviews

萱呀
2025.07.27
Extends Apache Flink's connectivity to Kudu; well suited to dynamic table source scenarios.

丽龙
2025.06.25
An easy-to-integrate Kudu connector that meets the needs of enterprise-grade Flink applications.

白绍伟
2025.06.10
The project integrates with Kudu for efficient stream processing, and its updates have been contributed back to the community.

田仲政
2025.03.14
The documentation explains in detail how to use the Kudu connector for data operations in Flink.
李青廷Austin
  • Followers: 35