Skip to main content
Version: 0.14.1 (stable)

Python SDK Examples

The Fluvio Python module provides an extension for working with the Fluvio streaming platform.

This module builds on top of the Fluvio Client Rust Crate and provides a pythonic access to the API.

Creating a topic with default settings is as simple as:

fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")

Or just create a topic with custom settings:

import fluvio

fluvio_admin = FluvioAdmin.connect()
topic_spec = (
TopicSpec.create()
.with_retention_time("1h")
.with_segment_size("10M")
.build()
)
fluvio_admin.create_topic("a_topic", topic_spec)

Producing data to a topic in a Fluvio cluster is as simple as:

import fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
producer = fluvio.topic_producer(topic)

for i in range(10):
producer.send_string("Hello %s " % i)

Consuming is also simple:

import fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]

Also you can consume using offset management:

import fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
builder = ConsumerConfigExtBuilder(topic)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
builder.offset_consumer("a-consumer")
config = builder.build()
stream = fluvio.consumer_with_config(config)

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]

stream.offset_commit()
stream.offset_flush()

For more examples see the integration tests in the fluvio-python repository.