
C++17 thread pool

Code Review Asked by osuka_ on February 6, 2021

I’ve implemented a thread pool in C++17. This is not backwards compatible with C++14, due to the usage of std::invoke_result, which is new to C++17.
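As a quick reference for that C++17 dependency: std::invoke_result_t<F, Args...> names the type that std::invoke(f, args...) returns. It replaces std::result_of, which C++17 deprecates and C++20 removes, and because it follows std::invoke's rules it also covers pointers to member functions. The sketch below is illustrative only. counter and call are hypothetical names, and multiply mirrors the function in driver.cpp:

```cpp
#include <functional>   // std::invoke
#include <type_traits>  // std::invoke_result_t, std::is_same_v
#include <utility>      // std::forward

int multiply(int x, int y) { return x * y; }

// Hypothetical type, used only to show a pointer-to-member-function call.
struct counter {
    long value;
    long get() const { return value; }
};

// Free function: the result is what multiply(int, int) returns.
static_assert(std::is_same_v<std::invoke_result_t<decltype(&multiply), int, int>, int>);

// Member function: the object is passed first, exactly as with std::invoke.
static_assert(std::is_same_v<
    std::invoke_result_t<decltype(&counter::get), const counter&>, long>);

// Hypothetical helper: forwards a callable and its arguments to std::invoke
// and returns the result with the type computed by invoke_result_t.
template <typename F, typename... Args>
std::invoke_result_t<F, Args...> call(F&& f, Args&&... args) {
    return std::invoke(std::forward<F>(f), std::forward<Args>(args)...);
}
```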

The focus of this question is on best practices. In particular, I really want to know if any of this Looks Funny™ (i.e., any weird moves or things that generally look like they shouldn’t be there).

You can find the current implementation of this (with post-review fixes and whatnot) here.

The implementation is divided between two files:

threadpool.h

#pragma once

#include <vector>
#include <thread>
#include <future> //packaged_task
#include <queue>
#include <functional> //bind
#include <mutex>
#include <condition_variable>
#include <type_traits> //invoke_result
#include <memory> //unique_ptr
#include <utility> //move, forward

class thread_pool {
public:
    thread_pool(size_t thread_count);
    ~thread_pool();
    
    //since std::thread objects are not copiable, it doesn't make sense for a thread_pool
    //  to be copiable.
    thread_pool(const thread_pool &) = delete;
    thread_pool &operator=(const thread_pool &) = delete;
    
    //F must be Callable, and invoking F with ...Args must be well-formed.
    template <typename F, typename ...Args>
    auto execute(F, Args&&...);
    
private:
    //_task_container_base and _task_container exist simply as a wrapper around a 
    //  MoveConstructible - but not CopyConstructible - Callable object. Since an
    //  std::function requires a given Callable to be CopyConstructible, we cannot
    //  construct one from a lambda function that captures a non-CopyConstructible
    //  object (such as the packaged_task declared in execute) - because a lambda
    //  capturing a non-CopyConstructible object is not CopyConstructible.
    
    //_task_container_base exists only to serve as an abstract base for _task_container.
    class _task_container_base {
    public:
        virtual ~_task_container_base() {};
        
        virtual void operator()() = 0;
    };
        
    //_task_container takes a typename F, which must be Callable and MoveConstructible.
    //  Furthermore, F must be callable with no arguments; it can, for example, be a
    //  bind object with no placeholders.
    //  F may or may not be CopyConstructible.
    template <typename F>
    class _task_container : public _task_container_base {
    public:
        //here, std::forward is needed because we need the construction of _f *not* to
        //  bind an lvalue reference - it is not a guarantee that an object of type F is
        //  CopyConstructible, only that it is MoveConstructible.
        _task_container(F &&func) : _f(std::forward<F>(func)) {}
        
        void operator()() override {
            _f();
        }

    private:
        F _f;
    };
    
    //returns a unique_ptr to a _task_container that wraps around a given function
    //  for details on _task_container_base and _task_container, see above
    //  This exists so that _Func may be inferred from f.
    template <typename _Func>
    static std::unique_ptr<_task_container_base> allocate_task_container(_Func &&f) {
        //in the construction of the _task_container, f must be std::forward'ed because
        //  it may not be CopyConstructible - the only requirement for an instantiation
        //  of a _task_container is that the parameter is of a MoveConstructible type.
        return std::unique_ptr<_task_container_base>(
            new _task_container<_Func>(std::forward<_Func>(f))
        );
    }
    
    std::vector<std::thread> _threads;
    std::queue<std::unique_ptr<_task_container_base>> _tasks;
    std::mutex _task_mutex;
    std::condition_variable _task_cv;
    bool _stop_threads = false;
};

template <typename F, typename ...Args>
auto thread_pool::execute(F function, Args &&...args) {
    std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);
    std::packaged_task<std::invoke_result_t<F, Args...>()> task_pkg(
        std::bind(function, args...)
    );
    std::future<std::invoke_result_t<F, Args...>> future = task_pkg.get_future();

    queue_lock.lock();
    //this lambda move-captures the packaged_task declared above. Since the packaged_task
    //  type is not CopyConstructible, the function is not CopyConstructible either -
    //  hence the need for a _task_container to wrap around it.
    _tasks.emplace(
        allocate_task_container([task(std::move(task_pkg))]() mutable { task(); })
    );
    queue_lock.unlock();

    _task_cv.notify_one();

    return std::move(future);
}
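The comment in threadpool.h above _task_container_base says that std::function cannot hold a lambda that captures a std::packaged_task. The sketch below checks the relevant property at compile time and shows the type-erasure workaround in isolation. task_base, task_impl, make_erased_task and run_erased_packaged_task are hypothetical stand-ins for _task_container_base, _task_container and allocate_task_container, not part of the original code:

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins for _task_container_base / _task_container.
struct task_base {
    virtual ~task_base() = default;
    virtual void operator()() = 0;
};

template <typename F>
struct task_impl final : task_base {
    explicit task_impl(F f) : f_(std::move(f)) {}
    void operator()() override { f_(); }
    F f_;
};

// Stand-in for allocate_task_container: F is deduced from the argument.
template <typename F>
std::unique_ptr<task_base> make_erased_task(F f) {
    return std::make_unique<task_impl<F>>(std::move(f));
}

// Wraps a packaged_task in a lambda, erases the lambda's type, runs it once,
// and returns the value delivered through the future.
inline int run_erased_packaged_task() {
    std::packaged_task<int()> pkg([] { return 6 * 7; });
    std::future<int> fut = pkg.get_future();

    auto job = [task = std::move(pkg)]() mutable { task(); };
    // The closure owns a packaged_task, so it is movable but not copyable,
    // while std::function<void()> requires a copyable target.
    static_assert(!std::is_copy_constructible_v<decltype(job)>);
    static_assert(std::is_move_constructible_v<decltype(job)>);

    std::unique_ptr<task_base> erased = make_erased_task(std::move(job));
    (*erased)();
    // Running the erased task fulfilled the promise behind the future.
    assert(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready);
    return fut.get();
}
```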

threadpool.cpp

#include "threadpool.h"

thread_pool::thread_pool(size_t thread_count) {
    for (size_t i = 0; i < thread_count; ++i) {
        
        //start waiting threads. Workers listen for changes through
        //  the thread_pool member condition_variable
        _threads.emplace_back(
            std::thread(
                [&]() {
                    std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);

                    while (true) {
                        queue_lock.lock();
                        _task_cv.wait(
                            queue_lock, 
                            [&]() -> bool { return !_tasks.empty() || _stop_threads; }
                        );

                        //used by dtor to stop all threads without having to
                        //  unceremoniously stop tasks. The tasks must all be finished,
                        //  lest we break a promise and risk a future object throwing
                        //  an exception.
                        if (_stop_threads && _tasks.empty()) return;

                        //to initialize temp_task, we must move the unique_ptr from the
                        //  queue to the local stack. Since a unique_ptr cannot be copied
                        //  (obviously), it must be explicitly moved. This transfers
                        //  ownership of the pointed-to object to temp_task, as specified in
                        //  20.11.1.2.1 [unique.ptr.single.ctor].
                        auto temp_task = std::move(_tasks.front());
                        
                        _tasks.pop();
                        queue_lock.unlock();

                        (*temp_task)();
                    }
                }
            )
        );
    }
}

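thread_pool::~thread_pool() {
    //_stop_threads is read by the workers while they hold _task_mutex, so it must be
    //  written under the same mutex: an unlocked write is a data race, and the
    //  notification could be lost between a worker's predicate check and its wait.
    {
        std::lock_guard<std::mutex> lock(_task_mutex);
        _stop_threads = true;
    }
    _task_cv.notify_all();

    for (std::thread &thread : _threads) {
        thread.join();
    }
}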
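The worker loop and destructor above implement a wait / drain / stop protocol: workers sleep on the condition variable until there is work or a stop request, and exit only once the queue is empty. The following sketch isolates that protocol in a self-contained class. mini_pool and count_tasks_run are hypothetical names, std::function<void()> stands in for _task_container to keep it short, and the stop flag is written while holding the mutex, because the workers read it under that mutex:

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool with the same wait / drain / stop protocol.
class mini_pool {
public:
    explicit mini_pool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] { work(); });
        }
    }

    ~mini_pool() {
        {
            // Written under the mutex the workers read it under; otherwise the
            // flag and notify_all() could land between a worker's predicate
            // check and its wait, and that worker would sleep forever.
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& t : workers_) t.join();
    }

    void post(std::function<void()> f) {
        {
            std::lock_guard<std::mutex> lock(m_);
            assert(!stop_ && "post() called during shutdown");
            tasks_.push(std::move(f));
        }
        cv_.notify_one();
    }

private:
    void work() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;  // drain the queue before exiting
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run outside the lock so other workers can dequeue
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex m_;
    std::condition_variable cv_;
    bool stop_ = false;
};

// Posts n increments, destroys the pool, and returns how many ran.
inline int count_tasks_run(int n) {
    std::atomic<int> done{0};
    {
        mini_pool pool(4);
        for (int i = 0; i < n; ++i) pool.post([&done] { ++done; });
    }  // ~mini_pool runs every queued task, then joins the workers
    return done.load();
}
```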

driver.cpp (a simple file to demonstrate usage; not tested, and does not need to be reviewed)

#include <iostream>
#include <vector>
#include "threadpool.h"

int multiply(int x, int y) {
    return x * y;
}

int main() {
    thread_pool pool(4);
    std::vector<std::future<int>> futures;
    
    for (const int &x : { 2, 4, 7, 13 }) {
        futures.push_back(pool.execute(multiply, x, 2));
    }
    
    for (auto &fut : futures) {
        std::cout << fut.get() << std::endl;
    }
    
    return 0;
}

2 Answers

  1. //since std::thread objects are not copiable, it doesn't make sense for a thread_pool
    //  to be copiable.
    

    True. The implicitly declared copy constructor would already be defined as deleted, because members such as std::mutex are not copyable, so you don't need to disable it manually. The same goes for the copy assignment operator. std::mutex and std::condition_variable are worse still: they cannot even be moved. By holding them (and the rest of the shared state) through a std::unique_ptr you could make thread_pool movable, which might be a reasonable trade-off in favor of usability; the worker threads would then have to capture that heap-allocated state instead of the this pointer.

  2. I am required to specify the number of threads in the thread pool. It would be nice if it defaulted to std::thread::hardware_concurrency() instead.

  3. There is a lack of forwarding. I want

    thread_pool{1}.execute([up = std::make_unique<int>(1)] (std::unique_ptr<int>) {},
        std::make_unique<int>(42));
    

    to compile, but it doesn't, because your std::bind(function, args...) makes a copy of the arguments and the callable. Simply doing

    std::bind(std::forward<F>(function), std::forward<Args>(args)...)
    

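    does not compile either (a bind object passes its stored arguments to the callable as lvalues, so the std::unique_ptr<int> parameter cannot be initialized from them), and I don't like std::bind in general, so here is a lambda instead: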

    [f = std::move(function), largs = std::make_tuple(std::forward<Args>(args)...)] () mutable {
        return std::apply(std::move(f), std::move(largs));
    }
    

    C++20 supports this directly through pack expansion in init-captures, spelled [...largs = std::forward<Args>(args)], but C++17 doesn't.

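  4. [task(std::move(task_pkg))]() mutable { task(); } can be replaced by std::move(task_pkg): a std::packaged_task<R()> is itself a move-only callable that takes no arguments, which is all _task_container requires.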

  5. // This exists so that _Func may be inferred from f. In C++17 you should not need a helper function for that anymore; that's what class template argument deduction and deduction guides are for. In theory you add

    template <typename F>
    _task_container(F) -> _task_container<std::decay_t<F>>;
    

    and can then replace allocate_task_container with _task_container. In practice ... things are broken.
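Suggestions 2 to 4 can be combined into a small, self-contained sketch. It assumes hypothetical helpers make_forwarding_task and default_thread_count (neither is part of the original code). The tuple type is spelled out as std::tuple<std::decay_t<Args>...> rather than built with std::make_tuple, so that std::invoke_result_t sees exactly the types that are stored. Treat it as an illustration, not a drop-in replacement for execute:

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>

// Hypothetical helper: packages f(args...) into a packaged_task<R()> without
// copying f or any argument (the lambda-plus-std::apply approach).
template <typename F, typename... Args>
auto make_forwarding_task(F&& f, Args&&... args) {
    using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
    return std::packaged_task<R()>(
        [fn = std::forward<F>(f),
         largs = std::tuple<std::decay_t<Args>...>(std::forward<Args>(args)...)]()
            mutable -> R {
            // The task runs once, so the stored callable and arguments are moved out.
            return std::apply(std::move(fn), std::move(largs));
        });
}

// Hypothetical default for the pool size. hardware_concurrency() may return 0
// when the value cannot be determined, so fall back to one thread.
inline unsigned default_thread_count() {
    unsigned n = std::thread::hardware_concurrency();
    return n != 0 ? n : 1;
}

// Runs a move-only callable with a move-only argument on another thread.
inline int run_move_only_example() {
    auto task = make_forwarding_task(
        [base = std::make_unique<int>(40)](std::unique_ptr<int> extra) {
            return *base + *extra;
        },
        std::make_unique<int>(2));
    assert(task.valid());  // the task owns a shared state
    std::future<int> fut = task.get_future();
    // A packaged_task<R()> is itself a move-only, no-argument callable,
    // so it can be handed over directly without wrapping it in a lambda.
    std::thread worker(std::move(task));
    worker.join();
    return fut.get();
}
```

In thread_pool itself, the default would go on the constructor, for example explicit thread_pool(std::size_t thread_count = default_thread_count());, and execute could pass the packaged_task returned by make_forwarding_task straight to allocate_task_container after calling get_future().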

Answered by nwp on February 6, 2021

Your code looks extremely nice and well-structured to me. It exhibits modern C++ coding idioms. You also include references to the standard in your code. All of these are greatly appreciated.

Here are some suggestions:

  1. I like sorting #includes in alphabetical order, like this:

    #include <condition_variable>
    #include <functional> //bind
    #include <future> //packaged_task
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <type_traits> //invoke_result
    #include <vector>
    
  2. You do not put your class in a namespace. I would suggest doing so.

  3. The constructor of std::thread takes the Callable object by forwarding reference (F&&). Why not do the same in execute instead of taking F by value?

  4. Instead of saying

    //F must be Callable, and invoking F with ...Args must be well-formed.
    

    in a comment, why not express your intent with code?

    template <typename F, typename... Args,
        std::enable_if_t<std::is_invocable_v<F&&, Args&&...>, int> = 0>
    auto execute(F&&, Args&&...);
    
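  5. You prefix all of your private types and data members with an underscore. This is largely a matter of style, and it is not necessary, since private members can't cause name clashes anyway. Note, however, that a name such as _Func (an underscore followed by an uppercase letter) is reserved for the implementation in every scope, so that one should be renamed regardless.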

  6. std::unique_ptr<_task_container_base> is repeated several times. Consider introducing a name for it. Furthermore, your allocate_task_container function repeats the return type. Instead of

    return std::unique_ptr<_task_container_base>(
        new _task_container<_Func>(std::forward<_Func>(f))
    );
    

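    You can just use std::make_unique, whose result converts implicitly to the return type (a bare return new ...; would not compile, because the std::unique_ptr constructor taking a raw pointer is explicit):

    return std::make_unique<_task_container<_Func>>(std::forward<_Func>(f));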
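Suggestions 4 and 6 can be tried out together in a reduced form. The sketch below uses hypothetical names (task_base, task_impl, task_ptr, make_task, accepts_task, run_two_tasks) rather than the original ones, and constrains make_task on std::is_invocable_v<std::decay_t<F>&> rather than F&&, because the stored copy is later invoked as an lvalue. accepts_task shows the practical effect of the constraint: a non-callable argument simply makes the call ill-formed at the call site instead of failing inside the implementation:

```cpp
#include <cassert>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

struct task_base {
    virtual ~task_base() = default;
    virtual void operator()() = 0;
};

// Suggestion 6: one name for the repeated pointer type.
using task_ptr = std::unique_ptr<task_base>;

template <typename F>
struct task_impl final : task_base {
    explicit task_impl(F f) : f_(std::move(f)) {}
    void operator()() override { f_(); }
    F f_;
};

// Suggestion 4: the Callable requirement is part of the signature.
template <typename F,
          std::enable_if_t<std::is_invocable_v<std::decay_t<F>&>, int> = 0>
task_ptr make_task(F&& f) {
    // make_unique's result converts implicitly to task_ptr.
    return std::make_unique<task_impl<std::decay_t<F>>>(std::forward<F>(f));
}

// Detects whether make_task accepts an argument of type F.
template <typename F, typename = void>
struct accepts_task : std::false_type {};

template <typename F>
struct accepts_task<F, std::void_t<decltype(make_task(std::declval<F>()))>>
    : std::true_type {};

static_assert(accepts_task<void (*)()>::value);
static_assert(!accepts_task<int>::value);  // not callable: removed by SFINAE

// Builds two tasks, runs them in order, and returns the accumulated value.
inline int run_two_tasks() {
    int total = 0;
    std::vector<task_ptr> tasks;
    tasks.push_back(make_task([&total] { total += 40; }));
    tasks.push_back(make_task([&total] { total += 2; }));
    for (task_ptr& t : tasks) {
        assert(t != nullptr);
        (*t)();
    }
    return total;
}
```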
    

Answered by L. F. on February 6, 2021
