comparison Platforms/Generic/Oracle.cpp @ 80:f40a78cc7070 wasm

Oracle
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 24 May 2017 22:33:20 +0200
parents
children 02c3a7a4938f
comparison
equal deleted inserted replaced
79:4e21f6b3aa0d 80:f40a78cc7070
1 /**
2 * Stone of Orthanc
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017 Osimis, Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
22 #include "Oracle.h"
23
24 #include "../../Resources/Orthanc/Core/Logging.h"
25 #include "../../Resources/Orthanc/Core/MultiThreading/SharedMessageQueue.h"
26 #include "../../Resources/Orthanc/Core/OrthancException.h"
27
28 #include <vector>
29
30 namespace OrthancStone
31 {
32 class Oracle::PImpl
33 {
34 private:
35 enum State
36 {
37 State_Init,
38 State_Started,
39 State_Stopped
40 };
41
42 boost::mutex* globalMutex_;
43 boost::mutex oracleMutex_;
44 State state_;
45 std::vector<boost::thread*> threads_;
46 Orthanc::SharedMessageQueue queue_;
47
48 static void Worker(PImpl* that)
49 {
50 for (;;)
51 {
52 State state;
53
54 {
55 boost::mutex::scoped_lock lock(that->oracleMutex_);
56 state = that->state_;
57 }
58
59 if (state == State_Stopped)
60 {
61 break;
62 }
63
64 std::auto_ptr<Orthanc::IDynamicObject> item(that->queue_.Dequeue(100));
65 if (item.get() != NULL)
66 {
67 IOracleCommand& command = dynamic_cast<IOracleCommand&>(*item);
68 command.Execute();
69
70 if (that->globalMutex_ != NULL)
71 {
72 boost::mutex::scoped_lock lock(*that->globalMutex_);
73 command.Commit();
74 }
75 else
76 {
77 command.Commit();
78 }
79 }
80 }
81 }
82
83 public:
84 PImpl(boost::mutex* globalMutex,
85 unsigned int threadCount) :
86 globalMutex_(globalMutex),
87 state_(State_Init),
88 threads_(threadCount)
89 {
90 }
91
92 ~PImpl()
93 {
94 if (state_ == State_Started)
95 {
96 LOG(ERROR) << "You should have manually called Oracle::Stop()";
97 Stop();
98 }
99 }
100
101 void Submit(IOracleCommand* command)
102 {
103 std::auto_ptr<IOracleCommand> protection(command);
104
105 if (command == NULL)
106 {
107 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
108 }
109
110 boost::mutex::scoped_lock lock(oracleMutex_);
111
112 switch (state_)
113 {
114 case State_Init:
115 LOG(ERROR) << "You must call Oracle::Start()";
116 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
117
118 case State_Started:
119 queue_.Enqueue(protection.release());
120 break;
121
122 case State_Stopped:
123 LOG(ERROR) << "Cannot schedule a request to the Oracle after having "
124 << "called Oracle::Stop()";
125 break;
126
127 default:
128 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
129 }
130
131 }
132
133 void Start()
134 {
135 boost::mutex::scoped_lock lock(oracleMutex_);
136
137 if (state_ != State_Init)
138 {
139 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
140 }
141
142 for (size_t i = 0; i < threads_.size(); i++)
143 {
144 threads_[i] = new boost::thread(Worker, this);
145 }
146
147 state_ = State_Started;
148 }
149
150 void Stop()
151 {
152 {
153 boost::mutex::scoped_lock lock(oracleMutex_);
154
155 if (state_ != State_Started)
156 {
157 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
158 }
159
160 state_ = State_Stopped;
161 }
162
163 for (size_t i = 0; i < threads_.size(); i++)
164 {
165 if (threads_[i] != NULL)
166 {
167 if (threads_[i]->joinable())
168 {
169 threads_[i]->join();
170 }
171
172 delete threads_[i];
173 }
174 }
175 }
176 };
177
178
179 Oracle::Oracle(boost::mutex& globalMutex,
180 unsigned int threadCount) :
181 pimpl_(new PImpl(&globalMutex, threadCount))
182 {
183 }
184
185
186 Oracle::Oracle(unsigned int threadCount) :
187 pimpl_(new PImpl(NULL, threadCount))
188 {
189 }
190
191
192 void Oracle::Start()
193 {
194 pimpl_->Start();
195 }
196
197
198 void Oracle::Submit(IOracleCommand* command)
199 {
200 pimpl_->Submit(command);
201 }
202
203
204 void Oracle::Stop()
205 {
206 pimpl_->Stop();
207 }
208 }