TransWikia.com

How to consume dynamically created rabbit queues?

Stack Overflow Asked by laaf on November 7, 2021

I have an application that creates queues (if they don't already exist) following a naming convention, for example:
"test.{Something}.demandas"

{Something} is passed in when the queue is created, so there end up being several queues, each with a different {Something}.

Now I need to read these queues in the consumer, that is, consume from every queue created by the producer. I saw some examples that use RabbitListenerEndpointRegistry, and others that get the queue names from Jenkins (through VM variables).
Is there any alternative?
This is the RabbitMQ configuration class:

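import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration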
@EnableRabbit
public class RabbitConfig {

    public static final String S_S = "%s.%s";
    public static final String PREFIX = "test.laa.aaa";
    public static final String QUEUE_NAME = "demandas";
    public static final String APPLICATION_NAME = "name:test.laa.aaa";

    private final String exchange;
    private final String routingKey;
    private final Integer maxConsumers;

    public RabbitConfig(

            @Value("${crawler.exchange.name:demandas}")
                    String exchange,
            @Value("${crawler.exchange.routing-key:default}")
                    String routingKey,
            @Value("${crawler.max-consumers:1}")
                    Integer maxConsumers) {

        this.routingKey = routingKey;
        this.maxConsumers = maxConsumers;
        this.exchange = exchange;
    }

    @Bean
    @Primary
    public String routingKey() {
        return routingKey;
    }

    @Bean
    @Primary
    public String prefixName() {
        return PREFIX;
    }

    @Bean
    @Primary
    public String queueName() {
        return QUEUE_NAME;
    }

    private String exchangeName() {
        return String.format(S_S, PREFIX, exchange);
    }

    @Bean
    @Primary
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        AbstractConnectionFactory abstractConnectionFactory = (AbstractConnectionFactory) connectionFactory;
        abstractConnectionFactory.setConnectionNameStrategy(con -> String.format("%s", APPLICATION_NAME));
        final RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.afterPropertiesSet();
        return rabbitAdmin;
    }

    @Bean(name = "queueConsumer")
    @Primary
    public Queue queue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-max-priority", 10);

        return new Queue(String.format(S_S, PREFIX, QUEUE_NAME), true, false, false, map);
    }

    @Bean
    @Primary
    public DirectExchange exchange() {

        return new DirectExchange(exchangeName());
    }

    @Bean
    @Primary
    public Binding binding(Queue queue, DirectExchange exchange) {
        if (Objects.nonNull(routingKey)) {
            return BindingBuilder.bind(queue).to(exchange).with(routingKey);
        }
        // Note: a DirectExchange matches "*" literally; wildcards only apply to topic exchanges.
        return BindingBuilder.bind(queue).to(exchange).with("*");
    }

    @Bean
    @Primary
    public MessageConverter messageConverter(ObjectMapper objectMapper) {

        return new Jackson2JsonMessageConverter(objectMapper);
    }


    @Bean
    @Primary
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) {

        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(maxConsumers);
        factory.setMaxConcurrentConsumers(maxConsumers);
        factory.setPrefetchCount(1); //Default
        factory.setMessageConverter(messageConverter);

        factory.setAfterReceivePostProcessors(message -> {
            message.getMessageProperties().setContentType("application/json");
            message.getMessageProperties().setContentEncoding("UTF-8");
            return message;
        });

        return factory;
    }

    public String getPrefix() {
        return PREFIX;
    }

    public String getQueueName() {
        return QUEUE_NAME;
    }
}

One Answer

Since the queues are created by your producer, I assume that you are publishing messages directly into queues (as described in the RabbitMQ documentation). If you want to keep this approach, then you have no choice but to find a way to communicate the queue names to the consumers.
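
However the names reach the consumer (a shared configuration value, a list obtained from the broker, or something else; the source of the list is an assumption here), the consumer can check them against the naming convention before subscribing. The following is a minimal plain-Java sketch of that check. The class and method names are hypothetical, and the pattern assumes the convention is exactly "test.{Something}.demandas" with a non-empty {Something}.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class QueueNameConvention {

    // "test.", then a non-empty {Something} (which may contain dots), then ".demandas".
    private static final Pattern CONVENTION = Pattern.compile("^test\\.(.+)\\.demandas$");

    // Returns the {Something} part of a queue name, or empty if the name does not follow the convention.
    public static Optional<String> extractSomething(String queueName) {
        Matcher m = CONVENTION.matcher(queueName);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Keeps only the queue names that follow the convention, preserving their order.
    public static List<String> filter(List<String> queueNames) {
        return queueNames.stream()
                .filter(name -> extractSomething(name).isPresent())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("test.a.demandas", "other.queue", "test.laa.aaa.demandas");
        System.out.println(filter(names));
        System.out.println(extractSomething("test.laa.aaa.demandas"));
    }
}
```

The filtered names could then be handed to whatever mechanism the consumer uses to attach listeners at runtime, such as the RabbitListenerEndpointRegistry mentioned in the question.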

However, I recommend taking a look at a different approach based on the publish/subscribe pattern (you can find it in the official documentation too). Producers then push messages to an exchange with a specific routing key (for example: test.{Something}.demandas). For the wildcard binding described next to work, this has to be a topic exchange.

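Consumers are then in charge of creating their own queue and binding it to the exchange (for example, to receive messages matching test.*.demandas, which makes the value of {Something} irrelevant for routing). Note that * matches exactly one dot-separated word; if {Something} can itself contain dots, bind with # instead (test.#.demandas).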
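
To make the binding pattern concrete, here is a small plain-Java sketch of the topic matching rules (* matches exactly one word, # matches zero or more words). It only imitates what the broker does for illustration; the class name is hypothetical and this is not RabbitMQ's implementation.

```java
public class TopicMatchSketch {

    // Returns true if routingKey matches the binding pattern under topic-exchange rules.
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: match only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left.
            return false;
        }
        // '*' accepts any single word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("test.*.demandas", "test.foo.demandas"));
        System.out.println(matches("test.*.demandas", "test.laa.aaa.demandas"));
        System.out.println(matches("test.#.demandas", "test.laa.aaa.demandas"));
    }
}
```

This matters for the question's own configuration: its PREFIX is "test.laa.aaa", so a key such as test.laa.aaa.demandas has two words between "test" and "demandas" and would be matched by test.#.demandas but not by test.*.demandas.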

This way, you don't have to share queue names (though you have to share the exchange name). It also helps to reduce the coupling between producer and consumer.
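
On the consumer side, this could look roughly like the following Spring AMQP sketch. It assumes spring-amqp is on the classpath with listener support enabled (for example via @EnableRabbit, as in the question) and that the listener class is picked up by component scanning. The exchange name "demandas.topic", the queue name "demandas.consumer", and the class names are hypothetical; a new exchange name is used because redeclaring the question's existing direct exchange as a topic exchange would be rejected by the broker.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
public class DemandasConsumerConfig {

    // Hypothetical names, not taken from the question.
    static final String EXCHANGE = "demandas.topic";
    static final String CONSUMER_QUEUE = "demandas.consumer";

    // The exchange name is the only thing producer and consumer must agree on.
    @Bean
    public TopicExchange demandasExchange() {
        return new TopicExchange(EXCHANGE);
    }

    // The consumer owns its queue; the producer never needs to know this name.
    @Bean
    public Queue consumerQueue() {
        return new Queue(CONSUMER_QUEUE, true);
    }

    // Receive every message whose routing key looks like test.{Something}.demandas,
    // where {Something} is a single word. Use "test.#.demandas" if it may contain dots.
    @Bean
    public Binding consumerBinding(Queue consumerQueue, TopicExchange demandasExchange) {
        return BindingBuilder.bind(consumerQueue).to(demandasExchange).with("test.*.demandas");
    }
}

@Component
class DemandasListener {

    @RabbitListener(queues = DemandasConsumerConfig.CONSUMER_QUEUE)
    public void onDemanda(Message message) {
        // The routing key still carries {Something} if the consumer needs it.
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        System.out.println("Received message with routing key " + routingKey);
    }
}
```

The producer would then publish to the exchange rather than to a queue, for instance with RabbitTemplate.convertAndSend(exchangeName, "test." + something + ".demandas", payload), and would no longer declare any queues itself.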

Answered by DavidL on November 7, 2021
