Dispatching & Queue Rewrite II:

- Rewrote wait code. It is now cleaner, more optimized and faster. All waiting
      functions are now clearly differentiated instead of sharing code. These functions
      are called up to a dozen times a second, so having dedicated code for each of
      them is a real boost in speed.
    - Fixed several queue issues, made a dedicated queue wait/wake function (you are not
      supposed to use KeWaitFor on a queue, and this is also a speed boost), and make it
      compatible with new wait code.
    - Optimized Work Queue code to be much smaller and better organized, by using an 
      array instead of hard-coded multiple static variables. Also, add support for the
      real NT structures and implementation, paving the road for Dynamic Work Items, which
      also have timeouts, and deadlock dection + debug info.
    - Simplified PsBlockThread and made it compatible with wait code.
    - Added support for priority boosting when unwaiting a thread; will use later, as well
      as put proper boosting for dispatch objects.
    - Inlined all dispatcher lock functions and header initialization for speed.
    - Moved executive wait code into ob.

svn path=/trunk/; revision=14047
This commit is contained in:
Alex Ionescu 2005-03-14 05:54:32 +00:00
parent e33a78f93b
commit 6a7ba78c91
19 changed files with 1436 additions and 1375 deletions

View file

@ -5,7 +5,6 @@
# Possible values in the future: alpha,i386,m68k,mips,powerpc
ARCH := i386
#
# Which cpu should reactos optimize for
# example : i486, i586, pentium, pentium2, pentium3, pentium4
@ -32,11 +31,6 @@ DBG := 0
#
CONFIG_SMP := 0
#
# whether to use a 3GB User, 1GB Kernel memory map
#
3GB := 0
#
# Which version of NDIS do we support up to?
#

View file

@ -30,6 +30,22 @@ typedef enum _WORK_QUEUE_TYPE {
MaximumWorkQueue
} WORK_QUEUE_TYPE;
typedef struct _EX_QUEUE_WORKER_INFO {
UCHAR QueueDisabled:1;
UCHAR MakeThreadsAsNecessary:1;
UCHAR WaitMode:1;
ULONG WorkerCount:29;
} EX_QUEUE_WORKER_INFO, *PEX_QUEUE_WORKER_INFO;
typedef struct _EX_WORK_QUEUE {
KQUEUE WorkerQueue;
ULONG DynamicThreadCount;
ULONG WorkItemsProcessed;
ULONG WorkItemsProcessedLastPass;
ULONG QueueDepthLastPass;
EX_QUEUE_WORKER_INFO Info;
} EX_WORK_QUEUE, *PEX_WORK_QUEUE;
typedef ULONG_PTR ERESOURCE_THREAD, *PERESOURCE_THREAD;
typedef struct _OWNER_ENTRY

View file

@ -212,7 +212,8 @@ OBJECTS_OB = \
ob/object.o \
ob/sdcache.o \
ob/security.o \
ob/symlink.o
ob/symlink.o \
ob/wait.o
# Process Manager (Ps)
OBJECTS_PS = \

View file

@ -27,11 +27,9 @@ extern ULONG_PTR LastKrnlPhysAddr;
extern ULONG_PTR LastKernelAddress;
extern LOADER_MODULE KeLoaderModules[64];
extern PRTL_MESSAGE_RESOURCE_DATA KiBugCodeMessages;
#if 0
extern LIST_ENTRY KiProfileListHead;
extern LIST_ENTRY KiProfileSourceListHead;
extern KSPIN_LOCK KiProfileLock;
#endif
VOID PspPostInitSystemProcess(VOID);
@ -427,12 +425,10 @@ ExpInitializeExecutive(VOID)
/* Bring back the IRQL to Passive */
KeLowerIrql(PASSIVE_LEVEL);
#if 0
/* Initialize Profiling */
InitializeListHead(&KiProfileListHead);
InitializeListHead(&KiProfileSourceListHead);
KeInitializeSpinLock(&KiProfileLock);
#endif
/* Load basic Security for other Managers */
if (!SeInit1()) KEBUGCHECK(SECURITY_INITIALIZATION_FAILED);
@ -488,7 +484,7 @@ ExpInitializeExecutive(VOID)
PspPostInitSystemProcess();
/* initialize the worker threads */
ExInitializeWorkerThreads();
ExpInitializeWorkerThreads();
/* initialize callbacks */
ExpInitializeCallbacks();

View file

