Spring Integration – Input Channel Definition

Input Channels

In a pipes and filters architecture, pipes are connectors or channels. Although at first sight trivial, channels are semantically rich – they allow typing, synchronous and asynchronous input, direct and multicast notifications, send and wait (rendezvous) as well as queued input and wrapping by adapters.
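As a rough illustration of a few of these channel flavours, here is a minimal sketch in Spring Integration XML. The channel ids and the queue capacity are hypothetical and chosen only for the example; the schema version simply matches the one used in the second half of this article.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

  <!-- Direct (synchronous, point-to-point) channel: the default type. -->
  <int:channel id="direct-example-channel"/>

  <!-- Multicast: every subscriber receives each message. -->
  <int:publish-subscribe-channel id="multicast-example-channel"/>

  <!-- Queued (asynchronous) input: messages are buffered until polled.
       The capacity of 25 is an arbitrary value for this sketch. -->
  <int:channel id="queued-example-channel">
    <int:queue capacity="25"/>
  </int:channel>

  <!-- Rendezvous: the sender blocks until a receiver takes the message. -->
  <int:channel id="rendezvous-example-channel">
    <int:rendezvous-queue/>
  </int:channel>
</beans>
```

Note that consumers attached to the queued and rendezvous channels need a poller, since those channels hold messages until something asks for them.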

To define or not to define?

Explicit definition of the Spring Integration input-channel is not mandatory for SI constructs to operate. The framework will create input channels automatically if they are not explicitly defined in configuration, but what are the pros and cons of this approach?

Take the following chain as an example:

[Screenshot: chain configuration using the “processing-channel” input channel]

The input-channel named “processing-channel” will get created automatically as it’s not explicitly defined here. In a trivial configuration such as this one there’s very little difference between including the channel explicitly and relying on the framework’s implicit creation. In larger configurations the extra lines taken by a dozen or more channel definitions may start to make the context appear a little cluttered. At that point it’s possible to consider decomposing the configuration, but sometimes you just can’t decompose those flows into distinct contexts because all of the constructs naturally belong together in a single context.
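To make the trade-off concrete, here is a minimal sketch of a chain reading from “processing-channel”. The contents of the chain and the use of the built-in nullChannel as an output are assumptions made for illustration, not the original configuration. The single `int:channel` line is the only difference between the explicit and implicit styles.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

  <!-- Explicit definition. Remove this element and the framework
       creates a direct channel with the same name automatically. -->
  <int:channel id="processing-channel"/>

  <!-- Hypothetical chain body: converts the payload to a String and
       discards the result by sending it to the built-in nullChannel. -->
  <int:chain input-channel="processing-channel" output-channel="nullChannel">
    <int:transformer expression="payload.toString()"/>
  </int:chain>
</beans>
```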

Channel Typing

One of the features of Spring Integration channels is that they can be strongly typed against payload object types. It’s worth considering adopting a convention of specifying the payload type on the channel: not only does this enforce payload typing, it also gives better confidence that whoever reads the flow after it’s built can readily see which types of objects are traversing it.

Of course channels can’t always be strongly typed, but in many cases they can. Here’s an example of one that can:

[Screenshot: configuration with an input channel typed to java.util.UUID]

You can see that on the first line of this Spring Integration configuration the contract for operation specifies that a java.util.UUID type object is expected on the input channel. In this case the payload contains a claim-check identifier, which is replaced with an arbitrary string for the example.
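A typed channel of this kind might look roughly like the sketch below. The `datatype` attribute is the standard way to declare the accepted payload type; the channel id, the transformer and the replacement string are assumptions made for the example and are not taken from the original configuration.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

  <!-- Only messages whose payload is a java.util.UUID may enter. -->
  <int:channel id="processing-channel" datatype="java.util.UUID"/>

  <!-- Hypothetical chain: replaces the UUID payload with a literal string. -->
  <int:chain input-channel="processing-channel" output-channel="nullChannel">
    <int:transformer expression="'arbitrary replacement string'"/>
  </int:chain>
</beans>
```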

Where a channel is strongly typed and the contract is broken, an exception will be thrown. Here’s an example of what you’ll see:

[Screenshot: start of the stack trace for the resulting exception]

In this example I changed the datatype to java.util.Map and ran a test with a payload that’s actually a java.util.UUID. That was the start of the stack trace that was generated when the exception was thrown.

It’s possible to provide a collection of message types for channel specification. Any message entering the channel must conform to at least one of the specified types, otherwise a MessageDeliveryException is generated. The following example shows a change that will prevent the exception above. The configuration now allows two types of message payload, java.util.Map and java.util.UUID, to enter the channel.

[Screenshot: channel configuration accepting java.util.Map and java.util.UUID payloads]
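In XML, several accepted types are given as a comma-separated list in the `datatype` attribute. A minimal sketch, with the channel id assumed to match the earlier example:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">

  <!-- A payload of either listed type is accepted; anything else is rejected. -->
  <int:channel id="processing-channel"
               datatype="java.util.Map,java.util.UUID"/>
</beans>
```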

Spring Integration – Payload Storage via Header Enrichment

There’s often a need to temporarily store transient messages during design of Spring Integration flows – several different mechanisms are available in the toolkit.

It’s pretty straightforward to take a message, use an SI header enricher construct and place the message payload in a header using SpEL expressions: one for the header key name and one for the payload extraction.

The following SI flow demonstrates how to do just that:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/integration 
       http://www.springframework.org/schema/integration/spring-integration-2.1.xsd">
  <int:gateway id="headerManagementGateway"
                 service-interface="com.l8mdv.sample.HeaderManagementGateway"/>
  <int:chain input-channel="request-message-storage-channel"
             output-channel="request-message-retrieval-channel">
    <int:header-enricher>
      <int:header name="#{T(com.l8mdv.sample.HeaderManagementGateway)
                   .REQUEST_PAYLOAD_HEADER_KEY}" expression="getPayload()"/>
    </int:header-enricher>
  </int:chain>

  <int:chain input-channel="request-message-retrieval-channel">
    <int:transformer expression="headers.get(T(com.l8mdv.sample.HeaderManagementGateway)
               .REQUEST_PAYLOAD_HEADER_KEY)"/>
  </int:chain>
</beans>

This example can be executed by implementing a gateway as follows:

package com.l8mdv.sample;

import org.springframework.integration.Message;
import org.springframework.integration.annotation.Gateway;

public interface HeaderManagementGateway {

    public static final String REQUEST_PAYLOAD_HEADER_KEY = "REQUEST_PAYLOAD_HEADER_KEY";

    @Gateway (requestChannel = "request-message-storage-channel")
    public Message<String> send(Message<String> message);
}

and then running a test such as this one:

package com.l8mdv.sample;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static com.l8mdv.sample.HeaderManagementGateway.REQUEST_PAYLOAD_HEADER_KEY;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/header-management.xml"})
public class HeaderManagementIntegrationTest {

    @Autowired
    HeaderManagementGateway headerManagementGateway;

    @Test
    public void locatePayloadInHeader() {
        String payload = "Sample test message.";
        Message<String> message = MessageBuilder.withPayload(payload).build();
        Message<String> response = headerManagementGateway.send(message);

        Assert.assertEquals(payload, response.getHeaders().get(REQUEST_PAYLOAD_HEADER_KEY));
    }

    @Test
    public void locateTransformedPayload() {
        String payload = "Sample test message.";
        Message<String> message = MessageBuilder.withPayload(payload).build();
        Message<String> response = headerManagementGateway.send(message);

        Assert.assertTrue(response.getPayload().contains(payload));
    }
}

For full source code and configuration, see the header-management Maven module under https://github.com/mattvickery/l8mdv-si-samples