- Fix totally broken KSWORKER implementation (it may have worked, but it didnt do what it should have)

- Re-Implement KsQueueWorkItem, KsIncrementCountedWorker, KsDecrementCountedWorker, KsRegisterCountedWorker, KsUnregisterWorker, KsRegisterWorker

svn path=/trunk/; revision=42147
This commit is contained in:
Johannes Anderwald 2009-07-22 21:27:44 +00:00
parent f035e43e1e
commit 721b165dbb

View file

@ -15,14 +15,64 @@
typedef struct typedef struct
{ {
WORK_QUEUE_ITEM WorkItem;
KEVENT Event; KEVENT Event;
KSPIN_LOCK Lock; KSPIN_LOCK Lock;
WORK_QUEUE_TYPE Type; WORK_QUEUE_TYPE Type;
LONG Counter; LONG Counter;
LONG QueuedWorkItemCount;
LIST_ENTRY QueuedWorkItems;
PWORK_QUEUE_ITEM CountedWorkItem;
}KSIWORKER, *PKSIWORKER;
VOID
NTAPI
WorkItemRoutine(
IN PVOID Context)
{
PKSIWORKER KsWorker;
KIRQL OldLevel;
PWORK_QUEUE_ITEM WorkItem; PWORK_QUEUE_ITEM WorkItem;
ULONG WorkItemActive; PLIST_ENTRY Entry;
ULONG DeleteInProgress;
}KS_WORKER;
/* get ks worker implementation */
KsWorker = (PKSIWORKER)Context;
/* acquire back the lock */
KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
do
{
/* remove first entry */
Entry = RemoveHeadList(&KsWorker->QueuedWorkItems);
/* get offset to work item */
WorkItem = (PWORK_QUEUE_ITEM)CONTAINING_RECORD(Entry, WORK_QUEUE_ITEM, List);
/* release lock as the callback might call one KsWorker functions */
KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
/* now dispatch the work */
WorkItem->WorkerRoutine(WorkItem->Parameter);
/* acquire back the lock */
KeAcquireSpinLock(&KsWorker->Lock, &OldLevel);
/* decrement queued work item count */
InterlockedDecrement(&KsWorker->QueuedWorkItemCount);
}while(KsWorker->QueuedWorkItemCount);
/* release the lock */
KeReleaseSpinLock(&KsWorker->Lock, OldLevel);
/* signal completion event */
KeSetEvent(&KsWorker->Event, IO_NO_INCREMENT, FALSE);
}
/* /*
@implemented @implemented
@ -34,8 +84,8 @@ KsRegisterWorker(
IN WORK_QUEUE_TYPE WorkQueueType, IN WORK_QUEUE_TYPE WorkQueueType,
OUT PKSWORKER* Worker) OUT PKSWORKER* Worker)
{ {
KS_WORKER * KsWorker; PKSIWORKER KsWorker;
UNIMPLEMENTED;
if (WorkQueueType != CriticalWorkQueue && if (WorkQueueType != CriticalWorkQueue &&
WorkQueueType != DelayedWorkQueue && WorkQueueType != DelayedWorkQueue &&
@ -44,16 +94,22 @@ KsRegisterWorker(
return STATUS_INVALID_PARAMETER; return STATUS_INVALID_PARAMETER;
} }
KsWorker = ExAllocatePool(NonPagedPool, sizeof(KS_WORKER)); /* allocate worker context */
KsWorker = ExAllocatePool(NonPagedPool, sizeof(KSIWORKER));
if (!KsWorker) if (!KsWorker)
return STATUS_INSUFFICIENT_RESOURCES; return STATUS_INSUFFICIENT_RESOURCES;
/* initialze the work ctx */
ExInitializeWorkItem(&KsWorker->WorkItem, WorkItemRoutine, (PVOID)KsWorker);
/* setup type */
KsWorker->Type = WorkQueueType; KsWorker->Type = WorkQueueType;
/* set counter to zero */
KsWorker->Counter = 0; KsWorker->Counter = 0;
KsWorker->WorkItemActive = 0; /* Initialize work item queue */
KsWorker->WorkItem = NULL; InitializeListHead(&KsWorker->QueuedWorkItems);
KsWorker->DeleteInProgress = FALSE; /* initialize work item lock */
KeInitializeSpinLock(&KsWorker->Lock); KeInitializeSpinLock(&KsWorker->Lock);
/* initialize event */
KeInitializeEvent(&KsWorker->Event, NotificationEvent, FALSE); KeInitializeEvent(&KsWorker->Event, NotificationEvent, FALSE);
*Worker = KsWorker; *Worker = KsWorker;
@ -63,53 +119,68 @@ KsRegisterWorker(
/* /*
@implemented @implemented
*/ */
KSDDKAPI VOID NTAPI KSDDKAPI
VOID
NTAPI
KsUnregisterWorker( KsUnregisterWorker(
IN PKSWORKER Worker) IN PKSWORKER Worker)
{ {
KS_WORKER * KsWorker; PKSIWORKER KsWorker;
KIRQL OldIrql; KIRQL OldIrql;
if (!Worker) if (!Worker)
return; return;
KsWorker = (KS_WORKER *)Worker; /* get ks worker implementation */
KsWorker = (PKSIWORKER)Worker;
/* acquire spinlock */
KeAcquireSpinLock(&KsWorker->Lock, &OldIrql); KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
/* fake status running to avoid work items to be queued by the counted worker */
KsWorker->DeleteInProgress = TRUE; KsWorker->Counter = 1;
/* is there currently a work item active */
if (KsWorker->WorkItemActive) if (KsWorker->QueuedWorkItemCount)
{ {
/* release the lock */
KeReleaseSpinLock(&KsWorker->Lock, OldIrql); KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
/* wait for the worker routine to finish */
KeWaitForSingleObject(&KsWorker->Event, Executive, KernelMode, FALSE, NULL); KeWaitForSingleObject(&KsWorker->Event, Executive, KernelMode, FALSE, NULL);
} }
else else
{ {
/* no work item active, just release the lock */
KeReleaseSpinLock(&KsWorker->Lock, OldIrql); KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
} }
/* free worker context */
ExFreePool(KsWorker); FreeItem(KsWorker);
} }
/* /*
@implemented @implemented
*/ */
KSDDKAPI NTSTATUS NTAPI KSDDKAPI
NTSTATUS
NTAPI
KsRegisterCountedWorker( KsRegisterCountedWorker(
IN WORK_QUEUE_TYPE WorkQueueType, IN WORK_QUEUE_TYPE WorkQueueType,
IN PWORK_QUEUE_ITEM CountedWorkItem, IN PWORK_QUEUE_ITEM CountedWorkItem,
OUT PKSWORKER* Worker) OUT PKSWORKER* Worker)
{ {
NTSTATUS Status; NTSTATUS Status;
KS_WORKER * KsWorker; PKSIWORKER KsWorker;
/* check for counted work item parameter */
if (!CountedWorkItem)
return STATUS_INVALID_PARAMETER_2;
/* create the work ctx */
Status = KsRegisterWorker(WorkQueueType, Worker); Status = KsRegisterWorker(WorkQueueType, Worker);
/* check for success */
if (NT_SUCCESS(Status)) if (NT_SUCCESS(Status))
{ {
KsWorker = (KS_WORKER *)Worker; /* get ks worker implementation */
KsWorker->WorkItem = CountedWorkItem; KsWorker = (PKSIWORKER)Worker;
/* store counted work item */
KsWorker->CountedWorkItem = CountedWorkItem;
} }
return Status; return Status;
@ -124,21 +195,18 @@ NTAPI
KsDecrementCountedWorker( KsDecrementCountedWorker(
IN PKSWORKER Worker) IN PKSWORKER Worker)
{ {
KS_WORKER * KsWorker; PKSIWORKER KsWorker;
LONG Counter; LONG Counter;
/* did the caller pass a work ctx */
if (!Worker) if (!Worker)
return STATUS_INVALID_PARAMETER; return STATUS_INVALID_PARAMETER;
KsWorker = (KS_WORKER *)Worker; /* get ks worker implementation */
KsWorker = (PKSIWORKER)Worker;
/* decrement counter */
Counter = InterlockedDecrement(&KsWorker->Counter); Counter = InterlockedDecrement(&KsWorker->Counter);
/* return result */
if (KsWorker->DeleteInProgress)
{
/* signal that we are done */
KeSetEvent(&KsWorker->Event, 0, 0);
}
return Counter; return Counter;
} }
@ -151,19 +219,24 @@ NTAPI
KsIncrementCountedWorker( KsIncrementCountedWorker(
IN PKSWORKER Worker) IN PKSWORKER Worker)
{ {
KS_WORKER * KsWorker; PKSIWORKER KsWorker;
LONG Counter; LONG Counter;
/* did the caller pass a work ctx */
if (!Worker) if (!Worker)
return STATUS_INVALID_PARAMETER; return STATUS_INVALID_PARAMETER;
KsWorker = (KS_WORKER *)Worker; /* get ks worker implementation */
KsWorker = (PKSIWORKER)Worker;
/* increment counter */
Counter = InterlockedIncrement(&KsWorker->Counter); Counter = InterlockedIncrement(&KsWorker->Counter);
if (Counter == 1) if (Counter == 1)
{ {
KsQueueWorkItem(Worker, KsWorker->WorkItem); /* this is the first work item in list, so queue a real work item */
KsQueueWorkItem(Worker, KsWorker->CountedWorkItem);
} }
/* return current counter */
return Counter; return Counter;
} }
@ -177,25 +250,31 @@ KsQueueWorkItem(
IN PKSWORKER Worker, IN PKSWORKER Worker,
IN PWORK_QUEUE_ITEM WorkItem) IN PWORK_QUEUE_ITEM WorkItem)
{ {
KS_WORKER * KsWorker; PKSIWORKER KsWorker;
KIRQL OldIrql; KIRQL OldIrql;
NTSTATUS Status = STATUS_SUCCESS;
/* check for all parameters */
if (!Worker || !WorkItem) if (!Worker || !WorkItem)
return STATUS_INVALID_PARAMETER; return STATUS_INVALID_PARAMETER;
KsWorker = (KS_WORKER *)Worker; /* get ks worker implementation */
KsWorker = (PKSIWORKER)Worker;
/* lock the work queue */
KeAcquireSpinLock(&KsWorker->Lock, &OldIrql); KeAcquireSpinLock(&KsWorker->Lock, &OldIrql);
/* insert work item to list */
if (!KsWorker->DeleteInProgress) InsertTailList(&KsWorker->QueuedWorkItems, &WorkItem->List);
/* increment active count */
InterlockedIncrement(&KsWorker->QueuedWorkItemCount);
/* is this the first work item */
if (KsWorker->QueuedWorkItemCount == 1)
{ {
/* clear event */
KeClearEvent(&KsWorker->Event);
/* it is, queue it */
ExQueueWorkItem(WorkItem, KsWorker->Type); ExQueueWorkItem(WorkItem, KsWorker->Type);
} }
else /* release lock */
{
Status = STATUS_UNSUCCESSFUL;
}
KeReleaseSpinLock(&KsWorker->Lock, OldIrql); KeReleaseSpinLock(&KsWorker->Lock, OldIrql);
return Status;
return STATUS_SUCCESS;
} }