@ -1,11 +1,11 @@
/* $Id$
*
/*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
* FILE: ntoskrnl/ex/work.c
* PURPOSE: Manage system work queues
*
* PROGRAMMERS: David Welch (welch@mcmail.com)
* PROGRAMMERS: Alex Ionescu - Used correct work queue array and added some fixes and checks.
* Gunnar Dalsnes - Implemented
*/
/* INCLUDES ******************************************************************/
@ -25,17 +25,10 @@
/*
* PURPOSE: Queue of items waiting to be processed at normal priority
*/
KQUEUE EiNormalWorkQueue;
KQUEUE EiCriticalWorkQueue;
KQUEUE EiHyperCriticalWorkQueue;
EX_WORK_QUEUE ExWorkerQueue[MaximumWorkQueue];
/* FUNCTIONS ****************************************************************/
//static NTSTATUS STDCALL
static VOID STDCALL
ExWorkerThreadEntryPoint(IN PVOID context)
/*
* FUNCTION: Entry point for a worker thread
* ARGUMENTS:
@ -44,161 +37,138 @@ ExWorkerThreadEntryPoint(IN PVOID context)
* NOTE: To kill a worker thread you must queue an item whose callback
* calls PsTerminateSystemThread
*/
static
VOID
STDCALL
ExpWorkerThreadEntryPoint(IN PVOID Context)
{
PWORK_QUEUE_ITEM item;
PLIST_ENTRY current;
PWORK_QUEUE_ITEM WorkItem;
PLIST_ENTRY QueueEntry;
WORK_QUEUE_TYPE WorkQueueType;
PEX_WORK_QUEUE WorkQueue;
while (TRUE)
{
current = KeRemoveQueue( (PKQUEUE)context, KernelMode, NULL );
/* Get Queue Type and Worker Queue */
WorkQueueType = (WORK_QUEUE_TYPE)Context;
WorkQueue = &ExWorkerQueue[WorkQueueType];
/* Loop forever */
while (TRUE) {
/* Wait for Something to Happen on the Queue */
QueueEntry = KeRemoveQueue(&WorkQueue->WorkerQueue, KernelMode, NULL);
/* can't happend since we do a KernelMode wait (and we're a system thread) */
ASSERT((NTSTATUS)current != STATUS_USER_APC);
/* Can't happen since we do a KernelMode wait (and we're a system thread) */
ASSERT((NTSTATUS)QueueEntry != STATUS_USER_APC);
/* this should never happend either, since we wait with NULL timeout,
* but there's a slight possibility that STATUS_TIMEOUT is returned
* at queue rundown in NT (unlikely) -Gunnar
*/
ASSERT((NTSTATUS)current != STATUS_TIMEOUT);
/* based on INVALID_WORK_QUEUE_ITEM bugcheck desc. */
if (current->Flink == NULL || current->Blink == NULL)
{
KeBugCheck(INVALID_WORK_QUEUE_ITEM);
}
/* "reinitialize" item (same as done in ExInitializeWorkItem) */
current->Flink = NULL;
item = CONTAINING_RECORD( current, WORK_QUEUE_ITEM, List);
item->WorkerRoutine(item->Parameter);
if (KeGetCurrentIrql() != PASSIVE_LEVEL)
{
KeBugCheck(IRQL_NOT_LESS_OR_EQUAL);
}
}
}
/* this should never happen either, since we wait with NULL timeout,
* but there's a slight possibility that STATUS_TIMEOUT is returned
* at queue rundown in NT (unlikely) -Gunnar
*/
ASSERT((NTSTATUS)QueueEntry != STATUS_TIMEOUT);
/* Increment Processed Work Items */
InterlockedIncrement(&WorkQueue->WorkItemsProcessed);
static VOID ExInitializeWorkQueue(PKQUEUE WorkQueue,
KPRIORITY Priority)
{
ULONG i;
PETHREAD Thread;
HANDLE hThread;
NTSTATUS Status;
PAGED_CODE();
/* Loop through how many threads we need to create */
for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) {
/* Create the System Thread */
Status = PsCreateSystemThread(&hThread,
THREAD_ALL_ACCESS,
NULL,
NULL,
NULL,
ExWorkerThreadEntryPoint,
(PVOID)WorkQueue);
if(NT_SUCCESS(Status))
{
/* Get the Thread */
Status = ObReferenceObjectByHandle(hThread,
THREAD_SET_INFORMATION,
PsThreadType,
KernelMode,
(PVOID*)&Thread,
NULL);
if(NT_SUCCESS(Status))
{
/* Set the Priority */
KeSetPriorityThread(&Thread->Tcb, Priority);
/* Dereference and close handle */
ObDereferenceObject(Thread);
ZwClose(hThread);
}
else
{
DPRINT1("Unable to reference worker thread handle 0x%x, Status: 0x%x!\n", hThread, Status);
KEBUGCHECK(0);
}
/* Get the Work Item */
WorkItem = CONTAINING_RECORD(QueueEntry, WORK_QUEUE_ITEM, List);
/* Call the Worker Routine */
WorkItem->WorkerRoutine(WorkItem->Parameter);
/* Make sure it returned at right IRQL */
if (KeGetCurrentIrql() != PASSIVE_LEVEL) {
/* FIXME: Make this an Ex */
KEBUGCHECK(WORKER_THREAD_RETURNED_AT_BAD_IRQL);
}
else
{
DPRINT1("Unable to create worker thread, Status: 0x%x!\n", Status);
KEBUGCHECK(0);
/* Make sure it returned with Impersionation Disabled */
if (PsGetCurrentThread()->ActiveImpersonationInfo) {
/* FIXME: Make this an Ex */
KEBUGCHECK(IMPERSONATING_WORKER_THREAD);
}
}
}
VOID INIT_FUNCTION
ExInitializeWorkerThreads(VOID)
static
VOID
STDCALL
ExpInitializeWorkQueue(WORK_QUEUE_TYPE WorkQueueType,
KPRIORITY Priority)
{
KeInitializeQueue( &EiNormalWorkQueue, NUMBER_OF_WORKER_THREADS );
KeInitializeQueue( &EiCriticalWorkQueue , NUMBER_OF_WORKER_THREADS );
KeInitializeQueue( &EiHyperCriticalWorkQueue , NUMBER_OF_WORKER_THREADS );
ULONG i;
PETHREAD Thread;
HANDLE hThread;
/* Loop through how many threads we need to create */
for (i = 0; i < NUMBER_OF_WORKER_THREADS; i++) {
/* Create the System Thread */
PsCreateSystemThread(&hThread,
THREAD_ALL_ACCESS,
NULL,
NULL,
NULL,
ExpWorkerThreadEntryPoint,
(PVOID)WorkQueueType);
/* Get the Thread */
ObReferenceObjectByHandle(hThread,
THREAD_SET_INFORMATION,
PsThreadType,
KernelMode,
(PVOID*)&Thread,
NULL);
/* Set the Priority */
KeSetPriorityThread(&Thread->Tcb, Priority);
/* Dereference and close handle */
ObDereferenceObject(Thread);
ZwClose(hThread);
}
}
ExInitializeWorkQueue(&EiNormalWorkQueue,
LOW_PRIORITY);
ExInitializeWorkQueue(&EiCriticalWorkQueue,
LOW_REALTIME_PRIORITY);
ExInitializeWorkQueue(&EiHyperCriticalWorkQueue,
HIGH_PRIORITY);
VOID
INIT_FUNCTION
ExpInitializeWorkerThreads(VOID)
{
ULONG WorkQueueType;
/* Initialize the Array */
for (WorkQueueType = 0; WorkQueueType < MaximumWorkQueue; WorkQueueType++) {
RtlZeroMemory(&ExWorkerQueue[WorkQueueType], sizeof(EX_WORK_QUEUE));
KeInitializeQueue(&ExWorkerQueue[WorkQueueType].WorkerQueue, 0);
}
/* Create the built-in worker threads for each work queue */
ExpInitializeWorkQueue(CriticalWorkQueue, LOW_REALTIME_PRIORITY);
ExpInitializeWorkQueue(DelayedWorkQueue, LOW_PRIORITY);
ExpInitializeWorkQueue(HyperCriticalWorkQueue, HIGH_PRIORITY);
}
/*
* @implemented
*/
VOID STDCALL
ExQueueWorkItem (PWORK_QUEUE_ITEM WorkItem,
WORK_QUEUE_TYPE QueueType)
/*
*
* FUNCTION: Inserts a work item in a queue for one of the system worker
* threads to process
* ARGUMENTS:
* WorkItem = Item to insert
* QueueType = Queue to insert it in
*/
VOID
STDCALL
ExQueueWorkItem(PWORK_QUEUE_ITEM WorkItem,
WORK_QUEUE_TYPE QueueType)
{
ASSERT(WorkItem!=NULL);
ASSERT_IRQL(DISPATCH_LEVEL);
ASSERT(WorkItem->List.Flink == NULL);
/*
* Insert the item in the appropiate queue and wake up any thread
* waiting for something to do
*/
switch(QueueType)
{
case DelayedWorkQueue:
KeInsertQueue (
&EiNormalWorkQueue,
&WorkItem->List
);
break;
case CriticalWorkQueue:
KeInsertQueue (
&EiCriticalWorkQueue,
&WorkItem->List
);
break;
case HyperCriticalWorkQueue:
KeInsertQueue (
&EiHyperCriticalWorkQueue,
&WorkItem->List
);
break;
default:
break;
}
/* Insert the Queue */
KeInsertQueue(&ExWorkerQueue[QueueType].WorkerQueue, &WorkItem->List);
}
/* EOF */

View file

@ -86,7 +86,9 @@ extern LARGE_INTEGER ExpTimeZoneBias;
extern ULONG ExpTimeZoneId;
extern POBJECT_TYPE ExEventPairObjectType;
extern POBJECT_TYPE EXPORTED ExMutantObjectType;
extern POBJECT_TYPE EXPORTED ExSemaphoreObjectType;
extern POBJECT_TYPE EXPORTED ExTimerType;
/* INITIALIZATION FUNCTIONS *************************************************/
@ -100,7 +102,7 @@ ExInit3(VOID);
VOID
ExpInitTimeZoneInfo(VOID);
VOID
ExInitializeWorkerThreads(VOID);
ExpInitializeWorkerThreads(VOID);
VOID
ExpInitLookasideLists(VOID);
VOID

View file

@ -139,33 +139,38 @@ KeProfileInterruptWithSource(
IN KPROFILE_SOURCE Source
);
VOID KiInsertProfileIntoProcess(PLIST_ENTRY ListHead, PKPROFILE Profile);
VOID KiInsertProfile(PKPROFILE Profile);
VOID KiRemoveProfile(PKPROFILE Profile);
VOID STDCALL KiDeleteProfile(PVOID ObjectBody);
VOID STDCALL KeUpdateSystemTime(PKTRAP_FRAME TrapFrame, KIRQL Irql);
VOID STDCALL KeUpdateRunTime(PKTRAP_FRAME TrapFrame, KIRQL Irql);
VOID STDCALL KiExpireTimers(PKDPC Dpc, PVOID DeferredContext, PVOID SystemArgument1, PVOID SystemArgument2);
KIRQL KeAcquireDispatcherDatabaseLock(VOID);
VOID KeAcquireDispatcherDatabaseLockAtDpcLevel(VOID);
VOID KeReleaseDispatcherDatabaseLock(KIRQL Irql);
VOID KeReleaseDispatcherDatabaseLockFromDpcLevel(VOID);
KIRQL inline FASTCALL KeAcquireDispatcherDatabaseLock(VOID);
VOID inline FASTCALL KeAcquireDispatcherDatabaseLockAtDpcLevel(VOID);
VOID inline FASTCALL KeReleaseDispatcherDatabaseLock(KIRQL Irql);
VOID inline FASTCALL KeReleaseDispatcherDatabaseLockFromDpcLevel(VOID);
BOOLEAN KiDispatcherObjectWake(DISPATCHER_HEADER* hdr, KPRIORITY increment);
VOID STDCALL KeExpireTimers(PKDPC Apc,
PVOID Arg1,
PVOID Arg2,
PVOID Arg3);
VOID KeInitializeDispatcherHeader(DISPATCHER_HEADER* Header, ULONG Type,
ULONG Size, ULONG SignalState);
VOID inline FASTCALL KeInitializeDispatcherHeader(DISPATCHER_HEADER* Header, ULONG Type,
ULONG Size, ULONG SignalState);
VOID KeDumpStackFrames(PULONG Frame);
BOOLEAN KiTestAlert(VOID);
BOOLEAN KiAbortWaitThread(struct _KTHREAD* Thread, NTSTATUS WaitStatus);
VOID FASTCALL KiAbortWaitThread(struct _KTHREAD* Thread, NTSTATUS WaitStatus);
BOOLEAN STDCALL KiInsertTimer(PKTIMER Timer, LARGE_INTEGER DueTime);
VOID inline FASTCALL KiSatisfyObjectWait(PDISPATCHER_HEADER Object, PKTHREAD Thread);
BOOLEAN inline FASTCALL KiIsObjectSignaled(PDISPATCHER_HEADER Object, PKTHREAD Thread);
VOID inline FASTCALL KiSatisifyMultipleObjectWaits(PKWAIT_BLOCK WaitBlock);
VOID FASTCALL KiWaitTest(PDISPATCHER_HEADER Object, KPRIORITY Increment);
PULONG KeGetStackTopThread(struct _ETHREAD* Thread);
VOID KeContextToTrapFrame(PCONTEXT Context, PKTRAP_FRAME TrapFrame);
@ -191,6 +196,7 @@ STDCALL
KeTestAlertThread(IN KPROCESSOR_MODE AlertMode);
BOOLEAN STDCALL KeRemoveQueueApc (PKAPC Apc);
VOID FASTCALL KiWakeQueue(IN PKQUEUE Queue);
PLIST_ENTRY STDCALL KeRundownQueue(IN PKQUEUE Queue);
extern LARGE_INTEGER SystemBootTime;
@ -202,7 +208,7 @@ VOID KeInitInterrupts(VOID);
VOID KeInitTimer(VOID);
VOID KeInitDpc(struct _KPRCB* Prcb);
VOID KeInitDispatcher(VOID);
VOID KeInitializeDispatcher(VOID);
VOID inline FASTCALL KeInitializeDispatcher(VOID);
VOID KiInitializeSystemClock(VOID);
VOID KiInitializeBugCheck(VOID);
VOID Phase1Initialization(PVOID Context);

View file

@ -525,8 +525,11 @@ VOID PsUnfreezeProcessThreads(PEPROCESS Process);
ULONG PsEnumThreadsByProcess(PEPROCESS Process);
PEPROCESS PsGetNextProcess(PEPROCESS OldProcess);
VOID
PsBlockThread(PNTSTATUS Status, UCHAR Alertable, ULONG WaitMode,
BOOLEAN DispatcherLock, KIRQL WaitIrql, UCHAR WaitReason);
STDCALL
PsBlockThread(PNTSTATUS Status,
UCHAR Alertable,
ULONG WaitMode,
UCHAR WaitReason);
VOID
PsUnblockThread(PETHREAD Thread, PNTSTATUS WaitStatus, KPRIORITY Increment);
VOID

View file

@ -91,7 +91,7 @@ KePulseEvent(IN PKEVENT Event,
Event->Header.SignalState = 1;
/* Wake the Event */
KiDispatcherObjectWake(&Event->Header, Increment);
KiWaitTest(&Event->Header, Increment);
}
/* Unsignal it */
@ -196,7 +196,7 @@ KeSetEvent(PKEVENT Event,
/* We must do a full wait satisfaction */
DPRINT("Notification Event or WaitAll, Wait on the Event and Signal\n");
Event->Header.SignalState = 1;
KiDispatcherObjectWake(&Event->Header, Increment);
KiWaitTest(&Event->Header, Increment);
}
} else {

View file

@ -204,7 +204,7 @@ SaveTrapFrameForKDB_Return:
*/
sti
call _KeReleaseDispatcherDatabaseLockFromDpcLevel
call @KeReleaseDispatcherDatabaseLockFromDpcLevel@0
cmpl $0, _PiNrThreadsAwaitingReaping
je 5f

View file

@ -56,7 +56,7 @@ KeAlertResumeThread(IN PKTHREAD Thread)
/* Signal and satisfy */
Thread->SuspendSemaphore.Header.SignalState++;
KiDispatcherObjectWake(&Thread->SuspendSemaphore.Header, IO_NO_INCREMENT);
KiWaitTest(&Thread->SuspendSemaphore.Header, IO_NO_INCREMENT);
}
}

