# HG changeset patch # User Sebastien Jodogne # Date 1403703361 -7200 # Node ID a91e7b4080d114bc24b38d26cc2f0b257260bf09 # Parent 394a19d44f9d0825ee27e6d9814ca45f7a1822c2# Parent 8811abd6aec9134c18df710a3344cb6bb778e2be merge mainline -> lua-scripting diff -r 8811abd6aec9 -r a91e7b4080d1 CMakeLists.txt --- a/CMakeLists.txt Wed Jun 25 15:32:02 2014 +0200 +++ b/CMakeLists.txt Wed Jun 25 15:36:01 2014 +0200 @@ -148,6 +148,11 @@ OrthancServer/ServerToolbox.cpp OrthancServer/OrthancFindRequestHandler.cpp OrthancServer/OrthancMoveRequestHandler.cpp + + # From "lua-scripting" branch + OrthancServer/Scheduler/ServerFilterInstance.cpp + OrthancServer/Scheduler/ServerJob.cpp + OrthancServer/Scheduler/ServerScheduler.cpp ) diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/Scheduler/IServerFilter.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/IServerFilter.h Wed Jun 25 15:36:01 2014 +0200 @@ -0,0 +1,54 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include +#include + +namespace Orthanc +{ + class IServerFilter + { + public: + typedef std::list ListOfStrings; + + virtual ~IServerFilter() + { + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) = 0; + + virtual bool SendOutputsToSink() const = 0; + }; +} diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/Scheduler/ServerFilterInstance.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerFilterInstance.cpp Wed Jun 25 15:36:01 2014 +0200 @@ -0,0 +1,82 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "ServerFilterInstance.h" + +#include "../../Core/OrthancException.h" + +namespace Orthanc +{ + bool ServerFilterInstance::Execute(IListener& listener) + { + ListOfStrings outputs; + if (!filter_->Apply(outputs, inputs_)) + { + listener.SignalFailure(jobId_); + return true; + } + + for (std::list::iterator + it = next_.begin(); it != next_.end(); it++) + { + for (ListOfStrings::const_iterator + output = outputs.begin(); output != outputs.end(); output++) + { + (*it)->AddInput(*output); + } + } + + listener.SignalSuccess(jobId_); + return true; + } + + + ServerFilterInstance::ServerFilterInstance(IServerFilter *filter, + const std::string& jobId) : + filter_(filter), + jobId_(jobId) + { + if (filter_ == NULL) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + } + + + ServerFilterInstance::~ServerFilterInstance() + { + if (filter_ != NULL) + { + delete filter_; + } + } +} diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/Scheduler/ServerFilterInstance.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerFilterInstance.h Wed Jun 25 15:36:01 2014 +0200 @@ -0,0 +1,98 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "../../Core/IDynamicObject.h" +#include "IServerFilter.h" + +namespace Orthanc +{ + class ServerFilterInstance : public IDynamicObject + { + friend class ServerScheduler; + + public: + class IListener + { + public: + virtual ~IListener() + { + } + + virtual void SignalSuccess(const std::string& jobId) = 0; + + virtual void SignalFailure(const std::string& jobId) = 0; + }; + + private: + typedef IServerFilter::ListOfStrings ListOfStrings; + + IServerFilter *filter_; + std::string jobId_; + ListOfStrings inputs_; + std::list next_; + + bool Execute(IListener& listener); + + public: + ServerFilterInstance(IServerFilter *filter, + const std::string& jobId); + + virtual ~ServerFilterInstance(); + + const std::string& GetJobId() const + { + return jobId_; + } + + void AddInput(const std::string& input) + { + inputs_.push_back(input); + } + + void ConnectNext(ServerFilterInstance& filter) + { + next_.push_back(&filter); + } + + const std::list& GetNextFilters() const + { + return next_; + } + + IServerFilter& GetFilter() const + { + return *filter_; + } + }; +} diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/Scheduler/ServerJob.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerJob.cpp Wed Jun 25 15:36:01 2014 +0200 @@ -0,0 +1,126 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "ServerJob.h" + +#include "../../Core/OrthancException.h" +#include "../../Core/Toolbox.h" +#include "../../Core/Uuid.h" + +namespace Orthanc +{ + void ServerJob::CheckOrdering() + { + std::map index; + + unsigned int count = 0; + for (std::list::const_iterator + it = filters_.begin(); it != filters_.end(); it++) + { + index[*it] = count++; + } + + for (std::list::const_iterator + it = filters_.begin(); it != filters_.end(); it++) + { + const std::list& nextFilters = (*it)->GetNextFilters(); + + for (std::list::const_iterator + next = nextFilters.begin(); next != nextFilters.end(); next++) + { + if (index.find(*next) == index.end() || + index[*next] <= index[*it]) + { + // You must reorder your calls to "ServerJob::AddFilter" + throw OrthancException("Bad ordering of filters in a job"); + } + } + } + } + + + size_t ServerJob::Submit(SharedMessageQueue& target, + ServerFilterInstance::IListener& listener) + { + if (submitted_) + { + // This job has already been submitted + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + CheckOrdering(); + + size_t size = filters_.size(); + + for (std::list::iterator + it = filters_.begin(); it != filters_.end(); it++) + { + target.Enqueue(*it); + } + + filters_.clear(); + submitted_ = true; + + return size; + } + + + ServerJob::ServerJob() + { + jobId_ = Toolbox::GenerateUuid(); + submitted_ = false; + description_ = "no description"; + } + + + ServerJob::~ServerJob() + { + for (std::list::iterator + it = filters_.begin(); it != filters_.end(); it++) + { + delete *it; + } + } + + + ServerFilterInstance& ServerJob::AddFilter(IServerFilter* filter) + { + if (submitted_) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + filters_.push_back(new ServerFilterInstance(filter, jobId_)); + + return *filters_.back(); + } +} diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/Scheduler/ServerJob.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerJob.h Wed Jun 25 15:36:01 2014 +0200 @@ -0,0 +1,77 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "ServerFilterInstance.h" +#include "../../Core/MultiThreading/SharedMessageQueue.h" + +namespace Orthanc +{ + class ServerJob + { + friend class ServerScheduler; + + private: + std::list filters_; + std::string jobId_; + bool submitted_; + std::string description_; + + void CheckOrdering(); + + size_t Submit(SharedMessageQueue& target, + ServerFilterInstance::IListener& listener); + + public: + ServerJob(); + + ~ServerJob(); + + const std::string& GetId() const + { + return jobId_; + } + + void SetDescription(const char* description) + { + description_ = description; + } + + const std::string& GetDescription() const + { + return description_; + } + + ServerFilterInstance& AddFilter(IServerFilter* filter); + }; +} diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/Scheduler/ServerScheduler.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerScheduler.cpp Wed Jun 25 15:36:01 2014 +0200 @@ -0,0 +1,329 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#include "ServerScheduler.h" + +#include "../../Core/OrthancException.h" + +#include + +namespace Orthanc +{ + namespace + { + // Anonymous namespace to avoid clashes between compilation modules + class Sink : public IServerFilter + { + private: + ListOfStrings& target_; + + public: + Sink(ListOfStrings& target) : target_(target) + { + } + + virtual bool SendOutputsToSink() const + { + return false; + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) + { + for (ListOfStrings::const_iterator + it = inputs.begin(); it != inputs.end(); it++) + { + target_.push_back(*it); + } + + return true; + } + }; + } + + + ServerScheduler::JobInfo& ServerScheduler::GetJobInfo(const std::string& jobId) + { + Jobs::iterator info = jobs_.find(jobId); + + if (info == jobs_.end()) + { + throw OrthancException(ErrorCode_InternalError); + } + + return info->second; + } + + + void ServerScheduler::SignalSuccess(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo& info = GetJobInfo(jobId); + info.success_++; + + assert(info.failures_ == 0); + + if (info.success_ >= info.size_) + { + if (info.watched_) + { + watchedJobStatus_[jobId] = JobStatus_Success; + jobFinished_.notify_all(); + } + + LOG(INFO) << "Job successfully finished (" << info.description_ << ")"; + jobs_.erase(jobId); + } + } + + + void ServerScheduler::SignalFailure(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo& info = GetJobInfo(jobId); + info.failures_++; + + if (info.success_ + info.failures_ >= info.size_) + { + if (info.watched_) + { + watchedJobStatus_[jobId] = JobStatus_Failure; + jobFinished_.notify_all(); + } + + LOG(ERROR) << "Job has failed (" << info.description_ << ")"; + jobs_.erase(jobId); + } + } + + + void ServerScheduler::Worker(ServerScheduler* that) + { + static const int32_t TIMEOUT = 100; + + LOG(WARNING) << "The server scheduler has started"; + + while (!that->finish_) + { + std::auto_ptr object(that->queue_.Dequeue(TIMEOUT)); + if (object.get() != NULL) + { + ServerFilterInstance& filter = dynamic_cast(*object); + + // Skip the execution of this filter if its parent job has + // previously failed. + bool jobHasFailed; + { + boost::mutex::scoped_lock lock(that->mutex_); + JobInfo& info = that->GetJobInfo(filter.GetJobId()); + jobHasFailed = (info.failures_ > 0 || info.cancel_); + } + + if (jobHasFailed) + { + that->SignalFailure(filter.GetJobId()); + } + else + { + filter.Execute(*that); + } + } + } + } + + + void ServerScheduler::SubmitInternal(ServerJob& job, + bool watched) + { + boost::mutex::scoped_lock lock(mutex_); + + JobInfo info; + info.size_ = job.Submit(queue_, *this); + info.cancel_ = false; + info.success_ = 0; + info.failures_ = 0; + info.description_ = job.GetDescription(); + info.watched_ = watched; + + assert(info.size_ > 0); + + if (watched) + { + watchedJobStatus_[job.GetId()] = JobStatus_Running; + } + + jobs_[job.GetId()] = info; + + LOG(INFO) << "New job submitted (" << job.description_ << ")"; + } + + + ServerScheduler::ServerScheduler() + { + finish_ = false; + worker_ = boost::thread(Worker, this); + } + + + ServerScheduler::~ServerScheduler() + { + finish_ = true; + worker_.join(); + } + + + void ServerScheduler::Submit(ServerJob& job) + { + if (job.filters_.empty()) + { + return; + } + + SubmitInternal(job, false); + } + + + bool ServerScheduler::SubmitAndWait(ListOfStrings& outputs, + ServerJob& job) + { + std::string jobId = job.GetId(); + + outputs.clear(); + + if (job.filters_.empty()) + { + return true; + } + + // Add a sink filter to collect all the results of the filters + // that have no next filter. + ServerFilterInstance& sink = job.AddFilter(new Sink(outputs)); + + for (std::list::iterator + it = job.filters_.begin(); it != job.filters_.end(); it++) + { + if ((*it) != &sink && + (*it)->GetNextFilters().size() == 0 && + (*it)->GetFilter().SendOutputsToSink()) + { + (*it)->ConnectNext(sink); + } + } + + // Submit the job + SubmitInternal(job, true); + + // Wait for the job to complete (either success or failure) + JobStatus status; + + { + boost::mutex::scoped_lock lock(mutex_); + + assert(watchedJobStatus_.find(jobId) != watchedJobStatus_.end()); + + while (watchedJobStatus_[jobId] == JobStatus_Running) + { + jobFinished_.wait(lock); + } + + status = watchedJobStatus_[jobId]; + watchedJobStatus_.erase(jobId); + } + + return (status == JobStatus_Success); + } + + + bool ServerScheduler::IsRunning(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + return jobs_.find(jobId) != jobs_.end(); + } + + + void ServerScheduler::Cancel(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + Jobs::iterator job = jobs_.find(jobId); + + if (job != jobs_.end()) + { + job->second.cancel_ = true; + LOG(WARNING) << "Canceling a job (" << job->second.description_ << ")"; + } + } + + + float ServerScheduler::GetProgress(const std::string& jobId) + { + boost::mutex::scoped_lock lock(mutex_); + + Jobs::iterator job = jobs_.find(jobId); + + if (job == jobs_.end() || + job->second.size_ == 0 /* should never happen */) + { + // This job is not running + return 1; + } + + if (job->second.failures_ != 0) + { + return 1; + } + + if (job->second.size_ == 1) + { + return job->second.success_; + } + + return (static_cast(job->second.success_) / + static_cast(job->second.size_ - 1)); + } + + + void ServerScheduler::GetListOfJobs(ListOfStrings& jobs) + { + boost::mutex::scoped_lock lock(mutex_); + + jobs.clear(); + + for (Jobs::const_iterator + it = jobs_.begin(); it != jobs_.end(); it++) + { + jobs.push_back(it->first); + } + } +} diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/Scheduler/ServerScheduler.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/Scheduler/ServerScheduler.h Wed Jun 25 15:36:01 2014 +0200 @@ -0,0 +1,115 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2014 Medical Physics Department, CHU of Liege, + * Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + **/ + + +#pragma once + +#include "ServerJob.h" + +namespace Orthanc +{ + class ServerScheduler : public ServerFilterInstance::IListener + { + private: + struct JobInfo + { + bool watched_; + bool cancel_; + size_t size_; + size_t success_; + size_t failures_; + std::string description_; + }; + + enum JobStatus + { + JobStatus_Running = 1, + JobStatus_Success = 2, + JobStatus_Failure = 3 + }; + + typedef IServerFilter::ListOfStrings ListOfStrings; + typedef std::map Jobs; + + boost::mutex mutex_; + boost::condition_variable jobFinished_; + Jobs jobs_; + SharedMessageQueue queue_; + bool finish_; + boost::thread worker_; + std::map watchedJobStatus_; + + JobInfo& GetJobInfo(const std::string& jobId); + + virtual void SignalSuccess(const std::string& jobId); + + virtual void SignalFailure(const std::string& jobId); + + static void Worker(ServerScheduler* that); + + void SubmitInternal(ServerJob& job, + bool watched); + + public: + ServerScheduler(); + + ~ServerScheduler(); + + void Submit(ServerJob& job); + + bool SubmitAndWait(ListOfStrings& outputs, + ServerJob& job); + + bool IsRunning(const std::string& jobId); + + void Cancel(const std::string& jobId); + + // Returns a number between 0 and 1 + float GetProgress(const std::string& jobId); + + bool IsRunning(const ServerJob& job) + { + return IsRunning(job.GetId()); + } + + void Cancel(const ServerJob& job) + { + Cancel(job.GetId()); + } + + float GetProgress(const ServerJob& job) + { + return GetProgress(job.GetId()); + } + + void GetListOfJobs(ListOfStrings& jobs); + }; +} diff -r 8811abd6aec9 -r a91e7b4080d1 OrthancServer/ServerContext.h --- a/OrthancServer/ServerContext.h Wed Jun 25 15:32:02 2014 +0200 +++ b/OrthancServer/ServerContext.h Wed Jun 25 15:36:01 2014 +0200 @@ -40,6 +40,7 @@ #include "ServerIndex.h" #include "ParsedDicomFile.h" #include "DicomProtocol/ReusableDicomUserConnection.h" +#include "Scheduler/ServerScheduler.h" namespace Orthanc { @@ -73,6 +74,7 @@ boost::mutex dicomCacheMutex_; MemoryCache dicomCache_; ReusableDicomUserConnection scu_; + ServerScheduler scheduler_; LuaContext lua_; @@ -167,5 +169,10 @@ { return scu_; } + + ServerScheduler& GetScheduler() + { + return scheduler_; + } }; } diff -r 8811abd6aec9 -r a91e7b4080d1 UnitTestsSources/MultiThreading.cpp --- a/UnitTestsSources/MultiThreading.cpp Wed Jun 25 15:32:02 2014 +0200 +++ b/UnitTestsSources/MultiThreading.cpp Wed Jun 25 15:36:01 2014 +0200 @@ -32,8 +32,9 @@ #include "PrecompiledHeadersUnitTests.h" #include "gtest/gtest.h" +#include -#include +#include "../OrthancServer/Scheduler/ServerScheduler.h" #include "../Core/OrthancException.h" #include "../Core/Toolbox.h" @@ -248,8 +249,6 @@ - - #include "../OrthancServer/DicomProtocol/ReusableDicomUserConnection.h" TEST(ReusableDicomUserConnection, DISABLED_Basic) @@ -257,6 +256,7 @@ ReusableDicomUserConnection c; c.SetMillisecondsBeforeClose(200); printf("START\n"); fflush(stdout); + { ReusableDicomUserConnection::Locker lock(c, "STORESCP", "localhost", 2000, ModalityManufacturer_Generic); lock.GetConnection().StoreFile("/home/jodogne/DICOM/Cardiac/MR.X.1.2.276.0.7230010.3.1.4.2831157719.2256.1336386844.676281"); @@ -274,3 +274,110 @@ Toolbox::ServerBarrier(); printf("DONE\n"); fflush(stdout); } + + + +class Tutu : public IServerFilter +{ +private: + int factor_; + +public: + Tutu(int f) : factor_(f) + { + } + + virtual bool Apply(ListOfStrings& outputs, + const ListOfStrings& inputs) + { + for (ListOfStrings::const_iterator + it = inputs.begin(); it != inputs.end(); it++) + { + int a = boost::lexical_cast(*it); + int b = factor_ * a; + + printf("%d * %d = %d\n", a, factor_, b); + + //if (a == 84) { printf("BREAK\n"); return false; } + + outputs.push_back(boost::lexical_cast(b)); + } + + Toolbox::USleep(1000000); + + return true; + } + + virtual bool SendOutputsToSink() const + { + return true; + } +}; + + +static void Tata(ServerScheduler* s, ServerJob* j, bool* done) +{ + typedef IServerFilter::ListOfStrings ListOfStrings; + +#if 1 + while (!(*done)) + { + ListOfStrings l; + s->GetListOfJobs(l); + for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + printf(">> %s: %0.1f\n", i->c_str(), 100.0f * s->GetProgress(*i)); + Toolbox::USleep(100000); + } +#else + ListOfStrings l; + s->GetListOfJobs(l); + for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + printf(">> %s\n", i->c_str()); + Toolbox::USleep(1500000); + s->Cancel(*j); + Toolbox::USleep(1000000); + s->GetListOfJobs(l); + for (ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + printf(">> %s\n", i->c_str()); +#endif +} + + +TEST(Toto, Toto) +{ + ServerScheduler scheduler; + + ServerJob job; + ServerFilterInstance& f2 = job.AddFilter(new Tutu(2)); + ServerFilterInstance& f3 = job.AddFilter(new Tutu(3)); + ServerFilterInstance& f4 = job.AddFilter(new Tutu(4)); + ServerFilterInstance& f5 = job.AddFilter(new Tutu(5)); + f2.AddInput(boost::lexical_cast(42)); + //f3.AddInput(boost::lexical_cast(42)); + //f4.AddInput(boost::lexical_cast(42)); + f2.ConnectNext(f3); + f3.ConnectNext(f4); + f4.ConnectNext(f5); + + job.SetDescription("tutu"); + + bool done = false; + boost::thread t(Tata, &scheduler, &job, &done); + + + //scheduler.Submit(job); + + IServerFilter::ListOfStrings l; + scheduler.SubmitAndWait(l, job); + + for (IServerFilter::ListOfStrings::iterator i = l.begin(); i != l.end(); i++) + { + printf("** %s\n", i->c_str()); + } + + //Toolbox::ServerBarrier(); + //Toolbox::USleep(3000000); + + done = true; + t.join(); +}