comparison OrthancFramework/Sources/JobsEngine/JobsRegistry.cpp @ 4044:d25f4c0fa160 framework

splitting code into OrthancFramework and OrthancServer
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 10 Jun 2020 20:30:34 +0200
parents Core/JobsEngine/JobsRegistry.cpp@1d2b31fc782f
children bf7b9edf6b81
comparison
equal deleted inserted replaced
4043:6c6239aec462 4044:d25f4c0fa160
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2020 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License as
9 * published by the Free Software Foundation, either version 3 of the
10 * License, or (at your option) any later version.
11 *
12 * In addition, as a special exception, the copyright holders of this
13 * program give permission to link the code of its release with the
14 * OpenSSL project's "OpenSSL" library (or with modified versions of it
15 * that use the same license as the "OpenSSL" library), and distribute
16 * the linked executables. You must obey the GNU General Public License
17 * in all respects for all of the code used other than "OpenSSL". If you
18 * modify file(s) with this exception, you may extend this exception to
19 * your version of the file(s), but you are not obligated to do so. If
20 * you do not wish to do so, delete this exception statement from your
21 * version. If you delete this exception statement from all source files
22 * in the program, then also delete it here.
23 *
24 * This program is distributed in the hope that it will be useful, but
25 * WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
27 * General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program. If not, see <http://www.gnu.org/licenses/>.
31 **/
32
33
#include "../PrecompiledHeaders.h"
#include "JobsRegistry.h"

#include "../Logging.h"
#include "../OrthancException.h"
#include "../Toolbox.h"
#include "../SerializationToolbox.h"

