# HG changeset patch # User Sebastien Jodogne # Date 1455897710 -3600 # Node ID b97aa579e85bc57652d5ac8a2e45ce77a1e203d7 # Parent d9e33b1651122f8a70ef5e53af3c208dd7b879ae bag of tasks diff -r d9e33b165112 -r b97aa579e85b CMakeLists.txt --- a/CMakeLists.txt Fri Feb 12 15:56:37 2016 +0100 +++ b/CMakeLists.txt Fri Feb 19 17:01:50 2016 +0100 @@ -121,6 +121,7 @@ Core/RestApi/RestApiPath.cpp Core/RestApi/RestApiOutput.cpp Core/RestApi/RestApi.cpp + Core/MultiThreading/BagOfTasksProcessor.cpp Core/MultiThreading/Mutex.cpp Core/MultiThreading/ReaderWriterLock.cpp Core/MultiThreading/RunnableWorkersPool.cpp diff -r d9e33b165112 -r b97aa579e85b Core/MultiThreading/BagOfTasks.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/MultiThreading/BagOfTasks.h Fri Feb 19 17:01:50 2016 +0100 @@ -0,0 +1,82 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "../ICommand.h" + +#include + +namespace Orthanc +{ + class BagOfTasks : public boost::noncopyable + { + private: + typedef std::list Tasks; + + Tasks tasks_; + + public: + ~BagOfTasks() + { + for (Tasks::iterator it = tasks_.begin(); it != tasks_.end(); it++) + { + delete *it; + } + } + + ICommand* Pop() + { + ICommand* task = tasks_.front(); + tasks_.pop_front(); + return task; + } + + void Push(ICommand* task) // Takes ownership + { + if (task != NULL) + { + tasks_.push_back(task); + } + } + + size_t GetSize() const + { + return tasks_.size(); + } + + bool IsEmpty() const + { + return tasks_.empty(); + } + }; +} diff -r d9e33b165112 -r b97aa579e85b Core/MultiThreading/BagOfTasksProcessor.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/MultiThreading/BagOfTasksProcessor.cpp Fri Feb 19 17:01:50 2016 +0100 @@ -0,0 +1,254 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "../PrecompiledHeaders.h" +#include "BagOfTasksProcessor.h" + +#include "../OrthancException.h" + + +namespace Orthanc +{ + class BagOfTasksProcessor::Task : public IDynamicObject + { + private: + uint64_t bag_; + std::auto_ptr command_; + + public: + Task(uint64_t bag, + ICommand* command) : + bag_(bag), + command_(command) + { + } + + bool Execute() + { + try + { + return command_->Execute(); + } + catch (OrthancException&) + { + return false; + } + catch (std::runtime_error&) + { + return false; + } + } + + uint64_t GetBag() + { + return bag_; + } + }; + + + void BagOfTasksProcessor::Worker(BagOfTasksProcessor* that) + { + while (that->continue_) + { + std::auto_ptr obj(that->queue_.Dequeue(100)); + if (obj.get() != NULL) + { + Task& task = *dynamic_cast(obj.get()); + + { + boost::mutex::scoped_lock lock(that->mutex_); + + Bags::iterator bag = that->bags_.find(task.GetBag()); + assert(bag != that->bags_.end()); + assert(bag->second.done_ < bag->second.size_); + + if (bag->second.status_ != BagStatus_Running) + { + // This bag of task has failed or is tagged as canceled, do nothing + bag->second.done_ += 1; + continue; + } + } + + bool success = task.Execute(); + + { + boost::mutex::scoped_lock lock(that->mutex_); + + Bags::iterator bag = that->bags_.find(task.GetBag()); + assert(bag != that->bags_.end()); + + if (!success) + { + bag->second.status_ = BagStatus_Failed; + } + + assert(bag->second.done_ < bag->second.size_); + bag->second.done_ += 1; + + if (bag->second.done_ == bag->second.size_) + { + that->exitStatus_[task.GetBag()] = (bag->second.status_ == BagStatus_Running); + that->bagFinished_.notify_all(); + } + } + } + } + } + + + void BagOfTasksProcessor::Cancel(int64_t bag) + { + boost::mutex::scoped_lock lock(mutex_); + + Bags::iterator it = bags_.find(bag); + if (it != bags_.end()) + { + it->second.status_ = BagStatus_Canceled; + } + } + + + bool BagOfTasksProcessor::Join(int64_t bag) + { + boost::mutex::scoped_lock lock(mutex_); + + while (continue_) + { + ExitStatus::iterator it = exitStatus_.find(bag); + if (it == exitStatus_.end()) // The bag is still running + { + bagFinished_.wait(lock); + } + else + { + bool status = it->second; + exitStatus_.erase(it); + return status; + } + } + + return false; // The processor is stopping + } + + + float BagOfTasksProcessor::GetProgress(int64_t bag) + { + boost::mutex::scoped_lock lock(mutex_); + + Bags::const_iterator it = bags_.find(bag); + if (it == bags_.end()) + { + // The bag of tasks has finished + return 1.0f; + } + else + { + return (static_cast(it->second.done_) / + static_cast(it->second.size_)); + } + } + + + bool BagOfTasksProcessor::Handle::Join() + { + if (hasJoined_) + { + return status_; + } + else + { + status_ = that_.Join(bag_); + hasJoined_ = true; + return status_; + } + } + + + BagOfTasksProcessor::BagOfTasksProcessor(size_t countThreads) : + countBags_(0), + continue_(true) + { + if (countThreads <= 0) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + + threads_.resize(countThreads); + + for (size_t i = 0; i < threads_.size(); i++) + { + threads_[i] = new boost::thread(Worker, this); + } + } + + + BagOfTasksProcessor::~BagOfTasksProcessor() + { + continue_ = false; + + bagFinished_.notify_all(); // Wakes up all the pending "Join()" + + for (size_t i = 0; i < threads_.size(); i++) + { + if (threads_[i]) + { + if (threads_[i]->joinable()) + { + threads_[i]->join(); + } + + delete threads_[i]; + threads_[i] = NULL; + } + } + } + + + BagOfTasksProcessor::Handle* BagOfTasksProcessor::Submit(BagOfTasks& tasks) + { + boost::mutex::scoped_lock lock(mutex_); + + uint64_t id = countBags_; + countBags_ += 1; + + Bag bag(tasks.GetSize()); + bags_[id] = bag; + + while (!tasks.IsEmpty()) + { + queue_.Enqueue(new Task(id, tasks.Pop())); + } + + return new Handle(*this, id); + } +} diff -r d9e33b165112 -r b97aa579e85b Core/MultiThreading/BagOfTasksProcessor.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/MultiThreading/BagOfTasksProcessor.h Fri Feb 19 17:01:50 2016 +0100 @@ -0,0 +1,145 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "BagOfTasks.h" +#include "SharedMessageQueue.h" + +#include +#include + +namespace Orthanc +{ + class BagOfTasksProcessor : public boost::noncopyable + { + private: + enum BagStatus + { + BagStatus_Running, + BagStatus_Canceled, + BagStatus_Failed + }; + + + struct Bag + { + size_t size_; + size_t done_; + BagStatus status_; + + Bag() : + size_(0), + done_(0), + status_(BagStatus_Failed) + { + } + + Bag(size_t size) : + size_(size), + done_(0), + status_(BagStatus_Running) + { + } + }; + + class Task; + + + typedef std::map Bags; + typedef std::map ExitStatus; + + SharedMessageQueue queue_; + + boost::mutex mutex_; + uint64_t countBags_; + Bags bags_; + std::vector threads_; + ExitStatus exitStatus_; + bool continue_; + + boost::condition_variable bagFinished_; + + static void Worker(BagOfTasksProcessor* that); + + void Cancel(int64_t bag); + + bool Join(int64_t bag); + + float GetProgress(int64_t bag); + + public: + class Handle : public boost::noncopyable + { + friend class BagOfTasksProcessor; + + private: + BagOfTasksProcessor& that_; + uint64_t bag_; + bool hasJoined_; + bool status_; + + Handle(BagOfTasksProcessor& that, + uint64_t bag) : + that_(that), + bag_(bag), + hasJoined_(false) + { + } + + public: + ~Handle() + { + Join(); + } + + void Cancel() + { + that_.Cancel(bag_); + } + + bool Join(); + + float GetProgress() + { + return that_.GetProgress(bag_); + } + }; + + + BagOfTasksProcessor(size_t countThreads); + + ~BagOfTasksProcessor(); + + Handle* Submit(BagOfTasks& tasks); + }; +}