Elasticsearch Pipelines and Ingest Nodes

An ingest pipeline transforms documents before they are stored. Instead of cleaning data in your application code, you define transformation rules once in Elasticsearch and apply them automatically to every incoming document.

The Assembly Line Analogy

Raw car parts arrive → Assembly Line → Finished car stored in warehouse

Raw document arrives → Ingest Pipeline → Transformed document stored in index

Pipeline steps (called processors) run in order.
Each processor makes one change. Chain them for complex transforms.

How Ingest Works

Application sends document:
  { "message": "  Hello World  ", "ip": "203.0.113.45", "raw_date": "15/06/2024" }
        |
        v
  Ingest Node receives document
        |
        v
  Pipeline runs processors in sequence:
    1. Trim whitespace from "message"       → "Hello World"
    2. Resolve IP to location               → { city: "Mumbai", country: "India" }
    3. Parse date to standard format        → "2024-06-15T00:00:00Z"
        |
        v
  Transformed document stored in index:
  {
    "message": "Hello World",
    "ip": "203.0.113.45",
    "geo": { "city": "Mumbai", "country": "India" },
    "date": "2024-06-15T00:00:00Z"
  }
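
The flow above can be modelled locally as a list of functions applied in order. This is only a sketch of the idea, not how Elasticsearch implements processors: the function names are made up for illustration, and the geo lookup step is left out because it needs a GeoIP database. Dropping raw_date follows the diagram; a real date processor leaves its source field in place unless a separate remove step deletes it.

```python
from datetime import datetime, timezone


def trim_message(doc):
    # Step 1: strip leading and trailing whitespace from "message".
    doc["message"] = doc["message"].strip()
    return doc


def parse_raw_date(doc):
    # Step 3: parse day/month/year and store an ISO 8601 UTC timestamp in "date".
    parsed = datetime.strptime(doc["raw_date"], "%d/%m/%Y").replace(tzinfo=timezone.utc)
    doc["date"] = parsed.strftime("%Y-%m-%dT%H:%M:%SZ")
    del doc["raw_date"]  # mirrors the diagram, where raw_date is not stored
    return doc


def run_pipeline(doc, processors):
    # Apply each processor in list order; each one sees the previous one's output.
    result = dict(doc)  # shallow copy so the caller's document is left untouched
    for processor in processors:
        result = processor(result)
    return result


incoming = {"message": "  Hello World  ", "ip": "203.0.113.45", "raw_date": "15/06/2024"}
stored = run_pipeline(incoming, [trim_message, parse_raw_date])
print(stored)
# {'message': 'Hello World', 'ip': '203.0.113.45', 'date': '2024-06-15T00:00:00Z'}
```

Swapping the order of the list changes which processor sees which input, which is why processor order matters in a real pipeline too.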

Creating a Pipeline

PUT /_ingest/pipeline/clean_log_pipeline
{
  "description": "Cleans and enriches log documents",
  "processors": [
    {
      "trim": {
        "field": "message"
      }
    },
    {
      "lowercase": {
        "field": "status"
      }
    },
    {
      "set": {
        "field": "environment",
        "value": "production"
      }
    },
    {
      "remove": {
        "field": "raw_internal_id"
      }
    }
  ]
}
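
If you register pipelines from application code rather than a console, the same request can be built with Python's standard library. The sketch below assumes an unsecured cluster at http://localhost:9200, which is an assumption for illustration; most real clusters need HTTPS and credentials. The helper name build_put_pipeline_request is hypothetical. The request is only constructed here, not sent.

```python
import json
import urllib.request

# Assumption: a local, unsecured cluster. Adjust the URL and add auth for a real one.
ES_URL = "http://localhost:9200"

pipeline_body = {
    "description": "Cleans and enriches log documents",
    "processors": [
        {"trim": {"field": "message"}},
        {"lowercase": {"field": "status"}},
        {"set": {"field": "environment", "value": "production"}},
        {"remove": {"field": "raw_internal_id"}},
    ],
}


def build_put_pipeline_request(name, body):
    # Build (but do not send) the PUT /_ingest/pipeline/<name> request.
    return urllib.request.Request(
        url=f"{ES_URL}/_ingest/pipeline/{name}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_put_pipeline_request("clean_log_pipeline", pipeline_body)
print(request.get_method(), request.full_url)
# PUT http://localhost:9200/_ingest/pipeline/clean_log_pipeline

# To actually register the pipeline against a running cluster:
# urllib.request.urlopen(request)
```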

Common Processors

Processor               What It Does
set                     Adds or overwrites a field value
remove                  Deletes a field from the document
rename                  Renames a field
trim                    Strips leading and trailing whitespace
lowercase / uppercase   Changes text case
convert                 Changes a field's data type (string → integer)
date                    Parses date strings into Elasticsearch date format
grok                    Extracts fields from unstructured text using patterns
geoip                   Looks up geographic data from an IP address
split                   Splits a string into an array by a delimiter
join                    Joins an array into a string
script                  Runs a custom Painless script for complex logic

Grok — Parsing Unstructured Logs

Most server logs are plain text strings. Grok uses named patterns to extract structured fields from them:

Raw log line:
  "203.0.113.45 - GET /home 200 1024"

Grok pattern:
  "%{IP:client_ip} - %{WORD:method} %{URIPATHPARAM:path} %{NUMBER:status} %{NUMBER:bytes}"

Result:
  {
    "client_ip": "203.0.113.45",
    "method":    "GET",
    "path":      "/home",
    "status":    "200",
    "bytes":     "1024"
  }

Grok ships with patterns for Apache logs, Nginx logs, syslog, and dozens of other common formats.
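
Under the hood, a grok pattern is a regular expression with named captures. The Python sketch below mimics the pattern above with hand-written, simplified regexes; they are stand-ins chosen for this example, not the real definitions of IP, WORD, URIPATHPARAM, or NUMBER, which are more thorough. It also shows why the result above holds "200" and "1024" as strings: a plain capture is text until something converts it, which is the job of the convert processor.

```python
import re

# Simplified stand-ins for the grok patterns used above (not the real definitions).
LOG_LINE = re.compile(
    r"(?P<client_ip>\d{1,3}(?:\.\d{1,3}){3}) - "
    r"(?P<method>\w+) "
    r"(?P<path>/\S*) "
    r"(?P<status>\d+) "
    r"(?P<bytes>\d+)"
)


def parse_log_line(line):
    # Return the named captures as a dict, or None when the line does not match.
    match = LOG_LINE.fullmatch(line)
    return match.groupdict() if match else None


fields = parse_log_line("203.0.113.45 - GET /home 200 1024")
print(fields)
# {'client_ip': '203.0.113.45', 'method': 'GET', 'path': '/home', 'status': '200', 'bytes': '1024'}

# Every capture is a string; numeric fields need an explicit conversion step.
typed = {**fields, "status": int(fields["status"]), "bytes": int(fields["bytes"])}
print(typed["status"] + 1)
```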

Using a Pipeline When Indexing

POST /logs/_doc?pipeline=clean_log_pipeline
{
  "message": "  Server started  ",
  "status": "OK",
  "raw_internal_id": "tmp-abc123"
}
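
To see what this request should end up storing, here is a small local model of clean_log_pipeline. It is a sketch that interprets only the four processor types used in that pipeline, with simplified semantics. It is not Elasticsearch code, and the function names are hypothetical.

```python
import copy

PIPELINE = [
    {"trim": {"field": "message"}},
    {"lowercase": {"field": "status"}},
    {"set": {"field": "environment", "value": "production"}},
    {"remove": {"field": "raw_internal_id"}},
]


def apply_processor(doc, kind, config):
    # Mutate doc according to one processor definition.
    field = config["field"]
    if kind == "trim":
        doc[field] = doc[field].strip()
    elif kind == "lowercase":
        doc[field] = doc[field].lower()
    elif kind == "set":
        doc[field] = config["value"]
    elif kind == "remove":
        del doc[field]  # raises KeyError if the field is absent
    else:
        raise ValueError(f"unsupported processor in this sketch: {kind}")


def apply_pipeline(doc, processors):
    # Run the processors in order on a copy of the document.
    result = copy.deepcopy(doc)
    for entry in processors:
        kind, config = next(iter(entry.items()))
        apply_processor(result, kind, config)
    return result


incoming = {"message": "  Server started  ", "status": "OK", "raw_internal_id": "tmp-abc123"}
print(apply_pipeline(incoming, PIPELINE))
# {'message': 'Server started', 'status': 'ok', 'environment': 'production'}
```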

Set a default pipeline on an index with the index.default_pipeline setting, so that every document indexed without an explicit ?pipeline parameter goes through it:

PUT /logs/_settings
{
  "index.default_pipeline": "clean_log_pipeline"
}

Testing a Pipeline Before Using It

Simulate a pipeline against a sample document without storing anything:

POST /_ingest/pipeline/clean_log_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "  Test message  ",
        "status": "WARNING",
        "raw_internal_id": "tmp-xyz"
      }
    }
  ]
}

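The response shows what each document would look like after the pipeline runs, without indexing anything. Add ?verbose=true to the request to see the output of each processor in turn. This is a safe way to verify your pipeline logic before putting it in production.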
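
In an automated check, you would typically build the simulate body from a list of sample documents and compare the transformed _source values against what you expect. The sketch below assumes the response nests each result under docs[].doc._source. The sample_response value is hand-written to illustrate that shape, trimmed to the relevant field, and not captured from a real cluster. Both helper names are hypothetical.

```python
def build_simulate_body(sources):
    # Wrap each sample document in the {"_source": ...} envelope the request uses.
    return {"docs": [{"_source": source} for source in sources]}


def transformed_sources(response):
    # Collect the transformed _source of every simulated document that succeeded.
    return [entry["doc"]["_source"] for entry in response["docs"] if "doc" in entry]


body = build_simulate_body(
    [{"message": "  Test message  ", "status": "WARNING", "raw_internal_id": "tmp-xyz"}]
)

# Hand-written illustration of the response shape (assumption, heavily trimmed).
sample_response = {
    "docs": [
        {
            "doc": {
                "_source": {
                    "message": "Test message",
                    "status": "warning",
                    "environment": "production",
                }
            }
        }
    ]
}

expected = {"message": "Test message", "status": "warning", "environment": "production"}
print(transformed_sources(sample_response) == [expected])
```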

Error Handling in Pipelines

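Use on_failure to handle a processor error without rejecting the entire document. For example, if the geoip processor throws an error (say, because the ip field is missing or does not hold a valid IP address), set a default location: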

{
  "geoip": {
    "field": "ip",
    "on_failure": [
      {
        "set": {
          "field": "geo.country",
          "value": "Unknown"
        }
      }
    ]
  }
}
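
The control flow behind on_failure resembles a try/except around a single processor: if the processor raises, the fallback processors run and the document carries on. The Python sketch below models only that flow. geoip_stub is a hypothetical stand-in that merely validates the address, since a real lookup needs a GeoIP database.

```python
import ipaddress


def geoip_stub(doc):
    # Stand-in for geoip: raises KeyError if "ip" is missing, ValueError if malformed.
    ipaddress.ip_address(doc["ip"])
    return doc


def set_unknown_country(doc):
    # Fallback step: equivalent in spirit to set geo.country = "Unknown".
    doc.setdefault("geo", {})["country"] = "Unknown"
    return doc


def run_with_on_failure(doc, processor, on_failure):
    # Try the processor; on any error, run the fallback processors instead.
    try:
        return processor(doc)
    except Exception:
        for handler in on_failure:
            doc = handler(doc)
        return doc


print(run_with_on_failure({"ip": "not-an-ip"}, geoip_stub, [set_unknown_country]))
# {'ip': 'not-an-ip', 'geo': {'country': 'Unknown'}}

print(run_with_on_failure({"ip": "203.0.113.45"}, geoip_stub, [set_unknown_country]))
# {'ip': '203.0.113.45'}
```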
