WorkQueueThread: Add ability to gather items while running. Add Empty and Size functions.

This commit is contained in:
Jordan Woyak 2025-04-30 23:12:31 -05:00
parent 776086fa1c
commit 0920e2f805
2 changed files with 42 additions and 2 deletions

View File

@ -98,6 +98,27 @@ public:
m_items.WaitForEmpty();
}
[[nodiscard]] bool Empty() const { return m_items.Empty(); }
[[nodiscard]] auto Size() const { return m_items.Size(); }
// Takes unprocessed items in a blocking manner.
void GatherItems(std::invocable<T&&> auto&& gather_func)
{
auto lg = GetLockGuard();
// Fast path avoids round trip thread communication and saves ~20us.
if (m_items.Empty())
return;
RunCommand([&] {
while (!m_items.Empty())
{
gather_func(std::move(m_items.Front()));
m_items.Pop();
}
});
}
private:
using CommandFunction = std::function<void()>;

View File

@ -9,10 +9,13 @@ TEST(WorkQueueThread, Simple)
{
Common::WorkQueueThreadSP<int> worker;
constexpr int BIG_VAL = 1000;
constexpr int BIG_VAL = 100;
int x = 0;
const auto func = [&](int value) { x = value; };
const auto func = [&](int value) {
x = value;
std::this_thread::yield();
};
worker.Push(1);
worker.WaitForCompletion();
@ -26,12 +29,14 @@ TEST(WorkQueueThread, Simple)
worker.WaitForCompletion();
// Items pushed before Reset are processed.
EXPECT_EQ(x, 1);
EXPECT_EQ(worker.Empty(), true);
worker.Shutdown();
worker.Push(0);
worker.WaitForCompletion();
// Still 1 because it's no longer running.
EXPECT_EQ(x, 1);
EXPECT_EQ(worker.Empty(), false);
worker.Cancel();
worker.Reset("test worker", func);
@ -39,6 +44,7 @@ TEST(WorkQueueThread, Simple)
// Still 1 because the work was canceled.
EXPECT_EQ(x, 1);
x = 0;
for (int i = 0; i != BIG_VAL; ++i)
worker.Push(i);
worker.Cancel();
@ -50,4 +56,17 @@ TEST(WorkQueueThread, Simple)
worker.WaitForCompletion();
// Still running after cancelation.
EXPECT_EQ(x, 2);
int processed_count = 0;
worker.Reset("test worker", [&](auto) { ++processed_count; });
for (int i = 0; i != BIG_VAL; ++i)
worker.Push(i);
int gather_count = 0;
worker.GatherItems([&](auto) { ++gather_count; });
EXPECT_EQ(worker.Empty(), true);
// Gathered all the items that weren't processed.
EXPECT_EQ(processed_count + gather_count, BIG_VAL);
GTEST_LOG_(INFO) << "Gathered items " << gather_count;
}