Elasticsearch Pipelines and Ingest Nodes
An ingest pipeline transforms documents before they are stored. Instead of cleaning data in your application code, you define transformation rules once in Elasticsearch and apply them automatically to every incoming document.
The Assembly Line Analogy
Raw car parts arrive → Assembly Line → Finished car stored in warehouse
Raw document arrives → Ingest Pipeline → Transformed document stored in index
Pipeline steps (called processors) run in order. Each processor makes one change. Chain them for complex transforms.
How Ingest Works
Application sends document:
{ "message": " Hello World ", "ip": "203.0.113.45", "raw_date": "15/06/2024" }
|
v
Ingest Node receives document
|
v
Pipeline runs processors in sequence:
1. Trim whitespace from "message" → "Hello World"
2. Resolve IP to location → { city: "Mumbai", country: "India" }
3. Parse date to standard format → "2024-06-15T00:00:00Z"
|
v
Transformed document stored in index:
{
  "message": "Hello World",
  "ip": "203.0.113.45",
  "geo": { "city": "Mumbai", "country": "India" },
  "date": "2024-06-15T00:00:00Z"
}
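The flow above can be modelled locally as a rough sketch. This is plain Python, not Elasticsearch code: the function names and the `FAKE_GEO_DB` lookup table are hypothetical stand-ins, and the real `geoip` and `date` processors have more options than shown here.

```python
from datetime import datetime, timezone

# Hypothetical lookup table standing in for a real GeoIP database.
FAKE_GEO_DB = {"203.0.113.45": {"city": "Mumbai", "country": "India"}}


def trim_message(doc):
    # Step 1: strip leading and trailing whitespace from "message".
    doc["message"] = doc["message"].strip()
    return doc


def resolve_ip(doc):
    # Step 2: attach location data when the address is known.
    location = FAKE_GEO_DB.get(doc["ip"])
    if location is not None:
        doc["geo"] = dict(location)
    return doc


def parse_date(doc):
    # Step 3: parse day/month/year and emit an ISO 8601 UTC timestamp.
    # The raw field is dropped here only to match the diagram above.
    parsed = datetime.strptime(doc.pop("raw_date"), "%d/%m/%Y")
    parsed = parsed.replace(tzinfo=timezone.utc)
    doc["date"] = parsed.strftime("%Y-%m-%dT%H:%M:%SZ")
    return doc


def run_pipeline(doc, processors):
    # Work on a shallow copy and apply each processor in list order.
    result = dict(doc)
    for processor in processors:
        result = processor(result)
    return result


incoming = {"message": "  Hello World  ", "ip": "203.0.113.45", "raw_date": "15/06/2024"}
stored = run_pipeline(incoming, [trim_message, resolve_ip, parse_date])
print(stored)
```

Each function makes exactly one change, and the list order is the execution order, which mirrors how processors are chained in a pipeline definition.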
Creating a Pipeline
PUT /_ingest/pipeline/clean_log_pipeline
{
  "description": "Cleans and enriches log documents",
  "processors": [
    {
      "trim": {
        "field": "message"
      }
    },
    {
      "lowercase": {
        "field": "status"
      }
    },
    {
      "set": {
        "field": "environment",
        "value": "production"
      }
    },
    {
      "remove": {
        "field": "raw_internal_id"
      }
    }
  ]
}
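If you would rather create the pipeline from a script than from a console, the same request can be sent over HTTP. The sketch below uses only the Python standard library and assumes an unsecured cluster at `http://localhost:9200`; the URL and the helper names `build_pipeline_body` and `put_pipeline` are illustrative assumptions, and a secured cluster would also need credentials and HTTPS.

```python
import json
import urllib.request

# Assumption: a local, unsecured cluster. Adjust for your environment.
ES_URL = "http://localhost:9200"


def build_pipeline_body():
    # Same definition as the console request above, as a Python dict.
    return {
        "description": "Cleans and enriches log documents",
        "processors": [
            {"trim": {"field": "message"}},
            {"lowercase": {"field": "status"}},
            {"set": {"field": "environment", "value": "production"}},
            {"remove": {"field": "raw_internal_id"}},
        ],
    }


def put_pipeline(name, body, base_url=ES_URL):
    # Send PUT /_ingest/pipeline/<name> with the definition as JSON.
    request = urllib.request.Request(
        url=f"{base_url}/_ingest/pipeline/{name}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read())


# Requires a running cluster, so it is left commented out:
# put_pipeline("clean_log_pipeline", build_pipeline_body())
```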
Common Processors
| Processor | What It Does |
|---|---|
| set | Adds or overwrites a field value |
| remove | Deletes a field from the document |
| rename | Renames a field |
| trim | Strips leading and trailing whitespace |
| lowercase / uppercase | Changes text case |
| convert | Changes a field's data type (for example, string → integer) |
| date | Parses date strings into Elasticsearch date format |
| grok | Extracts fields from unstructured text using patterns |
| geoip | Looks up geographic data from an IP address |
| split | Splits a string into an array by a delimiter |
| join | Joins an array into a string |
| script | Runs a custom Painless script for complex logic |
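As an illustration of how a few of these processors are configured, the sketch below builds a pipeline body that uses `rename`, `convert`, and `split`. The field names (`hostname`, `status`, `tags`) and the description are made up for the example; the option names are the ones these processors accept.

```python
import json

# Hypothetical field names, chosen only for illustration.
extra_processors = [
    # Move the value of "hostname" to a new field name.
    {"rename": {"field": "hostname", "target_field": "host_name"}},
    # Turn a string such as "200" into the integer 200.
    {"convert": {"field": "status", "type": "integer"}},
    # Turn "a,b,c" into ["a", "b", "c"]; the separator is treated as a regex.
    {"split": {"field": "tags", "separator": ","}},
]

pipeline_body = {
    "description": "Example using rename, convert, and split",
    "processors": extra_processors,
}

# The printed JSON can be used as the body of a PUT /_ingest/pipeline/<name> request.
print(json.dumps(pipeline_body, indent=2))
```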
Grok — Parsing Unstructured Logs
Most server logs are plain text strings. Grok uses named patterns to extract structured fields from them:
Raw log line:
"203.0.113.45 - GET /home 200 1024"
Grok pattern:
"%{IP:client_ip} - %{WORD:method} %{URIPATHPARAM:path} %{NUMBER:status} %{NUMBER:bytes}"
Result:
{
  "client_ip": "203.0.113.45",
  "method": "GET",
  "path": "/home",
  "status": "200",
  "bytes": "1024"
}
Grok ships with built-in patterns for Apache-style access logs (the format Nginx also uses by default), syslog, and dozens of other common formats.
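Under the hood, a grok pattern is a regular expression with named captures. As a rough illustration, the pattern above corresponds to something like the following Python regex. The sub-expressions are deliberately simplified (IPv4 only, any non-space run for the path) and are not the actual definitions of `IP`, `WORD`, `URIPATHPARAM`, or `NUMBER`.

```python
import re

# Simplified stand-ins for the grok patterns used above.
LOG_PATTERN = re.compile(
    r"(?P<client_ip>\d{1,3}(?:\.\d{1,3}){3})"  # roughly %{IP:client_ip}, IPv4 only
    r" - "
    r"(?P<method>\w+)"                         # roughly %{WORD:method}
    r" "
    r"(?P<path>\S+)"                           # roughly %{URIPATHPARAM:path}
    r" "
    r"(?P<status>\d+)"                         # roughly %{NUMBER:status}
    r" "
    r"(?P<bytes>\d+)"                          # roughly %{NUMBER:bytes}
)


def parse_log_line(line):
    # Return the named captures as a dict, or None if the line does not match.
    match = LOG_PATTERN.fullmatch(line)
    return match.groupdict() if match else None


print(parse_log_line("203.0.113.45 - GET /home 200 1024"))
```

As with grok, every captured value is a string, so numeric fields such as `status` still need a conversion step if you want to store them as numbers.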
Using a Pipeline When Indexing
POST /logs/_doc?pipeline=clean_log_pipeline
{
  "message": " Server started ",
  "status": "OK",
  "raw_internal_id": "tmp-abc123"
}
Set a default pipeline on an index so that every document indexed without an explicit ?pipeline parameter uses it automatically:
PUT /logs/_settings
{
  "index.default_pipeline": "clean_log_pipeline"
}
Testing a Pipeline Before Using It
Simulate a pipeline against a sample document without storing anything:
POST /_ingest/pipeline/clean_log_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": " Test message ",
        "status": "WARNING",
        "raw_internal_id": "tmp-xyz"
      }
    }
  ]
}
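The response shows what each document looks like after the whole pipeline has run. Add ?verbose=true to the request to see the output of each processor in turn. Either way, nothing is indexed, so this is a safe way to verify your pipeline logic before putting it in production.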
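When simulating from a script, a small helper can pull the transformed documents out of the response. The sketch below assumes the non-verbose response shape, where each entry in `docs` holds either a `doc` object with a `_source` or an `error` object. The `sample_response` is written by hand to show that shape, not captured from a cluster, and `summarize_simulation` is a hypothetical helper name.

```python
def summarize_simulation(response):
    # Collect the transformed _source, or the error, for each simulated document.
    results = []
    for entry in response.get("docs", []):
        if "error" in entry:
            results.append({"ok": False, "error": entry["error"]})
        else:
            results.append({"ok": True, "source": entry["doc"]["_source"]})
    return results


# Hand-written example of the assumed response shape for clean_log_pipeline.
sample_response = {
    "docs": [
        {
            "doc": {
                "_index": "_index",
                "_id": "_id",
                "_source": {
                    "message": "Test message",
                    "status": "warning",
                    "environment": "production",
                },
                "_ingest": {"timestamp": "2024-06-15T00:00:00.000Z"},
            }
        }
    ]
}

for result in summarize_simulation(sample_response):
    print(result)
```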
Error Handling in Pipelines
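Use on_failure to handle errors without rejecting the entire document. For example, if the geoip processor throws an error (say, because the ip field is missing or malformed), set a default location. An address that is simply absent from the database does not count as a failure; the processor just adds nothing.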
{
  "geoip": {
    "field": "ip",
    "on_failure": [
      {
        "set": {
          "field": "geo.country",
          "value": "Unknown"
        }
      }
    ]
  }
}
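The control flow of on_failure resembles a try/except around a single processor. The following is a plain-Python model of that idea, not Elasticsearch code: `KNOWN_LOCATIONS`, `geoip_lookup`, and `set_unknown_country` are hypothetical names, and the failure is simulated by a document that has no `ip` field.

```python
# Hypothetical lookup table standing in for a GeoIP database.
KNOWN_LOCATIONS = {"203.0.113.45": {"country": "India"}}


def geoip_lookup(doc):
    # Raises KeyError when "ip" is missing, standing in for a processor failure.
    ip = doc["ip"]
    location = KNOWN_LOCATIONS.get(ip)
    if location is not None:
        doc["geo"] = dict(location)
    return doc


def set_unknown_country(doc):
    # Fallback step, equivalent in spirit to setting geo.country to "Unknown".
    doc["geo"] = {"country": "Unknown"}
    return doc


def run_with_on_failure(doc, processor, on_failure):
    # Try the processor; if it raises, run the fallback handlers instead
    # of rejecting the document.
    try:
        return processor(dict(doc))
    except Exception:
        recovered = dict(doc)
        for handler in on_failure:
            recovered = handler(recovered)
        return recovered


print(run_with_on_failure({"ip": "203.0.113.45"}, geoip_lookup, [set_unknown_country]))
print(run_with_on_failure({"message": "no ip here"}, geoip_lookup, [set_unknown_country]))
```

The second document has no `ip` field, so the lookup raises and the fallback fills in the default country while the rest of the document is kept.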
