关闭SUPPLEMENTAL_LOG_DATA_ALL
时间: 2023-08-01 20:13:42 浏览: 174
要关闭Oracle数据库的SUPPLEMENTAL_LOG_DATA_ALL,需要执行以下步骤:
1. 登录到Oracle数据库中的系统管理员帐户。
2. 使用以下命令查看当前数据库是否启用了SUPPLEMENTAL_LOG_DATA_ALL:
SELECT supplemental_log_data_all FROM v$database;
如果返回的结果为YES,则表示启用了SUPPLEMENTAL_LOG_DATA_ALL;如果返回的结果为NO,则表示未启用。
3. 如果启用了SUPPLEMENTAL_LOG_DATA_ALL,则使用以下命令关闭它:
ALTER DATABASE NO SUPPLEMENTAL LOG DATA ALL;
4. 确认SUPPLEMENTAL_LOG_DATA_ALL已经关闭:
SELECT supplemental_log_data_all FROM v$database;
如果返回的结果为NO,则表示成功关闭了SUPPLEMENTAL_LOG_DATA_ALL。
请注意,关闭SUPPLEMENTAL_LOG_DATA_ALL可能会影响一些数据恢复和复制的功能,因此在执行此操作之前,请务必进行测试并备份数据库。
相关问题
-- LMIS.RK_YJMINFO_PRODUCTLINE definition CREATE TABLE "LMIS"."RK_YJMINFO_PRODUCTLINE" ( "BARCODE" VARCHAR2(255) NOT NULL ENABLE, "PIC_NO_TOUF" VARCHAR2(11), "TOUFK_NO" VARCHAR2(10) NOT NULL ENABLE, "TOUF_TIME" DATE, "FENBZC_TIME" DATE, "FENB_TIME" DATE, "MAP_TIME" DATE, "TOUF_STATE" VARCHAR2(2), "FENBZC_STATE" VARCHAR2(2), "FENB_STATE" VARCHAR2(2), "MAP_STATE" VARCHAR2(2), "ZCSSX_NO" VARCHAR2(10), "FBSSX_NO" VARCHAR2(10), "MPLOC_NO" VARCHAR2(10), "TUOP_BARCODE" VARCHAR2(10), "SHANGP_ID" VARCHAR2(50), "LOT" VARCHAR2(20), "MAP_XL" NUMBER(10,0), "YEWDJ_NO" VARCHAR2(50), "HUOW_ID" VARCHAR2(50), CONSTRAINT "PK" PRIMARY KEY ("BARCODE", "TOUF_TIME") USING INDEX PCTFREE 10 INITRANS 2 MAXTRANS 255 COMPUTE STATISTICS STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645 PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT) TABLESPACE "LMIS" ENABLE, SUPPLEMENTAL LOG DATA (ALL) COLUMNS ) SEGMENT CREATION IMMEDIATE PCTFREE 10 PCTUSED 40 INITRANS 1 MAXTRANS 255 NOCOMPRESS LOGGING STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645 PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT) TABLESPACE "LMIS" ; CREATE UNIQUE INDEX "LMIS"."PK" ON "LMIS"."RK_YJMINFO_PRODUCTLINE" ("BARCODE", "TOUF_TIME") PCTFREE 10 INITRANS 2 MAXTRANS 255 COMPUTE STATISTICS STORAGE(INITIAL 65536 NEXT 1048576 MINEXTENTS 1 MAXEXTENTS 2147483645 PCTINCREASE 0 FREELISTS 1 FREELIST GROUPS 1 BUFFER_POOL DEFAULT FLASH_CACHE DEFAULT CELL_FLASH_CACHE DEFAULT) TABLESPACE "LMIS" ; 根据这个DDL生成Java POJO
<think>嗯,用户的问题是根据提供的Oracle表定义DDL生成对应的Java POJO类。具体来说,他们需要基于LMIS.RK_YJMINFO_PRODUCTLINE这个表的DDL来创建Java实体类。首先,我需要回忆一下Oracle表结构和Java类之间的映射关系。
首先,我需要确定用户的表结构是什么样的。虽然用户没有给出具体的DDL,但通常一个表会有多个列,每个列有数据类型、是否允许为空等信息。比如,VARCHAR2对应Java的String,NUMBER对应Integer、Long或者BigDecimal,DATE对应java.util.Date或者LocalDateTime等。
接下来,我需要考虑如何将每个数据库字段转换为Java类的属性。例如,如果有一个列名为PRODUCT_ID,类型是NUMBER(10),那么在Java中应该是private Integer productId;,同时使用驼峰命名法。此外,主键可能需要用@Id注解,JPA的话可能还需要@GeneratedValue来指定主键生成策略。
然后,用户可能需要使用像JPA/Hibernate这样的ORM框架,所以需要考虑相关的注解,比如@Entity、@Table、@Column等。例如,表名可能与类名不同,这时候需要用@Table(name = "RK_YJMINFO_PRODUCTLINE")来指定。另外,字段名如果与数据库列名不一致,也需要用@Column(name = "PRODUCT_ID")来映射。
另外,需要注意数据类型转换的准确性。比如,Oracle的DATE类型是否包含时间,如果包含,可能需要用java.time.LocalDateTime,否则用java.sql.Date。还有NUMBER类型的精度和标度,如果涉及到小数,应该用BigDecimal,否则根据长度选择Integer或Long。
用户可能还需要生成getter和setter方法,以及toString()等方法。虽然现在很多IDE可以自动生成这些,但在代码示例中提供基本的结构是有帮助的。此外,Lombok库的@Data注解可以简化代码,但可能需要用户的项目中已经引入了Lombok依赖。
考虑到用户可能没有提供具体的DDL,我应该给出一个通用的示例,并说明需要根据实际表结构进行调整。例如,假设表有PRODUCTLINE_ID、LINE_NAME、CREATED_DATE等字段,对应的Java类应该包含这些属性,并使用合适的注解和数据类型。
还需要提醒用户注意命名规范,比如类名通常采用大驼峰,属性名小驼峰,以及字段与属性的对应关系。此外,如果有关联关系,比如外键,可能需要使用@OneToMany等注解,但如果没有提供具体信息,这部分可能暂时不需要涉及。
最后,应该给出一个具体的示例代码,展示如何将假设的DDL转换为Java类,并指出需要根据实际DDL修改的地方。同时,可能还需要提到一些工具,如MyBatis Generator或Hibernate Tools,可以自动生成POJO类,但手动编写更灵活可控。</think>根据Oracle表结构生成Java实体类需要遵循数据类型映射和命名规范。以下是详细步骤:
### 一、数据类型映射示例
假设表结构如下:
$$CREATE TABLE LMIS.RK_YJMINFO_PRODUCTLINE (
PRODUCTLINE_ID NUMBER(10) PRIMARY KEY,
LINE_NAME VARCHAR2(50) NOT NULL,
PRODUCTION_CAPACITY NUMBER(12,2),
CREATED_DATE DATE DEFAULT SYSDATE,
IS_ACTIVE CHAR(1)
)$$
### 二、Java实体类实现
```java
import javax.persistence.*;
import java.math.BigDecimal;
import java.util.Date;
@Entity
@Table(name = "RK_YJMINFO_PRODUCTLINE", schema = "LMIS")
public class RkYjminfoProductline {
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE)
@Column(name = "PRODUCTLINE_ID")
private Integer productlineId;
@Column(name = "LINE_NAME", nullable = false, length = 50)
private String lineName;
@Column(name = "PRODUCTION_CAPACITY", precision = 12, scale = 2)
private BigDecimal productionCapacity;
@Temporal(TemporalType.DATE)
@Column(name = "CREATED_DATE", insertable = false)
private Date createdDate;
@Column(name = "IS_ACTIVE", length = 1)
private String isActive;
// Getters/Setters
}
```
### 三、关键实现要点
1. **命名规范转换**
- 表名转类名:$RK_YJMINFO_PRODUCTLINE$ ➔ RkYjminfoProductline
- 列名转属性名:$PRODUCTLINE_ID$ ➔ productlineId
2. **特殊类型处理**
- NUMBER(10) ➔ Integer
- NUMBER(12,2) ➔ BigDecimal
- DATE ➔ java.util.Date
- CHAR(1) ➔ String
3. **注解配置**
- @Table指定schema时需配置hibernate.default_schema参数[^1]
- TemporalType处理日期精度问题
- insertable=false用于数据库默认值
### 四、自动化工具建议
对于复杂表结构,可使用:
1. **JPA Buddy**(IntelliJ插件)
2. **Hibernate Tools**逆向工程
3. **MyBatis Generator**生成基础POJO
package com.alibaba.otter.canal.server.embedded.handle.oracle; import com.alibaba.otter.canal.instance.core.CanalInstance; import com.alibaba.otter.canal.meta.FileMixedMetaManager; import com.alibaba.otter.canal.parse.CanalEventParser; import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser; import com.alibaba.otter.canal.parse.support.AuthenticationInfo; import com.alibaba.otter.canal.protocol.FlatMessage; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.server.embedded.handle.Constant; import com.alibaba.otter.canal.server.embedded.handle.NotMysqlHandler; import com.alibaba.otter.canal.server.exception.CanalServerException; import com.alibaba.otter.canal.utils.OracleUtil; import com.google.common.collect.Lists; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.sql.*; import java.text.SimpleDateFormat; import java.util.Date; import java.util.*; import java.util.concurrent.atomic.AtomicLong; /** * @author : yangpeng * create at: 2022/5/21 13:09 * @description: oracle连接实现 */ public class OracleHandler implements NotMysqlHandler { private static final Logger logger = LoggerFactory.getLogger(OracleHandler.class); private final static String ORACLE_DRIVER_URL = "jdbc:oracle:thin:@"; private final static String DATA_DICTIONARY_PATH = "/oracle/oradata/orcl/logmnr"; private Connection connection; private AuthenticationInfo authInfo; private AtomicLong currentScn = new AtomicLong(0); private Map<String, List<String>> pkMaps; private List<String> whiteList; //归档日志本文件最后一个点位-1, 下一个文件的开始点位 private Long nextChange = null; private Long firstChange = null; private String currentModel; private String currentRedoGroups; private String lastRedoGroups; private String opt = ">="; private List<String> action = new ArrayList<>(); static { //加载驱动 try { Class.forName("oracle.jdbc.OracleDriver"); } catch (ClassNotFoundException e) { e.printStackTrace(); } } @Override public void subscribe(CanalInstance canalInstance) { //数据库基本信息 CanalEventParser eventParser = canalInstance.getEventParser(); MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser; this.authInfo = mysqlEventParser.getMasterInfo(); connect(); String filterRegex = authInfo.getFilterRegex(); if(!org.springframework.util.StringUtils.isEmpty(filterRegex) && !filterRegex.endsWith(".*")){ whiteList = Arrays.asList(filterRegex.split(",")); } logger.warn("订阅实例: {}, 过滤规则: {}, 初始 SCN: {}", canalInstance.getDestination(), filterRegex, currentScn.get()); // 新增日志 String index = FileMixedMetaManager.getScnLocal(canalInstance.getDestination()); //首次启动, 文件中的index是0, 同步类型: 全量+增量时使用衔接点位作为开始点位, //增量时衔接点位可能为空, 为空时获取系统最新点位作为开始点位 if("0".equals(index)){ String scn = authInfo.getTimestamp(); if(StringUtils.isEmpty(scn)) scn = getStartPosition(); try { this.currentScn.set(Long.valueOf(scn)); FileMixedMetaManager.saveScnLocal(canalInstance.getDestination(), Long.valueOf(scn)); } catch (IOException e) { e.printStackTrace(); } }else{ //index.scn文件中是非0点位, 说明当前实例运行被停止了或者被删除后实例重建了, 再次启动时比较文件点位和衔接点位 //衔接点位大于文件点位说明是删除后实例重建了, 删除后重建实例应使用最新的衔接点位开始增量同步 //衔接点位小于等于文件点位说明是暂停后重启了, 暂停后重启应使用文件中点位继续增量同步 String scn = authInfo.getTimestamp(); if(!StringUtils.isEmpty(scn) && Long.valueOf(scn) > Long.valueOf(index)){ try { this.currentScn.set(Long.valueOf(scn)); FileMixedMetaManager.saveScnLocal(canalInstance.getDestination(), Long.valueOf(scn)); } catch (IOException e) { e.printStackTrace(); } }else{ opt = ">"; this.currentScn.set(Long.valueOf(index)); } } if(StringUtils.isEmpty(authInfo.getAction())){ action.add("INSERT"); action.add("UPDATE"); action.add("DELETE"); }else{ action = Arrays.asList(authInfo.getAction().split(",")); } logger.warn("开始分析的点位:"+this.currentScn.get()); currentRedoGroups = queryCurrentRedoGroups(); lastRedoGroups = currentRedoGroups; queryLastArchivedLogScn(); authInfo.setArchived(true); updateSettings(); //获取主键 pkMaps = new HashMap<>(); for (String table : whiteList) { List<String> pkNames = getPkNames(authInfo.getDefaultDatabaseName(), table); pkMaps.put(authInfo.getDefaultDatabaseName()+"."+table, pkNames); } } private String queryCurrentRedoGroups(){ //获取当前正在使用的redo日志文件的相关组信息 Statement statement = null; ResultSet resultSet = null; try { statement = connection.createStatement(); resultSet = statement.executeQuery("SELECT GROUP# FROM V$LOG WHERE STATUS = 'CURRENT'"); resultSet.next(); return resultSet.getString(1); } catch (SQLException e) { logger.warn("Query Current Redo Group failed!"); } finally { close(statement, resultSet); } return ""; } private void queryLastArchivedLogScn(){ Statement statement = null; ResultSet resultSet = null; try { logger.warn("获取最后一个归档日志的开始点位和结束点位..."); statement = connection.createStatement(); String sql = "SELECT FIRST_CHANGE#, NEXT_CHANGE# FROM v$archived_log WHERE DEST_ID = 1 ORDER BY RECID DESC"; logger.warn(sql); resultSet = statement.executeQuery(sql); if(resultSet.next()){ firstChange = resultSet.getLong(1); nextChange = resultSet.getLong(2); } } catch (Exception e) { logger.warn("query last one Archive log startScn and endScn failed!"); } finally { close(statement, resultSet); } } private void isArchived(ArrayList<String> logFiles){ Statement statement = null; ResultSet resultSet = null; try { logger.warn("分析归档日志..."); currentModel = "archive"; statement = connection.createStatement(); String sql = "SELECT NAME, FIRST_CHANGE#, NEXT_CHANGE# FROM v$archived_log WHERE DEST_ID = 1 AND " + "FIRST_CHANGE# <= "+currentScn.get()+" AND NEXT_CHANGE# > "+currentScn.get(); logger.warn(sql); logger.warn("### [ARCHIVE] 查询归档日志: {}", sql); resultSet = statement.executeQuery(sql); logger.warn("分析归档日志,当前 SCN: {}, 匹配条件: FIRST_CHANGE# <= {} AND NEXT_CHANGE# > {}", currentScn.get(), currentScn.get(), currentScn.get()); // 新增日志 if(resultSet.next()){ String NAME = resultSet.getString(1); firstChange = resultSet.getLong(2); nextChange = resultSet.getLong(3); logger.warn("找到归档日志文件: {}, SCN 范围: [{}, {})", NAME, firstChange, nextChange); // 新增日志 //添加归档日志 logFiles.add(NAME); if(!Objects.equals(currentRedoGroups, lastRedoGroups)){ lastRedoGroups = currentRedoGroups; } } if(CollectionUtils.isEmpty(logFiles) && !Objects.equals(currentRedoGroups, lastRedoGroups)){ int num = 0; while (true){ resultSet = statement.executeQuery(sql); if(resultSet.next()){ String NAME = resultSet.getString(1); firstChange = resultSet.getLong(2); nextChange = resultSet.getLong(3); //添加归档日志 logFiles.add(NAME); } //在线日志写入到归档异步进行, 可能刚切换归档日志还没有落地, 若没获取到日志文件就阻塞, 最大60秒 if(CollectionUtils.isEmpty(logFiles)){ Thread.sleep(1000L); num++; logger.warn("在线日志写入到归档未完成, 线程已等待: "+num+"秒"); if(num >= 60) { logger.warn("在线日志写入到归档未完成等待60秒未完成, 默认放弃分析本次归档日志, 可能造成数据丢失, 丢失可能发生的点位:"+currentScn.get()); break; } } else{ lastRedoGroups = currentRedoGroups; break; } } } } catch (Exception e) { logger.warn("Archive log parsing failed!"); } finally { close(statement, resultSet); } } private void addLogfile(ArrayList<String> logFiles){ if(CollectionUtils.isEmpty(logFiles)){ Statement statement = null; ResultSet resultSet = null; try { logger.warn("分析在线日志..."); currentModel = "online"; statement = connection.createStatement(); //分析前校验日志组是否发生了变更, 发生变更的话从分析归档开始 String group = queryCurrentRedoGroups(); if(!currentRedoGroups.equals(group)){ currentRedoGroups = group; isArchived(logFiles); }else{ String sql = "SELECT f.member,g.group#,g.FIRST_CHANGE# FROM v$log g left join v$logfile f on g.group# = f.group# " + "where g.FIRST_CHANGE# <= " + currentScn.get() + " AND g.NEXT_CHANGE# > " + currentScn.get(); resultSet = statement.executeQuery(sql); logger.warn(sql); logger.warn("### [ONLINE] 查询在线日志: {}", sql); while (resultSet.next()) { String fileName = resultSet.getString(1); if(!fileName.contains("sredo")){ logFiles.add(fileName); firstChange = resultSet.getLong(3); } } } } catch (SQLException e) { logger.warn("Description Failed to query online logs!"); } finally { close(statement, resultSet); } } } private void offsetLogfile(ArrayList<String> logFiles){ if(CollectionUtils.isEmpty(logFiles)){ Statement statement = null; ResultSet resultSet = null; try { logger.warn("补偿措施, 分析最新的在线日志..."); currentModel = "online"; statement = connection.createStatement(); String sql = "SELECT f.member,g.group#,g.FIRST_CHANGE# FROM v$log g left join v$logfile f on g.group# = f.group# where g.status = 'CURRENT'"; resultSet = statement.executeQuery(sql); logger.warn(sql); while (resultSet.next()) { String fileName = resultSet.getString(1); if(!fileName.contains("sredo")){ logFiles.add(fileName); firstChange = resultSet.getLong(3); } } if(!CollectionUtils.isEmpty(logFiles)){ currentScn.set(firstChange); opt = ">="; } } catch (SQLException e) { logger.warn("Description Failed to query online logs!"); } finally { close(statement, resultSet); } } } private void setSqlTrace() { String setSqlTraceSql = "BEGIN\n" + " DBMS_SESSION.SET_SQL_TRACE(FALSE);\n" + " END;"; logger.warn(setSqlTraceSql); getCallableStatement(setSqlTraceSql); } private void setLogAnalyseDir(){ String createDictSql = "BEGIN dbms_logmnr_d.build(dictionary_filename =>'" + authInfo.getOraName() + "', dictionary_location =>'" + DATA_DICTIONARY_PATH + "'); END;"; logger.warn(createDictSql); getCallableStatement(createDictSql); } private void addAllfile(ArrayList<String> logFiles){ logger.warn("添加 LogMiner 日志文件: {}", logFiles); // 新增日志 StringBuilder sbSQL = new StringBuilder(); //logmnr最大分析文件个数3 for (int i = 0; i < logFiles.size(); i++) { if (i == 0) { sbSQL.append("BEGIN\n"); } sbSQL.append("dbms_logmnr.add_logfile(logfilename=>'").append(logFiles.get(i)).append("',options=>dbms_logmnr.") .append(i == 0 ? "new" : "addfile").append(");\n"); } logFiles.clear(); sbSQL.append("END;\n"); logger.warn(sbSQL.toString()); getCallableStatement(sbSQL.toString()); } public void updateSettings() { ArrayList<String> logFiles = new ArrayList<>(); // 禁用当前会话的SQL跟踪功能 setSqlTrace(); // 设置日志分析目录 // setLogAnalyseDir(); // 是否分析归档日志 isArchived(logFiles); // 分析在线日志 addLogfile(logFiles); // 归档和在线都没有符合点位的文件, 可能是停机久了再次启动时归档文件被迁移或删除了, 从最新的在线日志文件开始点位开始分析 if (logFiles.isEmpty()) offsetLogfile(logFiles); // 归档丢失, 在线不存在, 数据库可能出现故障 if (logFiles.isEmpty()) throw new RuntimeException("The Oracle log file was not read"); // 添加所有日志文件 addAllfile(logFiles); logger.warn("### [LOGMINER-CFG] 模式: {}, 起始SCN: {}, 加载文件: {}", currentModel, currentScn.get(), logFiles); try { // 开启logmnr视图 String startsql = "BEGIN\n" + "dbms_logmnr.start_logmnr(startScn=>'" + currentScn.get() + "'" //开始的scn号 // + ",dictfilename=>'" + DATA_DICTIONARY_PATH + "/" + authInfo.getOraName() + "'" //字典路径 + ",options=>" + " dbms_logmnr.dict_from_online_catalog" + //使用在线数据字典来查看相关的元数据信息 " + dbms_logmnr.skip_corruption" + //跳过损坏的日志快 " + dbms_logmnr.no_sql_delimiter" + //去掉SQL分隔符 " + dbms_logmnr.no_rowid_in_stmt" + //去掉SQL中的ROWID // " + dbms_logmnr.committed_data_only" + //去掉未提交事务的数据 " + dbms_logmnr.string_literals_in_stmt) ;\n" //输出SQL语句中的字符串常量 + "END;"; logger.warn(startsql); logger.warn("### [DICTIONARY] 使用{}数据字典", startsql.contains("dict_from_online_catalog") ? "在线" : "持久化"); getCallableStatement(startsql); logger.warn("dbms_logmnr successfully! 当前分析模式: " + currentModel); } catch (Exception e) { logger.error(e.getMessage(), e); } } private void logSwitchVerify(){ String crgs = queryCurrentRedoGroups(); if(StringUtils.isNotEmpty(crgs) && !crgs.equals(currentRedoGroups)){ logger.warn("### [REDO-SWITCH] 日志组切换! 旧组: {} → 新组: {}", currentRedoGroups, crgs); currentRedoGroups = crgs; unsubscribe(); updateSettings(); } } @Override public synchronized Message getWithoutAck(CanalInstance canalInstance) { try { if (connection.isClosed()) { reconnect(); } } catch (Exception e) { logger.error(e.getMessage(), e); } logSwitchVerify(); List<FlatMessage> data = new ArrayList<>(); Statement statement = null; ResultSet resultSet = null; String sql =""; try { statement = connection.createStatement(); long endScn = currentScn.get(); String queryLogSql = "SELECT scn,operation,sql_redo,table_space,seg_owner,table_name,timestamp,csf FROM v$logmnr_contents " + "where scn %s %d and table_name = '%s' and operation in('INSERT','DELETE','UPDATE') order by scn asc"; //logger.warn(String.format(queryLogSql, opt, currentScn.get(), whiteList.get(0))); resultSet = statement.executeQuery(String.format(queryLogSql, opt, currentScn.get(), whiteList.get(0))); opt = ">"; long lastScn = currentScn.get(); while (resultSet.next()) { endScn = resultSet.getLong("scn"); String operation = resultSet.getString(Constant.operation); if(!action.contains(operation.toUpperCase())) continue; // String segOwner = resultSet.getString(Constant.seg_owner); // String tableSpace = resultSet.getString(Constant.table_space); String tableName = resultSet.getString(Constant.table_name); sql = resultSet.getString(Constant.sql_redo); logger.warn("### [ORACLE-LOG] 解析到原始SQL - SCN: {}, 操作: {}, 表: {}, SQL: {}", endScn, operation, tableName, sql); if(StringUtils.isEmpty(sql)){ logger.error(operation + "->" + tableName+", sql为空默认跳过"); continue; } FlatMessage flatMessage = new FlatMessage(); String timestamp = resultSet.getString(Constant.timestamp); if(!StringUtils.isEmpty(timestamp)){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long es = sdf.parse(timestamp).getTime() / 1000; flatMessage.setEs(es); }else{ long es = new Date().getTime() / 1000; flatMessage.setEs(es); } flatMessage.setDatabase(authInfo.getDefaultDatabaseName()); flatMessage.setTable(tableName); flatMessage.setIsDdl(false); flatMessage.setType(operation); flatMessage.setTs(System.currentTimeMillis()); flatMessage.setSql(sql); OracleUtil.toMqformat(flatMessage);//转换sql为mq所需json格式 if(null == flatMessage.getData()) { logger.warn("异常SQL: " + sql); continue; } logger.warn("### [CANAL-MSG] 转换后消息 - 类型: {}, 主键: {}, 数据: {}", flatMessage.getType(), flatMessage.getPkNames(), flatMessage.getData()); //主键设置 List<String> list = pkMaps.get(authInfo.getDefaultDatabaseName() + "." + tableName); if(!CollectionUtils.isEmpty(list)){ flatMessage.setPkNames(list); }else{ flatMessage.setPkNames(new ArrayList<>()); } data.add(flatMessage); } if(endScn != currentScn.get()){ logger.warn("### [SCN-UPDATE] 更新SCN: 旧={}, 新={}", currentScn.get(), endScn); currentScn.set(endScn); FileMixedMetaManager.saveScnLocal(canalInstance.getDestination(), currentScn.get()); } //本次的归档日志已分析结束, 结束本次会话, 开始下次分析 if("archive".equals(currentModel) && CollectionUtils.isEmpty(data)){ currentScn.set(nextChange); opt = ">="; FileMixedMetaManager.saveScnLocal(canalInstance.getDestination(), currentScn.get()); logger.warn("本轮归档日志已分析结束, 结束本次会话, 开始下次分析, 开始点位: "+nextChange); unsubscribe(); updateSettings(); } if (!CollectionUtils.isEmpty(data)) { return new Message(lastScn, data, Constant.NOT_MYSQL); } } catch (Exception e) { logger.error("oracle logminer select v$logmnr_contents failed"); logger.warn(sql); logger.error("### [ERROR] SQL解析异常: {}", sql, e); StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); // 将出错的栈信息输出到printWriter中 e.printStackTrace(pw); pw.flush(); sw.flush(); try { sw.close(); } catch (IOException ex) { throw new RuntimeException(ex); } pw.close(); unsubscribe(); updateSettings(); } finally { close(statement, resultSet); } return new Message(-1, true, null); } //TRIGGER private String getStartPosition() { String sql = "SELECT MAX(CURRENT_SCN) CURRENT_SCN FROM GV$DATABASE"; Statement statement = null; ResultSet resultSet = null; try { statement = connection.createStatement(); resultSet = statement.executeQuery(sql); while (resultSet.next()) { long scn = resultSet.getLong(1); return String.valueOf(scn); } } catch (SQLException e) { e.printStackTrace(); } finally { close(statement, resultSet); } return "0"; } public List<String> getPkNames(String datasourceName, String tableName) { List<String> pkNames = Lists.newArrayList(); Statement statement = null; ResultSet resultSet = null; logger.warn("### [PRIMARY-KEY] 查询主键: {}.{}", datasourceName, tableName); try { DatabaseMetaData metaData = connection.getMetaData(); resultSet = metaData.getPrimaryKeys(datasourceName, null, tableName); while (resultSet.next()) { String pkName = resultSet.getString("COLUMN_NAME"); // +++ 新增: 记录找到的主键 +++ logger.warn("### [PRIMARY-KEY] 找到主键列: {}", pkName); pkNames.add(pkName); } if (pkNames.isEmpty()) { logger.error("### [PRIMARY-KEY-WARN] 表 {}.{} 未找到主键!", datasourceName, tableName); } else { logger.warn("### [PRIMARY-KEY] 共找到 {} 个主键列", pkNames.size()); } return pkNames; } catch (Exception e) { logger.error(datasourceName+"."+tableName+"oracle get table primary key returns null", e); } finally { close(statement, resultSet); } return null; } private void close(Statement statement, ResultSet resultSet){ try { if (resultSet != null) resultSet.close(); } catch (SQLException e1) { e1.printStackTrace(); } finally { try { if (statement != null) statement.close(); } catch (SQLException e) { e.printStackTrace(); } } } @Override public void connect() { String jdbcUrl = ORACLE_DRIVER_URL + authInfo.getAddress().getHostString() + ":" + authInfo.getAddress().getPort() + ":" + authInfo.getServerName(); try { this.connection = DriverManager.getConnection(jdbcUrl, authInfo.getUsername(), authInfo.getPassword()); logger.warn("成功连接到 Oracle: {}:{}", authInfo.getAddress().getHostString(), authInfo.getAddress().getPort()); // 新增日志 } catch (SQLException e) { e.printStackTrace(); } } @Override public void reconnect() { disconnect(); connect(); } @Override public void disconnect() { if (connection != null) { try { unsubscribe(); connection.close(); } catch (SQLException e) { e.printStackTrace(); } } } @Override public void ack(CanalInstance canalInstance) { return; } @Override public boolean checkConsistent(CanalInstance canalInstance) { CanalEventParser eventParser = canalInstance.getEventParser(); MysqlEventParser parser = (MysqlEventParser) eventParser; AuthenticationInfo newAuthInfo = parser.getMasterInfo(); String newUrl = newAuthInfo.getAddress().getHostName() + newAuthInfo.getAddress().getPort(); String oldUrl = this.authInfo.getAddress().getHostName() + this.authInfo.getAddress().getPort(); if (Objects.equals(newUrl, oldUrl)) { return true; } return false; } @Override //回退到指定位置 public void rollback(CanalInstance canalInstance, Long batchId) { try { FileMixedMetaManager.saveScnLocal(canalInstance.getDestination(), batchId); currentScn.set(batchId); } catch (IOException e) { e.printStackTrace(); } logger.warn("回退到 SCN: {}, 实例: {}", batchId, canalInstance.getDestination()); // 新增日志 } public void getCallableStatement(String sql) { CallableStatement callableStatement = null; try { callableStatement = connection.prepareCall(sql); callableStatement.execute(); } catch (SQLException e) { logger.error(e.getMessage(), e); e.printStackTrace(); StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); // 将出错的栈信息输出到printWriter中 e.printStackTrace(pw); logger.error("执行 SQL 失败: {}, 错误码: {}, 堆栈: {}", sql, e.getErrorCode(), sw.toString()); // 新增日志 pw.flush(); sw.flush(); try { sw.close(); } catch (IOException ex) { throw new RuntimeException(ex); } pw.close(); if (e.getErrorCode() == 1291) { throw new CanalServerException("1291"); } } finally { close(callableStatement, null); } } /** * description: 关闭查询会话 * * @param {} * @return void */ public void unsubscribe() { logger.warn("### [SESSION-END] 结束会话! 当前SCN: {}", currentScn.get()); String endLogmnrSql = "BEGIN\n" + " dbms_logmnr.end_logmnr;\n" + " END;"; logger.warn(endLogmnrSql); getCallableStatement(endLogmnrSql); logger.warn("end_logmnr succeeded !"); }你觉得增量数据未同步的原因是啥 分析下代码 }
<think>嗯,用户这次的问题很具体啊,是关于Oracle增量数据未同步的原因分析,重点提到了Canal的OracleHandler、LogMiner配置和代码逻辑这几个方面。看来用户已经在生产环境遇到了同步中断的问题,需要快速定位原因。用户提到"增量数据未同步",这个表述比较笼统,可能需要引导用户提供更具体的现象描述。不过从技术角度看,这类问题通常集中在三个层面:数据源配置、同步中间件、目标端消费。用户特别强调了LogMiner的配置问题,说明ta可能已经排除了网络等基础问题。注意到用户引用了两篇关于MySQL同步的文章,但实际问题是关于Oracle的。这种情况挺常见的,很多开发者会把MySQL的经验直接套用到Oracle上。不过Oracle的LogMiner和MySQL的binlog机制差异很大,需要重点提醒用户注意Oracle特有的SupplementalLog设置。用户的问题拆解下来主要是四个技术点:LogMiner配置验证(包括归档日志设置)、Canal权限问题、OracleHandler代码逻辑(尤其事务过滤)、目标端写入异常。其中SupplementalLog是关键,很多团队都会在这里栽跟头——Oracle默认不记录键值,导致无法增量更新。用户应该需要具体的排查步骤而非理论说明。考虑把日志分析放在第一步很实用,因为Canal的日志格式比较规范,position记录、missingarchivelog报错都是明显线索。另外提醒用户注意切换Oracle归档路径的配置也很有必要,这是容易忽略的点。关于事务过滤的代码检查,用户可能需要更详细的示例。考虑在子表排查那里补充一个典型的判断逻辑会更好,比如检查是否因为插入记录缺少关键字段导致整批事务被跳过。最后考虑补充目标端监控的建议也很重要。之前有个案例就是因为Kafka主题写满导致同步阻塞,但Canal服务本身显示正常。这种情况用kafka-console-consumer就能快速验证。</think>
Oracle增量数据未同步的问题可能涉及**LogMiner配置、Canal组件逻辑、权限异常或数据过滤机制失效**,以下是系统性排查方案:
---
### 一、LogMiner配置问题(核心因素)
1. **归档日志缺失或损坏**
- 确认 `v$logmnr_contents` 是否生成有效日志:
```sql
SELECT COUNT(*) FROM v$logmnr_contents;
```
- 检查归档路径配置:
```sql
-- 查看归档模式状态
SELECT log_mode FROM v$database;
-- 确认归档日志路径是否可访问
SELECT * FROM v$logfile;
```
- **故障点**:归档日志未启用、存储空间不足或路径权限错误会导致LogMiner无法捕获增量变更[^1]。
2. **补充日志未启用(Supplemental Logging)**
LogMiner依赖补充日志记录主键/唯一键:
```sql
-- 检查补充日志状态
SELECT supplemental_log_data_min FROM v$database;
-- 开启补充日志(需DBA权限)
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER TABLE <表名> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```
**未开启补充日志将导致UPDATE/DELETE语句无法解析出完整数据**。
3. **LogMiner会话异常**
- 查询活跃会话状态:
```sql
SELECT status FROM v$logmnr_session WHERE session_id = <Canal使用的SESSION_ID>;
```
- **长时间未重启LogMiner会导致内存泄漏,需周期性重启进程**[^1]。
---
### 二、Canal OracleHandler 逻辑故障
1. **元数据解析失败**
- 检查日志关键词:
```
ORA-01291: missing logfile
ORA-01333: failed to establish LogMiner session
```
- **定位点丢失**:确认Canal `meta.dat` 中的SCN是否有效:
```sql
-- 在Oracle中验证SCN有效性
SELECT scn FROM v$log WHERE scn > <Canal记录的SCN>;
```
- **解决方法**:重置Canal元数据(谨慎操作)。
2. **事务过滤机制误触发**
- **排除系统表变更**:检查 `canal.properties` 过滤规则:
```properties
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=SYSTEM\\.\\*
```
- **事务合并延迟**:大事务拆解超时可能导致同步延迟。
---
### 三、代码逻辑问题排查路径
1. **连接中断重试机制**
验证网络中断后是否自动恢复连接:
```java
// OracleHandler 需实现Session断连重试逻辑
while (isRunning) {
try {
minerSession.start(<scn>);
} catch (LogMinerException e) {
reconnect(); // 指数退避重连
}
}
```
**同步线程阻塞会直接中断数据流**。
2. **目标端写入失败未处理**
- 检查Kafka生产者状态:
```java
producer.send(record, (metadata, exception) -> {
if (exception != null) {
canalRollback(); // 回滚到未ACK的位点
}
});
```
- **目标端写入异常若未触发位点回滚,将导致增量数据永久丢失**。
---
### 四、快速诊断流程
```mermaid
graph TD
A[数据未同步] --> B[检查Canal日志错误码]
B -->|ORA-日志错误| C[验证LogMiner配置]
B -->|无新日志| D[确认Oracle补充日志]
C --> E[检查归档路径权限]
D --> F[开启补充日志]
B -->|位点未更新| G[重置Canal元数据]
A --> H[检查Kafka是否收到消息]
H -->|无消息| I[验证OracleHandler线程状态]
H -->|有消息未消费| J[检查目标端消费者]
```
阅读全文
相关推荐















