comparison Core/MultiThreading/BagOfTasksProcessor.h @ 1920:b97aa579e85b

bag of tasks
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 19 Feb 2016 17:01:50 +0100
parents
children 6ac7f31fc543
comparison
equal deleted inserted replaced
1919:d9e33b165112 1920:b97aa579e85b
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 *
6 * This program is free software: you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation, either version 3 of the
9 * License, or (at your option) any later version.
10 *
11 * In addition, as a special exception, the copyright holders of this
12 * program give permission to link the code of its release with the
13 * OpenSSL project's "OpenSSL" library (or with modified versions of it
14 * that use the same license as the "OpenSSL" library), and distribute
15 * the linked executables. You must obey the GNU General Public License
16 * in all respects for all of the code used other than "OpenSSL". If you
17 * modify file(s) with this exception, you may extend this exception to
18 * your version of the file(s), but you are not obligated to do so. If
19 * you do not wish to do so, delete this exception statement from your
20 * version. If you delete this exception statement from all source files
21 * in the program, then also delete it here.
22 *
23 * This program is distributed in the hope that it will be useful, but
24 * WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26 * General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License
29 * along with this program. If not, see <http://www.gnu.org/licenses/>.
30 **/
31
32
33 #pragma once
34
35 #include "BagOfTasks.h"
36 #include "SharedMessageQueue.h"
37
38 #include <stdint.h>
39 #include <map>
40
41 namespace Orthanc
42 {
43 class BagOfTasksProcessor : public boost::noncopyable
44 {
45 private:
46 enum BagStatus
47 {
48 BagStatus_Running,
49 BagStatus_Canceled,
50 BagStatus_Failed
51 };
52
53
54 struct Bag
55 {
56 size_t size_;
57 size_t done_;
58 BagStatus status_;
59
60 Bag() :
61 size_(0),
62 done_(0),
63 status_(BagStatus_Failed)
64 {
65 }
66
67 Bag(size_t size) :
68 size_(size),
69 done_(0),
70 status_(BagStatus_Running)
71 {
72 }
73 };
74
75 class Task;
76
77
78 typedef std::map<uint64_t, Bag> Bags;
79 typedef std::map<uint64_t, bool> ExitStatus;
80
81 SharedMessageQueue queue_;
82
83 boost::mutex mutex_;
84 uint64_t countBags_;
85 Bags bags_;
86 std::vector<boost::thread*> threads_;
87 ExitStatus exitStatus_;
88 bool continue_;
89
90 boost::condition_variable bagFinished_;
91
92 static void Worker(BagOfTasksProcessor* that);
93
94 void Cancel(int64_t bag);
95
96 bool Join(int64_t bag);
97
98 float GetProgress(int64_t bag);
99
100 public:
101 class Handle : public boost::noncopyable
102 {
103 friend class BagOfTasksProcessor;
104
105 private:
106 BagOfTasksProcessor& that_;
107 uint64_t bag_;
108 bool hasJoined_;
109 bool status_;
110
111 Handle(BagOfTasksProcessor& that,
112 uint64_t bag) :
113 that_(that),
114 bag_(bag),
115 hasJoined_(false)
116 {
117 }
118
119 public:
120 ~Handle()
121 {
122 Join();
123 }
124
125 void Cancel()
126 {
127 that_.Cancel(bag_);
128 }
129
130 bool Join();
131
132 float GetProgress()
133 {
134 return that_.GetProgress(bag_);
135 }
136 };
137
138
139 BagOfTasksProcessor(size_t countThreads);
140
141 ~BagOfTasksProcessor();
142
143 Handle* Submit(BagOfTasks& tasks);
144 };
145 }