
Python SDK Examples

The Fluvio Python module provides an extension for working with the Fluvio streaming platform.

This module builds on top of the Fluvio Client Rust crate and provides Pythonic access to its API.

Creating a topic with default settings is as simple as:

from fluvio import FluvioAdmin

fluvio_admin = FluvioAdmin.connect()
fluvio_admin.create_topic("a_topic")

To create a topic with custom settings instead:

from fluvio import FluvioAdmin, TopicSpec

fluvio_admin = FluvioAdmin.connect()

# Build a topic spec with a custom retention time and segment size.
topic_spec = (
    TopicSpec.create()
    .with_retention_time("1h")
    .with_segment_size("10M")
    .build()
)
fluvio_admin.create_topic("a_topic", topic_spec)

Producing data to a topic in a Fluvio cluster is as simple as:

from fluvio import Fluvio

fluvio = Fluvio.connect()

topic = "a_topic"
producer = fluvio.topic_producer(topic)

# Send ten string records to the topic.
for i in range(10):
    producer.send_string("Hello %s " % i)

Consuming is also simple:

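from fluvio import ConsumerConfigExtBuilder, Fluvio

fluvio = Fluvio.connect()

topic = "a_topic"

# Build a consumer configuration for the topic and open a record stream.
builder = ConsumerConfigExtBuilder(topic)
config = builder.build()
stream = fluvio.consumer_with_config(config)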

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]
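The decoding step above can be factored into a small helper. The following is a minimal sketch, not part of the Fluvio API: `take_decoded` and `FakeRecord` are hypothetical names, and `FakeRecord` only models a `value()` method that returns the payload as a list of byte values, which is an assumption made so the snippet runs without a cluster.

```python
from itertools import islice
from typing import Iterable, List


class FakeRecord:
    """Hypothetical stand-in for a consumed record; only value() is modeled."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def value(self) -> List[int]:
        # Assumption: value() yields the payload as a list of byte values,
        # which is why the example above wraps it in bytearray().
        return list(self._payload)


def take_decoded(stream: Iterable, num_items: int) -> List[str]:
    """Read at most num_items records from stream and decode each as UTF-8."""
    return [bytearray(r.value()).decode() for r in islice(stream, num_items)]


# Demo with an in-memory stream instead of a live consumer.
fake_stream = iter([FakeRecord(b"Hello 0 "), FakeRecord(b"Hello 1 "), FakeRecord(b"Hello 2 ")])
print(take_decoded(fake_stream, 2))  # prints ['Hello 0 ', 'Hello 1 ']
```

With a real consumer, the `stream` returned by `consumer_with_config` would be passed in place of `fake_stream`. Unlike repeated `next(stream)` calls, `islice` stops cleanly if a finite iterator runs out before `num_items` records have been read.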

You can also consume with manual offset management:

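from fluvio import (
    ConsumerConfigExtBuilder,
    Fluvio,
    Offset,
    OffsetManagementStrategy,
)

fluvio = Fluvio.connect()

topic = "a_topic"

# Start from the beginning of the topic and track offsets manually
# under the consumer name "a-consumer".
builder = ConsumerConfigExtBuilder(topic)
builder.offset_start(Offset.beginning())
builder.offset_strategy(OffsetManagementStrategy.MANUAL)
builder.offset_consumer("a-consumer")
config = builder.build()
stream = fluvio.consumer_with_config(config)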

num_items = 2
records = [bytearray(next(stream).value()).decode() for _ in range(num_items)]

stream.offset_commit()
stream.offset_flush()
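One common way to use manual offsets is to commit only after a batch has been processed successfully, so that a failure leaves the offset uncommitted. The sketch below illustrates that ordering under stated assumptions: `consume_batch`, `FakeStream`, and `FakeRecord` are hypothetical names, and the fake stream only records which of `offset_commit()` and `offset_flush()` were called. It does not reproduce how a Fluvio cluster stores or replays offsets.

```python
from typing import Callable, Iterable, List


class FakeRecord:
    """Hypothetical stand-in for a consumed record; only value() is modeled."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def value(self) -> List[int]:
        # Assumption: the payload is exposed as a list of byte values.
        return list(self._payload)


class FakeStream:
    """Hypothetical stand-in for a consumer stream with manual offsets."""

    def __init__(self, payloads: Iterable[bytes]) -> None:
        self._payloads = iter(payloads)
        self.calls: List[str] = []  # records the offset calls that were made

    def __iter__(self) -> "FakeStream":
        return self

    def __next__(self) -> FakeRecord:
        return FakeRecord(next(self._payloads))

    def offset_commit(self) -> None:
        self.calls.append("commit")

    def offset_flush(self) -> None:
        self.calls.append("flush")


def consume_batch(stream, num_items: int, handler: Callable[[str], None]) -> None:
    """Handle num_items records, then commit and flush the offset.

    If handler raises, the exception propagates before the commit,
    so no offset is committed for the failed batch.
    """
    for _ in range(num_items):
        record = next(stream)
        handler(bytearray(record.value()).decode())
    stream.offset_commit()
    stream.offset_flush()


# Demo with an in-memory stream instead of a live consumer.
seen: List[str] = []
demo_stream = FakeStream([b"Hello 0 ", b"Hello 1 "])
consume_batch(demo_stream, 2, seen.append)
print(seen, demo_stream.calls)
```

Committing after the handler returns trades possible reprocessing for not skipping records; whether that trade-off fits depends on how the application tolerates duplicates.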

For more examples see the integration tests in the fluvio-python repository.