活动介绍

Exception in thread "main" java.lang.IllegalArgumentException: input == null!这个报错怎么排查

时间: 2023-10-29 07:41:29 浏览: 344
这个报错的意思是输入的参数为null,因此需要检查传入该参数的代码是否确保传入非null的参数。 以下是可能的排查步骤: 1. 检查调用该方法的代码是否传入了null值的参数。 2. 如果该参数是通过方法调用返回的,请确保该方法不会返回null。 3. 检查该方法的实现是否正确处理了null值的情况。例如,如果该方法需要使用该参数的内容,请确保它不是null,或者在出现null时采取适当的措施。 4. 如果该参数是从外部来源传入的,例如文件或网络,请确保该来源不会返回null值。 5. 如果无法确定导致该问题的原因,请使用调试器跟踪代码并查看哪个变量的值为null。 希望能对你有所帮助!
相关问题

exception in thread "main" java.lang.illegalargumentexception: input == null!

### 回答1: 异常:在主线程中发生了异常,java.lang.illegalargumentexception: input == null! 意思:输入参数为空,不合法! 解决方案:检查输入参数是否为空,如果为空,则需要提供有效的输入参数。 ### 回答2: exception in thread "main" java.lang.illegalargumentexception: input == null! 是Java中的一个错误提示,意思是输入为null。在Java中,当程序需要接收数据或参数时,如果输入为空,则会出现这种错误。 通常,程序员需要做出一些改进来避免出现这个问题。以下是几个避免这个问题的方法: 1. 使用if语句进行输入检查。在处理输入数据之前,应该先检查它是否为空。如果它为空,则应该停止程序运行并抛出异常。 2. 在方法参数中添加注释。在编写方法时,应该在方法参数中添加注释,以便其他程序员知道方法需要什么样的输入条件。 3. 使用对象类型而不是原始类型。对于数字、字符串和其他常见类型,使用对象类型而不是原始类型可以避免NullPointerException并确保输入不能为空。 4. 在设计类和接口时,应该使用更加稳健的编程技术,例如预先定义输入范围和使用空指针检查。 当出现exception in thread "main" java.lang.illegalargumentexception: input == null!时,程序员需要迅速找出问题的原因,并进行调整来避免以后再次发生。通过遵从最佳的编程实践,可以避免出现输入空值的情况,并确保程序能够正常运行。 ### 回答3: 这是Java中经常遇到的一种异常,它通常是由于代码中出现了某个输入参数为空而导致的。这种异常常常出现在程序的开发、调试和运行过程中,它的提示信息很明确,告诉我们输入参数为空,需要对其进行检查和处理,否则程序无法正常运行。 对于这种异常,我们需要检查和修复代码中的问题,主要有以下几种方法: 1. 检查并修正代码中的空指针引用问题。通常来说,我们应该在使用对象之前先行检查其是否为空,避免出现空指针异常; 2. 对可能为空的输入参数进行判断和处理。对于那些可能为空的变量或参数,我们需要在使用它们之前先判断其是否为空,避免出现输入参数为空的情况; 3. 添加异常处理代码以捕获并处理异常。当程序出现输入参数为空的异常时,我们需要添加相应的异常处理代码,以防止程序崩溃或出现其他不可预料的错误。 总之,当程序出现“exception in thread "main" java.lang.illegalargumentexception: input == null!”异常时,我们需要仔细检查代码中的问题,并采取相应的措施进行修正和处理,使程序能够正常运行,避免出现类似的错误。

