Oracle 同步数据至 Doris
前言
在上篇博客中,我们掌握了如何使用 Flink-CDC 和 Doris-Flink-Connector 将 MySQL 的数据同步至 Doris,本篇博客一起来探索如何将 Oracle 的数据同步至 Doris。本次使用的主要技术栈如下:
- Doris 2.1.6
- Oracle_11g
- Flink 1.18.0
- FlinkCDC 3.1.0
- Doris-Flink-Connector 24.0.0
准备工作
Oracle 安装并配置
Oracle 的配置相较于 MySQL 来说会复杂一些,这里逐步来演示一下。采用的方式是使用 Docker 部署 Oracle,然后开启归档模式。
安装 Oracle 11g
拉取 Oracle 11g 的镜像:
|
|
执行以下命令以创建并运行 Oracle 11g 容器(其中:1521 为映射主机端口,8071 为管理界面端口,helowin 为 Oracle 数据库唯一实例 ID,端口号可以根据服务器实际情况进行映射,防止冲突即可):
|
|
这样我们就有了一个 Oracle 11g 的容器,可以使用 docker exec -it oracle_cdc bash
进入容器内部。
配置 Oracle 环境
- 切换至 root 用户(默认密码为 helowin):
|
|
- 修改环境变量:
|
|
- 创建 sqlplus 软链接:
|
|
通过在 /usr/bin
目录下创建软链接,使得 sqlplus
命令可以在系统的任何位置被直接调用,而不需要指定完整路径,方便后续操作。
配置数据库恢复和归档日志
登录 SQL*Plus 并执行以下命令:
|
|
创建表空间和用户
- 创建表空间:
|
|
- 创建用户并授权:
|
|
创建示例表
切换到 flinkuser 并创建表:
|
|
Flink 配置
FLink 仍旧采用的是单机部署,直接解压即可,可以根据自己的服务器情况进行调整 flink-conf.yaml
配置文件。然后最关键的是依赖的配置,由于 License 的不同,这些需要用户自己去手动配置。主要需要的有:
doris-flink-connector-1.18-24.0.0.jar
:Doris 的 Flink 连接器;flink-sql-connector-oracle-cdc-3.1.0.jar
:Flink 的 Oracle CDC 连接器;ojdbc8-19.3.0.0.jar
:Oracle 的 JDBC 驱动。
上述配置文件可以下载放在 flink/lib
目录下,可以参考下图(还包含了上一篇 MySQL 同步数据的配置文件):
完成上述配置之后,使用 start-cluster.sh
启动 Flink 集群。
代码同步
可以创建 Maven 项目,并添加所需依赖如下所示:
|
|
创建一个 Java 类命名为 OracleToDoris
,并编写代码如下:
|
|
这段代码展示了如何使用 Java 来配置和执行 Oracle 到 Doris 的数据同步任务。它首先设置了 Flink 的执行环境,包括并行度、算子链和检查点等基本参数。然后,代码分别配置了 Oracle 源和 Doris 接收端的详细信息,如数据库名称、主机地址、端口号、用户名和密码等。接着,它创建了一个 OracleDatabaseSync 实例,并通过一系列方法调用设置了同步任务的各种参数,包括数据库、表前缀后缀、是否忽略默认值等。最后,代码调用 create() 和 build() 方法来创建和构建同步任务,并执行这个任务。这种通过 Java 代码配置数据同步的方法比使用命令行更加灵活。它允许开发者根据需求动态调整参数,更容易集成到现有的 Java 应用中。
编写完成代码之后,可以使用 mvn clean package
命令来打包,然后在可以在 Flink Web UI 中提交任务。如下图所示:
当提交完成之后,可以在 Web UI 的 Jobs Running Jobs 看见提交的任务,当任务状态变为 RUNNING 之后,说明数据同步任务正在运行。
这个时候可以在 Doris 中查看数据同步情况:
|
|
这个时候如果插入新的数据,Doris 中也会同步更新数据,则说明数据同步成功。
Doris-Flink-Connector 同步
如果感觉使用代码同步数据比较复杂,则可以使用 Doris-Flink-Connector 同步数据,在 MySQL 数据同步已经用过了,总体来说就是添加依赖至 $FLINK_HOME/lib
下,然后启动命令即可。
依赖在上面已经添加了,这里就不再赘述了,直接启动命令如下:
|
|
这个命令使用 Flink 运行 Doris-Flink-Connector 来同步 Oracle 数据库到 Doris。解释一下主要参数:
Flink 执行参数:
- 设置检查点间隔为 10 秒
- 默认并行度为 1
主类和 JAR 文件:
- 使用
org.apache.doris.flink.tools.cdc.CdcTools
类 - 从
lib/flink-doris-connector-1.18-24.0.0.jar
运行
- 使用
同步模式:
oracle-sync-database
目标数据库:
test_db
Oracle 配置:
- 主机名、端口、用户名、密码
- 数据库名和 schema 名
- 日志挖掘策略和连续挖掘设置
- 仅捕获指定表的 DDL 历史
Doris 接收端配置:
- FE 节点地址
- 用户名和密码
- JDBC URL
- 导入标签前缀
表配置:
- 复制数量设为 1
补充:debezium 相关的三行参数的作用。
这三行参数是针对 Oracle CDC (Change Data Capture) 的特定配置,它们对于 Oracle 数据同步有重要作用:
--oracle-conf debezium.log.mining.strategy=online_catalog
- 这个参数设置了日志挖掘策略为 "online_catalog"。
- 在这种模式下,Debezium(用于 CDC 的开源框架)使用 Oracle 的在线目录视图来获取必要的元数据信息。
- 这种方法通常比其他策略更快,因为它不需要解析重做日志来获取元数据。
--oracle-conf debezium.log.mining.continuous.mine=true
- 启用连续挖掘模式。
- 当设置为 true 时,Oracle LogMiner 会持续不断地处理重做日志,而不是在每次查询时重新启动。
- 这可以显著提高性能,特别是在处理大量变更时。
--oracle-conf debezium.database.history.store.only.captured.tables.ddl=true
- 这个配置指示 Debezium 只存储被捕获表的 DDL(数据定义语言)历史。
- 当设置为 true 时,它会减少存储的 DDL 历史量,只保留与被监控表相关的 DDL 语句。
- 这可以减少存储开销,并可能提高性能,特别是在大型数据库环境中。
总的来说,这些参数旨在优化 Oracle CDC 过程,提高性能,减少资源消耗,并确保更高效的数据捕获。它们特别适用于需要实时或近实时数据同步的场景,同时也考虑了系统资源的有效利用。
可以在 Flink Web UI 中查看任务执行情况:
可以看见所有任务的 Job State 都是 RUNNING,说明数据同步任务正在运行。可以在 Doris 中查看数据同步情况:
|
|
这个时候如果在 Oracle 中插入新的数据,可以发现 Doris 中也会同步更新数据。可以发现使用 Connector 进行数据同步是高效又方便。
总结
到此为止我们完成了 Oracle 到 Doris 的数据同步,同样是提供了两种方式供读者选择,第一种选择的是代码实现,代码的总体结构比较简单,也只是设置一些参数。第二种则更方便一点,使用的是 Doris-Flink-Connector 来同步,可以直接使用 flink run
命令添加对应的参数完成数据的同步。
我们在创建测试数据的时候设置表包含主键,对应到 Doris 就是主键数据模型,读者可以使用 SHOW CREATE TABLE
语句来进行检测,那如果某张表不包含主键该怎么进行同步呢?以及某些表不需要进行同步,这些参数又该如何设置呢?可以期待下篇内容!
参考: