reactos/ntoskrnl/ex/work.c
Art Yerkes c501d8112c Create a branch for network fixes.
svn path=/branches/aicom-network-fixes/; revision=34994
2008-08-01 11:32:26 +00:00

664 lines
21 KiB
C

/*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS Kernel
* FILE: ntoskrnl/ex/work.c
* PURPOSE: Manage system work queues and worker threads
* PROGRAMMER: Alex Ionescu (alex@relsoft.net)
*/
/* INCLUDES ******************************************************************/
#include <ntoskrnl.h>
#define NDEBUG
#include <internal/debug.h>
#if defined (ALLOC_PRAGMA)
#pragma alloc_text(INIT, ExpInitializeWorkerThreads)
#endif
/* DATA **********************************************************************/
/* Number of worker threads for each Queue */
#define EX_HYPERCRITICAL_WORK_THREADS 1
#define EX_DELAYED_WORK_THREADS 3
#define EX_CRITICAL_WORK_THREADS 5
/* Magic flag for dynamic worker threads */
#define EX_DYNAMIC_WORK_THREAD 0x80000000
/* Worker thread priority increments (added to base priority) */
#define EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT 7
#define EX_CRITICAL_QUEUE_PRIORITY_INCREMENT 5
#define EX_DELAYED_QUEUE_PRIORITY_INCREMENT 4
/* The actual worker queue array */
EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
/* Accounting of the total threads and registry hacked threads */
ULONG ExpCriticalWorkerThreads;
ULONG ExpDelayedWorkerThreads;
ULONG ExpAdditionalCriticalWorkerThreads;
ULONG ExpAdditionalDelayedWorkerThreads;
/* Future support for stack swapping worker threads */
BOOLEAN ExpWorkersCanSwap;
LIST_ENTRY ExpWorkerListHead;
KMUTANT ExpWorkerSwapinMutex;
/* The worker balance set manager events */
KEVENT ExpThreadSetManagerEvent;
KEVENT ExpThreadSetManagerShutdownEvent;
/* Thread pointers for future worker thread shutdown support */
PETHREAD ExpWorkerThreadBalanceManagerPtr;
PETHREAD ExpLastWorkerThread;
/* PRIVATE FUNCTIONS *********************************************************/
/*++
* @name ExpWorkerThreadEntryPoint
*
* The ExpWorkerThreadEntryPoint routine is the entrypoint for any new
* worker thread created by teh system.
*
* @param Context
* Contains the work queue type masked with a flag specifing whether the
* thread is dynamic or not.
*
* @return None.
*
* @remarks A dynamic thread can timeout after 10 minutes of waiting on a queue
* while a static thread will never timeout.
*
* Worker threads must return at IRQL == PASSIVE_LEVEL, must not have
* active impersonation info, and must not have disabled APCs.
*
* NB: We will re-enable APCs for broken threads but all other cases
* will generate a bugcheck.
*
*--*/
VOID
NTAPI
ExpWorkerThreadEntryPoint(IN PVOID Context)
{
PWORK_QUEUE_ITEM WorkItem;
PLIST_ENTRY QueueEntry;
WORK_QUEUE_TYPE WorkQueueType;
PEX_WORK_QUEUE WorkQueue;
LARGE_INTEGER Timeout;
PLARGE_INTEGER TimeoutPointer = NULL;
PETHREAD Thread = PsGetCurrentThread();
KPROCESSOR_MODE WaitMode;
EX_QUEUE_WORKER_INFO OldValue, NewValue;
/* Check if this is a dyamic thread */
if ((ULONG_PTR)Context & EX_DYNAMIC_WORK_THREAD)
{
/* It is, which means we will eventually time out after 10 minutes */
Timeout.QuadPart = Int32x32To64(10, -10000000 * 60);
TimeoutPointer = &Timeout;
}
/* Get Queue Type and Worker Queue */
WorkQueueType = (WORK_QUEUE_TYPE)((ULONG_PTR)Context &
~EX_DYNAMIC_WORK_THREAD);
WorkQueue = &ExWorkerQueue[WorkQueueType];
/* Select the wait mode */
WaitMode = (UCHAR)WorkQueue->Info.WaitMode;
/* Nobody should have initialized this yet, do it now */
ASSERT(Thread->ExWorkerCanWaitUser == 0);
if (WaitMode == UserMode) Thread->ExWorkerCanWaitUser = TRUE;
/* If we shouldn't swap, disable that feature */
if (!ExpWorkersCanSwap) KeSetKernelStackSwapEnable(FALSE);
/* Set the worker flags */
do
{
/* Check if the queue is being disabled */
if (WorkQueue->Info.QueueDisabled)
{
/* Re-enable stack swapping and kill us */
KeSetKernelStackSwapEnable(TRUE);
PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
}
/* Increase the worker count */
OldValue = WorkQueue->Info;
NewValue = OldValue;
NewValue.WorkerCount++;
}
while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
*(PLONG)&NewValue,
*(PLONG)&OldValue) != *(PLONG)&OldValue);
/* Success, you are now officially a worker thread! */
Thread->ActiveExWorker = TRUE;
/* Loop forever */
ProcessLoop:
for (;;)
{
/* Wait for Something to Happen on the Queue */
QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue,
WaitMode,
TimeoutPointer);
/* Check if we timed out and quit this loop in that case */
if ((NTSTATUS)QueueEntry == STATUS_TIMEOUT) break;
/* Increment Processed Work Items */
InterlockedIncrement((PLONG)&WorkQueue->WorkItemsProcessed);
/* Get the Work Item */
WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
/* Make sure nobody is trying to play smart with us */
ASSERT((ULONG_PTR)WorkItem->WorkerRoutine > MmUserProbeAddress);
/* Call the Worker Routine */
WorkItem->WorkerRoutine(WorkItem->Parameter);
/* Make sure APCs are not disabled */
if (Thread->Tcb.SpecialApcDisable)
{
/* We're nice and do it behind your back */
DPRINT1("Warning: Broken Worker Thread: %p %lx %p came back "
"with APCs disabled!\n",
WorkItem->WorkerRoutine,
WorkItem->Parameter,
WorkItem);
Thread->Tcb.SpecialApcDisable = 0;
}
/* Make sure it returned at right IRQL */
if (KeGetCurrentIrql() != PASSIVE_LEVEL)
{
/* It didn't, bugcheck! */
KEBUGCHECKEX(WORKER_THREAD_RETURNED_AT_BAD_IRQL,
(ULONG_PTR)WorkItem->WorkerRoutine,
KeGetCurrentIrql(),
(ULONG_PTR)WorkItem->Parameter,
(ULONG_PTR)WorkItem);
}
/* Make sure it returned with Impersionation Disabled */
if (Thread->ActiveImpersonationInfo)
{
/* It didn't, bugcheck! */
KEBUGCHECKEX(IMPERSONATING_WORKER_THREAD,
(ULONG_PTR)WorkItem->WorkerRoutine,
(ULONG_PTR)WorkItem->Parameter,
(ULONG_PTR)WorkItem,
0);
}
}
/* This is a dynamic thread. Terminate it unless IRPs are pending */
if (!IsListEmpty(&Thread->IrpList)) goto ProcessLoop;
/* Don't terminate it if the queue is disabled either */
if (WorkQueue->Info.QueueDisabled) goto ProcessLoop;
/* Set the worker flags */
do
{
/* Decrease the worker count */
OldValue = WorkQueue->Info;
NewValue = OldValue;
NewValue.WorkerCount--;
}
while (InterlockedCompareExchange((PLONG)&WorkQueue->Info,
*(PLONG)&NewValue,
*(PLONG)&OldValue) != *(PLONG)&OldValue);
/* Decrement dynamic thread count */
InterlockedDecrement(&WorkQueue->DynamicThreadCount);
/* We're not a worker thread anymore */
Thread->ActiveExWorker = FALSE;
/* Re-enable the stack swap */
KeSetKernelStackSwapEnable(TRUE);
return;
}
/*++
* @name ExpCreateWorkerThread
*
* The ExpCreateWorkerThread routine creates a new worker thread for the
* specified queue.
**
* @param QueueType
* Type of the queue to use for this thread. Valid values are:
* - DelayedWorkQueue
* - CriticalWorkQueue
* - HyperCriticalWorkQueue
*
* @param Dynamic
* Specifies whether or not this thread is a dynamic thread.
*
* @return None.
*
* @remarks HyperCritical work threads run at priority 7; Critical work threads
* run at priority 5, and delayed work threads run at priority 4.
*
* This, worker threads cannot pre-empty a normal user-mode thread.
*
*--*/
VOID
NTAPI
ExpCreateWorkerThread(WORK_QUEUE_TYPE WorkQueueType,
IN BOOLEAN Dynamic)
{
PETHREAD Thread;
HANDLE hThread;
ULONG Context;
KPRIORITY Priority;
/* Check if this is going to be a dynamic thread */
Context = WorkQueueType;
/* Add the dynamic mask */
if (Dynamic) Context |= EX_DYNAMIC_WORK_THREAD;
/* Create the System Thread */
PsCreateSystemThread(&hThread,
THREAD_ALL_ACCESS,
NULL,
NULL,
NULL,
ExpWorkerThreadEntryPoint,
(PVOID)Context);
/* If the thread is dynamic */
if (Dynamic)
{
/* Increase the count */
InterlockedIncrement(&ExWorkerQueue[WorkQueueType].DynamicThreadCount);
}
/* Set the priority */
if (WorkQueueType == DelayedWorkQueue)
{
/* Priority == 4 */
Priority = EX_DELAYED_QUEUE_PRIORITY_INCREMENT;
}
else if (WorkQueueType == CriticalWorkQueue)
{
/* Priority == 5 */
Priority = EX_CRITICAL_QUEUE_PRIORITY_INCREMENT;
}
else
{
/* Priority == 7 */
Priority = EX_HYPERCRITICAL_QUEUE_PRIORITY_INCREMENT;
}
/* Get the Thread */
ObReferenceObjectByHandle(hThread,
THREAD_SET_INFORMATION,
PsThreadType,
KernelMode,
(PVOID*)&Thread,
NULL);
/* Set the Priority */
KeSetBasePriorityThread(&Thread->Tcb, Priority);
/* Dereference and close handle */
ObDereferenceObject(Thread);
ZwClose(hThread);
}
/*++
* @name ExpCheckDynamicThreadCount
*
* The ExpCheckDynamicThreadCount routine checks every queue and creates a
* dynamic thread if the queue seems to be deadlocked.
*
* @param None
*
* @return None.
*
* @remarks The algorithm for deciding if a new thread must be created is
* based on wether the queue has processed no new items in the last
* second, and new items are still enqueued.
*
*--*/
VOID
NTAPI
ExpDetectWorkerThreadDeadlock(VOID)
{
ULONG i;
PEX_WORK_QUEUE Queue;
/* Loop the 3 queues */
for (i = 0; i < MaximumWorkQueue; i++)
{
/* Get the queue */
Queue = &ExWorkerQueue[i];
ASSERT(Queue->DynamicThreadCount <= 16);
/* Check if stuff is on the queue that still is unprocessed */
if ((Queue->QueueDepthLastPass) &&
(Queue->WorkItemsProcessed == Queue->WorkItemsProcessedLastPass) &&
(Queue->DynamicThreadCount < 16))
{
/* Stuff is still on the queue and nobody did anything about it */
DPRINT1("EX: Work Queue Deadlock detected: %d\n", i);
ExpCreateWorkerThread(i, TRUE);
DPRINT1("Dynamic threads queued %d\n", Queue->DynamicThreadCount);
}
/* Update our data */
Queue->WorkItemsProcessedLastPass = Queue->WorkItemsProcessed;
Queue->QueueDepthLastPass = KeReadStateQueue(&Queue->WorkerQueue);
}
}
/*++
* @name ExpCheckDynamicThreadCount
*
* The ExpCheckDynamicThreadCount routine checks every queue and creates a
* dynamic thread if the queue requires one.
*
* @param None
*
* @return None.
*
* @remarks The algorithm for deciding if a new thread must be created is
* documented in the ExQueueWorkItem routine.
*
*--*/
VOID
NTAPI
ExpCheckDynamicThreadCount(VOID)
{
ULONG i;
PEX_WORK_QUEUE Queue;
/* Loop the 3 queues */
for (i = 0; i < MaximumWorkQueue; i++)
{
/* Get the queue */
Queue = &ExWorkerQueue[i];
/* Check if still need a new thread. See ExQueueWorkItem */
if ((Queue->Info.MakeThreadsAsNecessary) &&
(!IsListEmpty(&Queue->WorkerQueue.EntryListHead)) &&
(Queue->WorkerQueue.CurrentCount <
Queue->WorkerQueue.MaximumCount) &&
(Queue->DynamicThreadCount < 16))
{
/* Create a new thread */
DPRINT1("EX: Creating new dynamic thread as requested\n");
ExpCreateWorkerThread(i, TRUE);
}
}
}
/*++
* @name ExpWorkerThreadBalanceManager
*
* The ExpWorkerThreadBalanceManager routine is the entrypoint for the
* worker thread balance set manager.
*
* @param Context
* Unused.
*
* @return None.
*
* @remarks The worker thread balance set manager listens every second, but can
* also be woken up by an event when a new thread is needed, or by the
* special shutdown event. This thread runs at priority 7.
*
* This routine must run at IRQL == PASSIVE_LEVEL.
*
*--*/
VOID
NTAPI
ExpWorkerThreadBalanceManager(IN PVOID Context)
{
KTIMER Timer;
LARGE_INTEGER Timeout;
NTSTATUS Status;
PVOID WaitEvents[3];
PAGED_CODE();
UNREFERENCED_PARAMETER(Context);
/* Raise our priority above all other worker threads */
KeSetBasePriorityThread(KeGetCurrentThread(),
EX_CRITICAL_QUEUE_PRIORITY_INCREMENT + 1);
/* Setup the timer */
KeInitializeTimer(&Timer);
Timeout.QuadPart = Int32x32To64(-1, 10000000);
/* We'll wait on the periodic timer and also the emergency event */
WaitEvents[0] = &Timer;
WaitEvents[1] = &ExpThreadSetManagerEvent;
WaitEvents[2] = &ExpThreadSetManagerShutdownEvent;
/* Start wait loop */
for (;;)
{
/* Wait for the timer */
KeSetTimer(&Timer, Timeout, NULL);
Status = KeWaitForMultipleObjects(3,
WaitEvents,
WaitAny,
Executive,
KernelMode,
FALSE,
NULL,
NULL);
if (Status == 0)
{
/* Our timer expired. Check for deadlocks */
ExpDetectWorkerThreadDeadlock();
}
else if (Status == 1)
{
/* Someone notified us, verify if we should create a new thread */
ExpCheckDynamicThreadCount();
}
else if (Status == 2)
{
/* We are shutting down. Cancel the timer */
DPRINT1("System shutdown\n");
KeCancelTimer(&Timer);
/* Make sure we have a final thread */
ASSERT(ExpLastWorkerThread);
/* Wait for it */
KeWaitForSingleObject(ExpLastWorkerThread,
Executive,
KernelMode,
FALSE,
NULL);
/* Dereference it and kill us */
ObDereferenceObject(ExpLastWorkerThread);
PsTerminateSystemThread(STATUS_SYSTEM_SHUTDOWN);
}
}
}
/*++
* @name ExpInitializeWorkerThreads
*
* The ExpInitializeWorkerThreads routine initializes worker thread and
* work queue support.
*
* @param None.
*
* @return None.
*
* @remarks This routine is only called once during system initialization.
*
*--*/
VOID
INIT_FUNCTION
NTAPI
ExpInitializeWorkerThreads(VOID)
{
ULONG WorkQueueType;
ULONG CriticalThreads, DelayedThreads;
HANDLE ThreadHandle;
PETHREAD Thread;
ULONG i;
/* Setup the stack swap support */
KeInitializeMutex(&ExpWorkerSwapinMutex, FALSE);
InitializeListHead(&ExpWorkerListHead);
ExpWorkersCanSwap = TRUE;
/* Set the number of critical and delayed threads. We shouldn't hardcode */
DelayedThreads = EX_DELAYED_WORK_THREADS;
CriticalThreads = EX_CRITICAL_WORK_THREADS;
/* Protect against greedy registry modifications */
ExpAdditionalDelayedWorkerThreads =
min(ExpAdditionalDelayedWorkerThreads, 16);
ExpAdditionalCriticalWorkerThreads =
min(ExpAdditionalCriticalWorkerThreads, 16);
/* Calculate final count */
DelayedThreads += ExpAdditionalDelayedWorkerThreads;
CriticalThreads += ExpAdditionalCriticalWorkerThreads;
/* Initialize the Array */
for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++)
{
/* Clear the structure and initialize the queue */
RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
}
/* Dynamic threads are only used for the critical queue */
ExWorkerQueue[CriticalWorkQueue].Info.MakeThreadsAsNecessary = TRUE;
/* Initialize the balance set manager events */
KeInitializeEvent(&ExpThreadSetManagerEvent, SynchronizationEvent, FALSE);
KeInitializeEvent(&ExpThreadSetManagerShutdownEvent,
NotificationEvent,
FALSE);
/* Create the built-in worker threads for the critical queue */
for (i = 0; i < CriticalThreads; i++)
{
/* Create the thread */
ExpCreateWorkerThread(CriticalWorkQueue, FALSE);
ExpCriticalWorkerThreads++;
}
/* Create the built-in worker threads for the delayed queue */
for (i = 0; i < DelayedThreads; i++)
{
/* Create the thread */
ExpCreateWorkerThread(DelayedWorkQueue, FALSE);
ExpDelayedWorkerThreads++;
}
/* Create the built-in worker thread for the hypercritical queue */
ExpCreateWorkerThread(HyperCriticalWorkQueue, FALSE);
/* Create the balance set manager thread */
PsCreateSystemThread(&ThreadHandle,
THREAD_ALL_ACCESS,
NULL,
0,
NULL,
ExpWorkerThreadBalanceManager,
NULL);
/* Get a pointer to it for the shutdown process */
ObReferenceObjectByHandle(ThreadHandle,
THREAD_ALL_ACCESS,
NULL,
KernelMode,
(PVOID*)&Thread,
NULL);
ExpWorkerThreadBalanceManagerPtr = Thread;
/* Close the handle and return */
ZwClose(ThreadHandle);
}
/* PUBLIC FUNCTIONS **********************************************************/
/*++
* @name ExQueueWorkItem
* @implemented NT4
*
* The ExQueueWorkItem routine acquires rundown protection for
* the specified descriptor.
*
* @param WorkItem
* Pointer to an initialized Work Queue Item structure. This structure
* must be located in nonpaged pool memory.
*
* @param QueueType
* Type of the queue to use for this item. Can be one of the following:
* - DelayedWorkQueue
* - CriticalWorkQueue
* - HyperCriticalWorkQueue
*
* @return None.
*
* @remarks This routine is obsolete. Use IoQueueWorkItem instead.
*
* Callers of this routine must be running at IRQL <= DISPATCH_LEVEL.
*
*--*/
VOID
NTAPI
ExQueueWorkItem(IN PWORK_QUEUE_ITEM WorkItem,
IN WORK_QUEUE_TYPE QueueType)
{
PEX_WORK_QUEUE WorkQueue = &ExWorkerQueue[QueueType];
ASSERT(QueueType < MaximumWorkQueue);
ASSERT(WorkItem->List.Flink == NULL);
/* Don't try to trick us */
if ((ULONG_PTR)WorkItem->WorkerRoutine < MmUserProbeAddress)
{
/* Bugcheck the system */
KEBUGCHECKEX(WORKER_INVALID,
1,
(ULONG_PTR)WorkItem,
(ULONG_PTR)WorkItem->WorkerRoutine,
0);
}
/* Insert the Queue */
KeInsertQueue(&WorkQueue->WorkerQueue, &WorkItem->List);
ASSERT(!WorkQueue->Info.QueueDisabled);
/*
* Check if we need a new thread. Our decision is as follows:
* - This queue type must support Dynamic Threads (duh!)
* - It actually has to have unprocessed items
* - We have CPUs which could be handling another thread
* - We haven't abused our usage of dynamic threads.
*/
if ((WorkQueue->Info.MakeThreadsAsNecessary) &&
(!IsListEmpty(&WorkQueue->WorkerQueue.EntryListHead)) &&
(WorkQueue->WorkerQueue.CurrentCount <
WorkQueue->WorkerQueue.MaximumCount) &&
(WorkQueue->DynamicThreadCount < 16))
{
/* Let the balance manager know about it */
DPRINT1("Requesting a new thread. CurrentCount: %d. MaxCount: %d\n",
WorkQueue->WorkerQueue.CurrentCount,
WorkQueue->WorkerQueue.MaximumCount);
KeSetEvent(&ExpThreadSetManagerEvent, 0, FALSE);
}
}
/* EOF */