Spring Integration – Message Gateway Adapters

Introduction

This article presents a technique for handling response types from message sub-systems in a service-based Message Driven Architecture (MDA). It provides some theory, experience of message sub-system development and some robust working code that’s running in several financial institution production deployments.

As Spring Integration is underpinned by a well established set of Spring APIs familiar to most developers, I have chosen this as a basis for this article.

If developers only ever had to build integration flows to handle positive business responses, this article would not be necessary. However, real world development, particularly within service integration environments, results in developers having to deal not just with positive business responses but business exceptions, technical exceptions and services that become unresponsive.

Spring Integration Gateways

Spring Integration (SI) Gateways are entry points for message sub-systems. Specify a gateway using a Java interface and a dynamic proxy implementation is automatically generated by the SI framework at runtime such that it can be used by any beans in which it’s injected. Incidentally, SI gateways also offer a number of other benefits such as request & reply timeout behaviour.
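
The idea of an interface whose implementation is generated at runtime can be illustrated with the JDK alone. The following is a minimal sketch using java.lang.reflect.Proxy; it is not how Spring’s own proxy is implemented, and the UppercaseGateway interface and its upper-casing behaviour are hypothetical stand-ins for a real gateway that would send the argument to a request channel.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical gateway interface, used only for this illustration.
interface UppercaseGateway {
    String send(String payload);
}

final class DynamicProxySketch {

    // Builds an implementation of the interface at runtime. A real SI gateway
    // would pass the argument to a request channel; this handler simply
    // upper-cases the payload so that the effect is visible.
    static UppercaseGateway createGateway() {
        InvocationHandler handler = (proxy, method, args) -> {
            if ("send".equals(method.getName())) {
                return ((String) args[0]).toUpperCase();
            }
            throw new UnsupportedOperationException(method.getName());
        };
        return (UppercaseGateway) Proxy.newProxyInstance(
                UppercaseGateway.class.getClassLoader(),
                new Class<?>[] {UppercaseGateway.class},
                handler);
    }

    public static void main(String[] args) {
        UppercaseGateway gateway = createGateway();
        System.out.println(gateway.send("mt541"));
    }
}
```

The caller only ever sees the interface, which is why such a gateway can be injected into any bean without that bean depending on the messaging framework.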

Typical Message Driven Architecture (MDA) Applications

As with Java design patterns for application development, similar patterns exist for integration applications. One such common integration pattern is service-chaining – a number of services are connected in series or parallel that together perform a business function. If you’ve built and deployed services using MuleSoft’s Mule, SpringSource’s Spring Integration, FuseSource’s ServiceMix (Camel), Oracle’s Service Bus or IBM’s WebSphere ESB, the chances are that you’ve already built an application using chained services. This pattern will become ever more widespread as the software industry moves away from client-server topologies towards service based architectures.

A recent engagement in which I provided architectural consultancy will be used as an example implementation of the service-chain application pattern. The engagement required the team to build a solution that would transition financial messages (SWIFT FIN) from a raw (ISO15022) state through binding, validation and transformation (into XML), and ultimately add them to a data store for further processing or business exception management.

Using EIP graph notation, the first phase service composition could be represented as follows. A document message arrives into the application domain; it’s parsed and bound to a Java object, semantically validated as a SWIFT message, transformed to XML and then stored in a datastore. As a side note, the binding, semantic validation and transformation are all performed by C24’s iO product. Furthermore, the full solution sample that can be found in GitHub for this project contains dispatcher configurations so that thread pools can be used for each message sub-system. For the sake of brevity and clarity, such details are omitted from this article.

Although this configuration specifies a chain of services, it’s not adequate to form the basis of a robust production deployment, for three reasons. An exception thrown by any of the services would be thrown straight back to the entry gateway, thus losing context, i.e. which service threw the exception. Any unresponsive code invoked by a service may result in its Java thread getting parked. Null values returned by a service may cause unexpected problems or even cause the entry gateway to hang (if an entry gateway is used, unlike in this diagram).

Unresponsive Service Invocation

The Spring Integration specific construct for accessing a message sub-system is the gateway. Although apparently simple, the SI gateway is a powerful feature: Spring’s GatewayProxyFactoryBean generates a dynamic proxy that can be injected into new or existing code or services as an implementation of the interface. The SI gateway also provides facilities for dealing with timeouts and some error handling. A typical Spring Integration namespace XML configuration is as follows:

<int:channel id="parsing-gw-request-channel" datatype="java.lang.String">
  <int:queue capacity="${gateway.parse.queue.capacity}"/>
</int:channel>

<int:gateway id="parseGateway"
             service-interface=
                 "com.c24.solution.swift.flatten.gateway.ParseGateway"
             default-request-channel="parsing-gw-request-channel"
             default-reply-channel="parsing-gw-reply-channel"
             default-reply-timeout="${gateway.parse.timeout}"/>
<int:channel id="parsing-gw-reply-channel" 
             datatype="biz.c24.io.api.data.ComplexDataObject"/>

Evolving the design, the next phase of the application architecture needs to take advantage of these gateways. The Spring Integration context can be extended quite simply by creating a Java interface for each service. Building message sub-system gateways leads us towards a design model like the following:

A small amount of additional configuration and some very simple Java interfaces means that developers can now configure request/reply timeouts on the gateway and avoid any unresponsive code – assuming (as we always should) that code written locally or supplied by 3rd parties is capable of misbehaving.

Additionally, the SI gateway allows specification of an error handling channel so that you have the opportunity to handle errors. The design presented here does not use error channels but handles errors in a different way, which should become clearer as the design evolves.

Specific semantics and configuration examples for gateways and gateway timeouts can be found in reference material provided by SpringSource and other blogs.

A significant benefit to the integration solution has been added with the use of Spring Integration gateways. However, further improvements need to be made for the following reasons:

  1. A gateway timeout will result in a null value being returned to the calling, or outer, flow. This is a business exception condition: the payload that was undergoing processing needs to be pushed into a business exception management process so that it can be progressed through to a conclusion, normally dictated by the business. If a transient technical issue caused the problem, resubmission may resolve the exception; otherwise further investigation will be required. Whatever the eventual outcome, context about the failure and its location needs to be recorded and made available to the business exception management operative. The context must contain information about the message sub-system that failed to process the message and the message itself.
  2. Exceptions generated by message sub-systems can be handled in a few different ways: through the error channel of the gateway in which message processing failed, or by the calling flow. Again, context needs to be recorded. In this case, the technical exception needs to be added to the context, along with the gateway that was processing the failed message and any additional information that may be used within the business exception management process for resolution.
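
The first condition, a timeout surfacing as a null reply, can be reproduced outside the framework. The sketch below is a JDK-only analogy rather than Spring Integration code: the sendAndReceive helper and its parameters are hypothetical, and it assumes only that a caller waits a bounded time for a reply and receives null when none arrives.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ReplyTimeoutSketch {

    // Runs the service on another thread and waits at most timeoutMillis for
    // a reply. Returns null on timeout, which is the value the calling flow
    // then has to detect and treat as a business exception condition.
    static <T> T sendAndReceive(Callable<T> service, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> reply = executor.submit(service);
        try {
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            reply.cancel(true); // interrupt the unresponsive service
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Service failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String fast = sendAndReceive(() -> "parsed", 500);
        String slow = sendAndReceive(() -> {
            Thread.sleep(2_000); // simulates an unresponsive service
            return "too late";
        }, 100);
        System.out.println("fast reply: " + fast);
        System.out.println("slow reply: " + slow);
    }
}
```

Note that the null carries no information about which sub-system timed out or which message was in flight, which is exactly the context that needs to be captured by the caller.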

If transactions were involved in this flow, for example a transactional JMS message consumption flow trigger, it would be possible to roll back the transaction so that the message can be re-queued to a DLQ. However, the design in this software compensates for failures by pushing failing messages directly to a destination configured by the developer, and this may of course be a JMS queue if that’s required. This avoids transaction rollback and adds the benefit of supplying exception context directly, rather than operations staff and developers having to scour logs to locate exception details.

The Gateway Adapter

In order to handle all exceptions taking place within a message sub-system, and also to handle null values, a Gateway Adapter class can be used. The reason that the SI gateway error channel is not used for this purpose is that it would be complex to define and would have to be done in several places. The gateway adapter allows all business exception conditions to be handled in one place and treated in the same way. The Gateway Adapter is a custom written Spring bean that invokes the injected gateway directly and manages null and exception responses before allowing the invocation request to return to the caller (or outer flow).

The architectural design diagram evolved from the previous design phase includes a service activator backed by a Gateway Adapter bean. This calls the injected gateway (dynamic proxy), which in turn calls the business service.
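
Stripped of Spring types, the adapter idea reduces to a single wrapper around the gateway call. The following is a simplified, JDK-only sketch rather than the production class: FailureContext, the invoke helper and the list used as an exception destination are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class GatewayAdapterSketch {

    // Hypothetical failure record: which sub-system failed, why, and the
    // message that was being processed at the time.
    static final class FailureContext {
        final String subSystem;
        final String reason;
        final Object message;

        FailureContext(String subSystem, String reason, Object message) {
            this.subSystem = subSystem;
            this.reason = reason;
            this.message = message;
        }
    }

    // Invokes the gateway and handles exception and null replies in one
    // place, recording context before the failure propagates to the caller.
    static <I, O> O invoke(String subSystem, Function<I, O> gateway,
                           I message, List<FailureContext> exceptionSink) {
        O reply;
        try {
            reply = gateway.apply(message);
        } catch (RuntimeException e) {
            exceptionSink.add(new FailureContext(
                    subSystem, "EXCEPTION: " + e.getMessage(), message));
            throw e;
        }
        if (reply == null) {
            exceptionSink.add(new FailureContext(
                    subSystem, "NULL_RESPONSE", message));
            throw new IllegalStateException("Null response from " + subSystem);
        }
        return reply;
    }

    public static void main(String[] args) {
        List<FailureContext> sink = new ArrayList<>();
        String parsed = invoke("parse", String::trim, "  MT541  ", sink);
        System.out.println("reply: " + parsed);
        try {
            GatewayAdapterSketch.<String, String>invoke("parse", m -> null, "MT541", sink);
        } catch (IllegalStateException e) {
            System.out.println("recorded: " + sink.get(0).reason);
        }
    }
}
```

Because every gateway call goes through the same wrapper, the caller never has to distinguish a timeout from a technical exception; both arrive at the exception destination with the same shape of context.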

Nuts and Bolts

The design diagrams are a useful guide for making the point, but perhaps the more interesting part is the configuration and code itself. As with the entire solution, the outer or calling flow can be seen in the project located in GitHub. The useful snippets to view here are the gateway adapter namespace configuration, the gateway, the gateway adapter and the gateway service configuration. As a number of gateways exist in this application, and they all follow the same pattern, the following configuration and code demonstrates just one of them.

Gateway Adapter Namespace Configuration

 
<int:channel id="message-parse-channel" datatype="java.lang.String"/>
<int:chain input-channel="message-parse-channel" 
           output-channel="message-validate-channel">
    <int:service-activator ref="parseGatewayService" method="service"/>
</int:chain>

Gateway

package com.c24.solution.swift.flatten.gateway;

import org.springframework.integration.Message;

/**
 * @author Matt Vickery - matt.vickery@incept5.com
 * @since 17/05/2012
 */
public interface ParseGateway {
    public Message<?> send(Message<String> message);
}

GatewayAdapter

package com.c24.solution.swift.flatten.gateway.adapter;

import biz.c24.io.api.data.ComplexDataObject;
import com.c24.solution.swift.flatten.exception.ExceptionContext;
import com.c24.solution.swift.flatten.exception.ExceptionSubContext;
import com.c24.solution.swift.flatten.gateway.ExceptionGatewayService;
import com.c24.solution.swift.flatten.gateway.ParseGateway;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.Message;
import org.springframework.util.Assert;

/**
 * @author Matt Vickery - matt.vickery@incept5.com
 * @since 17/05/2012
 */
public class ParseGatewayService extends AbstractGatewayService {

  private static final Logger LOG =
      LoggerFactory.getLogger(ParseGatewayService.class);
  private final ParseGateway parseGateway;
  private final ExceptionGatewayService exceptionGatewayService;

  public ParseGatewayService(
         final ParseGateway parseGateway,
         final ExceptionGatewayService exceptionGatewayService) {
      this.parseGateway = parseGateway;
      this.exceptionGatewayService = exceptionGatewayService;
  }

  public Message<ComplexDataObject> service(Message<String> message) {

      Message<?> response;
      try {
          LOG.debug("Entering parse gateway.");
          response = parseGateway.send(message);
      } catch (RuntimeException e) {
          LOG.error("Exception response .. {}, exception: {}", getClass(), e);
          LOG.error("Invoking ... process because: {}.", e.getCause());
          buildExceptionContextAndDispatch(
            message,
            ExceptionContext.PARSE_FAILURE,
            ExceptionSubContext.EXCEPTION_GATEWAY_RESPONSE,
            exceptionGatewayService);
          throw e;
      }

      if (response != null) {
        if (!(response.getPayload() instanceof ComplexDataObject))
            throw new IllegalStateException(INTERRUPTING_..._EXCEPTION);
      } else {
        LOG.info("Null response received ....", getClass());
        buildExceptionContextAndDispatch(
          message,
          ExceptionContext.PARSE_FAILURE,
          ExceptionSubContext.NULL_GATEWAY_RESPONSE,
          exceptionGatewayService);
        throw new GatewayAdapterException(NULL_GATEWAY_RESPONSE_CAUGHT);
      }

      Assert.state(response.getPayload() instanceof ComplexDataObject);
      return (Message<ComplexDataObject>) response;
  }
}

GatewayService

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:c24="http://schema.c24.biz/spring-integration"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/integration
 http://www.springframework.org/schema/integration/spring-integration.xsd
 http://schema.c24.biz/spring-integration
 http://schema.c24.biz/spring-integration.xsd
 http://www.springframework.org/schema/beans
 http://www.springframework.org/schema/beans/spring-beans.xsd">

    <int:channel id="parsing-gw-request-channel" 
                 datatype="java.lang.String">
        <int:queue capacity="${gateway.parse.queue.capacity}"/>
    </int:channel>
    <int:gateway
      id="parseGateway"
      service-interface="com.c24.solution.swift.flatten.gateway.ParseGateway"
      default-request-channel="parsing-gw-request-channel"
      default-reply-channel="parsing-gw-reply-channel"
      default-reply-timeout="${gateway.parse.timeout}"/>
    <int:channel id="parsing-gw-reply-channel" 
                 datatype="biz.c24.io.api.data.ComplexDataObject"/>

    <int:chain input-channel="parsing-gw-request-channel" 
               output-channel="parsing-gw-reply-channel">
        <int:poller fixed-delay="50" 
                    task-executor="binding-thread"/>
        <c24:unmarshalling-transformer
          id="c24Mt541UnmarshallingTransformer"
          source-factory-ref="textualSourceFactory"
          model-ref="mt541Model"/>
    </int:chain>

</beans>

Summary

Whatever type of message based integration system you are designing, once you get beyond the use of sample code for prototyping a technology you need to consider invocation in the face of technical exceptions, business exceptions and non-responsive services. Let’s face it, there are millions of lines of code out there that are copied from trivial examples. Using Spring Integration Gateways to represent entry to message sub-systems means that we must be able to cope with all of these types of behaviour. The Gateway Adapter pattern allows a single, central processing location for such conditions.

The intent for the Gateway Adapter should be clear from the example configuration and code provided but can also be run and tested using the full source located on GitHub. Please leave any feedback or comments as you see fit.

Spring Integration – Input Channel Definition

Input Channels

In a pipes and filters architecture, pipes are connectors or channels. Although at first sight trivial, channels are semantically rich – they allow typing, synchronous and asynchronous input, direct and multicast notifications, send and wait (rendezvous) as well as queued input and wrapping by adapters.

To define or not to define?

Explicit specification or definition of the Spring Integration input-channel is not mandatory for SI constructs to operate. The framework will create input channels automatically if they are not explicitly defined in configuration, but what are the pros and cons of this approach?

Take the following chain as an example:

[Figure: chain configuration with the input-channel “processing-channel”]

The input-channel named “processing-channel” will get created automatically as it’s not explicitly defined here. In a trivial configuration such as this one there’s very little difference between including the channel explicitly or using the framework’s implicit creation. In larger configurations the extra lines taken by a dozen or more channel definitions may start to make the context appear a little cluttered. In this case it’s then possible to consider decomposing the configuration, but sometimes you just can’t decompose those flows into distinct contexts because all of the constructs naturally belong together in a single context.

Channel Typing

One of the features of Spring Integration channels is that they can be strongly type matched with payload object types. It’s worth considering adopting a convention for specifying the payload type on the channel, because not only does this provide strong payload typing but it also gives better confidence that whoever is reading the flow after it’s built can readily see which types of objects are traversing the flows.

Of course channels can’t always be strongly typed but in many cases they can. Here’s an example of one that can:

[Figure: chain configuration whose input channel declares the datatype java.util.UUID]

You can see that on the first line of this Spring Integration configuration the contract for operation specifies that a java.util.UUID type object is expected on the input channel. In this case the payload contained a claim-check identifier and is to be replaced with an arbitrary string for the example.

In the case where a channel is strongly typed and the contract is broken, an exception will be thrown. Here’s an example of what you’ll see:

[Figure: start of the stack trace generated when the channel datatype contract is broken]

In this example I changed the datatype to java.util.Map and ran a test with a payload that’s actually a java.util.UUID. That was the start of the stack trace that was generated when the exception was thrown.

It’s possible to provide a collection of message types for channel specification. Any messages entering the channel must conform to at least one of the specified types otherwise a MessageDeliveryException is generated. The following example shows a change that will prevent the exception above. The configuration now allows for two types of message payload, java.util.Map and java.util.UUID to enter the channel.

[Figure: channel definition accepting both java.util.Map and java.util.UUID payloads]
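
The acceptance rule itself, that a payload must match at least one declared type, is easy to express in plain Java. The class below is a hypothetical stand-in written for illustration, not Spring Integration code, and it throws IllegalArgumentException where the framework would raise a MessageDeliveryException.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class TypedChannelSketch {

    private final List<Class<?>> acceptedTypes;

    TypedChannelSketch(Class<?>... acceptedTypes) {
        this.acceptedTypes = Arrays.asList(acceptedTypes);
    }

    // Accepts the payload only if it is an instance of at least one of the
    // declared types; otherwise the contract is broken and the send fails.
    void send(Object payload) {
        for (Class<?> type : acceptedTypes) {
            if (type.isInstance(payload)) {
                return;
            }
        }
        throw new IllegalArgumentException(
                "Payload type " + payload.getClass().getName()
                        + " does not match any of " + acceptedTypes);
    }

    public static void main(String[] args) {
        TypedChannelSketch channel = new TypedChannelSketch(Map.class, UUID.class);
        channel.send(UUID.randomUUID());               // accepted
        channel.send(new HashMap<String, String>());   // accepted
        try {
            channel.send("an arbitrary string");       // rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```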

Spring Integration – Payload Storage via Header Enrichment

There’s often a need to temporarily store transient messages during design of Spring Integration flows – several different mechanisms are available in the toolkit.

It’s pretty straightforward to take a message, use an SI header enricher construct and place the payload in a header. Two SpEL expressions are needed: one for the header key name and one for the payload extraction.

The following SI flow demonstrates an example of how to do just that:
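
Before looking at the framework configuration, the mechanics can be shown with a framework-free sketch. SimpleMessage and the three step methods below are hypothetical stand-ins for a message, a header enricher, an intermediate processing step and a transformer; only the header key name is taken from the sample.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class HeaderStorageSketch {

    static final String REQUEST_PAYLOAD_HEADER_KEY = "REQUEST_PAYLOAD_HEADER_KEY";

    // Minimal immutable stand-in for a message: a payload plus headers.
    static final class SimpleMessage {
        final Object payload;
        final Map<String, Object> headers;

        SimpleMessage(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    // Header enricher step: copy the current payload into a header.
    static SimpleMessage storePayload(SimpleMessage message) {
        Map<String, Object> headers = new HashMap<>(message.headers);
        headers.put(REQUEST_PAYLOAD_HEADER_KEY, message.payload);
        return new SimpleMessage(message.payload, headers);
    }

    // Intermediate step that replaces the payload but leaves headers intact.
    static SimpleMessage process(SimpleMessage message) {
        return new SimpleMessage("processed", message.headers);
    }

    // Transformer step: restore the stored payload from the header.
    static SimpleMessage restorePayload(SimpleMessage message) {
        return new SimpleMessage(
                message.headers.get(REQUEST_PAYLOAD_HEADER_KEY), message.headers);
    }

    public static void main(String[] args) {
        SimpleMessage request = new SimpleMessage(
                "Sample test message.", Collections.<String, Object>emptyMap());
        SimpleMessage restored = restorePayload(process(storePayload(request)));
        System.out.println(restored.payload);
    }
}
```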

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration 
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">
  <int:gateway id="headerManagementGateway"
                 service-interface="com.l8mdv.sample.HeaderManagementGateway"/>
  <int:chain input-channel="request-message-storage-channel"
             output-channel="request-message-retrieval-channel">
    <int:header-enricher>
      <int:header name="#{T(com.l8mdv.sample.HeaderManagementGateway)
                   .REQUEST_PAYLOAD_HEADER_KEY}" expression="getPayload()"/>
    </int:header-enricher>
  </int:chain>

  <int:chain input-channel="request-message-retrieval-channel">
    <int:transformer expression="headers.get(T(com.l8mdv.sample.HeaderManagementGateway)
               .REQUEST_PAYLOAD_HEADER_KEY)"/>
  </int:chain>
</beans>

This example can be executed by implementing a gateway as follows:

package com.l8mdv.sample;

import org.springframework.integration.Message;
import org.springframework.integration.annotation.Gateway;

public interface HeaderManagementGateway {

    public static final String REQUEST_PAYLOAD_HEADER_KEY = "REQUEST_PAYLOAD_HEADER_KEY";

    @Gateway (requestChannel = "request-message-storage-channel")
    public Message<String> send(Message<String> message);
}

and then running a test such as this one:

package com.l8mdv.sample;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static com.l8mdv.sample.HeaderManagementGateway.REQUEST_PAYLOAD_HEADER_KEY;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/header-management.xml"})
public class HeaderManagementIntegrationTest {

    @Autowired
    HeaderManagementGateway headerManagementGateway;

    @Test
    public void locatePayloadInHeader() {
        String payload = "Sample test message.";
        Message<String> message = MessageBuilder.withPayload(payload).build();
        Message<String> response = headerManagementGateway.send(message);

        Assert.assertEquals(payload, response.getHeaders().get(REQUEST_PAYLOAD_HEADER_KEY));
    }

    @Test
    public void locateTransformedPayload() {
        String payload = "Sample test message.";
        Message<String> message = MessageBuilder.withPayload(payload).build();
        Message<String> response = headerManagementGateway.send(message);

        Assert.assertTrue(response.getPayload().contains(payload));
    }
}

For full source code and configuration, see the header-management Maven module under https://github.com/mattvickery/l8mdv-si-samples