Rust SDK Examples

  • The Rust client is the core client for all language clients.
    • New features arrive in the Rust client before any of the other clients
  • Full support for the Admin API to manage cluster components.
  • This client uses async Rust for communication with the Fluvio cluster.

Refer to the fluvio API docs.rs page for full detail.

Prerequisites

Follow the installation instructions to run this example.

Let's get started.

Create a Rust Project

Create a new Rust project, and import the dependencies we'll use in the examples.

$ mkdir fluvio-rust-example && cd fluvio-rust-example && cargo init

Add the following dependencies to your Cargo.toml file:

[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"

[dependencies]
async-std = {version = "1", features = ["attributes"]}
chrono = "0.4"
flate2 = "1.0.35"
fluvio = "0.24"

Let's build the project to ensure dependencies are installed correctly:

$ cargo build

We are ready to add some code.

Manage Topics

Fluvio has an admin interface that manages fluvio resources such as topics.

Create and List Topics

In this example, we'll create a topic called hello-rust and list all topics.

In your main.rs file, add the following code:

use fluvio::metadata::topic::TopicSpec;
use fluvio::Fluvio;

const TOPIC_NAME: &str = "hello-rust";
const PARTITIONS: u32 = 1;
const REPLICAS: u32 = 1;

#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

    // Create a topic
    let admin = fluvio.admin().await;
    let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
    let _topic_create = admin
        .create(TOPIC_NAME.to_string(), false, topic_spec)
        .await;

    // List topics
    let topics = admin.all::<TopicSpec>().await.expect("Failed to list topics");
    let topic_names = topics.iter().map(|topic| topic.name.clone()).collect::<Vec<String>>();

    println!("Topics:\n - {}", topic_names.join("\n - "));
}

Run the example

$ cargo run

You can also use the Fluvio CLI to double-check:

$ fluvio topic list

Check out the documentation for examples of how to delete a topic.

Produce

Fluvio uses a producer API to send records to a topic.

Produce Records

The producer API has many configuration parameters. Let's start with a simple example, where we produce one record with a timestamp.

Replace the content in the main.rs file with the following code:

use chrono::Local;

use fluvio::RecordKey;

const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
    // Create a record
    let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());

    // Produce to a topic
    let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
    producer.send(RecordKey::NULL, record.clone()).await.expect("Failed to send record");

    // Fluvio batches outgoing records by default,
    // call flush to ensure the record is sent
    producer.flush().await.expect("Failed to flush");

    println!("Sent record: {}", record);
}

Run the example

$ cargo run

You can also use the Fluvio CLI to double-check:

$ fluvio consume hello-rust -T=1 -d

The flags are -T to specify the number of records to read from the end of the topic and -d to disable continuous consumption.

Produce Key/Value Records

The producer API can also send records with a key and a value.

Replace the content in the main.rs file with the following code:

const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
    // Create key and value
    let key = "Hello";
    let value = "Fluvio";

    // Create producer & send key/value
    let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
    producer.send(key, value).await.expect("Failed to send record");
    producer.flush().await.expect("Failed to flush");

    println!("Sent [{}] {}", key, value);
}

Run the example

$ cargo run

Let's use the Fluvio CLI to check the result:

$ fluvio consume hello-rust -k -d -T=1

The -k or --key-value parameter asks the CLI to display both the key and the value of each record.

Tune Producer Performance

Producer performance is affected by your network bandwidth and the average packet size. You can tune the performance by changing the following configuration options:

  • batch_size - default: 16Mb
  • linger - default: 0 milliseconds
  • compression - default: none

In the following example, we set linger to 500 milliseconds, batch_size to 500 bytes, and compression to Gzip. Let's create a TopicProducerConfigBuilder object and set the configuration options.

Replace the content in the main.rs file with the following code:

use std::time::Duration;

use fluvio::{Fluvio, TopicProducerConfigBuilder, Compression, RecordKey};

const TOPIC_NAME: &str = "hello-rust";

#[async_std::main]
async fn main() {
    // Use config builder to create a topic producer config
    let producer_config = TopicProducerConfigBuilder::default()
        .batch_size(500)
        .linger(Duration::from_millis(500))
        .compression(Compression::Gzip)
        .build()
        .expect("Failed to create topic producer config");

    // Connect to Fluvio cluster & create a producer
    let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
    let producer = fluvio.topic_producer_with_config(TOPIC_NAME, producer_config)
        .await.expect("Failed to create a producer");

    // Send 10 records
    for i in 1..=10 {
        let record = format!("Record-{}", i);
        producer.send(RecordKey::NULL, record.as_str()).await.expect("Failed to send record");
    }

    // Flush once after the loop so the records can be batched;
    // flushing after every send would put each record in its own batch
    producer.flush().await.expect("Failed to flush");

    println!("Sent 10 records successfully.");
}

Run the example

$ cargo run

It is difficult to see the effect of these parameters without a performance test, because encoding and decoding are performed before and after writing to disk. We can still confirm that the records have been sent to the topic:

$ fluvio consume hello-rust -d -T=10
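
To build some intuition for how batch_size and linger interact, here is a minimal sketch of a batcher written with the standard library only. It is a simplified model and not Fluvio's implementation: the Batcher type and its push, poll, and flush methods are hypothetical names chosen for illustration. A batch is released either when the buffered bytes reach batch_size or when the oldest buffered record has waited for the linger time.

```rust
use std::time::{Duration, Instant};

/// Hypothetical, simplified batcher (illustration only, not Fluvio's code).
struct Batcher {
    batch_size: usize,         // release once buffered bytes reach this size
    linger: Duration,          // or once the oldest buffered record is this old
    buffered: Vec<Vec<u8>>,    // records waiting to be sent
    buffered_bytes: usize,     // total payload size of `buffered`
    oldest: Option<Instant>,   // arrival time of the first buffered record
}

impl Batcher {
    fn new(batch_size: usize, linger: Duration) -> Self {
        Self { batch_size, linger, buffered: Vec::new(), buffered_bytes: 0, oldest: None }
    }

    /// Buffers a record; returns a batch if the size threshold was reached.
    fn push(&mut self, record: &[u8], now: Instant) -> Option<Vec<Vec<u8>>> {
        if self.oldest.is_none() {
            self.oldest = Some(now);
        }
        self.buffered_bytes += record.len();
        self.buffered.push(record.to_vec());
        if self.buffered_bytes >= self.batch_size {
            Some(self.flush())
        } else {
            None
        }
    }

    /// Returns the pending batch if the linger time has elapsed.
    fn poll(&mut self, now: Instant) -> Option<Vec<Vec<u8>>> {
        match self.oldest {
            Some(first) if now.duration_since(first) >= self.linger => Some(self.flush()),
            _ => None,
        }
    }

    /// Releases everything that is buffered, regardless of size or age.
    fn flush(&mut self) -> Vec<Vec<u8>> {
        self.buffered_bytes = 0;
        self.oldest = None;
        std::mem::take(&mut self.buffered)
    }
}
```

In this model, a small batch_size releases batches often, while a longer linger gives small records more time to accumulate into one batch. An explicit flush bypasses both thresholds, which is why the producer examples call flush before exiting.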

Consume

Fluvio uses a consumer API to retrieve records from a topic.

Consume Records

Let's start with a simple example, where we consume the last record from a topic.

Update the main.rs file with the following code:

use async_std::stream::StreamExt;

use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

    // Consume last record from topic
    let config = ConsumerConfigExtBuilder::default()
        .topic(TOPIC_NAME)
        .partition(PARTITION_NUM)
        .offset_start(Offset::from_end(1))
        .build()
        .expect("Failed to build consumer config");

    // Create consumer & stream one record
    let mut stream = fluvio.consumer_with_config(config).await
        .expect("Failed to create consumer");
    if let Some(Ok(record)) = stream.next().await {
        let string = String::from_utf8_lossy(record.value());
        println!("{}", string);
    }
}

Run the example

$ cargo run

In this example, fluvio::Offset is set to read one record from the end. The offset also allows you to read records from the beginning, from the end, or from a specific offset value.
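
The three ways of choosing a start position can be pictured with a small standard-library sketch. The StartOffset enum and the resolve function below are hypothetical names used only for illustration; they are not part of the fluvio crate. The sketch assumes the partition holds offsets log_start..log_end (end exclusive, with log_start no greater than log_end), and clamping out-of-range requests is an assumption of this sketch rather than documented Fluvio behavior.

```rust
/// Hypothetical stand-in for the three ways of choosing a start position.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StartOffset {
    FromBeginning(u64), // skip this many records from the start of the log
    FromEnd(u64),       // start this many records before the end of the log
    Absolute(u64),      // start at this exact offset
}

/// Resolves a start position to a concrete offset in a partition that holds
/// offsets `log_start..log_end` (end exclusive). Requires log_start <= log_end.
/// Out-of-range results are clamped (an assumption of this sketch).
fn resolve(start: StartOffset, log_start: u64, log_end: u64) -> u64 {
    let resolved = match start {
        StartOffset::FromBeginning(n) => log_start.saturating_add(n),
        StartOffset::FromEnd(n) => log_end.saturating_sub(n),
        StartOffset::Absolute(n) => n,
    };
    resolved.clamp(log_start, log_end)
}
```

With ten records at offsets 0 through 9, resolve(StartOffset::FromEnd(1), 0, 10) yields 9, the last record, which mirrors what the example above requests. FromEnd(0) yields 10, so only records produced later would be read.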

Consume with SmartModules

Fluvio can apply custom SmartModules to transform the data as it is consumed. The SmartModule transformations are applied before the record is sent to the consumer.

The consumer API allows you to load a local WASM file or reference a SmartModule that has been uploaded to the cluster.

Local SmartModule WASM File

In general, you would use the SmartModule Development Kit (smdk) to develop a SmartModule, compile it to WebAssembly, and load it into the consumer code.

In this example, we will build an existing SmartModule, regex-text, from the fluvio-community repository and copy the compiled WASM file into the project directory.

git clone https://github.com/fluvio-community/smartmodules
cd smartmodules/regex-text
smdk build
cd ../..
cp smartmodules/regex-text/target/wasm32-wasip1/release-lto/regex_text.wasm .

Update the main.rs file with the following code:

use std::io::Read;
use std::collections::BTreeMap;
use async_std::stream::StreamExt;
use flate2::{bufread::GzEncoder, Compression};

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
    ConsumerConfigExtBuilder,
    SmartModuleInvocation,
    SmartModuleInvocationWasm,
    SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

    // Build smartmodule invocation from wasm file
    let sm_invocation = build_smartmodule_from_file(
        SmartModuleKind::Map,
        "regex_text.wasm",
        r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
    );

    // Start at the end of the topic, so only newly produced records are consumed
    let config = ConsumerConfigExtBuilder::default()
        .topic(TOPIC_NAME)
        .partition(PARTITION_NUM)
        .offset_start(Offset::end())
        .smartmodule(vec![sm_invocation])
        .build()
        .expect("Failed to build consumer config");

    // Create consumer & stream one record
    let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
    if let Some(Ok(record)) = stream.next().await {
        let string = String::from_utf8_lossy(record.value());
        println!("{}", string);
    }
}

// Create a smartmodule invocation from a wasm file
fn build_smartmodule_from_file(
    kind: SmartModuleKind,
    file_path: &str,
    spec: &str,
) -> SmartModuleInvocation {
    // Read the smartmodule wasm file and gzip-compress its contents
    let raw_buffer = std::fs::read(file_path).expect("wasm file is missing");
    let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
    let mut buffer = Vec::with_capacity(raw_buffer.len());
    encoder.read_to_end(&mut buffer).expect("failed to read encoded wasm file");

    // Create smartmodule invocation with params
    let mut param_tree = BTreeMap::<String, String>::new();
    param_tree.insert("spec".to_owned(), spec.to_owned());
    let params = SmartModuleExtraParams::new(param_tree, None);

    // Return smartmodule invocation
    SmartModuleInvocation {
        wasm: SmartModuleInvocationWasm::AdHoc(buffer),
        kind,
        params,
    }
}

This SmartModule uses a regex to replace secret with ****.

Run the example

$ cargo run

Produce a record:

$ echo "this is a secret value" | fluvio produce hello-rust

As you can see, the "secret" has been replaced.
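
Conceptually, a map-type transformation takes each record value and returns a new value before the consumer sees it. The sketch below imitates that flow with the standard library only. It is not the regex-text SmartModule: the mask and consume_with_map functions are hypothetical, and a literal string match stands in for the regex.

```rust
/// Hypothetical stand-in for a map-type transformation: takes one record
/// value and returns the transformed value. A literal match replaces the regex.
fn mask(value: &[u8], pattern: &str, replacement: &str) -> Vec<u8> {
    String::from_utf8_lossy(value)
        .replace(pattern, replacement)
        .into_bytes()
}

/// Applies the transformation to every record before handing it to the caller,
/// mirroring how the transformation runs before records reach the consumer.
fn consume_with_map<'a>(records: impl IntoIterator<Item = &'a [u8]>) -> Vec<String> {
    records
        .into_iter()
        .map(|record| mask(record, "secret", "****"))
        .map(|bytes| String::from_utf8_lossy(&bytes).into_owned())
        .collect()
}
```

Passing the bytes of "this is a secret value" through consume_with_map produces "this is a **** value", and records that do not contain the pattern pass through unchanged.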

Use SmartModules from Fluvio Cluster

In this example, we'll use a SmartModule developed by someone else and available for download to the cluster from the InfinyOn Hub.

Update the main.rs file with the following code:

use std::collections::BTreeMap;
use async_std::stream::StreamExt;

use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
    ConsumerConfigExtBuilder,
    SmartModuleInvocation,
    SmartModuleInvocationWasm,
    SmartModuleKind,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;

#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

    // Build smartmodule invocation from the name of a smartmodule in the cluster
    let sm_invocation = build_smartmodule_from_name(
        SmartModuleKind::Map,
        "fluvio/regex-text@0.1.0",
        r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
    );

    // Start at the end of the topic, so only newly produced records are consumed
    let config = ConsumerConfigExtBuilder::default()
        .topic(TOPIC_NAME)
        .partition(PARTITION_NUM)
        .offset_start(Offset::end())
        .smartmodule(vec![sm_invocation])
        .build()
        .expect("Failed to build consumer config");

    // Create consumer & stream one record
    let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
    if let Some(Ok(record)) = stream.next().await {
        let string = String::from_utf8_lossy(record.value());
        println!("{}", string);
    }
}

// Create a smartmodule invocation using smartmodule name
fn build_smartmodule_from_name(
    kind: SmartModuleKind,
    smartmodule_name: &str,
    spec: &str,
) -> SmartModuleInvocation {
    // Create smartmodule invocation with params
    let mut param_tree = BTreeMap::<String, String>::new();
    param_tree.insert("spec".to_owned(), spec.to_owned());
    let params = SmartModuleExtraParams::new(param_tree, None);

    // Return smartmodule invocation
    SmartModuleInvocation {
        wasm: SmartModuleInvocationWasm::Predefined(smartmodule_name.to_string()),
        kind,
        params,
    }
}

The code performs a similar regex replace as the previous example, but uses a SmartModule that has been uploaded to the cluster.

Run the example

Download the SmartModule from the InfinyOn Hub:

$ fluvio hub smartmodule download fluvio/regex-text@0.1.0

Compile and run:

$ cargo run

Produce a record:

$ echo "this is a secret value" | fluvio produce hello-rust

As you can see, the "secret" has been replaced.

Consume with Managed Offsets

Fluvio automatically manages consumer offsets for you, eliminating the need to track the last record you've consumed. This ensures that when your client resumes, it seamlessly continues consuming from where it left off.

Fluvio provides two offset management strategies: automatic commits, where offsets are managed seamlessly by the system, and manual commits, giving you full control over when offsets are updated.

Offset Management with Automatic Commits

In this example, we'll create a consumer that commits offsets automatically. Check out the consumer_with_config documentation for more details.

Update the main.rs file with the following code:

use async_std::stream::StreamExt;

use fluvio::{
    consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
    Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-auto";

#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

    // Start at the end of the topic and let Fluvio commit offsets automatically
    let config = ConsumerConfigExtBuilder::default()
        .topic(TOPIC_NAME)
        .partition(PARTITION_NUM)
        .offset_start(Offset::end())
        .offset_consumer(CONSUMER_OFFSET.to_string())
        .offset_strategy(OffsetManagementStrategy::Auto)
        .build()
        .expect("Failed to build consumer config");

    // Create consumer & stream records until interrupted
    let mut stream = fluvio.consumer_with_config(config).await
        .expect("Failed to create consumer");
    while let Some(Ok(record)) = stream.next().await {
        let string = String::from_utf8_lossy(record.value());
        println!("{}", string);
    }
}

The code creates a consumer that automatically commits offsets.

Run the example

$ cargo run

Produce the following records:

$ fluvio produce hello-rust <<EOF       
Message 1
Message 2
Message 3
EOF

List the consumers:

$ fluvio consumer list

You'll notice the system created a consumer-auto consumer to capture the last offset.

Use <Ctrl-C> to stop the consumer, and produce a few more records:

$ fluvio produce hello-rust <<EOF       
Message 4
Message 5
Message 6
EOF

Run the program again with cargo run and notice that the consumer resumes from where it left off.
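
The resume behavior can be pictured as a lookup keyed by consumer name. The sketch below is an in-memory model written with the standard library only; OffsetStore, commit, and resume_from are hypothetical names, and the real offsets are kept by the cluster rather than by the client. It assumes that a consumer with a committed offset continues right after it, and that the configured start offset serves only as a fallback for a consumer that has never committed.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the offsets kept per named consumer.
#[derive(Default)]
struct OffsetStore {
    committed: HashMap<String, u64>, // consumer name -> last committed offset
}

impl OffsetStore {
    /// Records that `consumer` has processed everything up to and including `offset`.
    fn commit(&mut self, consumer: &str, offset: u64) {
        self.committed.insert(consumer.to_string(), offset);
    }

    /// Where a consumer should start: right after its last committed offset,
    /// or at `fallback` if it has never committed (an assumption of this sketch).
    fn resume_from(&self, consumer: &str, fallback: u64) -> u64 {
        self.committed.get(consumer).map_or(fallback, |last| last + 1)
    }
}
```

In this model, after Message 1 through Message 3 are consumed and committed at offsets 0 through 2, a restarted consumer-auto starts at offset 3 and therefore sees Message 4 first.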

Offset Management with Manual Commits

In this example, we'll create a consumer that manually commits offsets.

Update the main.rs file with the following code:

use async_std::stream::StreamExt;

use fluvio::{
    consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy},
    Fluvio, Offset,
};

const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-manual";

#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");

    // Start at the end of the topic and commit offsets manually
    let config = ConsumerConfigExtBuilder::default()
        .topic(TOPIC_NAME)
        .partition(PARTITION_NUM)
        .offset_start(Offset::end())
        .offset_consumer(CONSUMER_OFFSET.to_string())
        .offset_strategy(OffsetManagementStrategy::Manual)
        .build()
        .expect("Failed to build consumer config");

    // Create consumer & stream records, committing the offset after each one
    let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
    while let Some(Ok(record)) = stream.next().await {
        let string = String::from_utf8_lossy(record.value());
        println!("{}", string);
        stream.offset_commit().expect("offset commit failed");
        stream.offset_flush().await.expect("offset flush failed");
    }
}

The code calls commit and flush during every iteration of the loop. You may change this interval based on your business logic.
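
One way to change the interval is to commit once every N records instead of after each one. The helper below is a minimal standard-library sketch of that decision; CommitPolicy and record_processed are hypothetical names and not part of the fluvio crate.

```rust
/// Hypothetical helper: decides when to commit, once every `every` records.
struct CommitPolicy {
    every: u32,   // commit interval, in records
    pending: u32, // records processed since the last commit
}

impl CommitPolicy {
    fn new(every: u32) -> Self {
        assert!(every > 0, "commit interval must be at least 1");
        Self { every, pending: 0 }
    }

    /// Call once per processed record; returns true when it is time to commit.
    fn record_processed(&mut self) -> bool {
        self.pending += 1;
        if self.pending >= self.every {
            self.pending = 0;
            true
        } else {
            false
        }
    }
}
```

In the consumer loop, the calls to offset_commit and offset_flush would then run only when record_processed returns true. The likely trade-off is that records processed after the last commit are consumed again after a restart, so a larger interval means fewer commits but more potential reprocessing.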

Run the example

$ cargo run

Produce the following records:

$ fluvio produce hello-rust <<EOF       
Message 1
Message 2
Message 3
EOF

List the consumers:

$ fluvio consumer list

You'll notice the system created a consumer-manual consumer to capture the last offset.

Use <Ctrl-C> to stop the consumer, and produce a few more records:

$ fluvio produce hello-rust <<EOF       
Message 4
Message 5
Message 6
EOF

Run the program again with cargo run and notice that the consumer resumes from where it left off.

Refer to the [fluvio docs.rs page] for full detail. The following are some of the most important links: