Split Filter-Map Operator

Like the Split Filter operator, the Split Filter-Map operator extends the functionality of a traffic splitter with the filter-map transformation. The operator can selectively filter which entries enter which sinks while also applying a mapping function. The following example is an extension of the filter-map example. We will implement a dataflow that detects whether an input is a valid addition or subtraction statement, computes the equation, and sends the result to the right sink.

Visual of defined dataflow

Prerequisites

This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.

Transformation

The filter-map transformation is placed in the sinks section, under the transforms list of each sink.

sinks:
  - type: topic
    id: (...)
    transforms:
      - operator: filter-map
        run: |
          (... filter map function ...)
    (... more topics ...)

The function should take the input and return a Result<Option<...>>. Return Ok(Some(...)) to send the mapped value to the sink, or Ok(None) to filter the input out. The following are our transformations. The code is shortened for brevity, but the full example is below. Compared to the filter-map example, the dosubtraction topic only swaps in a different regex and calculation.

sinks:
  - type: topic
    id: doaddition
    transforms: 
      - operator: filter-map 
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_addition(input: String) -> Result<Option<String> > {
            (...)
          }
  - type: topic
    id: dosubtraction
    transforms: 
      - operator: filter-map 
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_substraction(input: String) -> Result<Option<String> > {
            let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
            if let Some(num) = re.captures(&input) {
                (...)
                return Ok(Some(format!("{}{}",input,(a-b))));
            } else{
                return Ok(None);
            }
          }
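
The filter-map contract described above can also be tried outside a dataflow. The following is a minimal sketch in plain Rust, not the code the dataflow runs. It makes two assumptions that are not part of the original example: the `Result` alias stands in for whatever `Result` type the SDF runtime provides, and the hypothetical helper `parse_operands` replaces the `regex` dependency with standard-library string handling so the sketch compiles on its own. It also reports `i32` overflow in the addition as an error instead of calling `unwrap()`.

```rust
// Assumption: stand-in for the Result type available inside an SDF transform.
type Result<T> = std::result::Result<T, String>;

/// Hypothetical helper: parse "<a><op><b>=" into (a, b).
/// Returns None when the input does not have that shape,
/// or when an operand does not fit in an i32.
fn parse_operands(input: &str, op: char) -> Option<(i32, i32)> {
    let body = input.strip_suffix('=')?;
    let (lhs, rhs) = body.split_once(op)?;
    let all_digits = |s: &str| !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit());
    if !all_digits(lhs) || !all_digits(rhs) {
        return None;
    }
    Some((lhs.parse().ok()?, rhs.parse().ok()?))
}

fn do_addition(input: String) -> Result<Option<String>> {
    match parse_operands(&input, '+') {
        // Not an addition statement: filter the record out.
        None => Ok(None),
        Some((a, b)) => {
            // checked_add turns an i32 overflow into an error instead of a panic.
            let sum = a
                .checked_add(b)
                .ok_or_else(|| format!("overflow while computing {}", input))?;
            Ok(Some(format!("{}{}", input, sum)))
        }
    }
}

fn do_subtraction(input: String) -> Result<Option<String>> {
    match parse_operands(&input, '-') {
        // Not a subtraction statement: filter the record out.
        None => Ok(None),
        // Both operands are non-negative i32 values, so a - b cannot overflow.
        Some((a, b)) => Ok(Some(format!("{}{}", input, a - b))),
    }
}
```

Calling `do_subtraction("9999-1=".to_string())` yields `Ok(Some("9999-1=9998"))`, while `"Hello world"` and `"9999+1="` both yield `Ok(None)` and would therefore never reach the dosubtraction sink.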

Running the Example

Copy and paste the following config and save it as dataflow.yaml.

# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: split-filter-map-example
  version: 0.1.0
  namespace: examples

config:
  converter: raw

topics:
  sentences:
    schema:
      value:
        type: string
  doaddition:
    schema:
      value:
        type: string
  dosubtraction:
    schema:
      value:
        type: string

services:
  filter-map-service:
    sources:
      - type: topic
        id: sentences

    sinks:
      - type: topic
        id: doaddition
        transforms: 
          - operator: filter-map 
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_addition(input: String) -> Result<Option<String> > {
                let re = regex::Regex::new(r"^(\d+)\+(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                    let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                    let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                    return Ok(Some(format!("{}{}",input,(a+b))));
                } else{
                    return Ok(None);
                }
              }
      - type: topic
        id: dosubtraction
        transforms: 
          - operator: filter-map 
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_substraction(input: String) -> Result<Option<String> > {
                let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                    let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                    let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                    return Ok(Some(format!("{}{}",input,(a-b))));
                } else{
                    return Ok(None);
                }
              }

To run the example:

$ sdf run

Produce records to the sentences topic:

$ echo "Hello world" | fluvio produce sentences
$ echo "9999+1=" | fluvio produce sentences
$ echo "9999-1=" | fluvio produce sentences

Consume topic doaddition to retrieve the result in another terminal:

$ fluvio consume doaddition -Bd
9999+1=10000

Consume the other topic, dosubtraction:

$ fluvio consume dosubtraction -Bd
9999-1=9998

We can see that the first entry, Hello world, is discarded, while the other two are sent to the right topic with the respective calculation appended.
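
The routing seen in this run can be modeled in a few lines of plain Rust. This is only a sketch of the behavior observed above, not how the SDF runtime is implemented: it assumes every record is offered to every sink and that each sink keeps only what its own filter-map returns. The names `FilterMap`, `operand`, `add`, `sub`, and `split` are hypothetical, and standard-library parsing replaces the `regex` crate so the block is self-contained.

```rust
use std::collections::BTreeMap;

// Hypothetical per-sink transform: Some(mapped) keeps the record, None drops it.
type FilterMap = fn(&str) -> Option<String>;

/// Parse a non-empty run of ASCII digits into an i32.
fn operand(s: &str) -> Option<i32> {
    if s.is_empty() || !s.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    s.parse().ok()
}

/// Keep "<a>+<b>=" records and append the sum; drop everything else.
fn add(input: &str) -> Option<String> {
    let (lhs, rhs) = input.strip_suffix('=')?.split_once('+')?;
    let sum = operand(lhs)?.checked_add(operand(rhs)?)?;
    Some(format!("{}{}", input, sum))
}

/// Keep "<a>-<b>=" records and append the difference; drop everything else.
fn sub(input: &str) -> Option<String> {
    let (lhs, rhs) = input.strip_suffix('=')?.split_once('-')?;
    let diff = operand(lhs)? - operand(rhs)?;
    Some(format!("{}{}", input, diff))
}

/// Offer every record to every sink and collect what each sink keeps.
fn split(records: &[&str], sinks: &[(&str, FilterMap)]) -> BTreeMap<String, Vec<String>> {
    let mut out = BTreeMap::new();
    for &(name, _) in sinks {
        out.insert(name.to_string(), Vec::new());
    }
    for &record in records {
        for &(name, transform) in sinks {
            if let Some(mapped) = transform(record) {
                out.get_mut(name).expect("sink was registered above").push(mapped);
            }
        }
    }
    out
}
```

Passing the three records produced above together with the sinks `("doaddition", add as FilterMap)` and `("dosubtraction", sub as FilterMap)` leaves `9999+1=10000` in doaddition and `9999-1=9998` in dosubtraction. Hello world matches neither transform, so it appears in neither sink.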

Cleanup

Exit the sdf terminal and clean up. The --force flag removes the topics:

$ sdf clean --force

Conclusion

In this example, we covered how to split traffic across multiple sinks with the filter-map operator.