kafka
Spins up an ephemeral Kafka cluster backed by one of four upstream images — apache/kafka-native (GraalVM), apache/kafka (JVM), confluentinc/cp-kafka (Confluent Platform), or
redpandadata/redpanda (Redpanda) — and pairs it with a pure-Go franz-go client that
targets either the local cluster or any reachable remote cluster.
File map (all `package main`, surfaced as one Dagger module):
- security.go — *ServerSecurity / *ClientSecurity + the six
Plaintext/Tls/Mtls constructors.
- cluster_kafka.go — *Cluster + the three KAFKA_*-env-var-contract
distros (ApacheNativeCluster, ApacheCluster,
ConfluentCluster) + buildKafkaCluster.
- internal_ca.go — per-cluster internal mTLS material, caller-CA
external leaf signing, and the Kafka SSL env
var helpers that mount them onto a broker
container.
- cluster_redpanda.go — *RedpandaCluster / *RedpandaServerSecurity,
single-node-only Redpanda constructor, rpk
start args, and the redpanda.yaml renderer.
- client.go — *Client + ConsumedRecord, franz-go wiring,
PKCS#12 → *tls.Config, PropertiesFile, and the
admin / produce / consume / list-topics
method set.
- util.go — shared helpers (writeWorkdirBytes,
clusterHostSuffix, randSuffix, dagFileBytes).
Installation
dagger install github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9

Entrypoint
Return Type
Kafka
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
func (m *MyModule) Example() *dagger.Kafka {
return dag.
Kafka()
}

@function
def example() -> dagger.Kafka:
return (
dag.kafka()
)

@func()
example(): Kafka {
return dag
.kafka()
}

Types
Kafka 🔗
Kafka is the root namespace for every exported function in this module. All cluster constructors and security helpers hang off *Kafka so the generated Dagger SDK surfaces them under `dag.Kafka().<Func>(...)`.
apacheCluster() 🔗
ApacheCluster spins up a KRaft Kafka cluster of the requested size with
dedicated controller and broker containers, using the apache/kafka
JVM image.
Identical in topology, caching, and security semantics to
ApacheNativeCluster — only the image differs. The JVM image runs the
same Scala wrapper but on HotSpot, so it does not share
apache/kafka-native’s AOT-compiled getpwuid substitution
(Pwd.getpwuid from SystemPropertiesSupport.userHomeValue) that has
been observed to segfault during broker startup — see Dagger Cloud
trace 377f2e176c4f0e9844cb7f958c1e911b. Prefer this constructor
whenever startup robustness matters more than cold-start latency.
Return Type
Cluster !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| clusterId | String ! | - | No description provided |
| controllers | Integer ! | 1 | No description provided |
| brokers | Integer ! | 1 | No description provided |
| registry | String ! | "docker.io" | No description provided |
| tag | String ! | "4.2.0" | No description provided |
| clientListenerSecurity | ServerSecurity ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity) *dagger.KafkaCluster {
return dag.
Kafka().
ApacheCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity) -> dagger.KafkaCluster:
return (
dag.kafka()
.apache_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
)

@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity): KafkaCluster {
return dag
.kafka()
.apacheCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

apacheNativeCluster() 🔗
ApacheNativeCluster spins up a KRaft Kafka cluster of the requested
size with dedicated controller and broker containers, using the
apache/kafka-native GraalVM-compiled image.
Topology: a single controller forms a one-node KRaft quorum; one or more brokers connect to it and discover each other over the engine’s session-wide DNS — no broker-to-broker WithServiceBinding needed.
Multi-controller (controllers > 1) is rejected for now: a true HA quorum needs every controller to know every other controller at static config time, which Dagger’s WithServiceBinding model can’t express without an unresolvable cycle. TLS / mTLS and multi-controller both land in a follow-up.
Session-cached so that repeated chained method calls on the returned cluster (Client.Produce → Consume → ListTopics) all observe the SAME underlying broker services. The internal CA + per-node leaves are minted with fresh random material that we can’t make content-addressable, so without session caching each chained call would rebuild the cluster (with a brand-new CA the previous invocation’s franz-go client doesn’t trust) every time the test calls another method on the chain.
The GraalVM-compiled image has been observed to flake during the broker
setup step under load — see Dagger Cloud trace
377f2e176c4f0e9844cb7f958c1e911b. If you need the JVM image instead,
use ApacheCluster().
Return Type
Cluster !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| clusterId | String ! | - | No description provided |
| controllers | Integer ! | 1 | No description provided |
| brokers | Integer ! | 1 | No description provided |
| registry | String ! | "docker.io" | No description provided |
| tag | String ! | "4.2.0" | No description provided |
| clientListenerSecurity | ServerSecurity ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity) *dagger.KafkaCluster {
return dag.
Kafka().
ApacheNativeCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity) -> dagger.KafkaCluster:
return (
dag.kafka()
.apache_native_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
)

@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity): KafkaCluster {
return dag
.kafka()
.apacheNativeCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

client() 🔗
Client constructs a franz-go-backed Kafka client that targets the given bootstrap servers. No I/O happens at construction time.
Return Type
Client !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| bootstrapServers | [String ! ] ! | - | No description provided |
| security | ClientSecurity ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(bootstrapServers []string, security *dagger.KafkaClientSecurity) *dagger.KafkaClient {
return dag.
Kafka().
Client(bootstrapServers, security)
}

@function
def example(bootstrap_servers: List[str], security: dagger.KafkaClientSecurity) -> dagger.KafkaClient:
return (
dag.kafka()
.client(bootstrap_servers, security)
)

@func()
example(bootstrapServers: string[], security: KafkaClientSecurity): KafkaClient {
return dag
.kafka()
.client(bootstrapServers, security)
}

confluentCluster() 🔗
ConfluentCluster spins up a KRaft Kafka cluster of the requested size
using the confluentinc/cp-kafka image — the Confluent Platform
distribution. Confluent Platform 8.x bundles Apache Kafka 4.x (CP
8.2.0 ships Kafka 4.2.0), and cp-kafka speaks the same Scala-wrapper
KAFKA_* env-var contract that ApacheCluster does, so the returned
*Cluster and ServerSecurity API are identical to the Apache
constructors — callers swap distros by changing the constructor
name alone.
The constructor silently disables Confluent’s phone-home telemetry
(KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE=false) on every broker so
the cluster behaves the same way the Apache variants do at startup.
Return Type
Cluster !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| clusterId | String ! | - | No description provided |
| controllers | Integer ! | 1 | No description provided |
| brokers | Integer ! | 1 | No description provided |
| registry | String ! | "docker.io" | No description provided |
| tag | String ! | "8.2.0" | No description provided |
| clientListenerSecurity | ServerSecurity ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity) *dagger.KafkaCluster {
return dag.
Kafka().
ConfluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity) -> dagger.KafkaCluster:
return (
dag.kafka()
.confluent_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
)

@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity): KafkaCluster {
return dag
.kafka()
.confluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

mtlsClientSecurity() 🔗
MtlsClientSecurity returns a ClientSecurity profile that opens an mTLS connection: the broker presents its server cert (verified against trustStore) and the client presents its own leaf cert from keyStore (signed by a CA the broker trusts via its clientTrustStore).
Return Type
ClientSecurity !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| keyStore | File ! | - | No description provided |
| keyStorePassword | Secret ! | - | No description provided |
| trustStore | File ! | - | No description provided |
| trustStorePassword | Secret ! | - | No description provided |
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
mtls-client-security --key-store file:path --key-store-password env:MYSECRET --trust-store file:path --trust-store-password env:MYSECRET

func (m *MyModule) Example(keyStore *dagger.File, keyStorePassword *dagger.Secret, trustStore *dagger.File, trustStorePassword *dagger.Secret) *dagger.KafkaClientSecurity {
return dag.
Kafka().
MtlsClientSecurity(keyStore, keyStorePassword, trustStore, trustStorePassword)
}

@function
def example(key_store: dagger.File, key_store_password: dagger.Secret, trust_store: dagger.File, trust_store_password: dagger.Secret) -> dagger.KafkaClientSecurity:
return (
dag.kafka()
.mtls_client_security(key_store, key_store_password, trust_store, trust_store_password)
)

@func()
example(keyStore: File, keyStorePassword: Secret, trustStore: File, trustStorePassword: Secret): KafkaClientSecurity {
return dag
.kafka()
.mtlsClientSecurity(keyStore, keyStorePassword, trustStore, trustStorePassword)
}

mtlsServerSecurity() 🔗
MtlsServerSecurity returns a ServerSecurity profile that terminates mTLS on the external listener. caKeyStore signs per-broker server leaves; clientTrustStore holds the CA(s) the broker will accept incoming client certs from (this can be the same CA as caKeyStore or an independent one for asymmetric trust).
Return Type
ServerSecurity !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| caKeyStore | File ! | - | No description provided |
| caKeyStorePassword | Secret ! | - | No description provided |
| clientTrustStore | File ! | - | No description provided |
| clientTrustStorePassword | Secret ! | - | No description provided |
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
mtls-server-security --ca-key-store file:path --ca-key-store-password env:MYSECRET --client-trust-store file:path --client-trust-store-password env:MYSECRET

func (m *MyModule) Example(caKeyStore *dagger.File, caKeyStorePassword *dagger.Secret, clientTrustStore *dagger.File, clientTrustStorePassword *dagger.Secret) *dagger.KafkaServerSecurity {
return dag.
Kafka().
MtlsServerSecurity(caKeyStore, caKeyStorePassword, clientTrustStore, clientTrustStorePassword)
}

@function
def example(ca_key_store: dagger.File, ca_key_store_password: dagger.Secret, client_trust_store: dagger.File, client_trust_store_password: dagger.Secret) -> dagger.KafkaServerSecurity:
return (
dag.kafka()
.mtls_server_security(ca_key_store, ca_key_store_password, client_trust_store, client_trust_store_password)
)

@func()
example(caKeyStore: File, caKeyStorePassword: Secret, clientTrustStore: File, clientTrustStorePassword: Secret): KafkaServerSecurity {
return dag
.kafka()
.mtlsServerSecurity(caKeyStore, caKeyStorePassword, clientTrustStore, clientTrustStorePassword)
}

plaintextClientSecurity() 🔗
PlaintextClientSecurity returns a ClientSecurity profile configured for unencrypted, unauthenticated traffic.
Return Type
ClientSecurity !
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
plaintext-client-security

func (m *MyModule) Example() *dagger.KafkaClientSecurity {
return dag.
Kafka().
PlaintextClientSecurity()
}

@function
def example() -> dagger.KafkaClientSecurity:
return (
dag.kafka()
.plaintext_client_security()
)

@func()
example(): KafkaClientSecurity {
return dag
.kafka()
.plaintextClientSecurity()
}

plaintextServerSecurity() 🔗
PlaintextServerSecurity returns a ServerSecurity profile configured for unencrypted, unauthenticated traffic on the external listener. Internal listeners (inter-broker + controller-quorum) still use mTLS.
Return Type
ServerSecurity !
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
plaintext-server-security

func (m *MyModule) Example() *dagger.KafkaServerSecurity {
return dag.
Kafka().
PlaintextServerSecurity()
}

@function
def example() -> dagger.KafkaServerSecurity:
return (
dag.kafka()
.plaintext_server_security()
)

@func()
example(): KafkaServerSecurity {
return dag
.kafka()
.plaintextServerSecurity()
}

redpandaCluster() 🔗
RedpandaCluster spins up a single-node Redpanda cluster using the
redpandadata/redpanda image. Redpanda runs broker and Raft duties in the
same process, so there is no separate controller container.
Multi-node (controllers != 1 or brokers != 1) is rejected — multi-broker
Redpanda needs --seeds plumbing + per-node rpc_server advertising
that doesn’t fit single-story scope. The wire protocol matches Kafka,
so RedpandaCluster.Client() returns the same *Client type the Apache
constructors return.
Return Type
RedpandaCluster !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| clusterId | String ! | - | No description provided |
| controllers | Integer ! | 1 | No description provided |
| brokers | Integer ! | 1 | No description provided |
| registry | String ! | "docker.io" | No description provided |
| tag | String ! | "v26.1.7" | No description provided |
| clientListenerSecurity | RedpandaServerSecurity ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity) *dagger.KafkaRedpandaCluster {
return dag.
Kafka().
RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity) -> dagger.KafkaRedpandaCluster:
return (
dag.kafka()
.redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
)

@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity): KafkaRedpandaCluster {
return dag
.kafka()
.redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

redpandaPlaintextServerSecurity() 🔗
RedpandaPlaintextServerSecurity returns a RedpandaServerSecurity profile configured for unencrypted, unauthenticated traffic on the external Kafka listener.
Return Type
RedpandaServerSecurity !
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
redpanda-plaintext-server-security

func (m *MyModule) Example() *dagger.KafkaRedpandaServerSecurity {
return dag.
Kafka().
RedpandaPlaintextServerSecurity()
}

@function
def example() -> dagger.KafkaRedpandaServerSecurity:
return (
dag.kafka()
.redpanda_plaintext_server_security()
)

@func()
example(): KafkaRedpandaServerSecurity {
return dag
.kafka()
.redpandaPlaintextServerSecurity()
}

redpandaTlsServerSecurity() 🔗
RedpandaTlsServerSecurity returns a RedpandaServerSecurity profile that terminates TLS on the external Kafka listener. caKeyStore is a PKCS#12 archive of the CA cert + private key used to mint the per-cluster server leaf — same shape as Kafka.TlsServerSecurity, so callers don’t have to convert between formats even though Redpanda itself reads PEM internally. The leaf carries the broker’s stable hostname as a DNS SAN so franz-go clients dialing the bootstrap address can verify the cert against the matching truststore.
Return Type
RedpandaServerSecurity !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| caKeyStore | File ! | - | No description provided |
| caKeyStorePassword | Secret ! | - | No description provided |
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
redpanda-tls-server-security --ca-key-store file:path --ca-key-store-password env:MYSECRET

func (m *MyModule) Example(caKeyStore *dagger.File, caKeyStorePassword *dagger.Secret) *dagger.KafkaRedpandaServerSecurity {
return dag.
Kafka().
RedpandaTlsServerSecurity(caKeyStore, caKeyStorePassword)
}

@function
def example(ca_key_store: dagger.File, ca_key_store_password: dagger.Secret) -> dagger.KafkaRedpandaServerSecurity:
return (
dag.kafka()
.redpanda_tls_server_security(ca_key_store, ca_key_store_password)
)

@func()
example(caKeyStore: File, caKeyStorePassword: Secret): KafkaRedpandaServerSecurity {
return dag
.kafka()
.redpandaTlsServerSecurity(caKeyStore, caKeyStorePassword)
}

tlsClientSecurity() 🔗
TlsClientSecurity returns a ClientSecurity profile that opens a TLS connection to the broker. trustStore is a PKCS#12 archive of the CA(s) the client uses to verify the broker’s leaf certificate (typically the truststore that pairs with the CA passed to TlsServerSecurity on the server side).
Return Type
ClientSecurity !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| trustStore | File ! | - | No description provided |
| trustStorePassword | Secret ! | - | No description provided |
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
tls-client-security --trust-store file:path --trust-store-password env:MYSECRET

func (m *MyModule) Example(trustStore *dagger.File, trustStorePassword *dagger.Secret) *dagger.KafkaClientSecurity {
return dag.
Kafka().
TlsClientSecurity(trustStore, trustStorePassword)
}

@function
def example(trust_store: dagger.File, trust_store_password: dagger.Secret) -> dagger.KafkaClientSecurity:
return (
dag.kafka()
.tls_client_security(trust_store, trust_store_password)
)

@func()
example(trustStore: File, trustStorePassword: Secret): KafkaClientSecurity {
return dag
.kafka()
.tlsClientSecurity(trustStore, trustStorePassword)
}

tlsServerSecurity() 🔗
TlsServerSecurity returns a ServerSecurity profile that terminates TLS on the external listener. caKeyStore is a PKCS#12 archive containing the CA cert + private key the cluster uses to mint per-broker leaf certs; each broker leaf carries its stable hostname (e.g. “broker-100”) as a DNS SAN so franz-go clients dialing the bootstrap address can verify the broker against the same CA’s truststore.
Return Type
ServerSecurity !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| caKeyStore | File ! | - | No description provided |
| caKeyStorePassword | Secret ! | - | No description provided |
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@d38cd46573b89a2722fb888c91ced6495d6825e9 call \
tls-server-security --ca-key-store file:path --ca-key-store-password env:MYSECRET

func (m *MyModule) Example(caKeyStore *dagger.File, caKeyStorePassword *dagger.Secret) *dagger.KafkaServerSecurity {
return dag.
Kafka().
TlsServerSecurity(caKeyStore, caKeyStorePassword)
}

@function
def example(ca_key_store: dagger.File, ca_key_store_password: dagger.Secret) -> dagger.KafkaServerSecurity:
return (
dag.kafka()
.tls_server_security(ca_key_store, ca_key_store_password)
)

@func()
example(caKeyStore: File, caKeyStorePassword: Secret): KafkaServerSecurity {
return dag
.kafka()
.tlsServerSecurity(caKeyStore, caKeyStorePassword)
}

Cluster 🔗
Cluster represents a running KRaft Kafka cluster, holding references to every broker service so callers can bind them into their own containers or open a franz-go Client against them.
bindBrokers() 🔗
BindBrokers attaches every broker service to the given container under the same hostname BootstrapServers reports, so the container can dial brokers using the same address strings as a franz-go Client returned from Cluster.Client.
Return Type
Container !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| ctr | Container ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, ctr *dagger.Container) *dagger.Container {
return dag.
Kafka().
ConfluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
BindBrokers(ctr)
}

@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, ctr: dagger.Container) -> dagger.Container:
return (
dag.kafka()
.confluent_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
.bind_brokers(ctr)
)

@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, ctr: Container): Container {
return dag
.kafka()
.confluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
.bindBrokers(ctr)
}

bootstrapServers() 🔗
BootstrapServers returns the host:port pairs each broker advertises on its client-facing listener.
Return Type
[String ! ] !
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity) []string {
return dag.
Kafka().
ConfluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
BootstrapServers(ctx)
}

@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity) -> List[str]:
return await (
dag.kafka()
.confluent_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
.bootstrap_servers()
)

@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity): Promise<string[]> {
return dag
.kafka()
.confluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
.bootstrapServers()
}

client() 🔗
Client starts every broker service in the cluster and returns a franz-go Client wired with their bootstrap addresses.
Return Type
Client !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| security | ClientSecurity ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity) *dagger.KafkaClient {
return dag.
Kafka().
ConfluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
Client(security)
}

@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity) -> dagger.KafkaClient:
return (
dag.kafka()
.confluent_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
.client(security)
)

@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity): KafkaClient {
return dag
.kafka()
.confluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
.client(security)
}

stop() 🔗
Stop tears down every service container backing this cluster (the
controller plus every broker). Tests should call this in a defer so each
broker Container.asService span closes when the test work is done,
rather than staying open for the parent parallel group’s lifetime.
Kill is set so Service.Stop skips graceful shutdown — Kafka’s broker
shutdown path waits on replica-drain timeouts that on a torn-down test
cluster just run out the clock (~5 min observed in Dagger trace
972bc311bf374f817b7c88481229a10c). SIGKILL returns immediately, which
is all a test needs.
Return Type
Void !
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity) {
return dag.
Kafka().
ConfluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
Stop(ctx)
}

@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity) -> None:
return await (
dag.kafka()
.confluent_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
.stop()
)

@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity): Promise<void> {
return dag
.kafka()
.confluentCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
.stop()
}

ServerSecurity 🔗
ServerSecurity describes how a Kafka cluster's external listener authenticates and encrypts traffic from clients. Internal listeners (inter-broker + controller-quorum) are always mTLS, regardless of mode.
Client 🔗
Client is a franz-go-backed Kafka client. Each method opens a fresh connection so the function call is stateless from Dagger's perspective.
consume() 🔗
Consume reads up to maxMessages records from the topic, starting at the earliest offset, returning when either maxMessages have been gathered or the parsed timeout elapses. Each record’s key and value are encoded into the requested string forms before being returned.
When group is non-empty, the consume runs as a member of that consumer group: the broker assigns partitions and the join itself writes group metadata to __consumer_offsets (offsets are not committed — the function stays idempotent). When group is empty (the default), partitions are consumed directly with no group state.
Return Type
[ConsumedRecord ! ] !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| topic | String ! | - | No description provided |
| maxMessages | Integer ! | 1 | No description provided |
| timeout | String ! | "10s" | No description provided |
| keyEncoding | String ! | "raw" | No description provided |
| valueEncoding | String ! | "raw" | No description provided |
| group | String ! | "" | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, security *dagger.KafkaClientSecurity, topic string, maxMessages int, timeout string, keyEncoding string, valueEncoding string, group string) []*dagger.KafkaConsumedRecord {
return dag.
Kafka().
RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
Client(security).
Consume(topic, maxMessages, timeout, keyEncoding, valueEncoding, group)
}

@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, security: dagger.KafkaClientSecurity, topic: str, max_messages: int, timeout: str, key_encoding: str, value_encoding: str, group: str) -> List[dagger.KafkaConsumedRecord]:
return (
dag.kafka()
.redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
.client(security)
.consume(topic, max_messages, timeout, key_encoding, value_encoding, group)
)

@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, security: KafkaClientSecurity, topic: string, maxMessages: number, timeout: string, keyEncoding: string, valueEncoding: string, group: string): KafkaConsumedRecord[] {
return dag
.kafka()
.redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
.client(security)
.consume(topic, maxMessages, timeout, keyEncoding, valueEncoding, group)
}

createTopic() 🔗
CreateTopic creates a new topic with the given partition count and replication factor. Errors out if the topic already exists.
Return Type
Void !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| name | String ! | - | No description provided |
| partitions | Integer ! | 1 | No description provided |
| replicationFactor | Integer ! | 1 | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, security *dagger.KafkaClientSecurity, name string, partitions int, replicationFactor int) {
return dag.
Kafka().
RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
Client(security).
CreateTopic(ctx, name, partitions, replicationFactor)
}

@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, security: dagger.KafkaClientSecurity, name: str, partitions: int, replication_factor: int) -> None:
return await (
dag.kafka()
.redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
.client(security)
.create_topic(name, partitions, replication_factor)
)

@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, security: KafkaClientSecurity, name: string, partitions: number, replicationFactor: number): Promise<void> {
return dag
.kafka()
.redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
.client(security)
.createTopic(name, partitions, replicationFactor)
}

deleteTopic() 🔗
DeleteTopic deletes the named topic.
Return Type
Void !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| name | String ! | - | No description provided |
Example
echo 'Custom types are not supported in shell examples'

func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, security *dagger.KafkaClientSecurity, name string) {
return dag.
Kafka().
RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
Client(security).
DeleteTopic(ctx, name)
}@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, security: dagger.KafkaClientSecurity, name: str) -> None:
return await (
dag.kafka()
.redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
.client(security)
.delete_topic(name)
)@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, security: KafkaClientSecurity, name: string): Promise<void> {
return dag
.kafka()
.redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
.client(security)
.deleteTopic(name)
}listTopics() 🔗
ListTopics returns the names of every topic the broker reports.
Return Type
[String ! ] !
Example

Custom types are not supported in shell examples.

Go:

```go
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, security *dagger.KafkaClientSecurity) []string {
	return dag.
		Kafka().
		RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
		Client(security).
		ListTopics(ctx)
}
```

Python:

```python
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, security: dagger.KafkaClientSecurity) -> List[str]:
    return await (
        dag.kafka()
        .redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
        .client(security)
        .list_topics()
    )
```

TypeScript:

```typescript
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, security: KafkaClientSecurity): Promise<string[]> {
  return dag
    .kafka()
    .redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
    .client(security)
    .listTopics()
}
```

produce() 🔗
Produce synchronously writes one record to the topic. Key and value are decoded from their named encodings into raw bytes before being sent.
Return Type
Void !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| topic | String ! | - | No description provided |
| key | String ! | - | No description provided |
| value | String ! | - | No description provided |
| keyEncoding | String ! | "raw" | No description provided |
| valueEncoding | String ! | "raw" | No description provided |
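The table above only confirms "raw" as a default encoding name; the full set of supported encodings is not listed here. As a sketch of what decoding a named encoding into raw bytes could look like, assuming hypothetical "base64" and "hex" encodings (the function name and those two cases are illustrative, not confirmed by this module):

```go
package main

import (
	"encoding/base64"
	"encoding/hex"
	"fmt"
)

// decodeRecordField turns a string plus a named encoding into the raw
// bytes that go on the wire. Only "raw" is confirmed by the argument
// defaults above; "base64" and "hex" are assumed for illustration.
func decodeRecordField(s, encoding string) ([]byte, error) {
	switch encoding {
	case "raw":
		return []byte(s), nil
	case "base64":
		return base64.StdEncoding.DecodeString(s)
	case "hex":
		return hex.DecodeString(s)
	default:
		return nil, fmt.Errorf("unknown encoding %q", encoding)
	}
}

func main() {
	b, _ := decodeRecordField("aGVsbG8=", "base64")
	fmt.Println(string(b)) // hello
}
```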
Example
Custom types are not supported in shell examples.

Go:

```go
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, security *dagger.KafkaClientSecurity, topic string, key string, value string, keyEncoding string, valueEncoding string) {
	return dag.
		Kafka().
		RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
		Client(security).
		Produce(ctx, topic, key, value, keyEncoding, valueEncoding)
}
```

Python:

```python
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, security: dagger.KafkaClientSecurity, topic: str, key: str, value: str, key_encoding: str, value_encoding: str) -> None:
    return await (
        dag.kafka()
        .redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
        .client(security)
        .produce(topic, key, value, key_encoding, value_encoding)
    )
```

TypeScript:

```typescript
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, security: KafkaClientSecurity, topic: string, key: string, value: string, keyEncoding: string, valueEncoding: string): Promise<void> {
  return dag
    .kafka()
    .redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
    .client(security)
    .produce(topic, key, value, keyEncoding, valueEncoding)
}
```

propertiesFile() 🔗
PropertiesFile renders this client’s connection settings as a Java
client.properties file so callers can hand it to the Apache Kafka
command-line tools or to other JVM-based consumers.
For TLS / mTLS modes the properties reference the PKCS#12 truststore
(and, for mTLS, the keystore) by basename; the matching .p12 files are
written alongside client.properties in the same directory. Callers
should export the parent directory (props.Directory()) so the relative
references resolve. Passwords appear in plaintext, which is a Kafka
CLI constraint.
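For illustration, a rendered client.properties for an mTLS client might look roughly like the fragment below. The property keys are the standard Apache Kafka client SSL settings; the bootstrap host, basenames, and passwords are placeholders, not values this module is known to emit:

```properties
bootstrap.servers=broker-0.kafka.local:9092
security.protocol=SSL
ssl.truststore.location=truststore.p12
ssl.truststore.type=PKCS12
ssl.truststore.password=changeit
# mTLS only: client keystore holding the caller-CA-signed leaf
ssl.keystore.location=keystore.p12
ssl.keystore.type=PKCS12
ssl.keystore.password=changeit
```

Because the locations are bare basenames, the file only works when run from the exported directory that also contains the .p12 files.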
Return Type
File !
Example

Custom types are not supported in shell examples.

Go:

```go
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, security *dagger.KafkaClientSecurity) *dagger.File {
	return dag.
		Kafka().
		RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
		Client(security).
		PropertiesFile()
}
```

Python:

```python
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, security: dagger.KafkaClientSecurity) -> dagger.File:
    return (
        dag.kafka()
        .redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
        .client(security)
        .properties_file()
    )
```

TypeScript:

```typescript
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, security: KafkaClientSecurity): File {
  return dag
    .kafka()
    .redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
    .client(security)
    .propertiesFile()
}
```

ClientSecurity 🔗
ClientSecurity describes how a franz-go client authenticates to a Kafka broker.
RedpandaCluster 🔗
RedpandaCluster is the Redpanda counterpart to *Cluster. Redpanda speaks the Kafka wire protocol but is a from-scratch C++ implementation with a completely different configuration layer (`rpk redpanda start`, a YAML config file, PEM cert/key files instead of PKCS#12), so it gets its own return type to make the divergence visible at the API surface. Single node only in this story (controllers=1, brokers=1).
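As a rough illustration of that divergence, a single-node redpanda.yaml with a TLS-enabled Kafka listener has this general shape. The field names follow Redpanda's documented kafka_api_tls schema; the paths are placeholders, not this module's actual mount points:

```yaml
redpanda:
  kafka_api:
    - address: 0.0.0.0
      port: 9092
  # PEM cert/key files, not PKCS#12 -- this is the configuration-layer
  # difference from the Apache distros. Paths are illustrative.
  kafka_api_tls:
    - enabled: true
      cert_file: /etc/redpanda/certs/broker.crt
      key_file: /etc/redpanda/certs/broker.key
      truststore_file: /etc/redpanda/certs/ca.crt
      require_client_auth: false
```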
bindBrokers() 🔗
BindBrokers binds the single Redpanda broker service into the given container so the container can reach it by hostname.
Return Type
Container !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| ctr | Container ! | - | No description provided |
Example
Custom types are not supported in shell examples.

Go:

```go
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, ctr *dagger.Container) *dagger.Container {
	return dag.
		Kafka().
		RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
		BindBrokers(ctr)
}
```

Python:

```python
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, ctr: dagger.Container) -> dagger.Container:
    return (
        dag.kafka()
        .redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
        .bind_brokers(ctr)
    )
```

TypeScript:

```typescript
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, ctr: Container): Container {
  return dag
    .kafka()
    .redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
    .bindBrokers(ctr)
}
```

bootstrapServers() 🔗
BootstrapServers returns the bootstrap address (single broker:9092) for this Redpanda cluster.
Return Type
[String ! ] !
Example

Custom types are not supported in shell examples.

Go:

```go
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity) []string {
	return dag.
		Kafka().
		RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
		BootstrapServers(ctx)
}
```

Python:

```python
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity) -> List[str]:
    return await (
        dag.kafka()
        .redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
        .bootstrap_servers()
    )
```

TypeScript:

```typescript
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity): Promise<string[]> {
  return dag
    .kafka()
    .redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
    .bootstrapServers()
}
```

client() 🔗
Client starts the Redpanda broker service and returns a franz-go-backed *Client targeting it. The Kafka wire protocol matches Apache Kafka, so the existing *Client + *ClientSecurity (PKCS#12) are reused unchanged.
Return Type
Client !
Arguments
| Name | Type | Default Value | Description |
|---|---|---|---|
| security | ClientSecurity ! | - | No description provided |
Example
Custom types are not supported in shell examples.

Go:

```go
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity, security *dagger.KafkaClientSecurity) *dagger.KafkaClient {
	return dag.
		Kafka().
		RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
		Client(security)
}
```

Python:

```python
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity, security: dagger.KafkaClientSecurity) -> dagger.KafkaClient:
    return (
        dag.kafka()
        .redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
        .client(security)
    )
```

TypeScript:

```typescript
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity, security: KafkaClientSecurity): KafkaClient {
  return dag
    .kafka()
    .redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
    .client(security)
}
```

stop() 🔗
Stop tears down the broker container backing this Redpanda cluster.
Tests should call this in a defer so the broker Container.asService
span closes when the test work is done. Kill is set so Service.Stop
skips graceful shutdown — see Cluster.Stop for the rationale.
Return Type
Void !
Example

Custom types are not supported in shell examples.

Go:

```go
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaRedpandaServerSecurity) {
	return dag.
		Kafka().
		RedpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
		Stop(ctx)
}
```

Python:

```python
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaRedpandaServerSecurity) -> None:
    return await (
        dag.kafka()
        .redpanda_cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
        .stop()
    )
```

TypeScript:

```typescript
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaRedpandaServerSecurity): Promise<void> {
  return dag
    .kafka()
    .redpandaCluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
    .stop()
}
```

RedpandaServerSecurity 🔗
RedpandaServerSecurity carries the external-listener security profile for a Redpanda cluster. It has the same shape as *ServerSecurity (PKCS#12 CA) so callers don't have to convert; the constructor extracts PEM from the issued leaf internally for redpanda.yaml. It is a separate type from *ServerSecurity so a caller can't accidentally hand an Apache profile (e.g. MtlsServerSecurity, not supported here yet) to RedpandaCluster.
ConsumedRecord 🔗
ConsumedRecord is a single record returned by Client.Consume, with key and value already encoded into the requested string representation.
key() 🔗
Return Type
String !
Example
Function KafkaConsumedRecord.key is not accessible from the kafka module

value() 🔗
Return Type
String !
Example
Function KafkaConsumedRecord.value is not accessible from the kafka module