# Logstash - Enrich documents using data from another index
At some point you have likely needed to enrich a document with data retrieved from another index. Elasticsearch provides the Enrich API for exactly this job, but it is limited to a single matching field on the source index. To use a more complex query, with multiple matching fields and filters, you have to resort to Logstash.
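For context, an enrich policy declares exactly one `match_field`, which is why it cannot express the category + product + date-range match used below. A minimal policy for the products index would look like this (the policy name is illustrative):

```
PUT /_enrich/policy/products-policy
{
  "match": {
    "indices": "products",
    "match_field": "PRODUCT",
    "enrich_fields": ["PRICE"]
  }
}
```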
## What you will learn

- Update the Logstash conf file to enrich documents
- Create a JSON query template that accepts more than one matching field, plus filters
## What will be done

We will add a reference data index (products) that will be queried to enrich the documents being loaded by Logstash (transactions.csv, see the Source Code section). In this example, customers purchase books, and the price is looked up based on the category, product, and purchase date.
### Products - Reference Data
| CATEGORY | PRODUCT | PRICE | START_DATE | END_DATE |
|---|---|---|---|---|
| Book | Refactoring - Martin Fowler | 40 | 2021-01-01 | 2021-01-31 |
| Book | Refactoring - Martin Fowler | 51 | 2021-02-01 | 2999-12-31 |
### Transactions
| CUSTOMER_NAME | CATEGORY | PRODUCT | PURCHASE_DATE | PRICE |
|---|---|---|---|---|
| John | Book | Refactoring - Martin Fowler | 2021-01-15 | (value will be $40 after enrichment) |
| Hezekiah | Book | Refactoring - Martin Fowler | 2021-02-01 | (value will be $51 after enrichment) |
| James | Book | Refactoring - Martin Fowler | 2021-02-15 | (value will be $51 after enrichment) |
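The lookup the query template will perform can be sketched in plain Python. This is only a conceptual model of the match (not how Logstash or Elasticsearch implement it): find the row with the same category and product whose validity window contains the purchase date.

```python
from datetime import date

# Reference data, mirroring the products table above
PRODUCTS = [
    {"CATEGORY": "Book", "PRODUCT": "Refactoring - Martin Fowler",
     "PRICE": 40, "START_DATE": date(2021, 1, 1), "END_DATE": date(2021, 1, 31)},
    {"CATEGORY": "Book", "PRODUCT": "Refactoring - Martin Fowler",
     "PRICE": 51, "START_DATE": date(2021, 2, 1), "END_DATE": date(2999, 12, 31)},
]

def lookup_price(category, product, purchase_date):
    """Return the price valid on the purchase date, or None if no row matches."""
    for row in PRODUCTS:
        if (row["CATEGORY"] == category
                and row["PRODUCT"] == product
                and row["START_DATE"] <= purchase_date <= row["END_DATE"]):
            return row["PRICE"]
    return None

print(lookup_price("Book", "Refactoring - Martin Fowler", date(2021, 1, 15)))  # 40
```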
## Update the Logstash conf file to enrich the document

In the filter section of your conf file (transactions.conf), add the elasticsearch filter plugin, specifying the host, the query template (product_price_query.json), and the index (products) that will be queried for the enrichment data. Finally, the `fields` option maps the field from the query result onto the event field to be enriched.
```
input {
  file {
    path => "C:/Carlos/Elastic/samples/transactions*.csv"
    start_position => "beginning"
    # On Windows, "NUL" disables the sincedb file so the CSV is re-read on every run
    sincedb_path => "NUL"
  }
}
filter {
  csv {
    separator => ","
    columns => ["CUSTOMER_NAME","CATEGORY","PRODUCT","PURCHASE_DATE"]
    remove_field => [ "path","message","@version","host","@timestamp" ]
  }
  # Add the PRICE field, which does not exist in the CSV file being loaded
  mutate {
    add_field => { "PRICE" => "" }
  }
  elasticsearch {
    hosts => "http://localhost:9200/"
    # Logstash will use the result of this query to enrich the PRICE field
    query_template => "C:/Carlos/Elastic/samples/product_price_query.json"
    index => "products"
    # Copy PRICE from the matched product document into the event's PRICE field
    fields => {
      "PRICE" => "PRICE"
    }
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200/"
    index => "transactions"
  }
  stdout { codec => rubydebug }
}
```
## JSON query template with dynamic values

Logstash dynamically replaces each `%{field}` placeholder with the corresponding value of the event being read for insertion into the destination index. Once a matching document is found, its PRICE value is returned to the elasticsearch filter plugin and placed in the event's PRICE field.
```
{
  "size": 1,
  "_source": "PRICE",
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "CATEGORY": "%{CATEGORY}"
          }
        },
        {
          "match": {
            "PRODUCT": "%{PRODUCT}"
          }
        }
      ],
      "filter": [
        {
          "range": {
            "START_DATE": {
              "lte": "%{PURCHASE_DATE}"
            }
          }
        },
        {
          "range": {
            "END_DATE": {
              "gte": "%{PURCHASE_DATE}"
            }
          }
        }
      ]
    }
  }
}
```
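Conceptually, the `%{field}` placeholders behave like a simple string substitution over the template before the query is sent. The sketch below models that idea in Python (it is not Logstash's actual implementation):

```python
import json
import re

# Same template as above, as a string
TEMPLATE = """
{
  "size": 1,
  "_source": "PRICE",
  "query": {
    "bool": {
      "must": [
        {"match": {"CATEGORY": "%{CATEGORY}"}},
        {"match": {"PRODUCT": "%{PRODUCT}"}}
      ],
      "filter": [
        {"range": {"START_DATE": {"lte": "%{PURCHASE_DATE}"}}},
        {"range": {"END_DATE": {"gte": "%{PURCHASE_DATE}"}}}
      ]
    }
  }
}
"""

def render(template, event):
    # Replace every %{field} placeholder with the matching value from the event
    return re.sub(r"%\{(\w+)\}", lambda m: str(event[m.group(1)]), template)

event = {"CATEGORY": "Book", "PRODUCT": "Refactoring - Martin Fowler",
         "PURCHASE_DATE": "2021-01-15"}
query = json.loads(render(TEMPLATE, event))
print(query["query"]["bool"]["must"][0]["match"]["CATEGORY"])  # Book
```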
## Results

```
#GET /transactions/_search
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "transactions",
        "_type" : "_doc",
        "_id" : "f8QOwnsBZX_9Y03gVraX",
        "_score" : 1.0,
        "_source" : {
          "PRICE" : 40,
          "CUSTOMER_NAME" : "John",
          "CATEGORY" : "Book",
          "PRODUCT" : "Refactoring - Martin Fowler",
          "PURCHASE_DATE" : "20210115"
        }
      },
      {
        "_index" : "transactions",
        "_type" : "_doc",
        "_id" : "fsQOwnsBZX_9Y03gVraX",
        "_score" : 1.0,
        "_source" : {
          "PRICE" : 51,
          "CUSTOMER_NAME" : "Hezekiah",
          "CATEGORY" : "Book",
          "PRODUCT" : "Refactoring - Martin Fowler",
          "PURCHASE_DATE" : "20210201"
        }
      },
      {
        "_index" : "transactions",
        "_type" : "_doc",
        "_id" : "fcQOwnsBZX_9Y03gVraX",
        "_score" : 1.0,
        "_source" : {
          "PRICE" : 51,
          "CUSTOMER_NAME" : "James",
          "CATEGORY" : "Book",
          "PRODUCT" : "Refactoring - Martin Fowler",
          "PURCHASE_DATE" : "20210215"
        }
      }
    ]
  }
}
```
## Source Code

The data to be loaded, as well as the source files, can be found in this repository.