D:\JAVA\bin\java.exe "-javaagent:D:\JAVA\IntelliJ IDEA 2022.3.2\lib\idea_rt.jar=52525:D:\JAVA\IntelliJ IDEA 2022.3.2\bin" -Dfile.encoding=UTF-8 -Dsun.stdout.encoding=UTF-8 -Dsun.stderr.encoding=UTF-8 -classpath D:\JAVA\untitled4\out\production\untitled4 UnZipTester Exception in thread "main" java.lang.IllegalArgumentException: malformed input off : 12, length : 1 at java.base/java.lang.String.throwMalformed(String.java:1252) at java.base/java.lang.String.decodeUTF8_UTF16(String.java:1119) at java.base/java.lang.String.newStringUTF8NoRepl(String.java:741) at java.base/java.lang.System$2.newStringUTF8NoRepl(System.java:2462) at java.base/java.util.zip.ZipCoder$UTF8ZipCoder.toString(ZipCoder.java:199) at java.base/java.util.zip.ZipCoder.toString(ZipCoder.java:66) at java.base/java.util.zip.ZipInputStream.readLOC(ZipInputStream.java:302) at java.base/java.util.zip.ZipInputStream.getNextEntry(ZipInputStream.java:125) at Unzip.doUnZip(Unzip.java:39) at UnZipTester.main(UnZipTester.java:8) Caused by: java.nio.charset.MalformedInputException: Input length = 1 ... 10 more出现这样的错误

这个错误可能是因为文件名中含有不合法的字符引起的。在解压缩过程中,Java 使用 UTF-8 编码来处理文件名,如果文件名中的字符不符合 UTF-8 格式,就会出现这个错误。 你可以尝试使用其他解压缩工具来解压缩文件,或者修改文件名中的特殊字符。如果无法修改文件名,你可以尝试使用 Java 的 NIO 库来解决该问题。具体实现方法可以参考以下代码: ```java ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFile), Charset.forName("GBK")); ZipEntry entry; while ((entry = zis.getNextEntry()) != null) { String fileName = entry.getName(); fileName = new String(fileName.getBytes("GBK"), "UTF-8"); // ... } ``` 这里使用了 GBK 编码来读取 zip 文件,并将文件名转换为 UTF-8 编码,以避免出现编码问题。
阅读全文

