Spring Integration with JMS, ActiveMQ and MongoDB

Extending some of my previous posts on the 2012 presidential political contributions, here I will use Spring Integration, ActiveMQ, JMS and MongoDB to load the CSV data into MongoDB.

Note: In a real-world scenario where you have to load a CSV/JSON file into MongoDB, you might want to consider the mongoimport command line tool. If it's simple data with no transformations, this will often suffice. This blog takes a different path as a learning experience.

I have been dabbling with Spring Integration recently and wanted to put this quick post together on how to use it. The use case is simple. Download the political contributions CSV dataset from http://fec.gov/disclosurep/PDownload.do. You can download ALL.zip or any one of the state-specific datasets. ALL.zip is 155 MB and blows up to 941 MB uncompressed, so I recommend downloading one of the state-specific ones. The goal here is to read through the CSV file and put each record on an ActiveMQ queue. A listener then consumes each record and writes it out to MongoDB.
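To make the shape of that flow concrete before any framework is involved, here is a minimal plain-Java sketch. It is not Spring Integration code: a BlockingQueue stands in for the ActiveMQ queue, an in-memory list stands in for the MongoDB collection, and the class name QueueFlowSketch and the sample rows are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java stand-in for the flow: producer -> queue -> listener -> store.
// The BlockingQueue plays the role of the ActiveMQ queue and the List plays
// the role of the MongoDB collection; neither is the real thing.
class QueueFlowSketch {

    // Sentinel telling the listener that no more records will arrive.
    private static final String END_OF_DATA = "__END__";

    static List<String> run(List<String> csvLines) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> store = Collections.synchronizedList(new ArrayList<>());

        // Listener: takes records off the queue and "persists" them.
        Thread listener = new Thread(() -> {
            try {
                String record;
                while (!(record = queue.take()).equals(END_OF_DATA)) {
                    store.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listener.start();

        // Producer: one queue message per CSV record.
        for (String line : csvLines) {
            queue.add(line);
        }
        queue.add(END_OF_DATA);

        try {
            listener.join(); // wait until the listener has drained the queue
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return store;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not taken from the FEC dataset.
        List<String> stored = run(Arrays.asList("C001,NY,250.00", "C002,CA,100.00"));
        System.out.println(stored.size() + " records stored");
    }
}
```

The point of the queue in the middle is that the reader and the writer run independently: the loader can push records as fast as it parses them while the listener persists at its own pace.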

Why would you consider a framework such as Spring Integration in the first place? A few reasons come to mind. First, when implementing your business logic (BL) you should be focusing on just that: the logic that is critical to your business. The less time you spend on low-level plumbing, connecting to email/JMS/HTTP servers or other 3rd party servers and protocols, the more time you can spend on your BL. Second, you are able to express your complex integration flows in an XML representation (hopefully a DSL option in future). This allows sophisticated IDEs to draw a diagram representing that flow for you, which is a huge benefit compared with having the integration flows embedded in Java code. Third, it encourages best practices and gives you tremendous design flexibility by making you design with Enterprise Integration Patterns in mind. While Spring Integration will not do it all for you, it's a great platform to build your code on. Now for the code example.

Before you start you need to install Maven and also my custom CSV reader (the jar is included in the git repo for this blog code as fft-0.8.jar). This is a simple CSV reader I put together years back. If you need to look into it further, go to https://github.com/thomasma/flatfilereader. Use mvn install to install the jar into your local repo. The command is:

     mvn install:install-file -Dfile=fft-0.8.jar -DgroupId=org.aver -DartifactId=fft -Dversion=0.8 -Dpackaging=jar

First, let's look at the Spring configuration.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:util="http://www.springframework.org/schema/util" xmlns:mongo="http://www.springframework.org/schema/data/mongo"
    xmlns:int-mongodb="http://www.springframework.org/schema/integration/mongodb"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd 
	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd 
    http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.2.xsd
    http://www.springframework.org/schema/integration/mongodb http://www.springframework.org/schema/integration/mongodb/spring-integration-mongodb-2.2.xsd
	http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
    http://www.springframework.org/schema/data/mongo http://www.springframework.org/schema/data/mongo/spring-mongo-1.1.xsd">

    <context:annotation-config />
    <context:component-scan base-package="com.bigdata.mongodb" />
    <aop:aspectj-autoproxy />

    <!-- ======================================== -->
    <!-- MONGODB CONFIG. -->
    <!-- ======================================== -->
    <mongo:mongo id="mongo" host="localhost" port="27017" />
    <mongo:db-factory id="mongoDbFactory" mongo-ref="mongo" dbname="contributions" />
    <bean id="mongoTemplate" class="org.springframework.data.mongodb.core.MongoTemplate">
        <!-- Reuse the db-factory so the template and the outbound adapter point at the same database. -->
        <constructor-arg ref="mongoDbFactory" />
    </bean>

    <!-- ======================================== -->
    <!-- JMS CONFIG. -->
    <!-- ======================================== -->
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>tcp://localhost:61616</value>
        </property>
    </bean>

    <!-- ======================================== -->
    <!-- SPRING INTEGRATION FLOW CONFIG. -->
    <!-- ======================================== -->
    <int-jms:channel id="saveToDBChannel" queue-name="saveToDBQueue"
        connection-factory="jmsConnectionFactory" auto-startup="true" concurrency="50">
        <int-jms:interceptors></int-jms:interceptors>
    </int-jms:channel>

    <int-mongodb:outbound-channel-adapter id="outBoundMongoAdapter"
        collection-name="contributions" mongo-converter="contributionsJSONConverter"
        mongodb-factory="mongoDbFactory" channel="saveToDBChannel" />

    <!-- JSON Converter for Contributions. -->
    <bean id="contributionsJSONConverter" class="com.bigdata.mongodb.service.impl.ContributionsJSONConverter">
        <constructor-arg ref="mongoDbFactory" />
        <constructor-arg>
            <bean class="org.springframework.data.mongodb.core.mapping.MongoMappingContext" />
        </constructor-arg>
    </bean>
</beans>

 

The configuration starts off with some settings for MongoDB and ActiveMQ. Please download both products and have them running with default settings. Next we configure a Spring Integration JMS channel. This is used both to send objects and to register listeners that process the queue data. The queue name used is saveToDBQueue. Next we configure an int-mongodb:outbound-channel-adapter, which consumes messages from the same queue and persists them to MongoDB. This requires us to register a custom converter of the type org.springframework.data.mongodb.core.convert.MappingMongoConverter. This converter is used to convert the Contribution JavaBean to JSON and back.
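The converter itself is not shown in this post, so here is a rough plain-Java sketch of the two directions it has to cover. This is an illustration under assumptions, not the ContributionsJSONConverter from the repo: a Map stands in for the MongoDB document, and the bean fields (candidateName, contributorState, amount) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical bean; the real Contribution class in the repo may carry different fields.
class Contribution {
    final String candidateName;
    final String contributorState;
    final double amount;

    Contribution(String candidateName, String contributorState, double amount) {
        this.candidateName = candidateName;
        this.contributorState = contributorState;
        this.amount = amount;
    }
}

// Stand-in for the converter: a Map plays the role of the stored document.
class ContributionDocumentMapper {

    // Bean -> document: one key per bean property.
    static Map<String, Object> toDocument(Contribution c) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("candidateName", c.candidateName);
        doc.put("contributorState", c.contributorState);
        doc.put("amount", c.amount);
        return doc;
    }

    // Document -> bean: read the same keys back.
    static Contribution fromDocument(Map<String, Object> doc) {
        return new Contribution(
                (String) doc.get("candidateName"),
                (String) doc.get("contributorState"),
                ((Number) doc.get("amount")).doubleValue());
    }

    public static void main(String[] args) {
        // Made-up values for illustration only.
        Contribution original = new Contribution("Candidate A", "NY", 250.0);
        Map<String, Object> doc = toDocument(original);
        Contribution restored = fromDocument(doc);
        System.out.println(doc + " -> " + restored.candidateName);
    }
}
```

In the actual project this mapping would live in a subclass of MappingMongoConverter, wired with the mongoDbFactory and a MongoMappingContext as in the bean definition above.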

Finally, take a look at DataLoaderImpl, which loads the file (using the fft CSV parser, which has a very simple API) and sends each record to the queue defined above (via a MessageChannel object).
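Since the class is not reproduced here, the following is a minimal sketch of what such a loader does, under stated assumptions. It is not the DataLoaderImpl from the repo: a small hand-written parser replaces the fft reader, a java.util.function.Consumer stands in for the MessageChannel, and the names CsvLoaderSketch, parseLine and load are made up for this example.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class CsvLoaderSketch {

    // Splits one CSV line, honouring double-quoted fields that contain commas.
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char ch = line.charAt(i);
            if (ch == '"') {
                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // doubled quote inside a quoted field
                    i++;
                } else {
                    inQuotes = !inQuotes; // opening or closing quote
                }
            } else if (ch == ',' && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(ch);
            }
        }
        fields.add(current.toString());
        return fields;
    }

    // Skips the header row, then hands every record to the channel stand-in.
    // Returns the number of records sent.
    static int load(BufferedReader reader, Consumer<List<String>> channel) {
        int sent = 0;
        try {
            String line = reader.readLine(); // header row, ignored
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue;
                }
                channel.accept(parseLine(line));
                sent++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sent;
    }

    public static void main(String[] args) {
        // Made-up rows; the real file has many more columns.
        String csv = "candidate,contributor,amount\n"
                + "\"Candidate, A\",\"Doe, Jane\",250\n"
                + "\"Candidate, B\",\"Roe, Richard\",100\n";
        int sent = load(new BufferedReader(new StringReader(csv)),
                fields -> System.out.println(fields));
        System.out.println(sent + " records sent");
    }
}
```

With Spring Integration 2.2 the hand-off inside the loop would presumably be a call along the lines of channel.send(MessageBuilder.withPayload(record).build()) on the injected saveToDBChannel, rather than a Consumer.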

To run the class, go to src/test/java and look for the class LoadDataDriver. Change the file name location to a path that is valid for your environment and run this class. If you have downloaded the ALL.zip file, this import will take a long time. You can view ActiveMQ queue statistics at http://localhost:8161/admin/queues.jsp. You should see something like this …

On the Mongo console (run the command mongo to get to the shell) you can type in the following to check status…

 

You can download the source from my Github repo at https://github.com/thomasma/spring-integration-demo.