View file

@ -177,7 +177,7 @@ KeReleaseMutant(IN PKMUTANT Mutant,
/* Wake the Mutant */
DPRINT("Waking the Mutant\n");
KiDispatcherObjectWake(&Mutant->Header, Increment);
KiWaitTest(&Mutant->Header, Increment);
}
}

View file

@ -16,245 +16,483 @@
/* FUNCTIONS *****************************************************************/
LONG STDCALL KiInsertQueue(IN PKQUEUE Queue, IN PLIST_ENTRY Entry, BOOLEAN Head);
/*
* @implemented
*/
VOID STDCALL
VOID
STDCALL
KeInitializeQueue(IN PKQUEUE Queue,
IN ULONG Count OPTIONAL)
IN ULONG Count OPTIONAL)
{
KeInitializeDispatcherHeader(&Queue->Header,
QueueObject,
sizeof(KQUEUE)/sizeof(ULONG),
0);
InitializeListHead(&Queue->EntryListHead);
InitializeListHead(&Queue->ThreadListHead);
Queue->CurrentCount = 0;
Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
DPRINT("KeInitializeQueue %x\n", Queue);
/* Initialize the Header */
KeInitializeDispatcherHeader(&Queue->Header,
QueueObject,
sizeof(KQUEUE)/sizeof(ULONG),
0);
/* Initialize the Lists */
InitializeListHead(&Queue->EntryListHead);
InitializeListHead(&Queue->ThreadListHead);
/* Set the Current and Maximum Count */
Queue->CurrentCount = 0;
Queue->MaximumCount = (Count == 0) ? (ULONG) KeNumberProcessors : Count;
}
/*
* @implemented
*
* Returns number of entries in the queue
*/
LONG STDCALL
KeReadStateQueue(IN PKQUEUE Queue)
{
return(Queue->Header.SignalState);
}
/*
* Returns the previous number of entries in the queue
*/
LONG STDCALL
KiInsertQueue(
IN PKQUEUE Queue,
IN PLIST_ENTRY Entry,
BOOLEAN Head
)
{
ULONG InitialState;
DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
InitialState = Queue->Header.SignalState;
if (Head)
{
InsertHeadList(&Queue->EntryListHead, Entry);
}
else
{
InsertTailList(&Queue->EntryListHead, Entry);
}
//inc. num entries in queue
Queue->Header.SignalState++;
/* Why the KeGetCurrentThread()->Queue != Queue?
* KiInsertQueue might be called from an APC for the current thread.
* -Gunnar
*/
if (Queue->CurrentCount < Queue->MaximumCount &&
!IsListEmpty(&Queue->Header.WaitListHead) &&
KeGetCurrentThread()->Queue != Queue)
{
KiDispatcherObjectWake(&Queue->Header, IO_NO_INCREMENT);
}
return InitialState;
}
/*
* @implemented
*/
LONG STDCALL
LONG
STDCALL
KeInsertHeadQueue(IN PKQUEUE Queue,
IN PLIST_ENTRY Entry)
IN PLIST_ENTRY Entry)
{
LONG Result;
KIRQL OldIrql;
LONG PreviousState;
KIRQL OldIrql;
DPRINT("KeInsertHeadQueue %x\n", Queue);
/* Lock the Dispatcher Database */
OldIrql = KeAcquireDispatcherDatabaseLock();
/* Insert the Queue */
PreviousState = KiInsertQueue(Queue, Entry, TRUE);
/* Release the Dispatcher Lock */
KeReleaseDispatcherDatabaseLock(OldIrql);
OldIrql = KeAcquireDispatcherDatabaseLock();
Result = KiInsertQueue(Queue,Entry,TRUE);
KeReleaseDispatcherDatabaseLock(OldIrql);
return Result;
/* Return previous State */
return PreviousState;
}
/*
* @implemented
*/
LONG STDCALL
KeInsertQueue(IN PKQUEUE Queue,
IN PLIST_ENTRY Entry)
IN PLIST_ENTRY Entry)
{
LONG Result;
KIRQL OldIrql;
LONG PreviousState;
KIRQL OldIrql;
DPRINT("KeInsertQueue %x\n", Queue);
/* Lock the Dispatcher Database */
OldIrql = KeAcquireDispatcherDatabaseLock();
/* Insert the Queue */
PreviousState = KiInsertQueue(Queue, Entry, FALSE);
/* Release the Dispatcher Lock */
KeReleaseDispatcherDatabaseLock(OldIrql);
OldIrql = KeAcquireDispatcherDatabaseLock();
Result = KiInsertQueue(Queue,Entry,FALSE);
KeReleaseDispatcherDatabaseLock(OldIrql);
return Result;
/* Return previous State */
return PreviousState;
}
/*
* @implemented
*
* Returns number of entries in the queue
*/
LONG
STDCALL
KeReadStateQueue(IN PKQUEUE Queue)
{
/* Returns the Signal State */
return(Queue->Header.SignalState);
}
/*
* @implemented
*/
PLIST_ENTRY STDCALL
PLIST_ENTRY
STDCALL
KeRemoveQueue(IN PKQUEUE Queue,
IN KPROCESSOR_MODE WaitMode,
IN PLARGE_INTEGER Timeout OPTIONAL)
IN KPROCESSOR_MODE WaitMode,
IN PLARGE_INTEGER Timeout OPTIONAL)
{
PLIST_ENTRY ListEntry;
NTSTATUS Status;
PKTHREAD Thread = KeGetCurrentThread();
KIRQL OldIrql;
PLIST_ENTRY ListEntry;
NTSTATUS Status;
PKTHREAD Thread = KeGetCurrentThread();
KIRQL OldIrql;
PKQUEUE PreviousQueue;
PKWAIT_BLOCK WaitBlock;
PKWAIT_BLOCK TimerWaitBlock;
PKTIMER Timer;
OldIrql = KeAcquireDispatcherDatabaseLock ();
DPRINT("KeRemoveQueue %x\n", Queue);
/* Check if the Lock is already held */
if (Thread->WaitNext) {
DPRINT("Lock is already held\n");
} else {
/* Lock the Dispatcher Database */
DPRINT("Lock not held, acquiring\n");
OldIrql = KeAcquireDispatcherDatabaseLock();
Thread->WaitIrql = OldIrql;
}
if (Thread->Queue != Queue)
{
/*
* INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
* Queue->ThreadListHead when the thread registers with the queue and unlinked when
* the thread registers with a new queue. The Thread->Queue already tells us what
* queue the thread is registered with.
* -Gunnar
*/
/* This is needed so that we can set the new queue right here, before additional processing */
PreviousQueue = Thread->Queue;
Thread->Queue = Queue;
//unregister thread from previous queue (if any)
if (Thread->Queue)
{
RemoveEntryList(&Thread->QueueListEntry);
Thread->Queue->CurrentCount--;
if (Thread->Queue->CurrentCount < Thread->Queue->MaximumCount &&
!IsListEmpty(&Thread->Queue->EntryListHead))
{
KiDispatcherObjectWake(&Thread->Queue->Header, 0);
}
/* Check if this is a different queue */
if (Queue != PreviousQueue) {
/*
* INVESTIGATE: What is the Thread->QueueListEntry used for? It's linked it into the
* Queue->ThreadListHead when the thread registers with the queue and unlinked when
* the thread registers with a new queue. The Thread->Queue already tells us what
* queue the thread is registered with.
* -Gunnar
*/
DPRINT("Different Queue\n");
if (PreviousQueue) {
/* Remove from this list */
DPRINT("Removing Old Queue\n");
RemoveEntryList(&Thread->QueueListEntry);
/* Wake the queue */
DPRINT("Activating new thread\n");
KiWakeQueue(PreviousQueue);
}
// register thread with this queue
InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry);
Thread->Queue = Queue;
}
else /* if (Thread->Queue == Queue) */
{
//dec. num running threads
Queue->CurrentCount--;
}
/* Insert in this new Queue */
DPRINT("Inserting new Queue!\n");
InsertTailList(&Queue->ThreadListHead, &Thread->QueueListEntry);
while (TRUE)
{
if (Queue->CurrentCount < Queue->MaximumCount && !IsListEmpty(&Queue->EntryListHead))
{
ListEntry = RemoveHeadList(&Queue->EntryListHead);
//dec. num entries in queue
Queue->Header.SignalState--;
//inc. num running threads
Queue->CurrentCount++;
KeReleaseDispatcherDatabaseLock(OldIrql);
return ListEntry;
}
else
{
//inform KeWaitXxx that we are holding disp. lock
Thread->WaitNext = TRUE;
Thread->WaitIrql = OldIrql;
} else {
/* Same queue, decrement waiting threads */
DPRINT("Same Queue!\n");
Queue->CurrentCount--;
}
/* Loop until the queue is processed */
while (TRUE) {
/* Get the Entry */
ListEntry = Queue->EntryListHead.Flink;
/* Check if the counts are valid and if there is still a queued entry */
if ((Queue->CurrentCount < Queue->MaximumCount) &&
(ListEntry != &Queue->EntryListHead)) {
/* Remove the Entry and Save it */
DPRINT("Removing Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
Queue->CurrentCount, Queue->MaximumCount);
ListEntry = RemoveHeadList(&Queue->EntryListHead);
/* Decrease the number of entries */
Queue->Header.SignalState--;
/* Increase numbef of running threads */
Queue->CurrentCount++;
/* Check if the entry is valid. If not, bugcheck */
if (!ListEntry->Flink || !ListEntry->Blink) {
KEBUGCHECK(INVALID_WORK_QUEUE_ITEM);
}
/* Remove the Entry */
RemoveEntryList(ListEntry);
ListEntry->Flink = NULL;
/* Nothing to wait on */
break;
} else {
/* Do the wait */
DPRINT("Waiting on Queue Entry. CurrentCount: %d, Maximum Count: %d\n",
Queue->CurrentCount, Queue->MaximumCount);
/* Use the Thread's Wait Block, it's big enough */
Thread->WaitBlockList = &Thread->WaitBlock[0];
/* Fail if there's an APC Pending */
if (WaitMode == UserMode && Thread->ApcState.UserApcPending) {
/* Return the status and increase the pending threads */
ListEntry = (PLIST_ENTRY)STATUS_USER_APC;
Queue->CurrentCount++;
/* Nothing to wait on */
break;
}
/* Build the Wait Block */
WaitBlock = &Thread->WaitBlock[0];
WaitBlock->Object = (PVOID)Queue;
WaitBlock->WaitKey = STATUS_SUCCESS;
WaitBlock->WaitType = WaitAny;
WaitBlock->Thread = Thread;
WaitBlock->NextWaitBlock = NULL;
Thread->WaitStatus = STATUS_SUCCESS;
/* We need to wait for the object... check if we have a timeout */
if (Timeout) {
/* If it's zero, then don't do any waiting */
if (!Timeout->QuadPart) {
/* Instant Timeout, return the status and increase the pending threads */
DPRINT("Queue Wait has timed out\n");
ListEntry = (PLIST_ENTRY)STATUS_TIMEOUT;
Queue->CurrentCount++;
/* Nothing to wait on */
break;
}
/*
* Set up the Timer. We'll use the internal function so that we can
* hold on to the dispatcher lock.
*/
Timer = &Thread->Timer;
TimerWaitBlock = &Thread->WaitBlock[1];
Status = KeWaitForSingleObject(Queue,
WrQueue,
WaitMode,
TRUE, //bAlertable
Timeout);
if (Status == STATUS_TIMEOUT || Status == STATUS_USER_APC)
{
return (PVOID)Status;
}
OldIrql = KeAcquireDispatcherDatabaseLock ();
}
}
/* Set up the Timer Wait Block */
TimerWaitBlock->Object = (PVOID)Timer;
TimerWaitBlock->Thread = Thread;
TimerWaitBlock->WaitKey = STATUS_TIMEOUT;
TimerWaitBlock->WaitType = WaitAny;
TimerWaitBlock->NextWaitBlock = NULL;
/* Link the timer to this Wait Block */
InitializeListHead(&Timer->Header.WaitListHead);
InsertTailList(&Timer->Header.WaitListHead, &TimerWaitBlock->WaitListEntry);
/* Create Timer */
DPRINT("Creating Timer with timeout %I64d\n", *Timeout);
KiInsertTimer(Timer, *Timeout);
}
/* Insert the wait block into the Queues's wait list */
WaitBlock = Thread->WaitBlockList;
InsertTailList(&Queue->Header.WaitListHead, &WaitBlock->WaitListEntry);
/* Block the Thread */
DPRINT("Blocking the Thread: %x %x!\n", KeGetCurrentThread(), Thread);
PsBlockThread(&Status,
FALSE,
WaitMode,
WrQueue);
/* Reset the wait reason */
Thread->WaitReason = 0;
/* Check if we were executing an APC */
if (Status != STATUS_KERNEL_APC) {
/* Done Waiting */
DPRINT("Done waking queue. Thread: %x %x!\n", KeGetCurrentThread(), Thread);
return (PLIST_ENTRY)Status;
}
/* Acquire again the lock */
DPRINT("Looping again\n");
OldIrql = KeAcquireDispatcherDatabaseLock();
/* Save the new IRQL and decrease number of waiting threads */
Thread->WaitIrql = OldIrql;
Queue->CurrentCount--;
}
}
/* Unlock Database and return */
KeReleaseDispatcherDatabaseLock(Thread->WaitIrql);
DPRINT("Returning. CurrentCount: %d, Maximum Count: %d\n",
Queue->CurrentCount, Queue->MaximumCount);
return ListEntry;
}
/*
* @implemented
*/
PLIST_ENTRY STDCALL
PLIST_ENTRY
STDCALL
KeRundownQueue(IN PKQUEUE Queue)
{
PLIST_ENTRY EnumEntry;
PKTHREAD Thread;
KIRQL OldIrql;
PLIST_ENTRY EnumEntry;
PLIST_ENTRY FirstEntry;
PKTHREAD Thread;
KIRQL OldIrql;
DPRINT("KeRundownQueue(Queue %x)\n", Queue);
DPRINT("KeRundownQueue(Queue %x)\n", Queue);
/* I'm just guessing how this should work:-/
* -Gunnar
*/
OldIrql = KeAcquireDispatcherDatabaseLock ();
/* Get the Dispatcher Lock */
OldIrql = KeAcquireDispatcherDatabaseLock();
//no thread must wait on queue at rundown
ASSERT(IsListEmpty(&Queue->Header.WaitListHead));
/* Get the First Empty Entry */
FirstEntry = Queue->EntryListHead.Flink;
/* Make sure the list is not empty */
if (FirstEntry == &Queue->EntryListHead) {
/* It is, so don't return anything */
EnumEntry = NULL;
// unlink threads and clear their Thread->Queue
while (!IsListEmpty(&Queue->ThreadListHead))
{
EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
Thread->Queue = NULL;
}
} else {
/* Remove it */
RemoveEntryList(&Queue->EntryListHead);
}
/* Unlink threads and clear their Thread->Queue */
while (!IsListEmpty(&Queue->ThreadListHead)) {
/* Get the Entry and Remove it */
EnumEntry = RemoveHeadList(&Queue->ThreadListHead);
/* Get the Entry's Thread */
Thread = CONTAINING_RECORD(EnumEntry, KTHREAD, QueueListEntry);
/* Kill its Queue */
Thread->Queue = NULL;
}
if (IsListEmpty(&Queue->EntryListHead))
{
EnumEntry = NULL;
}
else
{
EnumEntry = Queue->EntryListHead.Flink;
}
/* Release the lock and return */
KeReleaseDispatcherDatabaseLock(OldIrql);
return FirstEntry;
}
KeReleaseDispatcherDatabaseLock (OldIrql);
/*
* Called when a thread which has a queue entry is entering a wait state
*/
VOID
FASTCALL
KiWakeQueue(IN PKQUEUE Queue)
{
PLIST_ENTRY QueueEntry;
PLIST_ENTRY WaitEntry;
PKWAIT_BLOCK WaitBlock;
/* Decrement the number of active threads */
DPRINT("KiWakeQueue: %x. Thread: %x\n", Queue, KeGetCurrentThread());
Queue->CurrentCount--;
/* Make sure the counts are OK */
if (Queue->CurrentCount < Queue->MaximumCount) {
/* Get the Queue Entry */
QueueEntry = Queue->EntryListHead.Flink;
/* Get the Wait Entry */
WaitEntry = Queue->Header.WaitListHead.Blink;
DPRINT("Queue Count is ok, Queue entries: %x, %x\n", QueueEntry, WaitEntry);
/* Make sure that the Queue List isn't empty and that this entry is valid */
if (!IsListEmpty(&Queue->Header.WaitListHead) &&
(QueueEntry != &Queue->EntryListHead)) {
/* Remove this entry */
DPRINT("Queue in List, removing it\n");
RemoveEntryList(QueueEntry);
QueueEntry->Flink = NULL;
/* Decrease the Signal State */
Queue->Header.SignalState--;
/* Unwait the Thread */
DPRINT("Unwaiting Thread\n");
WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
KiAbortWaitThread(WaitBlock->Thread, (NTSTATUS)QueueEntry);
}
}
}
return EnumEntry;
/*
* Returns the previous number of entries in the queue
*/
LONG
STDCALL
KiInsertQueue(IN PKQUEUE Queue,
IN PLIST_ENTRY Entry,
BOOLEAN Head)
{
ULONG InitialState;
PKTHREAD Thread = KeGetCurrentThread();
PKWAIT_BLOCK WaitBlock;
PLIST_ENTRY WaitEntry;
DPRINT("KiInsertQueue(Queue %x, Entry %x)\n", Queue, Entry);
/* Save the old state */
InitialState = Queue->Header.SignalState;
/* Get the Entry */
WaitEntry = Queue->Header.WaitListHead.Blink;
DPRINT("Initial State, WaitEntry: %d, %x\n", InitialState, WaitEntry);
/*
* Why the KeGetCurrentThread()->Queue != Queue?
* KiInsertQueue might be called from an APC for the current thread.
* -Gunnar
*/
if ((Queue->CurrentCount < Queue->MaximumCount) &&
(WaitEntry != &Queue->Header.WaitListHead) &&
((Thread->Queue != Queue) || (Thread->WaitReason != WrQueue))) {
/* Remove the wait entry */
DPRINT("Removing Entry\n");
RemoveEntryList(WaitEntry);
/* Get the Wait Block and Thread */
WaitBlock = CONTAINING_RECORD(WaitEntry, KWAIT_BLOCK, WaitListEntry);
DPRINT("Got wait block: %x\n", WaitBlock);
Thread = WaitBlock->Thread;
/* Reset the wait reason */
Thread->WaitReason = 0;
/* Increase the waiting threads */
Queue->CurrentCount++;
/* Check if there's a Thread Timer */
if (Thread->Timer.Header.Inserted) {
/* Cancel the Thread Timer with the no-lock fastpath */
DPRINT("Removing the Thread's Timer\n");
Thread->Timer.Header.Inserted = FALSE;
RemoveEntryList(&Thread->Timer.TimerListEntry);
}
/* Reschedule the Thread */
DPRINT("Unblocking the Thread\n");
PsUnblockThread((PETHREAD)Thread, (PNTSTATUS)&Entry, 0);
} else {
/* Increase the Entries */
DPRINT("Adding new Queue Entry: %d %d\n", Head, Queue->Header.SignalState);
Queue->Header.SignalState++;
if (Head) {
InsertHeadList(&Queue->EntryListHead, Entry);
} else {
InsertTailList(&Queue->EntryListHead, Entry);
}
}
/* Return the previous state */
DPRINT("Returning\n");
return InitialState;
}
/* EOF */