#include <exception>
41
42 namespace Orthanc
43 {
// Names used in the JSON produced by "JobHandler::Serialize()" and
// "JobsRegistry::Serialize()", and read back by the unserializing
// constructors ("JOBS_REGISTRY" is the value stored under "TYPE").
// The pointers are themselves "const", so these file-local constants
// cannot be reassigned by mistake.
static const char* const STATE = "State";
static const char* const TYPE = "Type";
static const char* const PRIORITY = "Priority";
static const char* const JOB = "Job";
static const char* const JOBS = "Jobs";
static const char* const JOBS_REGISTRY = "JobsRegistry";
static const char* const CREATION_TIME = "CreationTime";
static const char* const LAST_CHANGE_TIME = "LastChangeTime";
static const char* const RUNTIME = "Runtime";
53
54
55 class JobsRegistry::JobHandler : public boost::noncopyable
56 {
57 private:
58 std::string id_;
59 JobState state_;
60 std::string jobType_;
61 std::unique_ptr<IJob> job_;
62 int priority_; // "+inf()" means highest priority
63 boost::posix_time::ptime creationTime_;
64 boost::posix_time::ptime lastStateChangeTime_;
65 boost::posix_time::time_duration runtime_;
66 boost::posix_time::ptime retryTime_;
67 bool pauseScheduled_;
68 bool cancelScheduled_;
69 JobStatus lastStatus_;
70
71 void Touch()
72 {
73 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
74
75 if (state_ == JobState_Running)
76 {
77 runtime_ += (now - lastStateChangeTime_);
78 }
79
80 lastStateChangeTime_ = now;
81 }
82
83 void SetStateInternal(JobState state)
84 {
85 state_ = state;
86 pauseScheduled_ = false;
87 cancelScheduled_ = false;
88 Touch();
89 }
90
91 public:
92 JobHandler(IJob* job,
93 int priority) :
94 id_(Toolbox::GenerateUuid()),
95 state_(JobState_Pending),
96 job_(job),
97 priority_(priority),
98 creationTime_(boost::posix_time::microsec_clock::universal_time()),
99 lastStateChangeTime_(creationTime_),
100 runtime_(boost::posix_time::milliseconds(0)),
101 retryTime_(creationTime_),
102 pauseScheduled_(false),
103 cancelScheduled_(false)
104 {
105 if (job == NULL)
106 {
107 throw OrthancException(ErrorCode_NullPointer);
108 }
109
110 job->GetJobType(jobType_);
111 job->Start();
112
113 lastStatus_ = JobStatus(ErrorCode_Success, "", *job_);
114 }
115
116 const std::string& GetId() const
117 {
118 return id_;
119 }
120
121 IJob& GetJob() const
122 {
123 assert(job_.get() != NULL);
124 return *job_;
125 }
126
127 void SetPriority(int priority)
128 {
129 priority_ = priority;
130 }
131
132 int GetPriority() const
133 {
134 return priority_;
135 }
136
137 JobState GetState() const
138 {
139 return state_;
140 }
141
142 void SetState(JobState state)
143 {
144 if (state == JobState_Retry)
145 {
146 // Use "SetRetryState()"
147 throw OrthancException(ErrorCode_BadSequenceOfCalls);
148 }
149 else
150 {
151 SetStateInternal(state);
152 }
153 }
154
155 void SetRetryState(unsigned int timeout)
156 {
157 if (state_ == JobState_Running)
158 {
159 SetStateInternal(JobState_Retry);
160 retryTime_ = (boost::posix_time::microsec_clock::universal_time() +
161 boost::posix_time::milliseconds(timeout));
162 }
163 else
164 {
165 // Only valid for running jobs
166 throw OrthancException(ErrorCode_BadSequenceOfCalls);
167 }
168 }
169
170 void SchedulePause()
171 {
172 if (state_ == JobState_Running)
173 {
174 pauseScheduled_ = true;
175 }
176 else
177 {
178 // Only valid for running jobs
179 throw OrthancException(ErrorCode_BadSequenceOfCalls);
180 }
181 }
182
183 void ScheduleCancel()
184 {
185 if (state_ == JobState_Running)
186 {
187 cancelScheduled_ = true;
188 }
189 else
190 {
191 // Only valid for running jobs
192 throw OrthancException(ErrorCode_BadSequenceOfCalls);
193 }
194 }
195
196 bool IsPauseScheduled()
197 {
198 return pauseScheduled_;
199 }
200
201 bool IsCancelScheduled()
202 {
203 return cancelScheduled_;
204 }
205
206 bool IsRetryReady(const boost::posix_time::ptime& now) const
207 {
208 if (state_ != JobState_Retry)
209 {
210 throw OrthancException(ErrorCode_BadSequenceOfCalls);
211 }
212 else
213 {
214 return retryTime_ <= now;
215 }
216 }
217
218 const boost::posix_time::ptime& GetCreationTime() const
219 {
220 return creationTime_;
221 }
222
223 const boost::posix_time::ptime& GetLastStateChangeTime() const
224 {
225 return lastStateChangeTime_;
226 }
227
228 void SetLastStateChangeTime(const boost::posix_time::ptime& time)
229 {
230 lastStateChangeTime_ = time;
231 }
232
233 const boost::posix_time::time_duration& GetRuntime() const
234 {
235 return runtime_;
236 }
237
238 const JobStatus& GetLastStatus() const
239 {
240 return lastStatus_;
241 }
242
243 void SetLastStatus(const JobStatus& status)
244 {
245 lastStatus_ = status;
246 Touch();
247 }
248
249 void SetLastErrorCode(ErrorCode code)
250 {
251 lastStatus_.SetErrorCode(code);
252 }
253
254 bool Serialize(Json::Value& target) const
255 {
256 target = Json::objectValue;
257
258 bool ok;
259
260 if (state_ == JobState_Running)
261 {
262 // WARNING: Cannot directly access the "job_" member, as long
263 // as a "RunningJob" instance is running. We do not use a
264 // mutex at the "JobHandler" level, as serialization would be
265 // blocked while a step in the job is running. Instead, we
266 // save a snapshot of the serialized job. (*)
267
268 if (lastStatus_.HasSerialized())
269 {
270 target[JOB] = lastStatus_.GetSerialized();
271 ok = true;
272 }
273 else
274 {
275 ok = false;
276 }
277 }
278 else
279 {
280 ok = job_->Serialize(target[JOB]);
281 }
282
283 if (ok)
284 {
285 target[STATE] = EnumerationToString(state_);
286 target[PRIORITY] = priority_;
287 target[CREATION_TIME] = boost::posix_time::to_iso_string(creationTime_);
288 target[LAST_CHANGE_TIME] = boost::posix_time::to_iso_string(lastStateChangeTime_);
289 target[RUNTIME] = static_cast<unsigned int>(runtime_.total_milliseconds());
290 return true;
291 }
292 else
293 {
294 VLOG(1) << "Job backup is not supported for job of type: " << jobType_;
295 return false;
296 }
297 }
298
299 JobHandler(IJobUnserializer& unserializer,
300 const Json::Value& serialized,
301 const std::string& id) :
302 id_(id),
303 pauseScheduled_(false),
304 cancelScheduled_(false)
305 {
306 state_ = StringToJobState(SerializationToolbox::ReadString(serialized, STATE));
307 priority_ = SerializationToolbox::ReadInteger(serialized, PRIORITY);
308 creationTime_ = boost::posix_time::from_iso_string
309 (SerializationToolbox::ReadString(serialized, CREATION_TIME));
310 lastStateChangeTime_ = boost::posix_time::from_iso_string
311 (SerializationToolbox::ReadString(serialized, LAST_CHANGE_TIME));
312 runtime_ = boost::posix_time::milliseconds
313 (SerializationToolbox::ReadInteger(serialized, RUNTIME));
314
315 retryTime_ = creationTime_;
316
317 job_.reset(unserializer.UnserializeJob(serialized[JOB]));
318 job_->GetJobType(jobType_);
319 job_->Start();
320
321 lastStatus_ = JobStatus(ErrorCode_Success, "", *job_);
322 }
323 };
324
325
326 bool JobsRegistry::PriorityComparator::operator() (JobHandler*& a,
327 JobHandler*& b) const
328 {
329 return a->GetPriority() < b->GetPriority();
330 }
331
332
333 #if defined(NDEBUG)
334 void JobsRegistry::CheckInvariants() const
335 {
336 }
337
338 #else
339 bool JobsRegistry::IsPendingJob(const JobHandler& job) const
340 {
341 PendingJobs copy = pendingJobs_;
342 while (!copy.empty())
343 {
344 if (copy.top() == &job)
345 {
346 return true;
347 }
348
349 copy.pop();
350 }
351
352 return false;
353 }
354
355 bool JobsRegistry::IsCompletedJob(JobHandler& job) const
356 {
357 for (CompletedJobs::const_iterator it = completedJobs_.begin();
358 it != completedJobs_.end(); ++it)
359 {
360 if (*it == &job)
361 {
362 return true;
363 }
364 }
365
366 return false;
367 }
368
369 bool JobsRegistry::IsRetryJob(JobHandler& job) const
370 {
371 return retryJobs_.find(&job) != retryJobs_.end();
372 }
373
374 void JobsRegistry::CheckInvariants() const
375 {
376 {
377 PendingJobs copy = pendingJobs_;
378 while (!copy.empty())
379 {
380 assert(copy.top()->GetState() == JobState_Pending);
381 copy.pop();
382 }
383 }
384
385 assert(completedJobs_.size() <= maxCompletedJobs_);
386
387 for (CompletedJobs::const_iterator it = completedJobs_.begin();
388 it != completedJobs_.end(); ++it)
389 {
390 assert((*it)->GetState() == JobState_Success ||
391 (*it)->GetState() == JobState_Failure);
392 }
393
394 for (RetryJobs::const_iterator it = retryJobs_.begin();
395 it != retryJobs_.end(); ++it)
396 {
397 assert((*it)->GetState() == JobState_Retry);
398 }
399
400 for (JobsIndex::const_iterator it = jobsIndex_.begin();
401 it != jobsIndex_.end(); ++it)
402 {
403 JobHandler& job = *it->second;
404
405 assert(job.GetId() == it->first);
406
407 switch (job.GetState())
408 {
409 case JobState_Pending:
410 assert(!IsRetryJob(job) && IsPendingJob(job) && !IsCompletedJob(job));
411 break;
412
413 case JobState_Success:
414 case JobState_Failure:
415 assert(!IsRetryJob(job) && !IsPendingJob(job) && IsCompletedJob(job));
416 break;
417
418 case JobState_Retry:
419 assert(IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
420 break;
421
422 case JobState_Running:
423 case JobState_Paused:
424 assert(!IsRetryJob(job) && !IsPendingJob(job) && !IsCompletedJob(job));
425 break;
426
427 default:
428 throw OrthancException(ErrorCode_InternalError);
429 }
430 }
431 }
432 #endif
433
434
435 void JobsRegistry::ForgetOldCompletedJobs()
436 {
437 while (completedJobs_.size() > maxCompletedJobs_)
438 {
439 assert(completedJobs_.front() != NULL);
440
441 std::string id = completedJobs_.front()->GetId();
442 assert(jobsIndex_.find(id) != jobsIndex_.end());
443
444 jobsIndex_.erase(id);
445 delete(completedJobs_.front());
446 completedJobs_.pop_front();
447 }
448
449 CheckInvariants();
450 }
451
452
453 void JobsRegistry::SetCompletedJob(JobHandler& job,
454 bool success)
455 {
456 job.SetState(success ? JobState_Success : JobState_Failure);
457
458 completedJobs_.push_back(&job);
459 someJobComplete_.notify_all();
460 }
461
462
463 void JobsRegistry::MarkRunningAsCompleted(JobHandler& job,
464 CompletedReason reason)
465 {
466 const char* tmp;
467
468 switch (reason)
469 {
470 case CompletedReason_Success:
471 tmp = "success";
472 break;
473
474 case CompletedReason_Failure:
475 tmp = "failure";
476 break;
477
478 case CompletedReason_Canceled:
479 tmp = "cancel";
480 break;
481
482 default:
483 throw OrthancException(ErrorCode_InternalError);
484 }
485
486 LOG(INFO) << "Job has completed with " << tmp << ": " << job.GetId();
487
488 CheckInvariants();
489
490 assert(job.GetState() == JobState_Running);
491 SetCompletedJob(job, reason == CompletedReason_Success);
492
493 if (reason == CompletedReason_Canceled)
494 {
495 job.SetLastErrorCode(ErrorCode_CanceledJob);
496 }
497
498 if (observer_ != NULL)
499 {
500 if (reason == CompletedReason_Success)
501 {
502 observer_->SignalJobSuccess(job.GetId());
503 }
504 else
505 {
506 observer_->SignalJobFailure(job.GetId());
507 }
508 }
509
510 // WARNING: The following call might make "job" invalid if the job
511 // history size is empty
512 ForgetOldCompletedJobs();
513 }
514
515
516 void JobsRegistry::MarkRunningAsRetry(JobHandler& job,
517 unsigned int timeout)
518 {
519 LOG(INFO) << "Job scheduled for retry in " << timeout << "ms: " << job.GetId();
520
521 CheckInvariants();
522
523 assert(job.GetState() == JobState_Running &&
524 retryJobs_.find(&job) == retryJobs_.end());
525
526 retryJobs_.insert(&job);
527 job.SetRetryState(timeout);
528
529 CheckInvariants();
530 }
531
532
533 void JobsRegistry::MarkRunningAsPaused(JobHandler& job)
534 {
535 LOG(INFO) << "Job paused: " << job.GetId();
536
537 CheckInvariants();
538 assert(job.GetState() == JobState_Running);
539
540 job.SetState(JobState_Paused);
541
542 CheckInvariants();
543 }
544
545
546 bool JobsRegistry::GetStateInternal(JobState& state,
547 const std::string& id)
548 {
549 CheckInvariants();
550
551 JobsIndex::const_iterator it = jobsIndex_.find(id);
552 if (it == jobsIndex_.end())
553 {
554 return false;
555 }
556 else
557 {
558 state = it->second->GetState();
559 return true;
560 }
561 }
562
563
564 JobsRegistry::~JobsRegistry()
565 {
566 for (JobsIndex::iterator it = jobsIndex_.begin(); it != jobsIndex_.end(); ++it)
567 {
568 assert(it->second != NULL);
569 delete it->second;
570 }
571 }
572
573
574 void JobsRegistry::SetMaxCompletedJobs(size_t n)
575 {
576 boost::mutex::scoped_lock lock(mutex_);
577 CheckInvariants();
578
579 LOG(INFO) << "The size of the history of the jobs engine is set to: " << n << " job(s)";
580
581 maxCompletedJobs_ = n;
582 ForgetOldCompletedJobs();
583 }
584
585
586 size_t JobsRegistry::GetMaxCompletedJobs()
587 {
588 boost::mutex::scoped_lock lock(mutex_);
589 CheckInvariants();
590 return maxCompletedJobs_;
591 }
592
593
594 void JobsRegistry::ListJobs(std::set<std::string>& target)
595 {
596 boost::mutex::scoped_lock lock(mutex_);
597 CheckInvariants();
598
599 for (JobsIndex::const_iterator it = jobsIndex_.begin();
600 it != jobsIndex_.end(); ++it)
601 {
602 target.insert(it->first);
603 }
604 }
605
606
607 bool JobsRegistry::GetJobInfo(JobInfo& target,
608 const std::string& id)
609 {
610 boost::mutex::scoped_lock lock(mutex_);
611 CheckInvariants();
612
613 JobsIndex::const_iterator found = jobsIndex_.find(id);
614
615 if (found == jobsIndex_.end())
616 {
617 return false;
618 }
619 else
620 {
621 const JobHandler& handler = *found->second;
622 target = JobInfo(handler.GetId(),
623 handler.GetPriority(),
624 handler.GetState(),
625 handler.GetLastStatus(),
626 handler.GetCreationTime(),
627 handler.GetLastStateChangeTime(),
628 handler.GetRuntime());
629 return true;
630 }
631 }
632
633
634 bool JobsRegistry::GetJobOutput(std::string& output,
635 MimeType& mime,
636 const std::string& job,
637 const std::string& key)
638 {
639 boost::mutex::scoped_lock lock(mutex_);
640 CheckInvariants();
641
642 JobsIndex::const_iterator found = jobsIndex_.find(job);
643
644 if (found == jobsIndex_.end())
645 {
646 return false;
647 }
648 else
649 {
650 const JobHandler& handler = *found->second;
651
652 if (handler.GetState() == JobState_Success)
653 {
654 return handler.GetJob().GetOutput(output, mime, key);
655 }
656 else
657 {
658 return false;
659 }
660 }
661 }
662
663
664 void JobsRegistry::SubmitInternal(std::string& id,
665 JobHandler* handler)
666 {
667 if (handler == NULL)
668 {
669 throw OrthancException(ErrorCode_NullPointer);
670 }
671
672 std::unique_ptr<JobHandler> protection(handler);
673
674 {
675 boost::mutex::scoped_lock lock(mutex_);
676 CheckInvariants();
677
678 id = handler->GetId();
679 int priority = handler->GetPriority();
680
681 jobsIndex_.insert(std::make_pair(id, protection.release()));
682
683 switch (handler->GetState())
684 {
685 case JobState_Pending:
686 case JobState_Retry:
687 case JobState_Running:
688 handler->SetState(JobState_Pending);
689 pendingJobs_.push(handler);
690 pendingJobAvailable_.notify_one();
691 break;
692
693 case JobState_Success:
694 SetCompletedJob(*handler, true);
695 break;
696
697 case JobState_Failure:
698 SetCompletedJob(*handler, false);
699 break;
700
701 case JobState_Paused:
702 break;
703
704 default:
705 {
706 std::string details = ("A job should not be loaded from state: " +
707 std::string(EnumerationToString(handler->GetState())));
708 throw OrthancException(ErrorCode_InternalError, details);
709 }
710 }
711
712 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
713
714 if (observer_ != NULL)
715 {
716 observer_->SignalJobSubmitted(id);
717 }
718
719 // WARNING: The following call might make "handler" invalid if
720 // the job history size is empty
721 ForgetOldCompletedJobs();
722 }
723 }
724
725
726 void JobsRegistry::Submit(std::string& id,
727 IJob* job, // Takes ownership
728 int priority)
729 {
730 SubmitInternal(id, new JobHandler(job, priority));
731 }
732
733
734 void JobsRegistry::Submit(IJob* job, // Takes ownership
735 int priority)
736 {
737 std::string id;
738 SubmitInternal(id, new JobHandler(job, priority));
739 }
740
741
742 void JobsRegistry::SubmitAndWait(Json::Value& successContent,
743 IJob* job, // Takes ownership
744 int priority)
745 {
746 std::string id;
747 Submit(id, job, priority);
748
749 JobState state = JobState_Pending; // Dummy initialization
750
751 {
752 boost::mutex::scoped_lock lock(mutex_);
753
754 for (;;)
755 {
756 if (!GetStateInternal(state, id))
757 {
758 // Job has finished and has been lost (typically happens if
759 // "JobsHistorySize" is 0)
760 throw OrthancException(ErrorCode_InexistentItem,
761 "Cannot retrieve the status of the job, "
762 "make sure that \"JobsHistorySize\" is not 0");
763 }
764 else if (state == JobState_Failure)
765 {
766 // Failure
767 JobsIndex::const_iterator it = jobsIndex_.find(id);
768 if (it != jobsIndex_.end()) // Should always be true, already tested in GetStateInternal()
769 {
770 ErrorCode code = it->second->GetLastStatus().GetErrorCode();
771 const std::string& details = it->second->GetLastStatus().GetDetails();
772
773 if (details.empty())
774 {
775 throw OrthancException(code);
776 }
777 else
778 {
779 throw OrthancException(code, details);
780 }
781 }
782 else
783 {
784 throw OrthancException(ErrorCode_InternalError);
785 }
786 }
787 else if (state == JobState_Success)
788 {
789 // Success, try and retrieve the status of the job
790 JobsIndex::const_iterator it = jobsIndex_.find(id);
791 if (it == jobsIndex_.end())
792 {
793 // Should not happen
794 state = JobState_Failure;
795 }
796 else
797 {
798 const JobStatus& status = it->second->GetLastStatus();
799 successContent = status.GetPublicContent();
800 }
801
802 return;
803 }
804 else
805 {
806 // This job has not finished yet, wait for new completion
807 someJobComplete_.wait(lock);
808 }
809 }
810 }
811 }
812
813
814 bool JobsRegistry::SetPriority(const std::string& id,
815 int priority)
816 {
817 LOG(INFO) << "Changing priority to " << priority << " for job: " << id;
818
819 boost::mutex::scoped_lock lock(mutex_);
820 CheckInvariants();
821
822 JobsIndex::iterator found = jobsIndex_.find(id);
823
824 if (found == jobsIndex_.end())
825 {
826 LOG(WARNING) << "Unknown job: " << id;
827 return false;
828 }
829 else
830 {
831 found->second->SetPriority(priority);
832
833 if (found->second->GetState() == JobState_Pending)
834 {
835 // If the job is pending, we need to reconstruct the
836 // priority queue, as the heap condition has changed
837
838 PendingJobs copy;
839 std::swap(copy, pendingJobs_);
840
841 assert(pendingJobs_.empty());
842 while (!copy.empty())
843 {
844 pendingJobs_.push(copy.top());
845 copy.pop();
846 }
847 }
848
849 CheckInvariants();
850 return true;
851 }
852 }
853
854
855 void JobsRegistry::RemovePendingJob(const std::string& id)
856 {
857 // If the job is pending, we need to reconstruct the priority
858 // queue to remove it
859 PendingJobs copy;
860 std::swap(copy, pendingJobs_);
861
862 assert(pendingJobs_.empty());
863 while (!copy.empty())
864 {
865 if (copy.top()->GetId() != id)
866 {
867 pendingJobs_.push(copy.top());
868 }
869
870 copy.pop();
871 }
872 }
873
874
875 void JobsRegistry::RemoveRetryJob(JobHandler* handler)
876 {
877 RetryJobs::iterator item = retryJobs_.find(handler);
878 assert(item != retryJobs_.end());
879 retryJobs_.erase(item);
880 }
881
882
883 bool JobsRegistry::Pause(const std::string& id)
884 {
885 LOG(INFO) << "Pausing job: " << id;
886
887 boost::mutex::scoped_lock lock(mutex_);
888 CheckInvariants();
889
890 JobsIndex::iterator found = jobsIndex_.find(id);
891
892 if (found == jobsIndex_.end())
893 {
894 LOG(WARNING) << "Unknown job: " << id;
895 return false;
896 }
897 else
898 {
899 switch (found->second->GetState())
900 {
901 case JobState_Pending:
902 RemovePendingJob(id);
903 found->second->SetState(JobState_Paused);
904 break;
905
906 case JobState_Retry:
907 RemoveRetryJob(found->second);
908 found->second->SetState(JobState_Paused);
909 break;
910
911 case JobState_Paused:
912 case JobState_Success:
913 case JobState_Failure:
914 // Nothing to be done
915 break;
916
917 case JobState_Running:
918 found->second->SchedulePause();
919 break;
920
921 default:
922 throw OrthancException(ErrorCode_InternalError);
923 }
924
925 CheckInvariants();
926 return true;
927 }
928 }
929
930
931 bool JobsRegistry::Cancel(const std::string& id)
932 {
933 LOG(INFO) << "Canceling job: " << id;
934
935 boost::mutex::scoped_lock lock(mutex_);
936 CheckInvariants();
937
938 JobsIndex::iterator found = jobsIndex_.find(id);
939
940 if (found == jobsIndex_.end())
941 {
942 LOG(WARNING) << "Unknown job: " << id;
943 return false;
944 }
945 else
946 {
947 switch (found->second->GetState())
948 {
949 case JobState_Pending:
950 RemovePendingJob(id);
951 SetCompletedJob(*found->second, false);
952 found->second->SetLastErrorCode(ErrorCode_CanceledJob);
953 break;
954
955 case JobState_Retry:
956 RemoveRetryJob(found->second);
957 SetCompletedJob(*found->second, false);
958 found->second->SetLastErrorCode(ErrorCode_CanceledJob);
959 break;
960
961 case JobState_Paused:
962 SetCompletedJob(*found->second, false);
963 found->second->SetLastErrorCode(ErrorCode_CanceledJob);
964 break;
965
966 case JobState_Success:
967 case JobState_Failure:
968 // Nothing to be done
969 break;
970
971 case JobState_Running:
972 found->second->ScheduleCancel();
973 break;
974
975 default:
976 throw OrthancException(ErrorCode_InternalError);
977 }
978
979 // WARNING: The following call might make "handler" invalid if
980 // the job history size is empty
981 ForgetOldCompletedJobs();
982
983 return true;
984 }
985 }
986
987
988 bool JobsRegistry::Resume(const std::string& id)
989 {
990 LOG(INFO) << "Resuming job: " << id;
991
992 boost::mutex::scoped_lock lock(mutex_);
993 CheckInvariants();
994
995 JobsIndex::iterator found = jobsIndex_.find(id);
996
997 if (found == jobsIndex_.end())
998 {
999 LOG(WARNING) << "Unknown job: " << id;
1000 return false;
1001 }
1002 else if (found->second->GetState() != JobState_Paused)
1003 {
1004 LOG(WARNING) << "Cannot resume a job that is not paused: " << id;
1005 return false;
1006 }
1007 else
1008 {
1009 found->second->SetState(JobState_Pending);
1010 pendingJobs_.push(found->second);
1011 pendingJobAvailable_.notify_one();
1012 CheckInvariants();
1013 return true;
1014 }
1015 }
1016
1017
1018 bool JobsRegistry::Resubmit(const std::string& id)
1019 {
1020 LOG(INFO) << "Resubmitting failed job: " << id;
1021
1022 boost::mutex::scoped_lock lock(mutex_);
1023 CheckInvariants();
1024
1025 JobsIndex::iterator found = jobsIndex_.find(id);
1026
1027 if (found == jobsIndex_.end())
1028 {
1029 LOG(WARNING) << "Unknown job: " << id;
1030 return false;
1031 }
1032 else if (found->second->GetState() != JobState_Failure)
1033 {
1034 LOG(WARNING) << "Cannot resubmit a job that has not failed: " << id;
1035 return false;
1036 }
1037 else
1038 {
1039 found->second->GetJob().Reset();
1040
1041 bool ok = false;
1042 for (CompletedJobs::iterator it = completedJobs_.begin();
1043 it != completedJobs_.end(); ++it)
1044 {
1045 if (*it == found->second)
1046 {
1047 ok = true;
1048 completedJobs_.erase(it);
1049 break;
1050 }
1051 }
1052
1053 assert(ok);
1054
1055 found->second->SetState(JobState_Pending);
1056 pendingJobs_.push(found->second);
1057 pendingJobAvailable_.notify_one();
1058
1059 CheckInvariants();
1060 return true;
1061 }
1062 }
1063
1064
1065 void JobsRegistry::ScheduleRetries()
1066 {
1067 boost::mutex::scoped_lock lock(mutex_);
1068 CheckInvariants();
1069
1070 RetryJobs copy;
1071 std::swap(copy, retryJobs_);
1072
1073 const boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
1074
1075 assert(retryJobs_.empty());
1076 for (RetryJobs::iterator it = copy.begin(); it != copy.end(); ++it)
1077 {
1078 if ((*it)->IsRetryReady(now))
1079 {
1080 LOG(INFO) << "Retrying job: " << (*it)->GetId();
1081 (*it)->SetState(JobState_Pending);
1082 pendingJobs_.push(*it);
1083 pendingJobAvailable_.notify_one();
1084 }
1085 else
1086 {
1087 retryJobs_.insert(*it);
1088 }
1089 }
1090
1091 CheckInvariants();
1092 }
1093
1094
1095 bool JobsRegistry::GetState(JobState& state,
1096 const std::string& id)
1097 {
1098 boost::mutex::scoped_lock lock(mutex_);
1099 return GetStateInternal(state, id);
1100 }
1101
1102
1103 void JobsRegistry::SetObserver(JobsRegistry::IObserver& observer)
1104 {
1105 boost::mutex::scoped_lock lock(mutex_);
1106 observer_ = &observer;
1107 }
1108
1109
1110 void JobsRegistry::ResetObserver()
1111 {
1112 boost::mutex::scoped_lock lock(mutex_);
1113 observer_ = NULL;
1114 }
1115
1116
1117 JobsRegistry::RunningJob::RunningJob(JobsRegistry& registry,
1118 unsigned int timeout) :
1119 registry_(registry),
1120 handler_(NULL),
1121 targetState_(JobState_Failure),
1122 targetRetryTimeout_(0),
1123 canceled_(false)
1124 {
1125 {
1126 boost::mutex::scoped_lock lock(registry_.mutex_);
1127
1128 while (registry_.pendingJobs_.empty())
1129 {
1130 if (timeout == 0)
1131 {
1132 registry_.pendingJobAvailable_.wait(lock);
1133 }
1134 else
1135 {
1136 bool success = registry_.pendingJobAvailable_.timed_wait
1137 (lock, boost::posix_time::milliseconds(timeout));
1138 if (!success)
1139 {
1140 // No pending job
1141 return;
1142 }
1143 }
1144 }
1145
1146 handler_ = registry_.pendingJobs_.top();
1147 registry_.pendingJobs_.pop();
1148
1149 assert(handler_->GetState() == JobState_Pending);
1150 handler_->SetState(JobState_Running);
1151 handler_->SetLastErrorCode(ErrorCode_Success);
1152
1153 job_ = &handler_->GetJob();
1154 id_ = handler_->GetId();
1155 priority_ = handler_->GetPriority();
1156 }
1157 }
1158
1159
1160 JobsRegistry::RunningJob::~RunningJob()
1161 {
1162 if (IsValid())
1163 {
1164 boost::mutex::scoped_lock lock(registry_.mutex_);
1165
1166 switch (targetState_)
1167 {
1168 case JobState_Failure:
1169 registry_.MarkRunningAsCompleted
1170 (*handler_, canceled_ ? CompletedReason_Canceled : CompletedReason_Failure);
1171 break;
1172
1173 case JobState_Success:
1174 registry_.MarkRunningAsCompleted(*handler_, CompletedReason_Success);
1175 break;
1176
1177 case JobState_Paused:
1178 registry_.MarkRunningAsPaused(*handler_);
1179 break;
1180
1181 case JobState_Retry:
1182 registry_.MarkRunningAsRetry(*handler_, targetRetryTimeout_);
1183 break;
1184
1185 default:
1186 assert(0);
1187 }
1188 }
1189 }
1190
1191
1192 bool JobsRegistry::RunningJob::IsValid() const
1193 {
1194 return (handler_ != NULL &&
1195 job_ != NULL);
1196 }
1197
1198
1199 const std::string& JobsRegistry::RunningJob::GetId() const
1200 {
1201 if (!IsValid())
1202 {
1203 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1204 }
1205 else
1206 {
1207 return id_;
1208 }
1209 }
1210
1211
1212 int JobsRegistry::RunningJob::GetPriority() const
1213 {
1214 if (!IsValid())
1215 {
1216 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1217 }
1218 else
1219 {
1220 return priority_;
1221 }
1222 }
1223
1224
1225 IJob& JobsRegistry::RunningJob::GetJob()
1226 {
1227 if (!IsValid())
1228 {
1229 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1230 }
1231 else
1232 {
1233 return *job_;
1234 }
1235 }
1236
1237
1238 bool JobsRegistry::RunningJob::IsPauseScheduled()
1239 {
1240 if (!IsValid())
1241 {
1242 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1243 }
1244 else
1245 {
1246 boost::mutex::scoped_lock lock(registry_.mutex_);
1247 registry_.CheckInvariants();
1248 assert(handler_->GetState() == JobState_Running);
1249
1250 return handler_->IsPauseScheduled();
1251 }
1252 }
1253
1254
1255 bool JobsRegistry::RunningJob::IsCancelScheduled()
1256 {
1257 if (!IsValid())
1258 {
1259 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1260 }
1261 else
1262 {
1263 boost::mutex::scoped_lock lock(registry_.mutex_);
1264 registry_.CheckInvariants();
1265 assert(handler_->GetState() == JobState_Running);
1266
1267 return handler_->IsCancelScheduled();
1268 }
1269 }
1270
1271
1272 void JobsRegistry::RunningJob::MarkSuccess()
1273 {
1274 if (!IsValid())
1275 {
1276 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1277 }
1278 else
1279 {
1280 targetState_ = JobState_Success;
1281 }
1282 }
1283
1284
1285 void JobsRegistry::RunningJob::MarkFailure()
1286 {
1287 if (!IsValid())
1288 {
1289 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1290 }
1291 else
1292 {
1293 targetState_ = JobState_Failure;
1294 }
1295 }
1296
1297
1298 void JobsRegistry::RunningJob::MarkCanceled()
1299 {
1300 if (!IsValid())
1301 {
1302 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1303 }
1304 else
1305 {
1306 targetState_ = JobState_Failure;
1307 canceled_ = true;
1308 }
1309 }
1310
1311
1312 void JobsRegistry::RunningJob::MarkPause()
1313 {
1314 if (!IsValid())
1315 {
1316 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1317 }
1318 else
1319 {
1320 targetState_ = JobState_Paused;
1321 }
1322 }
1323
1324
1325 void JobsRegistry::RunningJob::MarkRetry(unsigned int timeout)
1326 {
1327 if (!IsValid())
1328 {
1329 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1330 }
1331 else
1332 {
1333 targetState_ = JobState_Retry;
1334 targetRetryTimeout_ = timeout;
1335 }
1336 }
1337
1338
1339 void JobsRegistry::RunningJob::UpdateStatus(ErrorCode code,
1340 const std::string& details)
1341 {
1342 if (!IsValid())
1343 {
1344 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1345 }
1346 else
1347 {
1348 JobStatus status(code, details, *job_);
1349
1350 boost::mutex::scoped_lock lock(registry_.mutex_);
1351 registry_.CheckInvariants();
1352 assert(handler_->GetState() == JobState_Running);
1353
1354 handler_->SetLastStatus(status);
1355 }
1356 }
1357
1358
1359
1360 void JobsRegistry::Serialize(Json::Value& target)
1361 {
1362 boost::mutex::scoped_lock lock(mutex_);
1363 CheckInvariants();
1364
1365 target = Json::objectValue;
1366 target[TYPE] = JOBS_REGISTRY;
1367 target[JOBS] = Json::objectValue;
1368
1369 for (JobsIndex::const_iterator it = jobsIndex_.begin();
1370 it != jobsIndex_.end(); ++it)
1371 {
1372 Json::Value v;
1373 if (it->second->Serialize(v))
1374 {
1375 target[JOBS][it->first] = v;
1376 }
1377 }
1378 }
1379
1380
1381 JobsRegistry::JobsRegistry(IJobUnserializer& unserializer,
1382 const Json::Value& s,
1383 size_t maxCompletedJobs) :
1384 maxCompletedJobs_(maxCompletedJobs),
1385 observer_(NULL)
1386 {
1387 if (SerializationToolbox::ReadString(s, TYPE) != JOBS_REGISTRY ||
1388 !s.isMember(JOBS) ||
1389 s[JOBS].type() != Json::objectValue)
1390 {
1391 throw OrthancException(ErrorCode_BadFileFormat);
1392 }
1393
1394 Json::Value::Members members = s[JOBS].getMemberNames();
1395
1396 for (Json::Value::Members::const_iterator it = members.begin();
1397 it != members.end(); ++it)
1398 {
1399 std::unique_ptr<JobHandler> job;
1400
1401 try
1402 {
1403 job.reset(new JobHandler(unserializer, s[JOBS][*it], *it));
1404 }
1405 catch (OrthancException& e)
1406 {
1407 LOG(WARNING) << "Cannot unserialize one job from previous execution, "
1408 << "skipping it: " << e.What();
1409 continue;
1410 }
1411
1412 const boost::posix_time::ptime lastChangeTime = job->GetLastStateChangeTime();
1413
1414 std::string id;
1415 SubmitInternal(id, job.release());
1416
1417 // Check whether the job has not been removed (which could be
1418 // the case if the "maxCompletedJobs_" value gets smaller)
1419 JobsIndex::iterator found = jobsIndex_.find(id);
1420 if (found != jobsIndex_.end())
1421 {
1422 // The job still lies in the history: Update the time of its
1423 // last change to the time that was serialized
1424 assert(found->second != NULL);
1425 found->second->SetLastStateChangeTime(lastChangeTime);
1426 }
1427 }
1428 }
1429
1430
1431 void JobsRegistry::GetStatistics(unsigned int& pending,
1432 unsigned int& running,
1433 unsigned int& success,
1434 unsigned int& failed)
1435 {
1436 boost::mutex::scoped_lock lock(mutex_);
1437 CheckInvariants();
1438
1439 pending = 0;
1440 running = 0;
1441 success = 0;
1442 failed = 0;
1443
1444 for (JobsIndex::const_iterator it = jobsIndex_.begin();
1445 it != jobsIndex_.end(); ++it)
1446 {
1447 JobHandler& job = *it->second;
1448
1449 switch (job.GetState())
1450 {
1451 case JobState_Retry:
1452 case JobState_Pending:
1453 pending ++;
1454 break;
1455
1456 case JobState_Paused:
1457 case JobState_Running:
1458 running ++;
1459 break;
1460
1461 case JobState_Success:
1462 success ++;
1463 break;
1464
1465 case JobState_Failure:
1466 failed ++;
1467 break;
1468
1469 default:
1470 throw OrthancException(ErrorCode_InternalError);
1471 }
1472 }
1473 }
1474 }