comparison Core/MultiThreading/ThreadedCommandProcessor.cpp @ 458:84966299c8f8

ThreadedCommandProcessor
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 04 Jul 2013 16:10:42 +0200
parents
children d665b8fc8560
comparison
equal deleted inserted replaced
457:997282a61ff8 458:84966299c8f8
1 /**
2 * Orthanc - A Lightweight, RESTful DICOM Store
3 * Copyright (C) 2012-2013 Medical Physics Department, CHU of Liege,
4 * Belgium
5 *
6 * This program is free software: you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License as
8 * published by the Free Software Foundation, either version 3 of the
9 * License, or (at your option) any later version.
10 *
11 * In addition, as a special exception, the copyright holders of this
12 * program give permission to link the code of its release with the
13 * OpenSSL project's "OpenSSL" library (or with modified versions of it
14 * that use the same license as the "OpenSSL" library), and distribute
15 * the linked executables. You must obey the GNU General Public License
16 * in all respects for all of the code used other than "OpenSSL". If you
17 * modify file(s) with this exception, you may extend this exception to
18 * your version of the file(s), but you are not obligated to do so. If
19 * you do not wish to do so, delete this exception statement from your
20 * version. If you delete this exception statement from all source files
21 * in the program, then also delete it here.
22 *
23 * This program is distributed in the hope that it will be useful, but
24 * WITHOUT ANY WARRANTY; without even the implied warranty of
25 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
26 * General Public License for more details.
27 *
28 * You should have received a copy of the GNU General Public License
29 * along with this program. If not, see <http://www.gnu.org/licenses/>.
30 **/
31
32
33 #include "ThreadedCommandProcessor.h"
34
35 #include "../OrthancException.h"
36
37 namespace Orthanc
38 {
39 static const int32_t TIMEOUT = 10;  // Timeout handed to queue_.Dequeue() by each worker iteration (unit presumably milliseconds -- TODO confirm against the queue class)
40
41
42 void ThreadedCommandProcessor::Processor(ThreadedCommandProcessor* that)
43 {
44 while (!that->done_)
45 {
46 std::auto_ptr<IDynamicObject> command(that->queue_.Dequeue(TIMEOUT));
47
48 if (command.get() != NULL)
49 {
50 try
51 {
52 dynamic_cast<ICommand&>(*command).Execute();
53 }
54 catch (OrthancException)
55 {
56 }
57
58 {
59 boost::mutex::scoped_lock lock(that->mutex_);
60 assert(that->remainingCommands_ > 0);
61 that->remainingCommands_--;
62 that->processedCommand_.notify_all();
63 }
64 }
65 }
66 }
67
68
69 ThreadedCommandProcessor::ThreadedCommandProcessor(unsigned int numThreads)
70 {
71 if (numThreads < 1)
72 {
73 throw OrthancException(ErrorCode_ParameterOutOfRange);
74 }
75
76 done_ = false;
77 threads_.resize(numThreads);
78 remainingCommands_ = 0;
79
80 for (unsigned int i = 0; i < numThreads; i++)
81 {
82 threads_[i] = new boost::thread(Processor, this);
83 }
84 }
85
86
87 ThreadedCommandProcessor::~ThreadedCommandProcessor()
88 {
89 done_ = true;
90
91 for (unsigned int i = 0; i < threads_.size(); i++)
92 {
93 boost::thread* t = threads_[i];
94
95 if (t != NULL)
96 {
97 if (t->joinable())
98 {
99 t->join();
100 }
101
102 delete t;
103 }
104 }
105 }
106
107
108 void ThreadedCommandProcessor::Post(ICommand* command)
109 {
110 queue_.Enqueue(command);
111
112 {
113 boost::mutex::scoped_lock lock(mutex_);
114 remainingCommands_++;
115 }
116 }
117
118
119 void ThreadedCommandProcessor::Join()
120 {
121 boost::mutex::scoped_lock lock(mutex_);
122
123 while (!remainingCommands_ == 0)
124 {
125 processedCommand_.wait(lock);
126 }
127 }
128 }