Mercurial > hg > orthanc
changeset 2950:dc18d5804746
support of JobsHistorySize set to zero
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 30 Nov 2018 17:19:57 +0100 |
parents | e6204cd21443 |
children | 65b20d922e10 |
files | Core/JobsEngine/JobsEngine.cpp Core/JobsEngine/JobsEngine.h Core/JobsEngine/JobsRegistry.cpp Core/JobsEngine/JobsRegistry.h Core/JobsEngine/Operations/SequenceOfOperationsJob.h NEWS OrthancServer/ServerContext.cpp OrthancServer/ServerContext.h OrthancServer/ServerJobs/LuaJobManager.cpp OrthancServer/main.cpp Resources/Configuration.json UnitTestsSources/MultiThreadingTests.cpp UnitTestsSources/ServerIndexTests.cpp |
diffstat | 13 files changed, 153 insertions(+), 90 deletions(-) [+] |
line wrap: on
line diff
--- a/Core/JobsEngine/JobsEngine.cpp Thu Nov 29 20:36:55 2018 +0100 +++ b/Core/JobsEngine/JobsEngine.cpp Fri Nov 30 17:19:57 2018 +0100 @@ -156,9 +156,9 @@ } - JobsEngine::JobsEngine() : + JobsEngine::JobsEngine(size_t maxCompletedJobs) : state_(State_Setup), - registry_(new JobsRegistry), + registry_(new JobsRegistry(maxCompletedJobs)), threadSleep_(200), workers_(1) { @@ -198,7 +198,9 @@ throw OrthancException(ErrorCode_BadSequenceOfCalls); } - registry_.reset(new JobsRegistry(unserializer, serialized)); + assert(registry_.get() != NULL); + const size_t maxCompletedJobs = registry_->GetMaxCompletedJobs(); + registry_.reset(new JobsRegistry(unserializer, serialized, maxCompletedJobs)); }
--- a/Core/JobsEngine/JobsEngine.h Thu Nov 29 20:36:55 2018 +0100 +++ b/Core/JobsEngine/JobsEngine.h Fri Nov 30 17:19:57 2018 +0100 @@ -68,7 +68,7 @@ size_t workerIndex); public: - JobsEngine(); + JobsEngine(size_t maxCompletedJobs); ~JobsEngine();
--- a/Core/JobsEngine/JobsRegistry.cpp Thu Nov 29 20:36:55 2018 +0100 +++ b/Core/JobsEngine/JobsRegistry.cpp Fri Nov 30 17:19:57 2018 +0100 @@ -47,7 +47,6 @@ static const char* JOB = "Job"; static const char* JOBS = "Jobs"; static const char* JOBS_REGISTRY = "JobsRegistry"; - static const char* MAX_COMPLETED_JOBS = "MaxCompletedJobs"; static const char* CREATION_TIME = "CreationTime"; static const char* LAST_CHANGE_TIME = "LastChangeTime"; static const char* RUNTIME = "Runtime"; @@ -435,20 +434,19 @@ void JobsRegistry::ForgetOldCompletedJobs() { - if (maxCompletedJobs_ != 0) + while (completedJobs_.size() > maxCompletedJobs_) { - while (completedJobs_.size() > maxCompletedJobs_) - { - assert(completedJobs_.front() != NULL); + assert(completedJobs_.front() != NULL); + + std::string id = completedJobs_.front()->GetId(); + assert(jobsIndex_.find(id) != jobsIndex_.end()); - std::string id = completedJobs_.front()->GetId(); - assert(jobsIndex_.find(id) != jobsIndex_.end()); + jobsIndex_.erase(id); + delete(completedJobs_.front()); + completedJobs_.pop_front(); + } - jobsIndex_.erase(id); - delete(completedJobs_.front()); - completedJobs_.pop_front(); - } - } + CheckInvariants(); } @@ -458,26 +456,48 @@ job.SetState(success ? JobState_Success : JobState_Failure); completedJobs_.push_back(&job); - ForgetOldCompletedJobs(); - someJobComplete_.notify_all(); } void JobsRegistry::MarkRunningAsCompleted(JobHandler& job, - bool success) + CompletedReason reason) { - LOG(INFO) << "Job has completed with " << (success ? "success" : "failure") - << ": " << job.GetId(); + const char* tmp; + + switch (reason) + { + case CompletedReason_Success: + tmp = "success"; + break; + + case CompletedReason_Failure: + tmp = "success"; + break; + + case CompletedReason_Canceled: + tmp = "cancel"; + break; + + default: + throw OrthancException(ErrorCode_InternalError); + } + + LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId(); CheckInvariants(); assert(job.GetState() == JobState_Running); - SetCompletedJob(job, success); + SetCompletedJob(job, reason == CompletedReason_Success); + + if (reason == CompletedReason_Canceled) + { + job.SetLastErrorCode(ErrorCode_CanceledJob); + } if (observer_ != NULL) { - if (success) + if (reason == CompletedReason_Success) { observer_->SignalJobSuccess(job.GetId()); } @@ -487,7 +507,9 @@ } } - CheckInvariants(); + // WARNING: The following call might make "job" invalid if the job + // history size is empty + ForgetOldCompletedJobs(); } @@ -558,8 +580,14 @@ maxCompletedJobs_ = n; ForgetOldCompletedJobs(); + } + + size_t JobsRegistry::GetMaxCompletedJobs() + { + boost::mutex::scoped_lock lock(mutex_); CheckInvariants(); + return maxCompletedJobs_; } @@ -604,17 +632,14 @@ void JobsRegistry::SubmitInternal(std::string& id, - JobHandler* handlerRaw, - bool keepLastChangeTime) + JobHandler* handler) { - if (handlerRaw == NULL) + if (handler == NULL) { throw OrthancException(ErrorCode_NullPointer); } - std::auto_ptr<JobHandler> handler(handlerRaw); - - boost::posix_time::ptime lastChangeTime = handler->GetLastStateChangeTime(); + std::auto_ptr<JobHandler> protection(handler); { boost::mutex::scoped_lock lock(mutex_); @@ -623,13 +648,15 @@ id = handler->GetId(); int priority = handler->GetPriority(); + jobsIndex_.insert(std::make_pair(id, protection.release())); + switch (handler->GetState()) { case JobState_Pending: case JobState_Retry: case JobState_Running: handler->SetState(JobState_Pending); - pendingJobs_.push(handler.get()); + pendingJobs_.push(handler); pendingJobAvailable_.notify_one(); break; @@ -650,13 +677,6 @@ throw OrthancException(ErrorCode_InternalError); } - if (keepLastChangeTime) - { - handler->SetLastStateChangeTime(lastChangeTime); - } - - jobsIndex_.insert(std::make_pair(id, handler.release())); - LOG(INFO) << "New job submitted with priority " << priority << ": " << id; if (observer_ != NULL) @@ -664,7 +684,9 @@ observer_->SignalJobSubmitted(id); } - CheckInvariants(); + // WARNING: The following call might make "handler" invalid if + // the job history size is empty + ForgetOldCompletedJobs(); } } @@ -673,7 +695,7 @@ IJob* job, // Takes ownership int priority) { - SubmitInternal(id, new JobHandler(job, priority), false); + SubmitInternal(id, new JobHandler(job, priority)); } @@ -681,7 +703,7 @@ int priority) { std::string id; - SubmitInternal(id, new JobHandler(job, priority), false); + SubmitInternal(id, new JobHandler(job, priority)); } @@ -904,7 +926,10 @@ throw OrthancException(ErrorCode_InternalError); } - CheckInvariants(); + // WARNING: The following call might make "handler" invalid if + // the job history size is empty + ForgetOldCompletedJobs(); + return true; } } @@ -1091,17 +1116,12 @@ switch (targetState_) { case JobState_Failure: - registry_.MarkRunningAsCompleted(*handler_, false); - - if (canceled_) - { - handler_->SetLastErrorCode(ErrorCode_CanceledJob); - } - + registry_.MarkRunningAsCompleted + (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure); break; case JobState_Success: - registry_.MarkRunningAsCompleted(*handler_, true); + registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success); break; case JobState_Paused: @@ -1293,7 +1313,6 @@ target = Json::objectValue; target[TYPE] = JOBS_REGISTRY; - target[MAX_COMPLETED_JOBS] = static_cast<unsigned int>(maxCompletedJobs_); target[JOBS] = Json::objectValue; for (JobsIndex::const_iterator it = jobsIndex_.begin(); @@ -1309,7 +1328,9 @@ JobsRegistry::JobsRegistry(IJobUnserializer& unserializer, - const Json::Value& s) : + const Json::Value& s, + size_t maxCompletedJobs) : + maxCompletedJobs_(maxCompletedJobs), observer_(NULL) { if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY || @@ -1319,17 +1340,28 @@ throw OrthancException(ErrorCode_BadFileFormat); } - maxCompletedJobs_ = SerializationToolbox::ReadUnsignedInteger(s, MAX_COMPLETED_JOBS); - Json::Value::Members members = s[JOBS].getMemberNames(); for (Json::Value::Members::const_iterator it = members.begin(); it != members.end(); ++it) { std::auto_ptr<JobHandler> job(new JobHandler(unserializer, s[JOBS][*it], *it)); - + + const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime(); + std::string id; - SubmitInternal(id, job.release(), true); + SubmitInternal(id, job.release()); + + // Check whether the job has not been removed (which could be + // the case if the "maxCompletedJobs_" value gets smaller) + JobsIndex::iterator found = jobsIndex_.find(id); + if (found != jobsIndex_.end()) + { + // The job still lies in the history: Update the time of its + // last change to the time that was serialized + assert(found->second != NULL); + found->second->SetLastStateChangeTime(lastChangeTime); + } } } }
--- a/Core/JobsEngine/JobsRegistry.h Thu Nov 29 20:36:55 2018 +0100 +++ b/Core/JobsEngine/JobsRegistry.h Fri Nov 30 17:19:57 2018 +0100 @@ -71,6 +71,13 @@ }; private: + enum CompletedReason + { + CompletedReason_Success, + CompletedReason_Failure, + CompletedReason_Canceled + }; + class JobHandler; struct PriorityComparator @@ -115,7 +122,7 @@ bool success); void MarkRunningAsCompleted(JobHandler& job, - bool success); + CompletedReason reason); void MarkRunningAsRetry(JobHandler& job, unsigned int timeout); @@ -130,23 +137,25 @@ void RemoveRetryJob(JobHandler* handler); void SubmitInternal(std::string& id, - JobHandler* handler, - bool keepLastChangeTime); + JobHandler* handler); public: - JobsRegistry() : - maxCompletedJobs_(10), + JobsRegistry(size_t maxCompletedJobs) : + maxCompletedJobs_(maxCompletedJobs), observer_(NULL) { } JobsRegistry(IJobUnserializer& unserializer, - const Json::Value& s); + const Json::Value& s, + size_t maxCompletedJobs); ~JobsRegistry(); void SetMaxCompletedJobs(size_t i); + size_t GetMaxCompletedJobs(); + void ListJobs(std::set<std::string>& target); bool GetJobInfo(JobInfo& target,
--- a/Core/JobsEngine/Operations/SequenceOfOperationsJob.h Thu Nov 29 20:36:55 2018 +0100 +++ b/Core/JobsEngine/Operations/SequenceOfOperationsJob.h Fri Nov 30 17:19:57 2018 +0100 @@ -71,6 +71,8 @@ std::list<IObserver*> observers_; TimeoutDicomConnectionManager connectionManager_; + void NotifyDone() const; + public: SequenceOfOperationsJob(); @@ -96,8 +98,8 @@ public: Lock(SequenceOfOperationsJob& that) : - that_(that), - lock_(that.mutex_) + that_(that), + lock_(that.mutex_) { }
--- a/NEWS Thu Nov 29 20:36:55 2018 +0100 +++ b/NEWS Fri Nov 30 17:19:57 2018 +0100 @@ -39,6 +39,7 @@ * Orthanc starts even if jobs from a previous execution cannot be unserialized * New CMake option "ENABLE_DCMTK_LOG" to disable logging internal to DCMTK * Fix issue 114 (Boost 1.68 doesn't support SHA-1 anymore) +* Support of "JobsHistorySize" set to zero * Upgraded dependencies for static and Windows builds: - boost 1.68.0 - lua 5.3.5
--- a/OrthancServer/ServerContext.cpp Thu Nov 29 20:36:55 2018 +0100 +++ b/OrthancServer/ServerContext.cpp Fri Nov 30 17:19:57 2018 +0100 @@ -214,7 +214,8 @@ ServerContext::ServerContext(IDatabaseWrapper& database, IStorageArea& area, - bool unitTesting) : + bool unitTesting, + size_t maxCompletedJobs) : index_(*this, database, (unitTesting ? 20 : 500)), area_(area), compressionEnabled_(false), @@ -224,6 +225,7 @@ mainLua_(*this), filterLua_(*this), luaListener_(*this), + jobsEngine_(maxCompletedJobs), #if ORTHANC_ENABLE_PLUGINS == 1 plugins_(NULL), #endif
--- a/OrthancServer/ServerContext.h Thu Nov 29 20:36:55 2018 +0100 +++ b/OrthancServer/ServerContext.h Fri Nov 30 17:19:57 2018 +0100 @@ -160,12 +160,17 @@ DicomCacheProvider provider_; boost::mutex dicomCacheMutex_; MemoryCache dicomCache_; - JobsEngine jobsEngine_; LuaScripting mainLua_; LuaScripting filterLua_; LuaServerListener luaListener_; + // The "JobsEngine" must be *after* "LuaScripting", as + // "LuaScripting" embeds "LuaJobManager" that registers as an + // observer to "SequenceOfOperationsJob", whose lifetime + // corresponds to that of "JobsEngine" + JobsEngine jobsEngine_; + #if ORTHANC_ENABLE_PLUGINS == 1 OrthancPlugins* plugins_; #endif @@ -206,7 +211,8 @@ ServerContext(IDatabaseWrapper& database, IStorageArea& area, - bool unitTesting); + bool unitTesting, + size_t maxCompletedJobs); ~ServerContext();
--- a/OrthancServer/ServerJobs/LuaJobManager.cpp Thu Nov 29 20:36:55 2018 +0100 +++ b/OrthancServer/ServerJobs/LuaJobManager.cpp Fri Nov 30 17:19:57 2018 +0100 @@ -143,6 +143,7 @@ // Need to create a new job, as the previous one is either // finished, or is getting too long that_.currentJob_ = new SequenceOfOperationsJob; + that_.currentJob_->Register(that_); that_.currentJob_->SetDescription("Lua"); {
--- a/OrthancServer/main.cpp Thu Nov 29 20:36:55 2018 +0100 +++ b/OrthancServer/main.cpp Fri Nov 30 17:19:57 2018 +0100 @@ -1085,8 +1085,8 @@ OrthancPlugins *plugins, bool loadJobsFromDatabase) { - ServerContext context(database, storageArea, false /* not running unit tests */); - + size_t maxCompletedJobs; + { OrthancConfiguration::ReaderLock lock; @@ -1101,6 +1101,15 @@ HttpClient::SetDefaultProxy(lock.GetConfiguration().GetStringParameter("HttpProxy", "")); DicomUserConnection::SetDefaultTimeout(lock.GetConfiguration().GetUnsignedIntegerParameter("DicomScuTimeout", 10)); + + maxCompletedJobs = lock.GetConfiguration().GetUnsignedIntegerParameter("JobsHistorySize", 10); + } + + ServerContext context(database, storageArea, false /* not running unit tests */, maxCompletedJobs); + + { + OrthancConfiguration::ReaderLock lock; + context.SetCompressionEnabled(lock.GetConfiguration().GetBooleanParameter("StorageCompression", false)); context.SetStoreMD5ForAttachments(lock.GetConfiguration().GetBooleanParameter("StoreMD5ForAttachments", true)); @@ -1125,9 +1134,6 @@ { context.GetIndex().SetMaximumStorageSize(0); } - - context.GetJobsEngine().GetRegistry().SetMaxCompletedJobs - (lock.GetConfiguration().GetUnsignedIntegerParameter("JobsHistorySize", 10)); } {
--- a/Resources/Configuration.json Thu Nov 29 20:36:55 2018 +0100 +++ b/Resources/Configuration.json Fri Nov 30 17:19:57 2018 +0100 @@ -430,7 +430,9 @@ // Maximum number of completed jobs that are kept in memory. A // processing job is considered as complete once it is tagged as - // "Success" or "Failure". + // "Success" or "Failure". Since Orthanc 1.4.3, a value of "0" + // indicates to keep no job in memory (i.e. jobs are removed from + // the history as soon as they are completed). "JobsHistorySize" : 10, // Specifies how Orthanc reacts when it receives a DICOM instance
--- a/UnitTestsSources/MultiThreadingTests.cpp Thu Nov 29 20:36:55 2018 +0100 +++ b/UnitTestsSources/MultiThreadingTests.cpp Fri Nov 30 17:19:57 2018 +0100 @@ -329,7 +329,7 @@ TEST(JobsRegistry, Priority) { - JobsRegistry registry; + JobsRegistry registry(10); std::string i1, i2, i3, i4; registry.Submit(i1, new DummyJob(), 10); @@ -408,7 +408,7 @@ TEST(JobsRegistry, Simultaneous) { - JobsRegistry registry; + JobsRegistry registry(10); std::string i1, i2; registry.Submit(i1, new DummyJob(), 20); @@ -438,7 +438,7 @@ TEST(JobsRegistry, Resubmit) { - JobsRegistry registry; + JobsRegistry registry(10); std::string id; registry.Submit(id, new DummyJob(), 10); @@ -482,7 +482,7 @@ TEST(JobsRegistry, Retry) { - JobsRegistry registry; + JobsRegistry registry(10); std::string id; registry.Submit(id, new DummyJob(), 10); @@ -519,7 +519,7 @@ TEST(JobsRegistry, PausePending) { - JobsRegistry registry; + JobsRegistry registry(10); std::string id; registry.Submit(id, new DummyJob(), 10); @@ -542,7 +542,7 @@ TEST(JobsRegistry, PauseRunning) { - JobsRegistry registry; + JobsRegistry registry(10); std::string id; registry.Submit(id, new DummyJob(), 10); @@ -580,7 +580,7 @@ TEST(JobsRegistry, PauseRetry) { - JobsRegistry registry; + JobsRegistry registry(10); std::string id; registry.Submit(id, new DummyJob(), 10); @@ -617,7 +617,7 @@ TEST(JobsRegistry, Cancel) { - JobsRegistry registry; + JobsRegistry registry(10); std::string id; registry.Submit(id, new DummyJob(), 10); @@ -711,7 +711,7 @@ TEST(JobsEngine, SubmitAndWait) { - JobsEngine engine; + JobsEngine engine(10); engine.SetThreadSleep(10); engine.SetWorkersCount(3); engine.Start(); @@ -731,7 +731,7 @@ TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) { - JobsEngine engine; + JobsEngine engine(10); engine.SetThreadSleep(10); engine.SetWorkersCount(3); engine.Start(); @@ -771,7 +771,7 @@ TEST(JobsEngine, DISABLED_Lua) { - JobsEngine engine; + JobsEngine engine(10); engine.SetThreadSleep(10); engine.SetWorkersCount(2); engine.Start(); @@ -1282,7 +1282,7 @@ OrthancJobsSerialization() { db_.Open(); - context_.reset(new ServerContext(db_, storage_, true /* running unit tests */)); + context_.reset(new ServerContext(db_, storage_, true /* running unit tests */, 10)); context_->SetupJobsEngine(true, false); } @@ -1704,7 +1704,7 @@ std::string i1, i2; { - JobsRegistry registry; + JobsRegistry registry(10); registry.Submit(i1, new DummyJob(), 10); registry.Submit(i2, new SequenceOfOperationsJob(), 30); registry.Serialize(s); @@ -1712,7 +1712,7 @@ { DummyUnserializer unserializer; - JobsRegistry registry(unserializer, s); + JobsRegistry registry(unserializer, s, 10); Json::Value t; registry.Serialize(t);
--- a/UnitTestsSources/ServerIndexTests.cpp Thu Nov 29 20:36:55 2018 +0100 +++ b/UnitTestsSources/ServerIndexTests.cpp Fri Nov 30 17:19:57 2018 +0100 @@ -677,7 +677,7 @@ FilesystemStorage storage(path); DatabaseWrapper db; // The SQLite DB is in memory db.Open(); - ServerContext context(db, storage, true /* running unit tests */); + ServerContext context(db, storage, true /* running unit tests */, 10); context.SetupJobsEngine(true, false); ServerIndex& index = context.GetIndex(); @@ -777,7 +777,7 @@ FilesystemStorage storage(path); DatabaseWrapper db; // The SQLite DB is in memory db.Open(); - ServerContext context(db, storage, true /* running unit tests */); + ServerContext context(db, storage, true /* running unit tests */, 10); context.SetupJobsEngine(true, false); ServerIndex& index = context.GetIndex(); @@ -865,7 +865,7 @@ MemoryStorageArea storage; DatabaseWrapper db; // The SQLite DB is in memory db.Open(); - ServerContext context(db, storage, true /* running unit tests */); + ServerContext context(db, storage, true /* running unit tests */, 10); context.SetupJobsEngine(true, false); context.SetCompressionEnabled(true);