Using Apache Camel and ActiveMQ to Implement Synchronous Request/Response.

Implementing a synchronous request/response pattern with Apache Camel and ActiveMQ is quite a bit easier than you might expect. It has allowed us to leverage our current messaging infrastructure for synchronous exchanges between applications where we otherwise might have needed to create a new web service.

Below is an example of setting up two Camel endpoints which will demonstrate the request/response pattern.

First, configure the connection to the JMS message broker.  In this case, an ActiveMQ broker is created in-process.
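
The original listing for this step is not shown here, so the following is only a minimal sketch of what such a setup might look like, assuming Camel 2.x with camel-jms and the ActiveMQ client and broker libraries on the classpath. The class name BrokerSetup and the vm://localhost broker URL are assumptions for illustration; the component name jms-broker is taken from the endpoint URIs used in this post.

```java
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class BrokerSetup {

    /** Builds a Camel context whose "jms-broker" component talks to an embedded broker. */
    public static CamelContext createContext() {
        // Assumption: the vm:// transport starts an ActiveMQ broker inside this JVM
        // on first use; persistence is switched off because this is only a demo.
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");

        CamelContext context = new DefaultCamelContext();
        // Register the JMS component under the scheme used by the endpoint URIs.
        context.addComponent("jms-broker",
                JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        return context;
    }
}
```

Routes would then be added to the returned context before calling start() on it.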

 

Next, set up the producer route.  First, a processor is created which will print the body of the message.  The route is triggered when a file is dropped into the /Users/RobTerpilowski/tmp/in directory, and the file is routed to the robt.test.queue destination.  Once the reply has come back, the processor is executed.  What we are hoping to see is that the message has been modified by the consuming endpoint by the time this route completes.  The important piece to note here is the URI:
jms-broker:queue:robt.test.queue?exchangePattern=InOut
exchangePattern=InOut tells Camel that the route is a synchronous request/response.
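
A producer route along these lines could be sketched as follows. This is an approximation of the description above rather than the original listing: it assumes Camel 2.x and a JMS component registered under the name jms-broker. The class name ProducerRoute and the convertBodyTo step are my own additions for illustration.

```java
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class ProducerRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Runs after the reply has come back from the queue.
        Processor printResponse = new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println("PRODUCER Received response: "
                        + exchange.getIn().getBody(String.class));
            }
        };

        from("file:/Users/RobTerpilowski/tmp/in")
            // Assumption: send the file content as text rather than as a file object.
            .convertBodyTo(String.class)
            // InOut makes Camel wait here until a reply arrives or the request times out.
            .to("jms-broker:queue:robt.test.queue?exchangePattern=InOut")
            .process(printResponse);
    }
}
```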

 

Next, set up the consumer endpoint.  Again, a processor is created which will be run when the route has completed.  This processor will first print the message that the producer sent.  It will then replace the message with a new message saying that the original message was seen.  This endpoint will listen on the robt.test.queue and route the result to the directory /Users/RobTerpilowski/tmp/out.  When the route has completed, the processor will update the message.  If everything works correctly, the producer endpoint should be able to see the modified message.
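
The consumer side might look something like the sketch below, again assuming Camel 2.x and a JMS component registered as jms-broker. The class name ConsumerRoute is hypothetical. Because the incoming request carries a reply destination, whatever body the exchange holds when the route finishes is what gets sent back to the producer.

```java
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;

public class ConsumerRoute extends RouteBuilder {

    @Override
    public void configure() {
        // Prints the request, then swaps in the text that becomes the reply.
        Processor acknowledge = new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String body = exchange.getIn().getBody(String.class);
                System.out.println("CONSUMER received message: " + body);
                exchange.getIn().setBody("I Saw it!!! It contained: " + body);
            }
        };

        from("jms-broker:queue:robt.test.queue")
            // Keep a copy of the incoming request on disk.
            .to("file:/Users/RobTerpilowski/tmp/out")
            .process(acknowledge);
    }
}
```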

So now that the routes are set up, it’s time to send a message.  Saving a file in the specified input directory will kick things off.  The file will contain the text “HelloCamel”.

 

$ echo "HelloCamel" > /Users/RobTerpilowski/tmp/in/message.txt

The consumer listening on robt.test.queue immediately sees the message arrive and prints the message body.

CONSUMER received message: HelloCamel

 

The producer endpoint then receives the modified message back, with confirmation that the consumer endpoint did indeed see the message.

PRODUCER Received response: I Saw it!!! It contained: HelloCamel

 

Make Sure Your Server Clocks are in Sync!

As I finished my first test services that would use the request/response pattern, I created an integration test where they connected to a messaging broker running in-process.  Things looked great, and the services were communicating without any issues.  I then deployed the services to a WildFly instance running locally and pointed them at a messaging broker on our staging server.  This time when I started my test, the requests consistently timed out and never made it back from the second service.  I literally spent the entire day deconstructing each service piece by piece to see what was going on.  I then remembered something about clock synchronization in the Camel JMS documentation.  I checked both the clock on the VM and the clock on the staging server, and proceeded to do a face palm when I saw there was a four-hour difference.  Lesson learned!
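
Why would clock skew break request/response? As I understand the Camel JMS documentation, the request message is given a time to live derived from the request timeout, and its expiration is stamped using the sender's clock. If the clock on the other side is far enough ahead, the message already looks expired when it arrives and is discarded, so the requester simply times out. The plain-Java sketch below only illustrates that arithmetic. The class and method names are made up, the 20-second time to live and the timestamps are assumed values, and none of it is Camel or ActiveMQ code.

```java
import java.time.Duration;
import java.time.Instant;

public class ClockSkewDemo {

    /** The sender stamps the expiration: its own clock plus the time to live. */
    public static Instant expiration(Instant senderNow, Duration timeToLive) {
        return senderNow.plus(timeToLive);
    }

    /** The receiving side compares the stamped expiration against its own clock. */
    public static boolean isExpired(Instant expiration, Instant receiverNow) {
        return receiverNow.isAfter(expiration);
    }

    public static void main(String[] args) {
        Instant senderNow = Instant.parse("2020-01-01T12:00:00Z"); // arbitrary instant
        Duration timeToLive = Duration.ofSeconds(20);              // assumed request timeout
        Instant expires = expiration(senderNow, timeToLive);

        // Receiver clock in sync: the message arrives 50 ms later and is still valid.
        Instant inSync = senderNow.plusMillis(50);
        // Receiver clock four hours ahead: the message looks long expired on arrival.
        Instant fourHoursAhead = senderNow.plus(Duration.ofHours(4));

        System.out.println("clocks in sync, expired = " + isExpired(expires, inSync));
        System.out.println("four hours ahead, expired = " + isExpired(expires, fourHoursAhead));
    }
}
```

With the clocks in sync the check reports false, and with a four-hour lead it reports true, which would match the symptom of requests that never come back.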

 

twitter: @RobTerpilowski


JPA java.lang.IllegalArgumentException: No query defined for that name (Solved)

I’ve recently been working with the [Camel JPA](http://camel.apache.org/jpa.html) component for moving data from one of our SQL servers to our messaging system.

We have a number of Entity POJOs defined that contain a named query which the JPA component uses in order to query the database to select the appropriate records that need to be processed. Everything was working great and I decided to move these beans to a separate library that could be shared with other applications. However, once I did this the original application started encountering the following error.

java.lang.IllegalArgumentException: No query defined for that name [AllinboundMessagesSqlBean.findByProcessed]

I checked the classpath and the beans were in fact being found, but the named queries on the beans were not being found. It took some research, but the solution to the problem ended up proving to be very simple.

The change was to explicitly list the entity classes in class elements in the application’s persistence.xml:

<class>com.lynden.json.beans.AllinboundLoopBean</class>
<class>com.lynden.json.beans.AllinboundMessagesSqlBean</class>

Once the classes were defined in the file, the app was then able to find these entity beans that were in a separate .jar. Hopefully this will help others out there who may have run into a similar issue.

twitter: @RobTerpilowski

Monitoring Real-Time Commodity Prices using JavaFX, NetBeans RCP, and Camel

Zoi Capital is a commodity investment firm which trades in the commodity futures markets on behalf of clients, with offices in New York and Seattle. We needed an application which could display the commodities we were currently holding as well as show any open profit/loss of our trades in real-time. In addition, we wanted to display the current performance of our trading strategy (named Telio) along with a comparison of the current performance of the S&P 500 stock index as well as the Dow Jones UBS commodity index.

Trades are placed from the Seattle office, but are monitored throughout the day from the New York office, so the application (which would be running in New York) needed a way to stay up to date with the current trades. The application also needed to be aesthetically pleasing, as we planned to display it on a large 50-inch LCD in the reception area of our New York office, where both staff and visitors could view the current trades and statistics in real time.

I had previously written an automated trading application on the NetBeans Rich Client Platform (RCP), where I had created a number of plug-ins, including a plug-in to connect to Interactive Brokers to retrieve real-time market data. Since I already had the plug-ins needed to connect to a real-time data feed, it seemed a natural choice to build the Quote Monitor application on the NetBeans RCP as well. Instead of using the existing Swing components, however, I opted for JavaFX in order to give the application a polished look.

To get the trade information from the Seattle office to the Commodity Monitor application in the New York office, we used Camel to facilitate the communication between the two offices. The great thing about Camel is that it provides an abstraction layer over the actual communication mechanism between applications. Since the offices are not networked together, we used the Camel email component to transfer the data from the Seattle office to the Commodity Monitor application. In the future we could move the communication mechanism to web services or JMS simply by changing a property file, with no code changes required, as Camel takes care of everything else under the hood.


System Architecture


Trades are placed in the Seattle office, and then emailed to a designated email box which the Commodity Monitor watches (via Camel). Trade information is then imported into the application, at which point it requests real-time quote information of the commodities from Interactive Brokers via their Java API. At this point the application can then update the profit/loss statistics in real-time.


Application Screen Shot


The grid in the top left portion of the screen displays the performance for our Telio trading strategy for today, for the month of August, and then the year-to-date return of the strategy. The table also shows the same statistics for the S&P 500 stock index and Dow Jones/UBS commodity index for comparison purposes.

Below the table is a candlestick chart displaying the performance of the S&P 500 Index futures for the current day. The chart made use of the charting API in JavaFX as well as CSS. The chart is updated in real-time throughout the day.

Finally, on the right half of the screen is a panel which displays the commodities that we are currently holding with current profit/loss on the trade. For example, we have a current profit of +0.18% since we bought natural gas.

To add additional eye candy to the application, I created a scrolling background with a slightly blurred Zoi Capital logo. The animation was extremely easy to set up in JavaFX, and I’ll post a short how-to blog on animations in the not-too-distant future.


Demo Video

Below is a 3 minute video demo showing the Commodity Monitor application with the animated scrolling background. About 40 seconds into the video an email is sent to the Camel email box, at which point the Commodity Monitor picks up the email and displays the commodities that were sent, and their corresponding profit/loss in real time. Another email is sent at the 2:10 mark that clears most of the commodities from the application.

 

twitter: @RobTerpilowski

Writing to a NoSQL DB using Camel

We use a somewhat out-of-the-ordinary NoSQL database called “UniVerse”, produced by a company called Rocket, as our primary data store. To write data to the DB from Java beans we have written our own ORM framework, which we have dubbed “siesta” as it is a lightweight Hibernate-like framework.

Camel is a great framework for implementing Enterprise Integration Patterns (EIP), and we have started making heavy use of the various Camel components in order to pass data in varying formats between internal and third-party systems.
While a large number of components are available out of the box, there are none for writing data to UniVerse.

Fortunately it is extremely easy to implement custom Camel components, and we were able to create a component to write to UniVerse with a few classes and one configuration file.

For the Camel endpoint URI, we would like to use the following format:

siesta://com.lynden.siesta.component.FreightBean?uvSessionName=XDOCK_SHARED

where:

siesta:// denotes the component scheme,

com.lynden.siesta.component.FreightBean denotes the annotated POJO that the Siesta framework will use to persist the data to UniVerse.

uvSessionName=XDOCK_SHARED tells the component which database session pool to use when connecting to the DB.
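
To make the three parts of that URI concrete, here is a small plain-Java sketch that splits an endpoint URI into scheme, path and options. It is only an illustration of the format described above, not how Camel itself parses URIs, and the class name SiestaUriParts is made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SiestaUriParts {

    public final String scheme;
    public final String path;
    public final Map<String, String> options = new LinkedHashMap<>();

    public SiestaUriParts(String uri) {
        int schemeEnd = uri.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing scheme: " + uri);
        }
        scheme = uri.substring(0, schemeEnd);

        // Everything after "://" is the path, optionally followed by "?key=value&...".
        String rest = uri.substring(schemeEnd + 3);
        int queryStart = rest.indexOf('?');
        path = queryStart < 0 ? rest : rest.substring(0, queryStart);

        if (queryStart >= 0) {
            for (String pair : rest.substring(queryStart + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    options.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
    }

    public static void main(String[] args) {
        SiestaUriParts parts = new SiestaUriParts(
                "siesta://com.lynden.siesta.component.FreightBean?uvSessionName=XDOCK_SHARED");
        System.out.println("scheme  = " + parts.scheme);
        System.out.println("path    = " + parts.path);
        System.out.println("options = " + parts.options);
    }
}
```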


The Endpoint Class

package com.lynden.siesta.component;

import com.lynden.siesta.BaseBean;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;

/**
 * Represents a Siesta endpoint.
 */
@UriEndpoint(scheme = "siesta" )
public class SiestaEndpoint extends DefaultEndpoint {

    @UriParam
    protected String uvSessionName = "";

    Class<? extends BaseBean> siestaBean;

    public SiestaEndpoint() {
    }

    public SiestaEndpoint(String uri, SiestaComponent component) {
        super(uri, component);
    }

    public SiestaEndpoint(String endpointUri) {
        super(endpointUri);
    }

    @Override
    public Producer createProducer() throws Exception {
        return new SiestaProducer(this);
    }

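    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        // This component only writes to UniVerse, so there is nothing to consume from.
        throw new UnsupportedOperationException("The siesta component does not support consumers");
    }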

    @Override
    public boolean isSingleton() {
        return true;
    }

    public void setSiestaBeanClass( Class<? extends BaseBean> siestaBean) {
        this.siestaBean = siestaBean;
    }

    public Class<? extends BaseBean> getSiestaBeanClass() {
        return siestaBean;
    }

    public String getUvSessionName() {
        return uvSessionName;
    }

    public void setUvSessionName(String uvSessionName) {
        this.uvSessionName = uvSessionName;
    }
}

The Component Class
The next step is to create a class to represent the component itself. The easiest way to do this is to extend the org.apache.camel.impl.DefaultComponent class and override the createEndpoint() method.

import com.lynden.siesta.BaseBean;
import java.util.Map;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;

public class SiestaComponent extends DefaultComponent {

    @Override
    protected Endpoint createEndpoint(String uri, String path, Map<String, Object> options) throws Exception {
        SiestaEndpoint endpoint = new SiestaEndpoint(uri, this);
        setProperties(endpoint, options);

        Class<? extends BaseBean> type = getCamelContext().getClassResolver()
                .resolveClass(path, BaseBean.class, SiestaComponent.class.getClassLoader());

        if (type != null) {
            endpoint.setSiestaBeanClass(type);
        }
        return endpoint;
    }
}

The createEndpoint method takes three arguments: the URI of the component; the path, which is the “com.lynden.siesta.component.FreightBean” portion of the URI; and the options, which include everything after the “?” in the URI.

From this method we use reflection to load the BaseBean class specified in the URI, and pass it into the SiestaEndpoint class that was created in the previous step.
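
For readers less familiar with loading a class by name, the idea can be sketched in plain Java without Camel. The nested BaseBean and FreightBean classes below are stand-ins for the real Siesta classes, and Class.forName is used here in place of Camel’s class resolver, so treat this as an illustration of the technique rather than what the component does internally.

```java
public class BeanClassLoading {

    /** Stand-in for com.lynden.siesta.BaseBean. */
    public abstract static class BaseBean { }

    /** Stand-in for an annotated bean such as FreightBean. */
    public static class FreightBean extends BaseBean { }

    /** Loads the named class and checks that it really is a BaseBean subtype. */
    public static Class<? extends BaseBean> resolve(String className) {
        try {
            // asSubclass throws ClassCastException if the class is not a BaseBean.
            return Class.forName(className).asSubclass(BaseBean.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("No such class: " + className, e);
        }
    }

    public static void main(String[] args) {
        Class<? extends BaseBean> type = resolve(FreightBean.class.getName());
        System.out.println("Resolved bean class: " + type.getName());
    }
}
```

One difference worth noting: as far as I can tell, the Camel class resolver used in createEndpoint returns null when the class cannot be loaded, which is why the component checks for null instead of catching an exception.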


The Producer Class

import com.lynden.siesta.BaseBean;
import com.lynden.siesta.EntityManager;
import com.lynden.siesta.IEntityManager;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The Siesta producer.
 */
public class SiestaProducer extends DefaultProducer {
    private static final Logger LOG = LoggerFactory.getLogger(SiestaProducer.class);
    private SiestaEndpoint endpoint;
    private IEntityManager entityManager;
    private String uvSessionName;

    public SiestaProducer(SiestaEndpoint endpoint) {
        super(endpoint);
        this.endpoint = endpoint;
        uvSessionName = endpoint.getUvSessionName();
        entityManager =  EntityManager.getInstance(uvSessionName);

    }

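    @Override
    public void process(Exchange exchange) throws Exception {
        BaseBean siestaBean = exchange.getIn().getBody( BaseBean.class );
        if (siestaBean == null) {
            // getBody returns null when the body cannot be converted to the requested type
            throw new IllegalArgumentException("Message body is not a BaseBean");
        }
        entityManager.createOrUpdate(siestaBean);
        LOG.debug( "Saved bean {} with ID: {}", siestaBean.getClass(), siestaBean.getId() );
    }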

}

The Config File

The final step is to create a configuration file in the .jar’s META-INF directory which will allow the Camel Context to find and load the custom component. The convention is to put a file named after the component (“siesta” in our case) in the META-INF/services/org/apache/camel/component/ directory of the component’s .jar file.

The META-INF/services/org/apache/camel/component/siesta file contains one line to tell the Camel Context which class to load:

class=com.lynden.siesta.component.SiestaComponent
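
The file is in the standard Java properties format, so the lookup can be illustrated with java.util.Properties. The sketch below is not Camel’s own loading code; it just shows how a class= line like the one above can be read back. The class name ComponentDescriptorDemo is made up for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ComponentDescriptorDemo {

    /** Reads the "class" entry from the text of a component descriptor file. */
    public static String componentClassName(String descriptorText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("class");
    }

    public static void main(String[] args) {
        String descriptor = "class=com.lynden.siesta.component.SiestaComponent\n";
        System.out.println("Component class: " + componentClassName(descriptor));
    }
}
```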

That’s it. With three relatively simple classes and a small config file, we were able to implement our own Camel producer using our NoSQL database as an endpoint.

twitter: @RobTerp


CamelOne 2012 Summary


I’ve just returned from the CamelOne Open Source Integration Conference in Boston, where I was invited as a speaker to present how Lynden is using ActiveMQ as part of its freight tracking infrastructure.  As expected, there were a number of big announcements and very interesting projects that speakers from all over the world showcased during the two-day conference.  Here are a few new projects and applications that I think may be particularly useful here at Lynden.


Fuse Fabric
I thought this was by far the most interesting project announced during the conference.  Fuse Fabric is a distributed configuration, management and provisioning system for Fuse ESB, Fuse Message Broker and the Apache open source integration solutions (ActiveMQ, Camel, CXF, Karaf & ServiceMix).  Basically, it provides a single point from which brokers (and other applications) can retrieve their configuration.  Under the covers it makes use of Apache ZooKeeper to store the cluster configuration and node registration.

We are currently only running 1 production message broker, but we have 8 message brokers in our test environment.  Fuse Fabric could greatly reduce the overhead of managing the configuration of those brokers.

Apache ZooKeeper
At these conferences there are always ideas flying around among the attendees, and I seem to pick up just as many new ideas talking to other architects and developers outside the sessions as I do attending them.  I was explaining to someone the challenge we have managing the configurations of our 70+ web applications and services across our production, staging and test environments.  His immediate response was to take a look at Apache ZooKeeper; they had been using it for the past few months to manage configuration of multiple environments with good success.  As mentioned above, Fuse Fabric makes use of Apache ZooKeeper, and I’m curious to learn whether Fabric can be used beyond the integration apps it mentions (ESB, MQ, etc.) for things such as our web applications and services.

Fuse Management Console
Provides a tool for managing large-scale deployments of Fuse ESB Enterprise and Fuse MQ Enterprise.  A FuseSource subscription is required to install the console, which provides the following benefits:

  • Simplified management – key status metrics and commands are accessible from a unified, centralized console
  • Automatic provisioning – dependencies between components and include files are tracked and managed
  • Profile management – simplify the creation and management of customized brokers by defining configuration profiles
  • Automated updates – updates to a single component are automatically deployed to the appropriate brokers based on the profile
  • Local or Cloud deployment – your integration infrastructure can be deployed and managed locally or in the Cloud

MQTT protocol support in ActiveMQ 5.6
MQTT support has come a little late for Lynden, since we already have a solution for connecting Windows CE devices to ActiveMQ.  MQ Telemetry Transport (MQTT) is a machine-to-machine protocol designed as an extremely lightweight publish/subscribe messaging transport.  ActiveMQ 5.6 and Apollo 1.0 will both support MQTT.

Fuse HQ
Fuse HQ is a SOA management and monitoring system and is available with a FuseSource support subscription.  Fuse HQ takes advantage of the JMX-based reporting capabilities of the Fuse products.  Some of the features Fuse HQ provides are:

Monitor systems – comprehensive cross-stack dashboards make it easy to check the health of systems and services

  • Role-based access controls for managing user visibility
  • Visually review relationships between FuseSource servers and other managed resources
  • Explore and generate reports of real-time and historic logs, configurations, and events
  • Impact analysis and change control based on configuration tracking
  • Collect, chart and view real-time and historic metrics from hardware, network and applications without invasive instrumentation
  • Auto-discover hardware and software attributes including configuration, version numbers, memory, CPU, disk and network devices
  • Distributed monitoring allows for linear scalability of Fuse HQ server

Manage systems – execute control operations to manage the FuseSource environment

  • Role-based access controls for managing user permissions
  • External authentication leverages existing LDAP or Kerberos directories
  • Automate maintenance activities with scheduled controls and conditional responses to alerts
  • High availability allows multiple Fuse HQ servers to assume workloads in the event of a server failure

Intelligent Alerts – definable alerts proactively identify problems before they occur

  • Follow-the-sun assignments route alerts to the appropriate person based on the time of day and users’ scheduled availability
  • Integrate traps with existing IT operations suite
  • Define alerts once and apply to large, global resource groups
  • Alerts can be multiconditional, role-based and group based, and trigger recovery processes
  • Generate alerts on changes in configuration or key attributes of any managed resource

twitter: @RobTerp