comparison OrthancServer/ServerIndex.cpp @ 546:0e510ea3de31 laaw

merge mainline -> laaw
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 13 Sep 2013 11:25:08 +0200
parents 2c739f76d0bb
children b2357f1f026f
comparison
equal deleted inserted replaced
543:fe796b053863 546:0e510ea3de31
35 #ifndef NOMINMAX 35 #ifndef NOMINMAX
36 #define NOMINMAX 36 #define NOMINMAX
37 #endif 37 #endif
38 38
39 #include "EmbeddedResources.h" 39 #include "EmbeddedResources.h"
40 #include "OrthancInitialization.h"
40 #include "../Core/Toolbox.h" 41 #include "../Core/Toolbox.h"
41 #include "../Core/Uuid.h" 42 #include "../Core/Uuid.h"
42 #include "../Core/DicomFormat/DicomArray.h" 43 #include "../Core/DicomFormat/DicomArray.h"
43 #include "../Core/SQLite/Transaction.h" 44 #include "../Core/SQLite/Transaction.h"
44 #include "FromDcmtkBridge.h" 45 #include "FromDcmtkBridge.h"
185 } 186 }
186 } 187 }
187 }; 188 };
188 189
189 190
191 struct ServerIndex::UnstableResourcePayload
192 {
193 Orthanc::ResourceType type_;
194 boost::posix_time::ptime time_;
195
196 UnstableResourcePayload() : type_(Orthanc::ResourceType_Instance)
197 {
198 }
199
200 UnstableResourcePayload(Orthanc::ResourceType type) : type_(type)
201 {
202 time_ = boost::posix_time::second_clock::local_time();
203 }
204
205 unsigned int GetAge() const
206 {
207 return (boost::posix_time::second_clock::local_time() - time_).total_seconds();
208 }
209 };
210
211
190 bool ServerIndex::DeleteResource(Json::Value& target, 212 bool ServerIndex::DeleteResource(Json::Value& target,
191 const std::string& uuid, 213 const std::string& uuid,
192 ResourceType expectedType) 214 ResourceType expectedType)
193 { 215 {
194 boost::mutex::scoped_lock lock(mutex_); 216 boost::mutex::scoped_lock lock(mutex_);
225 247
226 return true; 248 return true;
227 } 249 }
228 250
229 251
230 static void FlushThread(DatabaseWrapper* db, 252 void ServerIndex::FlushThread(ServerIndex* that)
231 boost::mutex* mutex, 253 {
232 unsigned int sleep) 254 unsigned int sleep;
233 { 255
256 try
257 {
258 std::string sleepString = that->db_->GetGlobalProperty(GlobalProperty_FlushSleep);
259 sleep = boost::lexical_cast<unsigned int>(sleepString);
260 }
261 catch (boost::bad_lexical_cast&)
262 {
263 // By default, wait for 10 seconds before flushing
264 sleep = 10;
265 }
266
234 LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")"; 267 LOG(INFO) << "Starting the database flushing thread (sleep = " << sleep << ")";
235 268
236 while (1) 269 unsigned int count = 0;
237 { 270
238 boost::this_thread::sleep(boost::posix_time::seconds(sleep)); 271 while (!that->done_)
239 boost::mutex::scoped_lock lock(*mutex); 272 {
240 db->FlushToDisk(); 273 boost::this_thread::sleep(boost::posix_time::seconds(1));
241 } 274 count++;
275 if (count < sleep)
276 {
277 continue;
278 }
279
280 boost::mutex::scoped_lock lock(that->mutex_);
281 that->db_->FlushToDisk();
282 count = 0;
283 }
284
285 LOG(INFO) << "Stopping the database flushing thread";
242 } 286 }
243 287
244 288
245 static void ComputeExpectedNumberOfInstances(DatabaseWrapper& db, 289 static void ComputeExpectedNumberOfInstances(DatabaseWrapper& db,
246 int64_t series, 290 int64_t series,
282 } 326 }
283 327
284 328
285 ServerIndex::ServerIndex(ServerContext& context, 329 ServerIndex::ServerIndex(ServerContext& context,
286 const std::string& dbPath) : 330 const std::string& dbPath) :
331 done_(false),
287 maximumStorageSize_(0), 332 maximumStorageSize_(0),
288 maximumPatients_(0) 333 maximumPatients_(0)
289 { 334 {
290 listener_.reset(new Internals::ServerIndexListener(context)); 335 listener_.reset(new Internals::ServerIndexListener(context));
291 336
312 357
313 // Initial recycling if the parameters have changed since the last 358 // Initial recycling if the parameters have changed since the last
314 // execution of Orthanc 359 // execution of Orthanc
315 StandaloneRecycling(); 360 StandaloneRecycling();
316 361
317 unsigned int sleep; 362 flushThread_ = boost::thread(FlushThread, this);
318 try 363 unstableResourcesMonitorThread_ = boost::thread(UnstableResourcesMonitorThread, this);
319 {
320 std::string sleepString = db_->GetGlobalProperty(GlobalProperty_FlushSleep);
321 sleep = boost::lexical_cast<unsigned int>(sleepString);
322 }
323 catch (boost::bad_lexical_cast&)
324 {
325 // By default, wait for 10 seconds before flushing
326 sleep = 10;
327 }
328
329 flushThread_ = boost::thread(FlushThread, db_.get(), &mutex_, sleep);
330 } 364 }
331 365
332 366
333 ServerIndex::~ServerIndex() 367 ServerIndex::~ServerIndex()
334 { 368 {
335 LOG(INFO) << "Stopping the database flushing thread"; 369 done_ = true;
370
371 if (flushThread_.joinable())
372 {
373 flushThread_.join();
374 }
375
376 if (unstableResourcesMonitorThread_.joinable())
377 {
378 unstableResourcesMonitorThread_.join();
379 }
336 } 380 }
337 381
338 382
339 StoreStatus ServerIndex::Store(const DicomMap& dicomSummary, 383 StoreStatus ServerIndex::Store(const DicomMap& dicomSummary,
340 const Attachments& attachments, 384 const Attachments& attachments,
477 std::string now = Toolbox::GetNowIsoString(); 521 std::string now = Toolbox::GetNowIsoString();
478 db_->SetMetadata(instance, MetadataType_Instance_ReceptionDate, now); 522 db_->SetMetadata(instance, MetadataType_Instance_ReceptionDate, now);
479 db_->SetMetadata(series, MetadataType_LastUpdate, now); 523 db_->SetMetadata(series, MetadataType_LastUpdate, now);
480 db_->SetMetadata(study, MetadataType_LastUpdate, now); 524 db_->SetMetadata(study, MetadataType_LastUpdate, now);
481 db_->SetMetadata(patient, MetadataType_LastUpdate, now); 525 db_->SetMetadata(patient, MetadataType_LastUpdate, now);
482
483 db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet); 526 db_->SetMetadata(instance, MetadataType_Instance_RemoteAet, remoteAet);
484 527
485 const DicomValue* value; 528 const DicomValue* value;
486 if ((value = dicomSummary.TestAndGetValue(DICOM_TAG_INSTANCE_NUMBER)) != NULL || 529 if ((value = dicomSummary.TestAndGetValue(DICOM_TAG_INSTANCE_NUMBER)) != NULL ||
487 (value = dicomSummary.TestAndGetValue(DICOM_TAG_IMAGE_INDEX)) != NULL) 530 (value = dicomSummary.TestAndGetValue(DICOM_TAG_IMAGE_INDEX)) != NULL)
498 SeriesStatus seriesStatus = GetSeriesStatus(series); 541 SeriesStatus seriesStatus = GetSeriesStatus(series);
499 if (seriesStatus == SeriesStatus_Complete) 542 if (seriesStatus == SeriesStatus_Complete)
500 { 543 {
501 db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series); 544 db_->LogChange(ChangeType_CompletedSeries, series, ResourceType_Series);
502 } 545 }
546
547 // Mark the parent resources of this instance as unstable
548 MarkAsUnstable(patient, ResourceType_Patient);
549 MarkAsUnstable(study, ResourceType_Study);
550 MarkAsUnstable(series, ResourceType_Series);
503 551
504 t.Commit(instanceSize); 552 t.Commit(instanceSize);
505 553
506 return StoreStatus_Success; 554 return StoreStatus_Success;
507 } 555 }
753 result["AnonymizedFrom"] = tmp; 801 result["AnonymizedFrom"] = tmp;
754 802
755 tmp = db_->GetMetadata(id, MetadataType_ModifiedFrom); 803 tmp = db_->GetMetadata(id, MetadataType_ModifiedFrom);
756 if (tmp.size() != 0) 804 if (tmp.size() != 0)
757 result["ModifiedFrom"] = tmp; 805 result["ModifiedFrom"] = tmp;
806
807 if (type == ResourceType_Patient ||
808 type == ResourceType_Study ||
809 type == ResourceType_Series)
810 {
811 result["IsStable"] = !unstableResources_.Contains(id);
812 }
758 813
759 return true; 814 return true;
760 } 815 }
761 816
762 817
1341 case ResourceType_Instance: 1396 case ResourceType_Instance:
1342 default: 1397 default:
1343 break; 1398 break;
1344 } 1399 }
1345 } 1400 }
1401
1402
1403 void ServerIndex::UnstableResourcesMonitorThread(ServerIndex* that)
1404 {
1405 int stableAge = GetGlobalIntegerParameter("StableAge", 60);
1406 if (stableAge <= 0)
1407 {
1408 stableAge = 60;
1409 }
1410
1411 LOG(INFO) << "Starting the monitor for stable resources (stable age = " << stableAge << ")";
1412
1413 while (!that->done_)
1414 {
1415 // Check for stable resources each second
1416 boost::this_thread::sleep(boost::posix_time::seconds(1));
1417
1418 boost::mutex::scoped_lock lock(that->mutex_);
1419
1420 while (!that->unstableResources_.IsEmpty() &&
1421 that->unstableResources_.GetOldestPayload().GetAge() > static_cast<unsigned int>(stableAge))
1422 {
1423 // This DICOM resource has not received any new instance for
1424 // some time. It can be considered as stable.
1425
1426 UnstableResourcePayload payload;
1427 int64_t id = that->unstableResources_.RemoveOldest(payload);
1428
1429 // Ensure that the resource is still existing before logging the change
1430 if (that->db_->IsExistingResource(id))
1431 {
1432 switch (payload.type_)
1433 {
1434 case Orthanc::ResourceType_Patient:
1435 that->db_->LogChange(ChangeType_StablePatient, id, ResourceType_Patient);
1436 break;
1437
1438 case Orthanc::ResourceType_Study:
1439 that->db_->LogChange(ChangeType_StableStudy, id, ResourceType_Study);
1440 break;
1441
1442 case Orthanc::ResourceType_Series:
1443 that->db_->LogChange(ChangeType_StableSeries, id, ResourceType_Series);
1444 break;
1445
1446 default:
1447 throw OrthancException(ErrorCode_InternalError);
1448 }
1449
1450 //LOG(INFO) << "Stable resource: " << EnumerationToString(payload.type_) << " " << id;
1451 }
1452 }
1453 }
1454
1455 LOG(INFO) << "Closing the monitor thread for stable resources";
1456 }
1457
1458
1459 void ServerIndex::MarkAsUnstable(int64_t id,
1460 Orthanc::ResourceType type)
1461 {
1462 // WARNING: Before calling this method, "mutex_" must be locked.
1463
1464 assert(type == Orthanc::ResourceType_Patient ||
1465 type == Orthanc::ResourceType_Study ||
1466 type == Orthanc::ResourceType_Series);
1467
1468 unstableResources_.AddOrMakeMostRecent(id, type);
1469 //LOG(INFO) << "Unstable resource: " << EnumerationToString(type) << " " << id;
1470 }
1471
1472
1473
1474 void ServerIndex::LookupTagValue(std::list<std::string>& result,
1475 DicomTag tag,
1476 const std::string& value)
1477 {
1478 result.clear();
1479
1480 boost::mutex::scoped_lock lock(mutex_);
1481
1482 std::list<int64_t> id;
1483 db_->LookupTagValue(id, tag, value);
1484
1485 for (std::list<int64_t>::const_iterator
1486 it = id.begin(); it != id.end(); it++)
1487 {
1488 result.push_back(db_->GetPublicId(*it));
1489 }
1490 }
1491
1492
1493 void ServerIndex::LookupTagValue(std::list<std::string>& result,
1494 const std::string& value)
1495 {
1496 result.clear();
1497
1498 boost::mutex::scoped_lock lock(mutex_);
1499
1500 std::list<int64_t> id;
1501 db_->LookupTagValue(id, value);
1502
1503 for (std::list<int64_t>::const_iterator
1504 it = id.begin(); it != id.end(); it++)
1505 {
1506 result.push_back(db_->GetPublicId(*it));
1507 }
1508 }
1346 } 1509 }