A SystemC productivity library for virtual platform development utilizing SCV and TLM2.0 https://www.minres.com/#opensource

thread_syncronizer.h 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. /*******************************************************************************
  2. * Copyright 2017 MINRES Technologies GmbH
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. *******************************************************************************/
  16. #ifndef _THREAD_SYNCRONIZER_H_
  17. #define _THREAD_SYNCRONIZER_H_
  #include <atomic>
  #include <chrono>
  #include <condition_variable>
  #include <functional>
  #include <future>
  #include <memory>
  #include <mutex>
  #include <queue>
  #include <stdexcept>
  #include <type_traits>
  #include <utility>
  23. namespace util {
  /**
   * Hands callables from one thread to another for execution.
   *
   * Producers call enqueue() or enqueue_and_wait(); the callable is wrapped in a
   * std::packaged_task and stored in an internal queue guarded by a mutex. The
   * thread that calls execute() or executeNext() pops one task and runs it, which
   * makes the result (or exception) available through the returned std::future.
   */
  class thread_syncronizer {
  private:
      /// pending tasks, protected by mutex_
      std::queue<std::function<void()>> tasks_;
      /// true while a thread is inside executeNext(); explicitly initialised because a
      /// default-constructed std::atomic holds an indeterminate value before C++20
      std::atomic<bool> ready{false};
      /// guards tasks_
      std::mutex mutex_;
      /// signalled by enqueue() whenever a task has been added
      std::condition_variable condition_;

  public:
      /**
       * the constructor.
       */
      thread_syncronizer() = default;
      /**
       * not copyable: the mutex and condition variable members cannot be copied
       */
      thread_syncronizer(const thread_syncronizer &) = delete;
      thread_syncronizer &operator=(const thread_syncronizer &) = delete;
      /**
       * the destructor
       *
       * Clears the ready flag and wakes every thread blocked on the condition
       * variable so that a waiting executeNext() leaves its wait loop. This does
       * not make it safe to destroy the object while another thread is still
       * inside one of its member functions.
       */
      ~thread_syncronizer() {
          ready.store(false, std::memory_order_release);
          condition_.notify_all();
      }
      /**
       * check if the synchronizer can handle functions
       *
       * @return true while a thread is inside executeNext(), i.e. a queued
       *         request will be picked up
       */
      bool is_ready() const { return ready.load(std::memory_order_acquire); }
      /**
       * enqueue a function to be executed in the other thread and wait for completion
       *
       * Blocks until some other thread runs the task via execute()/executeNext().
       * An exception thrown by the functor is rethrown here by the future.
       *
       * @param f the functor to execute
       * @param args the arguments to pass to the functor
       * @return the result of the function
       */
      template <class F, class... Args>
      typename std::result_of<F(Args...)>::type enqueue_and_wait(F &&f, Args &&... args) {
          // Forward so that enqueue() deduces the same F/Args as this function's
          // declared return type and rvalues are moved instead of copied.
          auto res = enqueue(std::forward<F>(f), std::forward<Args>(args)...);
          res.wait();
          return res.get();
      }
      /**
       * enqueue a function to be executed in the other thread
       *
       * The functor and its arguments are bound (copied or moved) into the task,
       * so they do not need to outlive this call.
       *
       * @param f the functor to execute
       * @param args the arguments to pass to the functor
       * @return the future holding the result of the execution
       */
      template <class F, class... Args>
      auto enqueue(F &&f, Args &&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
          using return_type = typename std::result_of<F(Args...)>::type;
          // std::function requires a copyable target, std::packaged_task is move-only:
          // keep the task behind a shared_ptr and queue a small copyable wrapper.
          auto task = std::make_shared<std::packaged_task<return_type()>>(
              std::bind(std::forward<F>(f), std::forward<Args>(args)...));
          std::future<return_type> res = task->get_future();
          {
              std::lock_guard<std::mutex> lock(mutex_);
              tasks_.emplace([task]() { (*task)(); });
          }
          // Notify after releasing the lock so the woken thread can take it at once.
          condition_.notify_one();
          return res;
      }
      /**
       * execute the next task in queue but do not wait for the next one
       *
       * Returns immediately if the queue is empty.
       */
      void execute() {
          std::function<void()> functor;
          {
              // The emptiness check and the pop happen under the same lock so that
              // a concurrent enqueue() or execute() cannot race with them.
              std::lock_guard<std::mutex> lock(mutex_);
              if (tasks_.empty()) return;
              functor = std::move(tasks_.front());
              tasks_.pop();
          }
          // Run the task without holding the lock so producers are not blocked.
          // Exceptions thrown by the user functor are captured by the packaged_task
          // and reach the caller through the future; anything escaping the wrapper
          // itself is suppressed.
          try {
              functor();
          } catch (...) {
          }
      }
      /**
       * execute the next task in queue or wait for the next one
       *
       * Sets the ready flag, waits until a task is available (or the ready flag is
       * cleared), runs at most one task and clears the ready flag again.
       */
      void executeNext() {
          ready.store(true, std::memory_order_release);
          {
              // The ready flag can be cleared without holding the mutex, so the wait
              // uses a short timeout and re-checks both conditions on every wake-up.
              std::unique_lock<std::mutex> lock(mutex_);
              while (tasks_.empty() && ready.load(std::memory_order_acquire)) {
                  condition_.wait_for(lock, std::chrono::milliseconds(10));
              }
          }
          execute();
          ready.store(false, std::memory_order_release);
      }
  };
  120. }
  121. #endif /* _THREAD_SYNCRONIZER_H_ */