thread-pool/main.cpp

247 lines
7.0 KiB
C++
Raw Normal View History

2016-05-02 14:49:52 +00:00
//
// main.cpp
// thread_pool v0.2a
//
// Created by Kristof Toth on 26/06/15.
// Copyright (c) 2015 Kristof Toth. All rights reserved.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see http://www.gnu.org/licenses.
//
// Generalized thread pool class.
// ( READ BEFORE USING IN PRODUCTION CODE )
// User's notes:
// - the recommended way to add a task to the queue is to use the
// add_task(F&& f, Args&&... args) format, where f is any kind of
// functor and (args...) are the parameters to the functor (any number/kind).
// - copying & moving not allowed (not sure if they'd make sense)
// - if a task is added to the pool via add_task(std::packaged_task<R()>& ),
// the client must guarantee that the packaged_task object is NOT
// destroyed before the task has finished (otherwise the behaviour is undefined).
// This way of adding tasks is only recommended for micro-operations,
// where the overhead of the heap allocation made by
// add_task(F&& f, Args&&... args) is too much (this is rarely the case).
//
/* necessary includes */
#include <iostream>
#include <thread>
#include <future>
#include <mutex>
#include <atomic>
#include <vector>
#include <deque>
#include <condition_variable>
#include <functional>
#include <memory>
/* for the tests */
#include <boost/multiprecision/cpp_int.hpp>
#include <boost/type_index.hpp>
/*
 * Generalized thread pool.
 *
 * Holds a set of worker threads that pop callables from a shared deque.
 * The deque is guarded by `mu`; idle workers block on `cond`.
 * Copying and moving are disabled.
 */
class thread_pool
{
    /* under the hood */
    // mutable so that const observers (get_queue_size) can take the lock
    mutable std::mutex mu;
    std::condition_variable cond;
    std::vector<std::thread> workers;
    std::deque<std::function<void()> > queue;
    std::atomic<bool> fin;

    /* the main loop of the threads */
    void loop();

public:
    /* construction & destruction */
    explicit thread_pool(unsigned int);
    ~thread_pool();

