Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreadingTests.cpp @ 2601:5b6c3d77a2a1 jobs
reorganization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 17 May 2018 17:03:40 +0200 |
parents | 140a539b4eba |
children | 988936118354 |
comparison
equal
deleted
inserted
replaced
2600:140a539b4eba | 2601:5b6c3d77a2a1 |
---|---|
743 | 743 |
744 | 744 |
745 #include "../OrthancServer/ServerContext.h" | 745 #include "../OrthancServer/ServerContext.h" |
746 #include "../Core/Logging.h" | 746 #include "../Core/Logging.h" |
747 | 747 |
748 namespace | 748 #include "../Core/DicomNetworking/IDicomConnectionManager.h" |
749 { | 749 #include "../Core/JobsEngine/Operations/JobOperationValues.h" |
750 class JobOperationValue : public boost::noncopyable | 750 #include "../Core/JobsEngine/Operations/StringOperationValue.h" |
751 { | 751 #include "../Core/JobsEngine/Operations/LogJobOperation.h" |
752 public: | 752 |
753 enum Type | 753 namespace Orthanc |
754 { | 754 { |
755 Type_DicomInstance, | |
756 Type_Null, | |
757 Type_String | |
758 }; | |
759 | |
760 private: | |
761 Type type_; | |
762 | |
763 protected: | |
764 JobOperationValue(Type type) : | |
765 type_(type) | |
766 { | |
767 } | |
768 | |
769 public: | |
770 virtual ~JobOperationValue() | |
771 { | |
772 } | |
773 | |
774 Type GetType() const | |
775 { | |
776 return type_; | |
777 } | |
778 | |
779 virtual JobOperationValue* Clone() const = 0; | |
780 }; | |
781 | |
782 | |
783 class NullOperationValue : public JobOperationValue | |
784 { | |
785 public: | |
786 NullOperationValue() : | |
787 JobOperationValue(Type_Null) | |
788 { | |
789 } | |
790 | |
791 virtual JobOperationValue* Clone() const | |
792 { | |
793 return new NullOperationValue; | |
794 } | |
795 }; | |
796 | |
797 | |
798 class StringOperationValue : public JobOperationValue | |
799 { | |
800 private: | |
801 std::string content_; | |
802 | |
803 public: | |
804 StringOperationValue(const std::string& content) : | |
805 JobOperationValue(JobOperationValue::Type_String), | |
806 content_(content) | |
807 { | |
808 } | |
809 | |
810 virtual JobOperationValue* Clone() const | |
811 { | |
812 return new StringOperationValue(content_); | |
813 } | |
814 | |
815 const std::string& GetContent() const | |
816 { | |
817 return content_; | |
818 } | |
819 }; | |
820 | |
821 | |
822 class IDicomConnectionManager : public boost::noncopyable | |
823 { | |
824 public: | |
825 virtual ~IDicomConnectionManager() | |
826 { | |
827 } | |
828 | |
829 class IResource : public boost::noncopyable | |
830 { | |
831 public: | |
832 virtual ~IResource() | |
833 { | |
834 } | |
835 | |
836 virtual DicomUserConnection& GetConnection() = 0; | |
837 }; | |
838 | |
839 virtual IResource* AcquireConnection(const std::string& localAet, | |
840 const RemoteModalityParameters& remote) = 0; | |
841 }; | |
842 | |
843 | |
844 class JobOperationValues : public boost::noncopyable | |
845 { | |
846 private: | |
847 std::vector<JobOperationValue*> values_; | |
848 | |
849 void Append(JobOperationValues& target, | |
850 bool clear) | |
851 { | |
852 target.Reserve(target.GetSize() + GetSize()); | |
853 | |
854 for (size_t i = 0; i < values_.size(); i++) | |
855 { | |
856 if (clear) | |
857 { | |
858 target.Append(values_[i]); | |
859 values_[i] = NULL; | |
860 } | |
861 else | |
862 { | |
863 target.Append(GetValue(i).Clone()); | |
864 } | |
865 } | |
866 | |
867 if (clear) | |
868 { | |
869 Clear(); | |
870 } | |
871 } | |
872 | |
873 public: | |
874 ~JobOperationValues() | |
875 { | |
876 Clear(); | |
877 } | |
878 | |
879 void Move(JobOperationValues& target) | |
880 { | |
881 return Append(target, true); | |
882 } | |
883 | |
884 void Copy(JobOperationValues& target) | |
885 { | |
886 return Append(target, false); | |
887 } | |
888 | |
889 void Clear() | |
890 { | |
891 for (size_t i = 0; i < values_.size(); i++) | |
892 { | |
893 if (values_[i] != NULL) | |
894 { | |
895 delete values_[i]; | |
896 } | |
897 } | |
898 | |
899 values_.clear(); | |
900 } | |
901 | |
902 void Reserve(size_t count) | |
903 { | |
904 values_.reserve(count); | |
905 } | |
906 | |
907 void Append(JobOperationValue* value) // Takes ownership | |
908 { | |
909 if (value == NULL) | |
910 { | |
911 throw OrthancException(ErrorCode_NullPointer); | |
912 } | |
913 else | |
914 { | |
915 values_.push_back(value); | |
916 } | |
917 } | |
918 | |
919 size_t GetSize() const | |
920 { | |
921 return values_.size(); | |
922 } | |
923 | |
924 JobOperationValue& GetValue(size_t index) const | |
925 { | |
926 if (index >= values_.size()) | |
927 { | |
928 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
929 } | |
930 else | |
931 { | |
932 assert(values_[index] != NULL); | |
933 return *values_[index]; | |
934 } | |
935 } | |
936 }; | |
937 | |
938 | |
939 | |
940 class IJobOperation : public boost::noncopyable | |
941 { | |
942 public: | |
943 virtual ~IJobOperation() | |
944 { | |
945 } | |
946 | |
947 virtual void Apply(JobOperationValues& outputs, | |
948 const JobOperationValue& input, | |
949 IDicomConnectionManager& manager) = 0; | |
950 }; | |
951 | |
952 | |
953 class LogJobOperation : public IJobOperation | |
954 { | |
955 public: | |
956 virtual void Apply(JobOperationValues& outputs, | |
957 const JobOperationValue& input, | |
958 IDicomConnectionManager& manager) | |
959 { | |
960 switch (input.GetType()) | |
961 { | |
962 case JobOperationValue::Type_String: | |
963 LOG(INFO) << "Job value: " << dynamic_cast<const StringOperationValue&>(input).GetContent(); | |
964 break; | |
965 | |
966 case JobOperationValue::Type_Null: | |
967 LOG(INFO) << "Job value: (null)"; | |
968 break; | |
969 | |
970 default: | |
971 LOG(INFO) << "Job value: (unsupport)"; | |
972 break; | |
973 } | |
974 | |
975 outputs.Append(input.Clone()); | |
976 } | |
977 }; | |
978 | |
979 | |
980 class DicomInstanceValue : public JobOperationValue | 755 class DicomInstanceValue : public JobOperationValue |
981 { | 756 { |
982 private: | 757 private: |
983 ServerContext& context_; | 758 ServerContext& context_; |
984 std::string id_; | 759 std::string id_; |
1170 void SetTimeout(unsigned int timeout) | 945 void SetTimeout(unsigned int timeout) |
1171 { | 946 { |
1172 boost::mutex::scoped_lock lock(mutex_); | 947 boost::mutex::scoped_lock lock(mutex_); |
1173 | 948 |
1174 timeout_ = boost::posix_time::milliseconds(timeout); | 949 timeout_ = boost::posix_time::milliseconds(timeout); |
1175 CheckTimeout(); | 950 CheckTimeoutInternal(); |
1176 } | 951 } |
1177 | 952 |
1178 unsigned int GetTimeout() | 953 unsigned int GetTimeout() |
1179 { | 954 { |
1180 boost::mutex::scoped_lock lock(mutex_); | 955 boost::mutex::scoped_lock lock(mutex_); |
1284 bool IsDone() const | 1059 bool IsDone() const |
1285 { | 1060 { |
1286 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); | 1061 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); |
1287 } | 1062 } |
1288 | 1063 |
1289 void Step(IDicomConnectionManager& manager) | 1064 void Step() |
1290 { | 1065 { |
1291 if (IsDone()) | 1066 if (IsDone()) |
1292 { | 1067 { |
1293 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 1068 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
1294 } | 1069 } |
1303 { | 1078 { |
1304 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); | 1079 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); |
1305 } | 1080 } |
1306 | 1081 |
1307 JobOperationValues outputs; | 1082 JobOperationValues outputs; |
1308 operation_->Apply(outputs, *input, manager); | 1083 operation_->Apply(outputs, *input); |
1309 | 1084 |
1310 if (!nextOperations_.empty()) | 1085 if (!nextOperations_.empty()) |
1311 { | 1086 { |
1312 std::list<Operation*>::iterator first = nextOperations_.begin(); | 1087 std::list<Operation*>::iterator first = nextOperations_.begin(); |
1313 outputs.Move((*first)->workInputs_); | 1088 outputs.Move((*first)->workInputs_); |
1331 bool done_; | 1106 bool done_; |
1332 boost::mutex mutex_; | 1107 boost::mutex mutex_; |
1333 std::vector<Operation*> operations_; | 1108 std::vector<Operation*> operations_; |
1334 size_t current_; | 1109 size_t current_; |
1335 boost::condition_variable operationAdded_; | 1110 boost::condition_variable operationAdded_; |
1336 TimeoutDicomConnectionManager& connectionManager_; | |
1337 boost::posix_time::time_duration trailingTimeout_; | 1111 boost::posix_time::time_duration trailingTimeout_; |
1338 std::list<IObserver*> observers_; | 1112 std::list<IObserver*> observers_; |
1339 | 1113 |
1340 // Invoked from constructors | 1114 // Invoked from constructors |
1341 void Setup() | 1115 void Setup() |
1344 current_ = 0; | 1118 current_ = 0; |
1345 trailingTimeout_ = boost::posix_time::milliseconds(1000); | 1119 trailingTimeout_ = boost::posix_time::milliseconds(1000); |
1346 } | 1120 } |
1347 | 1121 |
1348 public: | 1122 public: |
1349 SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) : | 1123 SequenceOfOperationsJob() : |
1350 jobType_("SequenceOfOperations"), | 1124 jobType_("SequenceOfOperations") |
1351 connectionManager_(manager) | |
1352 { | 1125 { |
1353 Setup(); | 1126 Setup(); |
1354 } | 1127 } |
1355 | 1128 |
1356 SequenceOfOperationsJob(const std::string& jobType, | 1129 SequenceOfOperationsJob(const std::string& jobType) : |
1357 TimeoutDicomConnectionManager& manager) : | 1130 jobType_(jobType) |
1358 jobType_(jobType), | |
1359 connectionManager_(manager) | |
1360 { | 1131 { |
1361 Setup(); | 1132 Setup(); |
1362 } | 1133 } |
1363 | 1134 |
1364 virtual ~SequenceOfOperationsJob() | 1135 virtual ~SequenceOfOperationsJob() |
1395 } | 1166 } |
1396 | 1167 |
1397 bool IsDone() const | 1168 bool IsDone() const |
1398 { | 1169 { |
1399 return that_.done_; | 1170 return that_.done_; |
1400 } | |
1401 | |
1402 void SetDicomConnectionTimeout(unsigned int timeout) | |
1403 { | |
1404 that_.connectionManager_.SetTimeout(timeout); | |
1405 } | 1171 } |
1406 | 1172 |
1407 void SetTrailingOperationTimeout(unsigned int timeout) | 1173 void SetTrailingOperationTimeout(unsigned int timeout) |
1408 { | 1174 { |
1409 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); | 1175 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); |
1511 current_++; | 1277 current_++; |
1512 } | 1278 } |
1513 | 1279 |
1514 if (current_ < operations_.size()) | 1280 if (current_ < operations_.size()) |
1515 { | 1281 { |
1516 operations_[current_]->Step(connectionManager_); | 1282 operations_[current_]->Step(); |
1517 } | 1283 } |
1518 | 1284 |
1519 return JobStepResult::Continue(); | 1285 return JobStepResult::Continue(); |
1520 } | 1286 } |
1521 | 1287 |
1532 } | 1298 } |
1533 } | 1299 } |
1534 | 1300 |
1535 virtual void ReleaseResources() | 1301 virtual void ReleaseResources() |
1536 { | 1302 { |
1537 boost::mutex::scoped_lock lock(mutex_); | |
1538 connectionManager_.Close(); | |
1539 } | 1303 } |
1540 | 1304 |
1541 virtual float GetProgress() | 1305 virtual float GetProgress() |
1542 { | 1306 { |
1543 boost::mutex::scoped_lock lock(mutex_); | 1307 boost::mutex::scoped_lock lock(mutex_); |
1571 typedef SequenceOfOperationsJob::Lock Lock; | 1335 typedef SequenceOfOperationsJob::Lock Lock; |
1572 | 1336 |
1573 private: | 1337 private: |
1574 boost::mutex mutex_; | 1338 boost::mutex mutex_; |
1575 JobsEngine& engine_; | 1339 JobsEngine& engine_; |
1576 TimeoutDicomConnectionManager manager_; | 1340 TimeoutDicomConnectionManager connectionManager_; |
1577 std::string currentId_; | 1341 std::string currentId_; |
1578 SequenceOfOperationsJob* currentJob_; | 1342 SequenceOfOperationsJob* currentJob_; |
1579 size_t maxOperations_; | 1343 size_t maxOperations_; |
1580 int priority_; | 1344 int priority_; |
1581 unsigned int trailingTimeout_; | 1345 unsigned int trailingTimeout_; |
1582 | 1346 bool continue_; |
1347 boost::thread connectionTimeoutThread_; | |
1348 | |
1349 static void ConnectionTimeoutThread(LuaJobManager* manager) | |
1350 { | |
1351 while (manager->continue_) | |
1352 { | |
1353 manager->connectionManager_.CheckTimeout(); | |
1354 boost::this_thread::sleep(boost::posix_time::milliseconds(100)); | |
1355 } | |
1356 } | |
1357 | |
1583 virtual void SignalDone(const SequenceOfOperationsJob& job) | 1358 virtual void SignalDone(const SequenceOfOperationsJob& job) |
1584 { | 1359 { |
1585 boost::mutex::scoped_lock lock(mutex_); | 1360 boost::mutex::scoped_lock lock(mutex_); |
1586 | 1361 |
1587 if (&job == currentJob_) | 1362 if (&job == currentJob_) |
1594 public: | 1369 public: |
1595 LuaJobManager(JobsEngine& engine) : | 1370 LuaJobManager(JobsEngine& engine) : |
1596 engine_(engine), | 1371 engine_(engine), |
1597 currentJob_(NULL), | 1372 currentJob_(NULL), |
1598 maxOperations_(1000), | 1373 maxOperations_(1000), |
1599 priority_(0) | 1374 priority_(0), |
1600 { | 1375 continue_(true) |
1376 { | |
1377 connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this); | |
1378 } | |
1379 | |
1380 ~LuaJobManager() | |
1381 { | |
1382 continue_ = false; | |
1383 | |
1384 if (connectionTimeoutThread_.joinable()) | |
1385 { | |
1386 connectionTimeoutThread_.join(); | |
1387 } | |
1601 } | 1388 } |
1602 | 1389 |
1603 void SetMaxOperationsPerJob(size_t count) | 1390 void SetMaxOperationsPerJob(size_t count) |
1604 { | 1391 { |
1605 boost::mutex::scoped_lock lock(mutex_); | 1392 boost::mutex::scoped_lock lock(mutex_); |
1633 } | 1420 } |
1634 } | 1421 } |
1635 | 1422 |
1636 // Need to create a new job, as the previous one is either | 1423 // Need to create a new job, as the previous one is either |
1637 // finished, or is getting too long | 1424 // finished, or is getting too long |
1638 currentJob_ = new SequenceOfOperationsJob(manager_); | 1425 currentJob_ = new SequenceOfOperationsJob; |
1639 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); | 1426 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); |
1640 | 1427 |
1641 std::auto_ptr<Lock> result(new Lock(*currentJob_)); | 1428 std::auto_ptr<Lock> result(new Lock(*currentJob_)); |
1642 result->SetTrailingOperationTimeout(trailingTimeout_); | 1429 result->SetTrailingOperationTimeout(trailingTimeout_); |
1643 | 1430 |
1647 } | 1434 } |
1648 | 1435 |
1649 | 1436 |
1650 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) | 1437 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) |
1651 { | 1438 { |
1652 TimeoutDicomConnectionManager manager; | |
1653 JobsEngine engine; | 1439 JobsEngine engine; |
1654 engine.SetWorkersCount(3); | 1440 engine.SetWorkersCount(3); |
1655 engine.Start(); | 1441 engine.Start(); |
1656 | 1442 |
1657 std::string id; | 1443 std::string id; |
1658 SequenceOfOperationsJob* job = NULL; | 1444 SequenceOfOperationsJob* job = NULL; |
1659 | 1445 |
1660 { | 1446 { |
1661 std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob(manager)); | 1447 std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob); |
1662 job = a.get(); | 1448 job = a.get(); |
1663 engine.GetRegistry().Submit(id, a.release(), 0); | 1449 engine.GetRegistry().Submit(id, a.release(), 0); |
1664 } | 1450 } |
1665 | 1451 |
1666 boost::this_thread::sleep(boost::posix_time::milliseconds(500)); | 1452 boost::this_thread::sleep(boost::posix_time::milliseconds(500)); |