1. Introducción
Bienvenidos a un nuevo post de nuestro querido blog!. En esta ocasión veremos cómo producir y consumir mensajes en RabbitMQ a través de Spring Cloud Stream. Spring Cloud Stream es un api que provee Spring para la consumisión y producción de mensajes abstrayendo el sistema de mensajes sobre el que se va a operar. Esto nos permitirá poder cambiar el sistema de mensajería teniendo un impacto mínimo en el resto de la aplicación. Podemos decir que Spring Cloud Stream permite realizar la inyección de dependencias de los sistemas de mensajería sobre nuestro sistema.
Os podéis descargar el código de ejemplo de mi GitHub aquí.
Tecnologías empleadas:
- Java 8
- Gradle 3.1
- SpringBoot 1.5.2.RELEASE
- Spring 4.3.7.RELEASE
- SpringCloudStream 1.1.2 RELEASE
- RabbitMQ 3.6.9
- Erlang 19.3
2. Arrancando RabbitMQ con Docker
Como ya hiciéramos en el anterior post sobre RabbitMQ procedemos a levantar RabbitMQ a través de Docker. Especificamos en nuestro fichero docker-compose.yml que queremos arrancar un contenedor de la imagen rabbitmq donde se configure el virtualhost / para el usuario admin con la password admin.
rabbit1: image: "rabbitmq" environment: RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG" RABBITMQ_DEFAULT_USER: "admin" RABBITMQ_DEFAULT_PASS: "admin" RABBITMQ_DEFAULT_VHOST: "/" ports: - "15672:15672" - "5672:5672" volumes: - "./enabled_plugins:/etc/rabbitmq/enabled_plugins"
Además montamos en el directorio /etc/rabbitmq/
el fichero enabled_plugins donde establecemos que queremos configurar los plugins de gestión web.
Arrancamos nuestro contenedor
Vamos a la url http://localhost:15672
3. Productor
Para empezar a utilizar Spring Cloud Stream sobre RabbitMQ debemos añadir la dependencia org.springframework.cloud:spring-cloud-starter-stream-rabbit
group 'com.jorgehernandezramirez.spring.springcloud.stream' version '1.0-SNAPSHOT' buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE") } } apply plugin: 'java' apply plugin: 'idea' apply plugin: 'maven' apply plugin: 'spring-boot' dependencyManagement { imports { mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.SR6' } } sourceCompatibility = 1.8 springBoot { mainClass = "com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.Application" } repositories { mavenCentral() } dependencies { compile 'org.springframework.boot:spring-boot-starter-web' compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit' }
Main que arranca SpringBoot
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public Application(){ //For Spring } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
Lo más importante que deberemos considerar es que la aplicación no va a comunicarse directamente con el sistema de mensajería. La hará a través de los canales de SpringCloudStream. En dichos canales deberemos configurar
- Binder: Indicamos sobre que sistema de mensajería queremos operar
- Destination: En el caso de RabbitMQ nos referimos al exchange sobre el que vamos a trabajar
- Group: En el caso de RabbitMQ nos referimos a la cola sobre la que vamos a trabajar
- ContextType: Tipo de contenido que se va a escribir y leer de las colas
Por tanto en nuestro fichero application.yml debemos configurar:
- La conexión a RabbitMQ
- El canal de SpringCloudStream
La clave para con
spring: cloud: stream: bindings: itemsChannel: binder: rabbit destination: item.exchange group: myqueue contentType: application/json rabbitmq: host: localhost username: admin password: admin virtual-host: / port: 5672
Para configurar el canal itemsChannel en nuestro contexto de Spring deberemos establecer las siguientes clases:
- ItemSource: Clase que permite operar sobre el canal itemsChannel. Utilizamos la anotación
@Output
para indicar que vamos a escribir sobre el canal - IntegrationConfiguration: Permite habilitar la clase ItemSource en el contexto de Spring
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; @Configuration @EnableBinding(ItemSource.class) @IntegrationComponentScan public class IntegrationConfiguration { }
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface ItemSource { String CHANNEL_NAME = "itemsChannel"; @Output MessageChannel itemsChannel(); }
Finalmente la clase que permitirá escribir en nuestro canal
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration; import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.dto.ItemDto; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.MessagingGateway; @MessagingGateway public interface ItemGateway { @Gateway(requestChannel = ItemSource.CHANNEL_NAME) void generate(ItemDto itemDto); }
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.dto; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; public class ItemDto { private String id; private String name; public ItemDto(){ //Para Jackson } @JsonCreator public ItemDto(@JsonProperty("id") String id, @JsonProperty("name") String name) { this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "ItemDto{" + "id='" + id + '\'' + ", name='" + name + '\'' + '}'; } }
Creamos un controlador que hará uso de la clase ItemGateway para escribir mensajes en nuestro canal.
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.controller; import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration.ItemGateway; import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.dto.ItemDto; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RestController public class ItemController { @Autowired private ItemGateway itemGateway; @RequestMapping("/generateItem") @ResponseBody public ItemDto generateWork(@RequestParam("name") String name) { final ItemDto itemDto = new ItemDto(UUID.randomUUID().toString(), name); itemGateway.generate(itemDto); return itemDto; } }
4. Consumidor
Añadimos añadir la dependencia org.springframework.cloud:spring-cloud-starter-stream-rabbit
group 'com.jorgehernandezramirez.spring.springcloud.stream' version '1.0-SNAPSHOT' buildscript { repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE") } } apply plugin: 'java' apply plugin: 'idea' apply plugin: 'maven' apply plugin: 'spring-boot' dependencyManagement { imports { mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.SR6' } } sourceCompatibility = 1.8 springBoot { mainClass = "com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.Application" } repositories { mavenCentral() } dependencies { compile 'org.springframework.boot:spring-boot-starter-web' compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit' }
Main que arranca SpringBoot
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public Application(){ //For Spring } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
Configuramos la conexión a RabbitMQ y el canal itemsChannel de SpringCloudStream
server: port: 8082 spring: cloud: stream: bindings: itemsChannel: binder: rabbit destination: item.exchange group: myqueue contentType: application/json rabbitmq: host: localhost username: admin password: admin virtual-host: / port: 5672
Como en el caso del productor deberemos configurar las siguientes clases en el contexto de Spring para poder leer del canal itemsChannel. En este caso utilizamos la anotación @Input
para indicar que vamos a leer del canal.
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.configuration; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface ItemSink { String CHANNEL_NAME = "itemsChannel"; @Input SubscribableChannel itemsChannel(); }
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.configuration; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.context.annotation.Configuration; @Configuration @EnableBinding(ItemSink.class) public class IntegrationConfiguration { }
Por último utilizamos la anotación @StreamListener
para leer mensajes de nuestro canal.
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.configuration; import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.dto.ItemDto; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Service; @Service public class ItemHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ItemHandler.class); @StreamListener(ItemSink.CHANNEL_NAME) public void process(ItemDto itemDto) { LOGGER.info("{}, {}", itemDto.getId(), itemDto.getName()); } }
5. Probando la aplicación
A continuación compilamos nuestros fuentes y arrancamos los microservicios productor y consumidor
Compilamos cada uno de los microservicios
Arrancamos cada uno de los microservicios
Atacamos al controlador que inserta un mensaje en la cola
Se imprime por la consola del consumidor.
173a8c4f-ac86-429a-93d9-869bbd82d325, Jorge