相关推荐

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:258) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) at akka.dispatch.OnComplete.internal(Future.scala:300) at akka.dispatch.OnComplete.internal(Future.scala:297) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=10000) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ... 4 more Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225) at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:85) at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1545) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079) at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631) at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932) ... 1 more Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from './mysql-bin.000001' at 482020, the last byte read from './mysql-bin.000001' at 482020. Error code: 1236; SQLSTATE: HY000. The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now. The server id conflict may happen in the following situations: 1. The server id has been used by other mysql cdc table in the current job. 2. The server id has been used by the mysql cdc table in other jobs. 3. The server id has been used by other sync tools like canal, debezium and so on. at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1489) ... 5 more Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from './mysql-bin.000001' at 482020, the last byte read from './mysql-bin.000001' at 482020. at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043) ... 3 more我现在出现这个报错 我的调用代码是package net.bwie.realtime.jtp.common.utils; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; //import org.apache.kafka.connect.data.SchemaBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetAddress; import java.net.UnknownHostException; import java.net.InetAddress; import java.net.UnknownHostException; import java.time.*; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.function.Consumer; /** * Flink CDC实时捕获Mysql数据库表中数据 * @author xuanyun */ public class MysqlCdcUtil { /** * Flink CDC 读取数据时,没有特殊设置反序列化,及针对Decimal类型和DateTime类型数据。 */ public static DataStream<String> cdcMysqlRaw(StreamExecutionEnvironment env, String database, String table) throws UnknownHostException { // a. 数据源 MySqlSource<String> mysqlsource = MySqlSource.<String>builder() .hostname("node101") .port(3306) .databaseList(database) .tableList(database + "." + table) .username("root") .password("123456") .serverId(generateUniqueServerId()) .serverTimeZone("Asia/Shanghai") .startupOptions(StartupOptions.earliest()) .deserializer(new JsonDebeziumDeserializationSchema()) .build(); // b. 读取数据 DataStreamSource<String> stream = env.fromSource( mysqlsource, WatermarkStrategy.noWatermarks(), "MysqlSource" ); // c. 返回 return stream; } /** * 使用Flink CDC方式,拉取Mysql表数据,从最新offset偏移量读取数据 * @param env 流式执行环境 * @param database 数据库名称 * @param table 表名称 * @return 数据流,数据类型为json字符串 */ public static DataStream<String> cdcMysqlDeser(StreamExecutionEnvironment env, String database, String table) throws UnknownHostException { // a. 反序列化:DECIMAL类型数据使用NUMERIC数值转换 Map<String, Object> configs = new HashMap<>(); configs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, configs); // b. 数据源 MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("node101") .port(3306) .databaseList(database) .tableList(database + "." + table) .username("root") .password("123456") .serverId(generateUniqueServerId()) .serverTimeZone("Asia/Shanghai") .startupOptions(StartupOptions.earliest()) .debeziumProperties(getDebeziumProperties()) .deserializer(schema) .build(); // c. 读取数据 DataStreamSource<String> stream = env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source" ); // d. 返回 return stream; } /** * 使用Flink CDC方式,拉取Mysql表数据,从最新offset偏移量读取数据 * @param env 流式执行环境 * @param database 数据库名称 * @param tableList 表名称,可以传递多个 * @return 数据流,数据类型为json字符串 */ public static DataStream<String> cdcMysqlEarliest(StreamExecutionEnvironment env, String database, String... tableList) throws UnknownHostException { // a. 反序列化 Map<String, Object> configs = new HashMap<>(); configs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, configs); StringBuffer buffer = new StringBuffer(); for (String table : tableList) { buffer.append(database).append(".").append(table).append(","); } buffer = buffer.deleteCharAt(buffer.length() - 1); // b. 数据源 MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("node101") .port(3306) .databaseList(database) .tableList(buffer.toString()) .username("root") .password("123456") .serverId(generateUniqueServerId()) .serverTimeZone("Asia/Shanghai") .startupOptions(StartupOptions.earliest()) .debeziumProperties(getDebeziumProperties()) .deserializer(schema) .build(); // c. 读取数据 DataStreamSource<String> stream = env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlEarliestSource" ); // d. 返回 return stream; } /** * 使用Flink CDC方式,拉取Mysql表数据,从binlog中最早offset偏移量读取数据 * @param env 流式执行环境 * @param database 数据库名称 * @return 数据流,数据类型为json字符串 */ public static DataStream<String> cdcMysqlInitial(StreamExecutionEnvironment env, String database, String... tableList) throws UnknownHostException { // a. 反序列化 Map<String, Object> configs = new HashMap<>(); configs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, configs); StringBuffer buffer = new StringBuffer(); for (String table : tableList) { buffer.append(database).append(".").append(table).append(","); } buffer = buffer.deleteCharAt(buffer.length() - 1); // b. 数据源 MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("node101") .port(3306) .databaseList(database) .tableList(buffer.toString()) .username("root") .password("123456") .serverId(generateUniqueServerId()) .serverTimeZone("Asia/Shanghai") .startupOptions(StartupOptions.initial()) .debeziumProperties(getDebeziumProperties()) .deserializer(schema) .build(); // c. 读取数据 DataStreamSource<String> stream = env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlLInitialSource" ); // d. 返回 return stream; } /** * 使用Flink CDC方式,拉取Mysql表数据,从最新offset偏移量读取数据 * @param env 流式执行环境 * @param database 数据库名称 * @return 数据流,数据类型为json字符串 */ public static DataStream<String> cdcMysql(StreamExecutionEnvironment env, String database) throws UnknownHostException { // a. 反序列化 Map<String, Object> configs = new HashMap<>(); configs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, configs); // b. 数据源 MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("node101") .port(3306) .databaseList(database) .tableList() .username("root") .password("123456") .serverId(generateUniqueServerId()) .serverTimeZone("Asia/Shanghai") .startupOptions(StartupOptions.latest()) .debeziumProperties(getDebeziumProperties()) .deserializer(schema) .build(); // c. 读取数据 DataStreamSource<String> stream = env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MysqlLatestSource" ); // d. 返回 return stream; } /** * 使用Flink CDC方式,拉取Mysql表数据,从最新offset偏移量读取数据 * @param env 流式执行环境 * @param database 数据库名称 * @param tableList 表名称,可以传递多个 * @return 数据流,数据类型为json字符串 */ public static DataStream<String> cdcMysql(StreamExecutionEnvironment env, String database, String... tableList) throws UnknownHostException { // a. 反序列化 Map<String, Object> configs = new HashMap<>(); configs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, configs); StringBuffer buffer = new StringBuffer(); for (String table : tableList) { buffer.append(database).append(".").append(table).append(","); } buffer = buffer.deleteCharAt(buffer.length() - 1); // b. 数据源 MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("node101") .port(3306) .databaseList(database) .tableList(buffer.toString()) .username("root") .password("123456" ) .serverId(generateUniqueServerId()) .serverTimeZone("Asia/Shanghai") .startupOptions(StartupOptions.latest()) .debeziumProperties(getDebeziumProperties()) .deserializer(schema) .build(); // c. 读取数据 DataStreamSource<String> stream = env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source" ); // d. 返回 return stream; } private static Properties getDebeziumProperties(){ Properties properties = new Properties(); properties.setProperty("converters", "dateConverters"); properties.setProperty("dateConverters.type", MySqlDateTimeConverter.class.getName()); properties.setProperty("dateConverters.format.date", "yyyy-MM-dd"); properties.setProperty("dateConverters.format.time", "HH:mm:ss"); properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss"); properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss"); properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8"); return properties; } /** * 自定义时间转换配置。 */ public static class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> { private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class); private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE; private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME; private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME; private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME; private ZoneId timestampZoneId = ZoneId.systemDefault(); @Override public void configure(Properties props) { readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p)); readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z)); } private void readProps(Properties properties, String settingKey, Consumer<String> callback) { String settingValue = (String) properties.get(settingKey); if (settingValue == null || settingValue.isEmpty()) { return; } try { callback.accept(settingValue.trim()); } catch (IllegalArgumentException | DateTimeException e) { logger.error("The {} setting is illegal: {}",settingKey,settingValue); throw e; } } @Override public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) { String sqlType = column.typeName().toUpperCase(); SchemaBuilder schemaBuilder = null; Converter converter = null; if ("DATE".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("debezium.date.string"); converter = this::convertDate; } if ("TIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("debezium.date.string"); converter = this::convertTime; } if ("DATETIME".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("debezium.date.string"); converter = this::convertDateTime; } if ("TIMESTAMP".equals(sqlType)) { schemaBuilder = SchemaBuilder.string().optional().name("debezium.date.string"); converter = this::convertTimestamp; } if (schemaBuilder != null) { registration.register(schemaBuilder, converter); } } private String convertDate(Object input) { if (input == null) return null; if (input instanceof LocalDate) { return dateFormatter.format((LocalDate) input); } if (input instanceof Integer) { LocalDate date = LocalDate.ofEpochDay((Integer) input); return dateFormatter.format(date); } return String.valueOf(input); } private String convertTime(Object input) { if (input == null) return null; if (input instanceof Duration) { Duration duration = (Duration) input; long seconds = duration.getSeconds(); int nano = duration.getNano(); LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano); return timeFormatter.format(time); } return String.valueOf(input); } private String convertDateTime(Object input) { if (input == null) return null; if (input instanceof LocalDateTime) { return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " "); } return String.valueOf(input); } private String convertTimestamp(Object input) { if (input == null) return null; if (input instanceof ZonedDateTime) { // mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间 ZonedDateTime zonedDateTime = (ZonedDateTime) input; LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime(); return timestampFormatter.format(localDateTime).replaceAll("T", " "); } return String.valueOf(input); } } // ... private static String generateUniqueServerId() throws UnknownHostException { long baseTimestamp = System.currentTimeMillis() / 1000; String hostName; try { hostName = InetAddress.getLocalHost().getHostName(); } catch (UnknownHostException e) { hostName = "unknown"; } // 使用主机名 + 时间戳生成唯一 serverId int hash = (hostName + baseTimestamp).hashCode(); // 确保为正数并控制在合理范围 return String.valueOf(Math.abs(hash) % 100000000 + 5); } } 4

