Version: sdf-beta4

SDF provides a schema validation feature to ensure that the data flowing through the dataflow is in the correct format.

Schema

The first step is to define the data schema. Schemas are defined in the types section of the dataflow. Types can be defined inline or in a package, which can be shared across multiple dataflows.

For example, the following is a simple object type representing a person.

types:
  person:
    type: object
    properties:
      name:
        type: string
      weight:
        type: u8

To enforce a schema, all you have to do is specify it in the topics section. For example, the following is a topic definition with a schema:

topics:
  persons:
    schema:
      value:
        type: person

A schema can be enforced for both the key and the value part of a record.

Once defined, the schema is used to validate the data coming from the source. Enforcement is specific to the serialization format. Currently, SDF supports the JSON serialization format, but it can be extended to other formats in the future.

The serialization format is defined in the config section:

config:
  converter: json

This uses JSON for all topics, but you can override the format per topic.

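Given a schema whose value type has a string name property and a u8 age property, such as the user type in the full dataflow below, the following JSON object will pass the schema validation: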

{
    "name": "joe",
    "age": 30
}

However, the following JSON object will fail the schema validation because age is a string rather than an unsigned integer:

{
    "name": "joe",
    "age": "30"
}

The schema validation error will be reported in the operator log. The error message will indicate the field that failed the validation. The failed record will be skipped and the dataflow will continue to process the next record.

For example, with the invalid user record above, the error message will be:

$ sdf log -f
Error deserializing input value ExpectedUnsigned at character 0
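
The failure above can be reproduced in miniature. The following Rust sketch is not SDF's deserializer: the Json, ValidationError, and validate_user names are hypothetical, and only the ExpectedUnsigned label is borrowed from the log line. It assumes a value type with a string name and a u8 age, and shows why a quoted "30" is rejected while a bare 30 is accepted.

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for an already-parsed JSON field value (not an SDF type).
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Str(String),
    Num(i64),
}

/// Hypothetical error kinds; only the `ExpectedUnsigned` name mirrors the log above.
#[derive(Debug, PartialEq)]
pub enum ValidationError {
    ExpectedString,
    ExpectedUnsigned,
}

/// Assumed shape of the validated record: a string name and a u8 age.
#[derive(Debug, PartialEq)]
pub struct User {
    pub name: String,
    pub age: u8,
}

/// Check each field against the assumed schema and build a `User` on success.
pub fn validate_user(name: &Json, age: &Json) -> Result<User, ValidationError> {
    let name = match name {
        Json::Str(s) => s.clone(),
        _ => return Err(ValidationError::ExpectedString),
    };
    let age = match age {
        // A number must also fit in u8 (0 to 255) to satisfy the schema.
        Json::Num(n) => u8::try_from(*n).map_err(|_| ValidationError::ExpectedUnsigned)?,
        // A quoted value such as "30" is a string, not an unsigned integer.
        Json::Str(_) => return Err(ValidationError::ExpectedUnsigned),
    };
    Ok(User { name, age })
}
```

Calling validate_user with Json::Num(30) for the age yields a User, while Json::Str("30".to_string()) yields ValidationError::ExpectedUnsigned, which matches the kind of mismatch reported in the operator log.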

The number of failed records is also reflected in the internal metrics, which can be accessed via the sdf show state <operator>/metrics command.

>> show state
 Namespace                                Keys  Type   
 check-adult/user-topic/topic.offset      1     offset 
 check-adult/age-check/metrics            1     table  
 request-processing/request/metrics       1     table  
 request-processing/request/topic.offset  1     offset 
 check-adult/user-topic/metrics           1     table  
>> show state check-adult/user-topic/metrics
 Key    Window  succeeded  failed  last_error_offset 
 stats  *       4          2       5                 
>> 
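
To make the columns in the metrics table concrete, here is a minimal Rust sketch of how such counters could be kept while invalid records are skipped. It is an illustration only, not SDF's internal implementation: TopicMetrics, record, and tally are hypothetical names, and the sample offsets in the usage note are invented to reproduce the totals shown above.

```rust
/// Hypothetical per-topic counters mirroring the columns in the table above.
#[derive(Debug, Default, PartialEq)]
pub struct TopicMetrics {
    pub succeeded: u64,
    pub failed: u64,
    /// Offset of the most recent record that failed validation, if any.
    pub last_error_offset: Option<u64>,
}

impl TopicMetrics {
    /// Record the outcome of validating the record at `offset`.
    pub fn record(&mut self, offset: u64, valid: bool) {
        if valid {
            self.succeeded += 1;
        } else {
            // The failed record is counted and skipped; processing continues.
            self.failed += 1;
            self.last_error_offset = Some(offset);
        }
    }
}

/// Feed (offset, valid) outcomes in order and return the resulting counters.
pub fn tally(outcomes: &[(u64, bool)]) -> TopicMetrics {
    let mut metrics = TopicMetrics::default();
    for &(offset, valid) in outcomes {
        metrics.record(offset, valid);
    }
    metrics
}
```

For instance, six records at assumed offsets 0 through 5, with failures at offsets 2 and 5, would produce succeeded = 4, failed = 2, and last_error_offset = 5, the same values as the stats row above.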

SDF types support the following concepts in a schema:

  • primitive types such as string, integer, float, and boolean
  • enum types
  • composite objects with nested properties
  • array or list of objects
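
As a rough orientation, the concepts in this list have natural counterparts in Rust, the language used for inline operators. The types below are hand-written analogues, not code generated by SDF: Role, Address, Person, and sample_person are hypothetical names, and the exact Rust types SDF produces for a schema may differ.

```rust
/// Enum type: a closed set of named variants.
#[derive(Debug, Clone, PartialEq)]
pub enum Role {
    Admin,
    Member,
}

/// A nested object used as a property of another object.
#[derive(Debug, Clone, PartialEq)]
pub struct Address {
    pub city: String,
    pub zip: u32,
}

/// Composite object combining primitives, an enum, a nested object, and a list.
#[derive(Debug, Clone, PartialEq)]
pub struct Person {
    pub name: String,           // string
    pub weight: u8,             // integer
    pub height_m: f32,          // float
    pub active: bool,           // boolean
    pub role: Role,             // enum
    pub address: Address,       // nested object
    pub previous: Vec<Address>, // list of objects
}

/// Build one example value that exercises every field kind.
pub fn sample_person() -> Person {
    Person {
        name: "joe".to_string(),
        weight: 80,
        height_m: 1.8,
        active: true,
        role: Role::Member,
        address: Address { city: "Austin".to_string(), zip: 73301 },
        previous: vec![Address { city: "Boston".to_string(), zip: 2108 }],
    }
}
```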

Versioning

An inline schema inherits its version from the dataflow version. To version a schema independently, define it in a package and version the package. The versioned package can then be used in the dataflow.

The schema package can then be published to Hub and imported into the dataflow.

Versions follow semver syntax. For example, the following is the metadata of a versioned schema package:

apiVersion: 0.5.0
meta:
  name: person-age-validation
  version: 0.1.0
  namespace: examples

The apiVersion is the package syntax version, and version in the meta section is the schema version.
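
Because versions follow semver syntax, two schema versions can be compared numerically, part by part. The sketch below is a simplified illustration, not part of SDF: parse_version and is_newer are hypothetical helpers that handle only the plain MAJOR.MINOR.PATCH form and do not enforce every semver rule (pre-release tags, build metadata, and the ban on leading zeros are out of scope).

```rust
/// Parse "MAJOR.MINOR.PATCH" into a tuple; returns None for any other shape.
pub fn parse_version(s: &str) -> Option<(u64, u64, u64)> {
    let mut parts = s.split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = parts.next()?.parse().ok()?;
    let patch = parts.next()?.parse().ok()?;
    // Reject trailing components such as "1.2.3.4".
    if parts.next().is_some() {
        return None;
    }
    Some((major, minor, patch))
}

/// Return Some(true) when `candidate` is strictly newer than `current`,
/// or None when either string is not a plain MAJOR.MINOR.PATCH version.
pub fn is_newer(candidate: &str, current: &str) -> Option<bool> {
    // Tuples compare lexicographically: major first, then minor, then patch.
    Some(parse_version(candidate)? > parse_version(current)?)
}
```

Comparing the parsed numbers rather than the raw strings matters: as text, "0.10.0" sorts before "0.9.0", but as a version it is newer.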

Dataflow

The full dataflow is defined as follows:

apiVersion: 0.5.0
meta:
  name: person-age-validation
  version: 0.1.0
  namespace: examples
config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End

types:
  user:
    type: object
    properties:
      name:
        type: string
      age:
        type: u8

topics:
  user-topic:
    name: user
    schema:
      value:
        type: user

  message-topic:
    name: message
    schema:
      value:
        type: string


services:
  check-adult:
    sources:
      - type: topic
        id: user-topic
    transforms:
      - operator: map
        run: |
          fn age_check(user: User) -> Result<String> {
            if user.age < 18 {
              Ok("minor".to_string())
            } else {
              Ok("adult".to_string())
            }
          }
    sinks:
      - type: topic
        id: message-topic
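
The map logic can be exercised outside a dataflow with a small standalone copy. This is a sketch under assumptions: the User struct below is a hand-written stand-in for the type derived from the user schema, and the return type is spelled out as Result<String, String> because the single-parameter Result used inline in the dataflow is supplied by the SDF runtime and is not available here.

```rust
/// Hypothetical stand-in for the type derived from the `user` schema.
#[derive(Debug, Clone)]
pub struct User {
    pub name: String,
    pub age: u8,
}

/// Standalone copy of the map operator: classify a user by age.
/// The String error type is a placeholder; this function never returns Err.
pub fn age_check(user: User) -> Result<String, String> {
    if user.age < 18 {
        Ok("minor".to_string())
    } else {
        Ok("adult".to_string())
    }
}
```

A user with age 17 maps to "minor", and one with age 18 or above maps to "adult". Records that fail schema validation never reach this function, since they are skipped before the transform runs.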