Starting from this blog post, I’d like to extend that solution to handle a slightly different use case.

I don’t like adding top-level fields without a good reason (although it works as an example), so I first checked whether ECS already had well-established fields for this (mind the version).

The event fields do not completely fulfil my needs and the discussion is still ongoing, so I took another approach. In any case, I am dealing with a custom index whose data does not follow ECS and has various datetime fields to consider.

While I can map some of the ECS fields (“semantically”) to some existing fields (for example @timestamp is the equivalent of event.created), I need to calculate potentially many lags relative to the ingestion time. So either I “pollute” the schema with top-level fields or I try to group this kind of data in some way. And yes, I am aware that this can quickly become overkill, and when the goal is to “trace” a pipeline this is not really a method I would suggest.

But back to the solution with a bit more context: recently I extended the OpenTelemetry resource attributes by using a mycompany top-level (root) key to store some service-specific values so I thought about re-using that concept (I may write a quick post for that as well if I ever find the time).

A root field called mycompany is currently structured like this:

{
  "mycompany": {
    "<service_name>": {
      "field1": [...],
      "<other service-scoped fields>": [...]
    },
    "<other sub-mycompany fields not scoped by service>": [...]
  }
}

So, without touching the service-scoped sub-tree I started defining these fields:

  • mycompany.ingestion.timestamp: the time when the doc gets processed by an ingest pipeline
  • mycompany.ingestion.lag_from_source_in_seconds: the difference, in whole seconds, between the ingestion timestamp and the @timestamp value
  • mycompany.ingestion.lag_from_storageTime_in_seconds: the same difference computed against another timestamp (in this case called storageTime), for example one set by a previous step in an upstream pipeline (Logstash or other processing that happened before sending the doc to the Elasticsearch API)
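To make the arithmetic behind the two lag fields concrete, here is a minimal Python sketch that mirrors it outside Elasticsearch. It is not part of the pipeline: the helper names (parse_utc, trunc_div, lag_in_seconds) are made up for illustration, it assumes UTC timestamps ending in "Z", and it only keeps microsecond precision. The timestamps are the ones used in the simulate example later in this post.

```python
import re
from datetime import datetime, timedelta, timezone

# Hypothetical helpers, for illustration only (not an Elasticsearch API).
_TS = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d{1,9}))?Z")


def parse_utc(ts: str) -> datetime:
    """Parse an ISO-8601 UTC timestamp ending in 'Z' with up to 9 fractional digits."""
    match = _TS.fullmatch(ts)
    if match is None:
        raise ValueError(f"unsupported timestamp: {ts!r}")
    base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
    # Pad the fraction to nanoseconds, then keep only the microsecond part.
    micros = int((match.group(2) or "").ljust(9, "0")[:6])
    return base.replace(microsecond=micros, tzinfo=timezone.utc)


def trunc_div(a: int, b: int) -> int:
    """Integer division truncating toward zero, like Java's long division."""
    q = abs(a) // abs(b)
    return q if (a >= 0) == (b > 0) else -q


def lag_in_seconds(earlier: str, later: str) -> int:
    """Whole seconds between two timestamps: milliseconds first, then / 1000."""
    delta = parse_utc(later) - parse_utc(earlier)
    total_micros = delta // timedelta(microseconds=1)
    millis = trunc_div(total_micros, 1000)
    return trunc_div(millis, 1000)


ingested = "2024-03-06T15:03:41.726652718Z"
print(lag_in_seconds("2024-03-06T13:33:02.407Z", ingested))     # lag from @timestamp
print(lag_in_seconds("2024-03-06T13:36:09.549119Z", ingested))  # lag from storageTime
```

The truncating division is a deliberate choice here: Python's // rounds toward negative infinity, which would give a different result than the pipeline's integer division whenever a lag is negative (for example with clock skew upstream).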

The Painless syntax to access and store data in a nested dictionary is not trivial, and it took some trial and error to make it work. Here is the complete pipeline definition for reference, including some error-handling examples:

{
  "description": "Add ingestion timestamp and lag from different phases of their logstash pipelines",
  "processors": [
    {
      "set": {
        "field": "_source.mycompany.ingestion.timestamp",
        "on_failure": [
          {
            "append": {
              "description": "Add an error tag",
              "field": "tags",
              "value": [
                "failed_to_add_ingestion_timestamp"
              ]
            }
          }
        ],
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "description": "Add a lag field with the different between ingest time and source (@timestamp)",
        "if": "ctx.containsKey('@timestamp') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
        "lang": "painless",
        "on_failure": [
          {
            "append": {
              "description": "Add an error tag",
              "field": "tags",
              "value": [
                "failed_to_set_lag_for_source"
              ]
            }
          }
        ],
        "source": "ctx.mycompany.ingestion.lag_from_source_in_seconds = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx.mycompany.ingestion.timestamp))/1000"
      }
    },
    {
      "script": {
        "description": "Add a lag field with the different between ingest time and the storageTime timestamp present in the source",
        "if": "ctx.containsKey('storageTime') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
        "lang": "painless",
        "on_failure": [
          {
            "append": {
              "description": "Add an error tag",
              "field": "tags",
              "value": [
                "failed_to_set_lag_for_storageTime"
              ]
            }
          }
        ],
        "source": "ctx.mycompany.ingestion.lag_from_storageTime_in_seconds = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['storageTime']), ZonedDateTime.parse(ctx.mycompany.ingestion.timestamp))/1000"
      }
    }
  ]
}

You can PUT that JSON to define a pipeline like this:

PUT {{api_endpoint}}/_ingest/pipeline/add-ingestion-timestamp-and-lags

<JSON body of the pipeline>
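If you prefer scripting this call, the same request can be prepared with Python's standard library. This is only a sketch under a few assumptions: build_put_pipeline_request is a hypothetical helper name, http://localhost:9200 is a placeholder for {{api_endpoint}}, the pipeline body is a stub, and authentication and TLS settings are left out entirely. The request is built but not sent.

```python
import json
import urllib.request


def build_put_pipeline_request(api_endpoint: str, pipeline_id: str,
                               pipeline: dict) -> urllib.request.Request:
    """Prepare (but do not send) the PUT request that defines an ingest pipeline."""
    url = f"{api_endpoint.rstrip('/')}/_ingest/pipeline/{pipeline_id}"
    body = json.dumps(pipeline).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Placeholder endpoint and a stub body; use the full pipeline JSON in practice.
request = build_put_pipeline_request(
    "http://localhost:9200",
    "add-ingestion-timestamp-and-lags",
    {"description": "stub", "processors": []},
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```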

Then test it with the simulate endpoint by sending some sample docs (in this case just one):

POST {{api_endpoint}}/_ingest/pipeline/add-ingestion-timestamp-and-lags/_simulate?verbose

{
  "docs": [
    {
      "_source": {
        "@timestamp": "2024-03-06T13:33:02.407Z",
        "storageTime": "2024-03-06T13:36:09.549119Z"
        [...other fields...]
      }
    }
  ]
}

The result will be something like this:

{
  "docs": [
    {
      "processor_results": [
        {
          "processor_type": "set",
          "status": "success",
          "doc": {
            "_index": "_index",
            "_version": "-3",
            "_id": "_id",
            "_source": {
              "mycompany": {
                "ingestion": {
                  "timestamp": "2024-03-06T15:03:41.726652718Z"
                }
              },
              "@timestamp": "2024-03-06T13:33:02.407Z",
              "storageTime": "2024-03-06T13:36:09.549119Z"
            },
            "_ingest": {
              "pipeline": "add-ingestion-timestamp-and-lags",
              "timestamp": "2024-03-06T15:03:41.726652718Z"
            }
          }
        },
        {
          "processor_type": "script",
          "status": "success",
          "description": "Add a lag field with the different between ingest time and source (@timestamp)",
          "if": {
            "condition": "ctx.containsKey('@timestamp') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
            "result": true
          },
          "doc": {
            "_index": "_index",
            "_version": "-3",
            "_id": "_id",
            "_source": {
              "mycompany": {
                "ingestion": {
                  "lag_from_source_in_seconds": 5439,
                  "timestamp": "2024-03-06T15:03:41.726652718Z"
                }
              },
              "@timestamp": "2024-03-06T13:33:02.407Z",
              "storageTime": "2024-03-06T13:36:09.549119Z"
            },
            "_ingest": {
              "pipeline": "add-ingestion-timestamp-and-lags",
              "timestamp": "2024-03-06T15:03:41.726652718Z"
            }
          }
        },
        {
          "processor_type": "script",
          "status": "success",
          "description": "Add a lag field with the different between ingest time and the storageTime timestamp present in the source",
          "if": {
            "condition": "ctx.containsKey('storageTime') && ctx.containsKey('mycompany') && ctx.mycompany.containsKey('ingestion') && ctx.mycompany.ingestion.containsKey('timestamp')",
            "result": true
          },
          "doc": {
            "_index": "_index",
            "_version": "-3",
            "_id": "_id",
            "_source": {
              "mycompany": {
                "ingestion": {
                  "lag_from_source_in_seconds": 5439,
                  "lag_from_storageTime_in_seconds": 5252,
                  "timestamp": "2024-03-06T15:03:41.726652718Z"
                }
              },
              "@timestamp": "2024-03-06T13:33:02.407Z",
              "storageTime": "2024-03-06T13:36:09.549119Z"
            },
            "_ingest": {
              "pipeline": "add-ingestion-timestamp-and-lags",
              "timestamp": "2024-03-06T15:03:41.726652718Z"
            }
          }
        }
      ]
    }
  ]
}
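The verbose output gets long quickly, so when testing many sample docs it can help to keep only the final state of each one. The following Python sketch assumes the response shape shown above (docs, processor_results, doc, _source); summarize_simulation is a hypothetical helper name and the sample is a trimmed copy of the result above.

```python
def summarize_simulation(simulate_response: dict) -> list:
    """For each simulated doc, return the ingestion fields and tags after the last processor."""
    summaries = []
    for doc in simulate_response.get("docs", []):
        steps = doc.get("processor_results", [])
        # The last processor result holds the document as it would be indexed.
        source = steps[-1].get("doc", {}).get("_source", {}) if steps else {}
        summaries.append({
            "ingestion": source.get("mycompany", {}).get("ingestion", {}),
            # Error tags appended by the on_failure handlers would show up here.
            "tags": source.get("tags", []),
        })
    return summaries


# Trimmed copy of the verbose response above (only the last processor result).
sample_response = {
    "docs": [
        {
            "processor_results": [
                {
                    "processor_type": "script",
                    "status": "success",
                    "doc": {
                        "_source": {
                            "mycompany": {
                                "ingestion": {
                                    "lag_from_source_in_seconds": 5439,
                                    "lag_from_storageTime_in_seconds": 5252,
                                    "timestamp": "2024-03-06T15:03:41.726652718Z",
                                }
                            },
                            "@timestamp": "2024-03-06T13:33:02.407Z",
                            "storageTime": "2024-03-06T13:36:09.549119Z",
                        }
                    },
                }
            ]
        }
    ]
}

for summary in summarize_simulation(sample_response):
    print(summary)
```

An empty tags list for every doc means none of the on_failure handlers fired.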

Happy ingesting!