comparison UnitTestsSources/MultiThreadingTests.cpp @ 2601:5b6c3d77a2a1 jobs

reorganization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 17 May 2018 17:03:40 +0200
parents 140a539b4eba
children 988936118354
comparison
equal deleted inserted replaced
2600:140a539b4eba 2601:5b6c3d77a2a1
743 743
744 744
745 #include "../OrthancServer/ServerContext.h" 745 #include "../OrthancServer/ServerContext.h"
746 #include "../Core/Logging.h" 746 #include "../Core/Logging.h"
747 747
748 namespace 748 #include "../Core/DicomNetworking/IDicomConnectionManager.h"
749 { 749 #include "../Core/JobsEngine/Operations/JobOperationValues.h"
750 class JobOperationValue : public boost::noncopyable 750 #include "../Core/JobsEngine/Operations/StringOperationValue.h"
751 { 751 #include "../Core/JobsEngine/Operations/LogJobOperation.h"
752 public: 752
753 enum Type 753 namespace Orthanc
754 { 754 {
755 Type_DicomInstance,
756 Type_Null,
757 Type_String
758 };
759
760 private:
761 Type type_;
762
763 protected:
764 JobOperationValue(Type type) :
765 type_(type)
766 {
767 }
768
769 public:
770 virtual ~JobOperationValue()
771 {
772 }
773
774 Type GetType() const
775 {
776 return type_;
777 }
778
779 virtual JobOperationValue* Clone() const = 0;
780 };
781
782
783 class NullOperationValue : public JobOperationValue
784 {
785 public:
786 NullOperationValue() :
787 JobOperationValue(Type_Null)
788 {
789 }
790
791 virtual JobOperationValue* Clone() const
792 {
793 return new NullOperationValue;
794 }
795 };
796
797
798 class StringOperationValue : public JobOperationValue
799 {
800 private:
801 std::string content_;
802
803 public:
804 StringOperationValue(const std::string& content) :
805 JobOperationValue(JobOperationValue::Type_String),
806 content_(content)
807 {
808 }
809
810 virtual JobOperationValue* Clone() const
811 {
812 return new StringOperationValue(content_);
813 }
814
815 const std::string& GetContent() const
816 {
817 return content_;
818 }
819 };
820
821
822 class IDicomConnectionManager : public boost::noncopyable
823 {
824 public:
825 virtual ~IDicomConnectionManager()
826 {
827 }
828
829 class IResource : public boost::noncopyable
830 {
831 public:
832 virtual ~IResource()
833 {
834 }
835
836 virtual DicomUserConnection& GetConnection() = 0;
837 };
838
839 virtual IResource* AcquireConnection(const std::string& localAet,
840 const RemoteModalityParameters& remote) = 0;
841 };
842
843
844 class JobOperationValues : public boost::noncopyable
845 {
846 private:
847 std::vector<JobOperationValue*> values_;
848
849 void Append(JobOperationValues& target,
850 bool clear)
851 {
852 target.Reserve(target.GetSize() + GetSize());
853
854 for (size_t i = 0; i < values_.size(); i++)
855 {
856 if (clear)
857 {
858 target.Append(values_[i]);
859 values_[i] = NULL;
860 }
861 else
862 {
863 target.Append(GetValue(i).Clone());
864 }
865 }
866
867 if (clear)
868 {
869 Clear();
870 }
871 }
872
873 public:
874 ~JobOperationValues()
875 {
876 Clear();
877 }
878
879 void Move(JobOperationValues& target)
880 {
881 return Append(target, true);
882 }
883
884 void Copy(JobOperationValues& target)
885 {
886 return Append(target, false);
887 }
888
889 void Clear()
890 {
891 for (size_t i = 0; i < values_.size(); i++)
892 {
893 if (values_[i] != NULL)
894 {
895 delete values_[i];
896 }
897 }
898
899 values_.clear();
900 }
901
902 void Reserve(size_t count)
903 {
904 values_.reserve(count);
905 }
906
907 void Append(JobOperationValue* value) // Takes ownership
908 {
909 if (value == NULL)
910 {
911 throw OrthancException(ErrorCode_NullPointer);
912 }
913 else
914 {
915 values_.push_back(value);
916 }
917 }
918
919 size_t GetSize() const
920 {
921 return values_.size();
922 }
923
924 JobOperationValue& GetValue(size_t index) const
925 {
926 if (index >= values_.size())
927 {
928 throw OrthancException(ErrorCode_ParameterOutOfRange);
929 }
930 else
931 {
932 assert(values_[index] != NULL);
933 return *values_[index];
934 }
935 }
936 };
937
938
939
940 class IJobOperation : public boost::noncopyable
941 {
942 public:
943 virtual ~IJobOperation()
944 {
945 }
946
947 virtual void Apply(JobOperationValues& outputs,
948 const JobOperationValue& input,
949 IDicomConnectionManager& manager) = 0;
950 };
951
952
953 class LogJobOperation : public IJobOperation
954 {
955 public:
956 virtual void Apply(JobOperationValues& outputs,
957 const JobOperationValue& input,
958 IDicomConnectionManager& manager)
959 {
960 switch (input.GetType())
961 {
962 case JobOperationValue::Type_String:
963 LOG(INFO) << "Job value: " << dynamic_cast<const StringOperationValue&>(input).GetContent();
964 break;
965
966 case JobOperationValue::Type_Null:
967 LOG(INFO) << "Job value: (null)";
968 break;
969
970 default:
971 LOG(INFO) << "Job value: (unsupport)";
972 break;
973 }
974
975 outputs.Append(input.Clone());
976 }
977 };
978
979
980 class DicomInstanceValue : public JobOperationValue 755 class DicomInstanceValue : public JobOperationValue
981 { 756 {
982 private: 757 private:
983 ServerContext& context_; 758 ServerContext& context_;
984 std::string id_; 759 std::string id_;
1170 void SetTimeout(unsigned int timeout) 945 void SetTimeout(unsigned int timeout)
1171 { 946 {
1172 boost::mutex::scoped_lock lock(mutex_); 947 boost::mutex::scoped_lock lock(mutex_);
1173 948
1174 timeout_ = boost::posix_time::milliseconds(timeout); 949 timeout_ = boost::posix_time::milliseconds(timeout);
1175 CheckTimeout(); 950 CheckTimeoutInternal();
1176 } 951 }
1177 952
1178 unsigned int GetTimeout() 953 unsigned int GetTimeout()
1179 { 954 {
1180 boost::mutex::scoped_lock lock(mutex_); 955 boost::mutex::scoped_lock lock(mutex_);
1284 bool IsDone() const 1059 bool IsDone() const
1285 { 1060 {
1286 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); 1061 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
1287 } 1062 }
1288 1063
1289 void Step(IDicomConnectionManager& manager) 1064 void Step()
1290 { 1065 {
1291 if (IsDone()) 1066 if (IsDone())
1292 { 1067 {
1293 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1068 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1294 } 1069 }
1303 { 1078 {
1304 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); 1079 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
1305 } 1080 }
1306 1081
1307 JobOperationValues outputs; 1082 JobOperationValues outputs;
1308 operation_->Apply(outputs, *input, manager); 1083 operation_->Apply(outputs, *input);
1309 1084
1310 if (!nextOperations_.empty()) 1085 if (!nextOperations_.empty())
1311 { 1086 {
1312 std::list<Operation*>::iterator first = nextOperations_.begin(); 1087 std::list<Operation*>::iterator first = nextOperations_.begin();
1313 outputs.Move((*first)->workInputs_); 1088 outputs.Move((*first)->workInputs_);
1331 bool done_; 1106 bool done_;
1332 boost::mutex mutex_; 1107 boost::mutex mutex_;
1333 std::vector<Operation*> operations_; 1108 std::vector<Operation*> operations_;
1334 size_t current_; 1109 size_t current_;
1335 boost::condition_variable operationAdded_; 1110 boost::condition_variable operationAdded_;
1336 TimeoutDicomConnectionManager& connectionManager_;
1337 boost::posix_time::time_duration trailingTimeout_; 1111 boost::posix_time::time_duration trailingTimeout_;
1338 std::list<IObserver*> observers_; 1112 std::list<IObserver*> observers_;
1339 1113
1340 // Invoked from constructors 1114 // Invoked from constructors
1341 void Setup() 1115 void Setup()
1344 current_ = 0; 1118 current_ = 0;
1345 trailingTimeout_ = boost::posix_time::milliseconds(1000); 1119 trailingTimeout_ = boost::posix_time::milliseconds(1000);
1346 } 1120 }
1347 1121
1348 public: 1122 public:
1349 SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) : 1123 SequenceOfOperationsJob() :
1350 jobType_("SequenceOfOperations"), 1124 jobType_("SequenceOfOperations")
1351 connectionManager_(manager)
1352 { 1125 {
1353 Setup(); 1126 Setup();
1354 } 1127 }
1355 1128
1356 SequenceOfOperationsJob(const std::string& jobType, 1129 SequenceOfOperationsJob(const std::string& jobType) :
1357 TimeoutDicomConnectionManager& manager) : 1130 jobType_(jobType)
1358 jobType_(jobType),
1359 connectionManager_(manager)
1360 { 1131 {
1361 Setup(); 1132 Setup();
1362 } 1133 }
1363 1134
1364 virtual ~SequenceOfOperationsJob() 1135 virtual ~SequenceOfOperationsJob()
1395 } 1166 }
1396 1167
1397 bool IsDone() const 1168 bool IsDone() const
1398 { 1169 {
1399 return that_.done_; 1170 return that_.done_;
1400 }
1401
1402 void SetDicomConnectionTimeout(unsigned int timeout)
1403 {
1404 that_.connectionManager_.SetTimeout(timeout);
1405 } 1171 }
1406 1172
1407 void SetTrailingOperationTimeout(unsigned int timeout) 1173 void SetTrailingOperationTimeout(unsigned int timeout)
1408 { 1174 {
1409 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); 1175 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
1511 current_++; 1277 current_++;
1512 } 1278 }
1513 1279
1514 if (current_ < operations_.size()) 1280 if (current_ < operations_.size())
1515 { 1281 {
1516 operations_[current_]->Step(connectionManager_); 1282 operations_[current_]->Step();
1517 } 1283 }
1518 1284
1519 return JobStepResult::Continue(); 1285 return JobStepResult::Continue();
1520 } 1286 }
1521 1287
1532 } 1298 }
1533 } 1299 }
1534 1300
1535 virtual void ReleaseResources() 1301 virtual void ReleaseResources()
1536 { 1302 {
1537 boost::mutex::scoped_lock lock(mutex_);
1538 connectionManager_.Close();
1539 } 1303 }
1540 1304
1541 virtual float GetProgress() 1305 virtual float GetProgress()
1542 { 1306 {
1543 boost::mutex::scoped_lock lock(mutex_); 1307 boost::mutex::scoped_lock lock(mutex_);
1571 typedef SequenceOfOperationsJob::Lock Lock; 1335 typedef SequenceOfOperationsJob::Lock Lock;
1572 1336
1573 private: 1337 private:
1574 boost::mutex mutex_; 1338 boost::mutex mutex_;
1575 JobsEngine& engine_; 1339 JobsEngine& engine_;
1576 TimeoutDicomConnectionManager manager_; 1340 TimeoutDicomConnectionManager connectionManager_;
1577 std::string currentId_; 1341 std::string currentId_;
1578 SequenceOfOperationsJob* currentJob_; 1342 SequenceOfOperationsJob* currentJob_;
1579 size_t maxOperations_; 1343 size_t maxOperations_;
1580 int priority_; 1344 int priority_;
1581 unsigned int trailingTimeout_; 1345 unsigned int trailingTimeout_;
1582 1346 bool continue_;
1347 boost::thread connectionTimeoutThread_;
1348
1349 static void ConnectionTimeoutThread(LuaJobManager* manager)
1350 {
1351 while (manager->continue_)
1352 {
1353 manager->connectionManager_.CheckTimeout();
1354 boost::this_thread::sleep(boost::posix_time::milliseconds(100));
1355 }
1356 }
1357
1583 virtual void SignalDone(const SequenceOfOperationsJob& job) 1358 virtual void SignalDone(const SequenceOfOperationsJob& job)
1584 { 1359 {
1585 boost::mutex::scoped_lock lock(mutex_); 1360 boost::mutex::scoped_lock lock(mutex_);
1586 1361
1587 if (&job == currentJob_) 1362 if (&job == currentJob_)
1594 public: 1369 public:
1595 LuaJobManager(JobsEngine& engine) : 1370 LuaJobManager(JobsEngine& engine) :
1596 engine_(engine), 1371 engine_(engine),
1597 currentJob_(NULL), 1372 currentJob_(NULL),
1598 maxOperations_(1000), 1373 maxOperations_(1000),
1599 priority_(0) 1374 priority_(0),
1600 { 1375 continue_(true)
1376 {
1377 connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this);
1378 }
1379
1380 ~LuaJobManager()
1381 {
1382 continue_ = false;
1383
1384 if (connectionTimeoutThread_.joinable())
1385 {
1386 connectionTimeoutThread_.join();
1387 }
1601 } 1388 }
1602 1389
1603 void SetMaxOperationsPerJob(size_t count) 1390 void SetMaxOperationsPerJob(size_t count)
1604 { 1391 {
1605 boost::mutex::scoped_lock lock(mutex_); 1392 boost::mutex::scoped_lock lock(mutex_);
1633 } 1420 }
1634 } 1421 }
1635 1422
1636 // Need to create a new job, as the previous one is either 1423 // Need to create a new job, as the previous one is either
1637 // finished, or is getting too long 1424 // finished, or is getting too long
1638 currentJob_ = new SequenceOfOperationsJob(manager_); 1425 currentJob_ = new SequenceOfOperationsJob;
1639 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); 1426 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_);
1640 1427
1641 std::auto_ptr<Lock> result(new Lock(*currentJob_)); 1428 std::auto_ptr<Lock> result(new Lock(*currentJob_));
1642 result->SetTrailingOperationTimeout(trailingTimeout_); 1429 result->SetTrailingOperationTimeout(trailingTimeout_);
1643 1430
1647 } 1434 }
1648 1435
1649 1436
1650 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) 1437 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
1651 { 1438 {
1652 TimeoutDicomConnectionManager manager;
1653 JobsEngine engine; 1439 JobsEngine engine;
1654 engine.SetWorkersCount(3); 1440 engine.SetWorkersCount(3);
1655 engine.Start(); 1441 engine.Start();
1656 1442
1657 std::string id; 1443 std::string id;
1658 SequenceOfOperationsJob* job = NULL; 1444 SequenceOfOperationsJob* job = NULL;
1659 1445
1660 { 1446 {
1661 std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob(manager)); 1447 std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob);
1662 job = a.get(); 1448 job = a.get();
1663 engine.GetRegistry().Submit(id, a.release(), 0); 1449 engine.GetRegistry().Submit(id, a.release(), 0);
1664 } 1450 }
1665 1451
1666 boost::this_thread::sleep(boost::posix_time::milliseconds(500)); 1452 boost::this_thread::sleep(boost::posix_time::milliseconds(500));