© Adam L. Davis 2020
A. L. Davis, Spring Quick Reference Guide, https://doi.org/10.1007/978-1-4842-6144-6_13

13. Spring Integration

Adam L. Davis, Oviedo, FL, USA

Spring Integration is a programming model to support the well-known enterprise integration patterns.

Features

Spring Integration implements many common enterprise integration patterns, such as Channel, Aggregator, Filter, and Transformer, and provides an abstraction over many different messaging implementations.
Figure 13-1. Enterprise Integration

Spring Integration provides a messaging paradigm to separate an application into components that communicate without knowing about each other. In addition to wiring together fine-grained components, Spring Integration provides many channel adapters and gateways to communicate with external systems. Channel adapters are used for one-way integration (send or receive), and gateways are used for request/reply scenarios (inbound or outbound).
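
The decoupling described above can be illustrated with a minimal plain-Java sketch. It does not use Spring Integration; SimpleChannel and ChannelDemo are hypothetical names chosen only for illustration. The sender and the two subscribers share nothing except the channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a message channel; not a Spring Integration class.
class SimpleChannel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    // A consumer registers interest without knowing who will send.
    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // A producer sends without knowing who (if anyone) will receive.
    void send(T message) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class ChannelDemo {
    // Wires two independent consumers to one channel and sends two messages.
    static List<String> run() {
        SimpleChannel<String> orders = new SimpleChannel<>();
        List<String> log = new ArrayList<>();
        orders.subscribe(msg -> log.add("audit:" + msg));
        orders.subscribe(msg -> log.add("billing:" + msg));
        orders.send("order-1");
        orders.send("order-2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Adding a third subscriber would not require any change to the sending code, which is the property that the messaging paradigm is meant to provide.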

Messaging supported includes but is not limited to
  • REST/HTTP

  • FTP/SFTP

  • Twitter

  • Web Services (SOAP)

  • TCP/UDP

  • JMS

  • RabbitMQ

  • Email

Note: Spring Cloud Integration

The Spring Cloud Stream project builds on Spring Integration, where Spring Integration is used as an engine for message-driven microservices. This is covered in Chapter 18.

Getting Started

The easiest way to get started is using the Spring Initializr or Spring Boot CLI to create a new project (they are covered in Chapter 15). In an existing project, add the following dependencies:
implementation 'org.springframework.boot:spring-boot-starter-integration'
testImplementation 'org.springframework.integration:spring-integration-test'
Then also include any other Spring Boot starters or other libraries you will need for your project, for example:
implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'

This brings in the Spring Boot starter for AMQP and spring-rabbit-test for testing integration with RabbitMQ.

Then, in a Spring Boot application, add the @EnableIntegration annotation to one of your configuration classes. This annotation does the following:
  • Registers some built-in beans, such as errorChannel and its LoggingHandler, taskScheduler for pollers, jsonPath SpEL-function, and others.

  • Adds several BeanFactoryPostProcessor instances.

  • Adds several BeanPostProcessor instances to enhance or convert and wrap particular beans for integration purposes.

  • Adds annotation processors to parse messaging annotations and registers components for them with the application context.

You can also use @IntegrationComponentScan to scan the classpath for Spring Integration–specific annotations such as the @MessagingGateway annotation.

Putting it all together, your main application class might look like the following:
@EnableIntegration
@IntegrationComponentScan
@SpringBootApplication
public class SpringIntApplication {
        public static void main(String[] args) {
                SpringApplication.run(SpringIntApplication.class, args);
        }
}

Adding Additional Support

In general, when you want to use Spring Integration with a particular technology, such as JPA, you include the additional artifact named spring-integration-X under the org.springframework.integration groupId, for example, for Kafka:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>3.3.0.RELEASE</version>
</dependency>
Some of the available support modules:
  • JPA

  • HTTP

  • JDBC

  • JMS

  • Mail

  • MongoDB

  • Kafka

  • Redis

  • Resource

  • RMI

  • RSocket

  • SFTP

  • STOMP

  • Stream

  • Syslog

  • TCP and UDP (ip)

  • WebFlux

  • Web Services

  • XML

  • XMPP

Message Gateway

A Message Gateway is the abstraction Spring Integration uses over an existing messaging technology; it allows your code to interact with an interface without needing to know about the underlying channel. When you annotate an interface with @MessagingGateway and annotate one or more methods with @Gateway annotations, Spring implements the interface with a proxy at runtime using the underlying technology from the support artifact you included.

For example, for a Kafka Message Gateway:
//Use the following imports:
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.Gateway;
import org.springframework.kafka.support.KafkaHeaders;
@MessagingGateway
public interface KafkaGateway {
    @Gateway(requestChannel = "toKafka.input")                  // 1
    void sendToKafka(String payload,
                     @Header(KafkaHeaders.TOPIC) String topic); // 2
    @Gateway(replyChannel = "fromKafka", replyTimeout = 10000)  // 3
    Message<?> receiveFromKafka();
}
  1. Define the requestChannel to send data, in this case, a String payload.

  2. Using @Header defines a header, in this case, the Kafka topic the message payload will be sent to.

  3. Define the replyChannel, which can be used to get messages from Kafka. Note the return type is Spring’s Message<T> interface, an abstraction which can be used for any messaging system. The replyTimeout is in milliseconds, so here it is ten seconds.

Assuming everything is set up correctly, Spring Integration implements the KafkaGateway interface as a Spring Bean at runtime, so it can be invoked in the following way:
KafkaGateway kafkaGateway = context.getBean(KafkaGateway.class);
String message = "any message";
String topic = "topic";
kafkaGateway.sendToKafka(message, topic);
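
To make the idea of an interface being implemented at runtime more concrete, here is a minimal sketch using a JDK dynamic proxy. It is not how Spring Integration builds its gateway proxies; DemoGateway, GatewayProxyDemo, and the in-memory queue standing in for Kafka are all assumptions made for illustration. The receive method also shows what a reply timeout means in practice: the caller waits up to the given number of milliseconds and then gives up.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical gateway interface; no class in this sketch implements it by hand.
interface DemoGateway {
    void send(String payload, String topic);

    String receive(); // returns null if nothing arrives before the timeout
}

class GatewayProxyDemo {
    // Builds a runtime implementation of DemoGateway backed by an in-memory queue.
    static DemoGateway create(BlockingQueue<String> channel, long replyTimeoutMillis) {
        return (DemoGateway) Proxy.newProxyInstance(
                DemoGateway.class.getClassLoader(),
                new Class<?>[] {DemoGateway.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "send":
                            // args[0] is the payload, args[1] plays the role of the topic header
                            channel.put(args[1] + ":" + args[0]);
                            return null;
                        case "receive":
                            // Wait up to the timeout, then give up and return null
                            return channel.poll(replyTimeoutMillis, TimeUnit.MILLISECONDS);
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    public static void main(String[] args) {
        DemoGateway gateway = create(new LinkedBlockingQueue<>(), 100);
        gateway.send("any message", "topic");
        System.out.println(gateway.receive()); // the message sent above
        System.out.println(gateway.receive()); // null after the timeout elapses
    }
}
```

The calling code only ever sees the DemoGateway interface, which mirrors how the KafkaGateway bean is used above.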

IntegrationFlow

There are two main ways to create flows (implementing the IntegrationFlow interface): using a lambda expression or using the fluent builder DSL starting with the IntegrationFlows class.

In the first instance, we are taking advantage of the fact that IntegrationFlow is a SAM (Single Abstract Method) interface, so a lambda expression with one parameter can be supplied, and Java will know it implements the interface from the return type, for example:
    @Bean
    public IntegrationFlow toKafka(KafkaTemplate<?, ?> kafkaTemplate) {
        return flowDefinition -> flowDefinition
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .messageKey("si.key"));
    }
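
The SAM mechanism itself can be shown without Spring. In the following sketch, MiniFlow and FlowSteps are hypothetical stand-ins for IntegrationFlow and its flow definition; they are assumptions for illustration, not Spring Integration types. Because MiniFlow has exactly one abstract method, a lambda returned from a method whose return type is MiniFlow is accepted as an implementation.

```java
// Hypothetical stand-in for a flow definition that records the steps added to it.
class FlowSteps {
    final StringBuilder steps = new StringBuilder();

    FlowSteps handle(String handlerName) {
        steps.append("handle(").append(handlerName).append(")");
        return this;
    }
}

// Hypothetical SAM interface: exactly one abstract method.
@FunctionalInterface
interface MiniFlow {
    void configure(FlowSteps definition);
}

class SamDemo {
    // The declared return type tells the compiler the lambda implements MiniFlow.
    static MiniFlow toLog() {
        return definition -> definition.handle("logger");
    }

    // The framework's side: invoke the single method so the flow can describe itself.
    static String describe(MiniFlow flow) {
        FlowSteps definition = new FlowSteps();
        flow.configure(definition);
        return definition.steps.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(toLog()));
    }
}
```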
The IntegrationFlows class can be used to create an IntegrationFlow, for example:
    @Bean
    public IntegrationFlow fromKafkaFlow(
                   ConsumerFactory<?, ?> consumerFactory) {
        return IntegrationFlows
          .from(Kafka.messageDrivenChannelAdapter(consumerFactory, topic))
          .channel((Channels c) -> c.queue("fromKafka"))
          .get();
    }
The IntegrationFlows.from static method returns an IntegrationFlowBuilder which extends IntegrationFlowDefinition and has the “get()” method which returns a new instance of StandardIntegrationFlow. The methods on IntegrationFlowDefinition allow you to fluently build an IntegrationFlow and include the following:
  • aggregate – Aggregator-specific implementation of AbstractCorrelatingMessageHandler, a message handler that holds a buffer of correlated messages in a MessageStore. It takes care of correlated groups of messages that can be completed in batches.

  • barrier  – A message handler that suspends the thread until a message with corresponding correlation is passed into the trigger method or the timeout occurs.

  • bridge – A simple MessageHandler implementation that passes the request Message directly to the output channel without modifying it. The main purpose of this handler is to bridge a PollableChannel to a SubscribableChannel or vice versa.

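  • channel – Populates a MessageChannel at the current IntegrationFlow chain position, given either a channel name or a channel built with the Channels factory.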

  • claimCheckIn – Populates the MessageTransformingHandler for the ClaimCheckInTransformer with the provided MessageStore.

  • claimCheckOut – Populates the MessageTransformingHandler for the ClaimCheckOutTransformer with the provided MessageStore.

  • controlBus – Populates the Control Bus EI Pattern–specific MessageHandler implementation at the current IntegrationFlow chain position.

  • convert – Populates the MessageTransformingHandler instance for the provided payloadType to convert at runtime.

  • delay  – Populates a DelayHandler to the current integration flow position.

  • enrich – Populates a ContentEnricher to the current integration flow position with provided options. ContentEnricher is a Message Transformer that can augment a message's payload with either dynamic or static values.

  • enrichHeaders – Populates a MessageTransformingHandler that adds statically configured header values to a Message.

  • filter – Populates a MessageFilter, which passes a message on to the filter’s output channel only if the message is accepted by a given MessageSelector.

  • fixedSubscriberChannel – Populates an instance of FixedSubscriberChannel (a specialized SubscribableChannel for a single final subscriber set up during bean instantiation) at the current IntegrationFlow chain position.

  • fluxTransform – Populates a FluxMessageChannel to start a reactive processing for upstream data, wrap it to a Flux, apply the provided Function via Flux.transform(Function), and emit the result to one more FluxMessageChannel, subscribed in the downstream flow.

  • gateway – Populates the "artificial" GatewayMessageHandler for the provided subflow or channel.

  • handle  – Populates a ServiceActivatingHandler for the provided MessageHandler or MessageProcessorSpec bean and method-name.

  • headerFilter – Provides the HeaderFilter to the current StandardIntegrationFlow.

  • log – Populates a WireTap for the current message channel and uses a LoggingHandler, a MessageHandler implementation that simply logs the Message or its payload.

  • logAndReply – Does the same as the “log” method, but can be used only at the end of the flow. Returns IntegrationFlow.

  • nullChannel – Adds a bean into this flow definition as a terminal operator. Returns IntegrationFlow.

  • publishSubscribeChannel – A variation of the channel method specific to PublishSubscribeChannel (a channel that sends Messages to each of its subscribers), which allows the use of the “subflow” subscriber capability.

  • resequence  – Populates a ResequencingMessageHandler, which resequences messages, using a buffer of correlated messages in a MessageStore.

  • route – Many different variations of this method exist. They populate the MethodInvokingRouter, or an ExpressionEvaluatingRouter if provided a SpEL expression, which then determines a MessageChannel or channel name to use.

  • routeByException – Can route messages by Exception type.

  • routeToRecipients – Populates the RecipientListRouter with options from the RecipientListRouterSpec, which sends messages on multiple channels.

  • scatterGather – Populates a ScatterGatherHandler to the current integration flow position based on the provided MessageChannel for scattering function and AggregatorSpec for gathering function.

  • split – Populates either the MethodInvokingSplitter to evaluate the provided method of the service at runtime or the ExpressionEvaluatingSplitter with the provided SpEL expression. A splitter splits a message into multiple Messages.

  • transform  – Populates the MessageTransformingHandler instance for the provided GenericTransformer.

  • trigger – Populates a ServiceActivatingHandler instance to perform MessageTriggerAction.

  • wireTap – Populates the Wire Tap EI Pattern–specific ChannelInterceptor implementation to the current MessageChannel.

This is by no means exhaustive. Most of these methods have several overloaded variations.
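
To give a feel for how such a fluent chain fits together, here is a minimal plain-Java sketch. It is not the Spring Integration DSL: StringFlow and StringFlowDemo are hypothetical, handle only strings, and borrow just three method names (filter, transform, and handle) from the list above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical fluent builder; only the method names mirror the DSL.
class StringFlow {
    // Each step returns the (possibly changed) message, or null to drop it.
    private final List<Function<String, String>> steps = new ArrayList<>();

    // filter: drop the message unless the selector accepts it
    StringFlow filter(Predicate<String> selector) {
        steps.add(msg -> selector.test(msg) ? msg : null);
        return this;
    }

    // transform: replace the message with the function's result
    StringFlow transform(Function<String, String> transformer) {
        steps.add(transformer);
        return this;
    }

    // handle: terminal step; returns a consumer that runs every step in order
    Consumer<String> handle(Consumer<String> handler) {
        List<Function<String, String>> frozen = new ArrayList<>(steps);
        return msg -> {
            String current = msg;
            for (Function<String, String> step : frozen) {
                current = step.apply(current);
                if (current == null) {
                    return; // filtered out, nothing reaches the handler
                }
            }
            handler.accept(current);
        };
    }
}

class StringFlowDemo {
    // Drops empty messages, upper-cases the rest, and collects what is delivered.
    static List<String> run(List<String> inputs) {
        List<String> delivered = new ArrayList<>();
        Consumer<String> flow = new StringFlow()
                .filter(msg -> !msg.isEmpty())
                .transform(String::toUpperCase)
                .handle(delivered::add);
        inputs.forEach(flow);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "", "bc")));
    }
}
```

Each builder method returns the builder itself, which is what allows the calls to be chained; the real DSL follows the same principle with far richer types.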

Kafka Config

You can then configure the Spring Integration Kafka–specific settings in application.yml under spring.kafka.consumer and spring.kafka.producer, for example:
spring:
  kafka:
    consumer:
      group-id: siTestGroup
      auto-offset-reset: earliest
      enable-auto-commit: false
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      batch-size: 16384
      buffer-memory: 33554432
      retries: 0
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Listing 13-1. application.yml
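
For readers who know the Kafka client's own configuration names, the following sketch re-keys the producer settings from Listing 13-1. The class KafkaPropertyNames is hypothetical, and the dash-to-dot conversion is an assumption that holds for the keys shown in this listing only; it is not a general rule for every Spring Boot Kafka property.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class KafkaPropertyNames {
    // Holds for the keys in Listing 13-1 only; not a general conversion rule.
    static String toKafkaKey(String bootKey) {
        return bootKey.replace('-', '.');
    }

    // Producer settings from the listing, re-keyed with Kafka client property names.
    static Map<String, Object> producerConfig() {
        Map<String, Object> bootStyle = new LinkedHashMap<>();
        bootStyle.put("batch-size", 16384);       // bytes per batch (16 KiB)
        bootStyle.put("buffer-memory", 33554432); // bytes of buffer memory (32 MiB)
        bootStyle.put("retries", 0);
        bootStyle.put("key-serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        bootStyle.put("value-serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Map<String, Object> kafkaStyle = new LinkedHashMap<>();
        bootStyle.forEach((key, value) -> kafkaStyle.put(toKafkaKey(key), value));
        return kafkaStyle;
    }

    public static void main(String[] args) {
        producerConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```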

Note: Installing Kafka is left as an exercise for the reader. Go to https://kafka.apache.org/quickstart and follow the instructions. Then follow the content of this chapter to set up Spring Integration.

Topics

Since we are using Kafka, we also need to create the topics initially.

Because Spring Boot auto-configures a KafkaAdmin bean (a spring-kafka class, which spring-integration-kafka brings in as a dependency), the topics will be provisioned for us if we provide NewTopic Spring Beans, for example:
    @Bean
    public NewTopic topic() {
        return new NewTopic("topic", 5, (short) 1);
    }
    @Bean
    public NewTopic newTopic() {
        return new NewTopic("topic2", 5, (short) 1);
    }

This would create two topics named “topic” and “topic2”, each with 5 partitions (the topic's data is split across five partitions) and a replication factor of 1 (only one copy of each partition is stored).
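
As a rough illustration of what splitting data into partitions means, the sketch below maps a message key to one of five partitions. It is a simplification under stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses String.hashCode() only to stay dependency-free, and PartitionDemo is a hypothetical class.

```java
class PartitionDemo {
    // Simplified: Kafka's default partitioner uses a murmur2 hash of the key bytes;
    // String.hashCode() is used here only to keep the sketch dependency-free.
    static int partitionFor(String key, int partitionCount) {
        // floorMod keeps the result in [0, partitionCount) even for negative hashes
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 5; // matches the NewTopic beans above
        for (String key : new String[] {"a", "b", "order-42"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

The important property is that the same key always lands in the same partition, so messages with the same key keep their relative order.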

Monitoring

By default, if a Micrometer meterRegistry bean is present, which will be the case in a Spring Boot project with Spring Boot Actuator included, Spring Integration metrics will be managed by Micrometer. If you wish to use legacy Spring Integration metrics, add a DefaultMetricsFactory (from Spring Integration) bean to the application context.