Mercurial > hg > orthanc
changeset 2603:988936118354 jobs
reorganization
line wrap: on
line diff
--- a/CMakeLists.txt Fri May 18 15:34:11 2018 +0200 +++ b/CMakeLists.txt Fri May 18 17:02:25 2018 +0200 @@ -90,6 +90,11 @@ OrthancServer/ServerContext.cpp OrthancServer/ServerEnumerations.cpp OrthancServer/ServerIndex.cpp + OrthancServer/ServerJobs/DeleteResourceOperation.cpp + OrthancServer/ServerJobs/DicomModalityStoreJob.cpp + OrthancServer/ServerJobs/LuaJobManager.cpp + OrthancServer/ServerJobs/OrthancPeerStoreJob.cpp + OrthancServer/ServerJobs/StoreScuOperation.cpp OrthancServer/ServerToolbox.cpp OrthancServer/SliceOrdering.cpp )
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/DicomNetworking/TimeoutDicomConnectionManager.cpp Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,136 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeaders.h" +#include "TimeoutDicomConnectionManager.h" + +#include "../OrthancException.h" + +namespace Orthanc +{ + static boost::posix_time::ptime GetNow() + { + return boost::posix_time::microsec_clock::universal_time(); + } + + class TimeoutDicomConnectionManager::Resource : public IDicomConnectionManager::IResource + { + private: + TimeoutDicomConnectionManager& that_; + boost::mutex::scoped_lock lock_; + + public: + Resource(TimeoutDicomConnectionManager& that) : + that_(that), + lock_(that.mutex_) + { + } + + ~Resource() + { + that_.Touch(); + } + + DicomUserConnection& GetConnection() + { + if (that_.connection_.get() == NULL) + { + throw OrthancException(ErrorCode_InternalError); + } + + return *that_.connection_; + } + }; + + + void TimeoutDicomConnectionManager::Touch() + { + lastUse_ = GetNow(); + } + + + void TimeoutDicomConnectionManager::CheckTimeoutInternal() + { + if (connection_.get() != NULL && + (GetNow() - lastUse_) >= timeout_) + { + connection_.reset(NULL); + } + } + + + void TimeoutDicomConnectionManager::SetTimeout(unsigned int timeout) + { + boost::mutex::scoped_lock lock(mutex_); + + timeout_ = boost::posix_time::milliseconds(timeout); + CheckTimeoutInternal(); + } + + + unsigned int TimeoutDicomConnectionManager::GetTimeout() + { + boost::mutex::scoped_lock lock(mutex_); + return timeout_.total_milliseconds(); + } + + + void TimeoutDicomConnectionManager::Close() + { + boost::mutex::scoped_lock lock(mutex_); + connection_.reset(NULL); + } + + + void TimeoutDicomConnectionManager::CheckTimeout() + { + boost::mutex::scoped_lock lock(mutex_); + CheckTimeoutInternal(); + } + + + IDicomConnectionManager::IResource* + TimeoutDicomConnectionManager::AcquireConnection(const std::string& localAet, + const RemoteModalityParameters& remote) + { + boost::mutex::scoped_lock lock(mutex_); + + if (connection_.get() == NULL || + !connection_->IsSameAssociation(localAet, remote)) + { + connection_.reset(new DicomUserConnection(localAet, remote)); + } + + return new Resource(*this); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/DicomNetworking/TimeoutDicomConnectionManager.h Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,73 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "IDicomConnectionManager.h" + +#include <boost/thread/mutex.hpp> + +namespace Orthanc +{ + class TimeoutDicomConnectionManager : public IDicomConnectionManager + { + private: + class Resource; + + boost::mutex mutex_; + std::auto_ptr<DicomUserConnection> connection_; + boost::posix_time::ptime lastUse_; + boost::posix_time::time_duration timeout_; + + void Touch(); + + void CheckTimeoutInternal(); + + public: + TimeoutDicomConnectionManager() : + timeout_(boost::posix_time::milliseconds(1000)) + { + } + + void SetTimeout(unsigned int timeout); + + unsigned int GetTimeout(); + + void Close(); + + void CheckTimeout(); + + virtual IResource* AcquireConnection(const std::string& localAet, + const RemoteModalityParameters& remote); + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,312 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../../PrecompiledHeaders.h" +#include "SequenceOfOperationsJob.h" + +#include "../../Logging.h" +#include "../../OrthancException.h" + +namespace Orthanc +{ + class SequenceOfOperationsJob::Operation : public boost::noncopyable + { + private: + JobOperationValues originalInputs_; + JobOperationValues workInputs_; + std::auto_ptr<IJobOperation> operation_; + std::list<Operation*> nextOperations_; + size_t currentInput_; + + public: + Operation(IJobOperation* operation) : + operation_(operation), + currentInput_(0) + { + if (operation == NULL) + { + throw OrthancException(ErrorCode_NullPointer); + } + } + + void AddOriginalInput(const JobOperationValue& value) + { + if (currentInput_ != 0) + { + // Cannot add input after processing has started + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + originalInputs_.Append(value.Clone()); + } + } + + const JobOperationValues& GetOriginalInputs() const + { + return originalInputs_; + } + + void Reset() + { + workInputs_.Clear(); + currentInput_ = 0; + } + + void AddNextOperation(Operation& other) + { + if (currentInput_ != 0) + { + // Cannot add input after processing has started + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + nextOperations_.push_back(&other); + } + } + + bool IsDone() const + { + return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); + } + + void Step() + { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + const JobOperationValue* input; + + if (currentInput_ < originalInputs_.GetSize()) + { + input = &originalInputs_.GetValue(currentInput_); + } + else + { + input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); + } + + JobOperationValues outputs; + operation_->Apply(outputs, *input); + + if (!nextOperations_.empty()) + { + std::list<Operation*>::iterator first = nextOperations_.begin(); + outputs.Move((*first)->workInputs_); + + std::list<Operation*>::iterator current = first; + ++current; + + while (current != nextOperations_.end()) + { + (*first)->workInputs_.Copy((*current)->workInputs_); + ++current; + } + } + + currentInput_ += 1; + } + }; + + + // Invoked from constructors + void SequenceOfOperationsJob::Setup() + { + done_ = false; + current_ = 0; + trailingTimeout_ = boost::posix_time::milliseconds(1000); + } + + + SequenceOfOperationsJob::~SequenceOfOperationsJob() + { + for (size_t i = 0; i < operations_.size(); i++) + { + if (operations_[i] != NULL) + { + delete operations_[i]; + } + } + } + + + void SequenceOfOperationsJob::Register(IObserver& observer) + { + boost::mutex::scoped_lock lock(mutex_); + observers_.push_back(&observer); + } + + + void SequenceOfOperationsJob::Lock::SetTrailingOperationTimeout(unsigned int timeout) + { + that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); + } + + + size_t SequenceOfOperationsJob::Lock::AddOperation(IJobOperation* operation) + { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + + that_.operations_.push_back(new Operation(operation)); + that_.operationAdded_.notify_one(); + + return that_.operations_.size() - 1; + } + + + void SequenceOfOperationsJob::Lock::AddInput(size_t index, + const JobOperationValue& value) + { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (index >= that_.operations_.size() || + index < that_.current_) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + else + { + that_.operations_[index]->AddOriginalInput(value); + } + } + + + void SequenceOfOperationsJob::Lock::Connect(size_t input, + size_t output) + { + if (IsDone()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (input >= output || + input >= that_.operations_.size() || + output >= that_.operations_.size() || + input < that_.current_ || + output < that_.current_) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + else + { + Operation& a = *that_.operations_[input]; + Operation& b = *that_.operations_[output]; + a.AddNextOperation(b); + } + } + + + JobStepResult SequenceOfOperationsJob::ExecuteStep() + { + boost::mutex::scoped_lock lock(mutex_); + + if (current_ == operations_.size()) + { + LOG(INFO) << "Executing the trailing timeout in the sequence of operations"; + operationAdded_.timed_wait(lock, trailingTimeout_); + + if (current_ == operations_.size()) + { + // No operation was added during the trailing timeout: The + // job is over + LOG(INFO) << "The sequence of operations is over"; + done_ = true; + + for (std::list<IObserver*>::iterator it = observers_.begin(); + it != observers_.end(); ++it) + { + (*it)->SignalDone(*this); + } + + return JobStepResult::Success(); + } + else + { + LOG(INFO) << "New operation added to the sequence of operations"; + } + } + + assert(current_ < operations_.size()); + + while (current_ < operations_.size() && + operations_[current_]->IsDone()) + { + current_++; + } + + if (current_ < operations_.size()) + { + operations_[current_]->Step(); + } + + return JobStepResult::Continue(); + } + + + void SequenceOfOperationsJob::SignalResubmit() + { + boost::mutex::scoped_lock lock(mutex_); + + current_ = 0; + done_ = false; + + for (size_t i = 0; i < operations_.size(); i++) + { + operations_[i]->Reset(); + } + } + + + float SequenceOfOperationsJob::GetProgress() + { + boost::mutex::scoped_lock lock(mutex_); + + return (static_cast<float>(current_) / + static_cast<float>(operations_.size() + 1)); + } + + + void SequenceOfOperationsJob::GetPublicContent(Json::Value& value) + { + boost::mutex::scoped_lock lock(mutex_); + + value["CountOperations"] = static_cast<unsigned int>(operations_.size()); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.h Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,153 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../IJob.h" +#include "IJobOperation.h" + +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp> + +#include <list> + +namespace Orthanc +{ + class SequenceOfOperationsJob : public IJob + { + public: + class IObserver : public boost::noncopyable + { + public: + virtual ~IObserver() + { + } + + virtual void SignalDone(const SequenceOfOperationsJob& job) = 0; + }; + + private: + class Operation; + + std::string jobType_; + bool done_; + boost::mutex mutex_; + std::vector<Operation*> operations_; + size_t current_; + boost::condition_variable operationAdded_; + boost::posix_time::time_duration trailingTimeout_; + std::list<IObserver*> observers_; + + void Setup(); + + public: + SequenceOfOperationsJob() : + jobType_("SequenceOfOperations") + { + Setup(); + } + + SequenceOfOperationsJob(const std::string& jobType) : + jobType_(jobType) + { + Setup(); + } + + virtual ~SequenceOfOperationsJob(); + + void Register(IObserver& observer); + + // This lock allows adding new operations to the end of the job, + // from another thread than the worker thread, after the job has + // been submitted for processing + class Lock : public boost::noncopyable + { + private: + SequenceOfOperationsJob& that_; + boost::mutex::scoped_lock lock_; + + public: + Lock(SequenceOfOperationsJob& that) : + that_(that), + lock_(that.mutex_) + { + } + + bool IsDone() const + { + return that_.done_; + } + + void SetTrailingOperationTimeout(unsigned int timeout); + + size_t AddOperation(IJobOperation* operation); + + size_t GetOperationsCount() const + { + return that_.operations_.size(); + } + + void AddInput(size_t index, + const JobOperationValue& value); + + void Connect(size_t input, + size_t output); + }; + + virtual void Start() + { + } + + virtual JobStepResult ExecuteStep(); + + virtual void SignalResubmit(); + + virtual void ReleaseResources() + { + } + + virtual float GetProgress(); + + virtual void GetJobType(std::string& target) + { + target = jobType_; + } + + virtual void GetPublicContent(Json::Value& value); + + virtual void GetInternalContent(Json::Value& value) + { + // TODO + } + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/DeleteResourceOperation.cpp Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,71 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeadersServer.h" +#include "DeleteResourceOperation.h" + +#include "DicomInstanceOperationValue.h" + +#include "../../Core/Logging.h" +#include "../../Core/OrthancException.h" + +namespace Orthanc +{ + void DeleteResourceOperation::Apply(JobOperationValues& outputs, + const JobOperationValue& input) + { + switch (input.GetType()) + { + case JobOperationValue::Type_DicomInstance: + { + const DicomInstanceOperationValue& instance = dynamic_cast<const DicomInstanceOperationValue&>(input); + LOG(INFO) << "Deleting instance: " << instance.GetId(); + + try + { + Json::Value tmp; + context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance); + } + catch (OrthancException& e) + { + LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What(); + } + + break; + } + + default: + break; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/DeleteResourceOperation.h Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,57 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../../Core/JobsEngine/Operations/IJobOperation.h" + +#include "../ServerContext.h" + +namespace Orthanc +{ + class DeleteResourceOperation : public IJobOperation + { + private: + ServerContext& context_; + + public: + DeleteResourceOperation(ServerContext& context) : + context_(context) + { + } + + virtual void Apply(JobOperationValues& outputs, + const JobOperationValue& input); + }; +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/DicomInstanceOperationValue.h Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,72 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../../Core/JobsEngine/Operations/JobOperationValue.h" + +#include "../ServerContext.h" + +namespace Orthanc +{ + class DicomInstanceOperationValue : public JobOperationValue + { + private: + ServerContext& context_; + std::string id_; + + public: + DicomInstanceOperationValue(ServerContext& context, + const std::string& id) : + JobOperationValue(Type_DicomInstance), + context_(context), + id_(id) + { + } + + const std::string& GetId() const + { + return id_; + } + + void ReadContent(std::string& dicom) const + { + context_.ReadDicom(dicom, id_); + } + + virtual JobOperationValue* Clone() const + { + return new DicomInstanceOperationValue(context_, id_); + } + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/DicomModalityStoreJob.cpp Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,176 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeadersServer.h" +#include "DicomModalityStoreJob.h" + +#include "../../Core/Logging.h" + +namespace Orthanc +{ + void DicomModalityStoreJob::OpenConnection() + { + if (connection_.get() == NULL) + { + connection_.reset(new DicomUserConnection); + connection_->SetLocalApplicationEntityTitle(localAet_); + connection_->SetRemoteModality(remote_); + } + } + + + bool DicomModalityStoreJob::HandleInstance(const std::string& instance) + { + OpenConnection(); + + LOG(INFO) << "Sending instance " << instance << " to modality \"" + << remote_.GetApplicationEntityTitle() << "\""; + + std::string dicom; + context_.ReadDicom(dicom, instance); + + if (HasMoveOriginator()) + { + connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_); + } + else + { + connection_->Store(dicom); + } + + //boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + + return true; + } + + + DicomModalityStoreJob::DicomModalityStoreJob(ServerContext& context) : + context_(context), + localAet_("ORTHANC"), + moveOriginatorId_(0) // By default, not a C-MOVE + { + } + + + void DicomModalityStoreJob::SetLocalAet(const std::string& aet) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + localAet_ = aet; + } + } + + + void DicomModalityStoreJob::SetRemoteModality(const RemoteModalityParameters& remote) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + remote_ = remote; + } + } + + + const std::string& DicomModalityStoreJob::GetMoveOriginatorAet() const + { + if (HasMoveOriginator()) + { + return moveOriginatorAet_; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + + uint16_t DicomModalityStoreJob::GetMoveOriginatorId() const + { + if (HasMoveOriginator()) + { + return moveOriginatorId_; + } + else + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + } + + + void DicomModalityStoreJob::SetMoveOriginator(const std::string& aet, + int id) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else if (id < 0 || + id >= 65536) + { + throw OrthancException(ErrorCode_ParameterOutOfRange); + } + else + { + moveOriginatorId_ = static_cast<uint16_t>(id); + moveOriginatorAet_ = aet; + } + } + + void DicomModalityStoreJob::ReleaseResources() // For pausing jobs + { + connection_.reset(NULL); + } + + + void DicomModalityStoreJob::GetPublicContent(Json::Value& value) + { + value["LocalAet"] = localAet_; + value["RemoteAet"] = remote_.GetApplicationEntityTitle(); + + if (HasMoveOriginator()) + { + value["MoveOriginatorAET"] = GetMoveOriginatorAet(); + value["MoveOriginatorID"] = GetMoveOriginatorId(); + } + + value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size()); + value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size()); + } +}
--- a/OrthancServer/ServerJobs/DicomModalityStoreJob.h Fri May 18 15:34:11 2018 +0200 +++ b/OrthancServer/ServerJobs/DicomModalityStoreJob.h Fri May 18 17:02:25 2018 +0200 @@ -36,6 +36,7 @@ #include "../../Core/JobsEngine/SetOfInstancesJob.h" #include "../../Core/DicomNetworking/DicomUserConnection.h" +#include "../ServerContext.h" namespace Orthanc { @@ -49,154 +50,47 @@ uint16_t moveOriginatorId_; std::auto_ptr<DicomUserConnection> connection_; - void OpenConnection() - { - if (connection_.get() == NULL) - { - connection_.reset(new DicomUserConnection); - connection_->SetLocalApplicationEntityTitle(localAet_); - connection_->SetRemoteModality(remote_); - } - } + void OpenConnection(); protected: - virtual bool HandleInstance(const std::string& instance) - { - OpenConnection(); - - LOG(INFO) << "Sending instance " << instance << " to modality \"" - << remote_.GetApplicationEntityTitle() << "\""; - - std::string dicom; - context_.ReadDicom(dicom, instance); - - if (HasMoveOriginator()) - { - connection_->Store(dicom, moveOriginatorAet_, moveOriginatorId_); - } - else - { - connection_->Store(dicom); - } - - //boost::this_thread::sleep(boost::posix_time::milliseconds(500)); - - return true; - } + virtual bool HandleInstance(const std::string& instance); public: - DicomModalityStoreJob(ServerContext& context) : - context_(context), - localAet_("ORTHANC"), - moveOriginatorId_(0) // By default, not a C-MOVE - { - } + DicomModalityStoreJob(ServerContext& context); const std::string& GetLocalAet() const { return localAet_; } - void SetLocalAet(const std::string& aet) - { - if (IsStarted()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - localAet_ = aet; - } - } + void SetLocalAet(const std::string& aet); const RemoteModalityParameters& GetRemoteModality() const { return remote_; } - void SetRemoteModality(const RemoteModalityParameters& remote) - { - if (IsStarted()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - remote_ = remote; - } - } + void SetRemoteModality(const RemoteModalityParameters& remote); bool HasMoveOriginator() const { return moveOriginatorId_ != 0; } - const std::string& GetMoveOriginatorAet() const - { - if (HasMoveOriginator()) - { - return moveOriginatorAet_; - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } + const std::string& GetMoveOriginatorAet() const; - uint16_t GetMoveOriginatorId() const - { - if (HasMoveOriginator()) - { - return moveOriginatorId_; - } - else - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - } + uint16_t GetMoveOriginatorId() const; void SetMoveOriginator(const std::string& aet, - int id) - { - if (IsStarted()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else if (id < 0 || - id >= 65536) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - else - { - moveOriginatorId_ = static_cast<uint16_t>(id); - moveOriginatorAet_ = aet; - } - } + int id); - virtual void ReleaseResources() // For pausing jobs - { - connection_.reset(NULL); - } + virtual void ReleaseResources(); virtual void GetJobType(std::string& target) { target = "DicomModalityStore"; } - virtual void GetPublicContent(Json::Value& value) - { - value["LocalAet"] = localAet_; - value["RemoteAet"] = remote_.GetApplicationEntityTitle(); - - if (HasMoveOriginator()) - { - value["MoveOriginatorAET"] = GetMoveOriginatorAet(); - value["MoveOriginatorID"] = GetMoveOriginatorId(); - } - - value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size()); - value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size()); - } + virtual void GetPublicContent(Json::Value& value); }; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/LuaJobManager.cpp Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,131 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeadersServer.h" +#include "LuaJobManager.h" + + +namespace Orthanc +{ + void LuaJobManager::ConnectionTimeoutThread(LuaJobManager* manager) + { + while (manager->continue_) + { + manager->connectionManager_.CheckTimeout(); + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + } + + + void LuaJobManager::SignalDone(const SequenceOfOperationsJob& job) + { + boost::mutex::scoped_lock lock(mutex_); + + if (&job == currentJob_) + { + currentId_.clear(); + currentJob_ = NULL; + } + } + + + LuaJobManager::LuaJobManager(JobsEngine& engine) : + engine_(engine), + currentJob_(NULL), + maxOperations_(1000), + priority_(0), + continue_(true) + { + connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this); + } + + + LuaJobManager::~LuaJobManager() + { + continue_ = false; + + if (connectionTimeoutThread_.joinable()) + { + connectionTimeoutThread_.join(); + } + } + + + void LuaJobManager::SetMaxOperationsPerJob(size_t count) + { + boost::mutex::scoped_lock lock(mutex_); + maxOperations_ = count; + } + + + void LuaJobManager::SetPriority(int priority) + { + boost::mutex::scoped_lock lock(mutex_); + priority_ = priority; + } + + + void LuaJobManager::SetTrailingOperationTimeout(unsigned int timeout) + { + boost::mutex::scoped_lock lock(mutex_); + trailingTimeout_ = timeout; + } + + + LuaJobManager::Lock* LuaJobManager::Modify() + { + boost::mutex::scoped_lock lock(mutex_); + + if (currentJob_ != NULL) + { + std::auto_ptr<Lock> result(new Lock(*currentJob_)); + + if (!result->IsDone() && + result->GetOperationsCount() < maxOperations_) + { + return result.release(); + } + } + + // Need to create a new job, as the previous one is either + // finished, or is getting too long + currentJob_ = new SequenceOfOperationsJob; + + engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); + + std::auto_ptr<Lock> result(new Lock(*currentJob_)); + result->SetTrailingOperationTimeout(trailingTimeout_); + + return result.release(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/LuaJobManager.h Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,76 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../../Core/DicomNetworking/TimeoutDicomConnectionManager.h" +#include "../../Core/JobsEngine/JobsEngine.h" +#include "../../Core/JobsEngine/Operations/SequenceOfOperationsJob.h" + +namespace Orthanc +{ + class LuaJobManager : private SequenceOfOperationsJob::IObserver + { + public: + typedef SequenceOfOperationsJob::Lock Lock; + + private: + boost::mutex mutex_; + JobsEngine& engine_; + TimeoutDicomConnectionManager connectionManager_; + std::string currentId_; + SequenceOfOperationsJob* currentJob_; + size_t maxOperations_; + int priority_; + unsigned int trailingTimeout_; + bool continue_; + boost::thread connectionTimeoutThread_; + + static void ConnectionTimeoutThread(LuaJobManager* manager); + + virtual void SignalDone(const SequenceOfOperationsJob& job); + + public: + LuaJobManager(JobsEngine& engine); + + ~LuaJobManager(); + + void SetMaxOperationsPerJob(size_t count); + + void SetPriority(int priority); + + void SetTrailingOperationTimeout(unsigned int timeout); + + Lock* Modify(); + }; +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/OrthancPeerStoreJob.cpp Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,97 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeadersServer.h" +#include "OrthancPeerStoreJob.h" + +#include "../../Core/Logging.h" + + +namespace Orthanc +{ + bool OrthancPeerStoreJob::HandleInstance(const std::string& instance) + { + //boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + + if (client_.get() == NULL) + { + client_.reset(new HttpClient(peer_, "instances")); + client_->SetMethod(HttpMethod_Post); + } + + LOG(INFO) << "Sending instance " << instance << " to peer \"" + << peer_.GetUrl() << "\""; + + context_.ReadDicom(client_->GetBody(), instance); + + std::string answer; + if (client_->Apply(answer)) + { + return true; + } + else + { + throw OrthancException(ErrorCode_NetworkProtocol); + } + } + + + void OrthancPeerStoreJob::SetPeer(const WebServiceParameters& peer) + { + if (IsStarted()) + { + throw OrthancException(ErrorCode_BadSequenceOfCalls); + } + else + { + peer_ = peer; + } + } + + + void OrthancPeerStoreJob::ReleaseResources() // For pausing jobs + { + client_.reset(NULL); + } + + + void OrthancPeerStoreJob::GetPublicContent(Json::Value& value) + { + Json::Value v; + peer_.ToJson(v); + value["Peer"] = v; + + value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size()); + value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size()); + } +}
--- a/OrthancServer/ServerJobs/OrthancPeerStoreJob.h Fri May 18 15:34:11 2018 +0200 +++ b/OrthancServer/ServerJobs/OrthancPeerStoreJob.h Fri May 18 17:02:25 2018 +0200 @@ -36,6 +36,8 @@ #include "../../Core/JobsEngine/SetOfInstancesJob.h" #include "../../Core/HttpClient.h" +#include "../ServerContext.h" + namespace Orthanc { @@ -47,31 +49,7 @@ std::auto_ptr<HttpClient> client_; protected: - virtual bool HandleInstance(const std::string& instance) - { - //boost::this_thread::sleep(boost::posix_time::milliseconds(500)); - - if (client_.get() == NULL) - { - client_.reset(new HttpClient(peer_, "instances")); - client_->SetMethod(HttpMethod_Post); - } - - LOG(INFO) << "Sending instance " << instance << " to peer \"" - << peer_.GetUrl() << "\""; - - context_.ReadDicom(client_->GetBody(), instance); - - std::string answer; - if (client_->Apply(answer)) - { - return true; - } - else - { - throw OrthancException(ErrorCode_NetworkProtocol); - } - } + virtual bool HandleInstance(const std::string& instance); public: OrthancPeerStoreJob(ServerContext& context) : @@ -79,41 +57,20 @@ { } - void SetPeer(const WebServiceParameters& peer) - { - if (IsStarted()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - peer_ = peer; - } - } + void SetPeer(const WebServiceParameters& peer); const WebServiceParameters& GetPeer() const { return peer_; } - virtual void ReleaseResources() // For pausing jobs - { - client_.reset(NULL); - } + virtual void ReleaseResources(); // For pausing jobs virtual void GetJobType(std::string& target) { target = "OrthancPeerStore"; } - virtual void GetPublicContent(Json::Value& value) - { - Json::Value v; - peer_.ToJson(v); - value["Peer"] = v; - - value["InstancesCount"] = static_cast<uint32_t>(GetInstances().size()); - value["FailedInstancesCount"] = static_cast<uint32_t>(GetFailedInstances().size()); - } + virtual void GetPublicContent(Json::Value& value); }; }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/StoreScuOperation.cpp Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,79 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#include "../PrecompiledHeadersServer.h" +#include "StoreScuOperation.h" + +#include "DicomInstanceOperationValue.h" + +#include "../../Core/Logging.h" +#include "../../Core/OrthancException.h" + +namespace Orthanc +{ + void StoreScuOperation::Apply(JobOperationValues& outputs, + const JobOperationValue& input) + { + std::auto_ptr<IDicomConnectionManager::IResource> resource + (manager_.AcquireConnection(localAet_, modality_)); + + if (resource.get() == NULL) + { + LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); + return; + } + + if (input.GetType() != JobOperationValue::Type_DicomInstance) + { + throw OrthancException(ErrorCode_BadParameterType); + } + + const DicomInstanceOperationValue& instance = dynamic_cast<const DicomInstanceOperationValue&>(input); + + LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" + << modality_.GetApplicationEntityTitle() << "\""; + + try + { + std::string dicom; + instance.ReadContent(dicom); + resource->GetConnection().Store(dicom); + outputs.Append(instance.Clone()); + } + catch (OrthancException& e) + { + LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" + << modality_.GetApplicationEntityTitle() << "\": " << e.What(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/OrthancServer/ServerJobs/StoreScuOperation.h Fri May 18 17:02:25 2018 +0200 @@ -0,0 +1,63 @@ +/** + * Orthanc - A Lightweight, RESTful DICOM Store + * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics + * Department, University Hospital of Liege, Belgium + * Copyright (C) 2017-2018 Osimis S.A., Belgium + * + * This program is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * In addition, as a special exception, the copyright holders of this + * program give permission to link the code of its release with the + * OpenSSL project's "OpenSSL" library (or with modified versions of it + * that use the same license as the "OpenSSL" library), and distribute + * the linked executables. You must obey the GNU General Public License + * in all respects for all of the code used other than "OpenSSL". If you + * modify file(s) with this exception, you may extend this exception to + * your version of the file(s), but you are not obligated to do so. If + * you do not wish to do so, delete this exception statement from your + * version. If you delete this exception statement from all source files + * in the program, then also delete it here. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + **/ + + +#pragma once + +#include "../../Core/JobsEngine/Operations/IJobOperation.h" + +#include "../../Core/DicomNetworking/RemoteModalityParameters.h" +#include "../../Core/DicomNetworking/IDicomConnectionManager.h" + +namespace Orthanc +{ + class StoreScuOperation : public IJobOperation + { + private: + std::string localAet_; + RemoteModalityParameters modality_; + IDicomConnectionManager& manager_; + public: + StoreScuOperation(const std::string& localAet, + const RemoteModalityParameters& modality, + IDicomConnectionManager& manager) : + localAet_(localAet), + modality_(modality), + manager_(manager) + { + } + + virtual void Apply(JobOperationValues& outputs, + const JobOperationValue& input); + }; +} +
--- a/Resources/CMake/OrthancFrameworkConfiguration.cmake Fri May 18 15:34:11 2018 +0200 +++ b/Resources/CMake/OrthancFrameworkConfiguration.cmake Fri May 18 17:02:25 2018 +0200 @@ -497,6 +497,7 @@ list(APPEND ORTHANC_CORE_SOURCES_INTERNAL ${ORTHANC_ROOT}/Core/Cache/SharedArchive.cpp + ${ORTHANC_ROOT}/Core/DicomNetworking/TimeoutDicomConnectionManager.cpp ${ORTHANC_ROOT}/Core/FileStorage/FilesystemStorage.cpp ${ORTHANC_ROOT}/Core/JobsEngine/JobInfo.cpp ${ORTHANC_ROOT}/Core/JobsEngine/JobStatus.cpp @@ -505,6 +506,7 @@ ${ORTHANC_ROOT}/Core/JobsEngine/JobsRegistry.cpp ${ORTHANC_ROOT}/Core/JobsEngine/Operations/JobOperationValues.cpp ${ORTHANC_ROOT}/Core/JobsEngine/Operations/LogJobOperation.cpp + ${ORTHANC_ROOT}/Core/JobsEngine/Operations/SequenceOfOperationsJob.cpp ${ORTHANC_ROOT}/Core/JobsEngine/SetOfInstancesJob.cpp ${ORTHANC_ROOT}/Core/MultiThreading/RunnableWorkersPool.cpp ${ORTHANC_ROOT}/Core/MultiThreading/Semaphore.cpp
--- a/UnitTestsSources/MultiThreadingTests.cpp Fri May 18 15:34:11 2018 +0200 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri May 18 17:02:25 2018 +0200 @@ -742,697 +742,10 @@ -#include "../OrthancServer/ServerContext.h" -#include "../Core/Logging.h" - -#include "../Core/DicomNetworking/IDicomConnectionManager.h" -#include "../Core/JobsEngine/Operations/JobOperationValues.h" +#include "../OrthancServer/ServerJobs/LuaJobManager.h" #include "../Core/JobsEngine/Operations/StringOperationValue.h" #include "../Core/JobsEngine/Operations/LogJobOperation.h" -namespace Orthanc -{ - class DicomInstanceValue : public JobOperationValue - { - private: - ServerContext& context_; - std::string id_; - - public: - DicomInstanceValue(ServerContext& context, - const std::string& id) : - JobOperationValue(Type_DicomInstance), - context_(context), - id_(id) - { - } - - const std::string& GetId() const - { - return id_; - } - - void ReadContent(std::string& dicom) const - { - context_.ReadDicom(dicom, id_); - } - - virtual JobOperationValue* Clone() const - { - return new DicomInstanceValue(context_, id_); - } - }; - - - class StoreScuOperation : public IJobOperation - { - private: - std::string localAet_; - RemoteModalityParameters modality_; - - public: - StoreScuOperation(const std::string& localAet, - const RemoteModalityParameters& modality) : - localAet_(localAet), - modality_(modality) - { - } - - virtual void Apply(JobOperationValues& outputs, - const JobOperationValue& input, - IDicomConnectionManager& manager) - { - std::auto_ptr<IDicomConnectionManager::IResource> resource - (manager.AcquireConnection(localAet_, modality_)); - - if (resource.get() == NULL) - { - LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); - return; - } - - if (input.GetType() != JobOperationValue::Type_DicomInstance) - { - throw OrthancException(ErrorCode_BadParameterType); - } - - const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); - - LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" - << modality_.GetApplicationEntityTitle() << "\""; - - try - { - std::string dicom; - instance.ReadContent(dicom); - resource->GetConnection().Store(dicom); - outputs.Append(instance.Clone()); - } - catch (OrthancException& e) - { - LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" - << modality_.GetApplicationEntityTitle() << "\": " << e.What(); - } - } - }; - - - class DeleteResourceOperation : public IJobOperation - { - private: - ServerContext& context_; - - public: - DeleteResourceOperation(ServerContext& context) : - context_(context) - { - } - - virtual void Apply(JobOperationValues& outputs, - const JobOperationValue& input, - IDicomConnectionManager& manager) - { - switch (input.GetType()) - { - case JobOperationValue::Type_DicomInstance: - { - const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); - LOG(INFO) << "Deleting instance: " << instance.GetId(); - - try - { - Json::Value tmp; - context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance); - } - catch (OrthancException& e) - { - LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What(); - } - - break; - } - - default: - break; - } - } - }; - - - - class TimeoutDicomConnectionManager : public IDicomConnectionManager - { - private: - class Resource : public IDicomConnectionManager::IResource - { - private: - TimeoutDicomConnectionManager& that_; - boost::mutex::scoped_lock lock_; - - public: - Resource(TimeoutDicomConnectionManager& that) : - that_(that), - lock_(that.mutex_) - { - } - - virtual ~Resource() - { - that_.Touch(); - } - - virtual DicomUserConnection& GetConnection() - { - if (that_.connection_.get() == NULL) - { - throw OrthancException(ErrorCode_InternalError); - } - - return *that_.connection_; - } - }; - - boost::mutex mutex_; - std::auto_ptr<DicomUserConnection> connection_; - boost::posix_time::ptime lastUse_; - boost::posix_time::time_duration timeout_; - - static boost::posix_time::ptime GetNow() - { - return boost::posix_time::microsec_clock::universal_time(); - } - - void Touch() - { - lastUse_ = GetNow(); - } - - void CheckTimeoutInternal() - { - if (connection_.get() != NULL && - (GetNow() - lastUse_) >= timeout_) - { - connection_.reset(NULL); - } - } - - public: - TimeoutDicomConnectionManager() : - timeout_(boost::posix_time::milliseconds(1000)) - { - } - - void SetTimeout(unsigned int timeout) - { - boost::mutex::scoped_lock lock(mutex_); - - timeout_ = boost::posix_time::milliseconds(timeout); - CheckTimeoutInternal(); - } - - unsigned int GetTimeout() - { - boost::mutex::scoped_lock lock(mutex_); - return timeout_.total_milliseconds(); - } - - void Close() - { - boost::mutex::scoped_lock lock(mutex_); - connection_.reset(NULL); - } - - void CheckTimeout() - { - boost::mutex::scoped_lock lock(mutex_); - CheckTimeoutInternal(); - } - - virtual IResource* AcquireConnection(const std::string& localAet, - const RemoteModalityParameters& remote) - { - boost::mutex::scoped_lock lock(mutex_); - - if (connection_.get() == NULL || - !connection_->IsSameAssociation(localAet, remote)) - { - connection_.reset(new DicomUserConnection(localAet, remote)); - } - - return new Resource(*this); - } - }; - - - - class SequenceOfOperationsJob : public IJob - { - public: - class IObserver : public boost::noncopyable - { - public: - virtual ~IObserver() - { - } - - virtual void SignalDone(const SequenceOfOperationsJob& job) = 0; - }; - - private: - class Operation : public boost::noncopyable - { - private: - JobOperationValues originalInputs_; - JobOperationValues workInputs_; - std::auto_ptr<IJobOperation> operation_; - std::list<Operation*> nextOperations_; - size_t currentInput_; - - public: - Operation(IJobOperation* operation) : - operation_(operation), - currentInput_(0) - { - if (operation == NULL) - { - throw OrthancException(ErrorCode_NullPointer); - } - } - - void AddOriginalInput(const JobOperationValue& value) - { - if (currentInput_ != 0) - { - // Cannot add input after processing has started - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - originalInputs_.Append(value.Clone()); - } - } - - const JobOperationValues& GetOriginalInputs() const - { - return originalInputs_; - } - - void Reset() - { - workInputs_.Clear(); - currentInput_ = 0; - } - - void AddNextOperation(Operation& other) - { - if (currentInput_ != 0) - { - // Cannot add input after processing has started - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else - { - nextOperations_.push_back(&other); - } - } - - bool IsDone() const - { - return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); - } - - void Step() - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - const JobOperationValue* input; - - if (currentInput_ < originalInputs_.GetSize()) - { - input = &originalInputs_.GetValue(currentInput_); - } - else - { - input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); - } - - JobOperationValues outputs; - operation_->Apply(outputs, *input); - - if (!nextOperations_.empty()) - { - std::list<Operation*>::iterator first = nextOperations_.begin(); - outputs.Move((*first)->workInputs_); - - std::list<Operation*>::iterator current = first; - ++current; - - while (current != nextOperations_.end()) - { - (*first)->workInputs_.Copy((*current)->workInputs_); - ++current; - } - } - - currentInput_ += 1; - } - }; - - - std::string jobType_; - bool done_; - boost::mutex mutex_; - std::vector<Operation*> operations_; - size_t current_; - boost::condition_variable operationAdded_; - boost::posix_time::time_duration trailingTimeout_; - std::list<IObserver*> observers_; - - // Invoked from constructors - void Setup() - { - done_ = false; - current_ = 0; - trailingTimeout_ = boost::posix_time::milliseconds(1000); - } - - public: - SequenceOfOperationsJob() : - jobType_("SequenceOfOperations") - { - Setup(); - } - - SequenceOfOperationsJob(const std::string& jobType) : - jobType_(jobType) - { - Setup(); - } - - virtual ~SequenceOfOperationsJob() - { - for (size_t i = 0; i < operations_.size(); i++) - { - if (operations_[i] != NULL) - { - delete operations_[i]; - } - } - } - - void Register(IObserver& observer) - { - boost::mutex::scoped_lock lock(mutex_); - observers_.push_back(&observer); - } - - // This lock allows adding new operations to the end of the job, - // from another thread than the worker thread, after the job has - // been submitted for processing - class Lock : public boost::noncopyable - { - private: - SequenceOfOperationsJob& that_; - boost::mutex::scoped_lock lock_; - - public: - Lock(SequenceOfOperationsJob& that) : - that_(that), - lock_(that.mutex_) - { - } - - bool IsDone() const - { - return that_.done_; - } - - void SetTrailingOperationTimeout(unsigned int timeout) - { - that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); - } - - size_t AddOperation(IJobOperation* operation) - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - - that_.operations_.push_back(new Operation(operation)); - that_.operationAdded_.notify_one(); - - return that_.operations_.size() - 1; - } - - size_t GetOperationsCount() const - { - return that_.operations_.size(); - } - - void AddInput(size_t index, - const JobOperationValue& value) - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else if (index >= that_.operations_.size() || - index < that_.current_) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - else - { - that_.operations_[index]->AddOriginalInput(value); - } - } - - void Connect(size_t input, - size_t output) - { - if (IsDone()) - { - throw OrthancException(ErrorCode_BadSequenceOfCalls); - } - else if (input >= output || - input >= that_.operations_.size() || - output >= that_.operations_.size() || - input < that_.current_ || - output < that_.current_) - { - throw OrthancException(ErrorCode_ParameterOutOfRange); - } - else - { - Operation& a = *that_.operations_[input]; - Operation& b = *that_.operations_[output]; - a.AddNextOperation(b); - } - } - }; - - virtual void Start() - { - } - - virtual JobStepResult ExecuteStep() - { - boost::mutex::scoped_lock lock(mutex_); - - if (current_ == operations_.size()) - { - LOG(INFO) << "Executing the trailing timeout in the sequence of operations"; - operationAdded_.timed_wait(lock, trailingTimeout_); - - if (current_ == operations_.size()) - { - // No operation was added during the trailing timeout: The - // job is over - LOG(INFO) << "The sequence of operations is over"; - done_ = true; - - for (std::list<IObserver*>::iterator it = observers_.begin(); - it != observers_.end(); ++it) - { - (*it)->SignalDone(*this); - } - - return JobStepResult::Success(); - } - else - { - LOG(INFO) << "New operation added to the sequence of operations"; - } - } - - assert(current_ < operations_.size()); - - while (current_ < operations_.size() && - operations_[current_]->IsDone()) - { - current_++; - } - - if (current_ < operations_.size()) - { - operations_[current_]->Step(); - } - - return JobStepResult::Continue(); - } - - virtual void SignalResubmit() - { - boost::mutex::scoped_lock lock(mutex_); - - current_ = 0; - done_ = false; - - for (size_t i = 0; i < operations_.size(); i++) - { - operations_[i]->Reset(); - } - } - - virtual void ReleaseResources() - { - } - - virtual float GetProgress() - { - boost::mutex::scoped_lock lock(mutex_); - - return (static_cast<float>(current_) / - static_cast<float>(operations_.size() + 1)); - } - - virtual void GetJobType(std::string& target) - { - target = jobType_; - } - - virtual void GetPublicContent(Json::Value& value) - { - boost::mutex::scoped_lock lock(mutex_); - - value["CountOperations"] = static_cast<unsigned int>(operations_.size()); - } - - virtual void GetInternalContent(Json::Value& value) - { - // TODO - } - }; - - - class LuaJobManager : private SequenceOfOperationsJob::IObserver - { - public: - typedef SequenceOfOperationsJob::Lock Lock; - - private: - boost::mutex mutex_; - JobsEngine& engine_; - TimeoutDicomConnectionManager connectionManager_; - std::string currentId_; - SequenceOfOperationsJob* currentJob_; - size_t maxOperations_; - int priority_; - unsigned int trailingTimeout_; - bool continue_; - boost::thread connectionTimeoutThread_; - - static void ConnectionTimeoutThread(LuaJobManager* manager) - { - while (manager->continue_) - { - manager->connectionManager_.CheckTimeout(); - boost::this_thread::sleep(boost::posix_time::milliseconds(100)); - } - } - - virtual void SignalDone(const SequenceOfOperationsJob& job) - { - boost::mutex::scoped_lock lock(mutex_); - - if (&job == currentJob_) - { - currentId_.clear(); - currentJob_ = NULL; - } - } - - public: - LuaJobManager(JobsEngine& engine) : - engine_(engine), - currentJob_(NULL), - maxOperations_(1000), - priority_(0), - continue_(true) - { - connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this); - } - - ~LuaJobManager() - { - continue_ = false; - - if (connectionTimeoutThread_.joinable()) - { - connectionTimeoutThread_.join(); - } - } - - void SetMaxOperationsPerJob(size_t count) - { - boost::mutex::scoped_lock lock(mutex_); - maxOperations_ = count; - } - - void SetPriority(int priority) - { - boost::mutex::scoped_lock lock(mutex_); - priority_ = priority; - } - - void SetTrailingOperationTimeout(unsigned int timeout) - { - boost::mutex::scoped_lock lock(mutex_); - trailingTimeout_ = timeout; - } - - Lock* Modify() - { - boost::mutex::scoped_lock lock(mutex_); - - if (currentJob_ != NULL) - { - std::auto_ptr<Lock> result(new Lock(*currentJob_)); - - if (!result->IsDone() && - result->GetOperationsCount() < maxOperations_) - { - return result.release(); - } - } - - // Need to create a new job, as the previous one is either - // finished, or is getting too long - currentJob_ = new SequenceOfOperationsJob; - engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); - - std::auto_ptr<Lock> result(new Lock(*currentJob_)); - result->SetTrailingOperationTimeout(trailingTimeout_); - - return result.release(); - } - }; -} - TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) {