comparison Resources/Graveyard/Deprecated/Platforms/Generic/Oracle.cpp @ 1503:553084468225

moving /Deprecated/ to /Resources/Graveyard/Deprecated/
author Sebastien Jodogne <s.jodogne@gmail.com>
date Tue, 30 Jun 2020 11:38:13 +0200
parents Deprecated/Platforms/Generic/Oracle.cpp@419d0320c344
children
comparison
equal deleted inserted replaced
1502:e5729dab3f67 1503:553084468225
1 /**
2 * Stone of Orthanc
3 * Copyright (C) 2012-2016 Sebastien Jodogne, Medical Physics
4 * Department, University Hospital of Liege, Belgium
5 * Copyright (C) 2017-2020 Osimis S.A., Belgium
6 *
7 * This program is free software: you can redistribute it and/or
8 * modify it under the terms of the GNU Affero General Public License
9 * as published by the Free Software Foundation, either version 3 of
10 * the License, or (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Affero General Public License for more details.
16 *
17 * You should have received a copy of the GNU Affero General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/
20
21
#include "Oracle.h"

#include <Core/Logging.h>
#include <Core/MultiThreading/SharedMessageQueue.h>
#include <Core/OrthancException.h>

#include <exception>
#include <vector>
#include <stdio.h>
#include <boost/thread/mutex.hpp>
31
32 namespace Deprecated
33 {
34 class Oracle::PImpl
35 {
36 private:
37 enum State
38 {
39 State_Init,
40 State_Started,
41 State_Stopped
42 };
43
44 boost::mutex oracleMutex_;
45 State state_;
46 std::vector<boost::thread*> threads_;
47 Orthanc::SharedMessageQueue queue_;
48
49 static void Worker(PImpl* that)
50 {
51 for (;;)
52 {
53 State state;
54
55 {
56 boost::mutex::scoped_lock lock(that->oracleMutex_);
57 state = that->state_;
58 }
59
60 if (state == State_Stopped)
61 {
62 break;
63 }
64
65 std::unique_ptr<Orthanc::IDynamicObject> item(that->queue_.Dequeue(100));
66 if (item.get() != NULL)
67 {
68 IOracleCommand& command = dynamic_cast<IOracleCommand&>(*item);
69 try
70 {
71 command.Execute();
72 }
73 catch (Orthanc::OrthancException& /*ex*/)
74 {
75 // this is probably a curl error that has been triggered. We may just ignore it.
76 // The command.success_ will stay at false and this will be handled in the command.Commit
77 }
78
79 // Random sleeping to test
80 //boost::this_thread::sleep(boost::posix_time::milliseconds(50 * (1 + rand() % 10)));
81
82 command.Commit();
83 }
84 }
85 }
86
87 public:
88 PImpl(unsigned int threadCount) :
89 state_(State_Init),
90 threads_(threadCount)
91 {
92 }
93
94 ~PImpl()
95 {
96 if (state_ == State_Started)
97 {
98 LOG(ERROR) << "You should have manually called Oracle::Stop()";
99 Stop();
100 }
101 }
102
103 Orthanc::SharedMessageQueue& GetQueue()
104 {
105 return queue_;
106 }
107
108 void Submit(IOracleCommand* command)
109 {
110 std::unique_ptr<IOracleCommand> protection(command);
111
112 if (command == NULL)
113 {
114 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
115 }
116
117 boost::mutex::scoped_lock lock(oracleMutex_);
118
119 switch (state_)
120 {
121 case State_Init:
122 case State_Started:
123 queue_.Enqueue(protection.release());
124 break;
125
126 case State_Stopped:
127 LOG(ERROR) << "Cannot schedule a request to the Oracle after having "
128 << "called Oracle::Stop()";
129 break;
130
131 default:
132 throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
133 }
134
135 }
136
137 void Start()
138 {
139 boost::mutex::scoped_lock lock(oracleMutex_);
140
141 if (state_ != State_Init)
142 {
143 LOG(ERROR) << "Oracle::PImpl::Start: (state_ != State_Init)";
144 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
145 }
146
147 for (size_t i = 0; i < threads_.size(); i++)
148 {
149 threads_[i] = new boost::thread(Worker, this);
150 }
151
152 state_ = State_Started;
153 }
154
155 void Stop()
156 {
157 {
158 boost::mutex::scoped_lock lock(oracleMutex_);
159
160 if (state_ != State_Started)
161 {
162 LOG(ERROR) << "Oracle::PImpl::Stop(): (state_ != State_Started)";
163 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
164 }
165
166 state_ = State_Stopped;
167 }
168
169 for (size_t i = 0; i < threads_.size(); i++)
170 {
171 if (threads_[i] != NULL)
172 {
173 if (threads_[i]->joinable())
174 {
175 threads_[i]->join();
176 }
177
178 delete threads_[i];
179 }
180 }
181 }
182 };
183
184
185 Oracle::Oracle(unsigned int threadCount) :
186 pimpl_(new PImpl(threadCount))
187 {
188 }
189
190 void Oracle::Start()
191 {
192 pimpl_->Start();
193 }
194
195
196 void Oracle::Submit(IOracleCommand* command)
197 {
198 pimpl_->Submit(command);
199 }
200
201
202 void Oracle::Stop()
203 {
204 pimpl_->Stop();
205 }
206
207
208 void Oracle::WaitEmpty()
209 {
210 pimpl_->GetQueue().WaitEmpty(50);
211 }
212 }