TransWikia.com

Is this Java small pubsub memory implementation correct and effective?

Code Review Asked by Bruno Thomas on January 2, 2021

We want to provide a simple Redis pub/sub alternative in memory for our software. So we implemented this :

public class MemoryDataBus implements DataBus {
    private final Map<Consumer<Message>, MessageListener> subscribers = new ConcurrentHashMap<>();

    @Override
    public void publish(final Channel channel, final Message message) {
        Message nonNullMessage = requireNonNull(message, "cannot publish a null message");
        subscribers.values().stream().filter(l -> l.hasSubscribedTo(channel)).forEach(l -> l.accept(nonNullMessage));
    }

    @Override
    public void subscribe(final Consumer<Message> subscriber, final Channel... channels) throws InterruptedException {
        MessageListener listener = new MessageListener(subscriber, channels);
        subscribers.put(subscriber, listener);
        listener.loopUntilShutdown();
    }

    @Override
    public void unsubscribe(Consumer<Message> subscriber) {
        ofNullable(subscribers.remove(subscriber)).ifPresent(l -> {
            l.accept(new ShutdownMessage());
        });
    }

    private static class MessageListener implements Consumer<Message> {
        private final Consumer<Message> subscriber;
        private final LinkedHashSet<Channel> channels;
        final AtomicReference<Message> message = new AtomicReference<>();

        public MessageListener(Consumer<Message> subscriber, Channel... channels) {
            this.subscriber = subscriber;
            this.channels = asSet(channels);
        }

        boolean hasSubscribedTo(Channel channel) {
            return channels.contains(channel);
        }

        @Override
        public void accept(Message message) {
            subscriber.accept(message);
            synchronized (this.message) {
                this.message.set(message);
                this.message.notify();
            }
        }

        boolean shutdownAsked() {
            Message message = this.message.get();
            return message != null && message.type == Message.Type.SHUTDOWN;
        }

        void loopUntilShutdown() throws InterruptedException {
            synchronized (message) {
                while (!shutdownAsked()) {
                    message.wait();
                }
            }
        }
    }
}

I’ve removed unnecessary code that brings some noise (logs, couters) the source code is here.

Our unit tests are green, and manual testing shows no regression compared to redis. But as we also know how difficult it is to make a correct thread safe implementation we want to verify with threading experts :

  • the correctness of it (for example the value returned by shutdownAsked)
  • if the contention could be improved
  • if there could be a better implementation of it

PS : must add that the question has been originally posted on SO.

One Answer

Assuming requireNonNull(...) is java.util.Objects.requireNonNull(...) which throws an exception if null, then there is no need to have a separate local variable for the return value.

Assuming ofNullable(...) is java.util.Optional.ofNullable(...), then I would recommend removing the usage here because the code can be simplified:

// ofNullable(...).ifPresent(l -> ...);
// Simplified:
MyClass result = ...
if (result != null) {
    ...
}

It looks like your locking in MessageListener will cause a deadlock:

  1. MemoryDataBus.subscribe(...) calls loopUntilShutdown which synchronizes on message
  2. MemoryDataBus.publish(...) calls accept which has to wait because lock on message is still held, preventing it from updating message

Edit: No deadlock occurs because Object.wait() is used which releases ownership of the monitor (here field message).

Answered by Marcono1234 on January 2, 2021

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP