CREATE TABLE oracle_source ( BARCODE STRING NOT NULL, PIC_NO_TOUF STRING, TOUFK_NO STRING NOT NULL, TOUF_TIME TIMESTAMP, FENBZC_TIME TIMESTAMP, FENB_TIME TIMESTAMP, MAP_TIME TIMESTAMP, TOUF_STATE STRING, FENBZC_STATE STRING, FENB_STATE STRING, MAP_STATE STRING, ZCSSX_NO STRING, FBSSX_NO STRING, MPLOC_NO STRING, TUOP_BARCODE STRING, SHANGP_ID STRING, LOT STRING, MAP_XL BIGINT, YEWDJ_NO STRING, HUOW_ID STRING, PRIMARY KEY (BARCODE) NOT ENFORCED ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = '10.19.0.137', 'port' = '1521', 'username' = 'flinkuser', 'password' = 'flinkpw', 'database-name' = 'LMIS', 'schema-name' = 'LMIS', 'table-name' = 'LMIS.RK_YJMINFO_PRODUCTLINE', 'debezium.log.mining.strategy' = 'online_catalog', 'debezium.log.mining.continuous.mine' = 'true' ); CREATE TABLE mysql_sink ( BARCODE STRING NOT NULL, PIC_NO_TOUF STRING, TOUFK_NO STRING NOT NULL, TOUF_TIME TIMESTAMP, FENBZC_TIME TIMESTAMP, FENB_TIME TIMESTAMP, MAP_TIME TIMESTAMP, TOUF_STATE STRING, FENBZC_STATE STRING, FENB_STATE STRING, MAP_STATE STRING, ZCSSX_NO STRING, FBSSX_NO STRING, MPLOC_NO STRING, TUOP_BARCODE STRING, SHANGP_ID STRING, LOT STRING, MAP_XL BIGINT, YEWDJ_NO STRING, HUOW_ID STRING, PRIMARY KEY (BARCODE) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.21.0.173:3306/test1?characterEncoding=UTF-8', 'table-name' = 'RK_YJMINFO_PRODUCTLINE', 'username' = 'bi', 'password' = 'Bi@202503' ); INSERT INTO mysql_sink SELECT BARCODE, PIC_NO_TOUF, TOUFK_NO, TOUF_TIME, FENBZC_TIME, FENB_TIME, MAP_TIME, TOUF_STATE, FENBZC_STATE, FENB_STATE, MAP_STATE, ZCSSX_NO, FBSSX_NO, MPLOC_NO, TUOP_BARCODE, SHANGP_ID, LOT, MAP_XL, YEWDJ_NO, HUOW_ID FROM oracle_source; 用java代码编写
时间: 2025-06-18 09:18:49 浏览: 19
<think>嗯,用户给了我一个Flink SQL的DDL和DML语句,然后让我用Java代码编写。首先,我需要理解这些SQL语句的功能。看起来用户是在用Flink CDC从Oracle同步数据到MySQL。CREATE TABLE定义了源表和目标表,然后用INSERT INTO做流式插入。
接下来,我需要把这些SQL转换成Java代码。用户可能正在使用Flink的Table API或者DataStream API。但根据提供的SQL,使用Table API会更直接,因为可以直接执行SQL语句。
首先,我得考虑如何配置Flink环境。需要创建StreamExecutionEnvironment和StreamTableEnvironment。然后,注册源表和目标表,可能通过执行用户的CREATE TABLE语句。最后执行INSERT语句。
不过要注意,Flink的Java代码中执行SQL通常使用tableEnv.executeSql()方法。所以可能需要将那些CREATE TABLE和INSERT语句作为字符串传入executeSql中。
另外,用户提供的SQL里用了Oracle CDC连接器和JDBC连接器,所以需要确保相关的依赖已经添加到项目中。比如flink-connector-jdbc、flink-sql-connector-oracle-cdc等。可能需要提醒用户添加这些依赖,否则代码会运行失败。
还有主键的定义中有NOT ENFORCED,这在Flink中是正确的,因为Flink不会强制检查主键约束,只是用于元数据。
然后,代码结构大致是:设置环境,执行所有DDL,然后执行INSERT,最后触发执行。但Flink的作业需要显式调用execute()方法,所以可能在最后需要调用env.execute("JobName")。
不过，当使用Table API的executeSql()执行INSERT语句时，会立即提交作业，因此不需要再显式调用env.execute()；由于拓扑中没有任何DataStream算子，此时调用env.execute()反而会抛出“No operators defined in streaming topology”异常。正确做法是对executeSql()返回的TableResult调用await()来阻塞等待作业。
现在,检查是否有遗漏的部分。比如,是否需要处理时间特性,或者配置checkpoint?如果用户需要容错,可能需要配置checkpoint,但用户没有提到,所以可能不需要。保持代码简单。
最后,将所有SQL语句按顺序在Java代码中用executeSql执行。需要注意的是,每个SQL语句作为独立的字符串传入,并且确保没有语法错误。
所以最终的Java代码应该是创建一个main方法,初始化环境,然后依次执行用户提供的各个SQL语句,最后触发作业执行。同时,提醒用户添加依赖,配置正确的主机、端口、用户名密码等信息是否正确,以及Flink版本是否兼容这些连接器。</think>以下是用Java编写的Flink CDC数据同步程序实现Oracle到MySQL的同步:
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// FIX: the original import was missing the "org.apache." prefix and would not compile.
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink CDC synchronization job: streams all changes (snapshot + redo log)
 * of Oracle table LMIS.RK_YJMINFO_PRODUCTLINE into the MySQL table
 * RK_YJMINFO_PRODUCTLINE, using a pure Table-API/SQL pipeline.
 *
 * Requires on the classpath: flink-connector-jdbc, flink-connector-oracle-cdc
 * and the MySQL JDBC driver. Runs until cancelled (unbounded CDC source).
 */
public class OracleToMySQLSync {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environment + table environment on top of it.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Oracle CDC source table. PRIMARY KEY ... NOT ENFORCED is metadata
        //    only; Flink does not validate it but uses it for upsert semantics.
        //    NOTE(review): credentials are hard-coded — externalize for production.
        String createOracleSource = "CREATE TABLE oracle_source (\n"
                + "  BARCODE STRING NOT NULL,\n"
                + "  PIC_NO_TOUF STRING,\n"
                + "  TOUFK_NO STRING NOT NULL,\n"
                + "  TOUF_TIME TIMESTAMP,\n"
                + "  FENBZC_TIME TIMESTAMP,\n"
                + "  FENB_TIME TIMESTAMP,\n"
                + "  MAP_TIME TIMESTAMP,\n"
                + "  TOUF_STATE STRING,\n"
                + "  FENBZC_STATE STRING,\n"
                + "  FENB_STATE STRING,\n"
                + "  MAP_STATE STRING,\n"
                + "  ZCSSX_NO STRING,\n"
                + "  FBSSX_NO STRING,\n"
                + "  MPLOC_NO STRING,\n"
                + "  TUOP_BARCODE STRING,\n"
                + "  SHANGP_ID STRING,\n"
                + "  LOT STRING,\n"
                + "  MAP_XL BIGINT,\n"
                + "  YEWDJ_NO STRING,\n"
                + "  HUOW_ID STRING,\n"
                + "  PRIMARY KEY (BARCODE) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'oracle-cdc',\n"
                + "  'hostname' = '10.19.0.137',\n"
                + "  'port' = '1521',\n"
                + "  'username' = 'flinkuser',\n"
                + "  'password' = 'flinkpw',\n"
                + "  'database-name' = 'LMIS',\n"
                + "  'schema-name' = 'LMIS',\n"
                // FIX: 'table-name' takes the table name only; the schema is
                // already given by 'schema-name'. 'LMIS.RK_YJMINFO_PRODUCTLINE'
                // would never match any table.
                + "  'table-name' = 'RK_YJMINFO_PRODUCTLINE',\n"
                + "  'debezium.log.mining.strategy' = 'online_catalog',\n"
                // NOTE(review): continuous.mine was removed in Oracle 19c+ /
                // newer Debezium versions — confirm against your Oracle version.
                + "  'debezium.log.mining.continuous.mine' = 'true'\n"
                + ")";
        tableEnv.executeSql(createOracleSource);

        // 3. MySQL sink table. With a declared primary key the JDBC connector
        //    writes in upsert mode, making the sync idempotent per BARCODE.
        String createMySQLSink = "CREATE TABLE mysql_sink (\n"
                + "  BARCODE STRING NOT NULL,\n"
                + "  PIC_NO_TOUF STRING,\n"
                + "  TOUFK_NO STRING NOT NULL,\n"
                + "  TOUF_TIME TIMESTAMP,\n"
                + "  FENBZC_TIME TIMESTAMP,\n"
                + "  FENB_TIME TIMESTAMP,\n"
                + "  MAP_TIME TIMESTAMP,\n"
                + "  TOUF_STATE STRING,\n"
                + "  FENBZC_STATE STRING,\n"
                + "  FENB_STATE STRING,\n"
                + "  MAP_STATE STRING,\n"
                + "  ZCSSX_NO STRING,\n"
                + "  FBSSX_NO STRING,\n"
                + "  MPLOC_NO STRING,\n"
                + "  TUOP_BARCODE STRING,\n"
                + "  SHANGP_ID STRING,\n"
                + "  LOT STRING,\n"
                + "  MAP_XL BIGINT,\n"
                + "  YEWDJ_NO STRING,\n"
                + "  HUOW_ID STRING,\n"
                + "  PRIMARY KEY (BARCODE) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'jdbc',\n"
                + "  'url' = 'jdbc:mysql://10.21.0.173:3306/test1?characterEncoding=UTF-8',\n"
                + "  'table-name' = 'RK_YJMINFO_PRODUCTLINE',\n"
                + "  'username' = 'bi',\n"
                + "  'password' = 'Bi@202503'\n"
                + ")";
        tableEnv.executeSql(createMySQLSink);

        // 4. Continuous INSERT from source to sink. Explicit column list keeps
        //    the mapping visible and robust against column reordering.
        String insertSQL = "INSERT INTO mysql_sink\n"
                + "SELECT\n"
                + "  BARCODE,\n"
                + "  PIC_NO_TOUF,\n"
                + "  TOUFK_NO,\n"
                + "  TOUF_TIME,\n"
                + "  FENBZC_TIME,\n"
                + "  FENB_TIME,\n"
                + "  MAP_TIME,\n"
                + "  TOUF_STATE,\n"
                + "  FENBZC_STATE,\n"
                + "  FENB_STATE,\n"
                + "  MAP_STATE,\n"
                + "  ZCSSX_NO,\n"
                + "  FBSSX_NO,\n"
                + "  MPLOC_NO,\n"
                + "  TUOP_BARCODE,\n"
                + "  SHANGP_ID,\n"
                + "  LOT,\n"
                + "  MAP_XL,\n"
                + "  YEWDJ_NO,\n"
                + "  HUOW_ID\n"
                + "FROM oracle_source";

        // FIX: executeSql() on an INSERT submits the job immediately; await()
        // blocks the client until the (unbounded) job terminates. The original
        // env.execute(...) after executeSql() would fail with
        // "No operators defined in streaming topology" because this pipeline
        // defines no DataStream operators.
        tableEnv.executeSql(insertSQL).await();
    }
}
```
**使用说明:**
1. **依赖要求**:
```xml
<!-- Flink核心依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- MySQL JDBC 驱动：JDBC sink 运行时必需，否则会抛 ClassNotFoundException -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
```
2. **配置说明**:
- 确保Oracle和MySQL的网络连通性
- 检查数据库用户权限(Oracle用户需要`LOG MINING`权限)
- 调整Flink checkpoint配置(如果需要容错)
- 建议Flink版本≥1.13
3. **运行方式**:
- 打包为JAR文件后通过`flink run`提交到集群
- 或在IDE中直接运行(开发测试)
该程序实现了:
- 实时捕获Oracle表`RK_YJMINFO_PRODUCTLINE`的变化
- 全量+增量数据同步
- 基于主键的幂等写入(`BARCODE`为主键)
- 字段级别的数据映射和传输
**注意事项**:
1. 需要确保Oracle数据库开启归档日志模式
2. MySQL表结构应与Oracle表结构完全一致
3. 生产环境建议增加异常处理和监控机制
4. 根据数据量调整Flink任务并行度
5. 建议使用连接池配置JDBC连接参数
阅读全文
相关推荐

