    /* disallowed operations */
    thread_pool(const thread_pool&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;
    thread_pool(thread_pool&&) = delete;
    thread_pool& operator=(thread_pool&&) = delete;
    // temporaries are rejected: the by-reference overloads below keep a
    // reference to the packaged_task, which would dangle for an rvalue
    template <typename R> void add_task(std::packaged_task<R()>&& ) = delete;
    template <typename R> void priority_task(std::packaged_task<R()>&& ) = delete;

    /* adding tasks to the queue */
    // add_task appends to the back of the queue, priority_task to the front
    template <typename F, typename... Args>
    auto add_task(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
    template <typename F, typename... Args>
    auto priority_task(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
    template <typename R> void add_task(std::packaged_task<R()>& );
    template <typename R> void priority_task(std::packaged_task<R()>& );
    // template <typename R> void add_task(std::function<R()> ); << think about this
    void add_task(std::function<void()> );

    /* other operations */
    void add_thread(size_t);

    /* means of getting information */
    // number of worker threads currently owned by the pool
    inline size_t get_thread_num() const { return workers.size(); }
    // number of tasks waiting in the queue; takes the queue mutex because
    // workers and producers modify the deque concurrently
    inline size_t get_queue_size() const
    {
        std::lock_guard<std::mutex> lock(mu);
        return queue.size();
    }
};
/*
 * Creates the pool and starts `thcount` worker threads, each running loop().
 */
thread_pool::thread_pool(unsigned int thcount):
    fin(false)
{
    // counter has the same type as thcount: no signed/unsigned comparison
    // and no signed overflow for very large counts
    for (unsigned int i = 0; i < thcount; ++i)
        workers.emplace_back(&thread_pool::loop, this);
}
/*
 * Signals all workers to stop and joins them.
 * Tasks still waiting in the queue are not executed.
 */
thread_pool::~thread_pool()
{
    {
        // The flag must be changed while holding the mutex: otherwise a worker
        // that has already evaluated its wait condition but has not yet
        // blocked would miss the notification and never wake up, which makes
        // join() below hang forever.
        std::lock_guard<std::mutex> lock(mu);
        fin = true;
    }
    cond.notify_all();
    for (auto& worker : workers)
        worker.join();
}
/*
 * Binds `f` to `args`, wraps the call in a heap-allocated packaged_task and
 * appends it to the back of the queue.
 *
 * Returns a future for the result of f(args...).
 */
template <typename F, typename... Args>
auto thread_pool::add_task(F&& f, Args&&... args)
    -> std::future<decltype(f(args...))>
{
    auto pckgd_tsk = std::make_shared<std::packaged_task<decltype(f(args...))()> >
        (std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    // Fetch the future BEFORE publishing the task: once it is in the queue a
    // worker may invoke it at any moment, and get_future() must not run
    // concurrently with operator() on the same packaged_task.
    auto ret_val = pckgd_tsk->get_future();
    {
        std::lock_guard<std::mutex> lock(mu);
        // the queue entry shares ownership, keeping the task alive until it ran
        queue.emplace_back([pckgd_tsk](){ (*pckgd_tsk)(); });
    }
    cond.notify_one();
    return ret_val;
}
/*
 * Same as the variadic add_task, except that the task is inserted at the
 * FRONT of the queue, so it is picked up before the tasks already waiting.
 *
 * Returns a future for the result of f(args...).
 */
template <typename F, typename... Args>
auto thread_pool::priority_task(F&& f, Args&&... args)
    -> std::future<decltype(f(args...))>
{
    using result_type = decltype(f(args...));
    using task_type   = std::packaged_task<result_type()>;

    // heap-allocated so the queued closure can share ownership of the task
    std::shared_ptr<task_type> job = std::make_shared<task_type>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));

    // the future is taken before the job becomes visible to the workers
    std::future<result_type> result = job->get_future();

    {
        std::lock_guard<std::mutex> guard(mu);
        queue.emplace_front([job](){ (*job)(); });
    }
    cond.notify_one();

    return result;
}
/*
 * Queues a caller-owned packaged_task at the back of the queue.
 *
 * Only a reference to `arg` is stored, so the caller has to keep the
 * packaged_task alive until the task has been executed.
 */
template <typename R>
void thread_pool::add_task(std::packaged_task<R()>& arg)
{
    std::unique_lock<std::mutex> guard(mu);
    queue.emplace_back([&arg](){ arg(); });
    guard.unlock();   // release the queue before waking a worker

    cond.notify_one();
}
/*
 * Queues a caller-owned packaged_task at the front of the queue.
 *
 * Only a reference to `arg` is stored, so the caller has to keep the
 * packaged_task alive until the task has been executed.
 */
template <typename R>
void thread_pool::priority_task(std::packaged_task<R()>& arg)
{
    std::unique_lock<std::mutex> guard(mu);
    queue.emplace_front([&arg](){ arg(); });
    guard.unlock();   // release the queue before waking a worker

    cond.notify_one();
}
/*
 * Appends a plain void() callable to the back of the queue.
 *
 * Throws std::bad_function_call if `func` is empty. Without this check the
 * exception would be raised later inside a worker thread, where nothing
 * catches it and the whole program would be terminated.
 */
void thread_pool::add_task(std::function<void()> func)
{
    if (!func)
        throw std::bad_function_call();
    {
        std::lock_guard<std::mutex> lock(mu);
        queue.push_back(std::move(func));
    }
    cond.notify_one();
}
/*
 * Starts `num` additional worker threads (one if no argument is given).
 *
 * Note: `workers` is modified here without taking `mu`.
 */
void thread_pool::add_thread(size_t num = 1)
{
    // size_t counter: matches the parameter type, so there is no
    // signed/unsigned comparison and no signed overflow for large `num`
    for (size_t i = 0; i < num; ++i)
    {
        workers.emplace_back(&thread_pool::loop, this);
    }
}
void thread_pool::loop()
{
std::function<void()> fun;
while (true)
{
{
std::unique_lock<std::mutex> lock(mu);
while (!fin && queue.empty())
cond.wait(lock);
if (fin)
return;
fun = std::move(queue.front());
queue.pop_front();
}
fun();
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////// functions for testing & test zone ////////////////////
///////////////////////////////////////////////////////////////////////////////////////////////////////////
/*
 * Computes number! as an arbitrary-precision integer.
 *
 * 0! and 1! both yield 1 (the previous implementation returned 0 for 0).
 */
boost::multiprecision::cpp_int mp_factorial(unsigned long long int number)
{
    boost::multiprecision::cpp_int result = 1;
    // count downwards so the loop also terminates for the largest possible input type value
    for (unsigned long long int factor = number; factor > 1; --factor)
    {
        result *= factor;
    }
    return result;
}
/*
 * Test driver: runs two factorial computations on a 4-thread pool, once via
 * the variadic add_task and once via a caller-owned packaged_task, then
 * prints the results, the thread count and the deduced future type.
 */
int main()
{
    using std::cout;
    using std::endl;
    namespace mp = boost::multiprecision;

    thread_pool tp(4);

    auto fu1 = tp.add_task(mp_factorial, 6);

    std::packaged_task<mp::cpp_int()> pt(std::bind(mp_factorial, 5));
    // Take the future BEFORE handing the task to the pool: afterwards a worker
    // may be executing pt concurrently, and get_future() must not race with it.
    auto fu2 = pt.get_future();
    tp.add_task(pt);

    // fu2.get() also guarantees that pt has finished before it goes out of scope
    cout << fu1.get() << " " << fu2.get() << endl;
    cout << tp.get_thread_num() << endl;
    cout << "Type of fu1: " << boost::typeindex::type_id_with_cvr<decltype(fu1)>().pretty_name() << endl;
    return 0;
}