Version: sdf-beta2

Split Filter Map Operator

The Split Filter is an operator that selectively controls which entries reach which sinks. It is useful for sorting entries into multiple sinks. This example builds on the example for the filter operator: we will route strings from the source topic sentences to the sink topics question and not-question. The following visual shows how the service connects a single source to two sinks.

Visual of defined dataflow

Prerequisites

This guide uses a local Fluvio cluster. If you need to install one, please follow the instructions here.

Transformation

The general syntax follows the filter operator's transform, but the transform is embedded in each sink instead.

sinks:
  - type: topic
    id: (...)
    transforms:
      - operator: filter
        run: |
          (... filter function ...)
    (... more topics ...)

In our case, each function takes a string as input and returns a boolean. The filter for the question sink returns true if the input string contains a question mark and false otherwise; the filter for the not-question sink returns the opposite.

sinks:
  - type: topic
    id: question
    transforms:
      - operator: filter
        run: |
          fn filter_question(sentence: String) -> Result<bool>{
            Ok(sentence.contains("?"))
          }
  - type: topic
    id: not-question
    transforms:
      - operator: filter
        run: |
          fn filter_not_question(sentence: String) -> Result<bool>{
            Ok(!sentence.contains("?"))
          }
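
To check the two predicates outside of SDF, the following is a minimal local sketch rather than how SDF executes them. It assumes that each sink applies its own filter to every record independently. The `Result` alias and the `split` helper are hypothetical stand-ins introduced for illustration and are not part of SDF.

```rust
// Stand-in for the `Result` type that is in scope for inline SDF functions;
// this alias is an assumption made so the sketch compiles on its own.
type Result<T> = std::result::Result<T, String>;

// Same predicate as the `question` sink: keep sentences containing "?".
fn filter_question(sentence: String) -> Result<bool> {
    Ok(sentence.contains("?"))
}

// Same predicate as the `not-question` sink: keep everything else.
fn filter_not_question(sentence: String) -> Result<bool> {
    Ok(!sentence.contains("?"))
}

// Hypothetical helper (not part of SDF): applies each sink's filter to every
// record independently and collects what each sink would receive.
fn split(records: &[&str]) -> Result<(Vec<String>, Vec<String>)> {
    let mut question = Vec::new();
    let mut not_question = Vec::new();
    for record in records {
        if filter_question(record.to_string())? {
            question.push(record.to_string());
        }
        if filter_not_question(record.to_string())? {
            not_question.push(record.to_string());
        }
    }
    Ok((question, not_question))
}

fn main() -> Result<()> {
    let (question, not_question) = split(&["Hello world", "Are you there?"])?;
    println!("question: {:?}", question);
    println!("not-question: {:?}", not_question);
    Ok(())
}
```

Because the two predicates are exact complements, every sentence lands in exactly one of the two collections in this sketch.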

Running the Example

Copy and paste the following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: split-filter-example
  version: 0.1.0
  namespace: examples

config:
  converter: raw

topics:
  sentences:
    schema:
      value:
        type: string
  question:
    schema:
      value:
        type: string
  not-question:
    schema:
      value:
        type: string


services:
  filter-service:
    sources:
      - type: topic
        id: sentences
    sinks:
      - type: topic
        id: question
        transforms:
          - operator: filter
            run: |
              fn filter_question(sentence: String) -> Result<bool>{
                Ok(sentence.contains("?"))
              }
      - type: topic
        id: not-question
        transforms:
          - operator: filter
            run: |
              fn filter_not_question(sentence: String) -> Result<bool>{
                Ok(!sentence.contains("?"))
              }

To run the example:

$ sdf run

Produce sentences to the sentences topic:

$ echo "Hello world" | fluvio produce sentences
$ echo "Are you there?" | fluvio produce sentences

In another terminal, consume the question topic to retrieve the result:

$ fluvio consume question -Bd
Are you there?

Consume the other topic, not-question:

$ fluvio consume not-question -Bd
Hello world

Instead of a single topic containing only questions, each sentence is routed to one of the two sink topics: questions go to question and everything else goes to not-question.

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

In this example, we covered how to split traffic with the filter operator. While quite similar to the filter transform, using it as a split operator may allow for simpler code.