Elasticsearch Composite Aggregation using Java

V Ramya
Dec 28, 2020


While looking for an implementation of Elasticsearch's Composite Aggregation using the Java REST High Level Client on the internet, I couldn't find anything that was easy to understand and implement. Hence, I decided to share my understanding of this topic with the community.

In my current organisation, we extensively use Elasticsearch to store and process complex user-related documents, leveraging its indexing and search capabilities to serve our API requests in optimal time.

Photo by Tracy Adams on Unsplash

Let’s first understand the problem we encountered and then our approach to solve it.

Problem-

We had been using a search query with an aggregation to find the distinct values of a property (say, id) with a size limit of 10000. Elasticsearch scans all the documents that satisfy the given criteria and then aggregates them into buckets. As the number of documents in the index grew, we started seeing too_many_buckets_exception in the search response. This led us to rethink the loopholes in our implementation, as it wasn't really scalable.
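For context, the original query was along the lines of a plain terms aggregation (the index and field names here are illustrative). Once the number of distinct values approaches the bucket limit enforced by the search.max_buckets cluster setting, a request like this starts failing:

```json
{
  "size": 0,
  "aggregations": {
    "distinct_ids": {
      "terms": {
        "field": "id",
        "size": 10000
      }
    }
  }
}
```

A terms aggregation has to build all its buckets in a single response, which is exactly what stops scaling as cardinality grows.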

Solution-

Whenever we have to query large datasets, the best approach is pagination. We definitely wanted to implement pagination, but the challenge was to keep our current functionality intact. We found our ray of hope in Composite Aggregation, which combines pagination with aggregation, allowing you to paginate over large aggregation result sets sequentially without compromising response time.
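Conceptually, a composite aggregation pages through buckets by taking the key of the last bucket from the previous response and passing it back in an "after" parameter (the names here are illustrative and match the code below):

```json
{
  "size": 0,
  "aggregations": {
    "composite_id": {
      "composite": {
        "size": 10000,
        "sources": [
          { "distinctId": { "terms": { "field": "id" } } }
        ],
        "after": { "distinctId": 500 }
      }
    }
  }
}
```

Because buckets are returned in a deterministic sort order of the source values, each page picks up exactly where the previous one left off.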

Implementation-

Let’s quickly check the implementation using Java Rest High Level Client.

Steps-

1. SearchRequest- Using this class we create a search request and specify the index to be queried.

SearchRequest searchRequest = new SearchRequest(indexName);

2. CompositeValuesSourceBuilder- We need a composite values source builder to add the terms on which we are performing the aggregation.

List<CompositeValuesSourceBuilder<?>> sourceBuilderList = new ArrayList<>();
sourceBuilderList.add(new TermsValuesSourceBuilder("distinctId").field("id"));

3. CompositeAggregationBuilder- This class builds the composite aggregation, where we define the source fields to be used while building composite buckets. We pass the source fields created in step 2 as an argument, along with the page size. The builder has an "aggregateAfter" property where we pass the after key from the previous response in order to paginate further; for the first request it is null, and when a response comes back without an after key we have reached the last page.

CompositeAggregationBuilder compositeAggregationBuilder =
        new CompositeAggregationBuilder("composite_id", sourceBuilderList).size(10000);
if (afterKey != null) { // null for the first page
    compositeAggregationBuilder.aggregateAfter(afterKey);
}

4. SearchSourceBuilder- This is the standard class used to build the search request, and we now combine everything created so far into it. The aggregation is the composite aggregation builder from step 3, since we are making a composite-aggregation-based search. Then specify the fields to be included in the search response along with the size, and execute the search.

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.aggregation(compositeAggregationBuilder);
String[] includeFields = {"id"};
searchSourceBuilder.fetchSource(includeFields, null);
searchSourceBuilder.size(10000);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

At the end, the search query would look like this :

{
  "size": 10000,
  "_source": {
    "includes": [
      "id"
    ],
    "excludes": []
  },
  "aggregations": {
    "composite_id": {
      "composite": {
        "size": 10000,
        "sources": [
          {
            "distinctId": {
              "terms": {
                "field": "id",
                "missing_bucket": false,
                "order": "asc"
              }
            }
          }
        ]
      }
    }
  }
}
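Putting steps 1 to 4 together, the findDistinctIds helper used in the iteration step below can be sketched as follows (a minimal sketch; indexName and restHighLevelClient are assumed to be available fields, and the names mirror the snippets above):

```java
// Sketch: wraps steps 1-4 into the findDistinctIds helper used in step 5.
// indexName and restHighLevelClient are assumed to be fields of the class.
private SearchResponse findDistinctIds(Map<String, Object> afterKey) throws IOException {
    // Step 1: search request against the index
    SearchRequest searchRequest = new SearchRequest(indexName);

    // Step 2: the values source(s) to aggregate on
    List<CompositeValuesSourceBuilder<?>> sourceBuilderList = new ArrayList<>();
    sourceBuilderList.add(new TermsValuesSourceBuilder("distinctId").field("id"));

    // Step 3: composite aggregation, paginated via the after key
    CompositeAggregationBuilder compositeAggregationBuilder =
            new CompositeAggregationBuilder("composite_id", sourceBuilderList).size(10000);
    if (afterKey != null) { // null for the first page
        compositeAggregationBuilder.aggregateAfter(afterKey);
    }

    // Step 4: assemble the search body and execute
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.aggregation(compositeAggregationBuilder);
    searchSourceBuilder.fetchSource(new String[]{"id"}, null);
    searchSourceBuilder.size(10000);
    searchRequest.source(searchSourceBuilder);
    return restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
}
```

If only the aggregation buckets are needed and not the hits themselves, setting searchSourceBuilder.size(0) would make each page cheaper.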

5. Iteration- We iterate in a while loop until the response no longer contains an after key, i.e. the aggregation over the data set is complete.

The search response would contain aggregation results as in the example below-

{
  ...some other values
  "aggregations": {
    "composite#composite_id": {
      "after_key": {
        "distinctId": 500
      },
      "buckets": [
        {
          "key": {
            "distinctId": 100
          },
          "doc_count": 1
        },
        {
          "key": {
            "distinctId": 101
          },
          "doc_count": 1
        },
        {
          "key": {
            "distinctId": 102
          },
          "doc_count": 1
        }
        ...more buckets
      ]
    }
  }
}

The aggregations are easy to iterate in the form of a list, so we can use a for-each loop and cast each aggregation item to a ParsedComposite object, which holds the afterKey for the next round of the search query and the page of data in the form of buckets.

The buckets in turn come back as a list, where each bucket has a key property, a LinkedHashMap whose keys are the names specified for the values sources (here, "distinctId") and whose values are the ids.

Map<String, Object> afterKey = null;
List<String> idList = new ArrayList<>();
SearchResponse searchResponse = findDistinctIds(null); // null after key for the first round
while (searchResponse != null && searchResponse.getAggregations() != null) {
    List<Aggregation> aggregationList = searchResponse.getAggregations().asList();
    if (CollectionUtils.isNotEmpty(aggregationList)) {
        for (Aggregation aggregation : aggregationList) {
            ParsedComposite parsedComposite = (ParsedComposite) aggregation;
            afterKey = parsedComposite.afterKey(); // null once the last page is reached
            parsedComposite.getBuckets().forEach(parsedBucket ->
                    parsedBucket.getKey().forEach((k, v) -> idList.add(String.valueOf(v))));
        }
    }
    if (afterKey != null) {
        searchResponse = findDistinctIds(afterKey);
    } else {
        searchResponse = null;
    }
}

That's it! We have successfully completed the composite aggregation implementation using the Elasticsearch Java API, and I really hope this article was helpful for you. Happy Coding :)

