Dagger
Search

kafka

the apache/kafka-native image and a pure-Go franz-go client that targets
either the local cluster or any reachable remote cluster.

Plaintext is the only security mechanism supported in this story; TLS /
mTLS lands in a follow-up.

Installation

dagger install github.com/z5labs/devex/daggerverse/kafka@9ebfb29b95cb654ed5d00d1e595bcf710c2a7318

Entrypoint

Return Type
Kafka
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@9ebfb29b95cb654ed5d00d1e595bcf710c2a7318 call \
func (m *MyModule) Example() *dagger.Kafka  {
	return dag.
			Kafka()
}
@function
def example() -> dagger.Kafka:
	return (
		dag.kafka()
	)
@func()
example(): Kafka {
	return dag
		.kafka()
}

Types

Kafka 🔗

client() 🔗

Client constructs a franz-go-backed Kafka client that targets the given bootstrap servers. No I/O happens at construction time.

Return Type
Client !
Arguments
NameTypeDefault ValueDescription
bootstrapServers[String ! ] !-No description provided
securityClientSecurity !-No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(bootstrapServers []string, security *dagger.KafkaClientSecurity) *dagger.KafkaClient  {
	return dag.
			Kafka().
			Client(bootstrapServers, security)
}
@function
def example(bootstrap_servers: List[str], security: dagger.KafkaClientSecurity) -> dagger.KafkaClient:
	return (
		dag.kafka()
		.client(bootstrap_servers, security)
	)
@func()
example(bootstrapServers: string[], security: KafkaClientSecurity): KafkaClient {
	return dag
		.kafka()
		.client(bootstrapServers, security)
}

cluster() 🔗

Cluster spins up a KRaft Kafka cluster of the requested size with dedicated controller and broker containers.

Topology: a single controller forms a one-node KRaft quorum; one or more brokers connect to it and discover each other over the engine’s session-wide DNS — no broker-to-broker WithServiceBinding needed.

Multi-controller (controllers > 1) is rejected for now: a true HA quorum needs every controller to know every other controller at static config time, which Dagger’s WithServiceBinding model can’t express without an unresolvable cycle. TLS / mTLS and multi-controller both land in a follow-up.

Return Type
Cluster !
Arguments
NameTypeDefault ValueDescription
clusterIdString !-No description provided
controllersInteger !1No description provided
brokersInteger !1No description provided
registryString !"docker.io"No description provided
tagString !-No description provided
clientListenerSecurityServerSecurity !-No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity) *dagger.KafkaCluster  {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity) -> dagger.KafkaCluster:
	return (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
	)
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity): KafkaCluster {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
}

plaintextClientSecurity() 🔗

PlaintextClientSecurity returns a ClientSecurity profile configured for unencrypted, unauthenticated traffic.

Return Type
ClientSecurity !
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@9ebfb29b95cb654ed5d00d1e595bcf710c2a7318 call \
 plaintext-client-security
func (m *MyModule) Example() *dagger.KafkaClientSecurity  {
	return dag.
			Kafka().
			PlaintextClientSecurity()
}
@function
def example() -> dagger.KafkaClientSecurity:
	return (
		dag.kafka()
		.plaintext_client_security()
	)
@func()
example(): KafkaClientSecurity {
	return dag
		.kafka()
		.plaintextClientSecurity()
}

plaintextServerSecurity() 🔗

PlaintextServerSecurity returns a ServerSecurity profile configured for unencrypted, unauthenticated traffic.

Return Type
ServerSecurity !
Example
dagger -m github.com/z5labs/devex/daggerverse/kafka@9ebfb29b95cb654ed5d00d1e595bcf710c2a7318 call \
 plaintext-server-security
func (m *MyModule) Example() *dagger.KafkaServerSecurity  {
	return dag.
			Kafka().
			PlaintextServerSecurity()
}
@function
def example() -> dagger.KafkaServerSecurity:
	return (
		dag.kafka()
		.plaintext_server_security()
	)
