Starting from this blog post, I’d like to extend that solution to handle a slightly different use case.
Since adding top-level fields is not really my thing without a good reason (but it works as an example), I first checked whether there were well-established fields in ECS (mind the version).
The event fields don’t completely fulfill my needs and the discussion is still ongoing, so I took another approach. In any case, I am dealing with custom index data with no ECS and various datetime fields to consider.
While I can map some of the ECS fields “semantically” to some existing fields (for example, @timestamp is the equivalent of event.created), I need to calculate potentially many lags compared to the ingestion time, so either I “pollute” the schema with top-level fields or I try to group this kind of data in some way. And yes, I am aware that this can quickly become overkill, and when the goal is to “trace” a pipeline this is not really a method I would suggest.
But back to the solution with a bit more context: I recently extended the OpenTelemetry resource attributes by using a mycompany top-level (root) key to store some service-specific values, so I thought about re-using that concept (I may write a quick post about that as well if I ever find the time).
A root field called mycompany is currently structured like this:
{
  "mycompany": {
    "<service_name>": {
      "field1": [...],
      "<other service-scoped fields>": [...]
    },
    "<other sub-mycompany fields not scoped by service>": [...]
  }
}
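To make that a bit more concrete: with a hypothetical service called billing (the service name and the fields below are purely illustrative, not taken from the real schema), a document could carry something like:
{
  "mycompany": {
    "billing": {
      "field1": ["some", "values"],
      "region": "eu-west-1"
    },
    "environment": "production"
  }
}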
So, without touching the service-scoped sub-tree, I started defining these fields:
- mycompany.ingestion.timestamp: the time when the doc gets processed by an ingest pipeline
- mycompany.ingestion.lag_from_source: the difference between the ingestion timestamp and the @timestamp value
- mycompany.ingestion.lag_from_storageTime: the difference between the ingestion timestamp and another timestamp (in this case called storageTime), for example a previous step in an upstream pipeline (Logstash or other processing that happened before sending the doc to the Elasticsearch API)
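Since these fields end up in a custom index, you will probably also want explicit mappings for them. A minimal sketch, using the field names the pipeline below actually sets (with the _in_seconds suffix); the template name and index pattern are just placeholders:
PUT {{api_endpoint}}/_index_template/my-custom-index-template
{
  "index_patterns": ["my-custom-index-*"],
  "template": {
    "mappings": {
      "properties": {
        "mycompany": {
          "properties": {
            "ingestion": {
              "properties": {
                "timestamp": { "type": "date" },
                "lag_from_source_in_seconds": { "type": "long" },
                "lag_from_storageTime_in_seconds": { "type": "long" }
              }
            }
          }
        }
      }
    }
  }
}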
The Painless syntax to access and store data in a nested map is not trivial, and after some trial and error I was able to make it work. Here is the complete pipeline definition for reference, including some error-handling examples:
{
"description": "Add ingestion timestamp and lag from different phases of their logstash pipelines",
"processors": [
{
"set": {
"field": "_source.mycompany.ingestion.timestamp",
"on_failure": [
{
"append": {
"description": "Add an error tag",
"field": "tags",
"value": [
"failed_to_add_ingestion_timestamp"
]
}
}
],
"value": "{{_ingest.timestamp}}"
}
},
{
"script": {
"description": "Add a lag field with the different between ingest time and source (@timestamp)",
"if": "ctx.containsKey('@timestamp') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
"lang": "painless",
"on_failure": [
{
"append": {
"description": "Add an error tag",
"field": "tags",
"value": [
"failed_to_set_lag_for_source"
]
}
}
],
"source": "ctx.mycompany.ingestion.lag_from_source_in_seconds = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx.mycompany.ingestion.timestamp))/1000"
}
},
{
"script": {
"description": "Add a lag field with the different between ingest time and the storageTime timestamp present in the source",
"if": "ctx.containsKey('storageTime') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
"lang": "painless",
"on_failure": [
{
"append": {
"description": "Add an error tag",
"field": "tags",
"value": [
"failed_to_set_lag_for_storageTime"
]
}
}
],
"source": "ctx.mycompany.ingestion.lag_from_storageTime_in_seconds = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['storageTime']), ZonedDateTime.parse(ctx.mycompany.ingestion.timestamp))/1000"
}
}
]
}
You can PUT that JSON to define a pipeline like this:
PUT {{api_endpoint}}/_ingest/pipeline/add-ingestion-timestamp-and-lags
<JSON body of the pipeline>
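You can then reference the pipeline per request with the pipeline parameter, or attach it to the index as its default_pipeline so it runs on every indexing request; the index name below is just a placeholder:
PUT {{api_endpoint}}/my-custom-index/_settings
{
  "index.default_pipeline": "add-ingestion-timestamp-and-lags"
}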
And test it with the simulate endpoint by sending a bunch of sample docs, in this case just one:
POST {{api_endpoint}}/_ingest/pipeline/add-ingestion-timestamp-and-lags/_simulate?verbose
{
"docs": [
{
"_source": {
"@timestamp": "2024-03-06T13:33:02.407Z",
"storageTime": "2024-03-06T13:36:09.549119Z"
[...other fields...]
}
}
]
}
The result will be something like this:
{
"docs": [
{
"processor_results": [
{
"processor_type": "set",
"status": "success",
"doc": {
"_index": "_index",
"_version": "-3",
"_id": "_id",
"_source": {
"mycompany": {
"ingestion": {
"timestamp": "2024-03-06T15:03:41.726652718Z"
}
},
"@timestamp": "2024-03-06T13:33:02.407Z",
"storageTime": "2024-03-06T13:36:09.549119Z"
},
"_ingest": {
"pipeline": "add-ingestion-timestamp-and-lags",
"timestamp": "2024-03-06T15:03:41.726652718Z"
}
}
},
{
"processor_type": "script",
"status": "success",
"description": "Add a lag field with the different between ingest time and source (@timestamp)",
"if": {
"condition": "ctx.containsKey('@timestamp') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
"result": true
},
"doc": {
"_index": "_index",
"_version": "-3",
"_id": "_id",
"_source": {
"mycompany": {
"ingestion": {
"lag_from_source_in_seconds": 5439,
"timestamp": "2024-03-06T15:03:41.726652718Z"
}
},
"@timestamp": "2024-03-06T13:33:02.407Z",
"storageTime": "2024-03-06T13:36:09.549119Z"
},
"_ingest": {
"pipeline": "add-ingestion-timestamp-and-lags",
"timestamp": "2024-03-06T15:03:41.726652718Z"
}
}
},
{
"processor_type": "script",
"status": "success",
"description": "Add a lag field with the different between ingest time and the storageTime timestamp present in the source",
"if": {
"condition": "ctx.containsKey('storageTime') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
"result": true
},
"doc": {
"_index": "_index",
"_version": "-3",
"_id": "_id",
"_source": {
"mycompany": {
"ingestion": {
"lag_from_source_in_seconds": 5439,
"lag_from_storageTime_in_seconds": 5252,
"timestamp": "2024-03-06T15:03:41.726652718Z"
}
},
"@timestamp": "2024-03-06T13:33:02.407Z",
"storageTime": "2024-03-06T13:36:09.549119Z"
},
"_ingest": {
"pipeline": "add-ingestion-timestamp-and-lags",
"timestamp": "2024-03-06T15:03:41.726652718Z"
}
}
}
]
}
]
}
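Once documents are flowing through the pipeline, the lag fields can be monitored with ordinary aggregations, for example (again, the index name is just a placeholder):
POST {{api_endpoint}}/my-custom-index/_search
{
  "size": 0,
  "aggs": {
    "avg_lag_from_source": {
      "avg": { "field": "mycompany.ingestion.lag_from_source_in_seconds" }
    },
    "max_lag_from_storageTime": {
      "max": { "field": "mycompany.ingestion.lag_from_storageTime_in_seconds" }
    }
  }
}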
Happy ingesting!