大家在看

recommend-type

批量提取eml

支持批量提取eml附件,邮件头等支持批量提取eml附件,邮件头等支持批量提取eml附件,邮件头等
recommend-type

AMIDE-开源

Amide是医学图像数据检查器:Amide是用于查看,注册和分析解剖和功能性体积医学图像数据集的工具。
recommend-type

基于 ADS9110的隔离式数据采集 (DAQ) 系统方案(待编辑)-电路方案

描述 该“可实现最大 SNR 和采样率的 18 位 2Msps 隔离式数据采集参考设计”演示了如何应对隔离式数据采集系统设计中的典型性能限制挑战: 通过将数字隔离器引入的传播延迟降至最低,使采样率达到最大 通过有效地减轻数字隔离器引入的 ADC 采样时钟抖动,使高频交流信号链性能 (SNR) 达到最大 特性 18 位、2Msps、1 通道、差分输入、隔离式数据采集 (DAQ) 系统 利用 ADS9110 的 multiSPI:trade_mark: 数字接口实现 2MSPS 采样率,同时保持低 SPI 数据速率 源同步 SPI 数据传输模式,可将隔离器传播延迟降至最低并提高采样率 可降低隔离器引入的抖动的技术,能够将 SNR 提高 12dB(100kHz Fin,2MSPS) 经测试的设计包含理论和计算、组件选择、PCB 设计和测量结果 原理图 附件文档: 方案相关器件: ISO1541:低功耗、双向 I2C 隔离器 ISO7840:高性能 5.7kVRMS 增强型四通道数字隔离器 ISO7842:高性能 5.7kVRMS 增强型四通道数字隔离器
recommend-type

