# HG changeset patch # User Alain Mazy # Date 1673974478 -3600 # Node ID e71b22a43c0bfde04c23d81020c7ae627f10d5dc # Parent 2523858921979fc310e1ca6a29b0c9e20a693863 Threaded modifications continued: call ReconstructInstance at the end of the modification to update the DB model diff -r 252385892197 -r e71b22a43c0b OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp --- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp Wed Jan 11 11:14:00 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.cpp Tue Jan 17 17:54:38 2023 +0100 @@ -161,8 +161,32 @@ return false; } }; + + // Reset is called when resubmitting a failed job + void ResourceModificationJob::Reset() + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + // TODO: cleanup the instances that have been generated during the previous run + modifiedInstances_.clear(); + + ThreadedSetOfInstancesJob::Reset(); + } + + void ResourceModificationJob::PostProcessInstances() + { + boost::recursive_mutex::scoped_lock lock(mutex_); + + // reconstruct the parents MainDicomTags in case one of them has changed + if (modifiedInstances_.size() > 0) + { + ServerContext::DicomCacheLocker locker(GetContext(), *(modifiedInstances_.begin())); + ParsedDicomFile& modifiedDicom = locker.GetDicom(); + + GetContext().GetIndex().ReconstructInstance(modifiedDicom); + } - + } bool ResourceModificationJob::HandleInstance(const std::string& instance) { @@ -296,6 +320,7 @@ boost::recursive_mutex::scoped_lock lock(outputMutex_); output_->Update(modifiedHasher); + modifiedInstances_.insert(modifiedInstance); } return true; @@ -303,7 +328,7 @@ ResourceModificationJob::ResourceModificationJob(ServerContext& context, unsigned int workersCount) : - ThreadedSetOfInstancesJob(context, false /* no post processing step */, true /* by default, keep source */, workersCount), + ThreadedSetOfInstancesJob(context, true /* post processing step */, true /* by default, keep source */, workersCount), isAnonymization_(false), transcode_(false), transferSyntax_(DicomTransferSyntax_LittleEndianExplicit) // dummy initialization diff -r 252385892197 -r e71b22a43c0b OrthancServer/Sources/ServerJobs/ResourceModificationJob.h --- a/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h Wed Jan 11 11:14:00 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ResourceModificationJob.h Tue Jan 17 17:54:38 2023 +0100 @@ -60,10 +60,14 @@ DicomInstanceOrigin origin_; bool transcode_; DicomTransferSyntax transferSyntax_; + std::set modifiedInstances_; // the list of new instance ids of the newly generated instances + protected: virtual bool HandleInstance(const std::string& instance) ORTHANC_OVERRIDE; // from ThreadedSetOfInstancesJob + virtual void PostProcessInstances(); + public: explicit ResourceModificationJob(ServerContext& context, unsigned int workersCount); @@ -121,5 +125,7 @@ virtual void GetPublicContent(Json::Value& value) ORTHANC_OVERRIDE; virtual bool Serialize(Json::Value& value) ORTHANC_OVERRIDE; + + virtual void Reset() ORTHANC_OVERRIDE; }; } diff -r 252385892197 -r e71b22a43c0b OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp --- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp Wed Jan 11 11:14:00 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.cpp Tue Jan 17 17:54:38 2023 +0100 @@ -39,7 +39,6 @@ bool hasPostProcessing, bool keepSource, size_t workersCount) : - processedInstancesCount_(0), hasPostProcessing_(hasPostProcessing), started_(false), stopRequested_(false), @@ -80,7 +79,7 @@ // send a dummy "exit" message to all workers such that they stop waiting for messages on the queue for (size_t i = 0; i < instancesWorkers_.size(); i++) { - instancesToProcess_.Enqueue(new SingleValueObject(EXIT_WORKER_MESSAGE)); + instancesToProcessQueue_.Enqueue(new SingleValueObject(EXIT_WORKER_MESSAGE)); } for (size_t i = 0; i < instancesWorkers_.size(); i++) @@ -143,9 +142,9 @@ if (currentStep_ == ThreadedJobStep_ProcessingInstances) { // create the workers and enqueue all instances - for (std::set::const_iterator it = instances_.begin(); it != instances_.end(); ++it) + for (std::set::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) { - instancesToProcess_.Enqueue(new SingleValueObject(*it)); + instancesToProcessQueue_.Enqueue(new SingleValueObject(*it)); } InitWorkers(workersCount_); @@ -153,7 +152,7 @@ WaitWorkersComplete(); // check job has really completed !!! it might have been interrupted because of an error - if ((processedInstancesCount_ != instances_.size()) + if ((processedInstances_.size() != instancesToProcess_.size()) || (!IsPermissive() && failedInstances_.size() > 0)) { return JobStepResult::Failure(GetErrorCode(), NULL); @@ -177,7 +176,7 @@ // clean after the post processing step if (HasCleanupStep()) { - for (std::set::const_iterator it = instances_.begin(); it != instances_.end(); ++it) + for (std::set::const_iterator it = instancesToProcess_.begin(); it != instancesToProcess_.end(); ++it) { Json::Value tmp; context_.DeleteResource(tmp, *it, ResourceType_Instance); @@ -211,6 +210,14 @@ return hasPostProcessing_; } + void ThreadedSetOfInstancesJob::PostProcessInstances() + { + if (HasPostProcessingStep()) + { + throw OrthancException(ErrorCode_InternalError, "Job with post-processing should override PostProcessInstances"); + } + } + bool ThreadedSetOfInstancesJob::HasCleanupStep() const { @@ -224,26 +231,18 @@ { while (true) { - std::unique_ptr > instanceId(dynamic_cast*>(that->instancesToProcess_.Dequeue(0))); + std::unique_ptr > instanceId(dynamic_cast*>(that->instancesToProcessQueue_.Dequeue(0))); if (that->stopRequested_ // no lock(mutex) to access this variable, this is safe since it's just reading a boolean || instanceId->GetValue() == EXIT_WORKER_MESSAGE) { return; } + bool processed = false; + try { - bool processed = that->HandleInstance(instanceId->GetValue()); - - { - boost::recursive_mutex::scoped_lock lock(that->mutex_); - - that->processedInstancesCount_++; - if (!processed) - { - that->failedInstances_.insert(instanceId->GetValue()); - } - } + processed = that->HandleInstance(instanceId->GetValue()); } catch (const Orthanc::OrthancException& e) { @@ -256,6 +255,7 @@ LOG(ERROR) << "Error in a non-permissive job: " << e.What(); that->SetErrorCode(e.GetErrorCode()); that->StopWorkers(); + return; } } catch (...) @@ -263,8 +263,20 @@ LOG(ERROR) << "Native exception while executing a job"; that->SetErrorCode(ErrorCode_InternalError); that->StopWorkers(); + return; } - + + { + boost::recursive_mutex::scoped_lock lock(that->mutex_); + + that->processedInstances_.insert(instanceId->GetValue()); + + if (!processed) + { + that->failedInstances_.insert(instanceId->GetValue()); + } + } + } } @@ -282,7 +294,7 @@ { boost::recursive_mutex::scoped_lock lock(mutex_); - return instances_.size(); + return instancesToProcess_.size(); } @@ -298,7 +310,7 @@ { boost::recursive_mutex::scoped_lock lock(mutex_); - target = instances_; + target = instancesToProcess_; } @@ -318,17 +330,19 @@ } + // Reset is called when resubmitting a failed job void ThreadedSetOfInstancesJob::Reset() { boost::recursive_mutex::scoped_lock lock(mutex_); if (started_) { + // TODO: cleanup the instances that have been generated during the previous run currentStep_ = ThreadedJobStep_ProcessingInstances; stopRequested_ = false; - processedInstancesCount_ = 0; + processedInstances_.clear(); failedInstances_.clear(); - instancesToProcess_.Clear(); + instancesToProcessQueue_.Clear(); } else { @@ -381,7 +395,7 @@ target[KEY_KEEP_SOURCE] = keepSource_; target[KEY_WORKERS_COUNT] = static_cast(workersCount_); - SerializationToolbox::WriteSetOfStrings(target, instances_, KEY_INSTANCES); + SerializationToolbox::WriteSetOfStrings(target, instancesToProcess_, KEY_INSTANCES); SerializationToolbox::WriteSetOfStrings(target, failedInstances_, KEY_FAILED_INSTANCES); SerializationToolbox::WriteSetOfStrings(target, parentResources_, KEY_PARENT_RESOURCES); @@ -393,7 +407,6 @@ const Json::Value& source, bool hasPostProcessing, bool defaultKeepSource) : - processedInstancesCount_(0), hasPostProcessing_(hasPostProcessing), started_(false), stopRequested_(false), @@ -424,7 +437,7 @@ if (source.isMember(KEY_INSTANCES)) { - SerializationToolbox::ReadSetOfStrings(instances_, source, KEY_INSTANCES); + SerializationToolbox::ReadSetOfStrings(instancesToProcess_, source, KEY_INSTANCES); } if (source.isMember(KEY_CURRENT_STEP)) @@ -458,7 +471,7 @@ else { size_t totalProgress = GetInstancesCount(); - size_t currentProgress = processedInstancesCount_; + size_t currentProgress = processedInstances_.size(); if (HasPostProcessingStep()) { @@ -559,7 +572,7 @@ for (std::list::const_iterator it = instances.begin(); it != instances.end(); ++it) { - instances_.insert(*it); + instancesToProcess_.insert(*it); } } diff -r 252385892197 -r e71b22a43c0b OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h --- a/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h Wed Jan 11 11:14:00 2023 +0100 +++ b/OrthancServer/Sources/ServerJobs/ThreadedSetOfInstancesJob.h Tue Jan 17 17:54:38 2023 +0100 @@ -48,13 +48,14 @@ }; private: - std::set failedInstances_; - std::set parentResources_; + std::set instancesToProcess_; // the list of source instances ids to process + std::set failedInstances_; // the list of source instances ids that failed processing + std::set processedInstances_; // the list of source instances ids that have been processed (including failed ones) + + std::set parentResources_; - size_t processedInstancesCount_; - SharedMessageQueue instancesToProcess_; + SharedMessageQueue instancesToProcessQueue_; std::vector > instancesWorkers_; - std::set instances_; bool hasPostProcessing_; // final step before "KeepSource" cleanup bool started_; @@ -86,7 +87,7 @@ protected: virtual bool HandleInstance(const std::string& instance) = 0; - virtual void PostProcessInstances() {} + virtual void PostProcessInstances(); void InitWorkers(size_t workersCount);