
Streaming & CDC Ingestion with Kafka

Architecture Overview

Ilum provides a natural foundation for real-time streaming workloads. Apache Kafka is already a core dependency of the ilum platform, used internally for service communication. This means every ilum deployment has a Kafka cluster available that can also serve as the streaming backbone for your data pipelines.

The streaming architecture follows a three-layer pattern:

  1. Kafka acts as the durable, distributed message bus. Data producers (applications, CDC connectors, IoT devices) publish events to Kafka topics.
  2. Spark Structured Streaming or Apache Flink runs as a long-lived ilum job that continuously reads from Kafka topics, transforms the data, and writes results downstream.
  3. Iceberg or Delta Lake serves as the sink table format, providing ACID transactions, schema evolution, and time-travel queries on top of object storage (S3, MinIO, HDFS).

This combination delivers low-latency, exactly-once data ingestion pipelines that are fully managed through the ilum platform.

Prerequisites

Before building a streaming pipeline, ensure the following components are in place:

  • An ilum cluster with Kafka enabled (this is the default configuration).
  • Iceberg or Delta Lake table format configured in your catalog. You can verify table availability through the SQL Viewer.
  • A Kafka topic populated with data (or a producer actively writing to one).
  • Familiarity with submitting jobs through ilum. See Run a Simple Spark Job for an introduction.
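If you do not yet have a producer writing to a topic, you can generate sample data yourself. The sketch below builds JSON event strings matching the schema used in the end-to-end example later on this page; the commented producer calls are an assumption (they require the kafka-python package and a reachable broker) and are not part of ilum itself.

```python
import json
import random
import uuid
from datetime import datetime, timezone

def make_event():
    """Build one JSON event matching the example pipeline's schema."""
    return json.dumps({
        "event_id": str(uuid.uuid4()),
        "user_id": f"user-{random.randint(1, 100)}",
        "action": random.choice(["click", "view", "purchase"]),
        "amount": round(random.uniform(1.0, 500.0), 2),
        "event_time": datetime.now(timezone.utc).isoformat(),
    })

events = [make_event() for _ in range(10)]

# To publish, pair this with any Kafka producer client, e.g. (assumption:
# kafka-python is installed and a broker is reachable):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="ilum-kafka:9092")
# for e in events:
#     producer.send("events", e.encode("utf-8"))
```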

Connecting Spark Jobs to Kafka

Spark Structured Streaming reads from Kafka using the kafka format. You must include the Kafka connector package when submitting your job.

Required Spark Package

Add the following Maven coordinate to your job's Spark Packages configuration in the ilum UI:

org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
Tip

When submitting through the ilum UI, add this under the Resources tab in the Spark Packages field. Ilum will resolve and distribute the dependency automatically.

Reading from Kafka

The core pattern for connecting to a Kafka topic uses readStream with the kafka format.

PySpark:

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka messages are binary key/value pairs. Cast to string for processing.
parsed = df.selectExpr(
    "CAST(key AS STRING) AS event_key",
    "CAST(value AS STRING) AS event_value",
    "topic",
    "partition",
    "offset",
    "timestamp"
)

Scala:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ilum-kafka:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()

val parsed = df.selectExpr(
  "CAST(key AS STRING) AS event_key",
  "CAST(value AS STRING) AS event_value",
  "topic",
  "partition",
  "offset",
  "timestamp"
)
Note

The default Kafka bootstrap server within an ilum cluster is ilum-kafka:9092. If your Kafka deployment uses a different service name or port, adjust accordingly.

Authentication and Security

For Kafka clusters configured with SASL or TLS, add the corresponding properties to your Spark configuration:

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9093")
    .option("subscribe", "events")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
    .option("kafka.sasl.jaas.config",
            'org.apache.kafka.common.security.scram.ScramLoginModule required '
            'username="user" password="password";')
    .option("kafka.ssl.truststore.location", "/opt/spark/truststore.jks")
    .option("kafka.ssl.truststore.password", "changeit")
    .load()
)
Warning

Avoid hardcoding credentials in job code. Use Kubernetes secrets mounted as files or environment variables, and reference them in your Spark configuration.
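One way to follow this advice in a PySpark job is to assemble the JAAS string from environment variables injected from a Kubernetes secret. A minimal sketch; the variable names KAFKA_USER and KAFKA_PASSWORD are assumptions, not ilum conventions:

```python
import os

# Hypothetical environment variables, expected to be populated from a
# Kubernetes secret (e.g. via secretKeyRef in the pod spec). The
# setdefault fallbacks exist only so this snippet runs standalone.
os.environ.setdefault("KAFKA_USER", "user")
os.environ.setdefault("KAFKA_PASSWORD", "password")

jaas_config = (
    "org.apache.kafka.common.security.scram.ScramLoginModule required "
    f'username="{os.environ["KAFKA_USER"]}" '
    f'password="{os.environ["KAFKA_PASSWORD"]}";'
)
# Pass jaas_config to .option("kafka.sasl.jaas.config", jaas_config)
# instead of embedding the literal credentials in the job source.
```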

Exactly-Once Semantics

Structured Streaming achieves exactly-once processing guarantees through checkpointing and idempotent writes.

Checkpointing Configuration

Checkpoints track the processing state, including Kafka offsets, so that a restarted job resumes from where it left off without data loss or duplication. Store checkpoints on durable object storage.

query = (
    parsed.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/events-pipeline")
    .toTable("spark_catalog.default.events")
)
Warning

Each streaming query must have a unique checkpoint location. Reusing a checkpoint path across different queries will cause data corruption.

Idempotent Writes to Iceberg

Iceberg tables support atomic commits, which means each micro-batch is written as a single transaction. If a batch fails partway through, the partial write is rolled back. Combined with checkpointing, this provides end-to-end exactly-once delivery.
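The interaction between checkpointed batch IDs and atomic commits can be illustrated with a toy, in-memory sketch. Nothing below is Spark or Iceberg API; it only models the all-or-nothing, process-each-batch-once behavior described above:

```python
committed_batches = set()   # models the checkpoint: which batch IDs are done
table = []                  # models the sink table

def write_batch(batch_id, rows):
    """Apply a micro-batch exactly once: skip replays, commit atomically."""
    if batch_id in committed_batches:
        return "skipped"            # replay after a restart: already committed
    staged = list(rows)             # "transaction": stage all rows first
    table.extend(staged)            # atomic commit (all rows or none)
    committed_batches.add(batch_id)
    return "committed"

assert write_batch(0, ["a", "b"]) == "committed"
assert write_batch(0, ["a", "b"]) == "skipped"   # restart replays batch 0
assert table == ["a", "b"]                       # no duplicates in the sink
```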

Failure Recovery

When a streaming job fails and is restarted (manually or via ilum's Max Retries setting):

  1. Spark reads the latest checkpoint to determine the last successfully committed Kafka offset.
  2. Processing resumes from the next offset, skipping already-committed data.
  3. No manual intervention or offset management is required.
Tip

Set Max Retries on your ilum job to a reasonable value (e.g., 3-5) so that transient failures are automatically recovered without operator intervention.
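The recovery sequence above can be modeled in a few lines of plain Python (a toy sketch, not Spark API): the job persists the last committed offset alongside its output and, on restart, resumes from the next one.

```python
checkpoint = {"offset": -1}          # models durable checkpoint storage
processed = []

def run_until(messages, crash_at=None):
    """Process messages from the checkpointed offset, optionally crashing."""
    start = checkpoint["offset"] + 1
    for offset in range(start, len(messages)):
        if offset == crash_at:
            return                    # simulate a failure mid-stream
        processed.append(messages[offset])
        checkpoint["offset"] = offset # commit the offset with the output

messages = ["m0", "m1", "m2", "m3"]
run_until(messages, crash_at=2)       # fails before processing m2
run_until(messages)                   # restart: resumes at offset 2
assert processed == ["m0", "m1", "m2", "m3"]   # no loss, no duplicates
```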

CDC Patterns

Change Data Capture (CDC) pipelines replicate changes from operational databases (PostgreSQL, MySQL, MongoDB) into your lakehouse. The typical architecture is:

Source DB --> Debezium --> Kafka --> Structured Streaming --> Iceberg/Delta

Debezium CDC Source

Debezium is an open-source CDC platform that captures row-level changes from databases and publishes them to Kafka topics. A typical Debezium deployment runs as a Kafka Connect connector alongside your ilum Kafka cluster.

Key Debezium configuration properties:

connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=postgres-host
database.port=5432
database.user=debezium
database.password=secret
database.dbname=mydb
database.server.name=mydb
table.include.list=public.orders,public.customers
topic.prefix=cdc

This produces Kafka topics like cdc.public.orders and cdc.public.customers, each containing insert, update, and delete events.
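Each message on these topics carries a Debezium change envelope. A minimal Python sketch of extracting the operation and row state from one (the `op`, `before`, and `after` fields are standard Debezium; the exact layout depends on your converter settings, and the sample values here are invented):

```python
import json

# Simplified Debezium envelope for an UPDATE on public.orders (sample data)
event_value = json.dumps({
    "op": "u",
    "before": {"id": 7, "status": "pending"},
    "after":  {"id": 7, "status": "shipped"},
    "ts_ms": 1700000000000,
})

envelope = json.loads(event_value)
op = envelope["op"]                  # c=create, u=update, d=delete, r=snapshot
# For deletes the row state lives in "before"; otherwise use "after".
row = envelope["before"] if op == "d" else envelope["after"]
assert (op, row["status"]) == ("u", "shipped")
```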

Upsert with MERGE INTO

CDC events include inserts, updates, and deletes. To apply these changes to an Iceberg or Delta table, use the foreachBatch sink with a MERGE INTO statement.

PySpark example:

def upsert_to_iceberg(batch_df, batch_id):
    # Register the micro-batch as a temporary view
    batch_df.createOrReplaceTempView("cdc_batch")

    batch_df.sparkSession.sql("""
        MERGE INTO spark_catalog.default.orders AS target
        USING cdc_batch AS source
        ON target.id = source.id
        WHEN MATCHED AND source.op = 'd' THEN DELETE
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED AND source.op != 'd' THEN INSERT *
    """)

query = (
    cdc_stream.writeStream
    .foreachBatch(upsert_to_iceberg)
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/orders-cdc")
    .start()
)
Note

The op field in Debezium events indicates the operation type: c (create), u (update), d (delete), and r (read/snapshot). Adjust the MERGE logic based on your Debezium envelope format.

Example: End-to-End Streaming Pipeline

The following complete example reads JSON events from a Kafka topic, parses them, and writes them to an Iceberg table.

Complete PySpark Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder \
    .appName("streaming-ingestion-pipeline") \
    .getOrCreate()

# Define the expected JSON schema
schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("action", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("event_time", TimestampType(), True),
])

# Read from Kafka
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "ilum-kafka:9092")
    .option("subscribe", "user-events")
    .option("startingOffsets", "earliest")
    .load()
)

# Parse JSON values
parsed_stream = (
    raw_stream
    .selectExpr("CAST(value AS STRING) AS json_str")
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
    .filter(col("event_id").isNotNull())
)

# Write to Iceberg table
query = (
    parsed_stream.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/user-events")
    .toTable("spark_catalog.default.user_events")
)

query.awaitTermination()

Submitting as an Ilum Job

  1. Save the script above as streaming_pipeline.py.
  2. In the ilum UI, create a new job with Job Type set to Spark Job and Language set to Python.
  3. Upload streaming_pipeline.py under the Resources tab.
  4. Add the Kafka connector package under Spark Packages:
    org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0
  5. Under the Configuration tab, add the following Spark properties for Iceberg support:
    spark.sql.catalog.spark_catalog = org.apache.iceberg.spark.SparkCatalog
    spark.sql.catalog.spark_catalog.type = hive
  6. Submit the job. For detailed submission steps, see Run a Simple Spark Job.

Monitoring Streaming Jobs

Streaming jobs run continuously, so monitoring is essential:

  • Ilum UI: The job will appear with a RUNNING status. Use the Logs tab to monitor micro-batch progress and throughput.
  • Spark UI: Access the Spark UI through ilum to view the Structured Streaming tab, which shows input rate, processing rate, and batch duration metrics.
  • Max Retries: Configure automatic restart on failure to maintain pipeline uptime.
Tip

For long-running streaming jobs, allocate sufficient driver and executor memory to handle state accumulation. Monitor memory usage through the ilum UI and adjust resources as needed.
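When monitoring programmatically, `StreamingQuery.lastProgress` exposes per-batch metrics as a dict. A small helper that flags a growing backlog; the 1.1 slack factor is an arbitrary example, and the sample dicts below are invented:

```python
def is_falling_behind(progress, slack=1.1):
    """True when the input rate exceeds the processing rate by more than `slack`.

    `progress` is the dict returned by StreamingQuery.lastProgress; the
    inputRowsPerSecond and processedRowsPerSecond fields are part of the
    standard StreamingQueryProgress payload.
    """
    inp = progress.get("inputRowsPerSecond") or 0.0
    out = progress.get("processedRowsPerSecond") or 0.0
    return out > 0 and inp > out * slack

healthy = {"inputRowsPerSecond": 900.0, "processedRowsPerSecond": 1000.0}
lagging = {"inputRowsPerSecond": 5000.0, "processedRowsPerSecond": 1200.0}
assert not is_falling_behind(healthy)
assert is_falling_behind(lagging)
```

In a live job you would poll `query.lastProgress` periodically on the driver and alert (or scale resources) when this returns True.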

Apache Flink on Ilum

Apache Flink is supported as a streaming engine in ilum, complementing Spark Structured Streaming. Flink provides true event-at-a-time processing with lower latency and is well suited for complex event processing (CEP), session windowing, and use cases where sub-second response times are critical.

Ilum deploys Flink jobs on Kubernetes using the Flink Kubernetes Operator. Flink applications are submitted through the ilum UI or REST API just like Spark jobs. Ilum handles pod lifecycle management, checkpoint storage configuration, and integration with the shared Kafka cluster and catalog layer.

Key architectural points:

  • Job Manager and Task Manager pods are orchestrated by Kubernetes, with resource allocation managed through ilum's cluster configuration.
  • Flink jobs connect to the same Kafka cluster used by ilum internally (ilum-kafka:9092 by default).
  • Flink integrates with the Hive Metastore and Iceberg catalog, allowing Flink and Spark to read and write the same tables.
  • Checkpoints and savepoints are stored on the ilum default storage (S3/MinIO/HDFS), enabling job recovery and upgrades without data loss.

Flink SQL Pipelines

Flink SQL provides a declarative way to build streaming pipelines. The following example reads JSON events from a Kafka topic and writes them to an Iceberg table.

-- Register the Kafka source table
CREATE TABLE kafka_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'ilum-kafka:9092',
    'properties.group.id' = 'flink-ingestion',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
);

-- Register the Iceberg sink table
CREATE TABLE iceberg_events (
    event_id STRING,
    user_id STRING,
    action STRING,
    amount DOUBLE,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'hive_catalog',
    'catalog-type' = 'hive',
    'uri' = 'thrift://ilum-hive-metastore:9083',
    'warehouse' = 's3a://ilum-data/warehouse'
);

-- Continuous streaming insert
INSERT INTO iceberg_events
SELECT event_id, user_id, action, amount, event_time
FROM kafka_events
WHERE event_id IS NOT NULL;
Tip

Flink SQL jobs can be submitted as SQL scripts through the ilum UI. Set the Job Type to Flink and provide the SQL statements in the code editor.

Flink DataStream API

For more complex processing logic, use the Flink DataStream API. The following Java example reads from Kafka, applies a tumbling window aggregation, and writes the results to an Iceberg table.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("ilum-kafka:9092")
    .setTopics("user-events")
    .setGroupId("flink-datastream")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), "Kafka Source")
    .map(json -> parseEvent(json))                      // custom parsing logic
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sum("amount")
    .sinkTo(icebergSink);                               // Iceberg sink connector

env.execute("flink-aggregation-pipeline");

Flink CDC Connectors

Flink provides a native CDC connector that captures database changes directly without requiring a separate Debezium deployment. This simplifies the CDC pipeline from a three-component architecture to two.

Source DB --> Flink CDC Connector --> Iceberg/Delta

Flink SQL example with PostgreSQL CDC:

-- Register a CDC source directly from PostgreSQL
CREATE TABLE orders_cdc (
    id INT,
    customer_id INT,
    product STRING,
    amount DOUBLE,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgres-host',
    'port' = '5432',
    'username' = 'cdc_user',
    'password' = 'secret',
    'database-name' = 'mydb',
    'schema-name' = 'public',
    'table-name' = 'orders',
    'slot.name' = 'flink_orders_slot'
);

-- Write changes directly to Iceberg with upsert semantics
INSERT INTO iceberg_orders
SELECT id, customer_id, product, amount, updated_at
FROM orders_cdc;
Note

Flink CDC connectors support PostgreSQL, MySQL, MongoDB, Oracle, and SQL Server. Each connector requires the corresponding Flink CDC library to be included in the job dependencies.

Exactly-Once Semantics in Flink

Flink achieves exactly-once semantics through its distributed snapshot (Chandy-Lamport) checkpointing algorithm:

  1. Checkpoint barriers flow through the data stream, ensuring consistent state snapshots across all operators.
  2. State backends (RocksDB or heap-based) persist operator state to durable storage (S3/MinIO) on each checkpoint.
  3. Two-phase commit sinks (Kafka, Iceberg) ensure that output records are committed atomically with the checkpoint.
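The barrier mechanism can be illustrated with a toy two-operator pipeline in plain Python (purely illustrative, no Flink API): a barrier marker flows through the stream in order, and each operator's state is snapshotted exactly when the barrier passes, so the snapshot reflects every record before the barrier and none after it.

```python
BARRIER = object()                     # models a checkpoint barrier marker

def run_pipeline(stream):
    """Two chained counting operators; snapshot state when the barrier passes."""
    state = {"op1": 0, "op2": 0}
    snapshots = []
    for item in stream:
        if item is BARRIER:
            # The barrier reaches the operators at a consistent point:
            # all records before it are reflected, none after it.
            snapshots.append(dict(state))
            continue
        state["op1"] += 1              # operator 1: count records
        state["op2"] += item           # operator 2: running sum
    return state, snapshots

state, snapshots = run_pipeline([1, 2, BARRIER, 3, 4])
assert snapshots == [{"op1": 2, "op2": 3}]    # consistent snapshot at barrier
assert state == {"op1": 4, "op2": 10}
```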

Configure checkpointing in your Flink job:

env.enableCheckpointing(60000);                           // checkpoint interval (ms)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointStorage("s3://ilum-data/flink-checkpoints");
Tip

For Flink SQL jobs, set checkpointing via Flink configuration properties:

execution.checkpointing.interval=60s
execution.checkpointing.mode=EXACTLY_ONCE
state.checkpoints.dir=s3://ilum-data/flink-checkpoints

Choosing Between Spark and Flink

Both engines are production-ready for streaming workloads on ilum. Choose based on your requirements:

| Aspect | Spark Structured Streaming | Apache Flink |
| --- | --- | --- |
| Processing Model | Micro-batch (default) or continuous | True event-at-a-time |
| Latency | Seconds (micro-batch) | Milliseconds |
| State Management | Limited by micro-batch boundaries | Fine-grained, key-partitioned state |
| Windowing | Tumbling, sliding, session | Tumbling, sliding, session, custom |
| Complex Event Processing | Basic | Advanced (CEP library) |
| CDC | Via Debezium + Kafka | Native CDC connectors |
| Batch + Stream Unification | Strong (same DataFrame API) | Strong (same DataStream/Table API) |
| Catalog Integration | Hive, Nessie, Unity, Iceberg, Delta, Hudi | Hive, Iceberg |
| Best for | ETL pipelines, unified batch/stream, existing Spark codebases | Low-latency event processing, CEP, native CDC |

General guidance:

  • Use Spark Structured Streaming when you have existing Spark batch jobs and want a unified batch/streaming codebase, or when micro-batch latency (seconds) is acceptable.
  • Use Flink when you need sub-second latency, complex event processing patterns, native CDC without Debezium, or fine-grained state management with event-time processing.