# HG changeset patch # User Sebastien Jodogne # Date 1561126362 -7200 # Node ID bff4b45cbf1f9d46ce0deeddd0f8a33c020291d2 # Parent 6bafe1399d97615160f9ff3a35a5d88250227eec simplification of SingleFunctionJob diff -r 6bafe1399d97 -r bff4b45cbf1f Plugin/DicomWebClient.cpp --- a/Plugin/DicomWebClient.cpp Fri Jun 21 13:19:50 2019 +0200 +++ b/Plugin/DicomWebClient.cpp Fri Jun 21 16:12:42 2019 +0200 @@ -85,10 +85,7 @@ { } - // Must return "true" if the job has completed with success, or - // "false" if the job has been canceled. Pausing the job - // corresponds to canceling it. - virtual bool Execute(JobContext& context) = 0; + virtual void Execute(JobContext& context) = 0; }; @@ -99,13 +96,12 @@ { } - // WARNING: "CancelFunction()" will be invoked while "Execute()" - // is running. Mutex is probably necessary. + // Called when the job is paused or canceled. WARNING: + // "CancelFunction()" will be invoked while "Execute()" is + // running. Mutex is probably necessary. virtual void CancelFunction() = 0; - // Only called when no function is running, to deal with - // "Resubmit()" after job cancelation/failure. - virtual void ResetFunction() = 0; + virtual void PauseFunction() = 0; virtual IFunction* CreateFunction() = 0; }; @@ -116,7 +112,7 @@ { boost::mutex::scoped_lock lock(mutex_); - if (state_ != State_Setup) + if (factory_ != NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } @@ -128,19 +124,19 @@ private: - enum State + enum FunctionResult { - State_Setup, - State_Running, - State_Success, - State_Failure + FunctionResult_Running, + FunctionResult_Done, + FunctionResult_Failure }; boost::mutex mutex_; - State state_; // Can only be modified by the "Worker()" function + FunctionResult functionResult_; // Can only be modified by the "Worker()" function std::auto_ptr worker_; Json::Value content_; IFunctionFactory* factory_; + bool stopping_; void JoinWorker() { @@ -161,9 +157,9 @@ { assert(factory_ != NULL); - if (worker_.get() == NULL && - factory_ != NULL) + if (worker_.get() == NULL) { + stopping_ = false; worker_.reset(new boost::thread(Worker, this, factory_)); } } @@ -179,27 +175,20 @@ static void Worker(SingleFunctionJob* job, IFunctionFactory* factory) { + printf("=================================> STARTING\n"); + assert(job != NULL && factory != NULL); JobContext context(*job); - { - boost::mutex::scoped_lock lock(job->mutex_); - job->state_ = State_Running; - } - try { std::auto_ptr function(factory->CreateFunction()); - bool success = function->Execute(context); + function->Execute(context); { boost::mutex::scoped_lock lock(job->mutex_); - job->state_ = (success ? State_Success : State_Failure); - if (success) - { - job->UpdateProgress(1); - } + job->functionResult_ = FunctionResult_Done; } } catch (Orthanc::OrthancException& e) @@ -208,14 +197,20 @@ { boost::mutex::scoped_lock lock(job->mutex_); - job->state_ = State_Failure; - job->content_["FunctionErrorCode"] = e.GetErrorCode(); - job->content_["FunctionErrorDescription"] = e.What(); - if (e.HasDetails()) + + job->functionResult_ = FunctionResult_Failure; + + if (!job->stopping_) { - job->content_["FunctionErrorDetails"] = e.GetDetails(); + // Don't report exceptions that are a consequence of stopping the function + job->content_["FunctionErrorCode"] = e.GetErrorCode(); + job->content_["FunctionErrorDescription"] = e.What(); + if (e.HasDetails()) + { + job->content_["FunctionErrorDetails"] = e.GetDetails(); + } + job->UpdateContent(job->content_); } - job->UpdateContent(job->content_); } } } @@ -223,7 +218,7 @@ public: SingleFunctionJob(const std::string& jobName) : OrthancJob(jobName), - state_(State_Setup), + functionResult_(FunctionResult_Running), content_(Json::objectValue), factory_(NULL) { @@ -235,14 +230,7 @@ { LOG(ERROR) << "Classes deriving from SingleFunctionJob must " << "explicitly call Finalize() in their destructor"; - - try - { - JoinWorker(); - } - catch (Orthanc::OrthancException&) - { - } + Finalize(); } } @@ -250,11 +238,7 @@ { try { - if (factory_ != NULL) - { - factory_->CancelFunction(); - JoinWorker(); - } + Stop(OrthancPluginJobStopReason_Canceled); } catch (Orthanc::OrthancException&) { @@ -263,76 +247,77 @@ virtual OrthancPluginJobStepStatus Step() { + printf("=================================> STEP\n"); + if (factory_ == NULL) { throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); } - State state; + FunctionResult result; { boost::mutex::scoped_lock lock(mutex_); - state = state_; + result = functionResult_; } - switch (state) + switch (result) { - case State_Setup: + case FunctionResult_Running: StartWorker(); - break; + boost::this_thread::sleep(boost::posix_time::milliseconds(500)); + return OrthancPluginJobStepStatus_Continue; - case State_Running: - break; - - case State_Success: + case FunctionResult_Done: JoinWorker(); return OrthancPluginJobStepStatus_Success; - case State_Failure: + case FunctionResult_Failure: JoinWorker(); return OrthancPluginJobStepStatus_Failure; default: throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); } - - boost::this_thread::sleep(boost::posix_time::milliseconds(500)); - return OrthancPluginJobStepStatus_Continue; } virtual void Stop(OrthancPluginJobStopReason reason) { + printf("=================================> STOP %d\n", (int) reason); + if (factory_ == NULL) { return; } - - if (reason == OrthancPluginJobStopReason_Paused || - reason == OrthancPluginJobStopReason_Canceled) + else if (reason == OrthancPluginJobStopReason_Paused || + reason == OrthancPluginJobStopReason_Canceled) { - factory_->CancelFunction(); - } + stopping_ = true; - JoinWorker(); + if (reason == OrthancPluginJobStopReason_Paused) + { + factory_->PauseFunction(); + } + else + { + factory_->CancelFunction(); + } - if (reason == OrthancPluginJobStopReason_Paused) - { - // This type of job cannot be paused: Reset under the hood - Reset(); + JoinWorker(); + + // Be ready for the next possible call to "Step()" that will resume the function + functionResult_ = FunctionResult_Running; } } virtual void Reset() { + printf("=================================> RESET\n"); + boost::mutex::scoped_lock lock(mutex_); - if (factory_ != NULL) - { - factory_->ResetFunction(); - } - - state_ = State_Setup; - + assert(worker_.get() == NULL); + functionResult_ = FunctionResult_Running; content_ = Json::objectValue; ClearContent(); } @@ -1014,7 +999,8 @@ State_Body, State_Canceled }; - + + bool debug_; boost::mutex mutex_; State state_; std::list instances_; @@ -1025,6 +1011,8 @@ const void* part, size_t size) { + printf(" part %d\n", size); + std::string contentType; if (!Orthanc::MultipartStreamReader::GetMainContentType(contentType, headers)) { @@ -1061,10 +1049,16 @@ { throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError); } + + if (debug_) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } } public: WadoRetrieveAnswer() : + debug_(false), state_(State_Headers), networkSize_(0) { @@ -1074,6 +1068,11 @@ { } + void SetDebug(bool debug) + { + debug_ = debug; + } + void Close() { boost::mutex::scoped_lock lock(mutex_); @@ -1126,6 +1125,11 @@ reader_.reset(new Orthanc::MultipartStreamReader(boundary)); reader_->SetHandler(*this); + + if (debug_) + { + reader_->SetBlockSize(1024 * 64); + } } } @@ -1211,14 +1215,6 @@ }; - enum Status - { - Status_Done, - Status_Canceled, - Status_Continue - }; - - class F : public IFunction { private: @@ -1226,28 +1222,24 @@ public: F(WadoRetrieveJob& that) : - that_(that) + that_(that) { } - virtual bool Execute(JobContext& context) + virtual void Execute(JobContext& context) { for (;;) { OrthancPlugins::HttpClient client; - switch (that_.SetupNextResource(client)) + if (that_.SetupNextResource(client, context)) { - case Status_Continue: - client.Execute(*that_.answer_); - that_.CloseResource(context); - break; - - case Status_Canceled: - return false; - - case Status_Done: - return true; + client.Execute(*that_.answer_); + that_.CloseResource(context); + } + else + { + return; // We're done } } } @@ -1258,27 +1250,28 @@ std::string serverName_; size_t position_; std::vector resources_; - bool canceled_; + bool stopped_; std::list retrievedInstances_; std::auto_ptr answer_; uint64_t networkSize_; - Status SetupNextResource(OrthancPlugins::HttpClient& client) + bool SetupNextResource(OrthancPlugins::HttpClient& client, + JobContext& context) { boost::mutex::scoped_lock lock(mutex_); - if (canceled_) + if (stopped_ || + position_ == resources_.size()) { - return Status_Canceled; - } - else if (position_ == resources_.size()) - { - return Status_Done; + return false; } else { + context.SetProgress(position_, resources_.size()); + answer_.reset(new WadoRetrieveAnswer); + answer_->SetDebug(true); // TODO const Resource* resource = resources_[position_++]; if (resource == NULL) @@ -1290,7 +1283,7 @@ (client, serverName_, resource->GetUri()); client.AddHeaders(resource->GetAdditionalHeaders()); - return Status_Continue; + return true; } } @@ -1318,23 +1311,29 @@ virtual void CancelFunction() { boost::mutex::scoped_lock lock(mutex_); - canceled_ = true; + + stopped_ = true; if (answer_.get() != NULL) - { + { answer_->Cancel(); } } - virtual void ResetFunction() + virtual void PauseFunction() { - boost::mutex::scoped_lock lock(mutex_); - canceled_ = false; - position_ = 0; - retrievedInstances_.clear(); + // This type of job cannot be paused + CancelFunction(); } virtual IFunction* CreateFunction() { + // This type of job cannot be paused: If restarting, always go + // back to the beginning + + stopped_ = false; + position_ = 0; + retrievedInstances_.clear(); + return new F(*this); } @@ -1343,7 +1342,7 @@ SingleFunctionJob("DicomWebWadoRetrieveClient"), serverName_(serverName), position_(0), - canceled_(false), + stopped_(false), networkSize_(0) { SetFactory(*this);