Mercurial > hg > orthanc-stone
comparison Samples/Sdl/Loader.cpp @ 748:ab236bb5dbc7
ThreadedOracle
author | Sebastien Jodogne <s.jodogne@gmail.com> |
---|---|
date | Wed, 22 May 2019 14:46:26 +0200 |
parents | d716bfb3e07c |
children | f7c236894c1a |
comparison
equal
deleted
inserted
replaced
746:d716bfb3e07c | 748:ab236bb5dbc7 |
---|---|
18 * along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 * along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 **/ | 19 **/ |
20 | 20 |
21 | 21 |
22 #include "../../Framework/Toolbox/DicomInstanceParameters.h" | 22 #include "../../Framework/Toolbox/DicomInstanceParameters.h" |
23 #include "../../Framework/Oracle/ThreadedOracle.h" | |
23 #include "../../Framework/Oracle/GetOrthancWebViewerJpegCommand.h" | 24 #include "../../Framework/Oracle/GetOrthancWebViewerJpegCommand.h" |
24 #include "../../Framework/Oracle/GetOrthancImageCommand.h" | 25 #include "../../Framework/Oracle/GetOrthancImageCommand.h" |
25 #include "../../Framework/Oracle/OrthancRestApiCommand.h" | 26 #include "../../Framework/Oracle/OrthancRestApiCommand.h" |
26 #include "../../Framework/Oracle/SleepOracleCommand.h" | 27 #include "../../Framework/Oracle/SleepOracleCommand.h" |
27 #include "../../Framework/Oracle/OracleCommandExceptionMessage.h" | 28 #include "../../Framework/Oracle/OracleCommandExceptionMessage.h" |
737 | 738 |
738 scene_.SetLayer(layerDepth_, texture.release()); | 739 scene_.SetLayer(layerDepth_, texture.release()); |
739 } | 740 } |
740 } | 741 } |
741 }; | 742 }; |
742 | |
743 | |
744 | |
745 | |
746 | |
747 class ThreadedOracle : public IOracle | |
748 { | |
749 private: | |
750 typedef std::map<std::string, std::string> HttpHeaders; | |
751 | |
752 class Item : public Orthanc::IDynamicObject | |
753 { | |
754 private: | |
755 const IObserver& receiver_; | |
756 std::auto_ptr<IOracleCommand> command_; | |
757 | |
758 public: | |
759 Item(const IObserver& receiver, | |
760 IOracleCommand* command) : | |
761 receiver_(receiver), | |
762 command_(command) | |
763 { | |
764 if (command == NULL) | |
765 { | |
766 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); | |
767 } | |
768 } | |
769 | |
770 const IObserver& GetReceiver() const | |
771 { | |
772 return receiver_; | |
773 } | |
774 | |
775 IOracleCommand& GetCommand() | |
776 { | |
777 assert(command_.get() != NULL); | |
778 return *command_; | |
779 } | |
780 }; | |
781 | |
782 | |
783 enum State | |
784 { | |
785 State_Setup, | |
786 State_Running, | |
787 State_Stopped | |
788 }; | |
789 | |
790 | |
791 class SleepingCommands : public boost::noncopyable | |
792 { | |
793 private: | |
794 class Item | |
795 { | |
796 private: | |
797 const IObserver& receiver_; | |
798 std::auto_ptr<SleepOracleCommand> command_; | |
799 boost::posix_time::ptime expiration_; | |
800 | |
801 public: | |
802 Item(const IObserver& receiver, | |
803 SleepOracleCommand* command) : | |
804 receiver_(receiver), | |
805 command_(command) | |
806 { | |
807 if (command == NULL) | |
808 { | |
809 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer); | |
810 } | |
811 | |
812 expiration_ = (boost::posix_time::second_clock::local_time() + | |
813 boost::posix_time::milliseconds(command_->GetDelay())); | |
814 } | |
815 | |
816 const boost::posix_time::ptime& GetExpirationTime() const | |
817 { | |
818 return expiration_; | |
819 } | |
820 | |
821 void Awake(IMessageEmitter& emitter) | |
822 { | |
823 assert(command_.get() != NULL); | |
824 | |
825 SleepOracleCommand::TimeoutMessage message(*command_); | |
826 emitter.EmitMessage(receiver_, message); | |
827 } | |
828 }; | |
829 | |
830 typedef std::list<Item*> Content; | |
831 | |
832 boost::mutex mutex_; | |
833 Content content_; | |
834 | |
835 public: | |
836 ~SleepingCommands() | |
837 { | |
838 for (Content::iterator it = content_.begin(); it != content_.end(); ++it) | |
839 { | |
840 if (*it != NULL) | |
841 { | |
842 delete *it; | |
843 } | |
844 } | |
845 } | |
846 | |
847 void Add(const IObserver& receiver, | |
848 SleepOracleCommand* command) // Takes ownership | |
849 { | |
850 boost::mutex::scoped_lock lock(mutex_); | |
851 | |
852 content_.push_back(new Item(receiver, command)); | |
853 } | |
854 | |
855 void AwakeExpired(IMessageEmitter& emitter) | |
856 { | |
857 boost::mutex::scoped_lock lock(mutex_); | |
858 | |
859 const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); | |
860 | |
861 Content stillSleeping; | |
862 | |
863 for (Content::iterator it = content_.begin(); it != content_.end(); ++it) | |
864 { | |
865 if (*it != NULL && | |
866 (*it)->GetExpirationTime() <= now) | |
867 { | |
868 (*it)->Awake(emitter); | |
869 delete *it; | |
870 *it = NULL; | |
871 } | |
872 else | |
873 { | |
874 stillSleeping.push_back(*it); | |
875 } | |
876 } | |
877 | |
878 // Compact the still-sleeping commands | |
879 content_ = stillSleeping; | |
880 } | |
881 }; | |
882 | |
883 | |
884 IMessageEmitter& emitter_; | |
885 Orthanc::WebServiceParameters orthanc_; | |
886 Orthanc::SharedMessageQueue queue_; | |
887 State state_; | |
888 boost::mutex mutex_; | |
889 std::vector<boost::thread*> workers_; | |
890 SleepingCommands sleepingCommands_; | |
891 boost::thread sleepingWorker_; | |
892 unsigned int sleepingTimeResolution_; | |
893 | |
894 void CopyHttpHeaders(Orthanc::HttpClient& client, | |
895 const HttpHeaders& headers) | |
896 { | |
897 for (HttpHeaders::const_iterator it = headers.begin(); it != headers.end(); it++ ) | |
898 { | |
899 client.AddHeader(it->first, it->second); | |
900 } | |
901 } | |
902 | |
903 | |
904 void DecodeAnswer(std::string& answer, | |
905 const HttpHeaders& headers) | |
906 { | |
907 Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None; | |
908 | |
909 for (HttpHeaders::const_iterator it = headers.begin(); | |
910 it != headers.end(); ++it) | |
911 { | |
912 std::string s; | |
913 Orthanc::Toolbox::ToLowerCase(s, it->first); | |
914 | |
915 if (s == "content-encoding") | |
916 { | |
917 if (it->second == "gzip") | |
918 { | |
919 contentEncoding = Orthanc::HttpCompression_Gzip; | |
920 } | |
921 else | |
922 { | |
923 throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol, | |
924 "Unsupported HTTP Content-Encoding: " + it->second); | |
925 } | |
926 | |
927 break; | |
928 } | |
929 } | |
930 | |
931 if (contentEncoding == Orthanc::HttpCompression_Gzip) | |
932 { | |
933 std::string compressed; | |
934 answer.swap(compressed); | |
935 | |
936 Orthanc::GzipCompressor compressor; | |
937 compressor.Uncompress(answer, compressed.c_str(), compressed.size()); | |
938 } | |
939 } | |
940 | |
941 | |
942 void Execute(const IObserver& receiver, | |
943 SleepOracleCommand& command) | |
944 { | |
945 std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay())); | |
946 | |
947 if (command.HasPayload()) | |
948 { | |
949 copy->SetPayload(command.ReleasePayload()); | |
950 } | |
951 | |
952 sleepingCommands_.Add(receiver, copy.release()); | |
953 } | |
954 | |
955 | |
956 void Execute(const IObserver& receiver, | |
957 const OrthancRestApiCommand& command) | |
958 { | |
959 Orthanc::HttpClient client(orthanc_, command.GetUri()); | |
960 client.SetMethod(command.GetMethod()); | |
961 client.SetTimeout(command.GetTimeout()); | |
962 | |
963 CopyHttpHeaders(client, command.GetHttpHeaders()); | |
964 | |
965 if (command.GetMethod() == Orthanc::HttpMethod_Post || | |
966 command.GetMethod() == Orthanc::HttpMethod_Put) | |
967 { | |
968 client.SetBody(command.GetBody()); | |
969 } | |
970 | |
971 std::string answer; | |
972 HttpHeaders answerHeaders; | |
973 client.ApplyAndThrowException(answer, answerHeaders); | |
974 | |
975 DecodeAnswer(answer, answerHeaders); | |
976 | |
977 OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer); | |
978 emitter_.EmitMessage(receiver, message); | |
979 } | |
980 | |
981 | |
982 void Execute(const IObserver& receiver, | |
983 const GetOrthancImageCommand& command) | |
984 { | |
985 Orthanc::HttpClient client(orthanc_, command.GetUri()); | |
986 client.SetTimeout(command.GetTimeout()); | |
987 | |
988 CopyHttpHeaders(client, command.GetHttpHeaders()); | |
989 | |
990 std::string answer; | |
991 HttpHeaders answerHeaders; | |
992 client.ApplyAndThrowException(answer, answerHeaders); | |
993 | |
994 DecodeAnswer(answer, answerHeaders); | |
995 | |
996 command.ProcessHttpAnswer(emitter_, receiver, answer, answerHeaders); | |
997 } | |
998 | |
999 | |
1000 void Execute(const IObserver& receiver, | |
1001 const GetOrthancWebViewerJpegCommand& command) | |
1002 { | |
1003 Orthanc::HttpClient client(orthanc_, command.GetUri()); | |
1004 client.SetTimeout(command.GetTimeout()); | |
1005 | |
1006 CopyHttpHeaders(client, command.GetHttpHeaders()); | |
1007 | |
1008 std::string answer; | |
1009 HttpHeaders answerHeaders; | |
1010 client.ApplyAndThrowException(answer, answerHeaders); | |
1011 | |
1012 DecodeAnswer(answer, answerHeaders); | |
1013 | |
1014 command.ProcessHttpAnswer(emitter_, receiver, answer); | |
1015 } | |
1016 | |
1017 | |
1018 void Step() | |
1019 { | |
1020 std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100)); | |
1021 | |
1022 if (object.get() != NULL) | |
1023 { | |
1024 Item& item = dynamic_cast<Item&>(*object); | |
1025 | |
1026 try | |
1027 { | |
1028 switch (item.GetCommand().GetType()) | |
1029 { | |
1030 case IOracleCommand::Type_Sleep: | |
1031 Execute(item.GetReceiver(), | |
1032 dynamic_cast<SleepOracleCommand&>(item.GetCommand())); | |
1033 break; | |
1034 | |
1035 case IOracleCommand::Type_OrthancRestApi: | |
1036 Execute(item.GetReceiver(), | |
1037 dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand())); | |
1038 break; | |
1039 | |
1040 case IOracleCommand::Type_GetOrthancImage: | |
1041 Execute(item.GetReceiver(), | |
1042 dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand())); | |
1043 break; | |
1044 | |
1045 case IOracleCommand::Type_GetOrthancWebViewerJpeg: | |
1046 Execute(item.GetReceiver(), | |
1047 dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand())); | |
1048 break; | |
1049 | |
1050 default: | |
1051 throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented); | |
1052 } | |
1053 } | |
1054 catch (Orthanc::OrthancException& e) | |
1055 { | |
1056 LOG(ERROR) << "Exception within the oracle: " << e.What(); | |
1057 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e)); | |
1058 } | |
1059 catch (...) | |
1060 { | |
1061 LOG(ERROR) << "Native exception within the oracle"; | |
1062 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage | |
1063 (item.GetCommand(), Orthanc::ErrorCode_InternalError)); | |
1064 } | |
1065 } | |
1066 } | |
1067 | |
1068 | |
1069 static void Worker(ThreadedOracle* that) | |
1070 { | |
1071 assert(that != NULL); | |
1072 | |
1073 for (;;) | |
1074 { | |
1075 { | |
1076 boost::mutex::scoped_lock lock(that->mutex_); | |
1077 if (that->state_ != State_Running) | |
1078 { | |
1079 return; | |
1080 } | |
1081 } | |
1082 | |
1083 that->Step(); | |
1084 } | |
1085 } | |
1086 | |
1087 | |
1088 static void SleepingWorker(ThreadedOracle* that) | |
1089 { | |
1090 assert(that != NULL); | |
1091 | |
1092 for (;;) | |
1093 { | |
1094 { | |
1095 boost::mutex::scoped_lock lock(that->mutex_); | |
1096 if (that->state_ != State_Running) | |
1097 { | |
1098 return; | |
1099 } | |
1100 } | |
1101 | |
1102 that->sleepingCommands_.AwakeExpired(that->emitter_); | |
1103 | |
1104 boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_)); | |
1105 } | |
1106 } | |
1107 | |
1108 | |
1109 void StopInternal() | |
1110 { | |
1111 { | |
1112 boost::mutex::scoped_lock lock(mutex_); | |
1113 | |
1114 if (state_ == State_Setup || | |
1115 state_ == State_Stopped) | |
1116 { | |
1117 return; | |
1118 } | |
1119 else | |
1120 { | |
1121 state_ = State_Stopped; | |
1122 } | |
1123 } | |
1124 | |
1125 if (sleepingWorker_.joinable()) | |
1126 { | |
1127 sleepingWorker_.join(); | |
1128 } | |
1129 | |
1130 for (size_t i = 0; i < workers_.size(); i++) | |
1131 { | |
1132 if (workers_[i] != NULL) | |
1133 { | |
1134 if (workers_[i]->joinable()) | |
1135 { | |
1136 workers_[i]->join(); | |
1137 } | |
1138 | |
1139 delete workers_[i]; | |
1140 } | |
1141 } | |
1142 } | |
1143 | |
1144 | |
1145 public: | |
1146 ThreadedOracle(IMessageEmitter& emitter) : | |
1147 emitter_(emitter), | |
1148 state_(State_Setup), | |
1149 workers_(4), | |
1150 sleepingTimeResolution_(50) // By default, time resolution of 50ms | |
1151 { | |
1152 } | |
1153 | |
1154 virtual ~ThreadedOracle() | |
1155 { | |
1156 StopInternal(); | |
1157 } | |
1158 | |
1159 void SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc) | |
1160 { | |
1161 boost::mutex::scoped_lock lock(mutex_); | |
1162 | |
1163 if (state_ != State_Setup) | |
1164 { | |
1165 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | |
1166 } | |
1167 else | |
1168 { | |
1169 orthanc_ = orthanc; | |
1170 } | |
1171 } | |
1172 | |
1173 void SetWorkersCount(unsigned int count) | |
1174 { | |
1175 boost::mutex::scoped_lock lock(mutex_); | |
1176 | |
1177 if (count <= 0) | |
1178 { | |
1179 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); | |
1180 } | |
1181 else if (state_ != State_Setup) | |
1182 { | |
1183 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | |
1184 } | |
1185 else | |
1186 { | |
1187 workers_.resize(count); | |
1188 } | |
1189 } | |
1190 | |
1191 void SetSleepingTimeResolution(unsigned int milliseconds) | |
1192 { | |
1193 boost::mutex::scoped_lock lock(mutex_); | |
1194 | |
1195 if (milliseconds <= 0) | |
1196 { | |
1197 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange); | |
1198 } | |
1199 else if (state_ != State_Setup) | |
1200 { | |
1201 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | |
1202 } | |
1203 else | |
1204 { | |
1205 sleepingTimeResolution_ = milliseconds; | |
1206 } | |
1207 } | |
1208 | |
1209 void Start() | |
1210 { | |
1211 boost::mutex::scoped_lock lock(mutex_); | |
1212 | |
1213 if (state_ != State_Setup) | |
1214 { | |
1215 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls); | |
1216 } | |
1217 else | |
1218 { | |
1219 state_ = State_Running; | |
1220 | |
1221 for (unsigned int i = 0; i < workers_.size(); i++) | |
1222 { | |
1223 workers_[i] = new boost::thread(Worker, this); | |
1224 } | |
1225 | |
1226 sleepingWorker_ = boost::thread(SleepingWorker, this); | |
1227 } | |
1228 } | |
1229 | |
1230 void Stop() | |
1231 { | |
1232 StopInternal(); | |
1233 } | |
1234 | |
1235 virtual void Schedule(const IObserver& receiver, | |
1236 IOracleCommand* command) | |
1237 { | |
1238 queue_.Enqueue(new Item(receiver, command)); | |
1239 } | |
1240 }; | |
1241 | 743 |
1242 | 744 |
1243 class NativeApplicationContext : public IMessageEmitter | 745 class NativeApplicationContext : public IMessageEmitter |
1244 { | 746 { |
1245 private: | 747 private: |