Logstash - Enrich documents with data from another index

At some point you have likely needed to enrich a document with data retrieved from another index. Elasticsearch provides the Enrich API for this job; however, it is limited to a single matching field on the source index. To use a more complex query, with several matching fields and filters, you can resort to Logstash.
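
To illustrate the limitation: an enrich policy accepts only one match_field. A hypothetical policy for our scenario (the policy name is illustrative) could match on PRODUCT, but could not also constrain CATEGORY or the date range:

```
PUT /_enrich/policy/product-price-policy
{
  "match": {
    "indices": "products",
    "match_field": "PRODUCT",
    "enrich_fields": ["PRICE"]
  }
}
```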

What you will learn

  1. Update the Logstash conf file to enrich the document
  2. Create a JSON query template that matches on more than one field and applies filters

What will be done

We will add a reference data index (products) that will be queried to enrich the documents loaded by Logstash (transactions.csv, see the Source Code section). In this example, customers purchase books, and the price is looked up based on the category, product, and purchase date.

Products - Reference Data

CATEGORY | PRODUCT                     | PRICE | START_DATE | END_DATE
Book     | Refactoring - Martin Fowler | 40    | 2021-01-01 | 2021-01-31
Book     | Refactoring - Martin Fowler | 51    | 2021-02-01 | 2999-12-31
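
For the range filters used later to work, START_DATE and END_DATE should be mapped as dates in the reference index. A sketch of how the data above could be loaded (the mapping and bulk requests are illustrative; the repository may set this up differently):

```
PUT /products
{
  "mappings": {
    "properties": {
      "CATEGORY":   { "type": "keyword" },
      "PRODUCT":    { "type": "text" },
      "PRICE":      { "type": "integer" },
      "START_DATE": { "type": "date" },
      "END_DATE":   { "type": "date" }
    }
  }
}

POST /products/_bulk
{ "index": {} }
{ "CATEGORY": "Book", "PRODUCT": "Refactoring - Martin Fowler", "PRICE": 40, "START_DATE": "2021-01-01", "END_DATE": "2021-01-31" }
{ "index": {} }
{ "CATEGORY": "Book", "PRODUCT": "Refactoring - Martin Fowler", "PRICE": 51, "START_DATE": "2021-02-01", "END_DATE": "2999-12-31" }
```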

Transactions

CUSTOMER_NAME | CATEGORY | PRODUCT                     | PURCHASE_DATE | PRICE
John          | Book     | Refactoring - Martin Fowler | 2021-01-15    | (will be $40 after enrichment)
Hezekiah      | Book     | Refactoring - Martin Fowler | 2021-02-01    | (will be $51 after enrichment)
James         | Book     | Refactoring - Martin Fowler | 2021-02-15    | (will be $51 after enrichment)
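
The rows above would arrive as CSV lines like the following sketch. Note there is no PRICE column, which is why the conf file below adds it. (The actual file in the repository may differ, e.g. in date format; the indexed results later show dates as yyyyMMdd.)

```
John,Book,Refactoring - Martin Fowler,2021-01-15
Hezekiah,Book,Refactoring - Martin Fowler,2021-02-01
James,Book,Refactoring - Martin Fowler,2021-02-15
```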

Update the Logstash conf file to enrich the document

In the filter section of your conf file (transactions.conf), add the elasticsearch filter plugin, specifying the host, the query template (product_price_query.json), and the index (products) that will be queried for enrichment data. Finally, the fields property maps the field returned by the query to the event field to be enriched.


input {
	file {
		path => "C:/Carlos/Elastic/samples/transactions*.csv"
		start_position => "beginning"
		sincedb_path => "NUL"
	}
}
filter {
	csv {
		separator => ","
		columns => ["CUSTOMER_NAME","CATEGORY","PRODUCT","PURCHASE_DATE"]
		remove_field => ["path","message","@version","host","@timestamp"]
	}
	# Add the PRICE field, which does not exist in the csv file being loaded
	mutate {
		add_field => { "PRICE" => "" }
	}
	elasticsearch {
		hosts => "http://localhost:9200/"
		# Logstash will use the result of this query to enrich the PRICE field
		query_template => "C:/Carlos/Elastic/samples/product_price_query.json"
		index => "products"
		fields => {
			"PRICE" => "PRICE"
		}
	}
}
output {
	elasticsearch {
		hosts => "http://localhost:9200/"
		index => "transactions"
	}
	stdout { codec => rubydebug }
}


JSON query template with dynamic values

Logstash dynamically replaces each "%{field}" reference with the value from the event currently being processed for insertion into the destination index. When the query finds a match, the result is returned to the elasticsearch filter plugin and placed in the PRICE field.


{
  "size": 1,
  "_source": "PRICE", 
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "CATEGORY": "%{CATEGORY}"
          }
        },
        {
          "match": {
            "PRODUCT": "%{PRODUCT}"
          }
        }
      ],
      "filter": [
        {
          "range": {
            "START_DATE": {
              "lte": "%{PURCHASE_DATE}"
            }
          }
        },
        {
          "range": {
            "END_DATE": {
              "gte": "%{PURCHASE_DATE}"
            }
          }
        }
      ]
    }
  }
}
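
For example, for John's row in the table above, the template would render roughly as follows, matching the first products entry (price 40):

```
{
  "size": 1,
  "_source": "PRICE",
  "query": {
    "bool": {
      "must": [
        { "match": { "CATEGORY": "Book" } },
        { "match": { "PRODUCT": "Refactoring - Martin Fowler" } }
      ],
      "filter": [
        { "range": { "START_DATE": { "lte": "2021-01-15" } } },
        { "range": { "END_DATE": { "gte": "2021-01-15" } } }
      ]
    }
  }
}
```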

	

Results


#GET /transactions/_search

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "transactions",
        "_type" : "_doc",
        "_id" : "f8QOwnsBZX_9Y03gVraX",
        "_score" : 1.0,
        "_source" : {
          "PRICE" : 40,
          "CUSTOMER_NAME" : "John",
          "CATEGORY" : "Book",
          "PRODUCT" : "Refactoring - Martin Fowler",
          "PURCHASE_DATE" : "20210115"
        }
      },
      {
        "_index" : "transactions",
        "_type" : "_doc",
        "_id" : "fsQOwnsBZX_9Y03gVraX",
        "_score" : 1.0,
        "_source" : {
          "PRICE" : 51,
          "CUSTOMER_NAME" : "Hezekiah",
          "CATEGORY" : "Book",
          "PRODUCT" : "Refactoring - Martin Fowler",
          "PURCHASE_DATE" : "20210201"
        }
      },
      {
        "_index" : "transactions",
        "_type" : "_doc",
        "_id" : "fcQOwnsBZX_9Y03gVraX",
        "_score" : 1.0,
        "_source" : {
          "PRICE" : 51,
          "CUSTOMER_NAME" : "James",
          "CATEGORY" : "Book",
          "PRODUCT" : "Refactoring - Martin Fowler",
          "PURCHASE_DATE" : "20210215"
        }
      }
    ]
  }
}
	

Source Code

The data files to be loaded, as well as the source files, can be found in this repository.