Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreadingTests.cpp @ 2600:140a539b4eba jobs
SequenceOfOperationsJob
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Thu, 17 May 2018 16:22:40 +0200 |
parents | 593d6b0f4cba |
children | 5b6c3d77a2a1 |
comparison
equal
deleted
inserted
replaced
2599:593d6b0f4cba | 2600:140a539b4eba |
---|---|
751 { | 751 { |
752 public: | 752 public: |
753 enum Type | 753 enum Type |
754 { | 754 { |
755 Type_DicomInstance, | 755 Type_DicomInstance, |
756 Type_Null | 756 Type_Null, |
757 Type_String | |
757 }; | 758 }; |
758 | 759 |
759 private: | 760 private: |
760 Type type_; | 761 Type type_; |
761 | 762 |
777 | 778 |
778 virtual JobOperationValue* Clone() const = 0; | 779 virtual JobOperationValue* Clone() const = 0; |
779 }; | 780 }; |
780 | 781 |
781 | 782 |
782 class IDicomConnectionProvider : public boost::noncopyable | 783 class NullOperationValue : public JobOperationValue |
783 { | 784 { |
784 public: | 785 public: |
785 virtual ~IDicomConnectionProvider() | 786 NullOperationValue() : |
787 JobOperationValue(Type_Null) | |
788 { | |
789 } | |
790 | |
791 virtual JobOperationValue* Clone() const | |
792 { | |
793 return new NullOperationValue; | |
794 } | |
795 }; | |
796 | |
797 | |
798 class StringOperationValue : public JobOperationValue | |
799 { | |
800 private: | |
801 std::string content_; | |
802 | |
803 public: | |
804 StringOperationValue(const std::string& content) : | |
805 JobOperationValue(JobOperationValue::Type_String), | |
806 content_(content) | |
807 { | |
808 } | |
809 | |
810 virtual JobOperationValue* Clone() const | |
811 { | |
812 return new StringOperationValue(content_); | |
813 } | |
814 | |
815 const std::string& GetContent() const | |
816 { | |
817 return content_; | |
818 } | |
819 }; | |
820 | |
821 | |
822 class IDicomConnectionManager : public boost::noncopyable | |
823 { | |
824 public: | |
825 virtual ~IDicomConnectionManager() | |
786 { | 826 { |
787 } | 827 } |
788 | 828 |
789 class IResource : public boost::noncopyable | 829 class IResource : public boost::noncopyable |
790 { | 830 { |
794 } | 834 } |
795 | 835 |
796 virtual DicomUserConnection& GetConnection() = 0; | 836 virtual DicomUserConnection& GetConnection() = 0; |
797 }; | 837 }; |
798 | 838 |
799 virtual IResource* Acquire(const std::string& localAet, | 839 virtual IResource* AcquireConnection(const std::string& localAet, |
800 const RemoteModalityParameters& remote) = 0; | 840 const RemoteModalityParameters& remote) = 0; |
801 }; | 841 }; |
802 | 842 |
803 | 843 |
804 class JobOperationValues : public boost::noncopyable | 844 class JobOperationValues : public boost::noncopyable |
805 { | 845 { |
806 private: | 846 private: |
807 std::vector<JobOperationValue*> values_; | 847 std::vector<JobOperationValue*> values_; |
808 | 848 |
849 void Append(JobOperationValues& target, | |
850 bool clear) | |
851 { | |
852 target.Reserve(target.GetSize() + GetSize()); | |
853 | |
854 for (size_t i = 0; i < values_.size(); i++) | |
855 { | |
856 if (clear) | |
857 { | |
858 target.Append(values_[i]); | |
859 values_[i] = NULL; | |
860 } | |
861 else | |
862 { | |
863 target.Append(GetValue(i).Clone()); | |
864 } | |
865 } | |
866 | |
867 if (clear) | |
868 { | |
869 Clear(); | |
870 } | |
871 } | |
872 | |
809 public: | 873 public: |
810 ~JobOperationValues() | 874 ~JobOperationValues() |
811 { | 875 { |
812 Clear(); | 876 Clear(); |
813 } | 877 } |
814 | 878 |
815 void Append(JobOperationValues& target, | 879 void Move(JobOperationValues& target) |
816 bool clear) | 880 { |
817 { | 881 return Append(target, true); |
818 target.Reserve(target.GetSize() + GetSize()); | 882 } |
819 | 883 |
820 for (size_t i = 0; i < values_.size(); i++) | 884 void Copy(JobOperationValues& target) |
821 { | 885 { |
822 if (clear) | 886 return Append(target, false); |
823 { | |
824 target.Append(values_[i]); | |
825 values_[i] = NULL; | |
826 } | |
827 else | |
828 { | |
829 target.Append(GetValue(i).Clone()); | |
830 } | |
831 } | |
832 | |
833 if (clear) | |
834 { | |
835 Clear(); | |
836 } | |
837 } | 887 } |
838 | 888 |
839 void Clear() | 889 void Clear() |
840 { | 890 { |
841 for (size_t i = 0; i < values_.size(); i++) | 891 for (size_t i = 0; i < values_.size(); i++) |
894 { | 944 { |
895 } | 945 } |
896 | 946 |
897 virtual void Apply(JobOperationValues& outputs, | 947 virtual void Apply(JobOperationValues& outputs, |
898 const JobOperationValue& input, | 948 const JobOperationValue& input, |
899 IDicomConnectionProvider& provider); | 949 IDicomConnectionManager& manager) = 0; |
950 }; | |
951 | |
952 | |
953 class LogJobOperation : public IJobOperation | |
954 { | |
955 public: | |
956 virtual void Apply(JobOperationValues& outputs, | |
957 const JobOperationValue& input, | |
958 IDicomConnectionManager& manager) | |
959 { | |
960 switch (input.GetType()) | |
961 { | |
962 case JobOperationValue::Type_String: | |
963 LOG(INFO) << "Job value: " << dynamic_cast<const StringOperationValue&>(input).GetContent(); | |
964 break; | |
965 | |
966 case JobOperationValue::Type_Null: | |
967 LOG(INFO) << "Job value: (null)"; | |
968 break; | |
969 | |
970 default: | |
971 LOG(INFO) << "Job value: (unsupport)"; | |
972 break; | |
973 } | |
974 | |
975 outputs.Append(input.Clone()); | |
976 } | |
900 }; | 977 }; |
901 | 978 |
902 | 979 |
903 class DicomInstanceValue : public JobOperationValue | 980 class DicomInstanceValue : public JobOperationValue |
904 { | 981 { |
946 { | 1023 { |
947 } | 1024 } |
948 | 1025 |
949 virtual void Apply(JobOperationValues& outputs, | 1026 virtual void Apply(JobOperationValues& outputs, |
950 const JobOperationValue& input, | 1027 const JobOperationValue& input, |
951 IDicomConnectionProvider& provider) | 1028 IDicomConnectionManager& manager) |
952 { | 1029 { |
953 std::auto_ptr<IDicomConnectionProvider::IResource> resource(provider.Acquire(localAet_, modality_)); | 1030 std::auto_ptr<IDicomConnectionManager::IResource> resource |
1031 (manager.AcquireConnection(localAet_, modality_)); | |
954 | 1032 |
955 if (resource.get() == NULL) | 1033 if (resource.get() == NULL) |
956 { | 1034 { |
957 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); | 1035 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); |
958 return; | 1036 return; |
995 { | 1073 { |
996 } | 1074 } |
997 | 1075 |
998 virtual void Apply(JobOperationValues& outputs, | 1076 virtual void Apply(JobOperationValues& outputs, |
999 const JobOperationValue& input, | 1077 const JobOperationValue& input, |
1000 IDicomConnectionProvider& provider) | 1078 IDicomConnectionManager& manager) |
1001 { | 1079 { |
1002 switch (input.GetType()) | 1080 switch (input.GetType()) |
1003 { | 1081 { |
1004 case JobOperationValue::Type_DicomInstance: | 1082 case JobOperationValue::Type_DicomInstance: |
1005 { | 1083 { |
1024 } | 1102 } |
1025 } | 1103 } |
1026 }; | 1104 }; |
1027 | 1105 |
1028 | 1106 |
1029 class SequenceOfOperationsJob : | 1107 |
1030 public IJob, | 1108 class TimeoutDicomConnectionManager : public IDicomConnectionManager |
1031 private IDicomConnectionProvider | |
1032 { | 1109 { |
1033 private: | 1110 private: |
1034 /*class DicomConnection | 1111 class Resource : public IDicomConnectionManager::IResource |
1035 { | 1112 { |
1036 private: | 1113 private: |
1037 boost::posix_time::ptime lastUse_; | 1114 TimeoutDicomConnectionManager& that_; |
1038 | 1115 boost::mutex::scoped_lock lock_; |
1039 void Touch() | |
1040 { | |
1041 lastUse_ = boost::posix_time::microsec_clock::universal_time(); | |
1042 } | |
1043 | 1116 |
1044 public: | 1117 public: |
1045 class Resource : public IDicomConnectionProvider::IResource | 1118 Resource(TimeoutDicomConnectionManager& that) : |
1046 { | 1119 that_(that), |
1047 private: | 1120 lock_(that.mutex_) |
1048 DicomConnection() | 1121 { |
1049 }; | 1122 } |
1050 };*/ | 1123 |
1051 | 1124 virtual ~Resource() |
1052 class Operation : public boost::noncopyable | 1125 { |
1126 that_.Touch(); | |
1127 } | |
1128 | |
1129 virtual DicomUserConnection& GetConnection() | |
1130 { | |
1131 if (that_.connection_.get() == NULL) | |
1132 { | |
1133 throw OrthancException(ErrorCode_InternalError); | |
1134 } | |
1135 | |
1136 return *that_.connection_; | |
1137 } | |
1138 }; | |
1139 | |
1140 boost::mutex mutex_; | |
1141 std::auto_ptr<DicomUserConnection> connection_; | |
1142 boost::posix_time::ptime lastUse_; | |
1143 boost::posix_time::time_duration timeout_; | |
1144 | |
1145 static boost::posix_time::ptime GetNow() | |
1146 { | |
1147 return boost::posix_time::microsec_clock::universal_time(); | |
1148 } | |
1149 | |
1150 void Touch() | |
1151 { | |
1152 lastUse_ = GetNow(); | |
1153 } | |
1154 | |
1155 void CheckTimeoutInternal() | |
1156 { | |
1157 if (connection_.get() != NULL && | |
1158 (GetNow() - lastUse_) >= timeout_) | |
1159 { | |
1160 connection_.reset(NULL); | |
1161 } | |
1162 } | |
1163 | |
1164 public: | |
1165 TimeoutDicomConnectionManager() : | |
1166 timeout_(boost::posix_time::milliseconds(1000)) | |
1167 { | |
1168 } | |
1169 | |
1170 void SetTimeout(unsigned int timeout) | |
1171 { | |
1172 boost::mutex::scoped_lock lock(mutex_); | |
1173 | |
1174 timeout_ = boost::posix_time::milliseconds(timeout); | |
1175 CheckTimeout(); | |
1176 } | |
1177 | |
1178 unsigned int GetTimeout() | |
1179 { | |
1180 boost::mutex::scoped_lock lock(mutex_); | |
1181 return timeout_.total_milliseconds(); | |
1182 } | |
1183 | |
1184 void Close() | |
1185 { | |
1186 boost::mutex::scoped_lock lock(mutex_); | |
1187 connection_.reset(NULL); | |
1188 } | |
1189 | |
1190 void CheckTimeout() | |
1191 { | |
1192 boost::mutex::scoped_lock lock(mutex_); | |
1193 CheckTimeoutInternal(); | |
1194 } | |
1195 | |
1196 virtual IResource* AcquireConnection(const std::string& localAet, | |
1197 const RemoteModalityParameters& remote) | |
1198 { | |
1199 boost::mutex::scoped_lock lock(mutex_); | |
1200 | |
1201 if (connection_.get() == NULL || | |
1202 !connection_->IsSameAssociation(localAet, remote)) | |
1203 { | |
1204 connection_.reset(new DicomUserConnection(localAet, remote)); | |
1205 } | |
1206 | |
1207 return new Resource(*this); | |
1208 } | |
1209 }; | |
1210 | |
1211 | |
1212 | |
1213 class SequenceOfOperationsJob : public IJob | |
1214 { | |
1215 public: | |
1216 class IObserver : public boost::noncopyable | |
1217 { | |
1218 public: | |
1219 virtual ~IObserver() | |
1220 { | |
1221 } | |
1222 | |
1223 virtual void SignalDone(const SequenceOfOperationsJob& job) = 0; | |
1224 }; | |
1225 | |
1226 private: | |
1227 class Operation : public boost::noncopyable | |
1053 { | 1228 { |
1054 private: | 1229 private: |
1055 JobOperationValues originalInputs_; | 1230 JobOperationValues originalInputs_; |
1056 JobOperationValues workInputs_; | 1231 JobOperationValues workInputs_; |
1057 std::auto_ptr<IJobOperation> operation_; | 1232 std::auto_ptr<IJobOperation> operation_; |
1109 bool IsDone() const | 1284 bool IsDone() const |
1110 { | 1285 { |
1111 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); | 1286 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); |
1112 } | 1287 } |
1113 | 1288 |
1114 void Step(IDicomConnectionProvider& provider) | 1289 void Step(IDicomConnectionManager& manager) |
1115 { | 1290 { |
1116 if (IsDone()) | 1291 if (IsDone()) |
1117 { | 1292 { |
1118 throw OrthancException(ErrorCode_BadSequenceOfCalls); | 1293 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
1119 } | 1294 } |
1124 { | 1299 { |
1125 input = &originalInputs_.GetValue(currentInput_); | 1300 input = &originalInputs_.GetValue(currentInput_); |
1126 } | 1301 } |
1127 else | 1302 else |
1128 { | 1303 { |
1129 input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); | 1304 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); |
1130 } | 1305 } |
1131 | 1306 |
1132 JobOperationValues outputs; | 1307 JobOperationValues outputs; |
1133 operation_->Apply(outputs, *input, provider); | 1308 operation_->Apply(outputs, *input, manager); |
1134 | 1309 |
1135 if (!nextOperations_.empty()) | 1310 if (!nextOperations_.empty()) |
1136 { | 1311 { |
1137 // TODO | 1312 std::list<Operation*>::iterator first = nextOperations_.begin(); |
1313 outputs.Move((*first)->workInputs_); | |
1314 | |
1315 std::list<Operation*>::iterator current = first; | |
1316 ++current; | |
1317 | |
1318 while (current != nextOperations_.end()) | |
1319 { | |
1320 (*first)->workInputs_.Copy((*current)->workInputs_); | |
1321 ++current; | |
1322 } | |
1138 } | 1323 } |
1139 | 1324 |
1140 currentInput_ += 1; | 1325 currentInput_ += 1; |
1141 } | 1326 } |
1142 }; | 1327 }; |
1143 | 1328 |
1144 | 1329 |
1145 boost::mutex mutex_; | 1330 std::string jobType_; |
1146 std::vector<Operation*> operations_; | 1331 bool done_; |
1147 size_t currentOperation_; | 1332 boost::mutex mutex_; |
1148 boost::condition_variable operationAdded_; | 1333 std::vector<Operation*> operations_; |
1334 size_t current_; | |
1335 boost::condition_variable operationAdded_; | |
1336 TimeoutDicomConnectionManager& connectionManager_; | |
1337 boost::posix_time::time_duration trailingTimeout_; | |
1338 std::list<IObserver*> observers_; | |
1339 | |
1340 // Invoked from constructors | |
1341 void Setup() | |
1342 { | |
1343 done_ = false; | |
1344 current_ = 0; | |
1345 trailingTimeout_ = boost::posix_time::milliseconds(1000); | |
1346 } | |
1149 | 1347 |
1150 public: | 1348 public: |
1151 SequenceOfOperationsJob() : | 1349 SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) : |
1152 currentOperation_(0) | 1350 jobType_("SequenceOfOperations"), |
1153 { | 1351 connectionManager_(manager) |
1154 } | 1352 { |
1353 Setup(); | |
1354 } | |
1355 | |
1356 SequenceOfOperationsJob(const std::string& jobType, | |
1357 TimeoutDicomConnectionManager& manager) : | |
1358 jobType_(jobType), | |
1359 connectionManager_(manager) | |
1360 { | |
1361 Setup(); | |
1362 } | |
1155 | 1363 |
1156 virtual ~SequenceOfOperationsJob() | 1364 virtual ~SequenceOfOperationsJob() |
1157 { | 1365 { |
1158 for (size_t i = 0; i < operations_.size(); i++) | 1366 for (size_t i = 0; i < operations_.size(); i++) |
1159 { | 1367 { |
1162 delete operations_[i]; | 1370 delete operations_[i]; |
1163 } | 1371 } |
1164 } | 1372 } |
1165 } | 1373 } |
1166 | 1374 |
1375 void Register(IObserver& observer) | |
1376 { | |
1377 boost::mutex::scoped_lock lock(mutex_); | |
1378 observers_.push_back(&observer); | |
1379 } | |
1380 | |
1381 // This lock allows adding new operations to the end of the job, | |
1382 // from another thread than the worker thread, after the job has | |
1383 // been submitted for processing | |
1167 class Lock : public boost::noncopyable | 1384 class Lock : public boost::noncopyable |
1168 { | 1385 { |
1169 private: | 1386 private: |
1170 SequenceOfOperationsJob& that_; | 1387 SequenceOfOperationsJob& that_; |
1171 boost::mutex::scoped_lock lock_; | 1388 boost::mutex::scoped_lock lock_; |
1175 that_(that), | 1392 that_(that), |
1176 lock_(that.mutex_) | 1393 lock_(that.mutex_) |
1177 { | 1394 { |
1178 } | 1395 } |
1179 | 1396 |
1397 bool IsDone() const | |
1398 { | |
1399 return that_.done_; | |
1400 } | |
1401 | |
1402 void SetDicomConnectionTimeout(unsigned int timeout) | |
1403 { | |
1404 that_.connectionManager_.SetTimeout(timeout); | |
1405 } | |
1406 | |
1407 void SetTrailingOperationTimeout(unsigned int timeout) | |
1408 { | |
1409 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout); | |
1410 } | |
1411 | |
1180 size_t AddOperation(IJobOperation* operation) | 1412 size_t AddOperation(IJobOperation* operation) |
1181 { | 1413 { |
1414 if (IsDone()) | |
1415 { | |
1416 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1417 } | |
1418 | |
1182 that_.operations_.push_back(new Operation(operation)); | 1419 that_.operations_.push_back(new Operation(operation)); |
1183 that_.operationAdded_.notify_one(); | 1420 that_.operationAdded_.notify_one(); |
1184 | 1421 |
1185 return that_.operations_.size() - 1; | 1422 return that_.operations_.size() - 1; |
1186 } | 1423 } |
1187 | 1424 |
1425 size_t GetOperationsCount() const | |
1426 { | |
1427 return that_.operations_.size(); | |
1428 } | |
1429 | |
1188 void AddInput(size_t index, | 1430 void AddInput(size_t index, |
1189 const JobOperationValue& value) | 1431 const JobOperationValue& value) |
1190 { | 1432 { |
1191 if (index >= that_.operations_.size() || | 1433 if (IsDone()) |
1192 index < that_.currentOperation_) | 1434 { |
1435 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1436 } | |
1437 else if (index >= that_.operations_.size() || | |
1438 index < that_.current_) | |
1193 { | 1439 { |
1194 throw OrthancException(ErrorCode_ParameterOutOfRange); | 1440 throw OrthancException(ErrorCode_ParameterOutOfRange); |
1195 } | 1441 } |
1196 else | 1442 else |
1197 { | 1443 { |
1200 } | 1446 } |
1201 | 1447 |
1202 void Connect(size_t input, | 1448 void Connect(size_t input, |
1203 size_t output) | 1449 size_t output) |
1204 { | 1450 { |
1205 if (input >= output || | 1451 if (IsDone()) |
1206 input >= that_.operations_.size() || | 1452 { |
1207 output >= that_.operations_.size() || | 1453 throw OrthancException(ErrorCode_BadSequenceOfCalls); |
1208 input < that_.currentOperation_ || | 1454 } |
1209 output < that_.currentOperation_) | 1455 else if (input >= output || |
1456 input >= that_.operations_.size() || | |
1457 output >= that_.operations_.size() || | |
1458 input < that_.current_ || | |
1459 output < that_.current_) | |
1210 { | 1460 { |
1211 throw OrthancException(ErrorCode_ParameterOutOfRange); | 1461 throw OrthancException(ErrorCode_ParameterOutOfRange); |
1212 } | 1462 } |
1213 else | 1463 else |
1214 { | 1464 { |
1221 | 1471 |
1222 virtual void Start() | 1472 virtual void Start() |
1223 { | 1473 { |
1224 } | 1474 } |
1225 | 1475 |
1226 virtual JobStepResult ExecuteStep() = 0; | 1476 virtual JobStepResult ExecuteStep() |
1477 { | |
1478 boost::mutex::scoped_lock lock(mutex_); | |
1479 | |
1480 if (current_ == operations_.size()) | |
1481 { | |
1482 LOG(INFO) << "Executing the trailing timeout in the sequence of operations"; | |
1483 operationAdded_.timed_wait(lock, trailingTimeout_); | |
1484 | |
1485 if (current_ == operations_.size()) | |
1486 { | |
1487 // No operation was added during the trailing timeout: The | |
1488 // job is over | |
1489 LOG(INFO) << "The sequence of operations is over"; | |
1490 done_ = true; | |
1491 | |
1492 for (std::list<IObserver*>::iterator it = observers_.begin(); | |
1493 it != observers_.end(); ++it) | |
1494 { | |
1495 (*it)->SignalDone(*this); | |
1496 } | |
1497 | |
1498 return JobStepResult::Success(); | |
1499 } | |
1500 else | |
1501 { | |
1502 LOG(INFO) << "New operation added to the sequence of operations"; | |
1503 } | |
1504 } | |
1505 | |
1506 assert(current_ < operations_.size()); | |
1507 | |
1508 while (current_ < operations_.size() && | |
1509 operations_[current_]->IsDone()) | |
1510 { | |
1511 current_++; | |
1512 } | |
1513 | |
1514 if (current_ < operations_.size()) | |
1515 { | |
1516 operations_[current_]->Step(connectionManager_); | |
1517 } | |
1518 | |
1519 return JobStepResult::Continue(); | |
1520 } | |
1227 | 1521 |
1228 virtual void SignalResubmit() | 1522 virtual void SignalResubmit() |
1229 { | 1523 { |
1230 boost::mutex::scoped_lock lock(mutex_); | 1524 boost::mutex::scoped_lock lock(mutex_); |
1231 | 1525 |
1232 currentOperation_ = 0; | 1526 current_ = 0; |
1527 done_ = false; | |
1233 | 1528 |
1234 for (size_t i = 0; i < operations_.size(); i++) | 1529 for (size_t i = 0; i < operations_.size(); i++) |
1235 { | 1530 { |
1236 operations_[i]->Reset(); | 1531 operations_[i]->Reset(); |
1237 } | 1532 } |
1238 } | 1533 } |
1239 | 1534 |
1240 virtual void ReleaseResources() = 0; // For pausing/canceling jobs | 1535 virtual void ReleaseResources() |
1241 | 1536 { |
1242 virtual float GetProgress() = 0; | 1537 boost::mutex::scoped_lock lock(mutex_); |
1243 | 1538 connectionManager_.Close(); |
1244 virtual void GetJobType(std::string& target) = 0; | 1539 } |
1245 | 1540 |
1246 virtual void GetPublicContent(Json::Value& value) = 0; | 1541 virtual float GetProgress() |
1247 | 1542 { |
1248 virtual void GetInternalContent(Json::Value& value) = 0; | 1543 boost::mutex::scoped_lock lock(mutex_); |
1544 | |
1545 return (static_cast<float>(current_) / | |
1546 static_cast<float>(operations_.size() + 1)); | |
1547 } | |
1548 | |
1549 virtual void GetJobType(std::string& target) | |
1550 { | |
1551 target = jobType_; | |
1552 } | |
1553 | |
1554 virtual void GetPublicContent(Json::Value& value) | |
1555 { | |
1556 boost::mutex::scoped_lock lock(mutex_); | |
1557 | |
1558 value["CountOperations"] = static_cast<unsigned int>(operations_.size()); | |
1559 } | |
1560 | |
1561 virtual void GetInternalContent(Json::Value& value) | |
1562 { | |
1563 // TODO | |
1564 } | |
1249 }; | 1565 }; |
1250 } | 1566 |
1567 | |
1568 class LuaJobManager : private SequenceOfOperationsJob::IObserver | |
1569 { | |
1570 public: | |
1571 typedef SequenceOfOperationsJob::Lock Lock; | |
1572 | |
1573 private: | |
1574 boost::mutex mutex_; | |
1575 JobsEngine& engine_; | |
1576 TimeoutDicomConnectionManager manager_; | |
1577 std::string currentId_; | |
1578 SequenceOfOperationsJob* currentJob_; | |
1579 size_t maxOperations_; | |
1580 int priority_; | |
1581 unsigned int trailingTimeout_; | |
1582 | |
1583 virtual void SignalDone(const SequenceOfOperationsJob& job) | |
1584 { | |
1585 boost::mutex::scoped_lock lock(mutex_); | |
1586 | |
1587 if (&job == currentJob_) | |
1588 { | |
1589 currentId_.clear(); | |
1590 currentJob_ = NULL; | |
1591 } | |
1592 } | |
1593 | |
1594 public: | |
1595 LuaJobManager(JobsEngine& engine) : | |
1596 engine_(engine), | |
1597 currentJob_(NULL), | |
1598 maxOperations_(1000), | |
1599 priority_(0) | |
1600 { | |
1601 } | |
1602 | |
1603 void SetMaxOperationsPerJob(size_t count) | |
1604 { | |
1605 boost::mutex::scoped_lock lock(mutex_); | |
1606 maxOperations_ = count; | |
1607 } | |
1608 | |
1609 void SetPriority(int priority) | |
1610 { | |
1611 boost::mutex::scoped_lock lock(mutex_); | |
1612 priority_ = priority; | |
1613 } | |
1614 | |
1615 void SetTrailingOperationTimeout(unsigned int timeout) | |
1616 { | |
1617 boost::mutex::scoped_lock lock(mutex_); | |
1618 trailingTimeout_ = timeout; | |
1619 } | |
1620 | |
1621 Lock* Modify() | |
1622 { | |
1623 boost::mutex::scoped_lock lock(mutex_); | |
1624 | |
1625 if (currentJob_ != NULL) | |
1626 { | |
1627 std::auto_ptr<Lock> result(new Lock(*currentJob_)); | |
1628 | |
1629 if (!result->IsDone() && | |
1630 result->GetOperationsCount() < maxOperations_) | |
1631 { | |
1632 return result.release(); | |
1633 } | |
1634 } | |
1635 | |
1636 // Need to create a new job, as the previous one is either | |
1637 // finished, or is getting too long | |
1638 currentJob_ = new SequenceOfOperationsJob(manager_); | |
1639 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_); | |
1640 | |
1641 std::auto_ptr<Lock> result(new Lock(*currentJob_)); | |
1642 result->SetTrailingOperationTimeout(trailingTimeout_); | |
1643 | |
1644 return result.release(); | |
1645 } | |
1646 }; | |
1647 } | |
1648 | |
1649 | |
1650 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) | |
1651 { | |
1652 TimeoutDicomConnectionManager manager; | |
1653 JobsEngine engine; | |
1654 engine.SetWorkersCount(3); | |
1655 engine.Start(); | |
1656 | |
1657 std::string id; | |
1658 SequenceOfOperationsJob* job = NULL; | |
1659 | |
1660 { | |
1661 std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob(manager)); | |
1662 job = a.get(); | |
1663 engine.GetRegistry().Submit(id, a.release(), 0); | |
1664 } | |
1665 | |
1666 boost::this_thread::sleep(boost::posix_time::milliseconds(500)); | |
1667 | |
1668 { | |
1669 SequenceOfOperationsJob::Lock lock(*job); | |
1670 size_t i = lock.AddOperation(new LogJobOperation); | |
1671 size_t j = lock.AddOperation(new LogJobOperation); | |
1672 size_t k = lock.AddOperation(new LogJobOperation); | |
1673 lock.AddInput(i, StringOperationValue("Hello")); | |
1674 lock.AddInput(i, StringOperationValue("World")); | |
1675 lock.Connect(i, j); | |
1676 lock.Connect(j, k); | |
1677 } | |
1678 | |
1679 boost::this_thread::sleep(boost::posix_time::milliseconds(2000)); | |
1680 | |
1681 engine.Stop(); | |
1682 | |
1683 } | |
1684 | |
1685 | |
1686 TEST(JobsEngine, Lua) | |
1687 { | |
1688 JobsEngine engine; | |
1689 engine.SetWorkersCount(2); | |
1690 engine.Start(); | |
1691 | |
1692 LuaJobManager lua(engine); | |
1693 lua.SetMaxOperationsPerJob(5); | |
1694 lua.SetTrailingOperationTimeout(200); | |
1695 | |
1696 for (size_t i = 0; i < 30; i++) | |
1697 { | |
1698 boost::this_thread::sleep(boost::posix_time::milliseconds(150)); | |
1699 std::auto_ptr<LuaJobManager::Lock> lock(lua.Modify()); | |
1700 size_t a = lock->AddOperation(new LogJobOperation); | |
1701 lock->AddInput(a, StringOperationValue(boost::lexical_cast<std::string>(i))); | |
1702 } | |
1703 | |
1704 boost::this_thread::sleep(boost::posix_time::milliseconds(2000)); | |
1705 | |
1706 engine.Stop(); | |
1707 | |
1708 } |