1. Introducción


Bienvenidos a un nuevo post de nuestro querido blog!. En esta ocasión veremos cómo producir y consumir mensajes en RabbitMQ a través de Spring Cloud Stream. Spring Cloud Stream es un api que provee Spring para la consumisión y producción de mensajes abstrayendo el sistema de mensajes sobre el que se va a operar. Esto nos permitirá poder cambiar el sistema de mensajería teniendo un impacto mínimo en el resto de la aplicación. Podemos decir que Spring Cloud Stream permite realizar la inyección de dependencias de los sistemas de mensajería sobre nuestro sistema.

Os podéis descargar el código de ejemplo de mi GitHub aquí.

Tecnologías empleadas:

  • Java 8
  • Gradle 3.1
  • SpringBoot 1.5.2.RELEASE
  • Spring 4.3.7.RELEASE
  • SpringCloudStream 1.1.2 RELEASE
  • RabbitMQ 3.6.9
  • Erlang 19.3

2. Arrancando RabbitMQ con Docker

Como ya hiciéramos en el anterior post sobre RabbitMQ procedemos a levantar RabbitMQ a través de Docker. Especificamos en nuestro fichero docker-compose.yml que queremos arrancar un contenedor de la imagen rabbitmq donde se configure el virtualhost / para el usuario admin con la password admin.

rabbit1:
  image: "rabbitmq"
  environment:
    RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    RABBITMQ_DEFAULT_USER: "admin"
    RABBITMQ_DEFAULT_PASS: "admin"
    RABBITMQ_DEFAULT_VHOST: "/"
  ports:
    - "15672:15672"
    - "5672:5672"
  volumes:
    - "./enabled_plugins:/etc/rabbitmq/enabled_plugins"

Además montamos en el directorio /etc/rabbitmq/ el fichero enabled_plugins donde establecemos que queremos configurar los plugins de gestión web.

[rabbitmq_management, rabbitmq_management_visualiser].

Arrancamos nuestro contenedor

docker-compose up

Vamos a la url http://localhost:15672

3. Productor

Para empezar a utilizar Spring Cloud Stream sobre RabbitMQ debemos añadir la dependencia org.springframework.cloud:spring-cloud-starter-stream-rabbit

group 'com.jorgehernandezramirez.spring.springcloud.stream'
version '1.0-SNAPSHOT'

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'maven'
apply plugin: 'spring-boot'

dependencyManagement {
    imports {
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.SR6'
    }
}

sourceCompatibility = 1.8

springBoot {
    mainClass = "com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.Application"
}

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-web'
    compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
}

Main que arranca SpringBoot

package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public Application(){
        //For Spring
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Lo más importante que deberemos considerar es que la aplicación no va a comunicarse directamente con el sistema de mensajería. La hará a través de los canales de SpringCloudStream. En dichos canales deberemos configurar

  • Binder: Indicamos sobre que sistema de mensajería queremos operar
  • Destination: En el caso de RabbitMQ nos referimos al exchange sobre el que vamos a trabajar
  • Group: En el caso de RabbitMQ nos referimos a la cola sobre la que vamos a trabajar
  • ContextType: Tipo de contenido que se va a escribir y leer de las colas

Por tanto en nuestro fichero application.yml debemos configurar:

  • La conexión a RabbitMQ
  • El canal de SpringCloudStream

La clave para con

spring:
  cloud:
    stream:
      bindings:
        itemsChannel:
          binder: rabbit
          destination: item.exchange
          group: myqueue
          contentType: application/json
  rabbitmq:
    host: localhost
    username: admin
    password: admin
    virtual-host: /
    port: 5672

Para configurar el canal itemsChannel en nuestro contexto de Spring deberemos establecer las siguientes clases:

  • ItemSource: Clase que permite operar sobre el canal itemsChannel. Utilizamos la anotación @Output
    para indicar que vamos a escribir sobre el canal
  • IntegrationConfiguration: Permite habilitar la clase ItemSource en el contexto de Spring
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;

@Configuration
@EnableBinding(ItemSource.class)
@IntegrationComponentScan
public class IntegrationConfiguration {
}
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface ItemSource {

    String CHANNEL_NAME = "itemsChannel";

    @Output
    MessageChannel itemsChannel();
}

Finalmente la clase que permitirá escribir en nuestro canal

package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration;

import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.dto.ItemDto;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface ItemGateway {

    @Gateway(requestChannel = ItemSource.CHANNEL_NAME)
    void generate(ItemDto itemDto);
}
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.dto;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ItemDto {

    private String id;

    private String name;

    public ItemDto(){
        //Para Jackson
    }

    @JsonCreator
    public ItemDto(@JsonProperty("id") String id, @JsonProperty("name") String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "ItemDto{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                '}';
    }
}

Creamos un controlador que hará uso de la clase ItemGateway para escribir mensajes en nuestro canal.

package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.controller;

import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.configuration.ItemGateway;
import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.producer.dto.ItemDto;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
public class ItemController {

    @Autowired
    private ItemGateway itemGateway;

    @RequestMapping("/generateItem")
    @ResponseBody
    public ItemDto generateWork(@RequestParam("name") String name) {
        final ItemDto itemDto = new ItemDto(UUID.randomUUID().toString(), name);
        itemGateway.generate(itemDto);
        return itemDto;
    }
}

4. Consumidor

Añadimos añadir la dependencia org.springframework.cloud:spring-cloud-starter-stream-rabbit

group 'com.jorgehernandezramirez.spring.springcloud.stream'
version '1.0-SNAPSHOT'

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.2.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'maven'
apply plugin: 'spring-boot'

dependencyManagement {
    imports {
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.SR6'
    }
}

sourceCompatibility = 1.8

springBoot {
    mainClass = "com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.Application"
}

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.springframework.boot:spring-boot-starter-web'
    compile 'org.springframework.cloud:spring-cloud-starter-stream-rabbit'
}

Main que arranca SpringBoot

package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public Application(){
        //For Spring
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

Configuramos la conexión a RabbitMQ y el canal itemsChannel de SpringCloudStream

server:
   port: 8082

spring:
  cloud:
    stream:
      bindings:
        itemsChannel:
          binder: rabbit
          destination: item.exchange
          group: myqueue
          contentType: application/json
  rabbitmq:
    host: localhost
    username: admin
    password: admin
    virtual-host: /
    port: 5672

Como en el caso del productor deberemos configurar las siguientes clases en el contexto de Spring para poder leer del canal itemsChannel. En este caso utilizamos la anotación @Input para indicar que vamos a leer del canal.

package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.configuration;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ItemSink {

    String CHANNEL_NAME = "itemsChannel";

    @Input
    SubscribableChannel itemsChannel();
}
package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.configuration;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBinding(ItemSink.class)
public class IntegrationConfiguration {
}

Por último utilizamos la anotación @StreamListener para leer mensajes de nuestro canal.

package com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.configuration;

import com.jorgehernandezramirez.spring.springcloud.stream.rabbitmq.consumer.dto.ItemDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class ItemHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(ItemHandler.class);

    @StreamListener(ItemSink.CHANNEL_NAME)
    public void process(ItemDto itemDto) {
        LOGGER.info("{}, {}", itemDto.getId(), itemDto.getName());
    }
}

5. Probando la aplicación

A continuación compilamos nuestros fuentes y arrancamos los microservicios productor y consumidor

Compilamos cada uno de los microservicios

gradle clean build

Arrancamos cada uno de los microservicios

java -jar [JAR_EN_BUILD_LIBS]

Atacamos al controlador que inserta un mensaje en la cola

curl http://localhost:8080/generateItem?name=Jorge

Se imprime por la consola del consumidor.

173a8c4f-ac86-429a-93d9-869bbd82d325, Jorge