Here is a slightly modified architecture from my previous post, Getting Started With ElasticSearch. If you find yourself indexing content constantly (hundreds or even thousands of documents per minute), you might want to consider an asynchronous approach to indexing.
Consider the following high-level architecture. The layers are self-explanatory. Your users access the system via the web servers, which then “talk” to the application servers, which in turn interact with the data tier. Pretty much business as usual. Now add the fact that you have limited hardware/VM resources and limited money to buy more. That means you need to design for higher volumes while ensuring that user response times are not impacted. The key here is to go asynchronous.
For the sake of discussion, the various components in your architecture can be either cloud based or in your own physical data centers. The architectural concern here is that we have a large amount of incoming data that has to be added to the search engine, both from background processes and from user actions in the application. Even if you have a vast cloud-based infrastructure at your command (lucky you), that is no reason to design applications poorly. Design right, then scale out.
In our case, constantly making HTTP calls to ElasticSearch to index documents one by one makes no sense. Yes, you can surely throw in more hardware and grow a high-performance ElasticSearch cluster. But there will still be processes on the web and application servers that constantly block, waiting for each indexing call to complete. And most often these indexing calls can be of the "eventually consistent" type, meaning it is OK for indexing to happen with a delay rather than in real time. Nor can you discount the network traffic (and resources) that such a chatty service interaction requires.
The architecture described here is presented in the context of indexing into ElasticSearch, but it can be used for any other type of processing where eventual consistency can be tolerated. In a previous blog post I went through a design (and code) that used the ElasticSearch Java API to index campaign contribution donor information into the search engine. In that example, each of the thousands of documents was indexed one by one. It was a bit slow, but it worked since all components were on my machine. Put that into a production environment and soon your pager will be going off at 3am.
I will focus on how to create a framework that both the background processes and the user actions on the web tier can use. The solution is two-fold: first collect (aggregate) a set of documents, then index them in bulk using the ElasticSearch Bulk Java API. For this post I will use a bulk size of 100; you can change the code to use a different number if you like. My requirements are:
- The producer continues to submit one record at a time for indexing. It has no need to know that documents are bulked for indexing (keep-it-simple principle).
- The producer submits each record to a JMS queue (fire and forget).
- The consumer side simply receives a bulk set of records (100 at a time) for indexing.
- Any unprocessed documents that number fewer than 100, but have been waiting for a while, must be indexed without further delay.
- The bulk data must be persistent and survive server crashes; basically, I need guaranteed delivery and indexing. For this example I go a bit light on this requirement and assume a persistent queue will suffice.
- Finally, the integration flow and related plumbing should be kept as simple as possible and as far away from the actual code as possible.
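Before bringing in JMS, the core of requirements 1 to 4 ("one record in, batches out, and don't hold a short batch forever") can be illustrated with a minimal in-process sketch. It assumes a `java.util.concurrent.BlockingQueue` stands in for the JMS queue, so it has no persistence and no crash recovery; the class name `BatchingConsumer` is hypothetical and this is not the code from the repo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical in-process analogue of the queue-based design (no JMS, no persistence).
 * Producers call queue.offer(record) and return immediately (fire and forget).
 * A consumer calls nextBatch to get up to maxSize records, or fewer once maxWaitMillis
 * has elapsed since it started collecting the batch.
 */
final class BatchingConsumer {
    private BatchingConsumer() {}

    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long maxWaitMillis) {
        List<T> batch = new ArrayList<>(maxSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        try {
            while (batch.size() < maxSize) {
                // Take everything already waiting, up to the remaining capacity.
                queue.drainTo(batch, maxSize - batch.size());
                if (batch.size() >= maxSize) {
                    break; // full batch: release it
                }
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // waited long enough: release a partial batch
                }
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // timed out with no new record: release a partial batch
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status, return what we have
        }
        return batch;
    }
}
```

With `maxSize` set to 100, a consumer thread calling `nextBatch` in a loop gets full batches while records are flowing and a short final batch once the queue goes quiet. The Spring Integration setup below provides the same behaviour across JMS, with the aggregator handling the size limit and a reaper handling the timeout.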
These requirements are typically met with an ESB, but in my case either Apache Camel or Spring Integration will suffice. I will focus on Spring Integration in this post.
The architecture is as follows: the data loader, reading the campaign contributions file, writes one record at a time to a MessageChannel that is backed by a JMS queue. Taking advantage of the Aggregator pattern supported in Spring Integration (see Gregor Hohpe and Bobby Woolf, Enterprise Integration Patterns, on the Aggregator), we aggregate a set of records, batch them into groups of 100 and submit each batch to ElasticSearch. Let's review the Spring Integration XML configuration.
```xml
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.bigdata.elasticsearch" />

    <!-- ======================================== -->
    <!-- ActiveMQ CONFIG. -->
    <!-- ======================================== -->
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>

    <!-- ======================================== -->
    <!-- Spring Integration. -->
    <!-- ======================================== -->

    <!-- Simple channel to which each individual indexing request is written. -->
    <int-jms:channel id="indexerChannel" queue-name="indexChannel"
        connection-factory="jmsConnectionFactory" auto-startup="true"
        concurrency="10" />

    <!-- Results of the aggregation of a set of messages are written to this channel. -->
    <int-jms:channel id="aggregatorOutputChannel" queue-name="aggregatorOutChannel"
        connection-factory="jmsConnectionFactory" auto-startup="true"
        concurrency="10" />

    <!-- Aggregates individual messages from the input channel according to the release
         strategy and writes the aggregated messages to the output channel.
         input-channel references the channel bean id, not the JMS queue name. -->
    <int:aggregator id="messageAggregator" ref="messageAggregatorBean"
        method="aggregate" input-channel="indexerChannel"
        output-channel="aggregatorOutputChannel"
        release-strategy="messageReleaseStrategyBean" release-strategy-method="canRelease"
        send-partial-result-on-expiry="true" message-store="resultMessageStore" />

    <!-- Receives an aggregated set of messages to bulk index into ElasticSearch. -->
    <int:service-activator id="contributionIndexerService"
        ref="contributionIndexerBean" method="bulkIndex"
        input-channel="aggregatorOutputChannel" />

    <!-- In-memory message store. Can instead be written out to a database via
         JDBC or to MongoDB. -->
    <bean id="resultMessageStore"
        class="org.springframework.integration.store.SimpleMessageStore" />

    <!-- Periodically expires message groups held longer than the timeout (60 s);
         with send-partial-result-on-expiry, their partial batches are released. -->
    <bean id="resultMessageStoreReaper"
        class="org.springframework.integration.store.MessageGroupStoreReaper">
        <property name="messageGroupStore" ref="resultMessageStore" />
        <property name="timeout" value="60000" />
    </bean>

    <!-- Run the reaper every 10 seconds. -->
    <task:scheduled-tasks>
        <task:scheduled ref="resultMessageStoreReaper" method="run" fixed-rate="10000" />
    </task:scheduled-tasks>
</beans>
```
- First we set up the ActiveMQ connection. Remember to install ActiveMQ locally if you want to run this code. I am using version 5.7.0 at this time.
- LoadDataDriver.java starts the data load into ElasticSearch by invoking DataLoaderImpl.java, which uses my CSV parser to read the contents of the data file. Each converted record (an instance of class Contribution) is sent to the indexerChannel MessageChannel, which is backed by the JMS queue indexChannel.
- Define the aggregatorOutputChannel (another JMS queue), to which each aggregated set of messages (100 at a time) is written.
- Define the aggregator itself, which listens for individual messages arriving on the indexChannel queue and aggregates them based on a release strategy and the default correlation strategy. The release strategy is defined in bean MessageReleaseStrategyBean. The default correlation strategy groups messages by the Spring Integration CORRELATION_ID message header. I set a new correlation ID for every block of 100 messages.
- Define a service activator (ContributionIndexerBean) that receives the bulk messages from aggregatorOutputChannel (the output of the aggregator) and indexes them via its bulkIndex method.
- Finally, what about messages that are delayed and held by the aggregator for some reason, or the case where only, say, 10 messages remain after the file has been read completely? To ensure these documents get indexed, we define a MessageGroupStoreReaper that runs every 10 seconds and looks for message groups that have been held for more than a minute without being released for indexing. Because send-partial-result-on-expiry is set to true, these partial groups are then sent on to the service activator. This is a very critical piece of the design.
- The MessageStore used here is the SimpleMessageStore, which is entirely in memory. If the server crashes while the aggregator is still collecting messages, we can lose them. To resolve this, we can persist the messages to a JDBC or MongoDB store; Spring Integration provides implementations for both.
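The correlation and release pieces described above can be sketched as two small POJOs. This is a minimal illustration under stated assumptions, not the repo's classes: `BatchCorrelationIdSource` is a hypothetical producer-side helper that hands out the same correlation ID for 100 consecutive records, and `MessageReleaseStrategy` is a hypothetical stand-in for the bean behind `release-strategy-method="canRelease"`. It assumes Spring Integration calls that method with the `List` of payloads collected so far for one group. The producer would stamp the returned ID as the message's correlation ID header (for example with Spring Integration's `MessageBuilder.setCorrelationId`) before sending to `indexerChannel`.

```java
import java.util.List;
import java.util.UUID;

/**
 * Hypothetical producer-side helper: returns the same correlation ID for
 * batchSize consecutive records, then switches to a fresh one.
 */
class BatchCorrelationIdSource {
    private final int batchSize;
    private long issued = 0;
    private String currentId = UUID.randomUUID().toString();

    BatchCorrelationIdSource(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    /** Returns the correlation ID to stamp on the next record. */
    synchronized String next() {
        if (issued > 0 && issued % batchSize == 0) {
            currentId = UUID.randomUUID().toString(); // start a new group
        }
        issued++;
        return currentId;
    }
}

/**
 * Hypothetical release-strategy POJO: a group is released once it holds batchSize payloads.
 * Assumes the framework passes the group's payloads (or messages) as a List.
 */
class MessageReleaseStrategy {
    private final int batchSize;

    /** No-arg constructor so the bean can be created by component scanning; uses 100. */
    MessageReleaseStrategy() {
        this(100);
    }

    MessageReleaseStrategy(int batchSize) {
        this.batchSize = batchSize;
    }

    public boolean canRelease(List<?> groupPayloads) {
        return groupPayloads.size() >= batchSize;
    }
}
```

Keeping the ID generation on the producer side means the aggregator only needs the default header-based correlation, which matches the "keep the plumbing out of the code" requirement.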
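The interplay between the message store and the reaper can be illustrated in plain Java: groups are keyed by correlation ID, a group is handed over as soon as it is full, and a periodic pass hands over any group older than the timeout as a partial batch. This is a hypothetical in-memory illustration of the behaviour described above, not Spring Integration's `SimpleMessageStore` or `MessageGroupStoreReaper`. It assumes a group's age is measured from the arrival of its first message, and it takes the current time as a parameter so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory group store with a reaper pass (illustration only). */
class ExpiringGroupStore<T> {
    /** One correlation group: its items and when its first item arrived. */
    private static final class Group<E> {
        final long createdAtMillis;
        final List<E> items = new ArrayList<>();

        Group(long createdAtMillis) {
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final Map<String, Group<T>> groups = new LinkedHashMap<>();
    private final int releaseSize;

    ExpiringGroupStore(int releaseSize) {
        this.releaseSize = releaseSize;
    }

    /** Adds an item; returns the full group once it reaches releaseSize, else an empty list. */
    synchronized List<T> add(String correlationId, T item, long nowMillis) {
        Group<T> group = groups.computeIfAbsent(correlationId, id -> new Group<>(nowMillis));
        group.items.add(item);
        if (group.items.size() >= releaseSize) {
            groups.remove(correlationId);
            return group.items;
        }
        return Collections.emptyList();
    }

    /** Reaper pass: removes and returns every partial group older than timeoutMillis. */
    synchronized List<List<T>> expireOlderThan(long timeoutMillis, long nowMillis) {
        List<List<T>> expired = new ArrayList<>();
        Iterator<Group<T>> it = groups.values().iterator();
        while (it.hasNext()) {
            Group<T> group = it.next();
            if (nowMillis - group.createdAtMillis > timeoutMillis) {
                expired.add(group.items);
                it.remove(); // removing via values() also removes the map entry
            }
        }
        return expired;
    }
}
```

In the configuration shown earlier, the equivalent pass runs every 10 seconds (`fixed-rate="10000"`) with a 60-second timeout, so a trailing partial batch waits at most roughly 70 seconds before it is indexed.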
To run this…
- Download the source (clone) from my git repo https://github.com/thomasma/elasticsearch-async-cmpgn-contribs.
- Install Maven, ActiveMQ and ElasticSearch 0.90.0. See my previous blog post on installing ElasticSearch.
- I use the STS IDE (SpringSource version of Eclipse)
- Download the campaign contribution files and modify LoadDataDriver.java to point to the CSV file name.
- Assuming ActiveMQ and ElasticSearch are up, execute the LoadDataDriver class. You should see messages that indicate that the service activator received 100 messages on each invocation to index.