comparison Samples/Sdl/Loader.cpp @ 748:ab236bb5dbc7

ThreadedOracle
author Sebastien Jodogne <s.jodogne@gmail.com>
date Wed, 22 May 2019 14:46:26 +0200
parents d716bfb3e07c
children f7c236894c1a
comparison
equal deleted inserted replaced
746:d716bfb3e07c 748:ab236bb5dbc7
18 * along with this program. If not, see <http://www.gnu.org/licenses/>. 18 * along with this program. If not, see <http://www.gnu.org/licenses/>.
19 **/ 19 **/
20 20
21 21
22 #include "../../Framework/Toolbox/DicomInstanceParameters.h" 22 #include "../../Framework/Toolbox/DicomInstanceParameters.h"
23 #include "../../Framework/Oracle/ThreadedOracle.h"
23 #include "../../Framework/Oracle/GetOrthancWebViewerJpegCommand.h" 24 #include "../../Framework/Oracle/GetOrthancWebViewerJpegCommand.h"
24 #include "../../Framework/Oracle/GetOrthancImageCommand.h" 25 #include "../../Framework/Oracle/GetOrthancImageCommand.h"
25 #include "../../Framework/Oracle/OrthancRestApiCommand.h" 26 #include "../../Framework/Oracle/OrthancRestApiCommand.h"
26 #include "../../Framework/Oracle/SleepOracleCommand.h" 27 #include "../../Framework/Oracle/SleepOracleCommand.h"
27 #include "../../Framework/Oracle/OracleCommandExceptionMessage.h" 28 #include "../../Framework/Oracle/OracleCommandExceptionMessage.h"
737 738
738 scene_.SetLayer(layerDepth_, texture.release()); 739 scene_.SetLayer(layerDepth_, texture.release());
739 } 740 }
740 } 741 }
741 }; 742 };
742
743
744
745
746
747 class ThreadedOracle : public IOracle
748 {
749 private:
750 typedef std::map<std::string, std::string> HttpHeaders;
751
752 class Item : public Orthanc::IDynamicObject
753 {
754 private:
755 const IObserver& receiver_;
756 std::auto_ptr<IOracleCommand> command_;
757
758 public:
759 Item(const IObserver& receiver,
760 IOracleCommand* command) :
761 receiver_(receiver),
762 command_(command)
763 {
764 if (command == NULL)
765 {
766 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
767 }
768 }
769
770 const IObserver& GetReceiver() const
771 {
772 return receiver_;
773 }
774
775 IOracleCommand& GetCommand()
776 {
777 assert(command_.get() != NULL);
778 return *command_;
779 }
780 };
781
782
783 enum State
784 {
785 State_Setup,
786 State_Running,
787 State_Stopped
788 };
789
790
791 class SleepingCommands : public boost::noncopyable
792 {
793 private:
794 class Item
795 {
796 private:
797 const IObserver& receiver_;
798 std::auto_ptr<SleepOracleCommand> command_;
799 boost::posix_time::ptime expiration_;
800
801 public:
802 Item(const IObserver& receiver,
803 SleepOracleCommand* command) :
804 receiver_(receiver),
805 command_(command)
806 {
807 if (command == NULL)
808 {
809 throw Orthanc::OrthancException(Orthanc::ErrorCode_NullPointer);
810 }
811
812 expiration_ = (boost::posix_time::second_clock::local_time() +
813 boost::posix_time::milliseconds(command_->GetDelay()));
814 }
815
816 const boost::posix_time::ptime& GetExpirationTime() const
817 {
818 return expiration_;
819 }
820
821 void Awake(IMessageEmitter& emitter)
822 {
823 assert(command_.get() != NULL);
824
825 SleepOracleCommand::TimeoutMessage message(*command_);
826 emitter.EmitMessage(receiver_, message);
827 }
828 };
829
830 typedef std::list<Item*> Content;
831
832 boost::mutex mutex_;
833 Content content_;
834
835 public:
836 ~SleepingCommands()
837 {
838 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
839 {
840 if (*it != NULL)
841 {
842 delete *it;
843 }
844 }
845 }
846
847 void Add(const IObserver& receiver,
848 SleepOracleCommand* command) // Takes ownership
849 {
850 boost::mutex::scoped_lock lock(mutex_);
851
852 content_.push_back(new Item(receiver, command));
853 }
854
855 void AwakeExpired(IMessageEmitter& emitter)
856 {
857 boost::mutex::scoped_lock lock(mutex_);
858
859 const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time();
860
861 Content stillSleeping;
862
863 for (Content::iterator it = content_.begin(); it != content_.end(); ++it)
864 {
865 if (*it != NULL &&
866 (*it)->GetExpirationTime() <= now)
867 {
868 (*it)->Awake(emitter);
869 delete *it;
870 *it = NULL;
871 }
872 else
873 {
874 stillSleeping.push_back(*it);
875 }
876 }
877
878 // Compact the still-sleeping commands
879 content_ = stillSleeping;
880 }
881 };
882
883
884 IMessageEmitter& emitter_;
885 Orthanc::WebServiceParameters orthanc_;
886 Orthanc::SharedMessageQueue queue_;
887 State state_;
888 boost::mutex mutex_;
889 std::vector<boost::thread*> workers_;
890 SleepingCommands sleepingCommands_;
891 boost::thread sleepingWorker_;
892 unsigned int sleepingTimeResolution_;
893
894 void CopyHttpHeaders(Orthanc::HttpClient& client,
895 const HttpHeaders& headers)
896 {
897 for (HttpHeaders::const_iterator it = headers.begin(); it != headers.end(); it++ )
898 {
899 client.AddHeader(it->first, it->second);
900 }
901 }
902
903
904 void DecodeAnswer(std::string& answer,
905 const HttpHeaders& headers)
906 {
907 Orthanc::HttpCompression contentEncoding = Orthanc::HttpCompression_None;
908
909 for (HttpHeaders::const_iterator it = headers.begin();
910 it != headers.end(); ++it)
911 {
912 std::string s;
913 Orthanc::Toolbox::ToLowerCase(s, it->first);
914
915 if (s == "content-encoding")
916 {
917 if (it->second == "gzip")
918 {
919 contentEncoding = Orthanc::HttpCompression_Gzip;
920 }
921 else
922 {
923 throw Orthanc::OrthancException(Orthanc::ErrorCode_NetworkProtocol,
924 "Unsupported HTTP Content-Encoding: " + it->second);
925 }
926
927 break;
928 }
929 }
930
931 if (contentEncoding == Orthanc::HttpCompression_Gzip)
932 {
933 std::string compressed;
934 answer.swap(compressed);
935
936 Orthanc::GzipCompressor compressor;
937 compressor.Uncompress(answer, compressed.c_str(), compressed.size());
938 }
939 }
940
941
942 void Execute(const IObserver& receiver,
943 SleepOracleCommand& command)
944 {
945 std::auto_ptr<SleepOracleCommand> copy(new SleepOracleCommand(command.GetDelay()));
946
947 if (command.HasPayload())
948 {
949 copy->SetPayload(command.ReleasePayload());
950 }
951
952 sleepingCommands_.Add(receiver, copy.release());
953 }
954
955
956 void Execute(const IObserver& receiver,
957 const OrthancRestApiCommand& command)
958 {
959 Orthanc::HttpClient client(orthanc_, command.GetUri());
960 client.SetMethod(command.GetMethod());
961 client.SetTimeout(command.GetTimeout());
962
963 CopyHttpHeaders(client, command.GetHttpHeaders());
964
965 if (command.GetMethod() == Orthanc::HttpMethod_Post ||
966 command.GetMethod() == Orthanc::HttpMethod_Put)
967 {
968 client.SetBody(command.GetBody());
969 }
970
971 std::string answer;
972 HttpHeaders answerHeaders;
973 client.ApplyAndThrowException(answer, answerHeaders);
974
975 DecodeAnswer(answer, answerHeaders);
976
977 OrthancRestApiCommand::SuccessMessage message(command, answerHeaders, answer);
978 emitter_.EmitMessage(receiver, message);
979 }
980
981
982 void Execute(const IObserver& receiver,
983 const GetOrthancImageCommand& command)
984 {
985 Orthanc::HttpClient client(orthanc_, command.GetUri());
986 client.SetTimeout(command.GetTimeout());
987
988 CopyHttpHeaders(client, command.GetHttpHeaders());
989
990 std::string answer;
991 HttpHeaders answerHeaders;
992 client.ApplyAndThrowException(answer, answerHeaders);
993
994 DecodeAnswer(answer, answerHeaders);
995
996 command.ProcessHttpAnswer(emitter_, receiver, answer, answerHeaders);
997 }
998
999
1000 void Execute(const IObserver& receiver,
1001 const GetOrthancWebViewerJpegCommand& command)
1002 {
1003 Orthanc::HttpClient client(orthanc_, command.GetUri());
1004 client.SetTimeout(command.GetTimeout());
1005
1006 CopyHttpHeaders(client, command.GetHttpHeaders());
1007
1008 std::string answer;
1009 HttpHeaders answerHeaders;
1010 client.ApplyAndThrowException(answer, answerHeaders);
1011
1012 DecodeAnswer(answer, answerHeaders);
1013
1014 command.ProcessHttpAnswer(emitter_, receiver, answer);
1015 }
1016
1017
1018 void Step()
1019 {
1020 std::auto_ptr<Orthanc::IDynamicObject> object(queue_.Dequeue(100));
1021
1022 if (object.get() != NULL)
1023 {
1024 Item& item = dynamic_cast<Item&>(*object);
1025
1026 try
1027 {
1028 switch (item.GetCommand().GetType())
1029 {
1030 case IOracleCommand::Type_Sleep:
1031 Execute(item.GetReceiver(),
1032 dynamic_cast<SleepOracleCommand&>(item.GetCommand()));
1033 break;
1034
1035 case IOracleCommand::Type_OrthancRestApi:
1036 Execute(item.GetReceiver(),
1037 dynamic_cast<const OrthancRestApiCommand&>(item.GetCommand()));
1038 break;
1039
1040 case IOracleCommand::Type_GetOrthancImage:
1041 Execute(item.GetReceiver(),
1042 dynamic_cast<const GetOrthancImageCommand&>(item.GetCommand()));
1043 break;
1044
1045 case IOracleCommand::Type_GetOrthancWebViewerJpeg:
1046 Execute(item.GetReceiver(),
1047 dynamic_cast<const GetOrthancWebViewerJpegCommand&>(item.GetCommand()));
1048 break;
1049
1050 default:
1051 throw Orthanc::OrthancException(Orthanc::ErrorCode_NotImplemented);
1052 }
1053 }
1054 catch (Orthanc::OrthancException& e)
1055 {
1056 LOG(ERROR) << "Exception within the oracle: " << e.What();
1057 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage(item.GetCommand(), e));
1058 }
1059 catch (...)
1060 {
1061 LOG(ERROR) << "Native exception within the oracle";
1062 emitter_.EmitMessage(item.GetReceiver(), OracleCommandExceptionMessage
1063 (item.GetCommand(), Orthanc::ErrorCode_InternalError));
1064 }
1065 }
1066 }
1067
1068
1069 static void Worker(ThreadedOracle* that)
1070 {
1071 assert(that != NULL);
1072
1073 for (;;)
1074 {
1075 {
1076 boost::mutex::scoped_lock lock(that->mutex_);
1077 if (that->state_ != State_Running)
1078 {
1079 return;
1080 }
1081 }
1082
1083 that->Step();
1084 }
1085 }
1086
1087
1088 static void SleepingWorker(ThreadedOracle* that)
1089 {
1090 assert(that != NULL);
1091
1092 for (;;)
1093 {
1094 {
1095 boost::mutex::scoped_lock lock(that->mutex_);
1096 if (that->state_ != State_Running)
1097 {
1098 return;
1099 }
1100 }
1101
1102 that->sleepingCommands_.AwakeExpired(that->emitter_);
1103
1104 boost::this_thread::sleep(boost::posix_time::milliseconds(that->sleepingTimeResolution_));
1105 }
1106 }
1107
1108
1109 void StopInternal()
1110 {
1111 {
1112 boost::mutex::scoped_lock lock(mutex_);
1113
1114 if (state_ == State_Setup ||
1115 state_ == State_Stopped)
1116 {
1117 return;
1118 }
1119 else
1120 {
1121 state_ = State_Stopped;
1122 }
1123 }
1124
1125 if (sleepingWorker_.joinable())
1126 {
1127 sleepingWorker_.join();
1128 }
1129
1130 for (size_t i = 0; i < workers_.size(); i++)
1131 {
1132 if (workers_[i] != NULL)
1133 {
1134 if (workers_[i]->joinable())
1135 {
1136 workers_[i]->join();
1137 }
1138
1139 delete workers_[i];
1140 }
1141 }
1142 }
1143
1144
1145 public:
1146 ThreadedOracle(IMessageEmitter& emitter) :
1147 emitter_(emitter),
1148 state_(State_Setup),
1149 workers_(4),
1150 sleepingTimeResolution_(50) // By default, time resolution of 50ms
1151 {
1152 }
1153
1154 virtual ~ThreadedOracle()
1155 {
1156 StopInternal();
1157 }
1158
1159 void SetOrthancParameters(const Orthanc::WebServiceParameters& orthanc)
1160 {
1161 boost::mutex::scoped_lock lock(mutex_);
1162
1163 if (state_ != State_Setup)
1164 {
1165 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
1166 }
1167 else
1168 {
1169 orthanc_ = orthanc;
1170 }
1171 }
1172
1173 void SetWorkersCount(unsigned int count)
1174 {
1175 boost::mutex::scoped_lock lock(mutex_);
1176
1177 if (count <= 0)
1178 {
1179 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
1180 }
1181 else if (state_ != State_Setup)
1182 {
1183 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
1184 }
1185 else
1186 {
1187 workers_.resize(count);
1188 }
1189 }
1190
1191 void SetSleepingTimeResolution(unsigned int milliseconds)
1192 {
1193 boost::mutex::scoped_lock lock(mutex_);
1194
1195 if (milliseconds <= 0)
1196 {
1197 throw Orthanc::OrthancException(Orthanc::ErrorCode_ParameterOutOfRange);
1198 }
1199 else if (state_ != State_Setup)
1200 {
1201 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
1202 }
1203 else
1204 {
1205 sleepingTimeResolution_ = milliseconds;
1206 }
1207 }
1208
1209 void Start()
1210 {
1211 boost::mutex::scoped_lock lock(mutex_);
1212
1213 if (state_ != State_Setup)
1214 {
1215 throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
1216 }
1217 else
1218 {
1219 state_ = State_Running;
1220
1221 for (unsigned int i = 0; i < workers_.size(); i++)
1222 {
1223 workers_[i] = new boost::thread(Worker, this);
1224 }
1225
1226 sleepingWorker_ = boost::thread(SleepingWorker, this);
1227 }
1228 }
1229
1230 void Stop()
1231 {
1232 StopInternal();
1233 }
1234
1235 virtual void Schedule(const IObserver& receiver,
1236 IOracleCommand* command)
1237 {
1238 queue_.Enqueue(new Item(receiver, command));
1239 }
1240 };
1241 743
1242 744
1243 class NativeApplicationContext : public IMessageEmitter 745 class NativeApplicationContext : public IMessageEmitter
1244 { 746 {
1245 private: 747 private: