Oracle XStream CDC Source Connector for Confluent Cloud¶
The fully-managed Oracle XStream CDC Source connector for Confluent Cloud captures all changes made to rows in an Oracle database and represents the changes as change event records in Apache Kafka® topics. The connector can be configured to capture changes from a subset of tables in a database by using an include regular expression to match the table identifiers. It can also be configured to not capture tables that match a separate exclude regular expression.
This Quick Start is for the fully-managed Confluent Cloud connector. If you are installing the connector locally for Confluent Platform, see Oracle XStream CDC Source for Confluent Platform.
If you require private networking for fully-managed connectors, make sure to set up the proper networking beforehand. For more information, see Manage Networking for Confluent Cloud Connectors.
Supported Versions¶
Be sure to review the following information before using the Oracle XStream CDC Source connector.
Oracle versions¶
The connector is compatible with the following Oracle versions:
- Oracle 19c Enterprise Edition
- Oracle 21c Enterprise Edition
The connector supports Oracle Exadata.
The connector supports Oracle Database 19c using the non-CDB architecture on Amazon RDS for Oracle. For more information, see Working with Amazon RDS for Oracle.
Java versions¶
The connector requires Java version 17 or higher.
Limitations¶
Be sure to review the following information:
- The connector has not been tested against managed database services from cloud service providers (CSPs), other than Amazon RDS for Oracle.
- The connector does not work with Oracle Autonomous Databases or Oracle Standby databases (using Oracle Data Guard).
- The connector does not support Downstream Capture configurations.
- If you plan to use Confluent Cloud Schema Registry, see Schema Registry Enabled Environments.
- If you plan to use one or more Single Message Transforms (SMTs), see SMT Limitations.
- The connector does not support the following Single Message Transforms (SMTs): GzipDecompress, TimestampRouter, and MessageTimestampRouter.
Manage custom offsets¶
You can manage the offsets for this connector. Offsets provide information on the point in the system from which the connector is accessing data. For more information, see Manage Offsets for Fully-Managed Connectors in Confluent Cloud.
To manage offsets:
- Use Confluent Cloud APIs. For more information, see Cluster API reference.
- Use either an Oracle System Change Number (SCN) or Logical Change Record (LCR) position.
To get the current offset, make a GET request that specifies the environment, Kafka
cluster, and connector name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets
Host: https://api.confluent.cloud
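For example, a minimal curl sketch of this request. It assumes you authenticate with a Confluent Cloud API key and secret over HTTP Basic authentication and that the placeholder IDs are replaced with your own values:
# Replace {environment_id}, {kafka_cluster_id}, and {connector_name} with your values.
curl --silent --request GET \
  --user "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  "https://api.confluent.cloud/connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets"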
Response:
Successful calls return HTTP 200 with a JSON payload that describes the offset.
The following example shows the offset once the snapshot has been completed.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "4374567",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
],
"metadata": {
"observed_at": "2025-03-01T12:30:00.151015100Z"
}
}
The following example shows the offset once streaming is in progress.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "000000000044150e0000000100000001000000000044150d000000140000000102"
}
}
],
"metadata": {
"observed_at": "2025-03-01T12:30:00.151015100Z"
}
}
Responses include the following information:
- The position of the latest offset.
- The observed time of the offset in the metadata portion of the payload. The observed_at time indicates a snapshot in time for when the API retrieved the offset. A running connector is always updating its offsets. Use observed_at to get a sense for the gap between real time and the time at which the request was made. By default, offsets are observed every minute. Calling GET repeatedly will fetch more recently observed offsets.
- Information about the connector.
To update the offset, make a POST request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the new offset and a patch type.
The following example shows how to update the offset using an SCN.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "4374567",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
]
}
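A hedged curl sketch for submitting this request, again assuming Confluent Cloud API key authentication over HTTP Basic auth and illustrative placeholder values:
# The PATCH is applied asynchronously; check progress with the offset status endpoint.
curl --silent --request POST \
  --user "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  --header "Content-Type: application/json" \
  --data '{
    "type": "PATCH",
    "offsets": [
      {
        "partition": { "server": "{topic_prefix}" },
        "offset": { "scn": "4374567", "snapshot": "INITIAL", "snapshot_completed": true }
      }
    ]
  }' \
  "https://api.confluent.cloud/connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request"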
The following example shows how to update the offset using an LCR position.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "PATCH",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "0000000000432bd400000001000000010000000000432bd3000000140000000102"
}
}
]
}
Considerations:
- You can only make one offset change at a time for a given connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- For source connectors, the connector attempts to read from the position defined by the requested offsets.
Response:
Successful calls return HTTP 202 Accepted with a JSON payload that describes the offset.
The following example shows the response to an offset update request using an SCN.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "4374567",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "PATCH"
}
The following example shows the response to an offset update request using an LCR position.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "0000000000432bd400000001000000010000000000432bd3000000140000000102"
}
}
],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "PATCH"
}
Responses include the following information:
- The requested position of the offsets in the source.
- The time of the request to update the offset.
- Information about the connector.
To delete the offset, make a POST request that specifies the environment, Kafka cluster, and connector
name. Include a JSON payload that specifies the delete type.
POST /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request
Host: https://api.confluent.cloud
{
"type": "DELETE"
}
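A corresponding curl sketch for the delete request, under the same authentication assumptions as the earlier examples:
# Resets the connector offsets to the base state.
curl --silent --request POST \
  --user "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  --header "Content-Type: application/json" \
  --data '{"type": "DELETE"}' \
  "https://api.confluent.cloud/connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request"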
Considerations:
- Delete requests delete the offset for the provided partition and reset it to the base state. After a delete request, the connector behaves as if it were a newly created connector.
- This is an asynchronous request. To check the status of this request, you must use the check offset status API. For more information, see Get the status of an offset request.
- Do not issue delete and patch requests at the same time.
- For source connectors, the connector attempts to read from the position defined in the base state.
Response:
Successful calls return HTTP 202 Accepted with a JSON payload that describes the result.
{
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "DELETE"
}
Responses include the following information:
- Empty offsets.
- The time of the request to delete the offset.
- Information about Kafka cluster and connector.
- The type of request.
To get the status of a previous offset request, make a GET request that specifies the environment, Kafka cluster, and connector
name.
GET /connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status
Host: https://api.confluent.cloud
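For example, a curl sketch for polling the status of the most recent offset request, with the same Cloud API key assumptions as the earlier examples:
# Returns the phase (applied, pending, or failed) of the latest PATCH or DELETE request.
curl --silent --request GET \
  --user "$CLOUD_API_KEY:$CLOUD_API_SECRET" \
  "https://api.confluent.cloud/connect/v1/environments/{environment_id}/clusters/{kafka_cluster_id}/connectors/{connector_name}/offsets/request/status"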
Considerations:
- The status endpoint always shows the status of the most recent PATCH/DELETE operation.
Response:
Successful calls return HTTP 200 with a JSON payload that describes the result. The following is an example
of an applied patch.
{
"request": {
"id": "lcc-example",
"name": "{connector_name}",
"offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"scn": "2778404",
"snapshot": "INITIAL",
"snapshot_completed": true
}
}
],
"requested_at": "2025-03-01T12:30:00.151015100Z",
"type": "PATCH"
},
"status": {
"phase": "APPLIED",
"message": "The Connect framework-managed offsets for this connector have been altered successfully. However, if this connector manages offsets externally, they will need to be manually altered in the system that the connector uses."
},
"previous_offsets": [
{
"partition": {
"server": "{topic_prefix}"
},
"offset": {
"lcr_position": "00000000002a8684000000010000000100000000002a8680000000010000000102"
}
}
],
"applied_at": "2025-03-01T12:30:10.151015100Z"
}
Responses include the following information:
- The original request, including the time it was made.
- The status of the request: applied, pending, or failed.
- The time you issued the status request.
- The previous offsets. These are the offsets that the connector last updated prior to updating the offsets. Use these to try to restore the state of your connector if a patch update causes your connector to fail or to return a connector to its previous state after rolling back.
JSON payload¶
You can use either an Oracle System Change Number (SCN) or Logical Change Record (LCR) position to update the offsets of the Oracle XStream CDC Source connector.
The table below offers a description of the unique fields in the JSON payload for managing offsets of the connector using an Oracle SCN.
| Field | Definition | Required/Optional |
|---|---|---|
| scn | The SCN of the change. It is set to the current SCN during the snapshot phase and obtained from the LCR position during the streaming phase. To manage offsets, set this to the SCN from which to resume streaming. | Required |
| snapshot | Indicates the type of snapshot. To manage offsets, set this to INITIAL, as shown in the examples above. | Required |
| snapshot_completed | Indicates whether the snapshot has been completed. To manage offsets, set this to true. | Required |
The table below offers a description of the unique fields in the JSON payload for managing offsets of the connector using an LCR position.
| Field | Definition | Required/Optional |
|---|---|---|
| lcr_position | The position of the LCR. Set in the streaming phase only. To manage offsets, set this to the LCR position from which to resume streaming. | Required |
General considerations¶
An existing outbound server can be used if the SCN or LCR position used in the offsets request is equal to or greater than the outbound server’s processed low position. To find the processed low position for an outbound server, see Displaying the Processed Low Position for an Outbound Server.
An existing outbound server cannot be used if the SCN or LCR position used in the offsets request is earlier than the outbound server’s processed low position. In this case, a new capture process and outbound server must be created with a first SCN and start SCN that precede the SCN or LCR position used in the offset.
- The first SCN and start SCN must be a valid SCN and present in the redo log files available to the capture process.
- The first SCN can be set to any value returned by the following query:
SELECT DISTINCT FIRST_CHANGE#, NAME FROM V$ARCHIVED_LOG WHERE DICTIONARY_BEGIN = 'YES'
- The value returned in the NAME column indicates the redo log file containing the SCN corresponding to the first SCN. This redo log file, along with all subsequent redo log files, must be available to the capture process. If the query returns multiple distinct FIRST_CHANGE# values, choose the first SCN value that is most appropriate for the capture process you are creating.
- The start SCN specified must be greater than or equal to the first SCN for the capture process.
To create a new connector with offsets, set the SCN in the offset and then start the connector in recovery snapshot mode using the snapshot.mode configuration property. The connector will first capture a snapshot of the schema for the capture tables, populating the schema history topic, and then begin streaming from the specified SCN in the offset. Once the recovery process is complete and streaming begins, reset the snapshot.mode to either initial or no_data to prevent the connector from initiating a recovery upon future restarts.
Important
The connector will fail if there have been schema changes to the captured tables after the specified SCN.
To update the offsets of an existing connector, set the SCN or LCR position in the offset.
Important
The connector will fail if there have been schema changes to the capture tables between the SCN or LCR position specified in the offsets request and the connector’s last processed SCN or LCR position.
Migrate connectors¶
Considerations:
The self-managed connector must be operating in streaming mode. If the self-managed connector is still in the process of taking a snapshot, you can either create a new connector on Confluent Cloud, which starts the snapshot process from the beginning, or wait for the snapshot process to complete and then follow the migration guidance.
The configurations of the self-managed connector must match the configurations of the fully-managed connector. You need to set the snapshot.mode configuration property to recovery. This ensures that the connector will first capture a snapshot of the schema for the capture tables, populating the schema history topic, and then begin streaming from the specified SCN in the offset.
Important
The connector will fail if there have been schema changes to the captured tables after the specified SCN.
Quick Start¶
Use this quick start to get up and running with the Confluent Cloud Oracle XStream CDC Source connector. The quick start provides the basics of selecting the connector and configuring it to obtain a snapshot of the existing data in an Oracle database and then monitoring and recording all subsequent row-level changes.
Before configuring the connector, see Oracle Database Prerequisites for Oracle database configuration information and post-configuration validation steps.
Prerequisites¶
- Authorized access to a Confluent Cloud cluster on Amazon Web Services (AWS), Microsoft Azure (Azure), or Google Cloud.
- The Confluent CLI installed and configured for the cluster. See Install the Confluent CLI.
- Schema Registry must be enabled to use a Schema Registry-based format (for example, Avro, JSON_SR (JSON Schema), or Protobuf). For more information, see Schema Registry Enabled Environments.
- For networking considerations, see Networking and DNS. To use a set of public egress IP addresses, see Public Egress IP Addresses for Confluent Cloud Connectors.
- Kafka cluster credentials. The following lists the different ways you can provide credentials.
- Enter an existing service account resource ID.
- Create a Confluent Cloud service account for the connector. Make sure to review the ACL entries required in the service account documentation. Some connectors have specific ACL requirements.
- Create a Confluent Cloud API key and secret. To create a key and secret, you can use confluent api-key create or you can autogenerate the API key and secret directly in the Cloud Console when setting up the connector.
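For example, a minimal sketch of creating an API key with the Confluent CLI; the resource and service account IDs are placeholders you replace with your own:
# Create a Kafka API key and secret scoped to your cluster.
confluent api-key create --resource <kafka-cluster-id>

# To create the key for a service account instead (recommended for production):
confluent api-key create --resource <kafka-cluster-id> --service-account <service-account-id>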
Using the Confluent Cloud Console¶
Step 1: Launch your Confluent Cloud cluster¶
To create and launch a Kafka cluster in Confluent Cloud, see Create a Kafka cluster in Confluent Cloud.
Step 2: Add a connector¶
In the left navigation menu, click Connectors. If you already have connectors in your cluster, click + Add connector.
Step 3: Select your connector¶
Click the Oracle XStream CDC Source connector card.
Step 4: Enter the connector details¶
Note
- Make sure you have all your prerequisites completed.
- An asterisk ( * ) designates a required entry.
At the Add Oracle XStream CDC Source Connector screen, complete the following:
Select the way you want to provide Kafka Cluster credentials. You can choose one of the following options:
- My account: This setting allows your connector to globally access everything that you have access to. With a user account, the connector uses an API key and secret to access the Kafka cluster. This option is not recommended for production.
- Service account: This setting limits the access for your connector by using a service account. This option is recommended for production.
- Use an existing API key: This setting allows you to specify an API key and a secret pair. You can use an existing pair or create a new one. This method is not recommended for production environments.
Note
Freight clusters support only service accounts for Kafka authentication.
Click Continue.
- Add the following database connection details:
- Database hostname: The IP address or hostname of the Oracle database server.
- Database port: The port number used to connect to the Oracle database server. Defaults to 1521.
- Database username: The name of the Oracle database user connecting to the Oracle database.
- Database password: The password for the Oracle database user connecting to the Oracle database.
- Database name: The name of the database to connect to. In a multitenant architecture, this is the container database (CDB) name.
- Database service name: The name of the database service to which to connect. In a multitenant container database, this is the service used to connect to the CDB. For Oracle Real Application Clusters (RAC), use the service created by Oracle XStream.
- Pluggable database (PDB) name: The name of the pluggable database (PDB) to connect to in a multitenant architecture. By default, this is not set, indicating that the tables to capture reside in the CDB root. If this is set, you must specify the CDB name in the Database name field.
- XStream outbound server name: The name of the XStream outbound server to connect to.
- TLS mode: Specifies whether to use Transport Layer Security (TLS) to connect to the Oracle database. Defaults to disable. If you set this property to one-way, the connector uses a TLS-encrypted connection and verifies the server’s TLS certificate against the configured Certificate Authority (CA) certificates.
- Total number of Oracle processors to license: The number of Oracle processor licenses required for the source database server or cluster. To determine this value, multiply the total number of processor cores by a core processor licensing factor, as listed in the Oracle Processor Core Factor Table.
- Click Continue.
Add the following details:
Output messages
- Output Kafka record key format: Sets the output Kafka record key format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF. Defaults to AVRO.
- Output Kafka record value format: Sets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured if using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
How should we name your topic(s)?
Topic prefix: The topic prefix provides a namespace for the Oracle database server or cluster used by the connector to capture changes. It must be unique and can include only alphanumeric characters, hyphens, dots, and underscores. This prefix is added to all Kafka topic names receiving events from this connector.
Warning
Do not change the value of this property. If changed, upon restart, the connector will start emitting events to new topics based on the revised value and not to the original topics, and it won’t be able to recover its database schema history topic.
Connector configuration
Table include list: A comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you want to capture. The connector will only capture changes from tables that match these expressions. Each identifier is of the form schemaName.tableName. By default, the connector captures changes from all non-system tables in each captured database.
To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
Note
If you use this property, do not use the table.exclude.list property.
Table exclude list: A comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you do not want to capture. The connector captures changes from any table that is not specified in the exclude list. Each identifier is of the form schemaName.tableName.
To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name.
Note
If you use this property, do not use the table.include.list property.
Snapshot mode: The criteria for running a snapshot upon startup of the connector. Select one of the following snapshot options:
- initial (default): The snapshot includes both the structure (schema) and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
- no_data: The snapshot includes only the structure (schema) of captured tables. Specify this value if you want the connector to capture data only for changes that occur after the snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
- recovery: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth.
Warning
Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.
Data encryption
- Enable Client-Side Field Level Encryption (CSFLE) for data encryption. Specify a Service Account that can access the Schema Registry and the encryption rules or keys associated with the schema. For more information on CSFLE setup, see Manage CSFLE for connectors.
Show advanced configurations
Schema context: Select a schema context to use for this connector, if using a schema-based data format. This property defaults to the Default context, which configures the connector to use the default schema set up for Schema Registry in your Confluent Cloud environment. A schema context allows you to use separate schemas (like schema sub-registries) tied to topics in different Kafka clusters that share the same Schema Registry environment. For example, if you select a non-default context, a Source connector uses only that schema context to register a schema and a Sink connector uses only that schema context to read from. For more information about setting up a schema context, see What are schema contexts and when should you use them?.
Auto-restart policy
Enable Connector Auto-restart: Control the auto-restart behavior of the connector and its task in the event of user-actionable errors. Defaults to true, enabling the connector to automatically restart in case of user-actionable errors. Set this property to false to disable auto-restart for failed connectors. In such cases, you would need to manually restart the connector.
Connector configuration
Skip unparseable DDL statement: Specifies whether the connector should ignore a DDL statement that cannot be parsed or stop processing for a human to address the issue. The safe default is false, which causes the connector to fail when it encounters an unparseable DDL statement.
Warning
Setting the value to true should be done with care as it will cause the connector to skip processing any DDL statement it cannot parse, and this could potentially lead to schema mismatches and data loss.
Max retries on snapshot database errors: The number of retry attempts the connector will make to snapshot a table if a database error occurs. This configuration property currently only retries failures related to the ORA-01466 error. By default, no retries are attempted.
Emit tombstone on delete: Controls whether a delete event is followed by a tombstone event. The following values are possible:
true (default): For each delete operation, the connector emits a delete event and a subsequent tombstone event.
false: For each delete operation, the connector emits only a delete event.
After a source record is deleted, a tombstone event (the default behavior) enables Kafka to completely delete all events that share the key of the deleted row in topics that have log compaction enabled.
Skipped operations: A comma-separated list of the operation types to skip during streaming. You can configure the connector to skip the following types of operations:
c (create/insert)
u (update)
d (delete)
t (truncate)
Use none to indicate that no operations are skipped. By default, only truncate (t) operations are skipped.
Schema name adjustment mode: Specifies how schema names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (default): Does not apply any adjustment.
avro: Replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx.
Note
_ is an escape sequence like backslash in Java.
Field name adjustment mode: Specifies how field names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (default): Does not apply any adjustment.
avro: Replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode: Replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx.
Note
_ is an escape sequence like backslash in Java.
Heartbeat interval (ms): Controls the frequency of heartbeat messages sent by the connector to a heartbeat topic. Useful in situations when no changes occur in the captured tables for an extended period. In such cases, there are no change event messages generated, causing the committed source offset to remain unchanged. As a result, the connector is unable to update the processed low watermark on the outbound server, which could result in the database retaining archived redo log files longer than needed. The default value is 0, which disables the heartbeat mechanism.
Database server’s operating system (OS) timezone: Specifies the database server’s operating system timezone. This is used to read the time when the LCR was generated at the source database. The default timezone is UTC. The value has to be a valid java.time.ZoneId identifier.
Column include list: An optional, comma-separated list of regular expressions that match fully-qualified column identifiers to be included in change event values. Each identifier is of the form schemaName.tableName.columnName.
To match the name of a column, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the column; it does not match substrings that might be present in a column name. If you use this property, do not set the column.exclude.list property.
Note
Primary key columns are always included in an event’s key, even if you do not use this property to explicitly include their values.
Column exclude list: An optional, comma-separated list of regular expressions that match fully-qualified column identifiers to be excluded from change event values. Each identifier is of the form schemaName.tableName.columnName.
To match the name of a column, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the column; it does not match substrings that might be present in a column name. If you use this property, do not set the column.include.list property.
Note
Primary key columns are always included in an event’s key, even if you use this property to explicitly exclude their values.
Unavailable value placeholder: Specifies the constant that the connector uses to indicate that the original value is unavailable and not provided by the database. Defaults to __cflt_unavailable_value.
Oversized large object (LOB) threshold: Specifies the maximum size threshold (in bytes) for large object (LOB) column values, including CLOB, NCLOB, and BLOB. For CLOB and NCLOB values, the connector calculates the size as the UTF-8 encoded byte length of the string. If a LOB value exceeds this threshold, the connector handles it according to the strategy specified using the lob.oversize.handling.mode configuration. The default value is -1, which disables oversize handling.
Oversized large object (LOB) handling mode: Defines how the connector handles LOB column values that exceed the size threshold specified using the lob.oversize.threshold configuration. Select one of the following options:
fail (default): The connector stops processing and reports an error.
skip: The connector replaces the LOB value with a placeholder specified using the skip.value.placeholder configuration.
Skip value placeholder: Specifies the constant provided by the connector to indicate that the original value was skipped by the connector due to exceeding the configured size threshold. Defaults to __cflt_skipped_value.
How should we handle data types?
Decimal handling mode: Specifies how the connector should handle NUMBER, DECIMAL and NUMERIC columns. You can set one of the following options:
precise (default): Uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect’s org.apache.kafka.connect.data.Decimal type. Depending on the precision and scale, the most appropriate Kafka Connect integer type is used for integral values, ensuring that the value is represented without any loss of precision.
string: Encodes values as formatted strings. Using the string option is easier to consume, but results in a loss of semantic information about the real type.
double: Represents values using Java’s double. Using double values is easier, but can result in a loss of precision.
Binary handling mode: Specifies how the connector should handle binary (BLOB) columns. You can set one of the following options:
bytes (default): Represents binary data as byte array.
base64: Represents binary data as base64-encoded string.
base64-url-safe: Represents binary data as base64-url-safe-encoded string.
hex: Represents binary data as hex-encoded (base16) string.
Time precision mode: Specifies how the connector should handle time, date, and timestamp columns. You can set one of the following options:
adaptive (default): Bases the precision of time, date, and timestamp values on the database column’s precision.
connect: Always represents time, date, and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which use millisecond precision regardless of the database columns’ precision.
Transforms
Single Message Transforms: To add a new SMT using the UI, see Add transforms. For more information about the Debezium SMT ExtractNewRecordState, see Debezium transformations.
For all property values and definitions, see Configuration Properties.
Click Continue.
Based on the number of topic partitions you select, you will be provided with a recommended number of tasks.
To change the number of tasks, enter the desired number of tasks for the connector to use in the Maximum number of tasks field.
Note
The connector always operates with a single task.
Click Continue.
Verify the connection details by previewing the running configuration.
Tip
For information about previewing your connector output, see Data Previews for Confluent Cloud Connectors.
After you’ve validated that the properties are configured to your satisfaction, click Launch.
The status for the connector should go from Provisioning to Running.
If the connector is not running, see Oracle Database Prerequisites and review the Oracle database configuration information and post-configuration validation steps.
Step 5: Check the Kafka topic¶
After the connector is running, verify that records are populating your Kafka topic.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Using the Confluent CLI¶
Complete the following steps to set up and run the connector using the Confluent CLI.
Note
- Make sure you have all your prerequisites completed.
- The example commands use Confluent CLI version 2. For more information, see Confluent CLI v2.
Step 1: List the available connectors¶
Enter the following command to list available connectors:
confluent connect plugin list
Step 2: List the connector configuration properties¶
Enter the following command to show the connector configuration properties:
confluent connect plugin describe <connector-plugin-name>
The command output shows the required and optional configuration properties.
For example:
confluent connect plugin describe OracleXStreamSource
Example output:
The following are required configs:
connector.class : OracleXStreamSource
database.dbname
database.hostname
database.service.name
database.out.server.name
database.user
database.password
name
output.data.value.format
tasks.max
topic.prefix
kafka.api.key : ["kafka.api.key" is required when "kafka.auth.mode==KAFKA_API_KEY"]
kafka.api.secret : ["kafka.api.secret" is required when "kafka.auth.mode==KAFKA_API_KEY"]
Step 3: Create the connector configuration file¶
Create a JSON file that contains the connector configuration properties to deploy an instance of the connector.
The connector setup below performs the following:
- Connects to the ORCLPDB1 pluggable database located at db.example.com on port 1521.
- Initiates a snapshot of the employees table in the sample schema within the ORCLPDB1 pluggable database.
- After snapshot completion, the connector listens for changes made to the employees table through the XOUT outbound server.
- Streams the changes to the Kafka topic, cflt.SAMPLE.EMPLOYEES.
{
"name": "oracle-connector",
"config": {
"connector.class": "io.confluent.connect.oracle.xstream.cdc.OracleXStreamSourceConnector",
"tasks.max" : "1",
"database.hostname": "db.example.com",
"database.port": "1521",
"database.user": "C##CFLTUSER",
"database.password": "secret",
"database.dbname": "ORCLCDB",
"database.service.name": "ORCLCDB",
"database.pdb.name": "ORCLPDB1",
"database.out.server.name": "XOUT",
"table.include.list": "SAMPLE.EMPLOYEES",
"topic.prefix": "cflt",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "****************",
"kafka.api.secret": "**************************************************"
}
}
Note
(Optional) To enable CSFLE for data encryption, specify the following properties:
- csfle.enabled: Flag to indicate whether the connector honors CSFLE rules.
- sr.service.account.id: A Service Account to access the Schema Registry and the encryption rules or keys associated with that schema.
For more information on CSFLE setup, see Manage CSFLE for connectors.
Single Message Transforms: To add a new SMT using the CLI, see Add transforms. For more information about the Debezium SMT ExtractNewRecordState, see Debezium transformations.
See Configuration Properties for all properties and definitions.
Step 4: Load the properties file and create the connector¶
Enter the following command to load the configuration and start the connector:
confluent connect cluster create --config-file <file-name>.json
For example:
confluent connect cluster create --config-file oracle-xstream-cdc-source.json
Example output:
Created connector OracleXStreamSource_0 lcc-ix4dl
Step 5: Check the connector status¶
Enter the following command to check the connector status:
confluent connect cluster list
Example output:
ID | Name | Status | Type
+----------+-----------------------+---------+--------+
lcc-ix4dl | OracleXStreamSource_0 | RUNNING | source
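To inspect a single connector from the CLI, a quick sketch using the connector ID from the list output; the ID shown is illustrative:
# Show status and configuration details for one connector.
confluent connect cluster describe lcc-ix4dl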
Step 6: Check the Kafka topic¶
After the connector is running, verify that messages are populating your Kafka topic.
If the connector is not running, see Oracle Database Prerequisites for Oracle XStream CDC Source Connector for Confluent Cloud and review the Oracle database configuration information and post-configuration validation steps.
For more information and examples to use with the Confluent Cloud API for Connect, see the Confluent Cloud API for Connect Usage Examples section.
Configuration Properties¶
Use the following configuration properties with the fully-managed Oracle XStream CDC Source connector.
How should we connect to your data?¶
nameSets a name for your connector.
- Type: string
- Valid Values: A string at most 64 characters long
- Importance: high
Kafka Cluster credentials¶
kafka.auth.modeKafka Authentication mode. It can be one of KAFKA_API_KEY or SERVICE_ACCOUNT. It defaults to KAFKA_API_KEY mode.
- Type: string
- Default: KAFKA_API_KEY
- Valid Values: KAFKA_API_KEY, SERVICE_ACCOUNT
- Importance: high
kafka.api.keyKafka API Key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
kafka.service.account.idThe Service Account that will be used to generate the API keys to communicate with Kafka Cluster.
- Type: string
- Importance: high
kafka.api.secretSecret associated with Kafka API key. Required when kafka.auth.mode==KAFKA_API_KEY.
- Type: password
- Importance: high
Schema Config¶
schema.context.nameAdd a schema context name. A schema context represents an independent scope in Schema Registry. It is a separate sub-schema tied to topics in different Kafka clusters that share the same Schema Registry instance. If not used, the connector uses the default schema configured for Schema Registry in your Confluent Cloud environment.
- Type: string
- Default: default
- Importance: medium
How should we connect to your database?¶
database.hostnameIP address or hostname of the Oracle database server.
- Type: string
- Importance: high
database.portPort number of the Oracle database server.
- Type: int
- Default: 1521
- Valid Values: [1,…,65535]
- Importance: high
database.userName of the Oracle database user to use when connecting to the database.
- Type: string
- Valid Values: Must match the regex ^[^\?=%&\(\)]*$
- Importance: high
database.passwordPassword of the Oracle database user to use when connecting to the database.
- Type: password
- Importance: high
database.dbnameName of the database to connect to. In a multitenant container database, this is the name of the container database (CDB).
- Type: string
- Valid Values: Must match the regex ^[a-zA-Z][a-zA-Z0-9$#_]*$
- Importance: high
database.service.nameName of the database service to which to connect. In a multitenant container database, this is the service used to connect to the container database (CDB). For Oracle Real Application Clusters (RAC), use the service created by Oracle XStream.
- Type: string
- Valid Values: Must match the regex ^[a-zA-Z][a-zA-Z0-9$#._\\]*$
- Importance: high
database.pdb.nameName of the pluggable database to connect to in a multitenant architecture. The container database (CDB) name must be given via database.dbname in this case. This configuration should not be specified when connecting to a non-container database.
- Type: string
- Valid Values: Must match the regex ^([a-zA-Z][a-zA-Z0-9$#_]*)*$
- Importance: high
database.out.server.nameName of the XStream outbound server to connect to.
- Type: string
- Importance: high
database.tls.modeSpecifies whether to use Transport Layer Security (TLS) to connect to the Oracle database. Select one of the following options:
disable (default): Does not use a TLS connection.
one-way: Uses a TLS encrypted connection and also verifies the server’s TLS certificate against the configured Certificate Authority (CA) certificates.
- Type: string
- Default: disable
- Valid Values: disable, one-way
- Importance: medium
database.processor.licensesSpecifies the number of Oracle processor licenses required for the source database server or cluster. This is determined by multiplying the total number of processor cores by a core processor licensing factor, as specified in the Oracle Processor Core Factor Table.
- Type: int
- Valid Values: [1,…,4096]
- Importance: medium
Output messages¶
output.key.formatSets the output Kafka record key format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured when using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: AVRO
- Valid Values: AVRO, JSON_SR, PROTOBUF
- Importance: high
output.data.formatSets the output Kafka record value format. Valid entries are AVRO, JSON_SR, or PROTOBUF. Note that you need to have Confluent Cloud Schema Registry configured when using a schema-based message format like AVRO, JSON_SR, and PROTOBUF.
- Type: string
- Default: AVRO
- Valid Values: AVRO, JSON_SR, PROTOBUF
- Importance: high
How should we name your topic(s)?¶
topic.prefixTopic prefix that provides a namespace for the Oracle database server or cluster from which the connector captures changes. The topic prefix should be unique across all other connectors, since it is used as a prefix for all Kafka topic names that receive events from this connector. Only alphanumeric characters, hyphens, dots and underscores are accepted.
Warning: Do not change the value of this property. If you change the value, after a restart, instead of continuing to emit events to the original topics, the connector emits subsequent events to topics whose names are based on the new value. The connector is also unable to recover its database schema history topic.
- Type: string
- Importance: high
Connector configuration¶
table.include.listAn optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you want to capture. When this property is set, the connector will only capture changes from the specified tables. Each identifier is of the form schemaName.tableName. By default, the connector captures changes from all non-system tables in each captured database. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.exclude.list property.
- Type: string
- Importance: high
table.exclude.listAn optional, comma-separated list of regular expressions that match fully-qualified table identifiers for the tables whose changes you do not want to capture. When this property is set, the connector captures changes from any table that is not specified in the exclude list. Each identifier is of the form schemaName.tableName. To match the name of a table, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the table; it does not match substrings that might be present in a table name. If you include this property in the configuration, do not set the table.include.list property.
- Type: string
- Importance: high
snapshot.modeThe criteria for running a snapshot upon startup of the connector. Select one of the following snapshot options:
initial (default): The snapshot includes both the structure (schema) and data of the captured tables. Specify this value to populate topics with a complete representation of the data from the captured tables. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
no_data: The snapshot includes only the structure (schema) of captured tables. Specify this value if you want the connector to capture data only for changes that occur after the snapshot. After the snapshot completes, the connector begins to stream event records for subsequent database changes.
recovery: Set this option to restore a database schema history topic that is lost or corrupted. After a restart, the connector runs a snapshot that rebuilds the topic from the source tables. You can also set the property to periodically prune a database schema history topic that experiences unexpected growth. WARNING: Do not use this mode to perform a snapshot if schema changes were committed to the database after the last connector shutdown.
- Type: string
- Default: initial
- Valid Values: initial, no_data, recovery
- Importance: medium
schema.history.internal.skip.unparseable.ddlA boolean value that specifies whether the connector should ignore a DDL statement that cannot be parsed or stop processing for a human to address the issue. The safe default is false which causes the connector to fail when it encounters an unparseable DDL statement. Setting the value to true should be done with care as it will cause the connector to skip processing any DDL statement it cannot parse, and this could potentially lead to schema mismatches and data loss.
- Type: boolean
- Default: false
- Importance: low
snapshot.database.errors.max.retriesSpecifies the number of retry attempts the connector will make to snapshot a table if a database error occurs. This configuration property currently only retries failures related to ORA-01466 error. By default, no retries are attempted.
- Type: int
- Default: 0
- Valid Values: [0,…,3]
- Importance: low
tombstones.on.deleteControls whether a delete event is followed by a tombstone event. The following values are possible:
true: For each delete operation, the connector emits a delete event and a subsequent tombstone event.
false: For each delete operation, the connector emits only a delete event.
After a source record is deleted, a tombstone event (the default behavior) enables Kafka to completely delete all events that share the key of the deleted row in topics that have log compaction enabled.
- Type: boolean
- Default: true
- Importance: medium
skipped.operationsA comma-separated list of operations to skip during streaming. You can configure the connector to skip the following types of operations: c (inserts/create), u (updates), d (deletes), t (truncates), and none to indicate nothing is skipped. The default value is t, ensuring that only truncate operations are skipped.
- Type: string
- Default: t
- Importance: low
schema.name.adjustment.modeSpecifies how schema names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (the default) does not apply any adjustment.
avro replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: low
field.name.adjustment.modeSpecifies how field names should be adjusted for compatibility with the message converter used by the connector. The following values are possible:
none (the default) does not apply any adjustment.
avro replaces the characters that cannot be used in the Avro type name with underscore.
avro_unicode replaces the underscore or characters that cannot be used in the Avro type name with corresponding unicode like _uxxxx. Note: _ is an escape sequence like backslash in Java.
- Type: string
- Default: none
- Valid Values: avro, avro_unicode, none
- Importance: low
heartbeat.interval.msControls how often the connector sends heartbeat messages to a heartbeat topic. It is useful in situations when no changes occur in the captured tables for an extended period. In such cases, there are no change event messages generated, causing the committed source offset to remain unchanged. As a result, the connector is unable to update the processed low watermark on the outbound server which could result in the database retaining archived redo log files longer than needed. The default value is 0 which disables the heartbeat mechanism.
- Type: int
- Default: 0
- Valid Values: [0,…]
- Importance: medium
database.os.timezoneSpecifies the database server’s operating system timezone. This is used to read the time when the LCR was generated at the source database. The default timezone is UTC. The value has to be a valid java.time.ZoneId identifier.
- Type: string
- Default: UTC
- Importance: low
column.include.listAn optional, comma-separated list of regular expressions that match fully-qualified column identifiers to be included in change event values. Each identifier is of the form schemaName.tableName.columnName. To match the name of a column, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the column; it does not match substrings that might be present in a column name. If you include this property in the configuration, do not set the column.exclude.list property. Note: Primary key columns are always included in an event’s key, even if you do not use this property to explicitly include its value.
- Type: string
- Importance: medium
column.exclude.listAn optional, comma-separated list of regular expressions that match fully-qualified column identifiers to be excluded from change event values. Each identifier is of the form schemaName.tableName.columnName. To match the name of a column, the connector applies the regular expression that you specify as an anchored regular expression. That is, the specified expression is matched against the entire identifier for the column; it does not match substrings that might be present in a column name. If you include this property in the configuration, do not set the column.include.list property. Note: Primary key columns are always included in an event’s key, even if you use this property to explicitly exclude its value.
- Type: string
- Importance: medium
unavailable.value.placeholderSpecifies the constant provided by the connector to indicate that the original value was unavailable and not provided by the database.
- Type: string
- Default: __cflt_unavailable_value
- Importance: low
lob.oversize.thresholdSpecifies the maximum size threshold (in bytes) for large object (LOB) column values, including CLOB, NCLOB, and BLOB. For CLOB and NCLOB values, the connector calculates the size as the UTF-8 encoded byte length of the string. If a LOB value exceeds this threshold, the connector handles it according to the strategy specified using the lob.oversize.handling.mode configuration. The default value is -1, which disables oversize handling.
- Type: int
- Default: -1
- Valid Values: [-1,…]
- Importance: low
lob.oversize.handling.modeDefines how the connector handles large object (LOB) column values that exceed the size threshold specified using the lob.oversize.threshold configuration. Select one of the following options:
fail (default): The connector stops processing and reports an error.
skip: The connector replaces the LOB value with a placeholder specified using the skip.value.placeholder configuration.
- Type: string
- Default: fail
- Valid Values: fail, skip
- Importance: low
skip.value.placeholderSpecifies the constant provided by the connector to indicate that the original value was skipped by the connector due to exceeding the configured size threshold.
- Type: string
- Default: __cflt_skipped_value
- Importance: low
How should we handle data types?¶
decimal.handling.modeSpecifies how the connector should handle NUMBER, DECIMAL and NUMERIC columns. You can set one of the following options:
precise (the default): Uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect’s org.apache.kafka.connect.data.Decimal type. Depending on the precision and scale, the most appropriate Kafka Connect integer type is used for integral values, ensuring that the value is represented without any loss of precision.
string: Encodes values as formatted strings. Using the string option is easier to consume, but results in a loss of semantic information about the real type.
double: Represents values using Java’s double. Using double values is easier, but can result in a loss of precision.
- Type: string
- Default: precise
- Valid Values: double, precise, string
- Importance: medium
binary.handling.modeSpecifies how the connector should handle binary (BLOB) columns. You can set one of the following options:
bytes (the default): Represents binary data as byte array.
base64: Represents binary data as base64-encoded string.
base64-url-safe: Represents binary data as base64-url-safe-encoded string.
hex: Represents binary data as hex-encoded (base16) string.
- Type: string
- Default: bytes
- Valid Values: base64, base64-url-safe, bytes, hex
- Importance: medium
time.precision.modeSpecifies how the connector should handle time, date, and timestamp columns. You can set one of the following options:
adaptive (the default): Bases the precision of time, date, and timestamp values on the database column’s precision.
connect: always represents time, date, and timestamp values using Kafka Connect’s built-in representations for Time, Date, and Timestamp, which uses millisecond precision regardless of the database columns’ precision.
- Type: string
- Default: adaptive
- Valid Values: adaptive, connect
- Importance: medium
Number of tasks for this connector¶
tasks.maxSpecifies the maximum number of tasks for the connector. Since this connector supports only a single task, the maximum is capped at 1.
- Type: int
- Default: 1
- Valid Values: [1,…,1]
- Importance: high
Additional Configs¶
header.converterThe converter class for the headers. This is used to serialize and deserialize the headers of the messages.
- Type: string
- Importance: low
producer.override.compression.typeThe compression type for all data generated by the producer. Valid values are none, gzip, snappy, lz4, and zstd.
- Type: string
- Importance: low
producer.override.linger.msThe producer groups together any records that arrive in between request transmissions into a single batched request. More details can be found in the documentation: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#linger-ms.
- Type: long
- Valid Values: [100,…,1000]
- Importance: low
value.converter.allow.optional.map.keysAllow optional string map key when converting from Connect Schema to Avro Schema. Applicable for Avro Converters.
- Type: boolean
- Importance: low
value.converter.auto.register.schemasSpecify if the Serializer should attempt to register the Schema.
- Type: boolean
- Importance: low
value.converter.connect.meta.dataAllow the Connect converter to add its metadata to the output schema. Applicable for Avro Converters.
- Type: boolean
- Importance: low
value.converter.enhanced.avro.schema.supportEnable enhanced schema support to preserve package information and Enums. Applicable for Avro Converters.
- Type: boolean
- Importance: low
value.converter.enhanced.protobuf.schema.supportEnable enhanced schema support to preserve package information. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.flatten.unionsWhether to flatten unions (oneofs). Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.generate.index.for.unionsWhether to generate an index suffix for unions. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.generate.struct.for.nullsWhether to generate a struct variable for null values. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.int.for.enums: Whether to represent enums as integers. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.latest.compatibility.strict: Verify that the latest subject version is backward compatible when use.latest.version is true.
- Type: boolean
- Importance: low
value.converter.object.additional.properties: Whether to allow additional properties for object schemas. Applicable for JSON_SR Converters.
- Type: boolean
- Importance: low
value.converter.optional.for.nullables: Whether nullable fields should be specified with an optional label. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.optional.for.proto2: Whether proto2 optionals are supported. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.use.latest.version: Use the latest version of the schema in the subject for serialization when auto.register.schemas is false.
- Type: boolean
- Importance: low
value.converter.use.optional.for.nonrequired: Whether to set non-required properties to be optional. Applicable for JSON_SR Converters.
- Type: boolean
- Importance: low
value.converter.wrapper.for.nullables: Whether nullable fields should use primitive wrapper messages. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
value.converter.wrapper.for.raw.primitives: Whether a wrapper message should be interpreted as a raw primitive at root level. Applicable for Protobuf Converters.
- Type: boolean
- Importance: low
errors.tolerance: Configures the connector’s error handling behavior. WARNING: Use this property with caution for source connectors, because it may lead to data loss. If you set this property to ‘all’, the connector does not fail on errant records, but instead logs them (and, for sink connectors, sends them to the DLQ) and continues processing. If you set this property to ‘none’, the connector task fails on errant records.
- Type: string
- Default: none
- Importance: low
key.converter.key.subject.name.strategy: How to construct the subject name for key schema registration.
- Type: string
- Default: TopicNameStrategy
- Importance: low
value.converter.decimal.format: Specify the JSON/JSON_SR serialization format for Connect DECIMAL logical type values with two allowed literals:
BASE64 to serialize DECIMAL logical types as base64-encoded binary data, and
NUMERIC to serialize Connect DECIMAL logical type values in JSON/JSON_SR as a number representing the decimal value.
- Type: string
- Default: BASE64
- Importance: low
value.converter.flatten.singleton.unions: Whether to flatten singleton unions. Applicable for Avro and JSON_SR Converters.
- Type: boolean
- Default: false
- Importance: low
value.converter.reference.subject.name.strategy: Set the subject reference name strategy for the value. Valid entries are DefaultReferenceSubjectNameStrategy or QualifiedReferenceSubjectNameStrategy. Note that the subject reference name strategy can be selected only for the PROTOBUF format, with DefaultReferenceSubjectNameStrategy being the default.
- Type: string
- Default: DefaultReferenceSubjectNameStrategy
- Importance: low
value.converter.value.subject.name.strategy: Determines how to construct the subject name under which the value schema is registered with Schema Registry.
- Type: string
- Default: TopicNameStrategy
- Importance: low
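As an illustration of how these additional configs are supplied, the following sketch overrides a few of the properties above in a connector configuration. The values are arbitrary examples chosen from the valid values listed above, not recommendations.

{
  "producer.override.compression.type": "lz4",
  "producer.override.linger.ms": "500",
  "value.converter.decimal.format": "NUMERIC",
  "errors.tolerance": "none"
}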
Auto-restart policy¶
auto.restart.on.user.error: Enable the connector to automatically restart on user-actionable errors.
- Type: boolean
- Default: true
- Importance: medium
Connect to an Oracle Real Application Cluster (RAC) Database¶
Confluent recommends configuring the following properties to ensure that the connector can connect and attach to the specific RAC instance running the XStream components (a configuration sketch follows the list):
- Configure the database.hostname property to the Oracle RAC database SCAN address.
  Note: If a SCAN address is unavailable, configure the database.hostname property to the hostname of the instance where the XStream components are running. You must manually reconfigure the connector whenever the instance running the XStream components changes.
- Configure the database.service.name property to the auto-created Oracle XStream service.
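The following is a minimal sketch of these two properties in a connector configuration. The SCAN address and service name are hypothetical placeholders; substitute the values for your RAC environment.

{
  "database.hostname": "myrac-scan.example.com",
  "database.service.name": "XSTREAM_SVC"
}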
Supported Data Types¶
The connector creates change events for database changes. Each change event mirrors the table’s schema, with a field for every column value. The data type of each table column determines how the connector represents the column values in the corresponding change event fields.
For certain data types, such as numeric data types, you can customize how the connector maps them by modifying the default configuration settings. This allows more control over handling various data types, ensuring that the change events reflect the desired format and meet specific requirements.
Character data types¶
The following table describes how the connector maps character types.
| Oracle data type | Connect type |
|---|---|
| CHAR | STRING |
| VARCHAR / VARCHAR2 | STRING |
| NCHAR | STRING |
| NVARCHAR2 | STRING |
In all cases, the connector ensures that character data is converted to a string type in Kafka Connect when creating change events.
Large object (LOB) types¶
You can adjust how the connector maps binary LOB data types by changing the binary.handling.mode configuration property.
The following table describes how the connector maps binary and character LOB types.
| Oracle data type | Connect type | Notes |
|---|---|---|
| BLOB | BYTES | Based on the binary.handling.mode setting |
| CLOB | STRING | |
| NCLOB | STRING | |
Numeric data types¶
You can adjust how the connector maps numeric data types by changing the decimal.handling.mode
configuration property.
The table below shows the mapping of numeric types when decimal.handling.mode is set to precise.
| Oracle data type | Connect type | Notes |
|---|---|---|
| NUMBER(P, S <= 0) | INT8 / INT16 / INT32 / INT64 / BYTES | Based on the precision and scale, the connector selects a matching Kafka Connect integer type. NUMBER columns with a scale of zero or less hold integer values. |
| NUMBER(P, S > 0) | BYTES | org.apache.kafka.connect.data.Decimal |
| NUMBER(P[, *]) | STRUCT | io.debezium.data.VariableScaleDecimal. Contains a structure with two fields: scale (of type INT32), containing the scale of the transferred value, and value (of type BYTES), containing the original value in unscaled form. |
| SMALLINT, INT, INTEGER | BYTES | org.apache.kafka.connect.data.Decimal. Oracle maps SMALLINT, INT, and INTEGER to NUMBER(38,0). As a result, these types can hold values that exceed the maximum range of any of the INT types. |
| NUMERIC, DECIMAL | INT8 / INT16 / INT32 / INT64 / BYTES | Handled in the same way as the NUMBER data type (note that scale defaults to 0 for NUMERIC). |
| FLOAT[(P)] (maps to FLOAT(126) when P is not specified) | STRUCT | io.debezium.data.VariableScaleDecimal. Contains a structure with two fields: scale (of type INT32), containing the scale of the transferred value, and value (of type BYTES), containing the original value in unscaled form. |
| REAL (maps to FLOAT(63)), DOUBLE PRECISION (maps to FLOAT(126)) | STRUCT | io.debezium.data.VariableScaleDecimal. Contains a structure with two fields: scale (of type INT32), containing the scale of the transferred value, and value (of type BYTES), containing the original value in unscaled form. |
| BINARY_FLOAT | FLOAT32 | |
| BINARY_DOUBLE | FLOAT64 | |
Note
When decimal.handling.mode is set to:
- string: The Oracle numeric data types are mapped to the Kafka Connect STRING type.
- double: The Oracle numeric data types are mapped to the Kafka Connect FLOAT64 type.
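As a hypothetical illustration (not actual connector output), consider a NUMBER(10,2) column named price that holds the value 123.45. The column name is invented for this example; the encodings follow from the mode definitions above and, for the precise case, from the JSON converter’s default BASE64 decimal format.

precise: price is a Connect Decimal (BYTES); the JSON converter emits the base64-encoded unscaled value, for example "MDk=" (unscaled value 12345, scale 2)
string:  price is emitted as the string "123.45"
double:  price is emitted as the number 123.45 (FLOAT64)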
Temporal data types¶
You can adjust how the connector maps some of the temporal data types by changing the time.precision.mode
configuration property.
The table below shows the mapping of temporal types:
| Oracle data type | Connect type | Notes |
|---|---|---|
| DATE | INT64 | Based on the time.precision.mode setting. |
| TIMESTAMP[(P)] | INT64 | Based on the time.precision.mode setting. adaptive: io.debezium.time.Timestamp if precision <= 3, io.debezium.time.MicroTimestamp if precision <= 6, otherwise io.debezium.time.NanoTimestamp. connect: org.apache.kafka.connect.data.Timestamp, the number of milliseconds since the UNIX epoch, without timezone information. |
| TIMESTAMP WITH TIME ZONE | STRING | io.debezium.time.ZonedTimestamp. A string representation of a timestamp with timezone information. |
| TIMESTAMP WITH LOCAL TIME ZONE | STRING | io.debezium.time.ZonedTimestamp. A string representation of a timestamp in UTC. |
| INTERVAL YEAR[(P)] TO MONTH | STRING | io.debezium.time.Interval. A string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S. |
| INTERVAL DAY[(P)] TO SECOND[(FP)] | STRING | io.debezium.time.Interval. A string representation of the interval value in the ISO 8601 duration format: P<years>Y<months>M<days>DT<hours>H<minutes>M<seconds>S. |
Note
When time.precision.mode is set to connect, there could be a loss of precision if the
fractional second precision of a column exceeds 3, because Oracle supports a higher level
of precision than the logical types in Kafka Connect.
Security¶
Native Network Encryption¶
Oracle Database provides native network encryption and integrity to ensure data is secure in transit, without the need to set up Transport Layer Security (TLS).
Encryption and integrity are managed based on a combination of client-side and server-side
encryption settings, which can be configured using parameters in the sqlnet.ora configuration
file. For more information on configuring Oracle Advanced Security for network encryption and
integrity, see the Support for Network Encryption and Integrity section in the Oracle
Database JDBC Developer’s Guide.
The connector uses the Oracle JDBC OCI driver to communicate with the Oracle database. You can use the database’s native network encryption and data integrity to securely transmit data between the connector and the Oracle database. For more information on relevant configuration settings, see Table 9-2 OCI Driver Client Parameters for Encryption and Integrity in the Oracle Database JDBC Developer’s Guide.
The following configurations are set on the connector:
- The SQLNET.ALLOW_WEAK_CRYPTO parameter is set to FALSE to ensure that the connector uses strong algorithms when communicating with the Oracle database.
- The SQLNET.ENCRYPTION_CLIENT and SQLNET.CRYPTO_CHECKSUM_CLIENT parameters are set to ACCEPTED (the default value).
To enable network encryption and integrity, configure the SQLNET.ENCRYPTION_SERVER and
SQLNET.CRYPTO_CHECKSUM_SERVER parameters on the server to either REQUESTED or REQUIRED.
Additionally, specify strong encryption and crypto-checksum algorithms by setting the
SQLNET.ENCRYPTION_TYPES_SERVER and SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER parameters.
For more information, see Improving Native Network Encryption Security
section in the Oracle Database Security Guide.
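As a minimal server-side sketch, and assuming AES256 encryption and SHA256 checksumming are acceptable in your environment, the sqlnet.ora entries could look like the following. Confirm the parameter values against your security requirements and the Oracle documentation referenced above.

SQLNET.ENCRYPTION_SERVER = REQUIRED
SQLNET.ENCRYPTION_TYPES_SERVER = (AES256)
SQLNET.CRYPTO_CHECKSUM_SERVER = REQUIRED
SQLNET.CRYPTO_CHECKSUM_TYPES_SERVER = (SHA256)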
Transport Layer Security (TLS)¶
You can configure Transport Layer Security (TLS) to secure connections between the client (connector) and the Oracle database. Currently, only one-way TLS, without client wallets, is supported.
In one-way TLS, the database server presents a certificate to authenticate itself to the client (connector). The client needs access to the trusted Certificate Authority (CA) root certificate that signed the server’s certificate to verify it. Currently, the connector only supports certificates signed by well-known CAs, where the corresponding CA certificate is present in the default certificate store of the system running the connector.
For more information on configuring TLS, see Configuring Transport Layer Security Authentication chapter of the Oracle Database Security Guide.
You can enable TLS connections between the connector and the Oracle database server by
using the database.tls.mode configuration property. Set database.tls.mode to one-way
to enable TLS encryption and server authentication.
Note
When database.tls.mode is set to one-way, ensure that the port specified in
database.port corresponds to the listener on the server that supports TLS connections.
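The following is a minimal sketch of the relevant connector properties. The port shown is a common convention for a TCPS listener and is only an example; use the TLS-enabled listener port configured for your database.

{
  "database.tls.mode": "one-way",
  "database.port": "2484"
}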
Transparent Data Encryption (TDE)¶
Transparent Data Encryption (TDE) enables you to encrypt sensitive data stored in tables and tablespaces. The data is transparently decrypted for authorized users or applications during access.
The connector supports capturing changes from databases encrypted with Oracle TDE. Oracle XStream performs decryption, and the connector receives decrypted data from the XStream outbound server.
In multitenant environments, only united mode is supported. In this mode, a single shared keystore is configured for the CDB root and any associated united mode PDBs.
Note
For Oracle XStream to transparently decrypt encrypted columns, the encryption master key must be stored in the keystore on the local database, and the keystore must be open.
Next Steps¶
For an example that shows fully-managed Confluent Cloud connectors in action with Confluent Cloud for Apache Flink, see the Cloud ETL Demo. This example also shows how to use Confluent CLI to manage your resources in Confluent Cloud.
