Merge pull request #13608 from jordan-woyak/async-work-thread

Common: Add AsyncWorkThread.
This commit is contained in:
Admiral H. Curtiss 2025-05-04 18:45:14 +02:00 committed by GitHub
commit d2db9d9590
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 63 additions and 31 deletions

View File

@ -39,10 +39,10 @@ void CubebStream::StateCallback(cubeb_stream* stream, void* user_data, cubeb_sta
CubebStream::CubebStream()
#ifdef _WIN32
: m_work_queue("Cubeb Worker", [](const std::function<void()>& func) { func(); })
: m_work_queue("Cubeb Worker")
{
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
auto result = ::CoInitializeEx(nullptr, COINIT_MULTITHREADED | COINIT_DISABLE_OLE1DDE);
m_coinit_success = result == S_OK;
@ -62,7 +62,7 @@ bool CubebStream::Init()
if (!m_coinit_success)
return false;
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &return_value, &sync_event] {
m_work_queue.Push([this, &return_value, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
@ -113,7 +113,7 @@ bool CubebStream::SetRunning(bool running)
if (!m_coinit_success)
return false;
Common::Event sync_event;
m_work_queue.EmplaceItem([this, running, &return_value, &sync_event] {
m_work_queue.Push([this, running, &return_value, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
if (running)
@ -132,7 +132,7 @@ CubebStream::~CubebStream()
{
#ifdef _WIN32
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
cubeb_stream_stop(m_stream);
@ -156,7 +156,7 @@ void CubebStream::SetVolume(int volume)
if (!m_coinit_success)
return;
Common::Event sync_event;
m_work_queue.EmplaceItem([this, volume, &sync_event] {
m_work_queue.Push([this, volume, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
cubeb_stream_set_volume(m_stream, volume / 100.0f);

View File

@ -39,7 +39,7 @@ private:
std::vector<float> m_floatstereo_buffer;
#ifdef _WIN32
Common::WorkQueueThread<std::function<void()>> m_work_queue;
Common::AsyncWorkThread m_work_queue;
bool m_coinit_success = false;
bool m_should_couninit = false;
#endif

View File

@ -5,6 +5,7 @@
#include <atomic>
#include <functional>
#include <future>
#include <mutex>
#include <string>
#include <thread>
@ -13,8 +14,6 @@
#include "Common/SPSCQueue.h"
#include "Common/Thread.h"
// A thread that executes the given function for every item placed into its queue.
namespace Common
{
namespace detail
@ -158,6 +157,38 @@ private:
using ProducerMutex = std::conditional_t<IsSingleProducer, DummyMutex, std::recursive_mutex>;
ProducerMutex m_mutex;
};
// A WorkQueueThread-like class that takes functions to invoke.
template <template <typename> typename WorkThread>
class AsyncWorkThreadBase
{
public:
using FuncType = std::function<void()>;
AsyncWorkThreadBase() = default;
explicit AsyncWorkThreadBase(std::string thread_name) { Reset(std::move(thread_name)); }
void Reset(std::string thread_name)
{
m_worker.Reset(std::move(thread_name), std::invoke<FuncType>);
}
void Push(FuncType func) { m_worker.Push(std::move(func)); }
auto PushBlocking(FuncType func)
{
std::packaged_task task{std::move(func)};
m_worker.EmplaceItem([&] { task(); });
return task.get_future().get();
}
void Cancel() { m_worker.Cancel(); }
void Shutdown() { m_worker.Shutdown(); }
void WaitForCompletion() { m_worker.WaitForCompletion(); }
private:
WorkThread<FuncType> m_worker;
};
} // namespace detail
// Multiple threads may use the public interface.
@ -169,4 +200,7 @@ using WorkQueueThread = detail::WorkQueueThreadBase<T, false>;
template <typename T>
using WorkQueueThreadSP = detail::WorkQueueThreadBase<T, true>;
using AsyncWorkThread = detail::AsyncWorkThreadBase<WorkQueueThread>;
using AsyncWorkThreadSP = detail::AsyncWorkThreadBase<WorkQueueThreadSP>;
} // namespace Common

View File

@ -77,9 +77,8 @@ void AchievementManager::Init(void* hwnd)
});
m_config_changed_callback_id = Config::AddConfigChangedCallback([this] { SetHardcoreMode(); });
SetHardcoreMode();
m_queue.Reset("AchievementManagerQueue", [](const std::function<void()>& func) { func(); });
m_image_queue.Reset("AchievementManagerImageQueue",
[](const std::function<void()>& func) { func(); });
m_queue.Reset("AchievementManagerQueue");
m_image_queue.Reset("AchievementManagerImageQueue");
#ifdef RC_CLIENT_SUPPORTS_RAINTEGRATION
// Attempt to load the integration DLL from the directory containing the main client executable.
@ -1268,7 +1267,7 @@ void AchievementManager::Request(const rc_api_request_t* request,
{
std::string url = request->url;
std::string post_data = request->post_data;
AchievementManager::GetInstance().m_queue.EmplaceItem(
AchievementManager::GetInstance().m_queue.Push(
[url = std::move(url), post_data = std::move(post_data), callback = std::move(callback),
callback_data = std::move(callback_data)] {
Common::HttpRequest http_request;
@ -1364,7 +1363,7 @@ u32 AchievementManager::MemoryPeeker(u32 address, u8* buffer, u32 num_bytes, rc_
void AchievementManager::FetchBadge(AchievementManager::Badge* badge, u32 badge_type,
const AchievementManager::BadgeNameFunction function,
const UpdatedItems callback_data)
UpdatedItems callback_data)
{
if (!m_client || !HasAPIToken())
{
@ -1374,8 +1373,8 @@ void AchievementManager::FetchBadge(AchievementManager::Badge* badge, u32 badge_
return;
}
m_image_queue.EmplaceItem([this, badge, badge_type, function = std::move(function),
callback_data = std::move(callback_data)] {
m_image_queue.Push([this, badge, badge_type, function = std::move(function),
callback_data = std::move(callback_data)] {
Common::ScopeGuard on_end_scope([&]() {
if (m_display_welcome_message && badge_type == RC_IMAGE_TYPE_GAME)
DisplayWelcomeMessage();

View File

@ -301,8 +301,8 @@ private:
std::string m_title_estimate;
#endif // RC_CLIENT_SUPPORTS_RAINTEGRATION
Common::WorkQueueThread<std::function<void()>> m_queue;
Common::WorkQueueThread<std::function<void()>> m_image_queue;
Common::AsyncWorkThread m_queue;
Common::AsyncWorkThread m_image_queue;
mutable std::recursive_mutex m_lock;
std::recursive_mutex m_filereader_lock;
}; // class AchievementManager

View File

@ -37,7 +37,7 @@ void CEXIMic::StreamInit()
if (!m_coinit_success)
return;
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
m_cubeb_ctx = CubebUtils::GetContext();
@ -57,7 +57,7 @@ void CEXIMic::StreamTerminate()
if (!m_coinit_success)
return;
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
m_cubeb_ctx.reset();
@ -105,7 +105,7 @@ void CEXIMic::StreamStart()
if (!m_coinit_success)
return;
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
// Open stream with current parameters
@ -152,7 +152,7 @@ void CEXIMic::StreamStop()
{
#ifdef _WIN32
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
#endif
if (cubeb_stream_stop(m_cubeb_stream) != CUBEB_OK)
@ -200,7 +200,7 @@ CEXIMic::CEXIMic(Core::System& system, int index)
: IEXIDevice(system), slot(index)
#ifdef _WIN32
,
m_work_queue("Mic Worker", [](const std::function<void()>& func) { func(); })
m_work_queue("Mic Worker")
#endif
{
m_position = 0;
@ -218,7 +218,7 @@ CEXIMic::CEXIMic(Core::System& system, int index)
#ifdef _WIN32
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
auto result = ::CoInitializeEx(nullptr, COINIT_MULTITHREADED | COINIT_DISABLE_OLE1DDE);
m_coinit_success = result == S_OK;
@ -238,7 +238,7 @@ CEXIMic::~CEXIMic()
if (m_should_couninit)
{
Common::Event sync_event;
m_work_queue.EmplaceItem([this, &sync_event] {
m_work_queue.Push([this, &sync_event] {
Common::ScopeGuard sync_event_guard([&sync_event] { sync_event.Set(); });
m_should_couninit = false;
CoUninitialize();

View File

@ -102,7 +102,7 @@ private:
int samples_avail;
#ifdef _WIN32
Common::WorkQueueThread<std::function<void()>> m_work_queue;
Common::AsyncWorkThread m_work_queue;
bool m_coinit_success = false;
bool m_should_couninit = false;
#endif

View File

@ -169,8 +169,7 @@ NetKDRequestDevice::NetKDRequestDevice(EmulationKernel& ios, const std::string&
});
m_handle_mail = !ios.GetIOSC().IsUsingDefaultId() && !m_send_list.IsDisabled();
m_scheduler_work_queue.Reset("WiiConnect24 Scheduler Worker",
[](std::function<void()> task) { task(); });
m_scheduler_work_queue.Reset("WiiConnect24 Scheduler Worker");
m_scheduler_timer_thread = std::thread([this] { SchedulerTimer(); });
}
@ -218,7 +217,7 @@ void NetKDRequestDevice::SchedulerTimer()
std::lock_guard lg(m_scheduler_lock);
if (m_mail_span <= mail_time_state && m_handle_mail)
{
m_scheduler_work_queue.EmplaceItem([this] { SchedulerWorker(SchedulerEvent::Mail); });
m_scheduler_work_queue.Push([this] { SchedulerWorker(SchedulerEvent::Mail); });
INFO_LOG_FMT(IOS_WC24, "NET_KD_REQ: Dispatching Mail Task from Scheduler");
mail_time_state = 0;
}
@ -226,7 +225,7 @@ void NetKDRequestDevice::SchedulerTimer()
if (m_download_span <= download_time_state && !m_dl_list.IsDisabled())
{
INFO_LOG_FMT(IOS_WC24, "NET_KD_REQ: Dispatching Download Task from Scheduler");
m_scheduler_work_queue.EmplaceItem([this] { SchedulerWorker(SchedulerEvent::Download); });
m_scheduler_work_queue.Push([this] { SchedulerWorker(SchedulerEvent::Download); });
download_time_state = 0;
}
}

View File

@ -111,7 +111,7 @@ private:
NWC24::Mail::WC24SendList m_send_list;
NWC24::Mail::WC24FriendList m_friend_list;
Common::WorkQueueThreadSP<AsyncTask> m_work_queue;
Common::WorkQueueThreadSP<std::function<void()>> m_scheduler_work_queue;
Common::AsyncWorkThreadSP m_scheduler_work_queue;
std::mutex m_async_reply_lock;
std::mutex m_scheduler_buffer_lock;
std::queue<AsyncReply> m_async_replies;