comparison OrthancServer/ServerIndex.cpp @ 511:3b735fdf320b

monitoring of stable patients/studies/series
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 16 Aug 2013 15:39:53 +0200
parents 23e5b35e3c5c
children 935e8c7e0b18
comparison
equal deleted inserted replaced
509:e7841864c97c 511:3b735fdf320b
35 #ifndef NOMINMAX 35 #ifndef NOMINMAX
36 #define NOMINMAX 36 #define NOMINMAX
37 #endif 37 #endif
38 38
39 #include "EmbeddedResources.h" 39 #include "EmbeddedResources.h"
40 #include "OrthancInitialization.h"
40 #include "../Core/Toolbox.h" 41 #include "../Core/Toolbox.h"
41 #include "../Core/Uuid.h" 42 #include "../Core/Uuid.h"
42 #include "../Core/DicomFormat/DicomArray.h" 43 #include "../Core/DicomFormat/DicomArray.h"
43 #include "../Core/SQLite/Transaction.h" 44 #include "../Core/SQLite/Transaction.h"
44 #include "FromDcmtkBridge.h" 45 #include "FromDcmtkBridge.h"
185 } 186 }
186 } 187 }
187 }; 188 };
188 189
189 190
191 struct ServerIndex::UnstableResourcePayload
192 {
193 Orthanc::ResourceType type_;
194 boost::posix_time::ptime time_;
195
196 UnstableResourcePayload() : type_(Orthanc::ResourceType_Instance)
197 {
198 }
199
200 UnstableResourcePayload(Orthanc::ResourceType type) : type_(type)
201 {
202 time_ = boost::posix_time::second_clock::local_time();
203 }
204
205 unsigned int GetAge() const
206 {
207 return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
208 }
209 };
210
211
190 bool ServerIndex::DeleteResource(Json::Value& target, 212 bool ServerIndex::DeleteResource(Json::Value& target,
191 const std::string& uuid, 213 const std::string& uuid,
192 ResourceType expectedType) 214 ResourceType expectedType)
193 { 215 {
194 boost::mutex::scoped_lock lock(mutex_); 216 boost::mutex::scoped_lock lock(mutex_);
225 247
226 return true; 248 return true;
227 } 249 }
228 250
229 251
// Side-by-side diff rows: the first number/text on a row belongs to the
// removed revision, the second to the inserted revision. Rows with a
// single number exist only in the inserted revision.
//
// Inserted revision: entry point of the database flushing thread. It
// reads "GlobalProperty_FlushSleep" from "that->db_" (falling back to
// 10 if the value cannot be parsed as an unsigned integer), then wakes
// up every second to test "that->done_", and calls "FlushToDisk()"
// with "that->mutex_" locked once "count" has reached "sleep".
//
// NOTE(review): "done_" is read here without locking "mutex_"; its
// declaration is not visible in this view, confirm that it is safe to
// share between threads.
230 static void FlushThread(DatabaseWrapper* db, 252 void ServerIndex::FlushThread(ServerIndex* that)
231 boost::mutex* mutex, 253 {
232 unsigned int sleep) 254 unsigned int sleep;
233 { 255
256 try
257 {
258 std::string sleepString = that->db_->GetGlobalProperty(GlobalProperty_FlushSleep);
259 sleep = boost::lexical_cast<unsigned int>(sleepString);
260 }
261 catch (boost::bad_lexical_cast&)
262 {
263 // By default, wait for 10 seconds before flushing
264 sleep = 10;
265 }
266
234 LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")"; 267 LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")";
235 268
236 while (1) 269 unsigned int count = 0;
237 { 270
238 boost::this_thread::sleep(boost::posix_time::seconds(sleep)); 271 while (!that->done_)
239 boost::mutex::scoped_lock lock(*mutex); 272 {
// Inserted revision: sleep in slices of one second (instead of "sleep"
// seconds at once), so that "done_" is tested every second.
240 db->FlushToDisk(); 273 boost::this_thread::sleep(boost::posix_time::seconds(1));
241 } 274 count++;
275 if (count < sleep)
276 {
277 continue;
278 }
279
// The flush itself is done with the index mutex locked.
280 boost::mutex::scoped_lock lock(that->mutex_);
281 that->db_->FlushToDisk();
282 count = 0;
283 }
284
285 LOG(INFO) << "Stopping the database flushing thread";
242 } 286 }
243 287
244 288
245 static void ComputeExpectedNumberOfInstances(DatabaseWrapper& db, 289 static void ComputeExpectedNumberOfInstances(DatabaseWrapper& db,
246 int64_t series, 290 int64_t series,
282 } 326 }
283 327
284 328
285 ServerIndex::ServerIndex(ServerContext& context, 329 ServerIndex::ServerIndex(ServerContext& context,
286 const std::string& dbPath) : 330 const std::string& dbPath) :
331 done_(false),
287 maximumStorageSize_(0), 332 maximumStorageSize_(0),
288 maximumPatients_(0) 333 maximumPatients_(0)
289 { 334 {
290 listener_.reset(new Internals::ServerIndexListener(context)); 335 listener_.reset(new Internals::ServerIndexListener(context));
291 336
312 357
313 // Initial recycling if the parameters have changed since the last 358 // Initial recycling if the parameters have changed since the last
314 // execution of Orthanc 359 // execution of Orthanc
315 StandaloneRecycling(); 360 StandaloneRecycling();
316 361
317 unsigned int sleep; 362 flushThread_ = boost::thread(FlushThread, this);
318 try 363 unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this);
319 {
320 std::string sleepString = db_->GetGlobalProperty(GlobalProperty_FlushSleep);
321 sleep = boost::lexical_cast<unsigned int>(sleepString);
322 }
323 catch (boost::bad_lexical_cast&)
324 {
325 // By default, wait for 10 seconds before flushing
326 sleep = 10;
327 }
328
329 flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_, sleep);
330 } 364 }
331 365
332 366
// Side-by-side diff rows: the first number/text on a row belongs to the
// removed revision, the second to the inserted revision.
//
// Inserted revision: the destructor sets "done_", which is the flag
// tested by the loops of "FlushThread()" and of
// "UnstableResourcesMonitorThread()", then joins each of the two
// threads if it is joinable.
//
// NOTE(review): "done_" is written here without locking "mutex_"; its
// declaration is not visible in this view, confirm that it is safe to
// share between threads.
333 ServerIndex::~ServerIndex() 367 ServerIndex::~ServerIndex()
334 { 368 {
335 LOG(INFO) << "Stopping the database flushing thread"; 369 done_ = true;
370
371 if (flushThread_.joinable())
372 {
373 flushThread_.join();
374 }
375
376 if (unstableResourcesMonitorThread_.joinable())
377 {
378 unstableResourcesMonitorThread_.join();
379 }
336 } 380 }
337 381
338 382
339 StoreStatus ServerIndex::Store(const DicomMap& dicomSummary, 383 StoreStatus ServerIndex::Store(const DicomMap& dicomSummary,
340 const Attachments& attachments, 384 const Attachments& attachments,
477 std::string now = Toolbox::GetNowIsoString(); 521 std::string now = Toolbox::GetNowIsoString();
478 db_->SetMetadata(instance, MetadataType_Instance_ReceptionDate, now); 522 db_->SetMetadata(instance, MetadataType_Instance_ReceptionDate, now);
479 db_->SetMetadata(series, MetadataType_LastUpdate, now); 523 db_->SetMetadata(series, MetadataType_LastUpdate, now);
480 db_->SetMetadata(study, MetadataType_LastUpdate, now); 524 db_->SetMetadata(study, MetadataType_LastUpdate, now);
481 db_->SetMetadata(patient, MetadataType_LastUpdate, now); 525 db_->SetMetadata(patient, MetadataType_LastUpdate, now);
482
483 db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet); 526 db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet);
484 527
485 const DicomValue* value; 528 const DicomValue* value;
486 if ((value = dicomSummary.TestAndGetValue(DICOM_TAG_INSTANCE_NUMBER)) != NULL || 529 if ((value = dicomSummary.TestAndGetValue(DICOM_TAG_INSTANCE_NUMBER)) != NULL ||
487 (value = dicomSummary.TestAndGetValue(DICOM_TAG_IMAGE_INDEX)) != NULL) 530 (value = dicomSummary.TestAndGetValue(DICOM_TAG_IMAGE_INDEX)) != NULL)
498 SeriesStatus seriesStatus = GetSeriesStatus(series); 541 SeriesStatus seriesStatus = GetSeriesStatus(series);
499 if (seriesStatus == SeriesStatus_Complete) 542 if (seriesStatus == SeriesStatus_Complete)
500 { 543 {
501 db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series); 544 db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series);
502 } 545 }
546
547 // Mark the parent resources of this instance as unstable
548 MarkAsUnstable(patient, ResourceType_Patient);
549 MarkAsUnstable(study, ResourceType_Study);
550 MarkAsUnstable(series, ResourceType_Series);
503 551
504 t.Commit(instanceSize); 552 t.Commit(instanceSize);
505 553
506 return StoreStatus_Success; 554 return StoreStatus_Success;
507 } 555 }
753 result["AnonymizedFrom"] = tmp; 801 result["AnonymizedFrom"] = tmp;
754 802
755 tmp = db_->GetMetadata(id, MetadataType_ModifiedFrom); 803 tmp = db_->GetMetadata(id, MetadataType_ModifiedFrom);
756 if (tmp.size() != 0) 804 if (tmp.size() != 0)
757 result["ModifiedFrom"] = tmp; 805 result["ModifiedFrom"] = tmp;
806
807 if (type == ResourceType_Patient ||
808 type == ResourceType_Study ||
809 type == ResourceType_Series)
810 {
811 result["IsStable"] = !unstableResources_.Contains(id);
812 }
758 813
759 return true; 814 return true;
760 } 815 }
761 816
762 817
1341 case ResourceType_Instance: 1396 case ResourceType_Instance:
1342 default: 1397 default:
1343 break; 1398 break;
1344 } 1399 }
1345 } 1400 }
1401
1402
1403 void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that)
1404 {
1405 int stableAge = GetGlobalIntegerParameter("StableAge", 60);
1406 if (stableAge <= 0)
1407 {
1408 stableAge = 60;
1409 }
1410
1411 LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")";
1412
1413 while (!that->done_)
1414 {
1415 // Check for stable resources each second
1416 boost::this_thread::sleep(boost::posix_time::seconds(1));
1417
1418 boost::mutex::scoped_lock lock(that->mutex_);
1419 while (!that->unstableResources_.IsEmpty() &&
1420 that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge))
1421 {
1422 // This DICOM resource has not received any new instance for
1423 // some time. It can be considered as stable.
1424
1425 UnstableResourcePayload payload;
1426 int64_t id = that->unstableResources_.RemoveOldest(payload);
1427
1428 switch (payload.type_)
1429 {
1430 case Orthanc::ResourceType_Patient:
1431 that->db_->LogChange(ChangeType_StablePatient, id, ResourceType_Patient);
1432 break;
1433
1434 case Orthanc::ResourceType_Study:
1435 that->db_->LogChange(ChangeType_StableStudy, id, ResourceType_Study);
1436 break;
1437
1438 case Orthanc::ResourceType_Series:
1439 that->db_->LogChange(ChangeType_StableSeries, id, ResourceType_Series);
1440 break;
1441
1442 default:
1443 throw OrthancException(ErrorCode_InternalError);
1444 }
1445
1446 //LOG(INFO) << "Stable resource: " << EnumerationToString(payload.type_) << " " << id;
1447 }
1448 }
1449
1450 LOG(INFO) << "Closing the monitor thread for stable resources";
1451 }
1452
1453
1454 void ServerIndex::MarkAsUnstable(int64_t id,
1455 Orthanc::ResourceType type)
1456 {
1457 // WARNING: Before calling this method, "mutex_" must be locked.
1458
1459 assert(type == Orthanc::ResourceType_Patient ||
1460 type == Orthanc::ResourceType_Study ||
1461 type == Orthanc::ResourceType_Series);
1462
1463 unstableResources_.AddOrMakeMostRecent(id, type);
1464 //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id;
1465 }
1346 } 1466 }