View file

@ -111,7 +111,7 @@ KeReleaseSemaphore(PKSEMAPHORE Semaphore,
if (InitialState == 0 && !IsListEmpty(&Semaphore->Header.WaitListHead)) {
/* Wake the Semaphore */
KiDispatcherObjectWake(&Semaphore->Header, SEMAPHORE_INCREMENT);
KiWaitTest(&Semaphore->Header, SEMAPHORE_INCREMENT);
}
/* If the Wait is true, then return with a Wait and don't unlock the Dispatcher Database */

View file

@ -291,7 +291,7 @@ KiHandleExpiredTimer(PKTIMER Timer)
/* Set it as Signaled */
DPRINT("Setting Timer as Signaled\n");
Timer->Header.SignalState = TRUE;
KiDispatcherObjectWake(&Timer->Header, 0);
KiWaitTest(&Timer->Header, 0);
/* If the Timer is periodic, reinsert the timer with the new due time */
if (Timer->Period) {

File diff suppressed because it is too large Load diff

View file

@ -511,7 +511,7 @@ Ke386IoSetAccessProcess@8
Ke386QueryIoAccessMap@8
Ke386SetIoAccessMap@8
KeAcquireSpinLockAtDpcLevel@4
KeAcquireDispatcherDatabaseLockAtDpcLevel
@KeAcquireDispatcherDatabaseLockAtDpcLevel@0
@KeAcquireInStackQueuedSpinLockAtDpcLevel@8
KeAcquireInterruptSpinLock@4
KeAddSystemServiceTable@20
@ -592,7 +592,7 @@ KeReadStateSemaphore@4
KeReadStateTimer@4
KeRegisterBugCheckCallback@20
KeRegisterBugCheckReasonCallback@16
KeReleaseDispatcherDatabaseLockFromDpcLevel
@KeReleaseDispatcherDatabaseLockFromDpcLevel@0
@KeReleaseInStackQueuedSpinLockFromDpcLevel@4
KeReleaseInterruptSpinLock@8
KeReleaseMutant@16

View file

@ -207,7 +207,7 @@ PsTerminateCurrentThread(NTSTATUS ExitStatus)
oldIrql = KeAcquireDispatcherDatabaseLock();
CurrentThread->Tcb.DispatcherHeader.SignalState = TRUE;
KiDispatcherObjectWake(&CurrentThread->Tcb.DispatcherHeader, IO_NO_INCREMENT);
KiWaitTest(&CurrentThread->Tcb.DispatcherHeader, IO_NO_INCREMENT);
KeReleaseDispatcherDatabaseLock (oldIrql);
/* The last thread shall close the door on exit */
@ -227,9 +227,7 @@ PsTerminateCurrentThread(NTSTATUS ExitStatus)
#ifdef _ENABLE_THRDEVTPAIR
ExpSwapThreadEventPair(CurrentThread, NULL); /* Release the associated eventpair object, if there was one */
#endif /* _ENABLE_THRDEVTPAIR */
ASSERT(CurrentThread->Tcb.WaitBlockList == NULL);
PsDispatchThreadNoLock(THREAD_STATE_TERMINATED_1);
DPRINT1("Unexpected return, CurrentThread %x PsGetCurrentThread() %x\n", CurrentThread, PsGetCurrentThread());
KEBUGCHECK(0);
@ -342,7 +340,7 @@ PiTerminateProcess(PEPROCESS Process,
}
OldIrql = KeAcquireDispatcherDatabaseLock ();
Process->Pcb.DispatcherHeader.SignalState = TRUE;
KiDispatcherObjectWake(&Process->Pcb.DispatcherHeader, IO_NO_INCREMENT);
KiWaitTest(&Process->Pcb.DispatcherHeader, IO_NO_INCREMENT);
KeReleaseDispatcherDatabaseLock (OldIrql);
ObDereferenceObject(Process);
return(STATUS_SUCCESS);

View file

@ -545,49 +545,46 @@ PsUnblockThread(PETHREAD Thread, PNTSTATUS WaitStatus, KPRIORITY Increment)
}
VOID
PsBlockThread(PNTSTATUS Status, UCHAR Alertable, ULONG WaitMode,
BOOLEAN DispatcherLock, KIRQL WaitIrql, UCHAR WaitReason)
STDCALL
PsBlockThread(PNTSTATUS Status,
UCHAR Alertable,
ULONG WaitMode,
UCHAR WaitReason)
{
KIRQL oldIrql;
PKTHREAD KThread;
PETHREAD Thread;
PKWAIT_BLOCK WaitBlock;
PKTHREAD Thread = KeGetCurrentThread();
PKWAIT_BLOCK WaitBlock;
if (!DispatcherLock)
{
oldIrql = KeAcquireDispatcherDatabaseLock();
if (Thread->ApcState.KernelApcPending) {
DPRINT("Dispatching Thread as ready (APC!)\n");
/* Remove Waits */
WaitBlock = Thread->WaitBlockList;
while (WaitBlock) {
RemoveEntryList (&WaitBlock->WaitListEntry);
WaitBlock = WaitBlock->NextWaitBlock;
}
Thread->WaitBlockList = NULL;
/* Dispatch it and return status */
PsDispatchThreadNoLock (THREAD_STATE_READY);
if (Status != NULL) *Status = STATUS_KERNEL_APC;
} else {
/* Set the Thread Data as Requested */
DPRINT("Dispatching Thread as blocked\n");
Thread->Alertable = Alertable;
Thread->WaitMode = (UCHAR)WaitMode;
Thread->WaitReason = WaitReason;
/* Dispatch it and return status */
PsDispatchThreadNoLock(THREAD_STATE_BLOCKED);
if (Status != NULL) *Status = Thread->WaitStatus;
}
KThread = KeGetCurrentThread();
Thread = CONTAINING_RECORD (KThread, ETHREAD, Tcb);
if (KThread->ApcState.KernelApcPending)
{
WaitBlock = (PKWAIT_BLOCK)Thread->Tcb.WaitBlockList;
while (WaitBlock)
{
RemoveEntryList (&WaitBlock->WaitListEntry);
WaitBlock = WaitBlock->NextWaitBlock;
}
Thread->Tcb.WaitBlockList = NULL;
PsDispatchThreadNoLock (THREAD_STATE_READY);
if (Status != NULL)
{
*Status = STATUS_KERNEL_APC;
}
}
else
{
Thread->Tcb.Alertable = Alertable;
Thread->Tcb.WaitMode = (UCHAR)WaitMode;
Thread->Tcb.WaitReason = WaitReason;
PsDispatchThreadNoLock(THREAD_STATE_BLOCKED);
if (Status != NULL)
{
*Status = Thread->Tcb.WaitStatus;
}
}
KeLowerIrql(WaitIrql);
DPRINT("Releasing Dispatcher Lock\n");
KfLowerIrql(Thread->WaitIrql);
}
VOID