Rust SDK Examples
- The Rust client is the core client for all language clients.
- New features arrive in the Rust client before any of the other clients
- Full support for the Admin API to manage cluster components.
- This client uses async Rust for communication with the Fluvio cluster.
Refer to the fluvio API docs.rs page for full detail.
Prerequisites
Follow the installation instructions to run this example.
Let's get started.
Create a Rust Project
Create a new Rust project, and import the dependencies we'll use in the examples.
$ mkdir fluvio-rust-example && cd fluvio-rust-example && cargo init
Add the following dependencies to your Cargo.toml
file:
[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"
[dependencies]
async-std = {version = "1", features = ["attributes"]}
chrono = "0.4"
flate2 = "1.0.35"
fluvio = "0.24"
Let's build the project to ensure dependencies are installed correctly:
$ cargo build
We are ready to add some code.
Manage Topics
Fluvio has an admin interface that manages fluvio resources such as topics.
Create and List Topics
In this examples we'll create a topic called hello-rust
and list all topics.
In your main.rs
file, add the following code:
use fluvio::metadata::topic::TopicSpec;
use fluvio::Fluvio;
const TOPIC_NAME: &str = "hello-rust";
const PARTITIONS: u32 = 1;
const REPLICAS: u32 = 1;
#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
// Create a topic
let admin = fluvio.admin().await;
let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
let _topic_create = admin
.create(TOPIC_NAME.to_string(), false, topic_spec)
.await;
// List topics
let topics = admin.all::<TopicSpec>().await.expect("Failed to list topics");
let topic_names = topics.iter().map(|topic| topic.name.clone()).collect::<Vec<String>>();
println!("Topics:\n - {}", topic_names.join("\n - "));
}
Run the example
$ cargo run
You can also use fluvio cli to double check:
$ fluvio topic list
Checkout the documentation for examples on how to delete a topic.
Produce
Fluvio uses a producer API to send records to a topic.
Produce Records
The producer API has many configuration parameters. Let's start with a simple example, where we produce one record with a timestamp.
Replace the content in the main.rs
file with the following code:
use chrono::Local;
use fluvio::RecordKey;
const TOPIC_NAME: &str = "hello-rust";
#[async_std::main]
async fn main() {
// Create a record
let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());
// Produce to a topic
let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
producer.send(RecordKey::NULL, record.clone()).await.expect("Failed to send record");
// Fluvio batches outgoing records by default,
// call flush to ensure the record is sent
producer.flush().await.expect("Failed to flush");
println!("Sent record: {}", record);
}
Run the example
$ cargo run
You can also use fluvio cli to double check:
$ fluvio consume hello-rust -T=1 -d
The flags are -T
to specify the number of messages to produce and -d
to to disable continuous consumption.
Produce Key/Value Records
Producer API may send records with a key and value.
Replace the content in the main.rs
file with the following code:
const TOPIC_NAME: &str = "hello-rust";
#[async_std::main]
async fn main() {
// Create key and value
let key = "Hello";
let value = "Fluvio";
// create producer & send key/value
let producer = fluvio::producer(TOPIC_NAME).await.expect("Failed to create producer");
producer.send(key, value).await.expect("Failed to send record");
producer.flush().await.expect("Failed to flush");
println!("Sent [{}] {}", key, value);
}
Run the example
$ cargo run
Let's use fluvio cli to check the result:
$ fluvio consume hello-rust -k -d -T=1
The -k
or --key-value
parameter ask the CLI to display the key and value of the record.
Tune Producer Performance
Producer performance is affected by your network bandwidth and the average packets size. You can tune the performane by chaning the following configuration options:
batch_size
- default: 16Mblinger
- default: 0 millisecondscompression
- default: none
In in the following example, we set the linger
to 500 milliseconds and the batch_size
to 500 bytes, and compression to Gzip
. Let's create a TopicProducerConfigBuilder
object and set the configuration options.
Replace the content in the main.rs
file with the following code:
use std::time::Duration;
use fluvio::{Fluvio, TopicProducerConfigBuilder, Compression, RecordKey};
const TOPIC_NAME: &str = "hello-rust";
#[async_std::main]
async fn main() {
// Use config builder to create a topic producer config
let producer_config = TopicProducerConfigBuilder::default()
.batch_size(500)
.linger(Duration::from_millis(500))
.compression(Compression::Gzip)
.build()
.expect("Failed to create topic producer config");
// Connet to fluvio cluster & create a producer
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
let producer = fluvio.topic_producer_with_config(TOPIC_NAME, producer_config)
.await.expect("Failed to create a producer");
// Send 10 records
for i in 1..=10 {
let record = format!("Record-{}", i);
producer.send(RecordKey::NULL, record.as_str()).await.expect("Failed to send record");
producer.flush().await.expect("Failed to flush");
}
println!("Sent 10 records successfully.");
}
Run the example
$ cargo run
While it's difficult to see the outcomme for applying these parameters without a performance test and encoding/decoding is performed before an after writing to disk, we can see that the records have been sent to the topic:
$ fluvio consume hello-rust -d -T=10
Consume
Fluvio uses a consumer API to retrieve records from a topic.
Consume Records
Let's start with a simple example, where we consume the last record from a topic.
Update the main.rs
file with the following code:
use async_std::stream::StreamExt;
use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, Offset};
const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::from_end(1))
.build()
.expect("Failed to build consumer config");
// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
Run the example
$ cargo run
For example fluvio::Offset is set to read 1 record from th end. However, the offset allows you to read records from the beginining, from the end, or from a specific offset value.
Consume with SmartModules
Fluvio can apply custom SmartModules to transform the data as it is consumed. The SmartModule tranformations are applied before the record is sent to the consumer.
The consumer API allows you to load a local WASM file or reference a SmartModule that has been uploaded to the cluster.
Loal SmartModule WASM File
In general, you would use Smartmodule Development Kit (smdk) to develop a smartmodule, compile it to WebAssembly and load it into the consumer code.
In this example, we will use a pre-compiled SmartModule that simply prints the record key and value to the console.
git clone https://github.com/fluvio-community/smartmodules
cd smartmodules/regex-text
smdk build
cd ../..
cp smartmodules/regex-text/target/wasm32-wasip1/release-lto/regex_text.wasm .
Update the main.rs
file with the following code:
use std::io::Read;
use std::collections::BTreeMap;
use async_std::stream::StreamExt;
use flate2::{bufread::GzEncoder, Compression};
use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};
const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_file(
SmartModuleKind::Map,
"regex_text.wasm",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);
// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");
// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
// Create a smartmodule invocation from a wasm file
fn build_smartmodule_from_file(
kind: SmartModuleKind,
file_path: &str,
spec: &str
) -> SmartModuleInvocation {
// Read smartmodule wasm file
let raw_buffer = std::fs::read(file_path).expect("wasm file is missing");
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
encoder.read_to_end(&mut buffer).expect("failed to read encoded wasm file");
// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);
// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::AdHoc(buffer),
kind: kind,
params: params,
}
}
This smartmodule, uses regex, to replace secret
with ****
.
Run the example
$ cargo run
Produce a record:
$ echo "this is a secret value" | fluvio produce hello-rust
As you can see, the "secret" has been replaced.
Use SmartModules from Fluvio Cluster
In this example, we'll use a SmartModule developed by someone else and available for download to the cluster from the InfinyOn Hub.
Update the main.rs
file with the following code:
use std::collections::BTreeMap;
use async_std::stream::StreamExt;
use fluvio::{Fluvio, Offset, SmartModuleExtraParams};
use fluvio::consumer::{
ConsumerConfigExtBuilder,
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
};
const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
// Build smartmodule invocation from wasm file
let sm_invocation = build_smartmodule_from_name(
SmartModuleKind::Map,
"fluvio/regex-text@0.1.0",
r#"[{"replace": {"regex": "secret", "with": "****"}}]"#,
);
// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.smartmodule(vec![sm_invocation])
.build()
.expect("Failed to build consumer config");
// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
if let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
// Create a smartmodule invocation using smartmodule name
fn build_smartmodule_from_name(
kind: SmartModuleKind,
smartmodule_name: &str,
spec: &str
) -> SmartModuleInvocation {
// Create smartmodule invocation with params
let mut param_tree = BTreeMap::<String,String>::new();
param_tree.insert("spec".to_owned(), spec.to_owned());
let params = SmartModuleExtraParams::new(param_tree, None);
// Return smartmodule invocation
SmartModuleInvocation {
wasm: SmartModuleInvocationWasm::Predefined(smartmodule_name.to_string()),
kind: kind,
params: params,
}
}
The code performs a similar regex replace as the previous example, but uses a SmartModule that has been uploaded to the cluster.
Run the example
Download the Smartmodule from the InfinyOn Hub:
$ fluvio hub smartmodule download fluvio/regex-text@0.1.0
Compile and run:
$ cargo run
Produce a record:
$ echo "this is a secret value" | fluvio produce hello-rust
As you can see, the "secret" has been replaced.
Consume with Managed Offsets
Fluvio automatically manages consumer offsets for you, eliminating the need to track the last record you've consumed. This ensures that when your client resumes, it seamlessly continues consuming from where it left off.
Fluvio provides two offset management strategies: automatic commits, where offsets are managed seamlessly by the system, and manual commits, giving you full control over when offsets are updated.
Offset Management with Automatic Commits
In this example, we'll create a consumer that commits offsets automatically. Checkout the consumer_with_config for more details.
Update the main.rs
file with the following code:
use async_std::stream::StreamExt;
use fluvio::{
consumer::{ConsumerConfigExtBuilder, OffsetManagementStrategy},
Fluvio, Offset,
};
const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-auto";
#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Auto)
.build()
.expect("Failed to build consumer config");
// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await
.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
}
}
The code creates a consumer that automatically commits offsets.
Run the example
$ cargo run
Produce the following records:
$ fluvio produce hello-rust <<EOF
Message 1
Message 2
Message 3
EOF
List the consumers:
$ fluvio consumer list
You'll notice the system created a consumer-auto
consumer to capture the last offset.
Use <Ctrl-C>
to stop the consumer, and produce a few more records:
$ fluvio produce hello-rust <<EOF
Message 4
Message 5
Message 6
EOF
Resume the script and notice that the consumer resumes from where it left off.
Offset Management with Manual Commits
In this example, we'll create a consumer that manually commits offsets.
Update the main.rs
file with the following code:
use async_std::stream::StreamExt;
use fluvio::{
consumer::{ConsumerConfigExtBuilder, ConsumerStream, OffsetManagementStrategy},
Fluvio, Offset,
};
const TOPIC_NAME: &str = "hello-rust";
const PARTITION_NUM: u32 = 0;
const CONSUMER_OFFSET: &str = "consumer-manual";
#[async_std::main]
async fn main() {
// Connect to Fluvio cluster
let fluvio = Fluvio::connect().await.expect("Failed to connect to Fluvio");
// Consume last record from topic
let config = ConsumerConfigExtBuilder::default()
.topic(TOPIC_NAME)
.partition(PARTITION_NUM)
.offset_start(Offset::end())
.offset_consumer(CONSUMER_OFFSET.to_string())
.offset_strategy(OffsetManagementStrategy::Manual)
.build()
.expect("Failed to build consumer config");
// Create consumer & stream one record
let mut stream = fluvio.consumer_with_config(config).await.expect("Failed to create consumer");
while let Some(Ok(record)) = stream.next().await {
let string = String::from_utf8_lossy(record.value());
println!("{}", string);
stream.offset_commit().expect("offset commit failed");
stream.offset_flush().await.expect("offset flush failed");
}
}
The code calls commit
and flush
during every iteration of the loop. You may change this interval based on your business logic.
Run the example
$ cargo run
Produce the following records:
$ fluvio produce hello-rust <<EOF
Message 1
Message 2
Message 3
EOF
List the consumers:
$ fluvio consumer list
You'll notice the system created a consumer-auto
consumer to capture the last offset.
Use <Ctrl-C>
to stop the consumer, and produce a few more records:
$ fluvio produce hello-rust <<EOF
Message 4
Message 5
Message 6
EOF
Resume the script and notice that the consumer resumes from where it left off.
Links to Docs
Refer to the [fluvio docs.rs page] for full detail. The following are some of the most important links: