AnswerBun.com

Is this Java small pubsub memory implementation correct and effective?

We want to provide a simple Redis pub/sub alternative in memory for our software. So we implemented this :

public class MemoryDataBus implements DataBus {
    private final Map<Consumer<Message>, MessageListener> subscribers = new ConcurrentHashMap<>();

    @Override
    public void publish(final Channel channel, final Message message) {
        Message nonNullMessage = requireNonNull(message, "cannot publish a null message");
        subscribers.values().stream().filter(l -> l.hasSubscribedTo(channel)).forEach(l -> l.accept(nonNullMessage));
    }

    @Override
    public void subscribe(final Consumer<Message> subscriber, final Channel... channels) throws InterruptedException {
        MessageListener listener = new MessageListener(subscriber, channels);
        subscribers.put(subscriber, listener);
        listener.loopUntilShutdown();
    }

    @Override
    public void unsubscribe(Consumer<Message> subscriber) {
        ofNullable(subscribers.remove(subscriber)).ifPresent(l -> {
            l.accept(new ShutdownMessage());
        });
    }

    private static class MessageListener implements Consumer<Message> {
        private final Consumer<Message> subscriber;
        private final LinkedHashSet<Channel> channels;
        final AtomicReference<Message> message = new AtomicReference<>();

        public MessageListener(Consumer<Message> subscriber, Channel... channels) {
            this.subscriber = subscriber;
            this.channels = asSet(channels);
        }

        boolean hasSubscribedTo(Channel channel) {
            return channels.contains(channel);
        }

        @Override
        public void accept(Message message) {
            subscriber.accept(message);
            synchronized (this.message) {
                this.message.set(message);
                this.message.notify();
            }
        }

        boolean shutdownAsked() {
            Message message = this.message.get();
            return message != null && message.type == Message.Type.SHUTDOWN;
        }

        void loopUntilShutdown() throws InterruptedException {
            synchronized (message) {
                while (!shutdownAsked()) {
                    message.wait();
                }
            }
        }
    }
}

I’ve removed unnecessary code that brings some noise (logs, couters) the source code is here.

Our unit tests are green, and manual testing shows no regression compared to redis. But as we also know how difficult it is to make a correct thread safe implementation we want to verify with threading experts :

  • the correctness of it (for example the value returned by shutdownAsked)
  • if the contention could be improved
  • if there could be a better implementation of it

PS : must add that the question has been originally posted on SO.

Code Review Asked by Bruno Thomas on January 2, 2021

1 Answers

One Answer

Assuming requireNonNull(...) is java.util.Objects.requireNonNull(...) which throws an exception if null, then there is no need to have a separate local variable for the return value.

Assuming ofNullable(...) is java.util.Optional.ofNullable(...), then I would recommend removing the usage here because the code can be simplified:

// ofNullable(...).ifPresent(l -> ...);
// Simplified:
MyClass result = ...
if (result != null) {
    ...
}

It looks like your locking in MessageListener will cause a deadlock:

  1. MemoryDataBus.subscribe(...) calls loopUntilShutdown which synchronizes on message
  2. MemoryDataBus.publish(...) calls accept which has to wait because lock on message is still held, preventing it from updating message

Edit: No deadlock occurs because Object.wait() is used which releases ownership of the monitor (here field message).

Answered by Marcono1234 on January 2, 2021

Add your own answers!

Related Questions

Finding duplicates in multiple lists for configuration validation

2  Asked on February 1, 2021 by arkady-levin

     

Microservice in Springboot

1  Asked on February 1, 2021 by forhadmethun

         

10 Kinds of People

1  Asked on January 29, 2021 by martin-york

   

Generating product variants in Ruby

1  Asked on January 29, 2021 by dcangulo

 

Handling Circular References Without Recursion

2  Asked on January 27, 2021 by kittoes0124

   

vector<tuple > to map

1  Asked on January 25, 2021 by valerij

     

naming convention for atomic design

0  Asked on January 23, 2021 by irohitbhatia

   

Cave explorer: using stack

0  Asked on January 21, 2021 by sherloock

   

Largest odd number

5  Asked on January 17, 2021 by neisy-sofa-vadori

   

Calculator Program improvements

2  Asked on January 16, 2021 by the

     

Is this good c++ code for a pin/socket for a node editor?

1  Asked on January 12, 2021 by apoqlite

   

Arduino-based darkroom timer

0  Asked on January 12, 2021 by marcellothearcane

 

Finding the possible number of equal pairs in an array

1  Asked on January 11, 2021 by alaa-jabre

   

Ask a Question

Get help from others!

© 2022 AnswerBun.com. All rights reserved.