comparison UnitTestsSources/MultiThreadingTests.cpp @ 2603:988936118354 jobs

reorganization
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 18 May 2018 17:02:25 +0200
parents 5b6c3d77a2a1
children 76ef12fa136c
comparison
equal deleted inserted replaced
2602:c25f1a52acbc 2603:988936118354
740 740
741 741
742 742
743 743
744 744
745 #include "../OrthancServer/ServerContext.h" 745 #include "../OrthancServer/ServerJobs/LuaJobManager.h"
746 #include "../Core/Logging.h"
747
748 #include "../Core/DicomNetworking/IDicomConnectionManager.h"
749 #include "../Core/JobsEngine/Operations/JobOperationValues.h"
750 #include "../Core/JobsEngine/Operations/StringOperationValue.h" 746 #include "../Core/JobsEngine/Operations/StringOperationValue.h"
751 #include "../Core/JobsEngine/Operations/LogJobOperation.h" 747 #include "../Core/JobsEngine/Operations/LogJobOperation.h"
752
753 namespace Orthanc
754 {
755 class DicomInstanceValue : public JobOperationValue
756 {
757 private:
758 ServerContext& context_;
759 std::string id_;
760
761 public:
762 DicomInstanceValue(ServerContext& context,
763 const std::string& id) :
764 JobOperationValue(Type_DicomInstance),
765 context_(context),
766 id_(id)
767 {
768 }
769
770 const std::string& GetId() const
771 {
772 return id_;
773 }
774
775 void ReadContent(std::string& dicom) const
776 {
777 context_.ReadDicom(dicom, id_);
778 }
779
780 virtual JobOperationValue* Clone() const
781 {
782 return new DicomInstanceValue(context_, id_);
783 }
784 };
785
786
787 class StoreScuOperation : public IJobOperation
788 {
789 private:
790 std::string localAet_;
791 RemoteModalityParameters modality_;
792
793 public:
794 StoreScuOperation(const std::string& localAet,
795 const RemoteModalityParameters& modality) :
796 localAet_(localAet),
797 modality_(modality)
798 {
799 }
800
801 virtual void Apply(JobOperationValues& outputs,
802 const JobOperationValue& input,
803 IDicomConnectionManager& manager)
804 {
805 std::auto_ptr<IDicomConnectionManager::IResource> resource
806 (manager.AcquireConnection(localAet_, modality_));
807
808 if (resource.get() == NULL)
809 {
810 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle();
811 return;
812 }
813
814 if (input.GetType() != JobOperationValue::Type_DicomInstance)
815 {
816 throw OrthancException(ErrorCode_BadParameterType);
817 }
818
819 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
820
821 LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \""
822 << modality_.GetApplicationEntityTitle() << "\"";
823
824 try
825 {
826 std::string dicom;
827 instance.ReadContent(dicom);
828 resource->GetConnection().Store(dicom);
829 outputs.Append(instance.Clone());
830 }
831 catch (OrthancException& e)
832 {
833 LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \""
834 << modality_.GetApplicationEntityTitle() << "\": " << e.What();
835 }
836 }
837 };
838
839
840 class DeleteResourceOperation : public IJobOperation
841 {
842 private:
843 ServerContext& context_;
844
845 public:
846 DeleteResourceOperation(ServerContext& context) :
847 context_(context)
848 {
849 }
850
851 virtual void Apply(JobOperationValues& outputs,
852 const JobOperationValue& input,
853 IDicomConnectionManager& manager)
854 {
855 switch (input.GetType())
856 {
857 case JobOperationValue::Type_DicomInstance:
858 {
859 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
860 LOG(INFO) << "Deleting instance: " << instance.GetId();
861
862 try
863 {
864 Json::Value tmp;
865 context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance);
866 }
867 catch (OrthancException& e)
868 {
869 LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What();
870 }
871
872 break;
873 }
874
875 default:
876 break;
877 }
878 }
879 };
880
881
882
883 class TimeoutDicomConnectionManager : public IDicomConnectionManager
884 {
885 private:
886 class Resource : public IDicomConnectionManager::IResource
887 {
888 private:
889 TimeoutDicomConnectionManager& that_;
890 boost::mutex::scoped_lock lock_;
891
892 public:
893 Resource(TimeoutDicomConnectionManager& that) :
894 that_(that),
895 lock_(that.mutex_)
896 {
897 }
898
899 virtual ~Resource()
900 {
901 that_.Touch();
902 }
903
904 virtual DicomUserConnection& GetConnection()
905 {
906 if (that_.connection_.get() == NULL)
907 {
908 throw OrthancException(ErrorCode_InternalError);
909 }
910
911 return *that_.connection_;
912 }
913 };
914
915 boost::mutex mutex_;
916 std::auto_ptr<DicomUserConnection> connection_;
917 boost::posix_time::ptime lastUse_;
918 boost::posix_time::time_duration timeout_;
919
920 static boost::posix_time::ptime GetNow()
921 {
922 return boost::posix_time::microsec_clock::universal_time();
923 }
924
925 void Touch()
926 {
927 lastUse_ = GetNow();
928 }
929
930 void CheckTimeoutInternal()
931 {
932 if (connection_.get() != NULL &&
933 (GetNow() - lastUse_) >= timeout_)
934 {
935 connection_.reset(NULL);
936 }
937 }
938
939 public:
940 TimeoutDicomConnectionManager() :
941 timeout_(boost::posix_time::milliseconds(1000))
942 {
943 }
944
945 void SetTimeout(unsigned int timeout)
946 {
947 boost::mutex::scoped_lock lock(mutex_);
948
949 timeout_ = boost::posix_time::milliseconds(timeout);
950 CheckTimeoutInternal();
951 }
952
953 unsigned int GetTimeout()
954 {
955 boost::mutex::scoped_lock lock(mutex_);
956 return timeout_.total_milliseconds();
957 }
958
959 void Close()
960 {
961 boost::mutex::scoped_lock lock(mutex_);
962 connection_.reset(NULL);
963 }
964
965 void CheckTimeout()
966 {
967 boost::mutex::scoped_lock lock(mutex_);
968 CheckTimeoutInternal();
969 }
970
971 virtual IResource* AcquireConnection(const std::string& localAet,
972 const RemoteModalityParameters& remote)
973 {
974 boost::mutex::scoped_lock lock(mutex_);
975
976 if (connection_.get() == NULL ||
977 !connection_->IsSameAssociation(localAet, remote))
978 {
979 connection_.reset(new DicomUserConnection(localAet, remote));
980 }
981
982 return new Resource(*this);
983 }
984 };
985
986
987
988 class SequenceOfOperationsJob : public IJob
989 {
990 public:
991 class IObserver : public boost::noncopyable
992 {
993 public:
994 virtual ~IObserver()
995 {
996 }
997
998 virtual void SignalDone(const SequenceOfOperationsJob& job) = 0;
999 };
1000
1001 private:
1002 class Operation : public boost::noncopyable
1003 {
1004 private:
1005 JobOperationValues originalInputs_;
1006 JobOperationValues workInputs_;
1007 std::auto_ptr<IJobOperation> operation_;
1008 std::list<Operation*> nextOperations_;
1009 size_t currentInput_;
1010
1011 public:
1012 Operation(IJobOperation* operation) :
1013 operation_(operation),
1014 currentInput_(0)
1015 {
1016 if (operation == NULL)
1017 {
1018 throw OrthancException(ErrorCode_NullPointer);
1019 }
1020 }
1021
1022 void AddOriginalInput(const JobOperationValue& value)
1023 {
1024 if (currentInput_ != 0)
1025 {
1026 // Cannot add input after processing has started
1027 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1028 }
1029 else
1030 {
1031 originalInputs_.Append(value.Clone());
1032 }
1033 }
1034
1035 const JobOperationValues& GetOriginalInputs() const
1036 {
1037 return originalInputs_;
1038 }
1039
1040 void Reset()
1041 {
1042 workInputs_.Clear();
1043 currentInput_ = 0;
1044 }
1045
1046 void AddNextOperation(Operation& other)
1047 {
1048 if (currentInput_ != 0)
1049 {
1050 // Cannot add input after processing has started
1051 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1052 }
1053 else
1054 {
1055 nextOperations_.push_back(&other);
1056 }
1057 }
1058
1059 bool IsDone() const
1060 {
1061 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
1062 }
1063
1064 void Step()
1065 {
1066 if (IsDone())
1067 {
1068 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1069 }
1070
1071 const JobOperationValue* input;
1072
1073 if (currentInput_ < originalInputs_.GetSize())
1074 {
1075 input = &originalInputs_.GetValue(currentInput_);
1076 }
1077 else
1078 {
1079 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
1080 }
1081
1082 JobOperationValues outputs;
1083 operation_->Apply(outputs, *input);
1084
1085 if (!nextOperations_.empty())
1086 {
1087 std::list<Operation*>::iterator first = nextOperations_.begin();
1088 outputs.Move((*first)->workInputs_);
1089
1090 std::list<Operation*>::iterator current = first;
1091 ++current;
1092
1093 while (current != nextOperations_.end())
1094 {
1095 (*first)->workInputs_.Copy((*current)->workInputs_);
1096 ++current;
1097 }
1098 }
1099
1100 currentInput_ += 1;
1101 }
1102 };
1103
1104
1105 std::string jobType_;
1106 bool done_;
1107 boost::mutex mutex_;
1108 std::vector<Operation*> operations_;
1109 size_t current_;
1110 boost::condition_variable operationAdded_;
1111 boost::posix_time::time_duration trailingTimeout_;
1112 std::list<IObserver*> observers_;
1113
1114 // Invoked from constructors
1115 void Setup()
1116 {
1117 done_ = false;
1118 current_ = 0;
1119 trailingTimeout_ = boost::posix_time::milliseconds(1000);
1120 }
1121
1122 public:
1123 SequenceOfOperationsJob() :
1124 jobType_("SequenceOfOperations")
1125 {
1126 Setup();
1127 }
1128
1129 SequenceOfOperationsJob(const std::string& jobType) :
1130 jobType_(jobType)
1131 {
1132 Setup();
1133 }
1134
1135 virtual ~SequenceOfOperationsJob()
1136 {
1137 for (size_t i = 0; i < operations_.size(); i++)
1138 {
1139 if (operations_[i] != NULL)
1140 {
1141 delete operations_[i];
1142 }
1143 }
1144 }
1145
1146 void Register(IObserver& observer)
1147 {
1148 boost::mutex::scoped_lock lock(mutex_);
1149 observers_.push_back(&observer);
1150 }
1151
1152 // This lock allows adding new operations to the end of the job,
1153 // from another thread than the worker thread, after the job has
1154 // been submitted for processing
1155 class Lock : public boost::noncopyable
1156 {
1157 private:
1158 SequenceOfOperationsJob& that_;
1159 boost::mutex::scoped_lock lock_;
1160
1161 public:
1162 Lock(SequenceOfOperationsJob& that) :
1163 that_(that),
1164 lock_(that.mutex_)
1165 {
1166 }
1167
1168 bool IsDone() const
1169 {
1170 return that_.done_;
1171 }
1172
1173 void SetTrailingOperationTimeout(unsigned int timeout)
1174 {
1175 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
1176 }
1177
1178 size_t AddOperation(IJobOperation* operation)
1179 {
1180 if (IsDone())
1181 {
1182 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1183 }
1184
1185 that_.operations_.push_back(new Operation(operation));
1186 that_.operationAdded_.notify_one();
1187
1188 return that_.operations_.size() - 1;
1189 }
1190
1191 size_t GetOperationsCount() const
1192 {
1193 return that_.operations_.size();
1194 }
1195
1196 void AddInput(size_t index,
1197 const JobOperationValue& value)
1198 {
1199 if (IsDone())
1200 {
1201 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1202 }
1203 else if (index >= that_.operations_.size() ||
1204 index < that_.current_)
1205 {
1206 throw OrthancException(ErrorCode_ParameterOutOfRange);
1207 }
1208 else
1209 {
1210 that_.operations_[index]->AddOriginalInput(value);
1211 }
1212 }
1213
1214 void Connect(size_t input,
1215 size_t output)
1216 {
1217 if (IsDone())
1218 {
1219 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1220 }
1221 else if (input >= output ||
1222 input >= that_.operations_.size() ||
1223 output >= that_.operations_.size() ||
1224 input < that_.current_ ||
1225 output < that_.current_)
1226 {
1227 throw OrthancException(ErrorCode_ParameterOutOfRange);
1228 }
1229 else
1230 {
1231 Operation& a = *that_.operations_[input];
1232 Operation& b = *that_.operations_[output];
1233 a.AddNextOperation(b);
1234 }
1235 }
1236 };
1237
1238 virtual void Start()
1239 {
1240 }
1241
1242 virtual JobStepResult ExecuteStep()
1243 {
1244 boost::mutex::scoped_lock lock(mutex_);
1245
1246 if (current_ == operations_.size())
1247 {
1248 LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
1249 operationAdded_.timed_wait(lock, trailingTimeout_);
1250
1251 if (current_ == operations_.size())
1252 {
1253 // No operation was added during the trailing timeout: The
1254 // job is over
1255 LOG(INFO) << "The sequence of operations is over";
1256 done_ = true;
1257
1258 for (std::list<IObserver*>::iterator it = observers_.begin();
1259 it != observers_.end(); ++it)
1260 {
1261 (*it)->SignalDone(*this);
1262 }
1263
1264 return JobStepResult::Success();
1265 }
1266 else
1267 {
1268 LOG(INFO) << "New operation added to the sequence of operations";
1269 }
1270 }
1271
1272 assert(current_ < operations_.size());
1273
1274 while (current_ < operations_.size() &&
1275 operations_[current_]->IsDone())
1276 {
1277 current_++;
1278 }
1279
1280 if (current_ < operations_.size())
1281 {
1282 operations_[current_]->Step();
1283 }
1284
1285 return JobStepResult::Continue();
1286 }
1287
1288 virtual void SignalResubmit()
1289 {
1290 boost::mutex::scoped_lock lock(mutex_);
1291
1292 current_ = 0;
1293 done_ = false;
1294
1295 for (size_t i = 0; i < operations_.size(); i++)
1296 {
1297 operations_[i]->Reset();
1298 }
1299 }
1300
1301 virtual void ReleaseResources()
1302 {
1303 }
1304
1305 virtual float GetProgress()
1306 {
1307 boost::mutex::scoped_lock lock(mutex_);
1308
1309 return (static_cast<float>(current_) /
1310 static_cast<float>(operations_.size() + 1));
1311 }
1312
1313 virtual void GetJobType(std::string& target)
1314 {
1315 target = jobType_;
1316 }
1317
1318 virtual void GetPublicContent(Json::Value& value)
1319 {
1320 boost::mutex::scoped_lock lock(mutex_);
1321
1322 value["CountOperations"] = static_cast<unsigned int>(operations_.size());
1323 }
1324
1325 virtual void GetInternalContent(Json::Value& value)
1326 {
1327 // TODO
1328 }
1329 };
1330
1331
1332 class LuaJobManager : private SequenceOfOperationsJob::IObserver
1333 {
1334 public:
1335 typedef SequenceOfOperationsJob::Lock Lock;
1336
1337 private:
1338 boost::mutex mutex_;
1339 JobsEngine& engine_;
1340 TimeoutDicomConnectionManager connectionManager_;
1341 std::string currentId_;
1342 SequenceOfOperationsJob* currentJob_;
1343 size_t maxOperations_;
1344 int priority_;
1345 unsigned int trailingTimeout_;
1346 bool continue_;
1347 boost::thread connectionTimeoutThread_;
1348
1349 static void ConnectionTimeoutThread(LuaJobManager* manager)
1350 {
1351 while (manager->continue_)
1352 {
1353 manager->connectionManager_.CheckTimeout();
1354 boost::this_thread::sleep(boost::posix_time::milliseconds(100));
1355 }
1356 }
1357
1358 virtual void SignalDone(const SequenceOfOperationsJob& job)
1359 {
1360 boost::mutex::scoped_lock lock(mutex_);
1361
1362 if (&job == currentJob_)
1363 {
1364 currentId_.clear();
1365 currentJob_ = NULL;
1366 }
1367 }
1368
1369 public:
1370 LuaJobManager(JobsEngine& engine) :
1371 engine_(engine),
1372 currentJob_(NULL),
1373 maxOperations_(1000),
1374 priority_(0),
1375 continue_(true)
1376 {
1377 connectionTimeoutThread_ = boost::thread(ConnectionTimeoutThread, this);
1378 }
1379
1380 ~LuaJobManager()
1381 {
1382 continue_ = false;
1383
1384 if (connectionTimeoutThread_.joinable())
1385 {
1386 connectionTimeoutThread_.join();
1387 }
1388 }
1389
1390 void SetMaxOperationsPerJob(size_t count)
1391 {
1392 boost::mutex::scoped_lock lock(mutex_);
1393 maxOperations_ = count;
1394 }
1395
1396 void SetPriority(int priority)
1397 {
1398 boost::mutex::scoped_lock lock(mutex_);
1399 priority_ = priority;
1400 }
1401
1402 void SetTrailingOperationTimeout(unsigned int timeout)
1403 {
1404 boost::mutex::scoped_lock lock(mutex_);
1405 trailingTimeout_ = timeout;
1406 }
1407
1408 Lock* Modify()
1409 {
1410 boost::mutex::scoped_lock lock(mutex_);
1411
1412 if (currentJob_ != NULL)
1413 {
1414 std::auto_ptr<Lock> result(new Lock(*currentJob_));
1415
1416 if (!result->IsDone() &&
1417 result->GetOperationsCount() < maxOperations_)
1418 {
1419 return result.release();
1420 }
1421 }
1422
1423 // Need to create a new job, as the previous one is either
1424 // finished, or is getting too long
1425 currentJob_ = new SequenceOfOperationsJob;
1426 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_);
1427
1428 std::auto_ptr<Lock> result(new Lock(*currentJob_));
1429 result->SetTrailingOperationTimeout(trailingTimeout_);
1430
1431 return result.release();
1432 }
1433 };
1434 }
1435 748
1436 749
1437 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob) 750 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
1438 { 751 {
1439 JobsEngine engine; 752 JobsEngine engine;