Runtime Capabilities
The Tremor runtime is the part of the system that takes your Troy configuration, brings it to life and has your CPUs pump events through your pipelines as fast as we could make it go. We tried to follow a few principles drawn from years of experience writing event processing engines, distributed systems and high-performance code:
Keep the hot path as free from allocations as possible
The hot path is the route your events take from the input connector through all pipelines to the output connector. While we need to allocate some memory for the events themselves and for mutations on those events during Pipeline and Script processing, we try to move events from end to end without additional allocations.
For example, when turning JSON-encoded data into events, we keep the raw event bytes around and reference that data directly for our JSON strings (with a few exceptions). This saves one copy of the incoming data.
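To make the borrow-versus-copy distinction concrete, here is a minimal Rust sketch using std::borrow::Cow. It is not how simd-json works internally, and the text does not say what the exceptions are. For illustration only, the sketch assumes that a string containing escape sequences must be copied, while a plain string can point straight into the input buffer. The function name string_value is hypothetical, and it decodes only a handful of escapes.

```rust
use std::borrow::Cow;

// Hypothetical helper, not simd-json's API: takes the body of a JSON string
// literal (the bytes between the quotes) and returns its decoded value.
fn string_value(raw: &str) -> Cow<'_, str> {
    if !raw.contains('\\') {
        // No escapes: borrow straight from the input buffer, zero copies.
        return Cow::Borrowed(raw);
    }
    // Escapes present: decoding them needs an owned, allocated copy.
    let mut out = String::with_capacity(raw.len());
    let mut chars = raw.chars();
    while let Some(c) = chars.next() {
        if c != '\\' {
            out.push(c);
            continue;
        }
        match chars.next() {
            Some('n') => out.push('\n'),
            Some('t') => out.push('\t'),
            Some('"') => out.push('"'),
            Some('\\') => out.push('\\'),
            // Other escapes (e.g. \uXXXX) are left undecoded in this sketch.
            Some(other) => {
                out.push('\\');
                out.push(other);
            }
            None => out.push('\\'),
        }
    }
    Cow::Owned(out)
}

fn main() {
    // Pretend this buffer holds the raw bytes of one incoming event.
    let input = String::from(r#"{"msg":"hello","esc":"a\nb"}"#);
    let plain = string_value(&input[8..13]); // body of "hello"
    let escaped = string_value(&input[22..26]); // body of "a\nb"
    assert!(matches!(plain, Cow::Borrowed(_)));
    assert!(matches!(escaped, Cow::Owned(_)));
    // The borrowed value points into `input` itself.
    assert_eq!(plain.as_ptr(), input[8..].as_ptr());
    println!("{:?} {:?}", plain, escaped);
}
```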
We preallocate all the data structures we need for bookkeeping and shuffling events around, and we keep the actual event flow mechanics as simple as possible. In the best case, we only move a few pointers.
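Here is a small sketch of what "only moving a few pointers" means in Rust, assuming an event's payload lives on the heap. When a Box is sent through a bounded std::sync::mpsc::sync_channel, the channel moves the pointer, not the payload bytes, so the receiver sees the payload at the same address. Event and addresses_across_channel are hypothetical names; Tremor's real event type and channels are different.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Hypothetical event type: the payload lives on the heap.
struct Event {
    payload: Vec<u8>,
}

// Sends one boxed event across threads and reports the payload's heap
// address as seen by the sender and by the receiver.
fn addresses_across_channel() -> (usize, usize) {
    // Bounded channel with a capacity fixed up front.
    let (tx, rx) = sync_channel::<Box<Event>>(64);
    let event = Box::new(Event { payload: vec![0u8; 4096] });
    let sent_addr = event.payload.as_ptr() as usize;
    let receiver = thread::spawn(move || {
        let event = rx.recv().expect("sender alive");
        event.payload.as_ptr() as usize
    });
    // Moving the Box moves one pointer; the 4 KiB payload stays where it is.
    tx.send(event).expect("receiver alive");
    let received_addr = receiver.join().expect("receiver thread panicked");
    (sent_addr, received_addr)
}

fn main() {
    let (sent, received) = addresses_across_channel();
    assert_eq!(sent, received);
    println!("payload stayed at {:#x}", sent);
}
```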
Execute each entity of the runtime on its own task
As our runtime is built on async Rust, our unit of concurrency is a Task, which is scheduled on an executor thread. The underlying executor decides when and where a task is executed. We run every entity, that is, every Pipeline and Connector, on its own task. All entities communicate via channels (multi-producer, multi-consumer queues), which is also the path events take. This decouples receiving events from handling and sending them, and spreads load more smoothly across multiple CPUs, especially when multiple Flows are deployed to a Tremor instance in parallel.
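The entity-per-task layout can be sketched with the standard library alone. This stand-in makes two assumptions: it uses OS threads instead of async tasks, and it uses std::sync::mpsc channels (multi-producer, single-consumer) instead of the multi-producer, multi-consumer channels the runtime uses. The stage functions source, pipeline and sink are hypothetical.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Hypothetical source stage: emits events, then hangs up its channel.
fn source(out: Sender<String>, lines: Vec<&'static str>) {
    for line in lines {
        if out.send(line.to_string()).is_err() {
            break; // downstream has gone away
        }
    }
    // Dropping `out` here closes the channel, which ends the next stage.
}

// Hypothetical pipeline stage: transforms each event and forwards it.
fn pipeline(input: Receiver<String>, out: Sender<String>) {
    for event in input {
        if out.send(event.to_uppercase()).is_err() {
            break;
        }
    }
}

// Hypothetical sink stage: collects whatever reaches it.
fn sink(input: Receiver<String>) -> Vec<String> {
    input.into_iter().collect()
}

fn run_flow(lines: Vec<&'static str>) -> Vec<String> {
    let (src_tx, pipe_rx) = channel();
    let (pipe_tx, sink_rx) = channel();
    // Each entity runs on its own thread and only talks through channels.
    let s = thread::spawn(move || source(src_tx, lines));
    let p = thread::spawn(move || pipeline(pipe_rx, pipe_tx));
    let k = thread::spawn(move || sink(sink_rx));
    s.join().unwrap();
    p.join().unwrap();
    k.join().unwrap()
}

fn main() {
    println!("{:?}", run_flow(vec!["snot", "badger"]));
}
```

Each stage knows only the channels it reads from and writes to. That independence is what allows the scheduler to place the stages on different CPUs.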
This has the nice effect that you can scale vertically to handle high event volumes by pumping them through multiple parallel flows:
define flow kafka_consumer
flow
  # import connector and pipeline definitions
  use my_connectors;
  use my_pipelines;

  create connector kafka_in from my_connectors::kafka_consumer
  with
    consumer_group = "snot",
    topic = "my_topic",
    brokers = ["localhost:9092", "example.org:9092"]
  end;
  create connector elastic_out from my_connectors::elastic;
  create pipeline consume from my_pipelines::consume;

  connect /connector/kafka_in to /pipeline/consume;
  connect /pipeline/consume to /connector/elastic_out;
end;

# deploy three instances of the above flow to leverage available CPU and memory resources
# we will have three kafka consumers in one consumer group, consuming in parallel
# 3 pipelines processing in parallel
# and 3 elastic connectors pushing events to elasticsearch in parallel (with configurable concurrency)
deploy flow kafka_consumer_01 from kafka_consumer;
deploy flow kafka_consumer_02 from kafka_consumer;
deploy flow kafka_consumer_03 from kafka_consumer;
Leverage your CPU's capabilities
Our CPUs have crazy instruction sets (SSE, AVX2, Neon, ...) for data-parallel programming. Mostly we use them only unintentionally, when an optimizing compiler rewrites our crufty loops. Think of them as turning your CPU into a poor man's GPU. Very intelligent people have come up with ingenious algorithms that use those instruction sets for tasks like parsing JSON, validating UTF-8 or finding bytes in strings, and we use them wherever we can. Most of the time, we stand on the shoulders of giants who provide all that goodness in existing crates we pull in as dependencies. Sometimes we roll our own, as we did with a Rust port of simdjson: https://crates.io/crates/simd-json
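As a portable illustration of the data-parallel idea, here is a SWAR ("SIMD within a register") byte search in plain Rust. It tests 8 bytes per step with ordinary 64-bit arithmetic instead of SSE, AVX2 or Neon intrinsics. This is a sketch of the general technique, not code taken from Tremor or simd-json. In practice, crates such as memchr provide tuned implementations.

```rust
const LO: u64 = 0x0101_0101_0101_0101;
const HI: u64 = 0x8080_8080_8080_8080;

// Returns the index of the first `needle` in `haystack`, checking
// 8 bytes per loop iteration with plain 64-bit arithmetic.
fn find_byte(haystack: &[u8], needle: u8) -> Option<usize> {
    let pattern = LO * needle as u64; // needle repeated in all 8 lanes
    let mut chunks = haystack.chunks_exact(8);
    let mut offset = 0;
    for chunk in &mut chunks {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(chunk);
        // Little-endian load: the chunk's first byte lands in the lowest lane.
        let word = u64::from_le_bytes(buf);
        let x = word ^ pattern; // lanes equal to `needle` become 0x00
        // High bit set in every zero lane; false positives can only appear
        // in lanes above a true zero, so the lowest set bit is exact.
        let found = x.wrapping_sub(LO) & !x & HI;
        if found != 0 {
            return Some(offset + (found.trailing_zeros() / 8) as usize);
        }
        offset += 8;
    }
    // Fewer than 8 bytes left: fall back to a scalar scan.
    chunks
        .remainder()
        .iter()
        .position(|&b| b == needle)
        .map(|i| offset + i)
}

fn main() {
    assert_eq!(find_byte(b"hello, world", b','), Some(5));
    assert_eq!(find_byte(b"abc", b'z'), None);
    println!("{:?}", find_byte(b"0123456789abcdef!", b'!'));
}
```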
Contraflow
Tremor's runtime was built to enable traffic shaping and to improve quality of service for high-volume data streams. It was built with the very real possibility of failure in mind. In a system of many connected computers, any computer or the network can fail in arbitrarily scary and harmful ways. Tremor, as the safeguard of downstream systems and the harbinger of quality of service, needs to be a good citizen and stop pounding downstream systems that are already known to be out of service.
To that end, the contraflow mechanism was built to allow signals to be propagated back along the Pipeline graph to all upstream Connectors.
Every pipeline graph you can build is a directed acyclic graph (DAG). One reason for this restriction is that a DAG with all its edges reversed is still a DAG. This reversed pipeline graph is used to propagate special events backward through the pipeline to all reachable inputs.
[Figures: Event flow; Contraflow]
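The graph reversal can be sketched in a few lines of Rust. First reverse every edge, then walk the reversed graph from the node that emits a contraflow event to find every upstream node it reaches. The node names and the Graph type alias are hypothetical; the real runtime works on its own pipeline graph structures.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical graph: node name -> names of its downstream neighbours.
type Graph = HashMap<&'static str, Vec<&'static str>>;

// Reverses every edge; the reverse of a DAG is again a DAG.
fn reverse(graph: &Graph) -> Graph {
    let mut rev: Graph = HashMap::new();
    for (&from, tos) in graph {
        rev.entry(from).or_default();
        for &to in tos {
            rev.entry(to).or_default().push(from);
        }
    }
    rev
}

// Depth-first walk over the reversed graph: every node a contraflow
// event emitted at `start` would reach.
fn upstream_of(graph: &Graph, start: &'static str) -> BTreeSet<&'static str> {
    let rev = reverse(graph);
    let mut seen = BTreeSet::new();
    let mut stack = vec![start];
    while let Some(node) = stack.pop() {
        for &prev in rev.get(node).into_iter().flatten() {
            if seen.insert(prev) {
                stack.push(prev);
            }
        }
    }
    seen
}

fn main() {
    let mut g: Graph = HashMap::new();
    g.insert("kafka_in", vec!["enrich"]);
    g.insert("http_in", vec!["route"]);
    g.insert("enrich", vec!["route"]);
    g.insert("route", vec!["elastic_out", "metrics_out"]);
    // A contraflow event from elastic_out reaches every upstream node.
    println!("{:?}", upstream_of(&g, "elastic_out"));
}
```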
Contraflow is used to transport signals for successful or failed event delivery, for transmitting topological knowledge between connected connectors (for knowing what other connectors are reachable via the connected pipelines) and for managing the circuit breaker mechanism.
The Circuit Breaker Mechanism
Circuit breakers are used to avoid pounding downstream systems with requests (or events, in our case) when we know those are doomed anyway. In that case the circuit breaker opens and either fails those events early or buffers them (e.g. when using Kafka as a buffer) until the downstream system is healthy again. This mechanism is built into our runtime.
Any operator node in a pipeline and any connector can open and close the runtime's built-in circuit breaker by sending a special contraflow event to all reachable upstream pipeline operators and connectors. Each upstream connector receiving such a message stops pulling data from the external system it connects to, and thus stops sending events downstream. The issuer of the circuit breaker message checks whether the downstream system is healthy again. Once it is, the issuer closes the circuit breaker so that events start flowing again.
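Here is a minimal sketch of the connector side of this protocol. It assumes a source that simply stops pulling from its external system while the breaker is open. The Contraflow enum and the Source struct are hypothetical names, and a vector iterator simulates the external system.

```rust
// Hypothetical contraflow signals relevant to the circuit breaker.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Contraflow {
    Open,  // downstream is unhealthy: stop pulling
    Close, // downstream recovered: resume pulling
}

// Hypothetical source connector that pulls only while its breaker is closed.
struct Source {
    breaker_open: bool,
    external: std::vec::IntoIter<u32>, // stands in for the external system
}

impl Source {
    fn on_contraflow(&mut self, signal: Contraflow) {
        self.breaker_open = signal == Contraflow::Open;
    }

    // Pulls the next event, or nothing while the breaker is open.
    fn pull(&mut self) -> Option<u32> {
        if self.breaker_open {
            None // data stays in the external system for now
        } else {
            self.external.next()
        }
    }
}

fn main() {
    let mut src = Source { breaker_open: false, external: vec![1, 2, 3].into_iter() };
    assert_eq!(src.pull(), Some(1));
    src.on_contraflow(Contraflow::Open); // e.g. a sink lost its connection
    assert_eq!(src.pull(), None);
    src.on_contraflow(Contraflow::Close); // the sink is healthy again
    assert_eq!(src.pull(), Some(2));
}
```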
The runtime even uses this mechanism for a controlled start of all elements of a Flow. Every connector that is connected as an event source (on the left-hand side of a connect statement) starts with an open circuit breaker. This ensures that it doesn't send events before downstream connectors are prepared to receive them. Every event-sink connector (on the right-hand side of a connect statement) closes the circuit breaker upon startup. As a result, events only start to flow from event source to event sink once the downstream connectors have signaled their readiness.
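The start-up handshake can be modeled as a gate on the event-source side. This sketch assumes the source waits for a close signal from every sink it is connected to before sending anything. StartupGate and its methods are hypothetical names, not Tremor APIs.

```rust
use std::collections::HashSet;

// Hypothetical start-up gate for an event-source connector.
struct StartupGate {
    expected_sinks: HashSet<&'static str>,
    ready_sinks: HashSet<&'static str>,
}

impl StartupGate {
    fn new(sinks: &[&'static str]) -> Self {
        // The breaker starts out open: no sink has signaled readiness yet.
        Self {
            expected_sinks: sinks.iter().copied().collect(),
            ready_sinks: HashSet::new(),
        }
    }

    // Called when a sink's circuit-breaker-close signal arrives via contraflow.
    fn on_close(&mut self, sink: &'static str) {
        if self.expected_sinks.contains(sink) {
            self.ready_sinks.insert(sink);
        }
    }

    // Events may flow only once every connected sink is ready.
    fn may_send(&self) -> bool {
        self.ready_sinks == self.expected_sinks
    }
}

fn main() {
    let mut gate = StartupGate::new(&["elastic_out", "stdout"]);
    assert!(!gate.may_send());
    gate.on_close("elastic_out");
    assert!(!gate.may_send());
    gate.on_close("stdout");
    assert!(gate.may_send());
}
```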
The qos::backpressure operator can be used with method = pause to apply backpressure towards upstream systems via the circuit breaker mechanism. Otherwise, it diverts events to its overflow port, where they are usually discarded. The pause method is the right choice when backpressure must preserve all events.
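The difference between the two behaviors comes down to a single decision per event. In this sketch, Method::Pause and the overflow port mirror the text. Everything else is an assumption for illustration, not the operator's actual implementation: the enum names, the downstream_ok flag, and the choice to still forward the triggering event while opening the breaker.

```rust
// Hypothetical methods; only "pause" and the overflow port come from the text.
#[derive(Clone, Copy)]
enum Method {
    Discard,
    Pause,
}

#[derive(Debug, PartialEq)]
enum Output {
    Out(u32),               // forwarded on the regular output port
    Overflow(u32),          // routed to the overflow port, usually discarded
    OutAndOpenBreaker(u32), // forwarded, plus a circuit-breaker-open contraflow
}

// Decides what happens to one event, given whether downstream keeps up.
fn backpressure(method: Method, downstream_ok: bool, event: u32) -> Output {
    match (downstream_ok, method) {
        (true, _) => Output::Out(event),
        (false, Method::Discard) => Output::Overflow(event),
        (false, Method::Pause) => Output::OutAndOpenBreaker(event),
    }
}

// Counts events that are not sent to overflow, for a given health history.
fn preserved(method: Method, health: &[bool]) -> usize {
    health
        .iter()
        .enumerate()
        .filter(|&(i, &ok)| !matches!(backpressure(method, ok, i as u32), Output::Overflow(_)))
        .count()
}

fn main() {
    // Downstream is slow for the middle two events.
    let health = [true, false, false, true];
    println!("discard keeps {} of {}", preserved(Method::Discard, &health), health.len());
    println!("pause keeps {} of {}", preserved(Method::Pause, &health), health.len());
}
```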
The qos::roundrobin operator distributes events to the given outputs in round-robin fashion and takes outputs out of rotation if they error repeatedly or trigger their circuit breaker. It only forwards circuit breaker contraflow events if all outputs are unavailable.
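Here is a sketch of the round-robin behavior, assuming outputs leave and rejoin rotation via circuit breaker signals; repeated-error tracking is left out. on_breaker returns whether the signal should be forwarded upstream. In this sketch that happens only on the transition between "some output available" and "no output available". This covers the rule stated in the text, plus an assumed symmetric rule for closing. RoundRobin and its methods are hypothetical.

```rust
// Hypothetical round-robin router over a fixed number of outputs.
struct RoundRobin {
    available: Vec<bool>,
    next: usize,
}

impl RoundRobin {
    fn new(outputs: usize) -> Self {
        Self { available: vec![true; outputs], next: 0 }
    }

    // Picks the next available output, skipping ones out of rotation.
    fn route(&mut self) -> Option<usize> {
        let n = self.available.len();
        for step in 0..n {
            let candidate = (self.next + step) % n;
            if self.available[candidate] {
                self.next = (candidate + 1) % n;
                return Some(candidate);
            }
        }
        None
    }

    // Output `output` opened (`open == true`) or closed its breaker.
    // Returns true if the signal should be forwarded upstream.
    fn on_breaker(&mut self, output: usize, open: bool) -> bool {
        let any_before = self.available.iter().any(|&up| up);
        self.available[output] = !open;
        let any_after = self.available.iter().any(|&up| up);
        any_before != any_after
    }
}

fn main() {
    let mut rr = RoundRobin::new(3);
    assert_eq!(rr.route(), Some(0));
    assert!(!rr.on_breaker(1, true)); // output 1 leaves rotation silently
    assert_eq!(rr.route(), Some(2));
}
```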
Both the elastic connector and the http_client connector use the circuit breaker mechanism to limit the number of concurrent in-flight outgoing requests. They stop upstream connectors from sending further events when the supported concurrency is exceeded.
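Limiting in-flight requests with the circuit breaker amounts to a counter. This sketch assumes the sink opens the breaker as soon as it reaches its concurrency limit, so the limit is never exceeded, and closes it when a response frees a slot. ConcurrencyLimiter, max_in_flight and Signal are hypothetical names, not the connectors' configuration options.

```rust
#[derive(Debug, PartialEq)]
enum Signal {
    OpenBreaker,
    CloseBreaker,
}

// Hypothetical sink-side limiter; assumes max_in_flight >= 1.
struct ConcurrencyLimiter {
    max_in_flight: usize,
    in_flight: usize,
}

impl ConcurrencyLimiter {
    // A request was sent; open the breaker once the limit is reached.
    fn on_send(&mut self) -> Option<Signal> {
        self.in_flight += 1;
        if self.in_flight == self.max_in_flight {
            Some(Signal::OpenBreaker)
        } else {
            None
        }
    }

    // A response arrived; close the breaker if we were at the limit.
    fn on_response(&mut self) -> Option<Signal> {
        let was_full = self.in_flight == self.max_in_flight;
        self.in_flight = self.in_flight.saturating_sub(1);
        if was_full {
            Some(Signal::CloseBreaker)
        } else {
            None
        }
    }
}

fn main() {
    let mut limiter = ConcurrencyLimiter { max_in_flight: 2, in_flight: 0 };
    assert_eq!(limiter.on_send(), None);
    assert_eq!(limiter.on_send(), Some(Signal::OpenBreaker)); // upstream pauses
    assert_eq!(limiter.on_response(), Some(Signal::CloseBreaker)); // upstream resumes
}
```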
Guaranteed Delivery
Tremor uses the contraflow mechanism to implement event delivery acknowledgements. In a nutshell, it works like this:
- When a connector receives an event and handles it without error (e.g., sends it off via TCP), an event delivery acknowledgement is sent backward via contraflow, if required. Each event carries a flag saying whether it requires acknowledgement, usually set by the originating connector.
- When a connector fails to handle an event, a delivery failure message is sent backward via contraflow, if required.
- Pipeline operators and connectors emitting events can handle those messages to implement guaranteed delivery.
An event acknowledgment contraflow message is only handled by the connector (or operator) that sent the event. For example, a kafka_consumer connector commits the offset of the event upon receiving its acknowledgment.
This means that one contraflow event flows backward for each event reaching its destination. The contraflow volume mirrors the forward traffic volume, except that contraflow never leaves the system; it is handled internally by Operators and Connectors.
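The acknowledgment flow can be sketched as follows. The Event fields are hypothetical: origin, offset, and a transactional flag standing in for the per-event "ack required" marker. So are the Contraflow enum and the Consumer struct. The consumer assumes acknowledgments arrive in offset order, so it can simply commit the latest acknowledged offset. A real consumer would also have to cope with out-of-order acknowledgments and with replays after failures.

```rust
// Hypothetical event metadata: producing connector, offset, ack-required flag.
#[derive(Clone, Copy)]
struct Event {
    origin: &'static str,
    offset: u64,
    transactional: bool,
}

#[derive(Debug, PartialEq, Clone, Copy)]
enum Contraflow {
    Ack { origin: &'static str, offset: u64 },
    Fail { origin: &'static str, offset: u64 },
}

// Sink side: at most one contraflow message per handled event.
fn sink_handle(event: Event, delivered: bool) -> Option<Contraflow> {
    if !event.transactional {
        return None; // nobody asked for an acknowledgement
    }
    let Event { origin, offset, .. } = event;
    Some(if delivered {
        Contraflow::Ack { origin, offset }
    } else {
        Contraflow::Fail { origin, offset }
    })
}

// Source side: a Kafka-like consumer that commits acknowledged offsets.
// Assumes acks arrive in offset order.
struct Consumer {
    name: &'static str,
    committed: Option<u64>,
}

impl Consumer {
    fn on_contraflow(&mut self, cf: Contraflow) {
        match cf {
            // Only the connector that sent the event reacts to its ack.
            Contraflow::Ack { origin, offset } if origin == self.name => {
                self.committed = Some(offset);
            }
            // Failures would trigger a replay; not sketched here.
            _ => {}
        }
    }
}

fn main() {
    let mut consumer = Consumer { name: "kafka_in", committed: None };
    let events: Vec<Event> = (0..3)
        .map(|offset| Event { origin: "kafka_in", offset, transactional: true })
        .collect();
    // One contraflow message flows back for every event that reached the sink.
    let acks: Vec<Contraflow> = events.iter().filter_map(|&e| sink_handle(e, true)).collect();
    assert_eq!(acks.len(), events.len());
    for cf in acks {
        consumer.on_contraflow(cf);
    }
    assert_eq!(consumer.committed, Some(2));
}
```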
Not all connectors support event delivery guarantees, because the underlying protocol or system has no notion of marking parts of a data stream as successfully handled. Those connectors acknowledge every event that reaches them; the udp_client connector, for example, falls into this category. Other connectors, such as the tcp_client connector, only acknowledge successful transmission, not successful reception. Yet others, such as the ws_client connector, only acknowledge successful reception, not necessarily successful processing by the downstream system.
Connectors like kafka_consumer and wal support event delivery and processing acknowledgements and are well suited for workloads that need at-least-once delivery guarantees. This guarantee, however, only holds if all participants honor it.
For the circuit breaker mechanism, a successful event acknowledgment is used as a signal to close the circuit breaker, as it indicates that the downstream system is back to normal.
Please note that contraflow events, be they circuit breaker or guaranteed delivery (GD) events, do not traverse connectors. In other words, two pipelines connected by a connector will not see each other's contraflow events.
Example 1: from kafka_consumer to udp_client
Source: The kafka_consumer connector supports guaranteed delivery when used in transactional mode. A failure of a message after the latest acknowledged offset rewinds the consumer to that message and replays all data from there.
Sink: The udp_client connector offers no delivery guarantee. Any message reaching it triggers an acknowledgment automatically.
Result: All messages read from Kafka will be automatically acknowledged.
Example 2: from udp_server to kafka_producer
Source: UDP as a transport protocol offers no delivery or ordering guarantees; the upstream system sending the messages does not know whether they arrived or were handled. Thus the udp_server connector simply cannot make any guarantees or mark any packets as delivered or not.
Sink: Depending on its settings, the kafka_producer connector acknowledges successful production of the event and fails an event if it can't produce it to the configured topic.
Result: The upstream system has no guarantee that its messages arrived anywhere. If a message reaches Tremor, Tremor makes a best-effort attempt to deliver it. If Kafka fails to persist the event, however, a replay can't happen due to UDP's limitations.
Example 3: from http_server to kafka_producer
Source: The http_server connector acknowledges successful delivery of the message to its upstream system but does not support replays. Events that fail inside Tremor trigger a 500 Internal Server Error HTTP response.
Sink: Depending on its settings, kafka_producer acknowledges successful storage of the message and fails an event if it can't store it.
Result: Events delivered to Tremor are either delivered to Kafka or reported back as failed. If the kafka_producer fails to persist the message, an error is sent back to the upstream system as a 500 response to the HTTP request. How the upstream system handles this failure is up to that system.
Example 4: from http_server via wal connector to kafka_producer
Source: The http_server connector acknowledges successful delivery of the message to its upstream system but does not support replays. Events that fail inside Tremor trigger a 500 Internal Server Error HTTP response.
Intermediary: This setup uses a wal connector as an intermediary, effectively splitting the system into two pipelines: one from http_server to wal, the other from wal to kafka_producer.
Sink: Depending on its settings, kafka_producer acknowledges successful storage of the message and fails an event if it can't store it.
Result 1 (http -> wal): As long as it isn't full, the wal connector always acknowledges the messages it stores. All data successfully sent to the HTTP endpoint is therefore acknowledged, regardless of the state of the downstream system. This frees the HTTP API from having to consider retries.
Result 2 (wal -> kafka): The wal connector behaves much like the kafka_consumer. Whenever an event is marked as failed, it replays all events since that one. A failure of the kafka_producer to produce events therefore leads to a retransmission from the wal, which is transparent from the point of view of the http_server connector.
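The wal behavior from Example 4 can be sketched as an in-memory log with a fixed capacity and a single read position. store acknowledges upstream as soon as an entry is written, and fails it when the log is full. on_fail rewinds the read position so the failed entry and everything after it are emitted again. Wal, store, emit and on_fail are hypothetical names. A real write-ahead log persists to disk and truncates acknowledged entries.

```rust
// Hypothetical in-memory stand-in for the wal connector.
struct Wal {
    entries: Vec<String>,
    capacity: usize,
    next_to_send: usize, // read position for the downstream side
}

impl Wal {
    // Upstream side: true means "stored, acknowledge the event".
    fn store(&mut self, event: &str) -> bool {
        if self.entries.len() >= self.capacity {
            return false; // full: fail the event instead of acknowledging it
        }
        self.entries.push(event.to_string());
        true
    }

    // Downstream side: the next entry to emit, with its index.
    fn emit(&mut self) -> Option<(usize, String)> {
        let idx = self.next_to_send;
        let entry = self.entries.get(idx)?.clone();
        self.next_to_send += 1;
        Some((idx, entry))
    }

    // Contraflow failure for entry `idx`: replay it and everything after it.
    fn on_fail(&mut self, idx: usize) {
        self.next_to_send = self.next_to_send.min(idx);
    }
}

fn main() {
    let mut wal = Wal { entries: Vec::new(), capacity: 10, next_to_send: 0 };
    for e in ["a", "b", "c"] {
        assert!(wal.store(e)); // the http_server side is acknowledged at once
    }
    let mut sent = Vec::new();
    while let Some((idx, e)) = wal.emit() {
        sent.push(e);
        if idx == 1 && sent.len() == 2 {
            wal.on_fail(1); // kafka_producer failed to produce "b"
        }
    }
    assert_eq!(sent, ["a", "b", "b", "c"]);
}
```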
Pause / Resume
With the Contraflow mechanism and Circuit Breaking, we have all the pieces in place to automatically stop upstream connectors from producing events when those events are not going to be delivered successfully. It turns out we can also use this to stop events from flowing manually.
We expose API endpoints to change the current status of Flows (Patch flow status) and of single Connectors inside Flows (Patch flow connector status).
Here we see that the current status of the flow main is running. It reads events from stdin into the runtime and emits events to stdout.
$ curl -s localhost:9898/v1/flows/main | jq .
{
  "alias": "main",
  "status": "running",
  "connectors": [
    "console"
  ]
}
We can patch the flow status to pause all connectors inside that flow. It will return the new status of the flow in the response:
$ curl -s -XPATCH -H 'Content-Type: application/json' localhost:9898/v1/flows/main -d '{"status": "paused"}' | jq .
{
  "alias": "main",
  "status": "paused",
  "connectors": [
    "console"
  ]
}
The connector console receives a string event for each line sent to it via stdin. The whole flow is set up to echo events back to stdout. If we type something into the process's terminal, we won't see anything echoed to stdout, because the connector is paused. It will not pull new data from stdin until it is resumed.
We can resume the flow by patching its status back to running. It will return the new status of the flow in the response:
$ curl -s -XPATCH -H 'Content-Type: application/json' localhost:9898/v1/flows/main -d '{"status": "running"}' | jq .
{
  "alias": "main",
  "status": "running",
  "connectors": [
    "console"
  ]
}
Quiescence
TBD