comparison UnitTestsSources/MultiThreadingTests.cpp @ 2600:140a539b4eba jobs

SequenceOfOperationsJob
author Sebastien Jodogne <s.jodogne@gmail.com>
date Thu, 17 May 2018 16:22:40 +0200
parents 593d6b0f4cba
children 5b6c3d77a2a1
comparison
equal deleted inserted replaced
2599:593d6b0f4cba 2600:140a539b4eba
751 { 751 {
752 public: 752 public:
753 enum Type 753 enum Type
754 { 754 {
755 Type_DicomInstance, 755 Type_DicomInstance,
756 Type_Null 756 Type_Null,
757 Type_String
757 }; 758 };
758 759
759 private: 760 private:
760 Type type_; 761 Type type_;
761 762
777 778
778 virtual JobOperationValue* Clone() const = 0; 779 virtual JobOperationValue* Clone() const = 0;
779 }; 780 };
780 781
781 782
782 class IDicomConnectionProvider : public boost::noncopyable 783 class NullOperationValue : public JobOperationValue
783 { 784 {
784 public: 785 public:
785 virtual ~IDicomConnectionProvider() 786 NullOperationValue() :
787 JobOperationValue(Type_Null)
788 {
789 }
790
791 virtual JobOperationValue* Clone() const
792 {
793 return new NullOperationValue;
794 }
795 };
796
797
798 class StringOperationValue : public JobOperationValue
799 {
800 private:
801 std::string content_;
802
803 public:
804 StringOperationValue(const std::string& content) :
805 JobOperationValue(JobOperationValue::Type_String),
806 content_(content)
807 {
808 }
809
810 virtual JobOperationValue* Clone() const
811 {
812 return new StringOperationValue(content_);
813 }
814
815 const std::string& GetContent() const
816 {
817 return content_;
818 }
819 };
820
821
822 class IDicomConnectionManager : public boost::noncopyable
823 {
824 public:
825 virtual ~IDicomConnectionManager()
786 { 826 {
787 } 827 }
788 828
789 class IResource : public boost::noncopyable 829 class IResource : public boost::noncopyable
790 { 830 {
794 } 834 }
795 835
796 virtual DicomUserConnection& GetConnection() = 0; 836 virtual DicomUserConnection& GetConnection() = 0;
797 }; 837 };
798 838
799 virtual IResource* Acquire(const std::string& localAet, 839 virtual IResource* AcquireConnection(const std::string& localAet,
800 const RemoteModalityParameters& remote) = 0; 840 const RemoteModalityParameters& remote) = 0;
801 }; 841 };
802 842
803 843
804 class JobOperationValues : public boost::noncopyable 844 class JobOperationValues : public boost::noncopyable
805 { 845 {
806 private: 846 private:
807 std::vector<JobOperationValue*> values_; 847 std::vector<JobOperationValue*> values_;
808 848
849 void Append(JobOperationValues& target,
850 bool clear)
851 {
852 target.Reserve(target.GetSize() + GetSize());
853
854 for (size_t i = 0; i < values_.size(); i++)
855 {
856 if (clear)
857 {
858 target.Append(values_[i]);
859 values_[i] = NULL;
860 }
861 else
862 {
863 target.Append(GetValue(i).Clone());
864 }
865 }
866
867 if (clear)
868 {
869 Clear();
870 }
871 }
872
809 public: 873 public:
810 ~JobOperationValues() 874 ~JobOperationValues()
811 { 875 {
812 Clear(); 876 Clear();
813 } 877 }
814 878
815 void Append(JobOperationValues& target, 879 void Move(JobOperationValues& target)
816 bool clear) 880 {
817 { 881 return Append(target, true);
818 target.Reserve(target.GetSize() + GetSize()); 882 }
819 883
820 for (size_t i = 0; i < values_.size(); i++) 884 void Copy(JobOperationValues& target)
821 { 885 {
822 if (clear) 886 return Append(target, false);
823 {
824 target.Append(values_[i]);
825 values_[i] = NULL;
826 }
827 else
828 {
829 target.Append(GetValue(i).Clone());
830 }
831 }
832
833 if (clear)
834 {
835 Clear();
836 }
837 } 887 }
838 888
839 void Clear() 889 void Clear()
840 { 890 {
841 for (size_t i = 0; i < values_.size(); i++) 891 for (size_t i = 0; i < values_.size(); i++)
894 { 944 {
895 } 945 }
896 946
897 virtual void Apply(JobOperationValues& outputs, 947 virtual void Apply(JobOperationValues& outputs,
898 const JobOperationValue& input, 948 const JobOperationValue& input,
899 IDicomConnectionProvider& provider); 949 IDicomConnectionManager& manager) = 0;
950 };
951
952
953 class LogJobOperation : public IJobOperation
954 {
955 public:
956 virtual void Apply(JobOperationValues& outputs,
957 const JobOperationValue& input,
958 IDicomConnectionManager& manager)
959 {
960 switch (input.GetType())
961 {
962 case JobOperationValue::Type_String:
963 LOG(INFO) << "Job value: " << dynamic_cast<const StringOperationValue&>(input).GetContent();
964 break;
965
966 case JobOperationValue::Type_Null:
967 LOG(INFO) << "Job value: (null)";
968 break;
969
970 default:
971 LOG(INFO) << "Job value: (unsupport)";
972 break;
973 }
974
975 outputs.Append(input.Clone());
976 }
900 }; 977 };
901 978
902 979
903 class DicomInstanceValue : public JobOperationValue 980 class DicomInstanceValue : public JobOperationValue
904 { 981 {
946 { 1023 {
947 } 1024 }
948 1025
949 virtual void Apply(JobOperationValues& outputs, 1026 virtual void Apply(JobOperationValues& outputs,
950 const JobOperationValue& input, 1027 const JobOperationValue& input,
951 IDicomConnectionProvider& provider) 1028 IDicomConnectionManager& manager)
952 { 1029 {
953 std::auto_ptr<IDicomConnectionProvider::IResource> resource(provider.Acquire(localAet_, modality_)); 1030 std::auto_ptr<IDicomConnectionManager::IResource> resource
1031 (manager.AcquireConnection(localAet_, modality_));
954 1032
955 if (resource.get() == NULL) 1033 if (resource.get() == NULL)
956 { 1034 {
957 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle(); 1035 LOG(ERROR) << "Cannot connect to modality: " << modality_.GetApplicationEntityTitle();
958 return; 1036 return;
995 { 1073 {
996 } 1074 }
997 1075
998 virtual void Apply(JobOperationValues& outputs, 1076 virtual void Apply(JobOperationValues& outputs,
999 const JobOperationValue& input, 1077 const JobOperationValue& input,
1000 IDicomConnectionProvider& provider) 1078 IDicomConnectionManager& manager)
1001 { 1079 {
1002 switch (input.GetType()) 1080 switch (input.GetType())
1003 { 1081 {
1004 case JobOperationValue::Type_DicomInstance: 1082 case JobOperationValue::Type_DicomInstance:
1005 { 1083 {
1024 } 1102 }
1025 } 1103 }
1026 }; 1104 };
1027 1105
1028 1106
1029 class SequenceOfOperationsJob : 1107
1030 public IJob, 1108 class TimeoutDicomConnectionManager : public IDicomConnectionManager
1031 private IDicomConnectionProvider
1032 { 1109 {
1033 private: 1110 private:
1034 /*class DicomConnection 1111 class Resource : public IDicomConnectionManager::IResource
1035 { 1112 {
1036 private: 1113 private:
1037 boost::posix_time::ptime lastUse_; 1114 TimeoutDicomConnectionManager& that_;
1038 1115 boost::mutex::scoped_lock lock_;
1039 void Touch()
1040 {
1041 lastUse_ = boost::posix_time::microsec_clock::universal_time();
1042 }
1043 1116
1044 public: 1117 public:
1045 class Resource : public IDicomConnectionProvider::IResource 1118 Resource(TimeoutDicomConnectionManager& that) :
1046 { 1119 that_(that),
1047 private: 1120 lock_(that.mutex_)
1048 DicomConnection() 1121 {
1049 }; 1122 }
1050 };*/ 1123
1051 1124 virtual ~Resource()
1052 class Operation : public boost::noncopyable 1125 {
1126 that_.Touch();
1127 }
1128
1129 virtual DicomUserConnection& GetConnection()
1130 {
1131 if (that_.connection_.get() == NULL)
1132 {
1133 throw OrthancException(ErrorCode_InternalError);
1134 }
1135
1136 return *that_.connection_;
1137 }
1138 };
1139
1140 boost::mutex mutex_;
1141 std::auto_ptr<DicomUserConnection> connection_;
1142 boost::posix_time::ptime lastUse_;
1143 boost::posix_time::time_duration timeout_;
1144
1145 static boost::posix_time::ptime GetNow()
1146 {
1147 return boost::posix_time::microsec_clock::universal_time();
1148 }
1149
1150 void Touch()
1151 {
1152 lastUse_ = GetNow();
1153 }
1154
1155 void CheckTimeoutInternal()
1156 {
1157 if (connection_.get() != NULL &&
1158 (GetNow() - lastUse_) >= timeout_)
1159 {
1160 connection_.reset(NULL);
1161 }
1162 }
1163
1164 public:
1165 TimeoutDicomConnectionManager() :
1166 timeout_(boost::posix_time::milliseconds(1000))
1167 {
1168 }
1169
1170 void SetTimeout(unsigned int timeout)
1171 {
1172 boost::mutex::scoped_lock lock(mutex_);
1173
1174 timeout_ = boost::posix_time::milliseconds(timeout);
1175 CheckTimeout();
1176 }
1177
1178 unsigned int GetTimeout()
1179 {
1180 boost::mutex::scoped_lock lock(mutex_);
1181 return timeout_.total_milliseconds();
1182 }
1183
1184 void Close()
1185 {
1186 boost::mutex::scoped_lock lock(mutex_);
1187 connection_.reset(NULL);
1188 }
1189
1190 void CheckTimeout()
1191 {
1192 boost::mutex::scoped_lock lock(mutex_);
1193 CheckTimeoutInternal();
1194 }
1195
1196 virtual IResource* AcquireConnection(const std::string& localAet,
1197 const RemoteModalityParameters& remote)
1198 {
1199 boost::mutex::scoped_lock lock(mutex_);
1200
1201 if (connection_.get() == NULL ||
1202 !connection_->IsSameAssociation(localAet, remote))
1203 {
1204 connection_.reset(new DicomUserConnection(localAet, remote));
1205 }
1206
1207 return new Resource(*this);
1208 }
1209 };
1210
1211
1212
1213 class SequenceOfOperationsJob : public IJob
1214 {
1215 public:
1216 class IObserver : public boost::noncopyable
1217 {
1218 public:
1219 virtual ~IObserver()
1220 {
1221 }
1222
1223 virtual void SignalDone(const SequenceOfOperationsJob& job) = 0;
1224 };
1225
1226 private:
1227 class Operation : public boost::noncopyable
1053 { 1228 {
1054 private: 1229 private:
1055 JobOperationValues originalInputs_; 1230 JobOperationValues originalInputs_;
1056 JobOperationValues workInputs_; 1231 JobOperationValues workInputs_;
1057 std::auto_ptr<IJobOperation> operation_; 1232 std::auto_ptr<IJobOperation> operation_;
1109 bool IsDone() const 1284 bool IsDone() const
1110 { 1285 {
1111 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize(); 1286 return currentInput_ >= originalInputs_.GetSize() + workInputs_.GetSize();
1112 } 1287 }
1113 1288
1114 void Step(IDicomConnectionProvider& provider) 1289 void Step(IDicomConnectionManager& manager)
1115 { 1290 {
1116 if (IsDone()) 1291 if (IsDone())
1117 { 1292 {
1118 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1293 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1119 } 1294 }
1124 { 1299 {
1125 input = &originalInputs_.GetValue(currentInput_); 1300 input = &originalInputs_.GetValue(currentInput_);
1126 } 1301 }
1127 else 1302 else
1128 { 1303 {
1129 input = &originalInputs_.GetValue(currentInput_ - originalInputs_.GetSize()); 1304 input = &workInputs_.GetValue(currentInput_ - originalInputs_.GetSize());
1130 } 1305 }
1131 1306
1132 JobOperationValues outputs; 1307 JobOperationValues outputs;
1133 operation_->Apply(outputs, *input, provider); 1308 operation_->Apply(outputs, *input, manager);
1134 1309
1135 if (!nextOperations_.empty()) 1310 if (!nextOperations_.empty())
1136 { 1311 {
1137 // TODO 1312 std::list<Operation*>::iterator first = nextOperations_.begin();
1313 outputs.Move((*first)->workInputs_);
1314
1315 std::list<Operation*>::iterator current = first;
1316 ++current;
1317
1318 while (current != nextOperations_.end())
1319 {
1320 (*first)->workInputs_.Copy((*current)->workInputs_);
1321 ++current;
1322 }
1138 } 1323 }
1139 1324
1140 currentInput_ += 1; 1325 currentInput_ += 1;
1141 } 1326 }
1142 }; 1327 };
1143 1328
1144 1329
1145 boost::mutex mutex_; 1330 std::string jobType_;
1146 std::vector<Operation*> operations_; 1331 bool done_;
1147 size_t currentOperation_; 1332 boost::mutex mutex_;
1148 boost::condition_variable operationAdded_; 1333 std::vector<Operation*> operations_;
1334 size_t current_;
1335 boost::condition_variable operationAdded_;
1336 TimeoutDicomConnectionManager& connectionManager_;
1337 boost::posix_time::time_duration trailingTimeout_;
1338 std::list<IObserver*> observers_;
1339
1340 // Invoked from constructors
1341 void Setup()
1342 {
1343 done_ = false;
1344 current_ = 0;
1345 trailingTimeout_ = boost::posix_time::milliseconds(1000);
1346 }
1149 1347
1150 public: 1348 public:
1151 SequenceOfOperationsJob() : 1349 SequenceOfOperationsJob(TimeoutDicomConnectionManager& manager) :
1152 currentOperation_(0) 1350 jobType_("SequenceOfOperations"),
1153 { 1351 connectionManager_(manager)
1154 } 1352 {
1353 Setup();
1354 }
1355
1356 SequenceOfOperationsJob(const std::string& jobType,
1357 TimeoutDicomConnectionManager& manager) :
1358 jobType_(jobType),
1359 connectionManager_(manager)
1360 {
1361 Setup();
1362 }
1155 1363
1156 virtual ~SequenceOfOperationsJob() 1364 virtual ~SequenceOfOperationsJob()
1157 { 1365 {
1158 for (size_t i = 0; i < operations_.size(); i++) 1366 for (size_t i = 0; i < operations_.size(); i++)
1159 { 1367 {
1162 delete operations_[i]; 1370 delete operations_[i];
1163 } 1371 }
1164 } 1372 }
1165 } 1373 }
1166 1374
1375 void Register(IObserver& observer)
1376 {
1377 boost::mutex::scoped_lock lock(mutex_);
1378 observers_.push_back(&observer);
1379 }
1380
1381 // This lock allows adding new operations to the end of the job,
1382 // from another thread than the worker thread, after the job has
1383 // been submitted for processing
1167 class Lock : public boost::noncopyable 1384 class Lock : public boost::noncopyable
1168 { 1385 {
1169 private: 1386 private:
1170 SequenceOfOperationsJob& that_; 1387 SequenceOfOperationsJob& that_;
1171 boost::mutex::scoped_lock lock_; 1388 boost::mutex::scoped_lock lock_;
1175 that_(that), 1392 that_(that),
1176 lock_(that.mutex_) 1393 lock_(that.mutex_)
1177 { 1394 {
1178 } 1395 }
1179 1396
1397 bool IsDone() const
1398 {
1399 return that_.done_;
1400 }
1401
1402 void SetDicomConnectionTimeout(unsigned int timeout)
1403 {
1404 that_.connectionManager_.SetTimeout(timeout);
1405 }
1406
1407 void SetTrailingOperationTimeout(unsigned int timeout)
1408 {
1409 that_.trailingTimeout_ = boost::posix_time::milliseconds(timeout);
1410 }
1411
1180 size_t AddOperation(IJobOperation* operation) 1412 size_t AddOperation(IJobOperation* operation)
1181 { 1413 {
1414 if (IsDone())
1415 {
1416 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1417 }
1418
1182 that_.operations_.push_back(new Operation(operation)); 1419 that_.operations_.push_back(new Operation(operation));
1183 that_.operationAdded_.notify_one(); 1420 that_.operationAdded_.notify_one();
1184 1421
1185 return that_.operations_.size() - 1; 1422 return that_.operations_.size() - 1;
1186 } 1423 }
1187 1424
1425 size_t GetOperationsCount() const
1426 {
1427 return that_.operations_.size();
1428 }
1429
1188 void AddInput(size_t index, 1430 void AddInput(size_t index,
1189 const JobOperationValue& value) 1431 const JobOperationValue& value)
1190 { 1432 {
1191 if (index >= that_.operations_.size() || 1433 if (IsDone())
1192 index < that_.currentOperation_) 1434 {
1435 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1436 }
1437 else if (index >= that_.operations_.size() ||
1438 index < that_.current_)
1193 { 1439 {
1194 throw OrthancException(ErrorCode_ParameterOutOfRange); 1440 throw OrthancException(ErrorCode_ParameterOutOfRange);
1195 } 1441 }
1196 else 1442 else
1197 { 1443 {
1200 } 1446 }
1201 1447
1202 void Connect(size_t input, 1448 void Connect(size_t input,
1203 size_t output) 1449 size_t output)
1204 { 1450 {
1205 if (input >= output || 1451 if (IsDone())
1206 input >= that_.operations_.size() || 1452 {
1207 output >= that_.operations_.size() || 1453 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1208 input < that_.currentOperation_ || 1454 }
1209 output < that_.currentOperation_) 1455 else if (input >= output ||
1456 input >= that_.operations_.size() ||
1457 output >= that_.operations_.size() ||
1458 input < that_.current_ ||
1459 output < that_.current_)
1210 { 1460 {
1211 throw OrthancException(ErrorCode_ParameterOutOfRange); 1461 throw OrthancException(ErrorCode_ParameterOutOfRange);
1212 } 1462 }
1213 else 1463 else
1214 { 1464 {
1221 1471
1222 virtual void Start() 1472 virtual void Start()
1223 { 1473 {
1224 } 1474 }
1225 1475
1226 virtual JobStepResult ExecuteStep() = 0; 1476 virtual JobStepResult ExecuteStep()
1477 {
1478 boost::mutex::scoped_lock lock(mutex_);
1479
1480 if (current_ == operations_.size())
1481 {
1482 LOG(INFO) << "Executing the trailing timeout in the sequence of operations";
1483 operationAdded_.timed_wait(lock, trailingTimeout_);
1484
1485 if (current_ == operations_.size())
1486 {
1487 // No operation was added during the trailing timeout: The
1488 // job is over
1489 LOG(INFO) << "The sequence of operations is over";
1490 done_ = true;
1491
1492 for (std::list<IObserver*>::iterator it = observers_.begin();
1493 it != observers_.end(); ++it)
1494 {
1495 (*it)->SignalDone(*this);
1496 }
1497
1498 return JobStepResult::Success();
1499 }
1500 else
1501 {
1502 LOG(INFO) << "New operation added to the sequence of operations";
1503 }
1504 }
1505
1506 assert(current_ < operations_.size());
1507
1508 while (current_ < operations_.size() &&
1509 operations_[current_]->IsDone())
1510 {
1511 current_++;
1512 }
1513
1514 if (current_ < operations_.size())
1515 {
1516 operations_[current_]->Step(connectionManager_);
1517 }
1518
1519 return JobStepResult::Continue();
1520 }
1227 1521
1228 virtual void SignalResubmit() 1522 virtual void SignalResubmit()
1229 { 1523 {
1230 boost::mutex::scoped_lock lock(mutex_); 1524 boost::mutex::scoped_lock lock(mutex_);
1231 1525
1232 currentOperation_ = 0; 1526 current_ = 0;
1527 done_ = false;
1233 1528
1234 for (size_t i = 0; i < operations_.size(); i++) 1529 for (size_t i = 0; i < operations_.size(); i++)
1235 { 1530 {
1236 operations_[i]->Reset(); 1531 operations_[i]->Reset();
1237 } 1532 }
1238 } 1533 }
1239 1534
1240 virtual void ReleaseResources() = 0; // For pausing/canceling jobs 1535 virtual void ReleaseResources()
1241 1536 {
1242 virtual float GetProgress() = 0; 1537 boost::mutex::scoped_lock lock(mutex_);
1243 1538 connectionManager_.Close();
1244 virtual void GetJobType(std::string& target) = 0; 1539 }
1245 1540
1246 virtual void GetPublicContent(Json::Value& value) = 0; 1541 virtual float GetProgress()
1247 1542 {
1248 virtual void GetInternalContent(Json::Value& value) = 0; 1543 boost::mutex::scoped_lock lock(mutex_);
1544
1545 return (static_cast<float>(current_) /
1546 static_cast<float>(operations_.size() + 1));
1547 }
1548
1549 virtual void GetJobType(std::string& target)
1550 {
1551 target = jobType_;
1552 }
1553
1554 virtual void GetPublicContent(Json::Value& value)
1555 {
1556 boost::mutex::scoped_lock lock(mutex_);
1557
1558 value["CountOperations"] = static_cast<unsigned int>(operations_.size());
1559 }
1560
1561 virtual void GetInternalContent(Json::Value& value)
1562 {
1563 // TODO
1564 }
1249 }; 1565 };
1250 } 1566
1567
1568 class LuaJobManager : private SequenceOfOperationsJob::IObserver
1569 {
1570 public:
1571 typedef SequenceOfOperationsJob::Lock Lock;
1572
1573 private:
1574 boost::mutex mutex_;
1575 JobsEngine& engine_;
1576 TimeoutDicomConnectionManager manager_;
1577 std::string currentId_;
1578 SequenceOfOperationsJob* currentJob_;
1579 size_t maxOperations_;
1580 int priority_;
1581 unsigned int trailingTimeout_;
1582
1583 virtual void SignalDone(const SequenceOfOperationsJob& job)
1584 {
1585 boost::mutex::scoped_lock lock(mutex_);
1586
1587 if (&job == currentJob_)
1588 {
1589 currentId_.clear();
1590 currentJob_ = NULL;
1591 }
1592 }
1593
1594 public:
1595 LuaJobManager(JobsEngine& engine) :
1596 engine_(engine),
1597 currentJob_(NULL),
1598 maxOperations_(1000),
1599 priority_(0)
1600 {
1601 }
1602
1603 void SetMaxOperationsPerJob(size_t count)
1604 {
1605 boost::mutex::scoped_lock lock(mutex_);
1606 maxOperations_ = count;
1607 }
1608
1609 void SetPriority(int priority)
1610 {
1611 boost::mutex::scoped_lock lock(mutex_);
1612 priority_ = priority;
1613 }
1614
1615 void SetTrailingOperationTimeout(unsigned int timeout)
1616 {
1617 boost::mutex::scoped_lock lock(mutex_);
1618 trailingTimeout_ = timeout;
1619 }
1620
1621 Lock* Modify()
1622 {
1623 boost::mutex::scoped_lock lock(mutex_);
1624
1625 if (currentJob_ != NULL)
1626 {
1627 std::auto_ptr<Lock> result(new Lock(*currentJob_));
1628
1629 if (!result->IsDone() &&
1630 result->GetOperationsCount() < maxOperations_)
1631 {
1632 return result.release();
1633 }
1634 }
1635
1636 // Need to create a new job, as the previous one is either
1637 // finished, or is getting too long
1638 currentJob_ = new SequenceOfOperationsJob(manager_);
1639 engine_.GetRegistry().Submit(currentId_, currentJob_, priority_);
1640
1641 std::auto_ptr<Lock> result(new Lock(*currentJob_));
1642 result->SetTrailingOperationTimeout(trailingTimeout_);
1643
1644 return result.release();
1645 }
1646 };
1647 }
1648
1649
1650 TEST(JobsEngine, DISABLED_SequenceOfOperationsJob)
1651 {
1652 TimeoutDicomConnectionManager manager;
1653 JobsEngine engine;
1654 engine.SetWorkersCount(3);
1655 engine.Start();
1656
1657 std::string id;
1658 SequenceOfOperationsJob* job = NULL;
1659
1660 {
1661 std::auto_ptr<SequenceOfOperationsJob> a(new SequenceOfOperationsJob(manager));
1662 job = a.get();
1663 engine.GetRegistry().Submit(id, a.release(), 0);
1664 }
1665
1666 boost::this_thread::sleep(boost::posix_time::milliseconds(500));
1667
1668 {
1669 SequenceOfOperationsJob::Lock lock(*job);
1670 size_t i = lock.AddOperation(new LogJobOperation);
1671 size_t j = lock.AddOperation(new LogJobOperation);
1672 size_t k = lock.AddOperation(new LogJobOperation);
1673 lock.AddInput(i, StringOperationValue("Hello"));
1674 lock.AddInput(i, StringOperationValue("World"));
1675 lock.Connect(i, j);
1676 lock.Connect(j, k);
1677 }
1678
1679 boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
1680
1681 engine.Stop();
1682
1683 }
1684
1685
1686 TEST(JobsEngine, Lua)
1687 {
1688 JobsEngine engine;
1689 engine.SetWorkersCount(2);
1690 engine.Start();
1691
1692 LuaJobManager lua(engine);
1693 lua.SetMaxOperationsPerJob(5);
1694 lua.SetTrailingOperationTimeout(200);
1695
1696 for (size_t i = 0; i < 30; i++)
1697 {
1698 boost::this_thread::sleep(boost::posix_time::milliseconds(150));
1699 std::auto_ptr<LuaJobManager::Lock> lock(lua.Modify());
1700 size_t a = lock->AddOperation(new LogJobOperation);
1701 lock->AddInput(a, StringOperationValue(boost::lexical_cast<std::string>(i)));
1702 }
1703
1704 boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
1705
1706 engine.Stop();
1707
1708 }