comparison UnitTestsSources/MultiThreadingTests.cpp @ 2565:0f06b4d5b3d0 jobs

JobsEngine
author Sebastien Jodogne <s.jodogne@gmail.com>
date Mon, 07 May 2018 13:20:29 +0200
parents f8681f251caa
children c09ce3c038fc
comparison
equal deleted inserted replaced
2564:f8681f251caa 2565:0f06b4d5b3d0
287 JobState_Success, 287 JobState_Success,
288 JobState_Failure, 288 JobState_Failure,
289 JobState_Paused, 289 JobState_Paused,
290 JobState_Retry 290 JobState_Retry
291 }; 291 };
292
293 static const char* EnumerationToString(JobState state)
294 {
295 switch (state)
296 {
297 case JobState_Pending:
298 return "Pending";
299
300 case JobState_Running:
301 return "Running";
302
303 case JobState_Success:
304 return "Success";
305
306 case JobState_Failure:
307 return "Failure";
308
309 case JobState_Paused:
310 return "Paused";
311
312 case JobState_Retry:
313 return "Retry";
314
315 default:
316 throw OrthancException(ErrorCode_ParameterOutOfRange);
317 }
318 }
292 319
293 enum JobStepCode 320 enum JobStepCode
294 { 321 {
295 JobStepCode_Success, 322 JobStepCode_Success,
296 JobStepCode_Failure, 323 JobStepCode_Failure,
375 JobStatus(ErrorCode code, 402 JobStatus(ErrorCode code,
376 IJob& job) : 403 IJob& job) :
377 errorCode_(code), 404 errorCode_(code),
378 progress_(job.GetProgress()) 405 progress_(job.GetProgress())
379 { 406 {
380 if (progress_ < 0 || 407 if (progress_ < 0)
381 progress_ > 1) 408 {
382 { 409 progress_ = 0;
383 throw OrthancException(ErrorCode_ParameterOutOfRange); 410 }
411
412 if (progress_ > 1)
413 {
414 progress_ = 1;
384 } 415 }
385 416
386 job.GetDescription(description_); 417 job.GetDescription(description_);
387 } 418 }
388 419
406 class JobInfo 437 class JobInfo
407 { 438 {
408 private: 439 private:
409 std::string id_; 440 std::string id_;
410 int priority_; 441 int priority_;
411 ErrorCode errorCode_;
412 JobState state_; 442 JobState state_;
413 boost::posix_time::ptime infoTime_; 443 boost::posix_time::ptime timestamp_;
414 boost::posix_time::ptime creationTime_; 444 boost::posix_time::ptime creationTime_;
415 boost::posix_time::time_duration runtime_; 445 boost::posix_time::time_duration runtime_;
416 boost::posix_time::ptime eta_; 446 boost::posix_time::ptime eta_;
417 JobStatus status_; 447 JobStatus status_;
418 448
424 const boost::posix_time::ptime& creationTime, 454 const boost::posix_time::ptime& creationTime,
425 const boost::posix_time::time_duration& runtime) : 455 const boost::posix_time::time_duration& runtime) :
426 id_(id), 456 id_(id),
427 priority_(priority), 457 priority_(priority),
428 state_(state), 458 state_(state),
429 infoTime_(boost::posix_time::microsec_clock::universal_time()), 459 timestamp_(boost::posix_time::microsec_clock::universal_time()),
430 creationTime_(creationTime), 460 creationTime_(creationTime),
431 runtime_(runtime), 461 runtime_(runtime),
432 status_(status) 462 status_(status)
433 { 463 {
434 float ms = static_cast<float>(runtime_.total_milliseconds()); 464 float ms = static_cast<float>(runtime_.total_milliseconds());
435 float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms; 465 float remaining = boost::math::llround(1.0f - status_.GetProgress()) * ms;
436 eta_ = infoTime_ + boost::posix_time::milliseconds(remaining); 466 eta_ = timestamp_ + boost::posix_time::milliseconds(remaining);
467 }
468
469 JobInfo() :
470 priority_(0),
471 state_(JobState_Failure),
472 timestamp_(boost::posix_time::microsec_clock::universal_time()),
473 creationTime_(timestamp_),
474 runtime_(boost::posix_time::milliseconds(0)),
475 eta_(timestamp_)
476 {
437 } 477 }
438 478
439 const std::string& GetIdentifier() const 479 const std::string& GetIdentifier() const
440 { 480 {
441 return id_; 481 return id_;
444 int GetPriority() const 484 int GetPriority() const
445 { 485 {
446 return priority_; 486 return priority_;
447 } 487 }
448 488
449 ErrorCode GetErrorCode() const
450 {
451 return errorCode_;
452 }
453
454 JobState GetState() const 489 JobState GetState() const
455 { 490 {
456 return state_; 491 return state_;
457 } 492 }
458 493
459 const boost::posix_time::ptime& GetInfoTime() const 494 const boost::posix_time::ptime& GetInfoTime() const
460 { 495 {
461 return infoTime_; 496 return timestamp_;
462 } 497 }
463 498
464 const boost::posix_time::ptime& GetCreationTime() const 499 const boost::posix_time::ptime& GetCreationTime() const
465 { 500 {
466 return creationTime_; 501 return creationTime_;
482 } 517 }
483 518
484 JobStatus& GetStatus() 519 JobStatus& GetStatus()
485 { 520 {
486 return status_; 521 return status_;
522 }
523
524 void Format(Json::Value& target) const
525 {
526 target = Json::objectValue;
527 target["ID"] = id_;
528 target["Priority"] = priority_;
529 target["ErrorCode"] = EnumerationToString(status_.GetErrorCode());
530 target["State"] = EnumerationToString(state_);
531 target["Timestamp"] = boost::posix_time::to_iso_string(timestamp_);
532 target["CreationTime"] = boost::posix_time::to_iso_string(creationTime_);
533 target["Runtime"] = static_cast<uint32_t>(runtime_.total_milliseconds());
534 target["EstimatedTimeOfArrival"] = boost::posix_time::to_iso_string(eta_);
535 target["Progress"] = boost::math::iround(status_.GetProgress() * 100.0f);
536 target["Description"] = status_.GetDescription();
487 } 537 }
488 }; 538 };
489 539
490 540
491 class JobHandler : public boost::noncopyable 541 class JobHandler : public boost::noncopyable
615 } 665 }
616 else 666 else
617 { 667 {
618 return retryTime_ <= now; 668 return retryTime_ <= now;
619 } 669 }
670 }
671
672 const boost::posix_time::ptime& GetCreationTime() const
673 {
674 return creationTime_;
675 }
676
677 const boost::posix_time::time_duration& GetRuntime() const
678 {
679 return runtime_;
620 } 680 }
621 681
622 const JobStatus& GetLastStatus() const 682 const JobStatus& GetLastStatus() const
623 { 683 {
624 return lastStatus_; 684 return lastStatus_;
867 target.insert(it->first); 927 target.insert(it->first);
868 } 928 }
869 } 929 }
870 930
871 931
932 void GetJobsInfo(std::map<std::string, JobInfo>& target)
933 {
934 boost::mutex::scoped_lock lock(mutex_);
935 CheckInvariants();
936
937 for (JobsIndex::const_iterator it = jobsIndex_.begin();
938 it != jobsIndex_.end(); ++it)
939 {
940 const JobHandler& handler = *it->second;
941 target[it->first] = JobInfo(handler.GetId(),
942 handler.GetPriority(),
943 handler.GetState(),
944 handler.GetLastStatus(),
945 handler.GetCreationTime(),
946 handler.GetRuntime());
947 }
948 }
949
950
872 void Submit(std::string& id, 951 void Submit(std::string& id,
873 IJob* job, // Takes ownership 952 IJob* job, // Takes ownership
874 int priority) 953 int priority)
875 { 954 {
876 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority)); 955 std::auto_ptr<JobHandler> handler(new JobHandler(job, priority));
883 pendingJobs_.push(handler.get()); 962 pendingJobs_.push(handler.get());
884 pendingJobAvailable_.notify_one(); 963 pendingJobAvailable_.notify_one();
885 964
886 jobsIndex_.insert(std::make_pair(id, handler.release())); 965 jobsIndex_.insert(std::make_pair(id, handler.release()));
887 966
888 LOG(INFO) << "New job submitted: " << id; 967 LOG(INFO) << "New job submitted with priority " << priority << ": " << id;
889 968
890 CheckInvariants(); 969 CheckInvariants();
891 } 970 }
892 971
893 972
1241 { 1320 {
1242 return priority_; 1321 return priority_;
1243 } 1322 }
1244 } 1323 }
1245 1324
1325 IJob& GetJob()
1326 {
1327 if (!IsValid())
1328 {
1329 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1330 }
1331 else
1332 {
1333 return *job_;
1334 }
1335 }
1336
1246 bool IsPauseScheduled() 1337 bool IsPauseScheduled()
1247 { 1338 {
1248 if (!IsValid()) 1339 if (!IsValid())
1249 { 1340 {
1250 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1341 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1289 { 1380 {
1290 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1381 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1291 } 1382 }
1292 else 1383 else
1293 { 1384 {
1294 job_->ReleaseResources();
1295 targetState_ = JobState_Paused; 1385 targetState_ = JobState_Paused;
1296 } 1386 }
1297 } 1387 }
1298 1388
1299 void MarkRetry(unsigned int timeout) 1389 void MarkRetry(unsigned int timeout)
1307 targetState_ = JobState_Retry; 1397 targetState_ = JobState_Retry;
1308 targetRetryTimeout_ = timeout; 1398 targetRetryTimeout_ = timeout;
1309 } 1399 }
1310 } 1400 }
1311 1401
1312 void UpdateStatus(const JobStatus& status) 1402 void UpdateStatus(ErrorCode code)
1313 { 1403 {
1314 if (!IsValid()) 1404 if (!IsValid())
1315 { 1405 {
1316 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1406 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1317 } 1407 }
1318 else 1408 else
1319 { 1409 {
1410 JobStatus status(code, *job_);
1411
1320 boost::mutex::scoped_lock lock(registry_.mutex_); 1412 boost::mutex::scoped_lock lock(registry_.mutex_);
1321 registry_.CheckInvariants(); 1413 registry_.CheckInvariants();
1322 assert(handler_->GetState() == JobState_Running); 1414 assert(handler_->GetState() == JobState_Running);
1323 1415
1324 handler_->SetLastStatus(status); 1416 handler_->SetLastStatus(status);
1325 } 1417 }
1326 } 1418 }
1327 1419 };
1328 bool ExecuteStep() 1420 };
1329 { 1421
1330 if (!IsValid()) 1422
1331 { 1423
1332 throw OrthancException(ErrorCode_BadSequenceOfCalls); 1424 class JobsEngine
1333 } 1425 {
1334 1426 private:
1335 if (IsPauseScheduled()) 1427 enum State
1336 { 1428 {
1337 targetState_ = JobState_Paused; 1429 State_Setup,
1338 return false; 1430 State_Running,
1339 } 1431 State_Stopping,
1340 1432 State_Done
1341 std::auto_ptr<JobStepResult> result; 1433 };
1342 ErrorCode code; 1434
1343 1435 boost::mutex stateMutex_;
1344 { 1436 State state_;
1345 bool ok = false; 1437 JobsRegistry registry_;
1346 1438 boost::thread retryHandler_;
1347 try 1439 std::vector<boost::thread> workers_;
1440
1441 bool ExecuteStep(JobsRegistry::RunningJob& running,
1442 size_t workerIndex)
1443 {
1444 assert(running.IsValid());
1445
1446 LOG(INFO) << "Executing job with priority " << running.GetPriority()
1447 << " in worker thread " << workerIndex << ": " << running.GetId();
1448
1449 if (running.IsPauseScheduled())
1450 {
1451 running.GetJob().ReleaseResources();
1452 running.MarkPause();
1453 return false;
1454 }
1455
1456 std::auto_ptr<JobStepResult> result;
1457
1458 {
1459 try
1460 {
1461 result.reset(running.GetJob().ExecuteStep());
1462
1463 if (result->GetCode() == JobStepCode_Failure)
1348 { 1464 {
1349 result.reset(job_->ExecuteStep()); 1465 running.UpdateStatus(ErrorCode_InternalError);
1350 ok = true;
1351
1352 if (result->GetCode() == JobStepCode_Failure)
1353 {
1354 code = ErrorCode_InternalError;
1355 }
1356 }
1357 catch (OrthancException& e)
1358 {
1359 code = e.GetErrorCode();
1360 }
1361 catch (boost::bad_lexical_cast&)
1362 {
1363 code = ErrorCode_BadFileFormat;
1364 }
1365 catch (...)
1366 {
1367 code = ErrorCode_InternalError;
1368 }
1369
1370 if (ok)
1371 {
1372 code = ErrorCode_Success;
1373 } 1466 }
1374 else 1467 else
1375 { 1468 {
1376 result.reset(new JobStepResult(JobStepCode_Failure)); 1469 running.UpdateStatus(ErrorCode_Success);
1377 } 1470 }
1378 } 1471 }
1379 1472 catch (OrthancException& e)
1380 { 1473 {
1381 JobStatus status(code, *job_); 1474 running.UpdateStatus(e.GetErrorCode());
1382 UpdateStatus(status); 1475 }
1383 } 1476 catch (boost::bad_lexical_cast&)
1384 1477 {
1385 switch (result->GetCode()) 1478 running.UpdateStatus(ErrorCode_BadFileFormat);
1386 { 1479 }
1387 case JobStepCode_Success: 1480 catch (...)
1388 targetState_ = JobState_Success; 1481 {
1389 return false; 1482 running.UpdateStatus(ErrorCode_InternalError);
1390 1483 }
1391 case JobStepCode_Failure: 1484
1392 targetState_ = JobState_Failure; 1485 if (result.get() == NULL)
1393 return false; 1486 {
1394 1487 result.reset(new JobStepResult(JobStepCode_Failure));
1395 case JobStepCode_Retry: 1488 }
1396 targetState_ = JobState_Retry; 1489 }
1397 targetRetryTimeout_ = dynamic_cast<RetryResult&>(*result).GetTimeout(); 1490
1398 return false; 1491 switch (result->GetCode())
1399 1492 {
1400 case JobStepCode_Continue: 1493 case JobStepCode_Success:
1401 return true; 1494 running.MarkSuccess();
1495 return false;
1496
1497 case JobStepCode_Failure:
1498 running.MarkFailure();
1499 return false;
1500
1501 case JobStepCode_Retry:
1502 running.MarkRetry(dynamic_cast<RetryResult&>(*result).GetTimeout());
1503 return false;
1504
1505 case JobStepCode_Continue:
1506 return true;
1402 1507
1403 default: 1508 default:
1404 throw OrthancException(ErrorCode_InternalError); 1509 throw OrthancException(ErrorCode_InternalError);
1405 } 1510 }
1406 } 1511 }
1407 }; 1512
1513 static void RetryHandler(JobsEngine* engine)
1514 {
1515 assert(engine != NULL);
1516
1517 for (;;)
1518 {
1519 boost::this_thread::sleep(boost::posix_time::milliseconds(200));
1520
1521 {
1522 boost::mutex::scoped_lock lock(engine->stateMutex_);
1523
1524 if (engine->state_ != State_Running)
1525 {
1526 return;
1527 }
1528 }
1529
1530 engine->GetRegistry().ScheduleRetries();
1531 }
1532 }
1533
1534 static void Worker(JobsEngine* engine,
1535 size_t workerIndex)
1536 {
1537 assert(engine != NULL);
1538
1539 LOG(INFO) << "Worker thread " << workerIndex << " has started";
1540
1541 for (;;)
1542 {
1543 {
1544 boost::mutex::scoped_lock lock(engine->stateMutex_);
1545
1546 if (engine->state_ != State_Running)
1547 {
1548 return;
1549 }
1550 }
1551
1552 JobsRegistry::RunningJob running(engine->GetRegistry(), 100);
1553
1554 if (running.IsValid())
1555 {
1556 for (;;)
1557 {
1558 if (!engine->ExecuteStep(running, workerIndex))
1559 {
1560 break;
1561 }
1562 }
1563 }
1564 }
1565 }
1566
1567 public:
1568 JobsEngine() :
1569 state_(State_Setup),
1570 workers_(1)
1571 {
1572 }
1573
1574 ~JobsEngine()
1575 {
1576 if (state_ != State_Setup &&
1577 state_ != State_Done)
1578 {
1579 LOG(ERROR) << "INTERNAL ERROR: JobsEngine::Stop() should be invoked manually to avoid mess in the destruction order!";
1580 Stop();
1581 }
1582 }
1583
1584 void SetWorkersCount(size_t count)
1585 {
1586 if (count == 0)
1587 {
1588 throw OrthancException(ErrorCode_ParameterOutOfRange);
1589 }
1590
1591 boost::mutex::scoped_lock lock(stateMutex_);
1592
1593 if (state_ != State_Setup)
1594 {
1595 // Can only be invoked before calling "Start()"
1596 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1597 }
1598
1599 workers_.resize(count);
1600 }
1601
1602 JobsRegistry& GetRegistry()
1603 {
1604 return registry_;
1605 }
1606
1607 void Start()
1608 {
1609 boost::mutex::scoped_lock lock(stateMutex_);
1610
1611 if (state_ != State_Setup)
1612 {
1613 throw OrthancException(ErrorCode_BadSequenceOfCalls);
1614 }
1615
1616 retryHandler_ = boost::thread(RetryHandler, this);
1617
1618 for (size_t i = 0; i < workers_.size(); i++)
1619 {
1620 workers_[i] = boost::thread(Worker, this, i);
1621 }
1622
1623 state_ = State_Running;
1624
1625 LOG(WARNING) << "The jobs engine has started";
1626 }
1627
1628
1629 void Stop()
1630 {
1631 {
1632 boost::mutex::scoped_lock lock(stateMutex_);
1633
1634 if (state_ != State_Running)
1635 {
1636 return;
1637 }
1638
1639 state_ = State_Stopping;
1640 }
1641
1642 LOG(INFO) << "Stopping the jobs engine";
1643
1644 if (retryHandler_.joinable())
1645 {
1646 retryHandler_.join();
1647 }
1648
1649 for (size_t i = 0; i < workers_.size(); i++)
1650 {
1651 if (workers_[i].joinable())
1652 {
1653 workers_[i].join();
1654 }
1655 }
1656
1657 {
1658 boost::mutex::scoped_lock lock(stateMutex_);
1659 state_ = State_Done;
1660 }
1661
1662 LOG(WARNING) << "The jobs engine has stopped";
1663 }
1408 }; 1664 };
1409 } 1665 }
1410 1666
1411 1667
1412 1668
1413 class DummyJob : public Orthanc::IJob 1669 class DummyJob : public Orthanc::IJob
1414 { 1670 {
1415 private: 1671 private:
1416 JobStepResult result_; 1672 JobStepResult result_;
1673 unsigned int count_;
1674 unsigned int steps_;
1417 1675
1418 public: 1676 public:
1419 DummyJob() : 1677 DummyJob() :
1420 result_(Orthanc::JobStepCode_Success) 1678 result_(Orthanc::JobStepCode_Success),
1679 count_(0),
1680 steps_(4)
1421 { 1681 {
1422 } 1682 }
1423 1683
1424 explicit DummyJob(JobStepResult result) : 1684 explicit DummyJob(JobStepResult result) :
1425 result_(result) 1685 result_(result),
1686 count_(0),
1687 steps_(4)
1426 { 1688 {
1427 } 1689 }
1428 1690
1429 virtual JobStepResult* ExecuteStep() 1691 virtual JobStepResult* ExecuteStep()
1430 { 1692 {
1431 return new JobStepResult(result_); 1693 boost::this_thread::sleep(boost::posix_time::milliseconds(50));
1694
1695 if (count_ == steps_ - 1)
1696 {
1697 return new JobStepResult(result_);
1698 }
1699 else
1700 {
1701 count_++;
1702 return new JobStepResult(JobStepCode_Continue);
1703 }
1432 } 1704 }
1433 1705
1434 virtual void ReleaseResources() 1706 virtual void ReleaseResources()
1435 { 1707 {
1436 } 1708 }
1437 1709
1438 virtual float GetProgress() 1710 virtual float GetProgress()
1439 { 1711 {
1440 return 0; 1712 return static_cast<float>(count_) / static_cast<float>(steps_ - 1);
1441 } 1713 }
1442 1714
1443 virtual void GetDescription(Json::Value& value) 1715 virtual void GetDescription(Json::Value& value)
1444 { 1716 {
1717 value["hello"] = "world";
1445 } 1718 }
1446 }; 1719 };
1447 1720
1448 1721
1449 static bool CheckState(Orthanc::JobsRegistry& registry, 1722 static bool CheckState(Orthanc::JobsRegistry& registry,
1746 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running)); 2019 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Running));
1747 } 2020 }
1748 2021
1749 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success)); 2022 ASSERT_TRUE(CheckState(registry, id, Orthanc::JobState_Success));
1750 } 2023 }
2024
2025
2026 TEST(JobsEngine, Basic)
2027 {
2028 JobsEngine engine;
2029
2030 std::string s;
2031
2032 for (size_t i = 0; i < 20; i++)
2033 engine.GetRegistry().Submit(s, new DummyJob(), rand() % 10);
2034
2035 engine.SetWorkersCount(3);
2036 engine.Start();
2037
2038 boost::this_thread::sleep(boost::posix_time::milliseconds(200));
2039
2040 engine.Stop();
2041
2042 typedef std::map<std::string, JobInfo> Jobs;
2043
2044 Jobs jobs;
2045 engine.GetRegistry().GetJobsInfo(jobs);
2046
2047 Json::Value v;
2048 for (Jobs::const_iterator it = jobs.begin(); it != jobs.end(); ++it)
2049 {
2050 Json::Value vv;
2051 it->second.Format(vv);
2052 v[it->first] = vv;
2053 }
2054
2055 std::cout << v << std::endl;
2056 }