Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreadingTests.cpp @ 2603:988936118354 jobs
reorganization
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Fri, 18 May 2018 17:02:25 +0200 |
parents | 5b6c3d77a2a1 |
children | 76ef12fa136c |
comparison
equal
deleted
inserted
replaced
2602:c25f1a52acbc | 2603:988936118354 |
---|---|
740 | 740 |
741 | 741 |
742 | 742 |
743 | 743 |
744 | 744 |
745 #include "../OrthancServer/ServerContext.h" | 745 #include "../OrthancServer/ServerJobs/LuaJobManager.h" |
746 #include "../Core/Logging.h" | |
747 | |
748 #include "../Core/DicomNetworking/IDicomConnectionManager.h" | |
749 #include "../Core/JobsEngine/Operations/JobOperationValues.h" | |
750 #include "../Core/JobsEngine/Operations/StringOperationValue.h" | 746 #include "../Core/JobsEngine/Operations/StringOperationValue.h" |
751 #include "../Core/JobsEngine/Operations/LogJobOperation.h" | 747 #include "../Core/JobsEngine/Operations/LogJobOperation.h" |
752 | |
753 namespace Orthanc | |
754 { | |
755 class DicomInstanceValue : public JobOperationValue | |
756 { | |
757 private: | |
758 ServerContext& context_; | |
759 std::string id_; | |
760 | |
761 public: | |
762 DicomInstanceValue(ServerContext& context, | |
763 const std::string& id) : | |
764 JobOperationValue(Type_DicomInstance), | |
765 context_(context), | |
766 id_(id) | |
767 { | |
768 } | |
769 | |
770 const std::string& GetId() const | |
771 { | |
772 return id_; | |
773 } | |
774 | |
775 void ReadContent(std::string& dicom) const | |
776 { | |
777 context_.ReadDicom(dicom, id_); | |
778 } | |
779 | |
780 virtual JobOperationValue* Clone() const | |
781 { | |
782 return new DicomInstanceValue(context_, id_); | |
783 } | |
784 }; | |
785 | |
786 | |
787 class StoreScuOperation : public IJobOperation | |
788 { | |
789 private: | |
790 std::string localAet_; | |
791 RemoteModalityParameters modality_; | |
792 | |
793 public: | |
794 StoreScuOperation(const std::string& localAet, | |
795 const RemoteModalityParameters& modality) : | |
796 localAet_(localAet), | |
797 modality_(modality) | |
798 { | |
799 } | |
800 | |
801 virtual void Apply(JobOperationValues& outputs, | |
802 const JobOperationValue& input, | |
803 IDicomConnectionManager& manager) | |
804 { | |
805 std::auto_ptr<IDicomConnectionManager::IResource> resource | |
806 (manager.AcquireConnection(localAet_, modality_)); | |
807 | |
808 if (resource.get() == NULL) | |
809 { | |
810 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); | |
811 return; | |
812 } | |
813 | |
814 if (input.GetType() != JobOperationValue::Type_DicomInstance) | |
815 { | |
816 throw OrthancException(ErrorCode_BadParameterType); | |
817 } | |
818 | |
819 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); | |
820 | |
821 LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" | |
822 << modality_.GetApplicationEntityTitle() << "\""; | |
823 | |
824 try | |
825 { | |
826 std::string dicom; | |
827 instance.ReadContent(dicom); | |
828 resource->GetConnection().Store(dicom); | |
829 outputs.Append(instance.Clone()); | |
830 } | |
831 catch (OrthancException& e) | |
832 { | |
833 LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" | |
834 << modality_.GetApplicationEntityTitle() << "\": " << e.What(); | |
835 } | |
836 } | |
837 }; | |
838 | |
839 | |
840 class DeleteResourceOperation : public IJobOperation | |
841 { | |
842 private: | |
843 ServerContext& context_; | |
844 | |
845 public: | |
846 DeleteResourceOperation(ServerContext& context) : | |
847 context_(context) | |
848 { | |
849 } | |
850 | |
851 virtual void Apply(JobOperationValues& outputs, | |
852 const JobOperationValue& input, | |
853 IDicomConnectionManager& manager) | |
854 { | |
855 switch (input.GetType()) | |
856 { | |
857 case JobOperationValue::Type_DicomInstance: | |
858 { | |
859 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); | |
860 LOG(INFO) << "Deleting instance: " << instance.GetId(); | |
861 | |
862 try | |
863 { | |
864 Json::Value tmp; | |
865 context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance); | |
866 } | |
867 catch (OrthancException& e) | |
868 { | |
869 LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What(); | |
870 } | |
871 | |
872 break; | |
873 } | |
874 | |
875 default: | |
876 break; | |
877 } | |
878 } | |
879 }; | |
880 | |
881 | |
882 | |
883 class TimeoutDicomConnectionManager : public IDicomConnectionManager | |
884 { | |
885 private: | |
886 class Resource : public IDicomConnectionManager::IResource | |
887 { | |
888 private: | |
889 TimeoutDicomConnectionManager& that_; | |
890 boost::mutex::scoped_lock lock_; | |
891 | |
892 public: | |
893 Resource(TimeoutDicomConnectionManager& that) : | |
894 that_(that), | |
895 lock_(that.mutex_) | |
896 { | |
897 } | |
898 | |
899 virtual ~Resource() | |
900 { | |
901 that_.Touch(); | |
902 } | |
903 | |
904 virtual DicomUserConnection& GetConnection() | |
905 { | |
906 if (that_.connection_.get() == NULL) | |
907 { | |
908 throw OrthancException(ErrorCode_InternalError); | |
909 } | |
910 | |
911 return *that_.connection_; | |
912 } | |
913 }; | |
914 | |
915 boost::mutex mutex_; | |
916 std::auto_ptr<DicomUserConnection> connection_; | |
917 boost::posix_time::ptime lastUse_; | |
918 boost::posix_time::time_duration timeout_; | |
919 | |
920 static boost::posix_time::ptime GetNow() | |
921 { | |
922 return boost::posix_time::microsec_clock::universal_time(); | |
923 } | |
924 | |
925 void Touch() | |
926 { | |
927 lastUse_ = GetNow(); | |
928 } | |
929 | |
930 void CheckTimeoutInternal() | |
931 { | |
932 if (connection_.get() != NULL && | |
933 (GetNow() - lastUse_) >= timeout_) | |
934 { | |
935 connection_.reset(NULL); | |
936 } | |
937 } | |
938 | |
939 public: | |
940 TimeoutDicomConnectionManager() : | |
941 timeout_(boost::posix_time::milliseconds(1000)) | |
942 { | |
943 } | |
944 | |
945 void SetTimeout(unsigned int timeout) | |
946 { | |
947 boost::mutex::scoped_lock lock(mutex_); | |
948 | |
949 timeout_ = boost::posix_time::milliseconds(timeout); | |
950 CheckTimeoutInternal(); | |
951 } | |
952 | |
953 unsigned int GetTimeout() | |
954 { | |
955 boost::mutex::scoped_lock lock(mutex_); | |
956 return timeout_.total_milliseconds(); | |
957 } | |
958 | |
959 void Close() | |
960 { | |
961 boost::mutex::scoped_lock lock(mutex_); | |
962 connection_.reset(NULL); | |
963 } | |
964 | |
965 void CheckTimeout() | |
966 { | |
967 boost::mutex::scoped_lock lock(mutex_); | |
968 CheckTimeoutInternal(); | |
969 } | |
970 | |
971 virtual IResource* AcquireConnection(const std::string& localAet, | |
972 const RemoteModalityParameters& remote) | |
973 { | |
974 boost::mutex::scoped_lock lock(mutex_); | |
975 | |
976 if (connection_.get() == NULL || | |
977 !connection_->IsSameAssociation(localAet, remote)) | |
978 { | |
979 connection_.reset(new DicomUserConnection(localAet, remote)); | |
980 } | |
981 | |
982 return new Resource(*this); | |
983 } | |
984 }; | |
985 | |
986 | |
987 | |
988 class SequenceOfOperationsJob : public IJob | |
989 { | |
990 public: | |
991 class IObserver : public boost::noncopyable | |
992 { | |
993 public: | |
994 virtual ~IObserver() | |
995 { | |
996 } | |
997 | |
998 virtual void SignalDone(const SequenceOfOperationsJob& job) = 0; | |
999 }; | |
1000 | |
1001 private: | |
1002 class Operation : public boost::noncopyable | |
1003 { | |
1004 private: | |
1005 JobOperationValues originalInputs_; | |
1006 JobOperationValues workInputs_; | |
1007 std::auto_ptr<IJobOperation> operation_; | |
1008 std::list<Operation*> nextOperations_; | |
1009 size_t currentInput_; | |
1010 | |
1011 public: | |
1012 Operation(IJobOperation* operation) : | |
1013 operation_(operation), | |
1014 currentInput_(0) | |
1015 { | |
1016 if (operation == NULL) | |
1017 { | |
1018 throw OrthancException(ErrorCode_NullPointer); | |
1019 } | |
1020 } | |
1021 | |
1022 void AddOriginalInput(const JobOperationValue& value) | |
1023 { | |
1024 if (currentInput_ != 0) | |
1025 { | |
1026 // Cannot add input after processing has started | |
1027 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1028 } | |
1029 else | |
1030 { | |
1031 originalInputs_.Append(value.Clone()); | |
1032 } | |
1033 } | |
1034 | |
1035 const JobOperationValues& GetOriginalInputs() const | |
1036 { | |
1037 return originalInputs_; | |
1038 } | |
1039 | |
1040 void Reset() | |
1041 { | |
1042 workInputs_.Clear(); | |
1043 currentInput_ = 0; | |
1044 } | |
1045 | |
1046 void AddNextOperation(Operation& other) | |
1047 { | |
1048 if (currentInput_ != 0) | |
1049 { | |
1050 // Cannot add input after processing has started | |
1051 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1052 } | |
1053 else | |
1054 { | |
1055 nextOperations_.push_back(&other); | |
1056 } | |
1057 } | |
1058 | |
1059 bool IsDone() const | |
1060 { | |
1061 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); | |
1062 } | |
1063 | |
1064 void Step() | |
1065 { | |
1066 if (IsDone()) | |
1067 { | |
1068 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1069 } | |
1070 | |
1071 const JobOperationValue* input; | |
1072 | |
1073 if (currentInput_ < originalInputs_.GetSize()) | |
1074 { | |
1075 input = &originalInputs_.GetValue(currentInput_); | |
1076 } | |
1077 else | |
1078 { | |
1079 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); | |
1080 } | |
1081 | |
1082 JobOperationValues outputs; | |
1083 operation_->Apply(outputs, *input); | |
1084 | |
1085 if (!nextOperations_.empty()) | |
1086 { | |
1087 std::list<Operation*>::iterator first = nextOperations_.begin(); | |
1088 outputs.Move((*first)->workInputs_); | |
1089 | |
1090 std::list<Operation*>::iterator current = first; | |
1091 ++current; | |
1092 | |
1093 while (current != nextOperations_.end()) | |
1094 { | |
1095 (*first)->workInputs_.Copy((*current)->workInputs_); | |
1096 ++current; | |
1097 } | |
1098 } | |
1099 | |
1100 currentInput_ += 1; | |
1101 } | |
1102 }; | |
1103 | |
1104 | |
1105 std::string jobType_; | |
1106 bool done_; | |
1107 boost::mutex mutex_; | |
1108 std::vector<Operation*> operations_; | |
1109 size_t current_; | |
1110 boost::condition_variable operationAdded_; | |
1111 boost::posix_time::time_duration trailingTimeout_; | |
1112 std::list<IObserver*> observers_; | |
1113 | |
1114 // Invoked from constructors | |
1115 void Setup() | |
1116 { | |
1117 done_ = false; | |
1118 current_ = 0; | |
1119 trailingTimeout_ = boost::posix_time::milliseconds(1000); | |
1120 } | |
1121 | |
1122 public: | |
1123 SequenceOfOperationsJob() : | |
1124 jobType_("SequenceOfOperations") | |
1125 { | |
1126 Setup(); | |
1127 } | |
1128 | |
1129 SequenceOfOperationsJob(const std::string& jobType) : | |
1130 jobType_(jobType) | |
1131 { | |
1132 Setup(); | |
1133 } | |
1134 | |
1135 virtual ~SequenceOfOperationsJob() | |
1136 { | |
1137 for (size_t i = 0; i < operations_.size(); i++) | |
1138 { | |
1139 if (operations_[i] != NULL) | |
1140 { | |
1141 delete operations_[i]; | |
1142 } | |
1143 } | |
1144 } | |
1145 | |
1146 void Register(IObserver& observer) | |
1147 { | |
1148 boost::mutex::scoped_lock lock(mutex_); | |
1149 observers_.push_back(&observer); | |
1150 } | |
1151 | |
1152 // This lock allows adding new operations to the end of the job, | |
1153 // from another thread than the worker thread, after the job has | |
1154 // been submitted for processing | |
1155 class Lock : public boost::noncopyable | |
1156 { | |
1157 private: | |
1158 SequenceOfOperationsJob& that_; | |
1159 boost::mutex::scoped_lock lock_; | |
1160 | |
1161 public: | |
1162 Lock(SequenceOfOperationsJob& that) : | |
1163 that_(that), | |
1164 lock_(that.mutex_) | |
1165 { | |
1166 } | |
1167 | |
1168 bool IsDone() const | |
1169 { | |
1170 return that_.done_; | |
1171 } | |
1172 | |
1173 void SetTrailingOperationTimeout(unsigned int timeout) | |
1174 { | |
1175 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); | |
1176 } | |
1177 | |
1178 size_t AddOperation(IJobOperation* operation) | |
1179 { | |
1180 if (IsDone()) | |
1181 { | |
1182 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1183 } | |
1184 | |
1185 that_.operations_.push_back(new Operation(operation)); | |
1186 that_.operationAdded_.notify_one(); | |
1187 | |
1188 return that_.operations_.size() - 1; | |
1189 } | |
1190 | |
1191 size_t GetOperationsCount() const | |
1192 { | |
1193 return that_.operations_.size(); | |
1194 } | |
1195 | |
1196 void AddInput(size_t index, | |
1197 const JobOperationValue& value) | |
1198 { | |
1199 if (IsDone()) | |
1200 { | |
1201 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1202 } | |
1203 else if (index >= that_.operations_.size() || | |
1204 index < that_.current_) | |
1205 { | |
1206 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
1207 } | |
1208 else | |
1209 { | |
1210 that_.operations_[index]->AddOriginalInput(value); | |
1211 } | |
1212 } | |
1213 | |
1214 void Connect(size_t input, | |
1215 size_t output) | |
1216 { | |
1217 if (IsDone()) | |
1218 { | |
1219 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1220 } | |
1221 else if (input >= output || | |
1222 input >= that_.operations_.size() || | |
1223 output >= that_.operations_.size() || | |
1224 input < that_.current_ || | |
1225 output < that_.current_) | |
1226 { | |
1227 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
1228 } | |
1229 else | |
1230 { | |
1231 Operation& a = *that_.operations_[input]; | |
1232 Operation& b = *that_.operations_[output]; | |
1233 a.AddNextOperation(b); | |
1234 } | |
1235 } | |
1236 }; | |
1237 | |
1238 virtual void Start() | |
1239 { | |
1240 } | |
1241 | |
1242 virtual JobStepResult ExecuteStep() | |
1243 { | |
1244 boost::mutex::scoped_lock lock(mutex_); | |
1245 | |
1246 if (current_ == operations_.size()) | |
1247 { | |
1248 LOG(INFO) << "Executing the trailing timeout in the sequence of operations"; | |
1249 operationAdded_.timed_wait(lock, trailingTimeout_); | |
1250 | |
1251 if (current_ == operations_.size()) | |
1252 { | |
1253 // No operation was added during the trailing timeout: The | |
1254 // job is over | |
1255 LOG(INFO) << "The sequence of operations is over"; | |
1256 done_ = true; | |
1257 | |
1258 for (std::list<IObserver*>::iterator it = observers_.begin(); | |
1259 it != observers_.end(); ++it) | |
1260 { | |
1261 (*it)->SignalDone(*this); | |
1262 } | |
1263 | |
1264 return JobStepResult::Success(); | |
1265 } | |
1266 else | |
1267 { | |
1268 LOG(INFO) << "New operation added to the sequence of operations"; | |
1269 } | |
1270 } | |
1271 | |
1272 assert(current_ < operations_.size()); | |
1273 | |
1274 while (current_ < operations_.size() && | |
1275 operations_[current_]->IsDone()) | |
1276 { | |
1277 current_++; | |
1278 } | |
1279 | |
1280 if (current_ < operations_.size()) | |
1281 { | |
1282 operations_[current_]->Step(); | |
1283 } | |
1284 | |
1285 return JobStepResult::Continue(); | |
1286 } | |
1287 | |
1288 virtual void SignalResubmit() | |
1289 { | |
1290 boost::mutex::scoped_lock lock(mutex_); | |
1291 | |
1292 current_ = 0; | |
1293 done_ = false; | |
1294 | |
1295 for (size_t i = 0; i < operations_.size(); i++) | |
1296 { | |
1297 operations_[i]->Reset(); | |
1298 } | |
1299 } | |
1300 | |
1301 virtual void ReleaseResources() | |
1302 { | |
1303 } | |
1304 | |
1305 virtual float GetProgress() | |
1306 { | |
1307 boost::mutex::scoped_lock lock(mutex_); | |
1308 | |
1309 return (static_cast<float>(current_) / | |
1310 static_cast<float>(operations_.size() + 1)); | |
1311 } | |
1312 | |
1313 virtual void GetJobType(std::string& target) | |
1314 { | |
1315 target = jobType_; | |
1316 } | |
1317 | |
1318 virtual void GetPublicContent(Json::Value& value) | |
1319 { | |
1320 boost::mutex::scoped_lock lock(mutex_); | |
1321 | |
1322 value["CountOperations"] = static_cast<unsigned int>(operations_.size()); | |
1323 } | |
1324 | |
1325 virtual void GetInternalContent(Json::Value& value) | |
1326 { | |
1327 // TODO | |
1328 } | |
1329 }; | |
1330 | |
1331 | |
1332 class LuaJobManager : private SequenceOfOperationsJob::IObserver | |
1333 { | |
1334 public: | |
1335 typedef SequenceOfOperationsJob::Lock Lock; | |
1336 | |
1337 private: | |
1338 boost::mutex mutex_; | |
1339 JobsEngine& engine_; | |
1340 TimeoutDicomConnectionManager connectionManager_; | |
1341 std::string currentId_; | |
1342 SequenceOfOperationsJob* currentJob_; | |
1343 size_t maxOperations_; | |
1344 int priority_; | |
1345 unsigned int trailingTimeout_; | |
1346 bool continue_; | |
1347 boost::thread connectionTimeoutThread_; | |
1348 | |
1349 static void ConnectionTimeoutThread(LuaJobManager* manager) | |
1350 { | |
1351 while (manager->continue_) | |
1352 { | |
1353 manager->connectionManager_.CheckTimeout(); | |
1354 boost::this_thread::sleep(boost::posix_time::milliseconds(100)); | |
1355 } | |
1356 } | |
1357 | |
1358 virtual void SignalDone(const SequenceOfOperationsJob& job) | |
1359 { | |
1360 boost::mutex::scoped_lock lock(mutex_); | |
1361 | |
1362 if (&job == currentJob_) | |
1363 { | |
1364 currentId_.clear(); | |
1365 currentJob_ = NULL; | |
1366 } | |
1367 } | |
1368 | |
1369 public: | |
1370 LuaJobManager(JobsEngine& engine) : | |
1371 engine_(engine), | |
1372 currentJob_(NULL), | |
1373 maxOperations_(1000), | |
1374 priority_(0), | |
1375 continue_(true) | |
1376 { | |
1377 connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this); | |
1378 } | |
1379 | |
1380 ~LuaJobManager() | |
1381 { | |
1382 continue_ = false; | |
1383 | |
1384 if (connectionTimeoutThread_.joinable()) | |
1385 { | |
1386 connectionTimeoutThread_.join(); | |
1387 } | |
1388 } | |
1389 | |
1390 void SetMaxOperationsPerJob(size_t count) | |
1391 { | |
1392 boost::mutex::scoped_lock lock(mutex_); | |
1393 maxOperations_ = count; | |
1394 } | |
1395 | |
1396 void SetPriority(int priority) | |
1397 { | |
1398 boost::mutex::scoped_lock lock(mutex_); | |
1399 priority_ = priority; | |
1400 } | |
1401 | |
1402 void SetTrailingOperationTimeout(unsigned int timeout) | |
1403 { | |
1404 boost::mutex::scoped_lock lock(mutex_); | |
1405 trailingTimeout_ = timeout; | |
1406 } | |
1407 | |
1408 Lock* Modify() | |
1409 { | |
1410 boost::mutex::scoped_lock lock(mutex_); | |
1411 | |
1412 if (currentJob_ != NULL) | |
1413 { | |
1414 std::auto_ptr<Lock> result(new Lock(*currentJob_)); | |
1415 | |
1416 if (!result->IsDone() && | |
1417 result->GetOperationsCount() < maxOperations_) | |
1418 { | |
1419 return result.release(); | |
1420 } | |
1421 } | |
1422 | |
1423 // Need to create a new job, as the previous one is either | |
1424 // finished, or is getting too long | |
1425 currentJob_ = new SequenceOfOperationsJob; | |
1426 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); | |
1427 | |
1428 std::auto_ptr<Lock> result(new Lock(*currentJob_)); | |
1429 result->SetTrailingOperationTimeout(trailingTimeout_); | |
1430 | |
1431 return result.release(); | |
1432 } | |
1433 }; | |
1434 } | |
1435 | 748 |
1436 | 749 |
1437 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) | 750 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) |
1438 { | 751 { |
1439 JobsEngine engine; | 752 JobsEngine engine; |