comparison Framework/MultiThreading/BagOfTasksProcessor.h @ 151:fb8d4cd2f618

fix applications
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 17 Jul 2018 10:06:39 +0200
parents
children 6b8ccfc02051
comparison
equal deleted inserted replaced
150:442102e14933 151:fb8d4cd2f618
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2018 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
22 #pragma once
23
24 #include "BagOfTasks.h"
25
26 #include <Core/MultiThreading/SharedMessageQueue.h>
27
28 #include <stdint.h>
29 #include <map>
30
31 namespace Orthanc
32 {
33 class BagOfTasksProcessor : public boost::noncopyable
34 {
35 private:
36 enum BagStatus
37 {
38 BagStatus_Running,
39 BagStatus_Canceled,
40 BagStatus_Failed
41 };
42
43
44 struct Bag
45 {
46 size_t size_;
47 size_t done_;
48 BagStatus status_;
49
50 Bag() :
51 size_(0),
52 done_(0),
53 status_(BagStatus_Failed)
54 {
55 }
56
57 explicit Bag(size_t size) :
58 size_(size),
59 done_(0),
60 status_(BagStatus_Running)
61 {
62 }
63 };
64
65 class Task;
66
67
68 typedef std::map<uint64_t, Bag> Bags;
69 typedef std::map<uint64_t, bool> ExitStatus;
70
71 SharedMessageQueue queue_;
72
73 boost::mutex mutex_;
74 uint64_t countBags_;
75 Bags bags_;
76 std::vector<boost::thread*> threads_;
77 ExitStatus exitStatus_;
78 bool continue_;
79
80 boost::condition_variable bagFinished_;
81
82 static void Worker(BagOfTasksProcessor* that);
83
84 void Cancel(int64_t bag);
85
86 bool Join(int64_t bag);
87
88 float GetProgress(int64_t bag);
89
90 void SignalProgress(Task& task,
91 Bag& bag);
92
93 public:
94 class Handle : public boost::noncopyable
95 {
96 friend class BagOfTasksProcessor;
97
98 private:
99 BagOfTasksProcessor& that_;
100 uint64_t bag_;
101 bool hasJoined_;
102 bool status_;
103
104 Handle(BagOfTasksProcessor& that,
105 uint64_t bag,
106 bool empty) :
107 that_(that),
108 bag_(bag),
109 hasJoined_(empty)
110 {
111 }
112
113 public:
114 ~Handle()
115 {
116 Join();
117 }
118
119 void Cancel()
120 {
121 that_.Cancel(bag_);
122 }
123
124 bool Join();
125
126 float GetProgress()
127 {
128 return that_.GetProgress(bag_);
129 }
130 };
131
132
133 explicit BagOfTasksProcessor(size_t countThreads);
134
135 ~BagOfTasksProcessor();
136
137 Handle* Submit(BagOfTasks& tasks);
138 };
139 }