comparison UnitTestsSources/MultiThreadingTests.cpp @ 2599:593d6b0f4cba jobs

IJobOperation
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 16 May 2018 19:00:43 +0200
parents 34dc57f4a7d2
children 140a539b4eba
comparison
equal deleted inserted replaced
2598:34dc57f4a7d2 2599:593d6b0f4cba
270 { 270 {
271 } 271 }
272 272
273 virtual JobStepResult ExecuteStep() 273 virtual JobStepResult ExecuteStep()
274 { 274 {
275 boost::this_thread::sleep(boost::posix_time::milliseconds(10));
276
277 if (fails_) 275 if (fails_)
278 { 276 {
279 return JobStepResult::Failure(ErrorCode_ParameterOutOfRange); 277 return JobStepResult::Failure(ErrorCode_ParameterOutOfRange);
280 } 278 }
281 else if (count_ == steps_ - 1) 279 else if (count_ == steps_ - 1)
726 ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob)); 724 ASSERT_TRUE(CheckErrorCode(registry, id, ErrorCode_CanceledJob));
727 } 725 }
728 726
729 727
730 728
731 729 TEST(JobsEngine, SubmitAndWait)
732 TEST(JobsEngine, Basic)
733 { 730 {
734 JobsEngine engine; 731 JobsEngine engine;
735
736 std::string s;
737
738 for (size_t i = 0; i < 20; i++)
739 engine.GetRegistry().Submit(s, new DummyJob(), rand() % 10);
740
741 engine.SetWorkersCount(3); 732 engine.SetWorkersCount(3);
742 engine.Start(); 733 engine.Start();
743 734
744 boost::this_thread::sleep(boost::posix_time::milliseconds(100)); 735 ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10));
745 736 ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10));
746 { 737
747 typedef std::set<std::string> Jobs;
748
749 Jobs jobs;
750 engine.GetRegistry().ListJobs(jobs);
751
752 Json::Value v = Json::arrayValue;
753 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
754 {
755 JobInfo info;
756
757 if (engine.GetRegistry().GetJobInfo(info, *it))
758 {
759 Json::Value vv;
760 info.Serialize(vv, true);
761 v.append(vv);
762 }
763 }
764
765 std::cout << v << std::endl;
766 }
767 std::cout << "====================================================" << std::endl;
768
769 boost::this_thread::sleep(boost::posix_time::milliseconds(100));
770
771 if (1)
772 {
773 ASSERT_TRUE(engine.GetRegistry().SubmitAndWait(new DummyJob(), rand() % 10));
774 ASSERT_FALSE(engine.GetRegistry().SubmitAndWait(new DummyJob(true), rand() % 10));
775 }
776
777 boost::this_thread::sleep(boost::posix_time::milliseconds(100));
778
779
780 engine.Stop(); 738 engine.Stop();
781 739 }
782 if (0) 740
783 { 741
784 typedef std::set<std::string> Jobs; 742
785 743
786 Jobs jobs; 744
787 engine.GetRegistry().ListJobs(jobs); 745 #include "../OrthancServer/ServerContext.h"
788 746 #include "../Core/Logging.h"
789 Json::Value v = Json::arrayValue; 747
790 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it) 748 namespace
791 { 749 {
792 JobInfo info; 750 class JobOperationValue : public boost::noncopyable
793 751 {
794 if (engine.GetRegistry().GetJobInfo(info, *it)) 752 public:
795 { 753 enum Type
796 Json::Value vv; 754 {
797 info.Serialize(vv, true); 755 Type_DicomInstance,
798 v.append(vv); 756 Type_Null
799 } 757 };
800 } 758
801 759 private:
802 std::cout << v << std::endl; 760 Type type_;
803 } 761
804 } 762 protected:
763 JobOperationValue(Type type) :
764 type_(type)
765 {
766 }
767
768 public:
769 virtual ~JobOperationValue()
770 {
771 }
772
773 Type GetType() const
774 {
775 return type_;
776 }
777
778 virtual JobOperationValue* Clone() const = 0;
779 };
780
781
782 class IDicomConnectionProvider : public boost::noncopyable
783 {
784 public:
785 virtual ~IDicomConnectionProvider()
786 {
787 }
788
789 class IResource : public boost::noncopyable
790 {
791 public:
792 virtual ~IResource()
793 {
794 }
795
796 virtual DicomUserConnection& GetConnection() = 0;
797 };
798
799 virtual IResource* Acquire(const std::string& localAet,
800 const RemoteModalityParameters& remote) = 0;
801 };
802
803
804 class JobOperationValues : public boost::noncopyable
805 {
806 private:
807 std::vector<JobOperationValue*> values_;
808
809 public:
810 ~JobOperationValues()
811 {
812 Clear();
813 }
814
815 void Append(JobOperationValues& target,
816 bool clear)
817 {
818 target.Reserve(target.GetSize() + GetSize());
819
820 for (size_t i = 0; i < values_.size(); i++)
821 {
822 if (clear)
823 {
824 target.Append(values_[i]);
825 values_[i] = NULL;
826 }
827 else
828 {
829 target.Append(GetValue(i).Clone());
830 }
831 }
832
833 if (clear)
834 {
835 Clear();
836 }
837 }
838
839 void Clear()
840 {
841 for (size_t i = 0; i < values_.size(); i++)
842 {
843 if (values_[i] != NULL)
844 {
845 delete values_[i];
846 }
847 }
848
849 values_.clear();
850 }
851
852 void Reserve(size_t count)
853 {
854 values_.reserve(count);
855 }
856
857 void Append(JobOperationValue* value) // Takes ownership
858 {
859 if (value == NULL)
860 {
861 throw OrthancException(ErrorCode_NullPointer);
862 }
863 else
864 {
865 values_.push_back(value);
866 }
867 }
868
869 size_t GetSize() const
870 {
871 return values_.size();
872 }
873
874 JobOperationValue& GetValue(size_t index) const
875 {
876 if (index >= values_.size())
877 {
878 throw OrthancException(ErrorCode_ParameterOutOfRange);
879 }
880 else
881 {
882 assert(values_[index] != NULL);
883 return *values_[index];
884 }
885 }
886 };
887
888
889
890 class IJobOperation : public boost::noncopyable
891 {
892 public:
893 virtual ~IJobOperation()
894 {
895 }
896
897 virtual void Apply(JobOperationValues& outputs,
898 const JobOperationValue& input,
899 IDicomConnectionProvider& provider);
900 };
901
902
903 class DicomInstanceValue : public JobOperationValue
904 {
905 private:
906 ServerContext& context_;
907 std::string id_;
908
909 public:
910 DicomInstanceValue(ServerContext& context,
911 const std::string& id) :
912 JobOperationValue(Type_DicomInstance),
913 context_(context),
914 id_(id)
915 {
916 }
917
918 const std::string& GetId() const
919 {
920 return id_;
921 }
922
923 void ReadContent(std::string& dicom) const
924 {
925 context_.ReadDicom(dicom, id_);
926 }
927
928 virtual JobOperationValue* Clone() const
929 {
930 return new DicomInstanceValue(context_, id_);
931 }
932 };
933
934
935 class StoreScuOperation : public IJobOperation
936 {
937 private:
938 std::string localAet_;
939 RemoteModalityParameters modality_;
940
941 public:
942 StoreScuOperation(const std::string& localAet,
943 const RemoteModalityParameters& modality) :
944 localAet_(localAet),
945 modality_(modality)
946 {
947 }
948
949 virtual void Apply(JobOperationValues& outputs,
950 const JobOperationValue& input,
951 IDicomConnectionProvider& provider)
952 {
953 std::auto_ptr<IDicomConnectionProvider::IResource> resource(provider.Acquire(localAet_, modality_));
954
955 if (resource.get() == NULL)
956 {
957 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle();
958 return;
959 }
960
961 if (input.GetType() != JobOperationValue::Type_DicomInstance)
962 {
963 throw OrthancException(ErrorCode_BadParameterType);
964 }
965
966 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
967
968 LOG(INFO) << "Sending instance " << instance.GetId() << " to modality \""
969 << modality_.GetApplicationEntityTitle() << "\"";
970
971 try
972 {
973 std::string dicom;
974 instance.ReadContent(dicom);
975 resource->GetConnection().Store(dicom);
976 outputs.Append(instance.Clone());
977 }
978 catch (OrthancException& e)
979 {
980 LOG(ERROR) << "Unable to send instance " << instance.GetId() << " to modality \""
981 << modality_.GetApplicationEntityTitle() << "\": " << e.What();
982 }
983 }
984 };
985
986
987 class DeleteResourceOperation : public IJobOperation
988 {
989 private:
990 ServerContext& context_;
991
992 public:
993 DeleteResourceOperation(ServerContext& context) :
994 context_(context)
995 {
996 }
997
998 virtual void Apply(JobOperationValues& outputs,
999 const JobOperationValue& input,
1000 IDicomConnectionProvider& provider)
1001 {
1002 switch (input.GetType())
1003 {
1004 case JobOperationValue::Type_DicomInstance:
1005 {
1006 const DicomInstanceValue& instance = dynamic_cast<const DicomInstanceValue&>(input);
1007 LOG(INFO) << "Deleting instance: " << instance.GetId();
1008
1009 try
1010 {
1011 Json::Value tmp;
1012 context_.DeleteResource(tmp, instance.GetId(), ResourceType_Instance);
1013 }
1014 catch (OrthancException& e)
1015 {
1016 LOG(ERROR) << "Unable to delete instance " << instance.GetId() << ": " << e.What();
1017 }
1018
1019 break;
1020 }
1021
1022 default:
1023 break;
1024 }
1025 }
1026 };
1027
1028
1029 class SequenceOfOperationsJob :
1030 public IJob,
1031 private IDicomConnectionProvider
1032 {
1033 private:
1034 /*class DicomConnection
1035 {
1036 private:
1037 boost::posix_time::ptime lastUse_;
1038
1039 void Touch()
1040 {
1041 lastUse_ = boost::posix_time::microsec_clock::universal_time();
1042 }
1043
1044 public:
1045 class Resource : public IDicomConnectionProvider::IResource
1046 {
1047 private:
1048 DicomConnection()
1049 };
1050 };*/
1051
1052 class Operation : public boost::noncopyable
1053 {
1054 private:
1055 JobOperationValues originalInputs_;
1056 JobOperationValues workInputs_;
1057 std::auto_ptr<IJobOperation> operation_;
1058 std::list<Operation*> nextOperations_;
1059 size_t currentInput_;
1060
1061 public:
1062 Operation(IJobOperation* operation) :
1063 operation_(operation),
1064 currentInput_(0)
1065 {
1066 if (operation == NULL)
1067 {
1068 throw OrthancException(ErrorCode_NullPointer);
1069 }
1070 }
1071
1072 void AddOriginalInput(const JobOperationValue& value)
1073 {
1074 if (currentInput_ != 0)
1075 {
1076 // Cannot add input after processing has started
1077 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1078 }
1079 else
1080 {
1081 originalInputs_.Append(value.Clone());
1082 }
1083 }
1084
1085 const JobOperationValues& GetOriginalInputs() const
1086 {
1087 return originalInputs_;
1088 }
1089
1090 void Reset()
1091 {
1092 workInputs_.Clear();
1093 currentInput_ = 0;
1094 }
1095
1096 void AddNextOperation(Operation& other)
1097 {
1098 if (currentInput_ != 0)
1099 {
1100 // Cannot add input after processing has started
1101 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1102 }
1103 else
1104 {
1105 nextOperations_.push_back(&other);
1106 }
1107 }
1108
1109 bool IsDone() const
1110 {
1111 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
1112 }
1113
1114 void Step(IDicomConnectionProvider& provider)
1115 {
1116 if (IsDone())
1117 {
1118 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1119 }
1120
1121 const JobOperationValue* input;
1122
1123 if (currentInput_ < originalInputs_.GetSize())
1124 {
1125 input = &originalInputs_.GetValue(currentInput_);
1126 }
1127 else
1128 {
1129 input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
1130 }
1131
1132 JobOperationValues outputs;
1133 operation_->Apply(outputs, *input, provider);
1134
1135 if (!nextOperations_.empty())
1136 {
1137 // TODO
1138 }
1139
1140 currentInput_ += 1;
1141 }
1142 };
1143
1144
1145 boost::mutex mutex_;
1146 std::vector<Operation*> operations_;
1147 size_t currentOperation_;
1148 boost::condition_variable operationAdded_;
1149
1150 public:
1151 SequenceOfOperationsJob() :
1152 currentOperation_(0)
1153 {
1154 }
1155
1156 virtual ~SequenceOfOperationsJob()
1157 {
1158 for (size_t i = 0; i < operations_.size(); i++)
1159 {
1160 if (operations_[i] != NULL)
1161 {
1162 delete operations_[i];
1163 }
1164 }
1165 }
1166
1167 class Lock : public boost::noncopyable
1168 {
1169 private:
1170 SequenceOfOperationsJob& that_;
1171 boost::mutex::scoped_lock lock_;
1172
1173 public:
1174 Lock(SequenceOfOperationsJob& that) :
1175 that_(that),
1176 lock_(that.mutex_)
1177 {
1178 }
1179
1180 size_t AddOperation(IJobOperation* operation)
1181 {
1182 that_.operations_.push_back(new Operation(operation));
1183 that_.operationAdded_.notify_one();
1184
1185 return that_.operations_.size() - 1;
1186 }
1187
1188 void AddInput(size_t index,
1189 const JobOperationValue& value)
1190 {
1191 if (index >= that_.operations_.size() ||
1192 index < that_.currentOperation_)
1193 {
1194 throw OrthancException(ErrorCode_ParameterOutOfRange);
1195 }
1196 else
1197 {
1198 that_.operations_[index]->AddOriginalInput(value);
1199 }
1200 }
1201
1202 void Connect(size_t input,
1203 size_t output)
1204 {
1205 if (input >= output ||
1206 input >= that_.operations_.size() ||
1207 output >= that_.operations_.size() ||
1208 input < that_.currentOperation_ ||
1209 output < that_.currentOperation_)
1210 {
1211 throw OrthancException(ErrorCode_ParameterOutOfRange);
1212 }
1213 else
1214 {
1215 Operation& a = *that_.operations_[input];
1216 Operation& b = *that_.operations_[output];
1217 a.AddNextOperation(b);
1218 }
1219 }
1220 };
1221
1222 virtual void Start()
1223 {
1224 }
1225
1226 virtual JobStepResult ExecuteStep() = 0;
1227
1228 virtual void SignalResubmit()
1229 {
1230 boost::mutex::scoped_lock lock(mutex_);
1231
1232 currentOperation_ = 0;
1233
1234 for (size_t i = 0; i < operations_.size(); i++)
1235 {
1236 operations_[i]->Reset();
1237 }
1238 }
1239
1240 virtual void ReleaseResources() = 0; // For pausing/canceling jobs
1241
1242 virtual float GetProgress() = 0;
1243
1244 virtual void GetJobType(std::string& target) = 0;
1245
1246 virtual void GetPublicContent(Json::Value& value) = 0;
1247
1248 virtual void GetInternalContent(Json::Value& value) = 0;
1249 };
1250 }