seatunnel-一种场景mysqlcdc同步进入clickhouse基于2.3.11版本
MySQL CDC 到 ClickHouse 完整数据流转分析
概述
本文档详细分析在startup_mode="initial"模式下,数据从 MySQL CDC Source 读取到 ClickHouse Sink 的完整流转过程。我们将深入探讨每个环节的具体函数调用链、数据转换过程以及关键实现细节。
整体流程概览
sql
体验AI代码助手
代码解读
复制代码
MySQL Binlog/Snapshot → CDC Source Reader → SeaTunnel Row → ClickHouse Sink Writer → ClickHouse
详细函数调用链分析
第一阶段:任务启动与初始化
1.1 SeaTunnel 任务启动
scss
体验AI代码助手
代码解读
复制代码
SeaTunnel.run(command) → command.execute() → TaskExecution.execute()
入口函数:org.apache.seatunnel.core.starter.SeaTunnel.run()
- • 接收命令行参数,启动 SeaTunnel 任务
- • 调用具体的执行命令
1.2 Source 初始化
scss
体验AI代码助手
代码解读
复制代码
MySqlIncrementalSource.createReader() → IncrementalSourceReader() → MySqlDialect.createFetchTaskContext()
关键函数:MySqlIncrementalSource.createReader()
java
体验AI代码助手
代码解读
复制代码
@Override public SourceReader createReader(SourceReader.Context context) throws Exception { // 创建增量源读取器 return new IncrementalSourceReader<>( dataSourceDialect, elementsQueue, splitReaderSupplier, recordEmitter, options, context, configFactory.create(0), deserializationSchema ); }
1.3 Sink 初始化
scss
体验AI代码助手
代码解读
复制代码
ClickhouseSink.createWriter() → ClickhouseSinkWriter() → initStatementMap()
关键函数:ClickhouseSink.createWriter()
java
体验AI代码助手
代码解读
复制代码
@Override public ClickhouseSinkWriter createWriter(SinkWriter.Context context) throws IOException { // 1. 创建 ClickHouse 节点连接 List nodes = ClickhouseUtil.createNodes(readonlyConfig); // 2. 创建代理连接 ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0)); // 3. 获取表结构信息 Map tableSchema = proxy.getClickhouseTableSchema(readonlyConfig.get(TABLE)); // 4. 创建分片路由器 ShardRouter shardRouter = new ShardRouter(proxy, option.getShardMetadata()); // 5. 创建批次执行器映射 Map statementMap = initStatementMap(); return new ClickhouseSinkWriter(option, context); }
第二阶段:数据分片与分配(Source 端)
2.1 分片分配器初始化
体验AI代码助手
代码解读
复制代码
IncrementalSourceEnumerator → HybridSplitAssigner → MySqlChunkSplitter
关键函数:HybridSplitAssigner.getNext()
kotlin
体验AI代码助手
代码解读
复制代码
@Override public Optional getNext() { // 1. 首先分配快照分片 if (!snapshotAssigner.isFinished()) { return snapshotAssigner.getNext(); } // 2. 快照完成后分配增量分片 if (incrementalSplitAssigner != null) { return incrementalSplitAssigner.getNext(); } return Optional.empty(); }
2.2 分片算法实现
scss
体验AI代码助手
代码解读
复制代码
MySqlChunkSplitter.queryMinMax() → MySqlUtils.queryMinMax() → buildSplitScanQuery()
关键函数:MySqlUtils.queryMinMax()
scss
体验AI代码助手
代码解读
复制代码
public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException { final String minMaxQuery = String.format( "SELECT MIN(%s), MAX(%s) FROM %s", quote(columnName), quote(columnName), quote(tableId) ); return jdbc.queryAndMap(minMaxQuery, rs -> { if (!rs.next()) { throw new SQLException(String.format("No result returned after running query [%s]", minMaxQuery)); } return rowToArray(rs, 2); }); }
第三阶段:全量数据读取(Source 端)
3.1 快照任务执行
scss
体验AI代码助手
代码解读
复制代码
MySqlSnapshotFetchTask.execute() → MySqlSnapshotSplitReadTask.execute() → doExecute()
关键函数:MySqlSnapshotSplitReadTask.doExecute()
ini
体验AI代码助手
代码解读
复制代码
@Override protected SnapshotResult doExecute( ChangeEventSourceContext context, MySqlOffsetContext previousOffset, SnapshotContext snapshotContext, SnapshottingTask snapshottingTask) throws Exception { final MySqlSnapshotContext ctx = (MySqlSnapshotContext) snapshotContext; ctx.offset = offsetContext; // 步骤1:确定低水位标记(开始时的 binlog 位置) final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection); LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, snapshotSplit); ((SnapshotSplitChangeEventSourceContext) context).setLowWatermark(lowWatermark); dispatcher.dispatchWatermarkEvent( ctx.partition.getSourcePartition(), snapshotSplit, lowWatermark, WatermarkKind.LOW); // 步骤2:快照数据 LOG.info("Snapshot step 2 - Snapshotting data"); createDataEvents(ctx, snapshotSplit.getTableId()); // 步骤3:确定高水位标记(结束时的 binlog 位置) final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection); LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); ((SnapshotSplitChangeEventSourceContext) context).setHighWatermark(highWatermark); dispatcher.dispatchWatermarkEvent( ctx.partition.getSourcePartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH); return SnapshotResult.completed(ctx.offset); }
3.2 数据事件创建
scss
体验AI代码助手
代码解读
复制代码
createDataEvents() → createDataEventsForTable() → createDataEventsForTable()
关键函数:MySqlSnapshotSplitReadTask.createDataEventsForTable()
ini
体验AI代码助手
代码解读
复制代码
private void createDataEventsForTable( MySqlSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, Table table) throws InterruptedException { long exportStart = clock.currentTimeInMillis(); LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); // 构建分片查询 SQL final String selectSql = buildSplitScanQuery( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null ); // 执行查询并创建数据事件 try (PreparedStatement statement = readTableSplitDataStatement(selectSql, snapshotSplit); ResultSet resultSet = statement.executeQuery()) { ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(table, resultSet.getMetaData()); long rows = 0; Timer logTimer = getTableScanLogTimer(); while (resultSet.next()) { rows++; final Object[] row = new Object[columnArray.getGreatestColumnPosition()]; // 从 ResultSet 中提取数据 for (int i = 0; i < columnArray.getColumns().length; i++) { Column column = columnArray.getColumns()[i]; int position = columnArray.getColumnPositions()[i]; row[position - 1] = readColumnValue(resultSet, i + 1, column); } // 创建快照变更记录 SnapshotChangeRecordEmitter recordEmitter = new SnapshotChangeRecordEmitter( partition, offsetContext, table.id(), row, clock.currentTimeAsInstant() ); // 分发数据变更事件 dispatcher.dispatchSnapshotEvent(snapshotContext.partition, table.id(), recordEmitter); // 定期记录进度 if (logTimer.expired()) { LOG.info("Exported {} records for table '{}' from split '{}'", rows, table.id(), snapshotSplit.splitId()); logTimer = getTableScanLogTimer(); } } LOG.info("Export {} records for table '{}' from split '{}'", rows, table.id(), snapshotSplit.splitId()); } }
第四阶段:数据反序列化转换
4.1 Debezium 事件转换
scss
体验AI代码助手
代码解读
复制代码
SeaTunnelRowDebeziumDeserializeSchema.deserialize() → extractAfterRow()/extractBeforeRow()
关键函数:SeaTunnelRowDebeziumDeserializeSchema.deserializeDataChangeRecord()
ini
体验AI代码助手
代码解读
复制代码
private void deserializeDataChangeRecord(SourceRecord record, Collector collector) throws Exception { Envelope.Operation operation = Envelope.operationFor(record); Struct messageStruct = (Struct) record.value(); Schema valueSchema = record.valueSchema(); TablePath tablePath = SourceRecordUtils.getTablePath(record); String tableId = tablePath.toString(); // 获取对应表的转换器 SeaTunnelRowDebeziumDeserializationConverters converters = tableRowConverters.get(tableId); Long fetchTimestamp = SourceRecordUtils.getFetchTimestamp(record); Long messageTimestamp = SourceRecordUtils.getMessageTimestamp(record); long delay = -1L; if (fetchTimestamp != null && messageTimestamp != null) { delay = fetchTimestamp - messageTimestamp; } if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) { // INSERT 操作(全量快照中的数据也标记为 INSERT) SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema); insert.setRowKind(RowKind.INSERT); insert.setTableId(tableId); MetadataUtil.setDelay(insert, delay); MetadataUtil.setEventTime(insert, fetchTimestamp); collector.collect(insert); } else if (operation == Envelope.Operation.DELETE) { // DELETE 操作 SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema); delete.setRowKind(RowKind.DELETE); delete.setTableId(tableId); MetadataUtil.setDelay(delete, delay); MetadataUtil.setEventTime(delete, fetchTimestamp); collector.collect(delete); } else if (operation == Envelope.Operation.UPDATE) { // UPDATE 操作(生成两行:UPDATE_BEFORE 和 UPDATE_AFTER) SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema); before.setRowKind(RowKind.UPDATE_BEFORE); before.setTableId(tableId); MetadataUtil.setDelay(before, delay); MetadataUtil.setEventTime(before, fetchTimestamp); collector.collect(before); SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema); after.setRowKind(RowKind.UPDATE_AFTER); after.setTableId(tableId); MetadataUtil.setDelay(after, delay); MetadataUtil.setEventTime(after, fetchTimestamp); collector.collect(after); } }
4.2 数据类型转换
scss
体验AI代码助手
代码解读
复制代码
extractAfterRow() → converters.getAfterRowConverter().convert() → RowConverter
关键函数:SeaTunnelRowDebeziumDeserializationConverters.extractAfterRow()
ini
体验AI代码助手
代码解读
复制代码
public SeaTunnelRow extractAfterRow(SourceRecord record, Struct value, Schema valueSchema) throws Exception { Struct after = value.getStruct(Envelope.FieldName.AFTER); Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema(); SeaTunnelRow row = new SeaTunnelRow(fieldConverters.length); for (int i = 0; i < fieldConverters.length; i++) { String fieldName = fieldNames[i]; Object fieldValue = after.get(fieldName); Object convertedValue = fieldConverters[i].convert(fieldValue); row.setField(i, convertedValue); } // 设置元数据字段 for (int i = 0; i < metadataConverters.length; i++) { row.setField(fieldConverters.length + i, metadataConverters[i].convert(record)); } return row; }
第五阶段:数据写入 ClickHouse
5.1 Sink Writer 接收数据
scss
体验AI代码助手
代码解读
复制代码
ClickhouseSinkWriter.write() → shardRouter.getShard() → addIntoBatch()
关键函数:ClickhouseSinkWriter.write()
scss
体验AI代码助手
代码解读
复制代码
@Override public void write(SeaTunnelRow element) throws IOException { // 1. 确定分片键值 Object shardKey = null; if (StringUtils.isNotEmpty(this.option.getShardMetadata().getShardKey())) { int i = this.option.getSeaTunnelRowType().indexOf(this.option.getShardMetadata().getShardKey()); shardKey = element.getField(i); } // 2. 路由到目标分片 ClickhouseBatchStatement statement = statementMap.get(shardRouter.getShard(shardKey)); JdbcBatchStatementExecutor clickHouseStatement = statement.getJdbcBatchStatementExecutor(); IntHolder sizeHolder = statement.getIntHolder(); // 3. 添加到批次 addIntoBatch(element, clickHouseStatement); sizeHolder.setValue(sizeHolder.getValue() + 1); // 4. 批量刷新 if (sizeHolder.getValue() >= option.getBulkSize()) { flush(clickHouseStatement); sizeHolder.setValue(0); } }
5.2 分片路由算法
scss
体验AI代码助手
代码解读
复制代码
ShardRouter.getShard() → HASH_INSTANCE.hash() → TreeMap.lowerEntry()
关键函数:ShardRouter.getShard()
scss
体验AI代码助手
代码解读
复制代码
public Shard getShard(Object shardValue) { if (!splitMode) { return shards.firstEntry().getValue(); } if (StringUtils.isEmpty(shardKey) || shardValue == null) { // 没有分片键时随机选择 return shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount) + 1).getValue(); } // 使用 XXHash64 进行一致性哈希 int offset = (int) ((HASH_INSTANCE.hash( ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)), 0) & Long.MAX_VALUE) % shardWeightCount); return shards.lowerEntry(offset + 1).getValue(); }
5.3 批量执行器处理
scss
体验AI代码助手
代码解读
复制代码
JdbcBatchStatementExecutorBuilder.build() → SimpleBatchStatementExecutor → executeBatch()
关键函数:SimpleBatchStatementExecutor.executeBatch()
csharp
体验AI代码助手
代码解读
复制代码
@Override public void executeBatch() throws SQLException { if (batchCount > 0) { try { // 执行批量插入 statement.executeBatch(); connection.commit(); batchCount = 0; } catch (SQLException e) { connection.rollback(); throw new ClickhouseConnectorException( CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Clickhouse execute batch statement error", e ); } } }
5.4 数据类型注入
scss
体验AI代码助手
代码解读
复制代码
JdbcRowConverter.toExternal() → fieldInjectFunctionMap.get().injectFields()
关键函数:JdbcRowConverter.toExternal()
ini
体验AI代码助手
代码解读
复制代码
public PreparedStatement toExternal(SeaTunnelRow row, PreparedStatement statement) throws SQLException { for (int i = 0; i < projectionFields.length; i++) { String fieldName = projectionFields[i]; Object fieldValue = fieldGetterMap.get(fieldName).apply(row); if (fieldValue == null) { statement.setObject(i + 1, null); continue; } // 使用对应的注入函数处理字段值 fieldInjectFunctionMap .getOrDefault(fieldName, DEFAULT_INJECT_FUNCTION) .injectFields(statement, i + 1, fieldValue); } return statement; }
第六阶段:增量阶段切换
6.1 快照完成检测
scss
体验AI代码助手
代码解读
复制代码
IncrementalSourceReader.onSplitFinished() → reportFinishedSnapshotSplitsIfNeed()
关键函数:IncrementalSourceReader.onSplitFinished()
scss
体验AI代码助手
代码解读
复制代码
@Override protected void onSplitFinished(Map finishedSplitIds) { for (SourceSplitStateBase splitState : finishedSplitIds.values()) { SourceSplitBase sourceSplit = splitState.toSourceSplit(); checkState( sourceSplit.isSnapshotSplit() && sourceSplit.asSnapshotSplit().isSnapshotReadFinished(), String.format("Only snapshot split could finish, but the actual split is incremental split %s", sourceSplit) ); finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit()); } reportFinishedSnapshotSplitsIfNeed(); context.sendSplitRequest(); // 请求下一个分片 }
6.2 增量分片分配
scss
体验AI代码助手
代码解读
复制代码
HybridSplitAssigner.getNext() → incrementalSplitAssigner.getNext()
关键函数:IncrementalSplitAssigner.getNext()
scss
体验AI代码助手
代码解读
复制代码
@Override public Optional getNext() { if (remainingTables.isEmpty()) { return Optional.empty(); } // 为每个表创建一个增量分片 TableId tableId = remainingTables.iterator().next(); remainingTables.remove(tableId); // 创建增量分片,从快照的高水位标记开始 IncrementalSplit split = new IncrementalSplit( tableId.toString(), Collections.singletonList(tableId), completedSnapshotSplitWatermarks.get(tableId).getHighWatermark(), stopOffset, Collections.emptyList() ); return Optional.of(split); }
6.3 Binlog 事件处理
scss
体验AI代码助手
代码解读
复制代码
MySqlBinlogFetchTask.execute() → MySqlStreamingChangeEventSource.execute() → handleEvent()
关键函数:MySqlBinlogFetchTask.MySqlBinlogSplitReadTask.handleEvent()
scss
体验AI代码助手
代码解读
复制代码
@Override protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) { super.handleEvent(partition, offsetContext, event); // 检查是否需要停止(有界读取) if (isBoundedRead()) { final BinlogOffset currentBinlogOffset = getBinlogPosition(offsetContext.getOffset()); // 达到高水位标记,结束 binlog 读取 if (currentBinlogOffset.isAtOrAfter(binlogSplit.getStopOffset())) { // 发送 binlog 结束事件 dispatcher.dispatchWatermarkEvent( partition.getSourcePartition(), binlogSplit, currentBinlogOffset, WatermarkKind.END ); // 通知读取器任务完成 ((MySqlSnapshotFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context).finished(); } } }
关键数据转换过程
1. MySQL 数据类型到 SeaTunnel 类型的转换
转换器:MySqlTypeUtils.convertFromColumn()
csharp
体验AI代码助手
代码解读
复制代码
public static SeaTunnelDataType convertFromColumn(Column column, RelationalDatabaseConnectorConfig config) { String mysqlType = column.typeName().toUpperCase(); switch (mysqlType) { case "INT": case "INTEGER": return BasicType.INT_TYPE; case "BIGINT": return BasicType.LONG_TYPE; case "VARCHAR": case "TEXT": return BasicType.STRING_TYPE; case "DATETIME": return LocalTimeType.LOCAL_DATE_TIME_TYPE; case "TIMESTAMP": return LocalTimeType.LOCAL_DATE_TIME_TYPE; case "DECIMAL": case "NUMERIC": return new DecimalType(column.length(), column.scale()); default: return BasicType.STRING_TYPE; } }
2. SeaTunnel 类型到 ClickHouse 类型的转换
转换器:JdbcRowConverter.createFieldInjectFunctionMap()
scss
体验AI代码助手
代码解读
复制代码
private Map createFieldInjectFunctionMap( String[] fields, Map clickhouseTableSchema) { Map fieldInjectFunctionMap = new HashMap<>(); for (String field : fields) { String fieldType = clickhouseTableSchema.get(field); ClickhouseFieldInjectFunction injectFunction = Arrays.asList( new ArrayInjectFunction(), new MapInjectFunction(), new BigDecimalInjectFunction(), new DateTimeInjectFunction(), new LongInjectFunction(), new DoubleInjectFunction(), new IntInjectFunction(), new StringInjectFunction() ).stream() .filter(f -> f.isCurrentFieldType(unwrapCommonPrefix(fieldType))) .findFirst() .orElse(new StringInjectFunction()); fieldInjectFunctionMap.put(field, injectFunction); } return fieldInjectFunctionMap; }
3. 数据行转换流程
体验AI代码助手
代码解读
复制代码
MySQL ResultSet → Debezium SourceRecord → SeaTunnelRow → ClickHouse PreparedStatement
完整转换链:
- 1.MySQL ResultSet: 从数据库查询结果
- 2.Debezium SourceRecord: Debezium 格式的事件记录
- 3.SeaTunnelRow: SeaTunnel 统一的数据格式
- 4.ClickHouse PreparedStatement: 最终写入 ClickHouse 的语句
精确一次处理实现
1. 水位标记机制
低水位标记: 快照开始时的 binlog 位置
高水位标记: 快照结束时的 binlog 位置
ini
体验AI代码助手
代码解读
复制代码
// 低水位标记 final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection); dispatcher.dispatchWatermarkEvent(partition, snapshotSplit, lowWatermark, WatermarkKind.LOW); // 数据快照 createDataEvents(ctx, snapshotSplit.getTableId()); // 高水位标记 final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection); dispatcher.dispatchWatermarkEvent(partition, snapshotSplit, highWatermark, WatermarkKind.HIGH);
2. 增量补偿机制
当lowWatermark != highWatermark时,需要进行增量补偿:
scss
体验AI代码助手
代码解读
复制代码
// 创建补偿增量分片 IncrementalSplit backfillSplit = new IncrementalSplit( splitId, Collections.singletonList(tableId), lowWatermark, // 从低水位开始 highWatermark, // 到高水位结束 new ArrayList<>() ); // 执行补偿读取 MySqlBinlogSplitReadTask backfillTask = new MySqlBinlogSplitReadTask(...); backfillTask.execute(context, partition, offsetContext);
3. 偏移量原子性
java
体验AI代码助手
代码解读
复制代码
@Override public void notifyCheckpointComplete(long checkpointId) throws Exception { // 在检查点完成时提交变更日志偏移量 dataSourceDialect.commitChangeLogOffset(snapshotChangeLogOffset); }
性能优化点
1. 并行度控制
- •快照阶段: 多分片并行读取,提高全量数据读取效率
- •增量阶段: 单线程顺序处理,保证事件顺序性
2. 批量处理
- •批次大小: 可配置的
bulk_size参数(默认 20000) - •批量提交: 减少网络往返和事务开销
3. 内存管理
- •流式处理: 避免大量数据驻留内存
- •连接池: 复用数据库连接,减少连接创建开销
错误处理与容错
1. 连接异常处理
csharp
体验AI代码助手
代码解读
复制代码
try { statement.executeBatch(); connection.commit(); } catch (SQLException e) { connection.rollback(); throw new ClickhouseConnectorException( CommonErrorCodeDeprecated.FLUSH_DATA_FAILED, "Clickhouse execute batch statement error", e ); }
2. 数据类型不匹配处理
scss
体验AI代码助手
代码解读
复制代码
// 使用默认转换器处理未知类型 fieldInjectFunctionMap .getOrDefault(fieldName, DEFAULT_INJECT_FUNCTION) .injectFields(statement, i + 1, fieldValue);
3. 重试机制
- •连接重试: 支持配置
connect.max-retries参数 - •自动重连: 数据库连接断开后自动重连
监控与指标
1. CDC 指标
less
体验AI代码助手
代码解读
复制代码
// 记录获取延迟 recordFetchDelay.set(fetchDelay > 0 ? fetchDelay : 0); // 记录发送延迟 recordEmitDelay.set(emitDelay > 0 ? emitDelay : 0); // 延迟事件监控 if (delayedEventLimiter.acquire(messageTimestamp)) { eventListener.onEvent(new MessageDelayedEvent(emitDelay, element.toString())); }
2. ClickHouse 指标
- •批次大小: 当前批次中的记录数
- •写入延迟: 数据写入 ClickHouse 的耗时
- •错误率: 写入失败的比例
总结
整个 MySQL CDC 到 ClickHouse 的数据流转过程是一个精心设计、高度优化的流水线,主要特点包括:
1.全链路数据一致性
- • 通过水位标记和补偿机制确保数据不丢失
- • 支持精确一次处理语义
- • 自动处理快照期间的变更数据
2.高性能并行处理
- • 快照阶段支持多分片并行读取
- • 批量写入 ClickHouse 减少网络开销
- • 智能分片路由实现负载均衡
3.灵活的数据转换
- • 支持复杂的数据类型映射
- • 自动处理 DDL 结构变更
- • 提供丰富的元数据信息
4.企业级可靠性
- • 完善的错误处理和重试机制
- • 支持故障恢复和断点续传
- • 提供详细的监控指标
5.优化策略
- • 流式处理避免内存溢出
- • 连接池复用减少资源消耗
- • 智能批次管理平衡性能和延迟
这个架构设计使得 SeaTunnel 能够高效、可靠地将 MySQL 的变更数据实时同步到 ClickHouse,为实时分析提供强有力的数据支撑。
.preview-wrapper pre::before { position: absolute; top: 0; right: 0; color: #ccc; text-align: center; font-size: 0.8em; padding: 5px 10px 0; line-height: 15px; height: 15px; font-weight: 600; } .hljs.code__pre > .mac-sign { display: flex; } .code__pre { padding: 0 !important; } .hljs.code__pre code { display: -webkit-box; padding: 0.5em 1em 1em; overflow-x: auto; text-indent: 0; } h2 strong { color: inherit !important; }