Welcome to This Week in Fluvio, our weekly newsletter
for development updates to Fluvio open source. Fluvio is a distributed,
programmable streaming platform written in Rust.
New Release - Fluvio v0.9.19
Connector versions
Now when you run fluvio connector list
, the version of the connector is returned
Example connector config:
# test-connector-config.yaml
version: 0.1.1
name: my-test-connector
type: test-connector
topic: my-test-connector-topic
direction: source
%copy first-line%
$ fluvio connector create --config test-connector-config.yaml
%copy first-line%
$ fluvio connector list
NAME VERSION STATUS
my-test-connector 0.1.1 Running
SmartModule debugging support using WASI
This is for advanced users who are willing to compile Fluvio locally. Please follow the Fluvio Developer guide to get set up for local development.
SmartModule devs can now compile Fluvio with WASI support. This provides SmartModules access to stdout
and stderr
for debugging purposes.
$ git clone https://github.com/infinyon/fluvio.git
Build the development Fluvio cluster image with WASI support enabled
$ DEBUG_SMARTMODULE=true make build_k8_image
Build the development Fluvio CLI.
$ make build-cli
Start our development Fluvio cluster with WASI support
$ ./target/debug/fluvio cluster start --develop
Here's our example SmartModule. It is a slight modification of our filter example. For debugging purposes, we print the record to stdout before checking the contents of the record and applying filtering.
use fluvio_smartmodule::{smartmodule, Record, Result};
#[smartmodule(filter)]
pub fn filter(record: &Record) -> Result<bool> {
// Print every record to SPU logs
println!("DEBUG: {record:#?}");
let string = std::str::from_utf8(record.value.as_ref())?;
Ok(string.contains('a'))
}
Before you build the SmartModule, you need to add the wasm32-wasi
target with rustup
.
$ rustup target add wasm32-wasi
Build SmartModule using the wasm32-wasi
target to use it against our WASI-enabled cluster.
$ cargo build --release --target wasm32-wasi
Load the WASI SmartModule into the cluster as wasi-sm
$ ./target/debug/fluvio smart-module create wasi-sm --wasm-file ./target/wasm32-wasi/release/fluvio_wasm_filter.wasm
Create the testing topic twif-22
$ ./target/debug/fluvio topic create twif-22
topic "twif-22" created
For our example producer input, we'll send 2 records to demonstrate the SmartModule output.
$ ./target/debug/fluvio produce twif-22
> a
Ok!
> b
Ok!
In the consumer output using our WASI SmartModule, only the first record prints, which is the correct behavior.
$ ./target/debug/fluvio consume twif-22 --filter wasi-sm
Consuming records from the end of topic 'twif-22'. This will wait for new records
a
⠴
To view our SmartModule debug output, we look at the SPU pod logs in Kubernetes. At the bottom of the log we can verify that the contents of each record was printed.
$ kubectl logs -f fluvio-spg-main-0
[...]
2022-02-15T00:45:25.502747Z INFO accept_incoming{self=FluvioApiServer("0.0.0.0:9005")}: fluvio_service::server: Received connection, spawning request handler
DEBUG: Record {
preamble: RecordHeader {
attributes: 0,
timestamp_delta: 0,
offset_delta: 0,
},
key: None,
value: a,
headers: 0,
}
DEBUG: Record {
preamble: RecordHeader {
attributes: 0,
timestamp_delta: 0,
offset_delta: 0,
},
key: None,
value: b,
headers: 0,
}
Connectors
~> Support for inbound and outbound Postgres connectors discontinued since Fluvio release v0.10.0.
This section is for historical purposes only
Postgres
We will provide a more hands-on blog post in the future, but for now we'll summarize the release.
Postgres Source connector
The Fluvio source connector allows you to connect to an external Postgres database and implement Change Data Capture (CDC) patterns by recording all database updates into a Fluvio topic.
There is a little bit of required configuration on the Postgres database side, but the Postgres source connector config looks like this:
# example-pg-source-connect.yml
version: 0.1.0
name: my-postgres-source
type: postgres-source
topic: postgres-topic
parameters:
publication: fluvio
slot: fluvio
secrets:
FLUVIO_PG_DATABASE_URL: postgres://postgres:mysecretpassword@localhost:5432/postgres
Postgres Sink connector
The Postgres sink connector consumes the CDC event data from the Postgres source connector and runs the corresponding SQL against the sink connector's Postgres database.
The Postgres sink connector looks like this:
# connect.yml
version: 0.1.0
name: my-postgres-sink
type: postgres-sink
topic: postgres-topic
parameters:
url: postgres://postgres:mysecretpassword@localhost:5432/postgres
secrets:
FLUVIO_PG_DATABASE_URL: postgres://postgres:mysecretpassword@localhost:5432/postgres
Postgres Connector Docs available now
A lot of work went into the release of our new Postgres connectors that we couldn't cover in depth here.
We encourage you to visit the docs, and expect a walkthrough using the Source and Sink connectors together in the future.
Docs for Postgres Source connector
Docs for Postgres Sink connector
HTTP
Our HTTP source connector has new options available output_type
and output_parts
to format its output.
Example HTTP connector config
%copy%
# connect.yml
version: 0.2.0
name: cat-facts
type: http
topic: cat-facts
direction: source
parameters:
endpoint: https://catfact.ninja/fact
interval: 10
output_parts: body # default
output_type: text # default
For example, our endpoint returns a JSON object in the body of the HTTP response.
%copy first-line%
$ fluvio consume cat-facts
Consuming records from the end of topic 'cat-facts'. This will wait for new records
{"fact":"In 1987 cats overtook dogs as the number one pet in America.","length":60}
If you want the full HTTP response, you can use output_parts: full
# connect.yml
[...]
- output_parts: body # default
+ output_parts: full
output_type: text # default
%copy first-line%
$ fluvio consume cat-facts
Consuming records from the end of topic 'cat-facts'. This will wait for new records
HTTP/1.1 200 OK
server: nginx
date: Wed, 16 Feb 2022 00:53:04 GMT
content-type: application/json
transfer-encoding: chunked
connection: keep-alive
vary: Accept-Encoding
cache-control: no-cache, private
x-ratelimit-limit: 100
x-ratelimit-remaining: 96
access-control-allow-origin: *
set-cookie: XSRF-TOKEN=REDACTED expires=Wed, 16-Feb-2022 02:53:04 GMT; path=/; samesite=lax
set-cookie: cat_facts_session=REDACTED expires=Wed, 16-Feb-2022 02:53:04 GMT; path=/; httponly; samesite=lax
x-frame-options: SAMEORIGIN
x-xss-protection: 1; mode=block
x-content-type-options: nosniff
{"fact":"Cats only use their meows to talk to humans, not each other. The only time they meow to communicate with other felines is when they are kittens to signal to their mother.","length":170}
If you plan to process the HTTP response details, it might be more useful to use output_type: json
.
# connect.yml
[...]
output_parts: full
- output_type: text # default
+ output_type: json
$ fluvio consume cat-facts
Consuming records from the end of topic 'cat-facts'. This will wait for new records
{"status":{"version":"HTTP/1.1","code":200,"string":"OK"},"header":{"set-cookie":["XSRF-TOKEN=REDACTED expires=Wed, 16-Feb-2022 02:56:22 GMT; path=/; samesite=lax","cat_facts_session=REDACTED expires=Wed, 16-Feb-2022 02:56:22 GMT; path=/; httponly; samesite=lax"],"content-type":"application/json","x-frame-options":"SAMEORIGIN","x-content-type-options":"nosniff","x-xss-protection":"1; mode=block","vary":"Accept-Encoding","server":"nginx","x-ratelimit-remaining":"94","date":"Wed, 16 Feb 2022 00:56:22 GMT","transfer-encoding":"chunked","cache-control":"no-cache, private","x-ratelimit-limit":"100","access-control-allow-origin":"*","connection":"keep-alive"},"body":"{\"fact\":\"There are more than 500 million domestic cats in the world, with approximately 40 recognized breeds.\",\"length\":100}"}
Get in touch with us on Github Discussions or join our Discord channel and come say hello! Watch videos on our InfinyOn Youtube Channel
For the full list of changes this week, be sure to check out our CHANGELOG.