Is this Java small pubsub memory implementation correct and effective?

We want to provide a simple Redis pub/sub alternative in memory for our software. So we implemented this :

public class MemoryDataBus implements DataBus {
    private final Map<Consumer<Message>, MessageListener> subscribers = new ConcurrentHashMap<>();

    public void publish(final Channel channel, final Message message) {
        Message nonNullMessage = requireNonNull(message, "cannot publish a null message");
        subscribers.values().stream().filter(l -> l.hasSubscribedTo(channel)).forEach(l -> l.accept(nonNullMessage));

    public void subscribe(final Consumer<Message> subscriber, final Channel... channels) throws InterruptedException {
        MessageListener listener = new MessageListener(subscriber, channels);
        subscribers.put(subscriber, listener);

    public void unsubscribe(Consumer<Message> subscriber) {
        ofNullable(subscribers.remove(subscriber)).ifPresent(l -> {
            l.accept(new ShutdownMessage());

    private static class MessageListener implements Consumer<Message> {
        private final Consumer<Message> subscriber;
        private final LinkedHashSet<Channel> channels;
        final AtomicReference<Message> message = new AtomicReference<>();

        public MessageListener(Consumer<Message> subscriber, Channel... channels) {
            this.subscriber = subscriber;
            this.channels = asSet(channels);

        boolean hasSubscribedTo(Channel channel) {
            return channels.contains(channel);

        public void accept(Message message) {
            synchronized (this.message) {

        boolean shutdownAsked() {
            Message message = this.message.get();
            return message != null && message.type == Message.Type.SHUTDOWN;

        void loopUntilShutdown() throws InterruptedException {
            synchronized (message) {
                while (!shutdownAsked()) {

I’ve removed unnecessary code that brings some noise (logs, couters) the source code is here.

Our unit tests are green, and manual testing shows no regression compared to redis. But as we also know how difficult it is to make a correct thread safe implementation we want to verify with threading experts :

  • the correctness of it (for example the value returned by shutdownAsked)
  • if the contention could be improved
  • if there could be a better implementation of it

PS : must add that the question has been originally posted on SO.

Code Review Asked by Bruno Thomas on January 2, 2021

1 Answers

One Answer

Assuming requireNonNull(...) is java.util.Objects.requireNonNull(...) which throws an exception if null, then there is no need to have a separate local variable for the return value.

Assuming ofNullable(...) is java.util.Optional.ofNullable(...), then I would recommend removing the usage here because the code can be simplified:

// ofNullable(...).ifPresent(l -> ...);
// Simplified:
MyClass result = ...
if (result != null) {

It looks like your locking in MessageListener will cause a deadlock:

  1. MemoryDataBus.subscribe(...) calls loopUntilShutdown which synchronizes on message
  2. MemoryDataBus.publish(...) calls accept which has to wait because lock on message is still held, preventing it from updating message

Edit: No deadlock occurs because Object.wait() is used which releases ownership of the monitor (here field message).

Answered by Marcono1234 on January 2, 2021

Add your own answers!

Related Questions

Finding duplicates in multiple lists for configuration validation

2  Asked on February 1, 2021 by arkady-levin


Microservice in Springboot

1  Asked on February 1, 2021 by forhadmethun


10 Kinds of People

1  Asked on January 29, 2021 by martin-york


Generating product variants in Ruby

1  Asked on January 29, 2021 by dcangulo


Handling Circular References Without Recursion

2  Asked on January 27, 2021 by kittoes0124


vector<tuple > to map

1  Asked on January 25, 2021 by valerij


naming convention for atomic design

0  Asked on January 23, 2021 by irohitbhatia


Cave explorer: using stack

0  Asked on January 21, 2021 by sherloock


Largest odd number

5  Asked on January 17, 2021 by neisy-sofa-vadori


Calculator Program improvements

2  Asked on January 16, 2021 by the


Is this good c++ code for a pin/socket for a node editor?

1  Asked on January 12, 2021 by apoqlite


Arduino-based darkroom timer

0  Asked on January 12, 2021 by marcellothearcane


Finding the possible number of equal pairs in an array

1  Asked on January 11, 2021 by alaa-jabre


Ask a Question

Get help from others!

© 2022 All rights reserved.