Spring Integration is a programming model to support the well-known enterprise integration patterns.
Features

Enterprise Integration
Spring Integration provides a messaging paradigm to separate an application into components that communicate without knowing about each other. In addition to wiring together fine-grained components, Spring Integration provides many channel adapters and gateways to communicate with external systems. Channel adapters are used for one-way integration (send or receive), and gateways are used for request/reply scenarios (inbound or outbound).
REST/HTTP
FTP/SFTP
Twitter
Web Services (SOAP)
TCP/UDP
JMS
RabbitMQ
Email
Spring Cloud Integration
The Spring Cloud Stream project builds on Spring Integration, where Spring Integration is used as an engine for message-driven microservices. This is covered in Chapter 18.
Getting Started
This brings in the Spring Boot starter for AMQP and spring-rabbit-test for testing integration with RabbitMQ.
Registers some built-in beans, such as errorChannel and its LoggingHandler, taskScheduler for pollers, jsonPath SpEL-function, and others.
Adds several BeanFactoryPostProcessor instances.
Adds several BeanPostProcessor instances to enhance or convert and wrap particular beans for integration purposes.
Adds annotation processors to parse messaging annotations and registers components for them with the application context.
You can also use @IntegrationComponentScan to scan the classpath for Spring Integration–specific annotations such as the @MessagingGateway annotation.
Adding Additional Support
JPA | HTTP | JDBC | JMS |
MongoDB | Kafka | Redis | |
Resource | RMI | RSocket | SFTP |
STOMP | Stream | Syslog | TCP and UDP (ip) |
WebFlux | Web Services | XML | XMPP |
Message Gateway
A Message Gateway is the abstraction Spring Integration used over an existing messaging technology which allows your code to interact with an interface without needing to know about the underlying channel. When you annotate an interface with @MessagingGateway and annotate one or more methods with @Gateway annotations, Spring implements the interface with a proxy at runtime using the underlying technology from the support artifact you included.
- 1.
Define the requestChannel to send data, in this case, a String payload.
- 2.
Using @Header defines a header, in this case, the Kafka topic the message payload will be sent to.
- 3.
Define the replyChannel, which can be used to get messages from Kafka. Note the return type is Spring’s Message<T> interface, an abstraction which can be used for any messaging system. The replyTimeout is in milliseconds, so here it is ten seconds.
IntegrationFlow
There are two main ways to create flows (implementing the IntegrationFlow interface), either using a lambda expression or using the fluent builder DSL starting with the IntegrationFlows class .
aggregate – Aggregator-specific implementation of AbstractCorrelatingMessageHandler, a message handler that holds a buffer of correlated messages in a MessageStore. It takes care of correlated groups of messages that can be completed in batches.
barrier – A message handler that suspends the thread until a message with corresponding correlation is passed into the trigger method or the timeout occurs.
bridge – A simple MessageHandler implementation that passes the request Message directly to the output channel without modifying it. The main purpose of this handler is to bridge a PollableChannel to a SubscribableChannel or vice versa.
channel – Defines methods for sending messages.
claimCheckIn – Populates the MessageTransformingHandler for the ClaimCheckInTransformer with the provided MessageStore.
claimCheckOut – Populates the MessageTransformingHandler for the ClaimCheckOutTransformer with the provided MessageStore.
controlBus – Populates the Control Bus EI Pattern–specific MessageHandler implementation at the current IntegrationFlow chain position.
convert – Populates the MessageTransformingHandler instance for the provided payloadType to convert at runtime.
delay – Populates a DelayHandler to the current integration flow position.
enrich – Populates a ContentEnricher to the current integration flow position with provided options. ContentEnricher is a Message Transformer that can augment a message's payload with either dynamic or static values.
enrichHeaders – Populates a MessageTransformingHandler that adds statically configured header values to a Message.
filter – A MessageFilter only passes to filter’s output channel if the message passes a given MessageSelector.
fixedSubscriberChannel – Populates an instance of FixedSubscriberChannel (a specialized SubscribableChannel for a single final subscriber set up during bean instantiation) at the current IntegrationFlow chain position.
fluxTransform – Populates a FluxMessageChannel to start a reactive processing for upstream data, wrap it to a Flux, apply the provided Function via Flux.transform(Function), and emit the result to one more FluxMessageChannel, subscribed in the downstream flow.
gateway – Populates the "artificial" GatewayMessageHandler for the provided subflow or channel.
handle – Populates a ServiceActivatingHandler for the provided MessageHandler or MessageProcessorSpec bean and method-name.
headerFilter – Provides the HeaderFilter to the current StandardIntegrationFlow.
log – Populates a WireTap for the current message channel and uses a LoggingHandler, a MessageHandler implementation that simply logs the Message or its payload.
logAndReply – This operator can be used only in the end of the flow. Does the same as “log” method. Returns IntegrationFlow.
nullChannel – Adds a bean into this flow definition as a terminal operator. Returns IntegrationFlow.
publishSubscribeChannel – The PublishSubscribeChannel (a channel that sends Messages to each of its subscribers) BaseIntegrationFlowDefinition.channel(java.lang.String) method–specific implementation to allow the use of the “subflow” subscriber capability.
resequence – Populates a ResequencingMessageHandler, which resequences messages, using a buffer of correlated messages in a MessageStore.
route – Many different variations of this method exist. They populate the MethodInvokingRouter, or an ExpressionEvaluatingRouter if provided a SpEL expression, which then determines a MessageChannel or channel name to use.
routeByException – Can route messages by Exception type.
routeToRecipients – Populates the RecipientListRouter with options from the RecipientListRouterSpec, which sends messages on multiple channels.
scatterGather – Populates a ScatterGatherHandler to the current integration flow position based on the provided MessageChannel for scattering function and AggregatorSpec for gathering function.
split – Populates either the MethodInvokingSplitter to evaluate the provided method of the service at runtime or the ExpressionEvaluatingSplitter with the provided SpEL expression. A splitter splits message into multiple Messages.
transform – Populates the MessageTransformingHandler instance for the provided GenericTransformer.
trigger – Populates a ServiceActivatingHandler instance to perform MessageTriggerAction.
wireTap – Populates the Wire Tap EI Pattern–specific ChannelInterceptor implementation to the currentMessageChannel.
This is by no means exhaustive. Most of these methods have several overloaded variations.
Kafka Config
application.yml
Installing Kafka is left as an exercise for the reader. Go to https://kafka.apache.org/quickstart and follow the instructions. Then follow the content of this chapter to set up Spring Integration.
Topics
Since we are using Kafka, we also need to create the topics initially.
This would create two topics named “topic” and “topic2” with replication of 1 (meaning only one copy is stored) and 5 partitions, meaning the data will be split into five partitions.
Monitoring
By default, if a Micrometer meterRegistry bean is present, which will be the case in a Spring Boot project with Spring Actuator included, Spring Integration metrics will be managed by Micrometer. If you wish to use legacy Spring Integration metrics, add a DefaultMetricsFactory (from Spring Integration) bean to the application context.