@func()
example(): KafkaServerSecurity {
	return dag
		.kafka()
		.plaintextServerSecurity()
}

Client 🔗

Client is a franz-go-backed Kafka client. Each method opens a fresh connection so the function call is stateless from Dagger's perspective.

consume() 🔗

Consume reads up to maxMessages records from the topic, starting at the earliest offset, returning when either maxMessages have been gathered or the parsed timeout elapses. Each record’s key and value are encoded into the requested string forms before being returned.

Return Type
[ConsumedRecord ! ] !
Arguments
NameTypeDefault ValueDescription
topicString !-No description provided
maxMessagesInteger !1No description provided
timeoutString !"10s"No description provided
keyEncodingString !"raw"No description provided
valueEncodingString !"raw"No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity, topic string, maxMessages int, timeout string, keyEncoding string, valueEncoding string) []*dagger.KafkaConsumedRecord  {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			Client(security).
			Consume(topic, maxMessages, timeout, keyEncoding, valueEncoding)
}
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity, topic: str, max_messages: int, timeout: str, key_encoding: str, value_encoding: str) -> List[dagger.KafkaConsumedRecord]:
	return (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.client(security)
		.consume(topic, max_messages, timeout, key_encoding, value_encoding)
	)
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity, topic: string, maxMessages: number, timeout: string, keyEncoding: string, valueEncoding: string): KafkaConsumedRecord[] {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.client(security)
		.consume(topic, maxMessages, timeout, keyEncoding, valueEncoding)
}

createTopic() 🔗

CreateTopic creates a new topic with the given partition count and replication factor. Errors out if the topic already exists.

Return Type
Void !
Arguments
NameTypeDefault ValueDescription
nameString !-No description provided
partitionsInteger !1No description provided
replicationFactorInteger !1No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity, name string, partitions int, replicationFactor int)   {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			Client(security).
			CreateTopic(ctx, name, partitions, replicationFactor)
}
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity, name: str, partitions: int, replication_factor: int) -> None:
	return await (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.client(security)
		.create_topic(name, partitions, replication_factor)
	)
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity, name: string, partitions: number, replicationFactor: number): Promise<void> {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.client(security)
		.createTopic(name, partitions, replicationFactor)
}

deleteTopic() 🔗

DeleteTopic deletes the named topic.

Return Type
Void !
Arguments
NameTypeDefault ValueDescription
nameString !-No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity, name string)   {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			Client(security).
			DeleteTopic(ctx, name)
}
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity, name: str) -> None:
	return await (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.client(security)
		.delete_topic(name)
	)
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity, name: string): Promise<void> {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.client(security)
		.deleteTopic(name)
}

listTopics() 🔗

ListTopics returns the names of every topic the broker reports.

Return Type
[String ! ] !
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity) []string  {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			Client(security).
			ListTopics(ctx)
}
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity) -> List[str]:
	return await (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.client(security)
		.list_topics()
	)
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity): Promise<string[]> {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.client(security)
		.listTopics()
}

produce() 🔗

Produce synchronously writes one record to the topic. Key and value are decoded from their named encodings into raw bytes before being sent.

Return Type
Void !
Arguments
NameTypeDefault ValueDescription
topicString !-No description provided
keyString !-No description provided
valueString !-No description provided
keyEncodingString !"raw"No description provided
valueEncodingString !"raw"No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity, topic string, key string, value string, keyEncoding string, valueEncoding string)   {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			Client(security).
			Produce(ctx, topic, key, value, keyEncoding, valueEncoding)
}
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity, topic: str, key: str, value: str, key_encoding: str, value_encoding: str) -> None:
	return await (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.client(security)
		.produce(topic, key, value, key_encoding, value_encoding)
	)
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity, topic: string, key: string, value: string, keyEncoding: string, valueEncoding: string): Promise<void> {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.client(security)
		.produce(topic, key, value, keyEncoding, valueEncoding)
}

propertiesFile() 🔗

