Mercurial > hg > orthanc
diff OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp @ 5136:e71b22a43c0b
Threaded modifications continued: call ReconstructInstance at the end of the modification to update the DB model
author | Alain Mazy <am@osimis.io> |
---|---|
date | Tue, 17 Jan 2023 17:54:38 +0100 |
parents | 252385892197 |
children | 15109c3f0f7d |
line wrap: on
line diff
--- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp Wed Jan 11 11:14:00 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp Tue Jan 17 17:54:38 2023 +0100 @@ -39,7 +39,6 @@ bool hasPostProcessing, bool keepSource, size_t workersCount) : - processedInstancesCount_(0), hasPostProcessing_(hasPostProcessing), started_(false), stopRequested_(false), @@ -80,7 +79,7 @@ // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue for (size_t i = 0; i < instancesWorkers_.size(); i++) { - instancesToProcess_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE)); + instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(EXIT_WORKER_MESSAGE)); } for (size_t i = 0; i < instancesWorkers_.size(); i++) @@ -143,9 +142,9 @@ if (currentStep_ == ThreadedJobStep_ProcessingInstances) { // create the workers and enqueue all instances - for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) + for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) { - instancesToProcess_.Enqueue(new SingleValueObject<std::string>(*it)); + instancesToProcessQueue_.Enqueue(new SingleValueObject<std::string>(*it)); } InitWorkers(workersCount_); @@ -153,7 +152,7 @@ WaitWorkersComplete(); // check job has really completed !!! it might have been interrupted because of an error - if ((processedInstancesCount_ != instances_.size()) + if ((processedInstances_.size() != instancesToProcess_.size()) || (!IsPermissive() && failedInstances_.size() > 0)) { return JobStepResult::Failure(GetErrorCode(), NULL); @@ -177,7 +176,7 @@ // clean after the post processing step if (HasCleanupStep()) { - for (std::set<std::string>::const_iterator it = instances_.begin(); it != instances_.end(); ++it) + for (std::set<std::string>::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) { Json::Value tmp; context_.DeleteResource(tmp, *it, ResourceType_Instance); @@ -211,6 +210,14 @@ return hasPostProcessing_; } + void ThreadedSetOfInstancesJob::PostProcessInstances() + { + if (HasPostProcessingStep()) + { + throw OrthancException(ErrorCode_InternalError, "Job with post-processing should override PostProcessInstances"); + } + } + bool ThreadedSetOfInstancesJob::HasCleanupStep() const { @@ -224,26 +231,18 @@ { while (true) { - std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcess_.Dequeue(0))); + std::unique_ptr<SingleValueObject<std::string> > instanceId(dynamic_cast<SingleValueObject<std::string>*>(that->instancesToProcessQueue_.Dequeue(0))); if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean || instanceId->GetValue() == EXIT_WORKER_MESSAGE) { return; } + bool processed = false; + try { - bool processed = that->HandleInstance(instanceId->GetValue()); - - { - boost::recursive_mutex::scoped_lock lock(that->mutex_); - - that->processedInstancesCount_++; - if (!processed) - { - that->failedInstances_.insert(instanceId->GetValue()); - } - } + processed = that->HandleInstance(instanceId->GetValue()); } catch (const Orthanc::OrthancException& e) { @@ -256,6 +255,7 @@ LOG(ERROR) << "Error in a non-permissive job: " << e.What(); that->SetErrorCode(e.GetErrorCode()); that->StopWorkers(); + return; } } catch (...) @@ -263,8 +263,20 @@ LOG(ERROR) << "Native exception while executing a job"; that->SetErrorCode(ErrorCode_InternalError); that->StopWorkers(); + return; } - + + { + boost::recursive_mutex::scoped_lock lock(that->mutex_); + + that->processedInstances_.insert(instanceId->GetValue()); + + if (!processed) + { + that->failedInstances_.insert(instanceId->GetValue()); + } + } + } } @@ -282,7 +294,7 @@ { boost::recursive_mutex::scoped_lock lock(mutex_); - return instances_.size(); + return instancesToProcess_.size(); } @@ -298,7 +310,7 @@ { boost::recursive_mutex::scoped_lock lock(mutex_); - target = instances_; + target = instancesToProcess_; } @@ -318,17 +330,19 @@ } + // Reset is called when resubmitting a failed job void ThreadedSetOfInstancesJob::Reset() { boost::recursive_mutex::scoped_lock lock(mutex_); if (started_) { + // TODO: cleanup the instances that have been generated during the previous run currentStep_ = ThreadedJobStep_ProcessingInstances; stopRequested_ = false; - processedInstancesCount_ = 0; + processedInstances_.clear(); failedInstances_.clear(); - instancesToProcess_.Clear(); + instancesToProcessQueue_.Clear(); } else { @@ -381,7 +395,7 @@ target[KEY_KEEP_SOURCE] = keepSource_; target[KEY_WORKERS_COUNT] = static_cast<unsigned int>(workersCount_); - SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES); + SerializationToolbox::WriteSetOfStrings(target, instancesToProcess_, KEY_INSTANCES); SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES); SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); @@ -393,7 +407,6 @@ const Json::Value& source, bool hasPostProcessing, bool defaultKeepSource) : - processedInstancesCount_(0), hasPostProcessing_(hasPostProcessing), started_(false), stopRequested_(false), @@ -424,7 +437,7 @@ if (source.isMember(KEY_INSTANCES)) { - SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); + SerializationToolbox::ReadSetOfStrings(instancesToProcess_, source, KEY_INSTANCES); } if (source.isMember(KEY_CURRENT_STEP)) @@ -458,7 +471,7 @@ else { size_t totalProgress = GetInstancesCount(); - size_t currentProgress = processedInstancesCount_; + size_t currentProgress = processedInstances_.size(); if (HasPostProcessingStep()) { @@ -559,7 +572,7 @@ for (std::list<std::string>::const_iterator it = instances.begin(); it != instances.end(); ++it) { - instances_.insert(*it); + instancesToProcess_.insert(*it); } }