comparison OrthancServer/ServerJobs/LuaJobManager.cpp @ 2604:76ef12fa136c jobs

fix race conditions if creating Lua jobs
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 18 May 2018 17:37:14 +0200
parents 988936118354
children 1e11b0229e04
comparison
equal deleted inserted replaced
2603:988936118354 2604:76ef12fa136c
32 32
33 33
34 #include "../PrecompiledHeadersServer.h" 34 #include "../PrecompiledHeadersServer.h"
35 #include "LuaJobManager.h" 35 #include "LuaJobManager.h"
36 36
37 #include "DeleteResourceOperation.h"
38 #include "StoreScuOperation.h"
39 #include "../../Core/JobsEngine/Operations/LogJobOperation.h"
40
41 #include "DicomInstanceOperationValue.h"
42 #include "../../Core/JobsEngine/Operations/NullOperationValue.h"
43 #include "../../Core/JobsEngine/Operations/StringOperationValue.h"
37 44
38 namespace Orthanc 45 namespace Orthanc
39 { 46 {
40 void LuaJobManager::ConnectionTimeoutThread(LuaJobManager* manager) 47 void LuaJobManager::ConnectionTimeoutThread(LuaJobManager* manager)
41 { 48 {
57 currentJob_ = NULL; 64 currentJob_ = NULL;
58 } 65 }
59 } 66 }
60 67
61 68
62 LuaJobManager::LuaJobManager(JobsEngine& engine) : 69 LuaJobManager::LuaJobManager() :
63 engine_(engine),
64 currentJob_(NULL), 70 currentJob_(NULL),
65 maxOperations_(1000), 71 maxOperations_(1000),
66 priority_(0), 72 priority_(0),
67 continue_(true) 73 continue_(true)
68 { 74 {
100 boost::mutex::scoped_lock lock(mutex_); 106 boost::mutex::scoped_lock lock(mutex_);
101 trailingTimeout_ = timeout; 107 trailingTimeout_ = timeout;
102 } 108 }
103 109
104 110
105 LuaJobManager::Lock* LuaJobManager::Modify() 111 LuaJobManager::Lock::Lock(LuaJobManager& that,
106 { 112 JobsEngine& engine) :
107 boost::mutex::scoped_lock lock(mutex_); 113 that_(that),
108 114 lock_(that.mutex_),
109 if (currentJob_ != NULL) 115 engine_(engine)
110 { 116 {
111 std::auto_ptr<Lock> result(new Lock(*currentJob_)); 117 if (that_.currentJob_ == NULL)
112 118 {
113 if (!result->IsDone() && 119 isNewJob_ = true;
114 result->GetOperationsCount() < maxOperations_) 120 }
121 else
122 {
123 jobLock_.reset(new SequenceOfOperationsJob::Lock(*that_.currentJob_));
124
125 if (jobLock_->IsDone() ||
126 jobLock_->GetOperationsCount() >= that_.maxOperations_)
115 { 127 {
116 return result.release(); 128 jobLock_.reset(NULL);
129 isNewJob_ = true;
117 } 130 }
118 } 131 else
119 132 {
120 // Need to create a new job, as the previous one is either 133 isNewJob_ = false;
121 // finished, or is getting too long 134 }
122 currentJob_ = new SequenceOfOperationsJob; 135 }
123 136
124 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); 137 if (isNewJob_)
125 138 {
126 std::auto_ptr<Lock> result(new Lock(*currentJob_)); 139 // Need to create a new job, as the previous one is either
127 result->SetTrailingOperationTimeout(trailingTimeout_); 140 // finished, or is getting too long
128 141 that_.currentJob_ = new SequenceOfOperationsJob;
129 return result.release(); 142 jobLock_.reset(new SequenceOfOperationsJob::Lock(*that_.currentJob_));
143 jobLock_->SetTrailingOperationTimeout(that_.trailingTimeout_);
144 }
145
146 assert(jobLock_.get() != NULL);
147 }
148
149
150 LuaJobManager::Lock::~Lock()
151 {
152 assert(jobLock_.get() != NULL);
153 jobLock_.reset(NULL);
154
155 if (isNewJob_)
156 {
157 engine_.GetRegistry().Submit(that_.currentId_, that_.currentJob_, that_.priority_);
158 }
159 }
160
161
162 size_t LuaJobManager::Lock::AddDeleteResourceOperation(ServerContext& context)
163 {
164 assert(jobLock_.get() != NULL);
165 return jobLock_->AddOperation(new DeleteResourceOperation(context));
166 }
167
168
169 size_t LuaJobManager::Lock::AddLogOperation()
170 {
171 assert(jobLock_.get() != NULL);
172 return jobLock_->AddOperation(new LogJobOperation);
173 }
174
175
176 size_t LuaJobManager::Lock::AddStoreScuOperation(const std::string& localAet,
177 const RemoteModalityParameters& modality,
178 IDicomConnectionManager& manager)
179 {
180 assert(jobLock_.get() != NULL);
181 return jobLock_->AddOperation(new StoreScuOperation(localAet, modality, that_.connectionManager_));
182 }
183
184
185 void LuaJobManager::Lock::AddNullInput(size_t operation)
186 {
187 assert(jobLock_.get() != NULL);
188 jobLock_->AddInput(operation, NullOperationValue());
189 }
190
191
192 void LuaJobManager::Lock::AddStringInput(size_t operation,
193 const std::string& content)
194 {
195 assert(jobLock_.get() != NULL);
196 jobLock_->AddInput(operation, StringOperationValue(content));
197 }
198
199
200 void LuaJobManager::Lock::AddDicomInstanceInput(size_t operation,
201 ServerContext& context,
202 const std::string& instanceId)
203 {
204 assert(jobLock_.get() != NULL);
205 jobLock_->AddInput(operation, DicomInstanceOperationValue(context, instanceId));
206 }
207
208
209 void LuaJobManager::Lock::Connect(size_t operation1,
210 size_t operation2)
211 {
212 assert(jobLock_.get() != NULL);
213 jobLock_->Connect(operation1, operation2);
130 } 214 }
131 } 215 }