Skip to content

Commit

Permalink
Transfer ThreadPool local queue to high-pri queue on Task blocking (#…
Browse files Browse the repository at this point in the history
…109989)

Added for now under the same config flag as is being used for other work prioritization experimentation.

Co-authored-by: Stephen Toub <[email protected]>
  • Loading branch information
github-actions[bot] and stephentoub authored Nov 26, 2024
1 parent 2c860a7 commit d955059
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3056,6 +3056,27 @@ private bool SpinThenBlockingWait(int millisecondsTimeout, CancellationToken can
bool returnValue = SpinWait(millisecondsTimeout);
if (!returnValue)
{
#if CORECLR
if (ThreadPoolWorkQueue.s_prioritizationExperiment)
{
// We're about to block waiting for the task to complete, which is expensive, and if
// the task being waited on depends on some other work to run, this thread could end up
// waiting for some other thread to do work. If the two threads are part of the same scheduler,
// such as the thread pool, that could lead to a (temporary) deadlock. This is made worse by
// it also leading to a possible priority inversion on previously queued work. Each thread in
// the thread pool has a local queue. A key motivator for this local queue is it allows this
// thread to create work items that it will then prioritize above all other work in the
// pool. However, while this thread makes its own local queue the top priority, that queue is
// every other thread's lowest priority. If this thread blocks, all of its created work that's
// supposed to be high priority becomes low priority, and work that's typically part of a
// currently in-flight operation gets deprioritized relative to new requests coming into the
// pool, which can lead to the whole system slowing down or even deadlocking. To address that,
// just before we block, we move all local work into a global queue, so that it's at least
// prioritized by other threads more fairly with respect to other work.
ThreadPoolWorkQueue.TransferAllLocalWorkItemsToHighPriorityGlobalQueue();
}
#endif

var mres = new SetOnInvokeMres();
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,27 @@ public void EnqueueAtHighPriority(object workItem)
EnsureThreadRequested();
}

internal static void TransferAllLocalWorkItemsToHighPriorityGlobalQueue()
{
// If there's no local queue, there's nothing to transfer.
if (ThreadPoolWorkQueueThreadLocals.threadLocals is not ThreadPoolWorkQueueThreadLocals tl)
{
return;
}

// Pop each work item off the local queue and push it onto the global. This is a
// bounded loop as no other thread is allowed to push into this thread's queue.
ThreadPoolWorkQueue queue = ThreadPool.s_workQueue;
while (tl.workStealingQueue.LocalPop() is object workItem)
{
queue.highPriorityWorkItems.Enqueue(workItem);
}

Volatile.Write(ref queue._mayHaveHighPriorityWorkItems, true);

queue.EnsureThreadRequested();
}

internal static bool LocalFindAndPop(object callback)
{
ThreadPoolWorkQueueThreadLocals? tl = ThreadPoolWorkQueueThreadLocals.threadLocals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1263,12 +1263,13 @@ public static void PrioritizationExperimentConfigVarTest()
RemoteExecutor.Invoke(() =>
{
const int WorkItemCountPerKind = 100;
const int Kinds = 3;

int completedWorkItemCount = 0;
var allWorkItemsCompleted = new AutoResetEvent(false);
Action<int> workItem = _ =>
{
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * 3)
if (Interlocked.Increment(ref completedWorkItemCount) == WorkItemCountPerKind * Kinds)
{
allWorkItemsCompleted.Set();
}
Expand Down Expand Up @@ -1305,6 +1306,27 @@ public static void PrioritizationExperimentConfigVarTest()
0,
preferLocal: false);

ThreadPool.UnsafeQueueUserWorkItem(
_ =>
{
// Enqueue tasks from a thread pool thread into the local queue,
// then block this thread until a queued task completes.

startTest.CheckedWait();

Task queued = null;
for (int i = 0; i < WorkItemCountPerKind; i++)
{
queued = Task.Run(() => workItem(0));
}

queued
.ContinueWith(_ => { }) // prevent wait inlining
.Wait();
},
0,
preferLocal: false);

t = new Thread(() =>
{
// Enqueue local work from thread pool worker threads
Expand Down

0 comments on commit d955059

Please sign in to comment.