Skip to main content
Version: edge

The elastic Connector

The elastic connector integrates ElasticSearch and compatible systems with tremor.

Tested with ElasticSearch v6 and v7 and OpenSearch v1.3.1

Events will be sent to the connected ElasticSearch compatible cluster via the ES Bulk API using the index action by default. It is recommended to batch events sent to this sink using the generic::batch operator to reduce the overhead introduced by the ES Bulk API.

note

The configuration options codec and postprocessors are not used, as elastic will always serialize event payloads as JSON.

If the number of parallel requests surpass concurrency, the connector will trigger the circuit breaker in order to stop events from flowing in. It will restore it again when it regains capacity.

The following metadata variables can be specified on a per event basis as fields under the elastic namespace:

VariableDescriptionTypeRequiredDefault value
actionThe bulk action to perform, one of delete, create, update or index. delete and update require $elastic._id to be set or elastic search will have error.Stringnoindex
_indexThe index to write to.Stringnot if specified in connector configurationindex from connector configuration
_typeThe document type for elastic, deprecated in ES 7.Stringno_doc
_idThe document id for elastic. If not provided ES generates an id.Stringno
pipelineThe elasticsearch pipeline to use.Stringno
raw_payloadBy default, if the update action is used, the event payload is considered as the partial document for update, by wrapping it inside the doc field. Setting this field to true will take the event payload as is. This allows to specify script, doc_as_upsert and more fields.boolnofalse
routingRouting information for elasticsearch. See their docs on bulk routingStringno
timeoutTimeout to wait for operations within elasticsearch (index creation, mapping updates, waiting for active shards)elasticsearch time unitno1m (one minute)
refreshRefresh the affected shards and make this operation visible to searchtrue, false, "wait_for"nofalse
versionSet a custom version manually. See bulk versioning.unsigned integerno
version_typeSee bulk versioning.See elasticsearch version types.no
retry_on_conflictThe number of retries to execute in case of a version conflict.unsigned integerno0
if_primary_termSee optimistic concurrency controlunsigned integerno
if_seq_noSee optimistic concurrency controlunsigned integerno

The following example shows a way how to specify the necessary metadata for elasticsearch:

define script prepare_for_elastic
args
index
script
let $elastic = {
"action": "update",
"_id": event["_id"],
"_index": args.index,
"retry_on_conflict": 3,
"refresh": true,
"timeout": "30s"
};
emit event["payload"];
end;

Configuration

OptionDescriptionTypeRequiredDefault value
urlsList of URLs to elasticsearch cluster nodesarray of stringsyes
indexElasticsearch index to operate on. Can be overwritten by event metadata $elastic["_index"]stringno
concurrencyThe maximum number of in-flight requestsunsigned integerno4
include_payload_in_responseWhether or not to include the whole event payload in ES success and error responses emitted via out and err portsbooleannofalse
headersHTTP headers to add to each request to elasticsearchrecord with string valuesno
authAuthorization method to use for each HTTP request. See auth config.See auth config.no
tlsEnable TLS encrypted traffic to elasticsearch. Specify a custom CA certificate chain or make use of client-side certificates.Seetls client configno
timeoutHTTP request timeout in nanosecondsunsigned integernoBy default no timeout is specified.

Example

config.troy
  define connector elastic from elastic
with
config = {
"nodes": ["http://127.0.0.1:9200/"],
"concurrency": 10,
# When true, attaches request payload to response event
"include_payload_in_response": true
index = "my_index",
# authenticate using an elasticsearch api key
auth = {
"elastic_api_key": {
"id": "ZSHpKIEBc6SDIeISiRsT",
"api_key": "1lqrzNhRSUWmzuQqy333yw"
}
}
}
end;

A batch upload service to elasticsearch

define flow main
flow
use std::time::nanos;
use integration;
use tremor::pipelines;
use tremor::connectors;

define pipeline main
pipeline
define
script process_batch_item
script
# setting required metadata for elastic
let $elastic = {
"_index": "my_little_index",
"action": event.action,
};
let $correlation = event.snot;
match event of
case %{present doc_id} => let $elastic["_id"] = event.doc_id
default => null
end;
event
end;
create script process_batch_item;

define operator batch from generic::batch with
count = 6
end;
create operator batch;

define script process_whole_batch
script
let $elastic = {
"_type": "my_little_doc"
};
event
end;
create script process_whole_batch;

select event from in into process_batch_item;
select event from process_batch_item into batch;
select event from batch into process_whole_batch;
select event from process_whole_batch into out;
select event from process_batch_item/err into err;
select event from process_whole_batch/err into err;
end;

define pipeline response_handling
into out, exit, err
pipeline
select {
"action": $elastic.action,
"success": $elastic.success,
"payload": event.payload,
"index": $elastic["_index"],
"doc": $elastic["_type"],
"correlation": $correlation
}
from in where $elastic.success == true into out;

select {
"action": $elastic.action,
"payload": event.payload,
"success": $elastic.success,
"index": $elastic["_index"],
"doc": $elastic["_type"],
"correlation": $correlation
}
from in where $elastic.success == false into err;

select "exit" from in where match event.payload of case %{ present exit } => true default => false end into exit;
end;

define connector elastic from elastic
with
config = {
"nodes": ["http://127.0.0.1:9200/"],
"concurrency": 10,
"include_payload_in_response": true
}
end;

create connector input from integration::read_file;
create connector errfile from integration::write_file
with
file = "err.log"
end;
create connector okfile from integration::write_file
with
file = "ok.log"
end;
create connector exit from integration::exit;
create connector stdio from connectors::console;
create connector elastic;

create pipeline main;
create pipeline response_handling;


connect /connector/input/out to /pipeline/main/in;
connect /pipeline/main/out to /connector/elastic/in;
connect /connector/elastic/out to /pipeline/response_handling/in;
connect /pipeline/response_handling/out to /connector/okfile/in;
connect /pipeline/response_handling/out to /connector/stdio/in;
connect /pipeline/response_handling/exit to /connector/exit/in;
connect /connector/elastic/err to /pipeline/response_handling/in;
connect /pipeline/response_handling/err to /connector/errfile/in;
connect /pipeline/response_handling/err to /connector/stdio/in;
end;

deploy flow main;