Sending Scheduled/Delayed Messages with RabbitMQ Using Spring Boot

Prerequisites

Start RabbitMQ with the delayed message exchange plugin (rabbitmq_delayed_message_exchange) enabled.

Create Project

Create a Spring Boot project with the following dependencies:

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
		    <groupId>com.fasterxml.jackson.datatype</groupId>
		    <artifactId>jackson-datatype-jsr310</artifactId>
		</dependency>

application.properties

# Rabbit
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

rabbitmq.flight.dg.exchange=otel_flight_direct
rabbitmq.flight.received.routingkey=flight.event.received
rabbitmq.flight.received.queue=flight.received.queue

RabbitMQConfig.java

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the AMQP topology used for delayed flight messages: a durable
 * exchange of type {@code x-delayed-message} that routes like a direct
 * exchange, a durable queue, the binding between the two, and a
 * {@link RabbitTemplate} that converts payloads to JSON.
 */
@Configuration
public class RabbitMQConfig {

	/** Exchange type declared for the delayed exchange. */
	private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";
	/** Exchange argument naming the routing behaviour applied to delayed messages. */
	private static final String DELAYED_TYPE_ARGUMENT = "x-delayed-type";
	/** Value of the routing-behaviour argument: route like a direct exchange. */
	private static final String DELAYED_TYPE_DIRECT = "direct";

	@Value("${rabbitmq.flight.dg.exchange}")
	private String exchangeName;

	@Value("${rabbitmq.flight.received.queue}")
	private String queueName;

	@Value("${rabbitmq.flight.received.routingkey}")
	private String routingKey;

	/**
	 * Declares the delayed exchange as durable and not auto-deleted.
	 *
	 * @return the custom exchange carrying the {@code x-delayed-type} argument
	 */
	@Bean
	CustomExchange directExchange() {
		final Map<String, Object> arguments = new HashMap<>();
		arguments.put(DELAYED_TYPE_ARGUMENT, DELAYED_TYPE_DIRECT);

		final boolean durable = true;
		final boolean autoDelete = false;
		return new CustomExchange(exchangeName, DELAYED_EXCHANGE_TYPE, durable, autoDelete, arguments);
	}

	/**
	 * Declares the durable queue that receives flight messages.
	 *
	 * @return the queue, registered under the bean name {@code flightReceived}
	 */
	@Bean(name = "flightReceived")
	Queue flightReceivedQueue() {
		final boolean durable = true;
		return new Queue(queueName, durable);
	}

	/**
	 * Binds the flight queue to the delayed exchange using the configured routing key.
	 *
	 * @param queue    the {@code flightReceived} queue bean
	 * @param exchange the delayed exchange bean
	 * @return the binding, declared without extra arguments
	 */
	@Bean
	Binding flightReceivedBinding(@Qualifier("flightReceived") Queue queue, CustomExchange exchange) {
		return BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
	}

	/**
	 * Builds the template used for publishing, configured with the JSON converter.
	 *
	 * @param connectionFactory the broker connection factory
	 * @param converter         the JSON message converter bean
	 * @return the configured template
	 */
	@Bean
	RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Jackson2JsonMessageConverter converter) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMessageConverter(converter);
		return template;
	}

	/**
	 * Provides the Jackson-based converter with its default settings.
	 *
	 * @return a new JSON message converter
	 */
	@Bean
	Jackson2JsonMessageConverter producerJackson2MessageConverter() {
		return new Jackson2JsonMessageConverter();
	}
}

Flight.java

import java.time.LocalDateTime;

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;

/**
 * Mutable message payload describing a flight: origin, destination, airline
 * and departure time. The departure time is (de)serialized with the JSR-310
 * {@code LocalDateTime} serializer and deserializer named in its annotations.
 */
public class Flight {

	private String origin;
	private String destination;
	private String airline;

	@JsonSerialize(using = LocalDateTimeSerializer.class)
	@JsonDeserialize(using = LocalDateTimeDeserializer.class)
	private LocalDateTime departureTime;

	/** @return the origin, or {@code null} when not set */
	public String getOrigin() {
		return this.origin;
	}

	/** @param origin the origin to store */
	public void setOrigin(String origin) {
		this.origin = origin;
	}

	/** @return the destination, or {@code null} when not set */
	public String getDestination() {
		return this.destination;
	}

	/** @param destination the destination to store */
	public void setDestination(String destination) {
		this.destination = destination;
	}

	/** @return the airline, or {@code null} when not set */
	public String getAirline() {
		return this.airline;
	}

	/** @param airline the airline to store */
	public void setAirline(String airline) {
		this.airline = airline;
	}

	/** @return the departure time, or {@code null} when not set */
	public LocalDateTime getDepartureTime() {
		return this.departureTime;
	}

	/** @param departureTime the departure time to store */
	public void setDepartureTime(LocalDateTime departureTime) {
		this.departureTime = departureTime;
	}

	/**
	 * Renders all four fields in the form
	 * {@code Flight [origin=..., destination=..., airline=..., departureTime=...]}.
	 */
	@Override
	public String toString() {
		final StringBuilder text = new StringBuilder("Flight [");
		text.append("origin=").append(origin);
		text.append(", destination=").append(destination);
		text.append(", airline=").append(airline);
		text.append(", departureTime=").append(departureTime);
		text.append("]");
		return text.toString();
	}
}

FlightReceivedProducer.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * Publishes {@link Flight} messages to the configured exchange with a delay
 * set on the message properties.
 *
 * <p>Sending is best-effort: any failure is logged and not rethrown.
 */
@Component
public class FlightReceivedProducer {

	private static final Logger LOGGER = LoggerFactory.getLogger(FlightReceivedProducer.class);

	/** Delay, in milliseconds, applied when the caller does not specify one. */
	private static final int DEFAULT_DELAY_MILLIS = 1000;

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Value("${rabbitmq.flight.dg.exchange}")
	private String exchangeName;

	@Value("${rabbitmq.flight.received.routingkey}")
	private String routingKey;

	/**
	 * Sends the flight using the default delay of {@value #DEFAULT_DELAY_MILLIS} ms.
	 *
	 * @param flight the payload to convert and publish
	 */
	public void sendMessage(Flight flight) {
		sendMessage(flight, DEFAULT_DELAY_MILLIS);
	}

	/**
	 * Sends the flight with a caller-chosen delay.
	 *
	 * @param flight      the payload to convert and publish
	 * @param delayMillis the delay, in milliseconds, to set on the outgoing message
	 */
	public void sendMessage(Flight flight, int delayMillis) {
		try {
			// The post-processor runs after conversion, so the delay is set on the final message.
			MessagePostProcessor applyDelay = message -> {
				message.getMessageProperties().setDelay(delayMillis);
				return message;
			};
			rabbitTemplate.convertAndSend(exchangeName, routingKey, flight, applyDelay);
			LOGGER.debug("Sent message for flight");
		} catch (Exception e) {
			// Deliberately best-effort: report the failure but do not propagate it to the caller.
			LOGGER.error("Unable to send Message ", e);
		}
	}
}

FlightMQConsumer.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the flight-received queue; logs every {@link Flight} it is
 * handed together with the delay reported on the message properties.
 */
@Component
public class FlightMQConsumer {

	private static final Logger LOGGER = LoggerFactory.getLogger(FlightMQConsumer.class);

	/**
	 * Handles one message from the queue named by
	 * {@code rabbitmq.flight.received.queue}.
	 *
	 * <p>The delay is read from the raw {@link Message}. An alternative
	 * signature would bind it as a header argument instead:
	 * {@code @Header(name = "receivedDelay", required = false) Integer delay}.
	 *
	 * @param flight  the converted payload
	 * @param message the raw message, used for its properties
	 */
	@RabbitListener(queues = "#{'${rabbitmq.flight.received.queue}'}")
	public void consumeMessage(Flight flight, Message message) {
		try {
			final Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
			LOGGER.trace("Message received, after delay of {} : {}", receivedDelay, flight);
		} catch (Exception e) {
			LOGGER.error("Unnable to process the Message", e);
		}
	}

}

RabbitDelayMessageApplication.java

import java.time.LocalDateTime;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.mnadeem.app.flight.messaging.Flight;
import com.mnadeem.app.flight.messaging.FlightReceivedProducer;

/**
 * Application entry point. After start-up it publishes a single sample
 * {@link Flight} through the {@link FlightReceivedProducer}.
 */
@SpringBootApplication
public class RabbitDelayMessageApplication implements CommandLineRunner {

	@Autowired
	private FlightReceivedProducer producer;

	/**
	 * Boots the Spring application.
	 *
	 * @param args command-line arguments passed through to Spring
	 */
	public static void main(String[] args) {
		SpringApplication.run(RabbitDelayMessageApplication.class, args);
	}

	/**
	 * Sends one sample flight message.
	 *
	 * @param args command-line arguments (unused)
	 */
	@Override
	public void run(String... args) throws Exception {
		final Flight sample = sampleFlight();
		producer.sendMessage(sample);
	}

	/**
	 * Builds the sample payload: airline "1", destination "d", departing now.
	 * The origin is left unset.
	 */
	private Flight sampleFlight() {
		final Flight sample = new Flight();
		sample.setDestination("d");
		sample.setAirline("1");
		sample.setDepartureTime(LocalDateTime.now());
		return sample;
	}
}

Run the Application

The message is published to the exchange immediately, but it is routed to the queue only after the delay has elapsed.

Exchange Definition

{
            "name": "otel_flight_direct",
            "vhost": "/",
            "type": "x-delayed-message",
            "durable": true,
            "auto_delete": false,
            "internal": false,
            "arguments": {
                "x-delayed-type": "direct"
            }
        }

References

Adding Another Plugin To Rabbit Management Docker Image

You are using the RabbitMQ management image (rabbitmq:3.8.27-management) and you get the following error when enabling rabbitmq_delayed_message_exchange:

{:plugins_not_found, [:rabbitmq_delayed_message_exchange]}

Command to enable rabbitmq_delayed_message_exchange

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Prerequisites

Create folder structure

Dockerfile

# Custom RabbitMQ image: the stock management image plus the delayed message exchange plugin archive.
FROM rabbitmq:3.8.27-management

# Install curl for the plugin download; the apt package lists are removed in the same layer to keep it small.
RUN apt-get -o Acquire::Check-Date=false update && apt-get install -y curl && rm -rf /var/lib/apt/lists/*

# Download the plugin archive into the plugins directory.
# -f makes curl fail on an HTTP error instead of saving the error page as the .ez file; -L follows the release redirect.
RUN curl -fsSL -o $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-3.8.17.ez https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez

RUN chown rabbitmq:rabbitmq $RABBITMQ_HOME/plugins/rabbitmq_delayed_message_exchange-3.8.17.ez

# The plugin is not enabled at build time; this line is kept for reference only.
#RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange

enabled_plugins

[rabbitmq_delayed_message_exchange,rabbitmq_management,rabbitmq_prometheus].

definitions.json

{
   "users": [
    {
      "name": "admin",
      "password": "admin",
      "tags": "administrator"
    }
  ],
  "vhosts": [
    {
      "name": "/"
    }
  ],
  "policies": [
    {
      "vhost": "/",
      "name": "ha",
      "pattern": "",
      "apply-to": "all",
      "definition": {
        "ha-mode": "all",
        "ha-sync-batch-size": 256,
        "ha-sync-mode": "automatic"
      },
      "priority": 0
    }
  ],
  "permissions": [
    {
      "user": "admin",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "queues": [
    {
      "name": "job-import.triggered.queue",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "arguments": {}
    }
  ],
  "exchanges": [
    {
      "name": "lob-proj-dx",
      "vhost": "/",
      "type": "direct",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "lob-proj-dx",
      "vhost": "/",
      "destination": "job-import.triggered.queue",
      "destination_type": "queue",
      "routing_key": "job-import.event.triggered",
      "arguments": {}
    }
  ]
}

rabbitmq.conf

auth_mechanisms.1 = PLAIN
auth_mechanisms.2 = AMQPLAIN
loopback_users.guest = false
listeners.tcp.default = 5672
#default_pass = admin
#default_user = admin
hipe_compile = false
#management.listener.port = 15672
#management.listener.ssl = false
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json
#default_pass = admin
#default_user = admin

Build Custom Docker image

docker build -t rabbitmq:3.8.27-md .
docker image ls

Run the custom Docker Image

docker run --restart=always -d -p 5672:5672 -p 15672:15672 --mount type=bind,source=E:\docker\rabbit\data,target=/var/lib/rabbitmq/ --mount type=bind,source=E:\docker\rabbit\etc,target=/etc/rabbitmq/ --name rabbitmq --hostname my-rabbit rabbitmq:3.8.27-md

Validate that the plugin is enabled

 docker exec -it 9b0bffed911e bash

Start Options

Directly Running

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8.27-management

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8.27-md

 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Docker Compose

# Compose definition for a single RabbitMQ service built from the local Dockerfile.
version: "3.8"

services:
  rabbitmq:
    # Build the image from the Dockerfile in the current directory rather than pulling a stock image.
    build: .
    #image: rabbitmq:3.7.28-management
    #container_name: rabbitmq
    volumes:
        # Host directories mounted over the container's configuration, data and log paths.
        - ./etc/:/etc/rabbitmq/
        - ./data/:/var/lib/rabbitmq/
        - ./logs/:/var/log/rabbitmq/
    environment:
        # Each value comes from the invoking environment when set; otherwise the default after ":-" applies.
        RABBITMQ_ERLANG_COOKIE: ${RABBITMQ_ERLANG_COOKIE:-secret_cookie}
        RABBITMQ_DEFAULT_USER: ${RABBITMQ_DEFAULT_USER:-admin}
        RABBITMQ_DEFAULT_PASS: ${RABBITMQ_DEFAULT_PASS:-admin}
    ports:
        # host:container port mappings
        - 5672:5672    #amqp
        - 15672:15672  #http
        - 15692:15692  #prometheus

Either

docker-compose up

Or

docker swarm init
docker stack deploy --compose-file docker-compose.yaml rabbit-delay
docker stack services rabbit-delay
docker stack rm rabbit-delay

References