Skip to main content
Version: latest

Merge

Services in fluvio can be defined to have multiple sinks and sources. In this example, we will implement a service that takes in multiple sources via a merge. The example will simulate buying and selling stocks. There is a topic buy and another topic sell that aggreate to create a log of buy and sell orders. The visual shows the dataflow we will implement.

Visual of defined dataflow

Prerequisites

This guide uses local Fluvio cluster. If you need to install it, please follow the instructions at here.

Transformation

We can add a transform operator to our source list.

source:
  - type: topic
    id: (...)
    transforms:
      - operator: map
        run: |
          (... filter function ...)
    (... more topics ...)

In our case, we have two topics buy and sell that read json objects that include a name, amount, and price. We will map the json object into a string that gets sent into the topic message which will log the orders.

sources:
  - type: topic
    id: buy
    transforms:
      - operator: map
        run: |
          fn buy_order(order: Order) -> Result<String> {
            Ok(format!("+ Buy Order for  {}x{} at {}",order.name,order.amount,order.price))
          }
  - type: topic
    id: sell 
    transforms:
      - operator: map
        run: |
          fn sell_order(order: Order) -> Result<String> {
            Ok(format!("- Sell Order for {}x{} at {}",order.name,order.amount,order.price))
          }

Running the Example

Copy and paste following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: merge-example
  version: 0.1.0
  namespace: examples

config:
  converter: json

types:
  order:
    type: object
    properties:
      name:
        type: string
      amount:
        type: u32
      price:
        type: f32  

topics:
  buy:
    schema:
      value:
        type: order
  sell:
    schema:
      value:
        type: order
  message:
    schema:
      value:
        type: string

services:
  mergeservice:
    sources:
      - type: topic
        id: buy
        transforms:
          - operator: map
            run: |
              fn buy_order(order: Order) -> Result<String> {
                Ok(format!("+ Buy Order for  {}x{} at {}",order.name,order.amount,order.price))
              }
      - type: topic
        id: sell 
        transforms:
          - operator: map
            run: |
              fn sell_order(order: Order) -> Result<String> {
                Ok(format!("- Sell Order for {}x{} at {}",order.name,order.amount,order.price))
              }
    sinks:
      - type: topic
        id: message

To run example:

$ sdf run

Produce json objects to each of the topics:

$ echo '{"name":"AMZN","amount":20,"price":173.33}' | fluvio produce buy
$ echo '{"name":"TSLA","amount":20,"price":219.41}' | fluvio produce sell

Consume topic message to retrieve the result:

$ fluvio consume message -Bd
"+ Buy Order for  AMZNx20 at 173.33"
"- Sell Order for TSLAx20 at 219.41"

Both the buy order and sell order has been mapped into a string to be logged.

Cleanup

Exit sdf terminal and clean-up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

In this example, we covered how to use merge to allow services to consume multiple sources.