Version: 0.13.0 (stable)

NodeJS SDK Examples

  • This client uses node-bindgen to wrap the Rust client.
  • It supports most administrator features.
  • Blocking calls to Fluvio return promises, so they can be used with async/await.
  • The PartitionConsumer.createStream call returns an async iterator, so the stream can be consumed in a for await...of loop.
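
The async-iterator pattern from the last bullet can be sketched without a running cluster. The block below is a minimal illustration, not the SDK's implementation: `FakeRecord`, `fakeStream`, and `readRecords` are hypothetical names, and `fakeStream` is only a stand-in for whatever stream `PartitionConsumer.createStream` returns.

```typescript
// Hypothetical stand-in for a Fluvio record; the real Record type
// comes from "@fluvio/client".
interface FakeRecord {
  keyString(): string | null;
  valueString(): string;
}

// Hypothetical stand-in for the stream returned by
// PartitionConsumer.createStream: an async generator that yields
// one record per input value.
async function* fakeStream(values: string[]): AsyncGenerator<FakeRecord> {
  for (let i = 0; i < values.length; i++) {
    const value = values[i];
    // Simulate waiting for the next record to arrive.
    await new Promise<void>((resolve) => setTimeout(resolve, 1));
    yield {
      keyString: () => `key-${i}`,
      valueString: () => value,
    };
  }
}

// Iterate over the stream with for await...of and stop after `limit` records.
async function readRecords(
  stream: AsyncIterable<FakeRecord>,
  limit: number
): Promise<string[]> {
  const seen: string[] = [];
  for await (const record of stream) {
    seen.push(`Key=${record.keyString()}, Value=${record.valueString()}`);
    if (seen.length >= limit) {
      break; // leaving the loop also closes the underlying iterator
    }
  }
  return seen;
}
```

With the real client, the same loop would presumably run over the value returned by awaiting `consumer.createStream(...)`; the exact signature is an assumption here and should be checked against the typedoc page.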

To see the full docs, visit our typedoc page.

Example Workflow

Follow the installation instructions to run this example.

/**
* This is an example of a basic Fluvio workflow in TypeScript
*
* 1. Establish a connection to the Fluvio cluster
* 2. Create a topic to store data in
* 3. Create a producer and send some bytes
* 4. Create a consumer, and stream the data back
*/
import Fluvio, { Offset, Record } from "@fluvio/client";

const TOPIC_NAME = "hello-node";
const PARTITION = 0;

async function createTopic() {
 try {
   // Connect to the Fluvio cluster
   console.log("Connecting client to fluvio");
   await fluvio.connect();

   // Create admin client
   const admin = await fluvio.admin();

   // Create topic
   console.log("Creating topic");
   await admin.createTopic(TOPIC_NAME);
 } catch (ex) {
   console.log("Failed to create topic (it may already exist):", ex);
 }
}

const produce = async () => {
 // Connect to the Fluvio cluster
 console.log("Connecting client to fluvio");
 await fluvio.connect();

 // Create a topic producer
 const producer = await fluvio.topicProducer(TOPIC_NAME);
 await producer.send("example-key", "Hello World!  - Time is " + Date());
};

const consume = async () => {
 try {
   // Connect to the Fluvio cluster referenced in the CLI profile.
   await fluvio.connect();

   // Create partition consumer
   const consumer = await fluvio.partitionConsumer(TOPIC_NAME, PARTITION);

   console.log("read from the end");
   await consumer.stream(Offset.FromEnd(), async (record: Record) => {
     // Handle the record
     console.log(`Key=${record.keyString()}, Value=${record.valueString()}`);
     process.exit(0);
   });
 } catch (ex) {
   console.log("error", ex);
 }
};

// Create the Fluvio client instance
const fluvio = new Fluvio();
// These calls are not awaited, so all three start concurrently
createTopic();
produce();
consume();

Run

$ npx ts-node example.ts

Expected Output

Connecting client to fluvio
Connecting client to fluvio
Creating topic
read from the end
Key=example-key, Value=Hello World!  - Time is (...)

The code above tries to create a topic, produces a record to that topic, and consumes that record. The call to process.exit(0) ends the program after the first record is consumed.
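
In the example, createTopic(), produce(), and consume() are called without await, so each one starts before the previous one has finished. This is consistent with both "Connecting client to fluvio" lines appearing before "Creating topic" in the expected output. The sketch below illustrates that ordering effect with a hypothetical `step` helper built on timers. It does not call Fluvio, and the names `step`, `runConcurrently`, and `runSequentially` are assumptions made for illustration.

```typescript
// Hypothetical stand-in for a promise-returning client call (not a Fluvio API).
// It records when the step starts and when it finishes.
const step = async (name: string, log: string[]): Promise<void> => {
  log.push(`${name}: start`);
  await new Promise<void>((resolve) => setTimeout(resolve, 5));
  log.push(`${name}: done`);
};

// Start both steps without awaiting in between: the second step
// starts before the first one is done.
async function runConcurrently(): Promise<string[]> {
  const log: string[] = [];
  await Promise.all([step("createTopic", log), step("produce", log)]);
  return log;
}

// Await each step before starting the next: the steps run strictly in order.
async function runSequentially(): Promise<string[]> {
  const log: string[] = [];
  await step("createTopic", log);
  await step("produce", log);
  return log;
}
```

In the concurrent version both "start" entries are logged before either "done" entry. In the sequential version each step finishes before the next begins. Whether the workflow should be made sequential depends on when the consumer needs to be listening, so this is shown as an ordering illustration rather than a recommended change.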