Mercurial > hg > orthanc
comparison OrthancServer/ServerIndex.cpp @ 511:3b735fdf320b
monitoring of stable patients/studies/series
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 16 Aug 2013 15:39:53 +0200 |
parents | 23e5b35e3c5c |
children | 935e8c7e0b18 |
comparison
equal
deleted
inserted
replaced
509:e7841864c97c | 511:3b735fdf320b |
---|---|
35 #ifndef NOMINMAX | 35 #ifndef NOMINMAX |
36 #define NOMINMAX | 36 #define NOMINMAX |
37 #endif | 37 #endif |
38 | 38 |
39 #include "EmbeddedResources.h" | 39 #include "EmbeddedResources.h" |
40 #include "OrthancInitialization.h" | |
40 #include "../Core/Toolbox.h" | 41 #include "../Core/Toolbox.h" |
41 #include "../Core/Uuid.h" | 42 #include "../Core/Uuid.h" |
42 #include "../Core/DicomFormat/DicomArray.h" | 43 #include "../Core/DicomFormat/DicomArray.h" |
43 #include "../Core/SQLite/Transaction.h" | 44 #include "../Core/SQLite/Transaction.h" |
44 #include "FromDcmtkBridge.h" | 45 #include "FromDcmtkBridge.h" |
185 } | 186 } |
186 } | 187 } |
187 }; | 188 }; |
188 | 189 |
189 | 190 |
191 struct ServerIndex::UnstableResourcePayload | |
192 { | |
193 Orthanc::ResourceType type_; | |
194 boost::posix_time::ptime time_; | |
195 | |
196 UnstableResourcePayload() : type_(Orthanc::ResourceType_Instance) | |
197 { | |
198 } | |
199 | |
200 UnstableResourcePayload(Orthanc::ResourceType type) : type_(type) | |
201 { | |
202 time_ = boost::posix_time::second_clock::local_time(); | |
203 } | |
204 | |
205 unsigned int GetAge() const | |
206 { | |
207 return (boost::posix_time::second_clock::local_time() - time_).total_seconds(); | |
208 } | |
209 }; | |
210 | |
211 | |
190 bool ServerIndex::DeleteResource(Json::Value& target, | 212 bool ServerIndex::DeleteResource(Json::Value& target, |
191 const std::string& uuid, | 213 const std::string& uuid, |
192 ResourceType expectedType) | 214 ResourceType expectedType) |
193 { | 215 { |
194 boost::mutex::scoped_lock lock(mutex_); | 216 boost::mutex::scoped_lock lock(mutex_); |
225 | 247 |
226 return true; | 248 return true; |
227 } | 249 } |
228 | 250 |
229 | 251 |
230 static void FlushThread(DatabaseWrapper* db, | 252 void ServerIndex::FlushThread(ServerIndex* that) |
231 boost::mutex* mutex, | 253 { |
232 unsigned int sleep) | 254 unsigned int sleep; |
233 { | 255 |
256 try | |
257 { | |
258 std::string sleepString = that->db_->GetGlobalProperty(GlobalProperty_FlushSleep); | |
259 sleep = boost::lexical_cast<unsigned int>(sleepString); | |
260 } | |
261 catch (boost::bad_lexical_cast&) | |
262 { | |
263 // By default, wait for 10 seconds before flushing | |
264 sleep = 10; | |
265 } | |
266 | |
234 LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")"; | 267 LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")"; |
235 | 268 |
236 while (1) | 269 unsigned int count = 0; |
237 { | 270 |
238 boost::this_thread::sleep(boost::posix_time::seconds(sleep)); | 271 while (!that->done_) |
239 boost::mutex::scoped_lock lock(*mutex); | 272 { |
240 db->FlushToDisk(); | 273 boost::this_thread::sleep(boost::posix_time::seconds(1)); |
241 } | 274 count++; |
275 if (count < sleep) | |
276 { | |
277 continue; | |
278 } | |
279 | |
280 boost::mutex::scoped_lock lock(that->mutex_); | |
281 that->db_->FlushToDisk(); | |
282 count = 0; | |
283 } | |
284 | |
285 LOG(INFO) << "Stopping the database flushing thread"; | |
242 } | 286 } |
243 | 287 |
244 | 288 |
245 static void ComputeExpectedNumberOfInstances(DatabaseWrapper& db, | 289 static void ComputeExpectedNumberOfInstances(DatabaseWrapper& db, |
246 int64_t series, | 290 int64_t series, |
282 } | 326 } |
283 | 327 |
284 | 328 |
285 ServerIndex::ServerIndex(ServerContext& context, | 329 ServerIndex::ServerIndex(ServerContext& context, |
286 const std::string& dbPath) : | 330 const std::string& dbPath) : |
331 done_(false), | |
287 maximumStorageSize_(0), | 332 maximumStorageSize_(0), |
288 maximumPatients_(0) | 333 maximumPatients_(0) |
289 { | 334 { |
290 listener_.reset(new Internals::ServerIndexListener(context)); | 335 listener_.reset(new Internals::ServerIndexListener(context)); |
291 | 336 |
312 | 357 |
313 // Initial recycling if the parameters have changed since the last | 358 // Initial recycling if the parameters have changed since the last |
314 // execution of Orthanc | 359 // execution of Orthanc |
315 StandaloneRecycling(); | 360 StandaloneRecycling(); |
316 | 361 |
317 unsigned int sleep; | 362 flushThread_ = boost::thread(FlushThread, this); |
318 try | 363 unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this); |
319 { | |
320 std::string sleepString = db_->GetGlobalProperty(GlobalProperty_FlushSleep); | |
321 sleep = boost::lexical_cast<unsigned int>(sleepString); | |
322 } | |
323 catch (boost::bad_lexical_cast&) | |
324 { | |
325 // By default, wait for 10 seconds before flushing | |
326 sleep = 10; | |
327 } | |
328 | |
329 flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_, sleep); | |
330 } | 364 } |
331 | 365 |
332 | 366 |
333 ServerIndex::~ServerIndex() | 367 ServerIndex::~ServerIndex() |
334 { | 368 { |
335 LOG(INFO) << "Stopping the database flushing thread"; | 369 done_ = true; |
370 | |
371 if (flushThread_.joinable()) | |
372 { | |
373 flushThread_.join(); | |
374 } | |
375 | |
376 if (unstableResourcesMonitorThread_.joinable()) | |
377 { | |
378 unstableResourcesMonitorThread_.join(); | |
379 } | |
336 } | 380 } |
337 | 381 |
338 | 382 |
339 StoreStatus ServerIndex::Store(const DicomMap& dicomSummary, | 383 StoreStatus ServerIndex::Store(const DicomMap& dicomSummary, |
340 const Attachments& attachments, | 384 const Attachments& attachments, |
477 std::string now = Toolbox::GetNowIsoString(); | 521 std::string now = Toolbox::GetNowIsoString(); |
478 db_->SetMetadata(instance, MetadataType_Instance_ReceptionDate, now); | 522 db_->SetMetadata(instance, MetadataType_Instance_ReceptionDate, now); |
479 db_->SetMetadata(series, MetadataType_LastUpdate, now); | 523 db_->SetMetadata(series, MetadataType_LastUpdate, now); |
480 db_->SetMetadata(study, MetadataType_LastUpdate, now); | 524 db_->SetMetadata(study, MetadataType_LastUpdate, now); |
481 db_->SetMetadata(patient, MetadataType_LastUpdate, now); | 525 db_->SetMetadata(patient, MetadataType_LastUpdate, now); |
482 | |
483 db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet); | 526 db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet); |
484 | 527 |
485 const DicomValue* value; | 528 const DicomValue* value; |
486 if ((value = dicomSummary.TestAndGetValue(DICOM_TAG_INSTANCE_NUMBER)) != NULL || | 529 if ((value = dicomSummary.TestAndGetValue(DICOM_TAG_INSTANCE_NUMBER)) != NULL || |
487 (value = dicomSummary.TestAndGetValue(DICOM_TAG_IMAGE_INDEX)) != NULL) | 530 (value = dicomSummary.TestAndGetValue(DICOM_TAG_IMAGE_INDEX)) != NULL) |
498 SeriesStatus seriesStatus = GetSeriesStatus(series); | 541 SeriesStatus seriesStatus = GetSeriesStatus(series); |
499 if (seriesStatus == SeriesStatus_Complete) | 542 if (seriesStatus == SeriesStatus_Complete) |
500 { | 543 { |
501 db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series); | 544 db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series); |
502 } | 545 } |
546 | |
547 // Mark the parent resources of this instance as unstable | |
548 MarkAsUnstable(patient, ResourceType_Patient); | |
549 MarkAsUnstable(study, ResourceType_Study); | |
550 MarkAsUnstable(series, ResourceType_Series); | |
503 | 551 |
504 t.Commit(instanceSize); | 552 t.Commit(instanceSize); |
505 | 553 |
506 return StoreStatus_Success; | 554 return StoreStatus_Success; |
507 } | 555 } |
753 result["AnonymizedFrom"] = tmp; | 801 result["AnonymizedFrom"] = tmp; |
754 | 802 |
755 tmp = db_->GetMetadata(id, MetadataType_ModifiedFrom); | 803 tmp = db_->GetMetadata(id, MetadataType_ModifiedFrom); |
756 if (tmp.size() != 0) | 804 if (tmp.size() != 0) |
757 result["ModifiedFrom"] = tmp; | 805 result["ModifiedFrom"] = tmp; |
806 | |
807 if (type == ResourceType_Patient || | |
808 type == ResourceType_Study || | |
809 type == ResourceType_Series) | |
810 { | |
811 result["IsStable"] = !unstableResources_.Contains(id); | |
812 } | |
758 | 813 |
759 return true; | 814 return true; |
760 } | 815 } |
761 | 816 |
762 | 817 |
1341 case ResourceType_Instance: | 1396 case ResourceType_Instance: |
1342 default: | 1397 default: |
1343 break; | 1398 break; |
1344 } | 1399 } |
1345 } | 1400 } |
1401 | |
1402 | |
1403 void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that) | |
1404 { | |
1405 int stableAge = GetGlobalIntegerParameter("StableAge", 60); | |
1406 if (stableAge <= 0) | |
1407 { | |
1408 stableAge = 60; | |
1409 } | |
1410 | |
1411 LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")"; | |
1412 | |
1413 while (!that->done_) | |
1414 { | |
1415 // Check for stable resources each second | |
1416 boost::this_thread::sleep(boost::posix_time::seconds(1)); | |
1417 | |
1418 boost::mutex::scoped_lock lock(that->mutex_); | |
1419 while (!that->unstableResources_.IsEmpty() && | |
1420 that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge)) | |
1421 { | |
1422 // This DICOM resource has not received any new instance for | |
1423 // some time. It can be considered as stable. | |
1424 | |
1425 UnstableResourcePayload payload; | |
1426 int64_t id = that->unstableResources_.RemoveOldest(payload); | |
1427 | |
1428 switch (payload.type_) | |
1429 { | |
1430 case Orthanc::ResourceType_Patient: | |
1431 that->db_->LogChange(ChangeType_StablePatient, id, ResourceType_Patient); | |
1432 break; | |
1433 | |
1434 case Orthanc::ResourceType_Study: | |
1435 that->db_->LogChange(ChangeType_StableStudy, id, ResourceType_Study); | |
1436 break; | |
1437 | |
1438 case Orthanc::ResourceType_Series: | |
1439 that->db_->LogChange(ChangeType_StableSeries, id, ResourceType_Series); | |
1440 break; | |
1441 | |
1442 default: | |
1443 throw OrthancException(ErrorCode_InternalError); | |
1444 } | |
1445 | |
1446 //LOG(INFO) << "Stable resource: " << EnumerationToString(payload.type_) << " " << id; | |
1447 } | |
1448 } | |
1449 | |
1450 LOG(INFO) << "Closing the monitor thread for stable resources"; | |
1451 } | |
1452 | |
1453 | |
1454 void ServerIndex::MarkAsUnstable(int64_t id, | |
1455 Orthanc::ResourceType type) | |
1456 { | |
1457 // WARNING: Before calling this method, "mutex_" must be locked. | |
1458 | |
1459 assert(type == Orthanc::ResourceType_Patient || | |
1460 type == Orthanc::ResourceType_Study || | |
1461 type == Orthanc::ResourceType_Series); | |
1462 | |
1463 unstableResources_.AddOrMakeMostRecent(id, type); | |
1464 //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id; | |
1465 } | |
1346 } | 1466 } |