// // main.cpp // thread_pool v0.2a // // Created by Kristof Toth on 26/06/15. // // Copyright © 2015 Kristof Toth // This work is free. You can redistribute it and/or modify it under the // terms of the Do What The Fuck You Want To Public License, Version 2, // as published by Sam Hocevar. See http://www.wtfpl.net/ for more details. // // Generalized thread pool class. // ( READ BEFORE USING IN PRODUCTION CODE ) // User's notes: // - the recommended way to add a task to the queue is to use the // add_task(F&& f, Args&&... args) format, where f is any kind of // functor and (args...) are the parameters to the functor (any number/kind). // - copying & moving not allowed (not sure if they'd make sense) // - if a task is added to the pool via add_task(std::packaged_task& ), // the client must grant, that the packaged_task object is NOT // destructed before the task is finished. (otherwise it's undefined behaviour) // this way of adding tasks is only recommended for micro-operations, // where the overhead of add_task(F&& f, Args&&... args)'s use of operator new // is too much (this is rarely the case). // /* necessary includes */ #include #include #include #include #include #include #include #include #include #include /* for the tests */ #include #include //#include //#include //#include class thread_pool { /* under the hood */ std::mutex mu; std::condition_variable cond; std::vector workers; std::deque > queue; std::atomic fin; /* the main loop of the threads */ void loop(); public: /* construction & destruction */ explicit thread_pool(unsigned int); ~thread_pool(); /* disallowed operations */ thread_pool(const thread_pool&) = delete; thread_pool& operator=(const thread_pool&) = delete; thread_pool(thread_pool&&) = delete; thread_pool& operator=(thread_pool&&) = delete; template void add_task(std::packaged_task&& ) = delete; template void priority_task(std::packaged_task&& ) = delete; /* adding tasks to the queue */ template auto add_task(F&& f, Args&&... args) -> std::future; template auto priority_task(F&& f, Args&&... args) -> std::future; template void add_task(std::packaged_task& ); template void priority_task(std::packaged_task& ); // template void add_task(std::function ); << think about this void add_task(std::function ); /* other operations */ void add_thread(size_t); /* means of getting information */ inline size_t get_thread_num() const { return workers.size(); } inline size_t get_queue_size() const { return queue.size(); } }; thread_pool::thread_pool(unsigned int thcount): fin(false) { for (int i = 0; i < thcount ; ++i) workers.emplace_back(&thread_pool::loop, this); } thread_pool::~thread_pool() { fin = true; cond.notify_all(); for (auto& i : workers) i.join(); } template auto thread_pool::add_task(F&& f, Args&&... args) -> std::future { auto pckgd_tsk = std::make_shared > (std::bind(std::forward(f), std::forward(args)...)); { std::lock_guard lock(mu); queue.emplace_back([pckgd_tsk](){ (*pckgd_tsk)(); }); } cond.notify_one(); return pckgd_tsk->get_future(); } template auto thread_pool::priority_task(F&& f, Args&&... args) -> std::future { auto pckgd_tsk = std::make_shared > (std::bind(std::forward(f), std::forward(args)...)); auto ret_val= pckgd_tsk->get_future(); { std::lock_guard lock(mu); queue.emplace_front([pckgd_tsk](){ (*pckgd_tsk)(); }); } cond.notify_one(); return ret_val; } template void thread_pool::add_task(std::packaged_task& arg) { { std::lock_guard lock(mu); queue.emplace_back([&arg](){ arg(); }); } cond.notify_one(); } template void thread_pool::priority_task(std::packaged_task& arg) { { std::lock_guard lock(mu); queue.emplace_front([&arg](){ arg(); }); } cond.notify_one(); } void thread_pool::add_task(std::function func) { { std::lock_guard lock(mu); queue.push_back(std::move(func)); } cond.notify_one(); } void thread_pool::add_thread(size_t num = 1) { for (int i = 0; i < num; ++i) { workers.emplace_back(&thread_pool::loop, this); } } void thread_pool::loop() { std::function fun; while (true) { { std::unique_lock lock(mu); while (!fin && queue.empty()) cond.wait(lock); if (fin) return; fun = std::move(queue.front()); queue.pop_front(); } fun(); } } /////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////// functions for testing & test zone //////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////////// boost::multiprecision::cpp_int mp_factorial(unsigned long long int number) { boost::multiprecision::cpp_int num = number; for (unsigned long long int i = number; i > 1; --i) { num *=(--number); } return num; } void simpleTask() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } void function_vs_packaged_task_test(thread_pool &tp, const unsigned short TIMES = 10000) { namespace ch = std::chrono; namespace mp = boost::multiprecision; auto fstart = ch::high_resolution_clock::now(); for (int j = 0; j < TIMES; ++j) { tp.add_task(simpleTask); } auto fdur = ch::high_resolution_clock::now() - fstart; auto pt = std::packaged_task(simpleTask); auto pstart = ch::high_resolution_clock::now(); for (int j = 0; j < TIMES; ++j) { tp.add_task(pt); } auto pdur = ch::high_resolution_clock::now() - pstart; std::cout << "Function: " << ch::duration_cast(fdur).count() << std::endl << "Packaged task: " << ch::duration_cast(pdur).count() << std::endl; } void factorial_test(thread_pool& tp, unsigned long long number) { std::packaged_task pt(std::bind(mp_factorial, number-1)); tp.add_task(pt); auto fu1 = tp.add_task(mp_factorial, number); auto fu2 = pt.get_future(); std::cout << "Results for n & n-1: " << fu1.get() << " " << fu2.get() << std::endl; std::cout << "With " << tp.get_thread_num() << " threads" << std::endl; std::cout << "Type of future: " << boost::typeindex::type_id_with_cvr().pretty_name() << std::endl; } void boost_vs_me_test(thread_pool& tp) { /* TODO */ } int main(void) { thread_pool tp(4); factorial_test(tp, 6); function_vs_packaged_task_test(tp, 10000); return 0; }