comparison OrthancServer/ServerContext.cpp @ 2668:d26dd081df97 jobs

saving jobs engine on exit
author Sebastien Jodogne <s.jodogne@gmail.com>
date Fri, 08 Jun 2018 18:08:48 +0200
parents 389d050a2e66
children 06c0a6b8a871
comparison
equal deleted inserted replaced
2667:5fa2f2ce74f0 2668:d26dd081df97
32 32
33 33
34 #include "PrecompiledHeadersServer.h" 34 #include "PrecompiledHeadersServer.h"
35 #include "ServerContext.h" 35 #include "ServerContext.h"
36 36
37 #include "../Core/DicomParsing/FromDcmtkBridge.h"
37 #include "../Core/FileStorage/StorageAccessor.h" 38 #include "../Core/FileStorage/StorageAccessor.h"
38 #include "../Core/HttpServer/FilesystemHttpSender.h" 39 #include "../Core/HttpServer/FilesystemHttpSender.h"
39 #include "../Core/HttpServer/HttpStreamTranscoder.h" 40 #include "../Core/HttpServer/HttpStreamTranscoder.h"
40 #include "../Core/Logging.h" 41 #include "../Core/Logging.h"
41 #include "../Core/DicomParsing/FromDcmtkBridge.h" 42 #include "../Plugins/Engine/OrthancPlugins.h"
43 #include "OrthancInitialization.h"
44 #include "OrthancRestApi/OrthancRestApi.h"
45 #include "Search/LookupResource.h"
46 #include "ServerJobs/OrthancJobUnserializer.h"
42 #include "ServerToolbox.h" 47 #include "ServerToolbox.h"
43 #include "OrthancInitialization.h"
44 48
45 #include <EmbeddedResources.h> 49 #include <EmbeddedResources.h>
46 #include <dcmtk/dcmdata/dcfilefo.h> 50 #include <dcmtk/dcmdata/dcfilefo.h>
47 51
48 #include "OrthancRestApi/OrthancRestApi.h"
49 #include "../Plugins/Engine/OrthancPlugins.h"
50 #include "Search/LookupResource.h"
51 52
52 53
53 #define ENABLE_DICOM_CACHE 1 54 #define ENABLE_DICOM_CACHE 1
54 55
55 static const size_t DICOM_CACHE_SIZE = 2; 56 static const size_t DICOM_CACHE_SIZE = 2;
63 * locking. 64 * locking.
64 **/ 65 **/
65 66
66 namespace Orthanc 67 namespace Orthanc
67 { 68 {
68 void ServerContext::ChangeThread(ServerContext* that) 69 void ServerContext::ChangeThread(ServerContext* that,
70 unsigned int sleepDelay)
69 { 71 {
70 while (!that->done_) 72 while (!that->done_)
71 { 73 {
72 std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(100)); 74 std::auto_ptr<IDynamicObject> obj(that->pendingChanges_.Dequeue(sleepDelay));
73 75
74 if (obj.get() != NULL) 76 if (obj.get() != NULL)
75 { 77 {
76 const ServerIndexChange& change = dynamic_cast<const ServerIndexChange&>(*obj.get()); 78 const ServerIndexChange& change = dynamic_cast<const ServerIndexChange&>(*obj.get());
77 79
104 } 106 }
105 } 107 }
106 } 108 }
107 109
108 110
111 void ServerContext::SetupJobsEngine(bool unitTesting)
112 {
113 jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2));
114 jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200);
115
116 std::string serialized;
117 if (index_.LookupGlobalProperty(serialized, GlobalProperty_JobsRegistry))
118 {
119 LOG(WARNING) << "Reloading the jobs from the last execution of Orthanc";
120 OrthancJobUnserializer unserializer(*this);
121
122 try
123 {
124 jobsEngine_.LoadRegistryFromString(unserializer, serialized);
125 }
126 catch (OrthancException& e)
127 {
128 LOG(ERROR) << "Cannot unserialize the jobs engine: " << e.What();
129 throw;
130 }
131 }
132 else
133 {
134 LOG(INFO) << "The last execution of Orthanc has archived no job";
135 //jobsEngine_.GetRegistry().SetMaxCompleted // TODO
136 }
137
138 jobsEngine_.Start();
139 }
140
141
142 void ServerContext::SaveJobsEngine()
143 {
144 LOG(INFO) << "Serializing the content of the jobs engine";
145
146 try
147 {
148 Json::Value value;
149 jobsEngine_.GetRegistry().Serialize(value);
150
151 Json::FastWriter writer;
152 std::string serialized = writer.write(value);
153
154 index_.SetGlobalProperty(GlobalProperty_JobsRegistry, serialized);
155 }
156 catch (OrthancException& e)
157 {
158 LOG(ERROR) << "Cannot serialize the jobs engine: " << e.What();
159 }
160 }
161
162
109 ServerContext::ServerContext(IDatabaseWrapper& database, 163 ServerContext::ServerContext(IDatabaseWrapper& database,
110 IStorageArea& area, 164 IStorageArea& area,
111 bool unitTesting) : 165 bool unitTesting) :
112 index_(*this, database, (unitTesting ? 20 : 500)), 166 index_(*this, database, (unitTesting ? 20 : 500)),
113 area_(area), 167 area_(area),
123 queryRetrieveArchive_(Configuration::GetGlobalUnsignedIntegerParameter("QueryRetrieveSize", 10)), 177 queryRetrieveArchive_(Configuration::GetGlobalUnsignedIntegerParameter("QueryRetrieveSize", 10)),
124 defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC")) 178 defaultLocalAet_(Configuration::GetGlobalStringParameter("DicomAet", "ORTHANC"))
125 { 179 {
126 listeners_.push_back(ServerListener(lua_, "Lua")); 180 listeners_.push_back(ServerListener(lua_, "Lua"));
127 181
128 jobsEngine_.SetWorkersCount(Configuration::GetGlobalUnsignedIntegerParameter("ConcurrentJobs", 2)); 182 SetupJobsEngine(unitTesting);
129 //jobsEngine_.SetMaxCompleted // TODO 183
130 jobsEngine_.SetThreadSleep(unitTesting ? 20 : 200); 184 changeThread_ = boost::thread(ChangeThread, this, (unitTesting ? 20 : 100));
131 jobsEngine_.Start();
132
133 changeThread_ = boost::thread(ChangeThread, this);
134 } 185 }
135 186
136 187
137 188
138 ServerContext::~ServerContext() 189 ServerContext::~ServerContext()
158 209
159 if (changeThread_.joinable()) 210 if (changeThread_.joinable())
160 { 211 {
161 changeThread_.join(); 212 changeThread_.join();
162 } 213 }
214
215 SaveJobsEngine();
163 216
164 // Do not change the order below! 217 // Do not change the order below!
165 jobsEngine_.Stop(); 218 jobsEngine_.Stop();
166 index_.Stop(); 219 index_.Stop();
167 } 220 }