自动化图书管理系统 v7.0

自动化图书馆管理系统包含了目前图书馆管理业务的每个环节,能同时管理图书和期刊,能打印条码、书标,并制作借书证,最大藏书量在300万册以上。系统采用CNMARC标准及中图法第四版分类,具有Web检索与发布功能,条码扫描,支持一卡通,支持触摸屏。系统包括系统管理、读者管理、编目、流通、统计、查询等功能。能够在一个界面下实现图书、音像、期刊的管理,设置假期、设置暂离锁(提高安全性)、暂停某些读者的借阅权、导入导出读者、交换MARC数据、升级辅助编目库等。安装本系统前请先安装SQL 2000SQL 下载地址 http://pan.baidu.com/s/145vkr安装过程如有问题可咨询: TEL 13851381727  QQ 306404635
recommend-type

白盒测试基本路径自动生成工具制作文档附代码

详细设计任务: 1.为模块进行详细的算法设计。 要求:获取一个想要的指定文件的集合。获取E:\experience下(包含子目录)的所有.doc的文件对象路径。并存储到集合中。 思路: 1,既然包含子目录,就需要递归。 2,在递归过程中需要过滤器。 3,满足条件,都添加到集合中。 2.为模块内的数据结构进行设计,对于需求分析,概要设计确定的概念性的数据类型进行确切的定义。 对指定目录进行递归。 (1)通过listFiles方法,获取dir当前下的所有的文件和文件夹对象。 (2)遍历该数组。 (3)判断是否是文件夹,如果是,递归。如果不是,那就是文件,就需要对文件进行过滤。 (4)通过过滤器对文件进行过滤 3编写详细设计说明书 过程设计语言(PDL),也称程序描述语言,又称为“伪码”。它是一种用于描述模块算法设计和处理细节的语言。 for(遍历文件){ if (是文件夹) { 递归 } Else { if (是.doc文件) { 添加到集合中 } } }

最新推荐

recommend-type

中医元仔智能医疗机器人-基于LangChain4j与阿里通义千问的中医诊疗对话AI-集成多轮对话记忆与RAG知识检索的智能助手-支持预约挂号与取消功能的医疗系统-采用Java17.zip

cursor免费次数用完中医元仔智能医疗机器人_基于LangChain4j与阿里通义千问的中医诊疗对话AI_集成多轮对话记忆与RAG知识检索的智能助手_支持预约挂号与取消功能的医疗系统_采用Java17.zip
recommend-type

LabVIEW结合YOLOv5与TensorRT实现高效并行推理及DLL封装技术在工业领域的应用 · DLL封装

LabVIEW平台结合YOLOv5和TensorRT进行高效并行推理的技术及其应用。首先简述了YOLOv5作为一种高效目标检测算法的优势,接着探讨了TensorRT作为深度学习推理引擎的作用,特别是在LabVIEW平台上通过DLL封装实现高效、灵活的模型推理。文中重点讲解了支持多模型并行推理的功能,使得视频和图片识别速度达到6ms以内。此外,还提供了从pt模型到engine模型的转换工具,以适应不同平台的需求。最后展示了该技术在工业自动化、视频监控、智能安防等领域的广泛应用前景,并强调了其高性能和灵活性。 适合人群:从事工业自动化、视频监控、智能安防等相关领域的技术人员,尤其是对深度学习技术和LabVIEW平台有一定了解的研发人员。 使用场景及目标:适用于需要高效视频和图片识别的场景,如工业自动化生产线的质量检测、视频监控系统的目标跟踪、智能安防系统的入侵检测等。目标是提升识别速度和准确性,优化资源配置,降低成本。 阅读建议:读者可以通过本文深入了解YOLOv5和TensorRT在LabVIEW平台上的集成方式和技术细节,掌握多模型并行推理的方法,从而更好地应用于实际项目中。
recommend-type

反弹头发福瑞特如果热隔热

如果如果热隔热隔热个人果然
recommend-type

MATLAB中ABS防抱死系统加入干扰并使用PID进行校正的方法 MATLAB

如何在MATLAB环境中构建ABS防抱死系统的模型,探讨了如何引入现实驾驶中的干扰因素,并使用PID控制器进行校正。首先,文章解释了ABS系统的基本原理及其重要性,然后逐步引导读者在MATLAB中建立ABS系统的模型,包括车辆轮胎、刹车系统和控制算法。接着,讨论了如何通过设置随机噪声或特定函数来模拟实际驾驶中的干扰因素。随后,深入讲解了PID控制器的工作机制及其在ABS系统中的具体应用,展示了如何通过调整PID参数来优化ABS系统的性能。最后,进行了仿真实验,验证了PID控制器的有效性和改进效果。 适合人群:汽车工程专业学生、研究人员以及对汽车控制系统感兴趣的工程师。 使用场景及目标:适用于希望深入了解ABS防抱死系统工作原理和技术实现的人群,旨在帮助他们掌握如何在MATLAB中建模、引入干扰因素并通过PID控制器进行校正的技术方法。 其他说明:本文不仅提供了理论知识,还包含了具体的实验步骤和结果分析,有助于读者全面理解和实践ABS系统的控制策略。
recommend-type

Notes App API开发与使用指南

### API基础知识 #### 标题分析:“notes-app-api” 从标题“notes-app-api”可以推断,此API(Application Programming Interface,应用程序接口)是专为一个名为“notes-app”的应用程序设计的。这种API通常被用来允许不同的软件组件之间进行通信。在这个案例中,“notes-app”可能是一款笔记应用,该API提供了笔记数据的获取、更新、删除等操作的接口。 #### 描述分析:“API休息说明” 在提供的“API休息说明”中,我们可以看到几个重要的操作指令: 1. **指令“dev”:** `npm run dev` - 这是一个用于启动开发模式的命令。通常情况下,`npm run dev`会使用Node.js环境下的某种热重载功能,让开发者在开发过程中实时看到代码更改的效果。 - `npm`是Node.js的包管理器,用于安装项目所需的依赖、运行脚本等。 - `dev`是脚本命令的缩写,实际对应的是`package.json`文件中定义的某个开发环境下的脚本命令。 2. **指令“服务”:** `npm start` - 这是一个用于启动应用程序服务的命令。 - 同样利用Node.js的`npm`包管理器执行,其目的是部署应用程序,使其对外提供服务。 3. **指令“构建”:** `npm run build` - 这是用于构建项目的命令,通常会将源代码进行压缩、转译等操作,生成用于生产环境的代码。 - 例如,如果项目使用了TypeScript,构建过程可能包括将TypeScript代码编译成JavaScript,因为浏览器不能直接运行TypeScript代码。 #### 标签分析:“TypeScript” TypeScript是JavaScript的超集,提供了静态类型检查和ES6+的特性。使用TypeScript可以提高代码的可读性和可维护性,同时在编译阶段发现潜在的错误。 1. **TypeScript的特性:** - **静态类型检查:** 有助于在开发阶段捕捉类型错误,降低运行时错误的概率。 - **ES6+特性支持:** TypeScript支持最新的JavaScript语法和特性,可以使用装饰器、异步编程等现代JavaScript特性。 - **丰富的配置选项:** 开发者可以根据项目需求进行各种配置,如模块化系统、编译目标等。 2. **TypeScript的使用场景:** - 大型项目:在大型项目中,TypeScript有助于维护和扩展代码库。 - 多人协作:团队开发时,类型定义有助于减少沟通成本,提高代码一致性。 - 错误敏感应用:如金融、医疗等领域的应用,可以利用TypeScript的静态类型检查减少bug。 #### 文件分析:“压缩包子文件的文件名称列表: notes-app-api-develop” 这个文件列表中包含了“notes-app-api-develop”,它表明存在一个与开发相关的压缩包或存档文件。这个文件很可能包含了应用程序的源代码,通常还会包括`package.json`文件,这个文件定义了项目的依赖关系和可运行的脚本命令。在开发和部署过程中,开发者通常会根据`package.json`中定义的脚本来执行不同的任务,如`npm run dev`或`npm start`等。 ### Docker使用说明 在描述中还提到了使用Docker的命令: 1. **构建镜像:** `docker build -t notes-api .` - 这个命令用于构建一个名为`notes-api`的Docker镜像。 - `.`表示Dockerfile在当前目录。 - `-t`指定镜像的名称和标签。 2. **运行容器:** `docker run -d -it -p 3005:3005 notes-api` - 该命令用于从`notes-api`镜像启动一个容器,并在后台运行。 - `-d`表示后台运行容器。 - `-it`则是将容器的标准输入打开,并分配一个伪终端。 - `-p 3005:3005`将容器内部的3005端口映射到宿主机的3005端口,允许宿主机与容器内部的服务进行通信。 Docker作为容器化平台,它允许开发者打包应用及其依赖环境为一个轻量级、可移植的容器,这样无论在什么环境下,都能保证应用运行的一致性。这一点对于开发和部署API来说尤为重要,因为它确保了开发环境与生产环境的一致性,减少了所谓的“在我的机器上可以运行”的问题。 ### 总结 综上所述,“notes-app-api”是一个为笔记应用程序设计的API,开发者可以通过一系列npm脚本命令进行开发、服务和构建操作。此外,该API还涉及到使用Docker进行容器化部署的流程。最后,提及的TypeScript标签意味着该项目在前端开发上采用了TypeScript,以期获得更加健壮和可维护的代码库。了解这些知识点对开发、部署和维护现代Web应用至关重要。
recommend-type

【PMSM建模与测试最佳实践】:MATLAB电机仿真模型的权威指导

# 1. PMSM电机模型的基础知识 在现代工业自动化和电动车辆领域,永磁同步电机(PMSM
recommend-type

如何通过四元数避免万向节死锁?

<think>好的,用户想了解如何使用四元数来避免万向节锁问题,尤其是在3D旋转中保持姿态的稳定性。首先,我需要回忆万向节锁的基本概念和四元数如何解决这个问题。万向节锁发生在使用欧拉角时,当两个旋转轴对齐导致失去一个自由度。而四元数通过四维空间的旋转避免了这种顺序依赖。 接下来,我应该解释万向节锁的原因,比如三个轴依次旋转时,某个轴可能与其他轴对齐,导致无法正确旋转。然后对比四元数的优势,比如四元数的连续性和无奇异性。需要提到四元数的数学表示,如单位四元数和旋转插值方法(如球面线性插值),以及它们如何避免万向节锁。 还要考虑用户可能的实际应用场景,比如游戏开发或机器人学,是否需要示例代码?
recommend-type

Python实现Couchbase大规模数据复制技术

标题中提到的技术“couchbase-massive-replication”是一种针对Couchbase数据库的开源Python开发工具,专门用于高效地实现跨集群的大量存储桶和索引的复制。Couchbase是一个高性能、可扩展、容错的NoSQL文档数据库,它支持同步分布式复制(XDCR),能够实现跨地域的数据复制。 描述部分详细阐述了该技术的主要用途和优势。它解决了一个常见问题:在进行XDCR复制时,迁移大量存储桶可能会遇到需要手动检查并迁移缺失存储桶的繁琐步骤。Couchbase-massive-replication技术则允许用户在源和目标集群之间无需进行存储桶配置,简化了迁移过程。开发者可以通过简单的curl请求,向集群发送命令,从而实现大规模存储桶的自动化迁移。 此外,为了帮助用户更容易部署和使用该技术,项目提供了一个Dockerfile,允许用户通过Docker容器来运行程序。Docker是一种流行的容器化平台,可以将应用及其依赖打包到一个可移植的容器中,便于部署和扩展。用户只需执行几个Docker命令,即可快速启动一个名为“cbmigrator”的容器,版本为0.1。启动容器后,可以通过发送简单的POST请求来操作迁移任务。 项目中还提到了Docker Hub,这是一个公共的Docker镜像注册中心,用户可以在其中找到并拉取其他用户分享的镜像,其中就包括了“cbmigrator”镜像,即demir94/cbmigrator:0.1。这大大降低了部署和使用该技术的门槛。 根据标签“Python”,我们可以推断出该项目是使用Python开发的。Python是一种广泛使用的高级编程语言,以其简洁的语法和强大的库支持而闻名。该项目中Python的使用意味着用户可能需要具备一定的Python基础知识,以便对项目进行定制或故障排除。Python的动态类型系统和解释执行机制,使得开发过程中可以快速迭代和测试。 最后,从提供的压缩包子文件的文件名称列表“couchbase-massive-replication-main”来看,该项目的源代码文件夹可能遵循了通用的开源项目结构,其中“main”文件夹通常包含了项目的主要代码和入口文件。用户在获取项目后,可以在这个文件夹中找到相关的代码文件,包括配置文件、数据库模型、业务逻辑实现以及API接口等。 综合来看,这个项目涉及的技术点包括: - Couchbase数据库:一种文档数据库,广泛用于构建可扩展的应用程序。 - XDCR(Cross-Datacenter Replication):Couchbase提供的跨数据中心数据复制机制,实现数据的无缝迁移和灾难恢复。 - Python编程语言:用来开发该项目的高级编程语言,以其易读性和简洁的语法著称。 - Docker容器化技术:用于打包、分发和运行应用程序的平台,提供了一种便捷的部署方式。 - Docker Hub:一个存放和分享Docker镜像的平台,可以简化镜像的查找、下载和管理过程。 这个项目对于需要在多个Couchbase集群间迁移大量数据的开发者和运维人员来说是一个宝贵的资源,因为它大大简化了存储桶迁移的过程,并提高了操作的便利性和效率。
recommend-type

【MATLAB电机性能评估案例】:仿真环境下的深度研究

# 1. MATLAB在电机性能评估中的应用概述 电机作为现代工业中不可或缺的电力传动设备,其性能优劣直接影响整个系统的可靠性和效率。在众多的电机性能评估工具中,MATLAB凭借其强大的数值计算能力和丰富的工具箱资源,成为该领域研究和工程实践中的有力工具。本章将对MATLAB在电机性能评估中的应用进行概述,并介绍其在电机仿真、故障诊断和性能优化等方面的具体应用前景和价值。MA
recommend-type

专业版立创EDA怎么画板子边框圆角

<think>嗯,用户问的是如何在专业版立创EDA中画板子的圆角边框。首先,我得回忆一下立创EDA的功能布局。专业版和标准版可能有些不同,所以得确认步骤是否正确。 首先,用户可能需要知道如何进入板子边框的编辑模式。通常,这类操作是在PCB设计界面里进行的。所以第一步应该是打开或创建一个PCB文件。然后,找到板子边框的选项,可能在“设计”或者“工具”菜单下,或者有专门的边框层,比如Board Outline层。需要确认专业版的具体位置,可能在顶部工具栏或右键菜单里。 接下来,用户需要绘制一个矩形作为基础边框。这里可能需要使用绘制矩形工具,然后调整大小到所需的板子尺寸。但问题是如何将矩形的四个