work.c: reimpl. using kernel queues

resource.c: add asserts

svn path=/trunk/; revision=11757
This commit is contained in:
Gunnar Dalsnes 2004-11-21 18:38:51 +00:00
parent 4635919140
commit eedd0d823f
2 changed files with 70 additions and 86 deletions

View file

@ -1,4 +1,4 @@
/* $Id: resource.c,v 1.29 2004/10/22 20:18:35 ekohl Exp $ /* $Id: resource.c,v 1.30 2004/11/21 18:38:51 gdalsnes Exp $
* *
* COPYRIGHT: See COPYING in the top level directory * COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel * PROJECT: ReactOS kernel
@ -110,6 +110,17 @@ ExAcquireResourceExclusiveLite (
DPRINT("ExAcquireResourceExclusiveLite(Resource %x, Wait %d)\n", DPRINT("ExAcquireResourceExclusiveLite(Resource %x, Wait %d)\n",
Resource, Wait); Resource, Wait);
ASSERT_IRQL_LESS(DISPATCH_LEVEL);
/* undefed for now, since cdfs must be fixed first */
#if 0
/* At least regular kmode APC's must be disabled
* Note that this requirement is missing in old DDK's */
ASSERT(KeGetCurrentThread() == NULL || /* <-Early in the boot process the current thread is obseved to be NULL */
KeGetCurrentThread()->KernelApcDisable ||
KeGetCurrentIrql() == APC_LEVEL);
#endif
KeAcquireSpinLock(&Resource->SpinLock, &oldIrql); KeAcquireSpinLock(&Resource->SpinLock, &oldIrql);
/* resource already locked */ /* resource already locked */
@ -345,6 +356,18 @@ ExAcquireResourceSharedLite (
DPRINT("ExAcquireResourceSharedLite(Resource %x, Wait %d)\n", DPRINT("ExAcquireResourceSharedLite(Resource %x, Wait %d)\n",
Resource, Wait); Resource, Wait);
ASSERT_IRQL_LESS(DISPATCH_LEVEL);
/* undefed for now, since cdfs must be fixed first */
#if 0
/* At least regular kmode APC's must be disabled
* Note that this requirement is missing in old DDK's
*/
ASSERT(KeGetCurrentThread() == NULL || /* <-Early in the boot process the current thread is obseved to be NULL */
KeGetCurrentThread()->KernelApcDisable ||
KeGetCurrentIrql() == APC_LEVEL);
#endif
KeAcquireSpinLock(&Resource->SpinLock, &oldIrql); KeAcquireSpinLock(&Resource->SpinLock, &oldIrql);
/* first, resolve trivial cases */ /* first, resolve trivial cases */
@ -809,6 +832,8 @@ ExReleaseResourceForThreadLite (
DPRINT("ExReleaseResourceForThreadLite(Resource %x, ResourceThreadId %x)\n", DPRINT("ExReleaseResourceForThreadLite(Resource %x, ResourceThreadId %x)\n",
Resource, ResourceThreadId); Resource, ResourceThreadId);
ASSERT(KeGetCurrentIrql() <= DISPATCH_LEVEL);
KeAcquireSpinLock(&Resource->SpinLock, &oldIrql); KeAcquireSpinLock(&Resource->SpinLock, &oldIrql);
if (Resource->Flag & ResourceOwnedExclusive) if (Resource->Flag & ResourceOwnedExclusive)

View file

@ -1,4 +1,4 @@
/* $Id: work.c,v 1.22 2004/11/17 23:55:36 gdalsnes Exp $ /* $Id: work.c,v 1.23 2004/11/21 18:38:51 gdalsnes Exp $
* *
* COPYRIGHT: See COPYING in the top level directory * COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel * PROJECT: ReactOS kernel
@ -21,39 +21,16 @@
/* TYPES *********************************************************************/ /* TYPES *********************************************************************/
typedef struct _WORK_QUEUE
{
/*
* PURPOSE: Head of the list of waiting work items
*/
LIST_ENTRY Head;
/*
* PURPOSE: Sychronize access to the work queue
*/
KSPIN_LOCK Lock;
/*
* PURPOSE: Worker threads with nothing to do wait on this event
*/
KSEMAPHORE Sem;
/*
* PURPOSE: Thread associated with work queue
*/
HANDLE Thread[NUMBER_OF_WORKER_THREADS];
} WORK_QUEUE, *PWORK_QUEUE;
/* GLOBALS *******************************************************************/ /* GLOBALS *******************************************************************/
/* /*
* PURPOSE: Queue of items waiting to be processed at normal priority * PURPOSE: Queue of items waiting to be processed at normal priority
*/ */
WORK_QUEUE EiNormalWorkQueue; KQUEUE EiNormalWorkQueue;
WORK_QUEUE EiCriticalWorkQueue; KQUEUE EiCriticalWorkQueue;
WORK_QUEUE EiHyperCriticalWorkQueue; KQUEUE EiHyperCriticalWorkQueue;
/* FUNCTIONS ****************************************************************/ /* FUNCTIONS ****************************************************************/
@ -69,52 +46,43 @@ ExWorkerThreadEntryPoint(IN PVOID context)
* calls PsTerminateSystemThread * calls PsTerminateSystemThread
*/ */
{ {
PWORK_QUEUE queue = (PWORK_QUEUE)context;
PWORK_QUEUE_ITEM item; PWORK_QUEUE_ITEM item;
PLIST_ENTRY current; PLIST_ENTRY current;
for(;;) while (TRUE)
{
current = ExInterlockedRemoveHeadList(&queue->Head,
&queue->Lock);
if (current!=NULL)
{ {
current = KeRemoveQueue( (PKQUEUE)context, KernelMode, NULL );
item = CONTAINING_RECORD( current, WORK_QUEUE_ITEM, List); item = CONTAINING_RECORD( current, WORK_QUEUE_ITEM, List);
item->WorkerRoutine(item->Parameter); item->WorkerRoutine(item->Parameter);
}
else if (KeGetCurrentIrql() != PASSIVE_LEVEL)
{ {
KeWaitForSingleObject((PVOID)&queue->Sem, KeBugCheck(IRQL_NOT_LESS_OR_EQUAL);
Executive,
KernelMode,
FALSE,
NULL);
DPRINT("Woke from wait\n");
}
} }
} }
static VOID ExInitializeWorkQueue(PWORK_QUEUE WorkQueue, }
static VOID ExInitializeWorkQueue(PKQUEUE WorkQueue,
KPRIORITY Priority) KPRIORITY Priority)
{ {
ULONG i; ULONG i;
PETHREAD Thread; PETHREAD Thread;
HANDLE hThread;
InitializeListHead(&WorkQueue->Head);
KeInitializeSpinLock(&WorkQueue->Lock);
KeInitializeSemaphore(&WorkQueue->Sem,
0,
256);
for (i=0; i<NUMBER_OF_WORKER_THREADS; i++) for (i=0; i<NUMBER_OF_WORKER_THREADS; i++)
{ {
PsCreateSystemThread(&WorkQueue->Thread[i],
PsCreateSystemThread(&hThread,
THREAD_ALL_ACCESS, THREAD_ALL_ACCESS,
NULL, NULL,
NULL, NULL,
NULL, NULL,
ExWorkerThreadEntryPoint, ExWorkerThreadEntryPoint,
WorkQueue); WorkQueue);
ObReferenceObjectByHandle(WorkQueue->Thread[i], ObReferenceObjectByHandle(hThread,
THREAD_ALL_ACCESS, THREAD_ALL_ACCESS,
PsThreadType, PsThreadType,
KernelMode, KernelMode,
@ -123,12 +91,17 @@ static VOID ExInitializeWorkQueue(PWORK_QUEUE WorkQueue,
KeSetPriorityThread(&Thread->Tcb, KeSetPriorityThread(&Thread->Tcb,
Priority); Priority);
ObDereferenceObject(Thread); ObDereferenceObject(Thread);
ZwClose(hThread);
} }
} }
VOID INIT_FUNCTION VOID INIT_FUNCTION
ExInitializeWorkerThreads(VOID) ExInitializeWorkerThreads(VOID)
{ {
KeInitializeQueue( &EiNormalWorkQueue, NUMBER_OF_WORKER_THREADS );
KeInitializeQueue( &EiCriticalWorkQueue , NUMBER_OF_WORKER_THREADS );
KeInitializeQueue( &EiHyperCriticalWorkQueue , NUMBER_OF_WORKER_THREADS );
ExInitializeWorkQueue(&EiNormalWorkQueue, ExInitializeWorkQueue(&EiNormalWorkQueue,
LOW_PRIORITY); LOW_PRIORITY);
ExInitializeWorkQueue(&EiCriticalWorkQueue, ExInitializeWorkQueue(&EiCriticalWorkQueue,
@ -161,40 +134,26 @@ ExQueueWorkItem (PWORK_QUEUE_ITEM WorkItem,
switch(QueueType) switch(QueueType)
{ {
case DelayedWorkQueue: case DelayedWorkQueue:
ExInterlockedInsertTailList(&EiNormalWorkQueue.Head, KeInsertQueue (
&WorkItem->List, &EiNormalWorkQueue,
&EiNormalWorkQueue.Lock); &WorkItem->List
KeReleaseSemaphore(&EiNormalWorkQueue.Sem, );
IO_NO_INCREMENT,
1,
FALSE);
break; break;
case CriticalWorkQueue: case CriticalWorkQueue:
ExInterlockedInsertTailList(&EiCriticalWorkQueue.Head, KeInsertQueue (
&WorkItem->List, &EiCriticalWorkQueue,
&EiCriticalWorkQueue.Lock); &WorkItem->List
KeReleaseSemaphore(&EiCriticalWorkQueue.Sem, );
IO_NO_INCREMENT,
1,
FALSE);
break; break;
case HyperCriticalWorkQueue: case HyperCriticalWorkQueue:
ExInterlockedInsertTailList(&EiHyperCriticalWorkQueue.Head, KeInsertQueue (
&WorkItem->List, &EiHyperCriticalWorkQueue,
&EiHyperCriticalWorkQueue.Lock); &WorkItem->List
KeReleaseSemaphore(&EiHyperCriticalWorkQueue.Sem, );
IO_NO_INCREMENT,
1,
FALSE);
break; break;
#ifdef __USE_W32API
case MaximumWorkQueue:
// Unimplemented
break;
#endif
} }
} }