Skip to main content

Introduction

In the Vatis Streams engine, the first-class entity is a stream. A stream is a directed acyclic graph (DAG) of processors. This DAG of processors, along with the configuration of each processor, is stored in a stream configuration template. Each stream created using the Vatis Streams API is based on a stream configuration template, which can be patched to modify the processors' configuration.

Stream configuration template

A stream configuration template can be retrieved, and optionally patched, using the Patch by id endpoint.
An example of a stream configuration template is shown below:
{
  "id": "65f46430c6d8985974ff9c9d",
  "name": "Ping",
  "description": "Simple pipeline for testing stream latency",
  "dataSourceSchema": "bytes",
  "processors": [
    {
      "id": "echo",
      "name": "echo",
      "description": "Echo operation",
      "processorDeclaration": {
        "id": "echo",
        "displayName": "Echo",
        "description": "Echo processor. Returns the same message it receives.",
        "clusterDomainNameTemplate": "echo-${REPLICA}.echo-headless.stream-processors.svc.cluster.local",
        "podsDiscoveryClusterDomainName": "echo-headless.stream-processors.svc.cluster.local",
        "revision": "2024-02-06",
        "propertiesSchema": "tech.vatis.schema.stream.processor.properties.NoPropertiesDto",
        "inputSchemas": [
          "bytes"
        ],
        "outputSchema": "bytes",
        "features": {
          "support": [],
          "require": [],
          "implement": [],
          "requireSupport": []
        },
        "operationType": "NO_OPERATION"
      },
      "previousProcessorId": null,
      "propertiesSchema": "tech.vatis.schema.stream.processor.properties.NoPropertiesDto",
      "properties": {},
      "destinationTopicMessageHeaders": {},
      "destinationTopicHeaders": {
        "X-Vat-Topic-Accept-Frame-Type": "final"
      },
      "sourceSubscriptionHeaders": {
        "X-Vat-Sub-Accept-Frame-Type": "final"
      },
      "patchAliases": {},
      "egress": {
        "persist": false,
        "sink": false,
        "tags": []
      }
    }
  ],
  "patchAliases": {}
}
This stream configuration template can produce a stream similar to the one shown below:
{
  "streamId": "20d36912-f746-4f45-8e68-d537fc1ff57f",
  "name": "Stream jokidx",
  "groupId": "20d36912-f746-4f45-8e68-d537fc1ff57f",
  "configuration": {
    "streamConfigurationTemplateId": "65f46430c6d8985974ff9c9d",
    "dataSource": {
      "topic": "persistent://vatis-streams/data-sources/4c43a454-32b4-4ef4-baa9-3442f5fecc90",
      "messageSchema": "bytes",
      "connectionOptions": {
        "X-Vat-Topic-Conn-Close-On-Disconnect": "true"
      },
      "headers": {
        "X-Vat-Topic-Accept-Frame-Type": "final"
      },
      "messageHeaders": {
        "X-Vat-Stream-Frame-Type": "final",
        "X-Vat-Stream-Processor-Id": "gateway"
      },
      "subscriptions": {
        "echo": {
          "subscription": "echo-ZlejK",
          "topic": "persistent://vatis-streams/data-sources/4c43a454-32b4-4ef4-baa9-3442f5fecc90",
          "messageSchema": "bytes",
          "connectionOptions": {},
          "headers": {
            "X-Vat-Sub-Accept-Frame-Type": "final"
          }
        }
      }
    },
    "dataSinks": {
      "echo": {
        "topic": "persistent://vatis-streams/persistent-data-sinks/f8eeb155-d496-4746-b949-94ae05426544",
        "messageSchema": "bytes",
        "connectionOptions": {},
        "headers": {
          "X-Vat-Topic-Accept-Frame-Type": "final"
        },
        "messageHeaders": {
          "X-Vat-Stream-Egress-Tags": "echo,main",
          "X-Vat-Stream-Id": "20d36912-f746-4f45-8e68-d537fc1ff57f",
          "X-Vat-Stream-Group-Id": "20d36912-f746-4f45-8e68-d537fc1ff57f",
          "X-Vat-Stream-Schema": "bytes",
          "X-Vat-Stream-Processor-Id": "echo"
        },
        "subscriptions": {}
      }
    },
    "processors": [
      {
        "id": "echo",
        "source": {
          "subscription": "echo-ZlejK",
          "topic": "persistent://vatis-streams/data-sources/4c43a454-32b4-4ef4-baa9-3442f5fecc90",
          "messageSchema": "bytes",
          "connectionOptions": {},
          "headers": {
            "X-Vat-Sub-Accept-Frame-Type": "final"
          }
        },
        "destination": {
          "topic": "persistent://vatis-streams/persistent-data-sinks/f8eeb155-d496-4746-b949-94ae05426544",
          "messageSchema": "bytes",
          "connectionOptions": {},
          "headers": {
            "X-Vat-Topic-Accept-Frame-Type": "final"
          },
          "messageHeaders": {
            "X-Vat-Stream-Egress-Tags": "echo,main",
            "X-Vat-Stream-Id": "20d36912-f746-4f45-8e68-d537fc1ff57f",
            "X-Vat-Stream-Group-Id": "20d36912-f746-4f45-8e68-d537fc1ff57f",
            "X-Vat-Stream-Schema": "bytes",
            "X-Vat-Stream-Processor-Id": "echo"
          },
          "subscriptions": {}
        },
        "clusterDomainName": "echo-0.echo-headless.stream-processors.svc.cluster.local",
        "podsDiscoveryClusterDomainName": "echo-headless.stream-processors.svc.cluster.local",
        "propertiesSchema": "tech.vatis.schema.stream.processor.properties.NoPropertiesDto",
        "properties": {},
        "egress": {
          "persist": true,
          "sink": true,
          "tags": [
            "main",
            "echo"
          ]
        }
      }
    ]
  },
  "createdTimestamp": "2024-12-12T18:47:06.865Z",
  "creatorId": "d710a067-daf3-460d-9e60-01d0bb8d10c4",
  "groupCreatorId": "d710a067-daf3-460d-9e60-01d0bb8d10c4",
  "state": "READY",
  "stateDescription": null,
  "finalizedTimestamp": null
}
The Vatis Streams API provides a set of pre-defined stream configuration templates that can be used out of the box for use cases such as Speech-to-Text, Audio Intelligence, and Real-time transcoding.

Streams

When a stream is created, it requires a stream configuration template id. This can be specified using the streamConfigurationTemplateId query parameter set in the connection URL.