PropertiesFile renders this client’s connection settings as a Java client.properties file (bootstrap.servers + security.protocol) so callers can hand it to the Apache Kafka command-line tools or to other JVM-based consumers.

Return Type
File !
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity) *dagger.File  {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			Client(security).
			PropertiesFile()
}
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity) -> dagger.File:
	return (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.client(security)
		.properties_file()
	)
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity): File {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.client(security)
		.propertiesFile()
}

ClientSecurity 🔗

ClientSecurity describes how a franz-go client authenticates to a Kafka broker. Only PLAINTEXT is supported in this story.

Cluster 🔗

Cluster represents a running KRaft Kafka cluster, holding references to every broker service so callers can bind them into their own containers or open a franz-go Client against them.

bindBrokers() 🔗

BindBrokers attaches every broker service to the given container under the same hostname BootstrapServers reports, so the container can dial brokers using the same address strings as a franz-go Client returned from Cluster.Client.

Return Type
Container !
Arguments
NameTypeDefault ValueDescription
ctrContainer !-No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, ctr *dagger.Container) *dagger.Container  {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			BindBrokers(ctr)
}
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, ctr: dagger.Container) -> dagger.Container:
	return (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.bind_brokers(ctr)
	)
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, ctr: Container): Container {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.bindBrokers(ctr)
}

bootstrapServers() 🔗

BootstrapServers returns the host:port pairs each broker advertises on its client-facing listener.

Return Type
[String ! ] !
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(ctx context.Context, clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity) []string  {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			BootstrapServers(ctx)
}
@function
async def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity) -> List[str]:
	return await (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.bootstrap_servers()
	)
@func()
async example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity): Promise<string[]> {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.bootstrapServers()
}

client() 🔗

Client starts every broker service in the cluster and returns a franz-go Client wired with their bootstrap addresses.

Return Type
Client !
Arguments
NameTypeDefault ValueDescription
securityClientSecurity !-No description provided
Example
echo 'Custom types are not supported in shell examples'
func (m *MyModule) Example(clusterId string, controllers int, brokers int, registry string, tag string, clientListenerSecurity *dagger.KafkaServerSecurity, security *dagger.KafkaClientSecurity) *dagger.KafkaClient  {
	return dag.
			Kafka().
			Cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity).
			Client(security)
}
@function
def example(cluster_id: str, controllers: int, brokers: int, registry: str, tag: str, client_listener_security: dagger.KafkaServerSecurity, security: dagger.KafkaClientSecurity) -> dagger.KafkaClient:
	return (
		dag.kafka()
		.cluster(cluster_id, controllers, brokers, registry, tag, client_listener_security)
		.client(security)
	)
@func()
example(clusterId: string, controllers: number, brokers: number, registry: string, tag: string, clientListenerSecurity: KafkaServerSecurity, security: KafkaClientSecurity): KafkaClient {
	return dag
		.kafka()
		.cluster(clusterId, controllers, brokers, registry, tag, clientListenerSecurity)
		.client(security)
}

ServerSecurity 🔗

ServerSecurity describes how a Kafka cluster's listener authenticates and encrypts traffic from clients. Only PLAINTEXT is supported in this story.

ConsumedRecord 🔗

ConsumedRecord is a single record returned by Client.Consume, with key and value already encoded into the requested string representation.

key() 🔗

Return Type
String !
Example
Function KafkaConsumedRecord.key is not accessible from the kafka module
Function KafkaConsumedRecord.key is not accessible from the kafka module
Function KafkaConsumedRecord.key is not accessible from the kafka module
Function KafkaConsumedRecord.key is not accessible from the kafka module

value() 🔗

Return Type
String !
Example
Function KafkaConsumedRecord.value is not accessible from the kafka module
Function KafkaConsumedRecord.value is not accessible from the kafka module
Function KafkaConsumedRecord.value is not accessible from the kafka module
Function KafkaConsumedRecord.value is not accessible from the kafka module