
# Kafka Data Connector

The Kafka Data Connector enables direct acceleration of data from Apache Kafka topics using `refresh_mode: append` acceleration. This allows seamless integration with existing Kafka-based event streaming infrastructure for real-time data acceleration and analytics.

```yaml
datasets:
  - from: kafka:my_kafka_topic
    name: my_dataset
    params:
      kafka_bootstrap_servers: broker1:9092,broker2:9092,broker3:9092 # Required. A comma-separated list of Kafka broker servers.
      kafka_security_protocol: SASL_SSL # Default is `SASL_SSL`. Valid values are `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL`.
      kafka_sasl_mechanism: SCRAM-SHA-512 # Default is `SCRAM-SHA-512`. Valid values are `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`.
      kafka_sasl_username: kafka # Required if `kafka_security_protocol` is `SASL_PLAINTEXT` or `SASL_SSL`.
      kafka_sasl_password: ${secrets:kafka_sasl_password} # Required if `kafka_security_protocol` is `SASL_PLAINTEXT` or `SASL_SSL`.
      kafka_ssl_ca_location: ./certs/kafka_ca_cert.pem # Optional. Used to verify the SSL/TLS certificate of the Kafka broker.
      kafka_enable_ssl_certificate_verification: true # Default is `true`. Set to `false` to disable SSL/TLS certificate verification.
      kafka_ssl_endpoint_identification_algorithm: https # Default is `https`. Valid values are `none` and `https`.
    acceleration:
      enabled: true # Acceleration is required for the Kafka connector.
      engine: duckdb # `duckdb`, `sqlite`, and `postgres` are supported acceleration engines for Kafka.
      refresh_mode: append # Required. Must be set to `append` for the Kafka connector.
      mode: file # Persistence is recommended so the table does not need to be fully rebuilt each time Spice starts.
```

## Overview

Upon startup, Spice fetches all messages from the specified topic using a uniquely generated consumer group. If a persistent acceleration engine is used (with `mode: file`), data is fetched starting from the last processed record, allowing Spice to resume without reprocessing all historical data.

The schema is automatically inferred from the first available message on the topic, which must be JSON-formatted. The connector creates the appropriate table schema for acceleration based on the detected data structure.

## Configuration

### `from`

The `from` field takes the form of `kafka:kafka_topic` where `kafka_topic` is the name of the Kafka topic to consume from.

```yaml
datasets:
  - from: kafka:user_events
    name: events
    # ...
```

### `name`

The dataset name. This will be used as the table name within Spice.

```yaml
datasets:
  - from: kafka:orders_events
    name: orders
    # ...
```

```sql
SELECT COUNT(*) FROM orders;
```

```
+----------+
| count(*) |
+----------+
|  6001215 |
+----------+
```

The dataset name cannot be a reserved keyword.

### `params`

| Parameter Name | Description |
| -------------- | ----------- |
| `kafka_bootstrap_servers` | Required. A comma-separated list of `host:port` pairs used to establish the initial connection to the Kafka cluster. The client discovers and uses the full set of brokers from this initial connection; this list only affects the hosts used for bootstrapping. Format: `host1:port1,host2:port2,...`. |
| `kafka_security_protocol` | Security protocol for Kafka connections. Default: `SASL_SSL`. Options: `PLAINTEXT`, `SSL`, `SASL_PLAINTEXT`, `SASL_SSL`. |
| `kafka_sasl_mechanism` | SASL (Simple Authentication and Security Layer) authentication mechanism. Default: `SCRAM-SHA-512`. Options: `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512`. |
| `kafka_sasl_username` | SASL username. Required if `kafka_security_protocol` is `SASL_PLAINTEXT` or `SASL_SSL`. |
| `kafka_sasl_password` | SASL password. Required if `kafka_security_protocol` is `SASL_PLAINTEXT` or `SASL_SSL`. |
| `kafka_ssl_ca_location` | Path to the SSL/TLS CA certificate file used to verify the broker's certificate. |
| `kafka_enable_ssl_certificate_verification` | Enable SSL/TLS certificate verification. Default: `true`. |
| `kafka_ssl_endpoint_identification_algorithm` | SSL/TLS endpoint identification algorithm. Default: `https`. Options: `none`, `https`. |
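For a broker that does not require authentication, the SASL parameters can be omitted entirely by relaxing the security protocol. A minimal sketch, assuming a hypothetical local development broker at `localhost:9092` and an illustrative topic name:

```yaml
datasets:
  - from: kafka:dev_events # hypothetical topic name
    name: dev_events
    params:
      kafka_bootstrap_servers: localhost:9092 # hypothetical local broker
      kafka_security_protocol: PLAINTEXT      # no SASL credentials or TLS needed
    acceleration:
      enabled: true
      engine: duckdb
      refresh_mode: append
```

This configuration is only appropriate for local development; production clusters should use the default `SASL_SSL` protocol with credentials supplied via secrets.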

## Acceleration Settings

> **Warning:** Using the Kafka connector requires acceleration with `refresh_mode: append` enabled.

The following settings are required:

| Parameter Name | Description |
| -------------- | ----------- |
| `enabled` | Required. Must be set to `true` to enable acceleration. |
| `engine` | Required. The acceleration engine to use. Valid values: `duckdb`, `sqlite`, `postgres`. |
| `refresh_mode` | Required. The refresh mode to use. Must be set to `append` for the Kafka connector. |
| `mode` | Optional. The persistence mode to use. With the `duckdb` and `sqlite` engines, it is recommended to set this to `file` to persist the data across restarts. Spice persists metadata about the dataset, allowing it to resume from the last known state instead of re-processing all messages. |
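Putting these settings together, a persistent SQLite-backed acceleration might look like the following sketch (the topic name, dataset name, broker addresses, and secret name are illustrative):

```yaml
datasets:
  - from: kafka:user_events # illustrative topic name
    name: events
    params:
      kafka_bootstrap_servers: broker1:9092,broker2:9092 # illustrative brokers
      kafka_sasl_username: kafka
      kafka_sasl_password: ${secrets:kafka_sasl_password}
    acceleration:
      enabled: true
      engine: sqlite
      refresh_mode: append
      mode: file # resume from the last processed record across restarts
```

With `mode: file`, restarting Spice continues consuming from where the previous run left off rather than replaying the entire topic.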

## Data Format Support

The Kafka connector currently supports JSON-formatted messages. Schema is automatically inferred from the first available message in the topic, and all subsequent messages are expected to follow a compatible structure.
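For illustration, suppose the first message on a topic is the following JSON payload (the field names and values are hypothetical). The connector would infer a column for each top-level field, and later messages would be expected to carry a compatible structure:

```json
{
  "order_id": 1001,
  "customer": "alice",
  "total": 42.5,
  "created_at": "2024-01-01T00:00:00Z"
}
```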

## Secrets

Spice integrates with multiple secret stores to help manage sensitive data securely. For detailed information on supported secret stores, refer to the secret stores documentation. Additionally, learn how to use referenced secrets in component parameters by visiting the using referenced secrets guide.

## Cookbook