Mercurial > hg > orthanc
comparison UnitTestsSources/MultiThreadingTests.cpp @ 2599:593d6b0f4cba jobs
IJobOperation
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 16 May 2018 19:00:43 +0200 |
parents | 34dc57f4a7d2 |
children | 140a539b4eba |
comparison
equal
deleted
inserted
replaced
2598:34dc57f4a7d2 | 2599:593d6b0f4cba |
---|---|
270 { | 270 { |
271 } | 271 } |
272 | 272 |
273 virtual JobStepResult ExecuteStep() | 273 virtual JobStepResult ExecuteStep() |
274 { | 274 { |
275 boost::this_thread::sleep(boost::posix_time::milliseconds(10)); | |
276 | |
277 if (fails_) | 275 if (fails_) |
278 { | 276 { |
279 return JobStepResult::Failure(ErrorCode_ParameterOutOfRange); | 277 return JobStepResult::Failure(ErrorCode_ParameterOutOfRange); |
280 } | 278 } |
281 else if (count_ == steps_ - 1) | 279 else if (count_ == steps_ - 1) |
726 ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); | 724 ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); |
727 } | 725 } |
728 | 726 |
729 | 727 |
730 | 728 |
731 | 729 TEST(JobsEngine, SubmitAndWait) |
732 TEST(JobsEngine, Basic) | |
733 { | 730 { |
734 JobsEngine engine; | 731 JobsEngine engine; |
735 | |
736 std::string s; | |
737 | |
738 for (size_t i = 0; i < 20; i++) | |
739 engine.GetRegistry().Submit(s, new DummyJob(), rand() % 10); | |
740 | |
741 engine.SetWorkersCount(3); | 732 engine.SetWorkersCount(3); |
742 engine.Start(); | 733 engine.Start(); |
743 | 734 |
744 boost::this_thread::sleep(boost::posix_time::milliseconds(100)); | 735 ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10)); |
745 | 736 ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10)); |
746 { | 737 |
747 typedef std::set<std::string> Jobs; | |
748 | |
749 Jobs jobs; | |
750 engine.GetRegistry().ListJobs(jobs); | |
751 | |
752 Json::Value v = Json::arrayValue; | |
753 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) | |
754 { | |
755 JobInfo info; | |
756 | |
757 if (engine.GetRegistry().GetJobInfo(info, *it)) | |
758 { | |
759 Json::Value vv; | |
760 info.Serialize(vv, true); | |
761 v.append(vv); | |
762 } | |
763 } | |
764 | |
765 std::cout << v << std::endl; | |
766 } | |
767 std::cout << "====================================================" << std::endl; | |
768 | |
769 boost::this_thread::sleep(boost::posix_time::milliseconds(100)); | |
770 | |
771 if (1) | |
772 { | |
773 ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10)); | |
774 ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10)); | |
775 } | |
776 | |
777 boost::this_thread::sleep(boost::posix_time::milliseconds(100)); | |
778 | |
779 | |
780 engine.Stop(); | 738 engine.Stop(); |
781 | 739 } |
782 if (0) | 740 |
783 { | 741 |
784 typedef std::set<std::string> Jobs; | 742 |
785 | 743 |
786 Jobs jobs; | 744 |
787 engine.GetRegistry().ListJobs(jobs); | 745 #include "../OrthancServer/ServerContext.h" |
788 | 746 #include "../Core/Logging.h" |
789 Json::Value v = Json::arrayValue; | 747 |
790 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) | 748 namespace |
791 { | 749 { |
792 JobInfo info; | 750 class JobOperationValue : public boost::noncopyable |
793 | 751 { |
794 if (engine.GetRegistry().GetJobInfo(info, *it)) | 752 public: |
795 { | 753 enum Type |
796 Json::Value vv; | 754 { |
797 info.Serialize(vv, true); | 755 Type_DicomInstance, |
798 v.append(vv); | 756 Type_Null |
799 } | 757 }; |
800 } | 758 |
801 | 759 private: |
802 std::cout << v << std::endl; | 760 Type type_; |
803 } | 761 |
804 } | 762 protected: |
763 JobOperationValue(Type type) : | |
764 type_(type) | |
765 { | |
766 } | |
767 | |
768 public: | |
769 virtual ~JobOperationValue() | |
770 { | |
771 } | |
772 | |
773 Type GetType() const | |
774 { | |
775 return type_; | |
776 } | |
777 | |
778 virtual JobOperationValue* Clone() const = 0; | |
779 }; | |
780 | |
781 | |
782 class IDicomConnectionProvider : public boost::noncopyable | |
783 { | |
784 public: | |
785 virtual ~IDicomConnectionProvider() | |
786 { | |
787 } | |
788 | |
789 class IResource : public boost::noncopyable | |
790 { | |
791 public: | |
792 virtual ~IResource() | |
793 { | |
794 } | |
795 | |
796 virtual DicomUserConnection& GetConnection() = 0; | |
797 }; | |
798 | |
799 virtual IResource* Acquire(const std::string& localAet, | |
800 const RemoteModalityParameters& remote) = 0; | |
801 }; | |
802 | |
803 | |
804 class JobOperationValues : public boost::noncopyable | |
805 { | |
806 private: | |
807 std::vector<JobOperationValue*> values_; | |
808 | |
809 public: | |
810 ~JobOperationValues() | |
811 { | |
812 Clear(); | |
813 } | |
814 | |
815 void Append(JobOperationValues& target, | |
816 bool clear) | |
817 { | |
818 target.Reserve(target.GetSize() + GetSize()); | |
819 | |
820 for (size_t i = 0; i < values_.size(); i++) | |
821 { | |
822 if (clear) | |
823 { | |
824 target.Append(values_[i]); | |
825 values_[i] = NULL; | |
826 } | |
827 else | |
828 { | |
829 target.Append(GetValue(i).Clone()); | |
830 } | |
831 } | |
832 | |
833 if (clear) | |
834 { | |
835 Clear(); | |
836 } | |
837 } | |
838 | |
839 void Clear() | |
840 { | |
841 for (size_t i = 0; i < values_.size(); i++) | |
842 { | |
843 if (values_[i] != NULL) | |
844 { | |
845 delete values_[i]; | |
846 } | |
847 } | |
848 | |
849 values_.clear(); | |
850 } | |
851 | |
852 void Reserve(size_t count) | |
853 { | |
854 values_.reserve(count); | |
855 } | |
856 | |
857 void Append(JobOperationValue* value) // Takes ownership | |
858 { | |
859 if (value == NULL) | |
860 { | |
861 throw OrthancException(ErrorCode_NullPointer); | |
862 } | |
863 else | |
864 { | |
865 values_.push_back(value); | |
866 } | |
867 } | |
868 | |
869 size_t GetSize() const | |
870 { | |
871 return values_.size(); | |
872 } | |
873 | |
874 JobOperationValue& GetValue(size_t index) const | |
875 { | |
876 if (index >= values_.size()) | |
877 { | |
878 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
879 } | |
880 else | |
881 { | |
882 assert(values_[index] != NULL); | |
883 return *values_[index]; | |
884 } | |
885 } | |
886 }; | |
887 | |
888 | |
889 | |
890 class IJobOperation : public boost::noncopyable | |
891 { | |
892 public: | |
893 virtual ~IJobOperation() | |
894 { | |
895 } | |
896 | |
897 virtual void Apply(JobOperationValues& outputs, | |
898 const JobOperationValue& input, | |
899 IDicomConnectionProvider& provider); | |
900 }; | |
901 | |
902 | |
903 class DicomInstanceValue : public JobOperationValue | |
904 { | |
905 private: | |
906 ServerContext& context_; | |
907 std::string id_; | |
908 | |
909 public: | |
910 DicomInstanceValue(ServerContext& context, | |
911 const std::string& id) : | |
912 JobOperationValue(Type_DicomInstance), | |
913 context_(context), | |
914 id_(id) | |
915 { | |
916 } | |
917 | |
918 const std::string& GetId() const | |
919 { | |
920 return id_; | |
921 } | |
922 | |
923 void ReadContent(std::string& dicom) const | |
924 { | |
925 context_.ReadDicom(dicom, id_); | |
926 } | |
927 | |
928 virtual JobOperationValue* Clone() const | |
929 { | |
930 return new DicomInstanceValue(context_, id_); | |
931 } | |
932 }; | |
933 | |
934 | |
935 class StoreScuOperation : public IJobOperation | |
936 { | |
937 private: | |
938 std::string localAet_; | |
939 RemoteModalityParameters modality_; | |
940 | |
941 public: | |
942 StoreScuOperation(const std::string& localAet, | |
943 const RemoteModalityParameters& modality) : | |
944 localAet_(localAet), | |
945 modality_(modality) | |
946 { | |
947 } | |
948 | |
949 virtual void Apply(JobOperationValues& outputs, | |
950 const JobOperationValue& input, | |
951 IDicomConnectionProvider& provider) | |
952 { | |
953 std::auto_ptr<IDicomConnectionProvider::IResource> resource(provider.Acquire(localAet_, modality_)); | |
954 | |
955 if (resource.get() == NULL) | |
956 { | |
957 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); | |
958 return; | |
959 } | |
960 | |
961 if (input.GetType() != JobOperationValue::Type_DicomInstance) | |
962 { | |
963 throw OrthancException(ErrorCode_BadParameterType); | |
964 } | |
965 | |
966 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); | |
967 | |
968 LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \"" | |
969 << modality_.GetApplicationEntityTitle() << "\""; | |
970 | |
971 try | |
972 { | |
973 std::string dicom; | |
974 instance.ReadContent(dicom); | |
975 resource->GetConnection().Store(dicom); | |
976 outputs.Append(instance.Clone()); | |
977 } | |
978 catch (OrthancException& e) | |
979 { | |
980 LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \"" | |
981 << modality_.GetApplicationEntityTitle() << "\": " << e.What(); | |
982 } | |
983 } | |
984 }; | |
985 | |
986 | |
987 class DeleteResourceOperation : public IJobOperation | |
988 { | |
989 private: | |
990 ServerContext& context_; | |
991 | |
992 public: | |
993 DeleteResourceOperation(ServerContext& context) : | |
994 context_(context) | |
995 { | |
996 } | |
997 | |
998 virtual void Apply(JobOperationValues& outputs, | |
999 const JobOperationValue& input, | |
1000 IDicomConnectionProvider& provider) | |
1001 { | |
1002 switch (input.GetType()) | |
1003 { | |
1004 case JobOperationValue::Type_DicomInstance: | |
1005 { | |
1006 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input); | |
1007 LOG(INFO) << "Deleting instance: " << instance.GetId(); | |
1008 | |
1009 try | |
1010 { | |
1011 Json::Value tmp; | |
1012 context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance); | |
1013 } | |
1014 catch (OrthancException& e) | |
1015 { | |
1016 LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What(); | |
1017 } | |
1018 | |
1019 break; | |
1020 } | |
1021 | |
1022 default: | |
1023 break; | |
1024 } | |
1025 } | |
1026 }; | |
1027 | |
1028 | |
1029 class SequenceOfOperationsJob : | |
1030 public IJob, | |
1031 private IDicomConnectionProvider | |
1032 { | |
1033 private: | |
1034 /*class DicomConnection | |
1035 { | |
1036 private: | |
1037 boost::posix_time::ptime lastUse_; | |
1038 | |
1039 void Touch() | |
1040 { | |
1041 lastUse_ = boost::posix_time::microsec_clock::universal_time(); | |
1042 } | |
1043 | |
1044 public: | |
1045 class Resource : public IDicomConnectionProvider::IResource | |
1046 { | |
1047 private: | |
1048 DicomConnection() | |
1049 }; | |
1050 };*/ | |
1051 | |
1052 class Operation : public boost::noncopyable | |
1053 { | |
1054 private: | |
1055 JobOperationValues originalInputs_; | |
1056 JobOperationValues workInputs_; | |
1057 std::auto_ptr<IJobOperation> operation_; | |
1058 std::list<Operation*> nextOperations_; | |
1059 size_t currentInput_; | |
1060 | |
1061 public: | |
1062 Operation(IJobOperation* operation) : | |
1063 operation_(operation), | |
1064 currentInput_(0) | |
1065 { | |
1066 if (operation == NULL) | |
1067 { | |
1068 throw OrthancException(ErrorCode_NullPointer); | |
1069 } | |
1070 } | |
1071 | |
1072 void AddOriginalInput(const JobOperationValue& value) | |
1073 { | |
1074 if (currentInput_ != 0) | |
1075 { | |
1076 // Cannot add input after processing has started | |
1077 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1078 } | |
1079 else | |
1080 { | |
1081 originalInputs_.Append(value.Clone()); | |
1082 } | |
1083 } | |
1084 | |
1085 const JobOperationValues& GetOriginalInputs() const | |
1086 { | |
1087 return originalInputs_; | |
1088 } | |
1089 | |
1090 void Reset() | |
1091 { | |
1092 workInputs_.Clear(); | |
1093 currentInput_ = 0; | |
1094 } | |
1095 | |
1096 void AddNextOperation(Operation& other) | |
1097 { | |
1098 if (currentInput_ != 0) | |
1099 { | |
1100 // Cannot add input after processing has started | |
1101 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1102 } | |
1103 else | |
1104 { | |
1105 nextOperations_.push_back(&other); | |
1106 } | |
1107 } | |
1108 | |
1109 bool IsDone() const | |
1110 { | |
1111 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); | |
1112 } | |
1113 | |
1114 void Step(IDicomConnectionProvider& provider) | |
1115 { | |
1116 if (IsDone()) | |
1117 { | |
1118 throw OrthancException(ErrorCode_BadSequenceOfCalls); | |
1119 } | |
1120 | |
1121 const JobOperationValue* input; | |
1122 | |
1123 if (currentInput_ < originalInputs_.GetSize()) | |
1124 { | |
1125 input = &originalInputs_.GetValue(currentInput_); | |
1126 } | |
1127 else | |
1128 { | |
1129 input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); | |
1130 } | |
1131 | |
1132 JobOperationValues outputs; | |
1133 operation_->Apply(outputs, *input, provider); | |
1134 | |
1135 if (!nextOperations_.empty()) | |
1136 { | |
1137 // TODO | |
1138 } | |
1139 | |
1140 currentInput_ += 1; | |
1141 } | |
1142 }; | |
1143 | |
1144 | |
1145 boost::mutex mutex_; | |
1146 std::vector<Operation*> operations_; | |
1147 size_t currentOperation_; | |
1148 boost::condition_variable operationAdded_; | |
1149 | |
1150 public: | |
1151 SequenceOfOperationsJob() : | |
1152 currentOperation_(0) | |
1153 { | |
1154 } | |
1155 | |
1156 virtual ~SequenceOfOperationsJob() | |
1157 { | |
1158 for (size_t i = 0; i < operations_.size(); i++) | |
1159 { | |
1160 if (operations_[i] != NULL) | |
1161 { | |
1162 delete operations_[i]; | |
1163 } | |
1164 } | |
1165 } | |
1166 | |
1167 class Lock : public boost::noncopyable | |
1168 { | |
1169 private: | |
1170 SequenceOfOperationsJob& that_; | |
1171 boost::mutex::scoped_lock lock_; | |
1172 | |
1173 public: | |
1174 Lock(SequenceOfOperationsJob& that) : | |
1175 that_(that), | |
1176 lock_(that.mutex_) | |
1177 { | |
1178 } | |
1179 | |
1180 size_t AddOperation(IJobOperation* operation) | |
1181 { | |
1182 that_.operations_.push_back(new Operation(operation)); | |
1183 that_.operationAdded_.notify_one(); | |
1184 | |
1185 return that_.operations_.size() - 1; | |
1186 } | |
1187 | |
1188 void AddInput(size_t index, | |
1189 const JobOperationValue& value) | |
1190 { | |
1191 if (index >= that_.operations_.size() || | |
1192 index < that_.currentOperation_) | |
1193 { | |
1194 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
1195 } | |
1196 else | |
1197 { | |
1198 that_.operations_[index]->AddOriginalInput(value); | |
1199 } | |
1200 } | |
1201 | |
1202 void Connect(size_t input, | |
1203 size_t output) | |
1204 { | |
1205 if (input >= output || | |
1206 input >= that_.operations_.size() || | |
1207 output >= that_.operations_.size() || | |
1208 input < that_.currentOperation_ || | |
1209 output < that_.currentOperation_) | |
1210 { | |
1211 throw OrthancException(ErrorCode_ParameterOutOfRange); | |
1212 } | |
1213 else | |
1214 { | |
1215 Operation& a = *that_.operations_[input]; | |
1216 Operation& b = *that_.operations_[output]; | |
1217 a.AddNextOperation(b); | |
1218 } | |
1219 } | |
1220 }; | |
1221 | |
1222 virtual void Start() | |
1223 { | |
1224 } | |
1225 | |
1226 virtual JobStepResult ExecuteStep() = 0; | |
1227 | |
1228 virtual void SignalResubmit() | |
1229 { | |
1230 boost::mutex::scoped_lock lock(mutex_); | |
1231 | |
1232 currentOperation_ = 0; | |
1233 | |
1234 for (size_t i = 0; i < operations_.size(); i++) | |
1235 { | |
1236 operations_[i]->Reset(); | |
1237 } | |
1238 } | |
1239 | |
1240 virtual void ReleaseResources() = 0; // For pausing/canceling jobs | |
1241 | |
1242 virtual float GetProgress() = 0; | |
1243 | |
1244 virtual void GetJobType(std::string& target) = 0; | |
1245 | |
1246 virtual void GetPublicContent(Json::Value& value) = 0; | |
1247 | |
1248 virtual void GetInternalContent(Json::Value& value) = 0; | |
1249 }; | |
1250 } |