- Guarded the calls to IoSetCancelRoutine with IoAcquireCancelSpinLock/IoReleaseCancelSpinLock.

- Used a fastmutex as lock for the data queue.  
- Used paged pool for the data buffers.  
- Allowed the server to read (and to wait) on a listening pipe.  
- Implemented the non blocking read operations.

svn path=/trunk/; revision=14296
This commit is contained in:
Hartmut Birr 2005-03-23 22:11:20 +00:00
parent 7889b35088
commit 974ee62e85
5 changed files with 315 additions and 49 deletions

View file

@ -49,6 +49,7 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
{
PLIST_ENTRY CurrentEntry;
PNPFS_WAITER_ENTRY Waiter;
KIRQL oldIrql;
CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->WaiterListHead)
@ -58,11 +59,15 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
!Waiter->Irp->Cancel)
{
DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
if (IoSetCancelRoutine(Waiter->Irp, NULL) != NULL)
{
IoAcquireCancelSpinLock(&oldIrql);
if (!Waiter->Irp->Cancel)
{
IoSetCancelRoutine(Waiter->Irp, NULL);
IoReleaseCancelSpinLock(oldIrql);
return Waiter->Fcb;
}
IoReleaseCancelSpinLock(oldIrql);
}
CurrentEntry = CurrentEntry->Flink;
@ -174,7 +179,7 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
/* Initialize data list. */
if (Pipe->OutboundQuota)
{
ClientFcb->Data = ExAllocatePool(NonPagedPool, Pipe->OutboundQuota);
ClientFcb->Data = ExAllocatePool(PagedPool, Pipe->OutboundQuota);
if (ClientFcb->Data == NULL)
{
DPRINT("No memory!\n");
@ -195,7 +200,7 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
ClientFcb->ReadDataAvailable = 0;
ClientFcb->WriteQuotaAvailable = Pipe->OutboundQuota;
ClientFcb->MaxDataLength = Pipe->OutboundQuota;
KeInitializeSpinLock(&ClientFcb->DataListLock);
ExInitializeFastMutex(&ClientFcb->DataListLock);
KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE);
KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE);
@ -455,13 +460,17 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
if (Pipe->InboundQuota)
{
Fcb->Data = ExAllocatePool(NonPagedPool, Pipe->InboundQuota);
Fcb->Data = ExAllocatePool(PagedPool, Pipe->InboundQuota);
if (Fcb->Data == NULL)
{
ExFreePool(Fcb);
if (NewPipe)
{
/*
* FIXME:
* Lock the pipelist and remove the pipe from the list.
*/
RtlFreeUnicodeString(&Pipe->PipeName);
ExFreePool(Pipe);
}
@ -481,7 +490,7 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Pipe->InboundQuota;
Fcb->MaxDataLength = Pipe->InboundQuota;
KeInitializeSpinLock(&Fcb->DataListLock);
ExInitializeFastMutex(&Fcb->DataListLock);
Pipe->CurrentInstances++;

View file

@ -46,6 +46,7 @@ NpfsAddListeningServerInstance(PIRP Irp,
PNPFS_FCB Fcb)
{
PNPFS_WAITER_ENTRY Entry;
KIRQL oldIrql;
Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
if (Entry == NULL)
@ -61,13 +62,15 @@ NpfsAddListeningServerInstance(PIRP Irp,
Irp->Tail.Overlay.DriverContext[0] = Entry;
InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
IoAcquireCancelSpinLock(&oldIrql);
if (!Irp->Cancel)
{
IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
IoReleaseCancelSpinLock(oldIrql);
KeUnlockMutex(&Fcb->Pipe->FcbListLock);
return STATUS_PENDING;
}
IoReleaseCancelSpinLock(oldIrql);
RemoveEntryList(&Entry->Entry);

View file

@ -73,8 +73,10 @@ DriverEntry(PDRIVER_OBJECT DriverObject,
/* initialize the device extension */
DeviceExtension = DeviceObject->DeviceExtension;
InitializeListHead(&DeviceExtension->PipeListHead);
InitializeListHead(&DeviceExtension->ThreadListHead);
KeInitializeMutex(&DeviceExtension->PipeListLock,
0);
DeviceExtension->EmptyWaiterCount = 0;
/* set the size quotas */
DeviceExtension->MinQuota = PAGE_SIZE;

View file

@ -6,7 +6,9 @@
typedef struct _NPFS_DEVICE_EXTENSION
{
LIST_ENTRY PipeListHead;
LIST_ENTRY ThreadListHead;
KMUTEX PipeListLock;
ULONG EmptyWaiterCount;
ULONG MinQuota;
ULONG DefaultQuota;
ULONG MaxQuota;
@ -20,6 +22,7 @@ typedef struct _NPFS_PIPE
LIST_ENTRY ServerFcbListHead;
LIST_ENTRY ClientFcbListHead;
LIST_ENTRY WaiterListHead;
LIST_ENTRY EmptyBufferListHead;
ULONG PipeType;
ULONG ReadMode;
ULONG WriteMode;
@ -50,9 +53,29 @@ typedef struct _NPFS_FCB
PVOID WritePtr;
ULONG MaxDataLength;
KSPIN_LOCK DataListLock; /* Data queue lock */
FAST_MUTEX DataListLock; /* Data queue lock */
} NPFS_FCB, *PNPFS_FCB;
typedef struct _NPFS_CONTEXT
{
PDEVICE_OBJECT DeviceObject;
PIRP Irp;
PNPFS_FCB Fcb;
UCHAR MajorFunction;
BOOLEAN AllocatedFromPool;
} NPFS_CONTEXT, *PNPFS_CONTEXT;
typedef struct _NPFS_THREAD_CONTEXT
{
ULONG Count;
KEVENT Event;
PNPFS_DEVICE_EXTENSION DeviceExt;
LIST_ENTRY ListEntry;
PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS];
KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS];
PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS];
} NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
typedef struct _NPFS_WAITER_ENTRY
{
LIST_ENTRY Entry;

View file

@ -46,16 +46,197 @@ VOID HexDump(PUCHAR Buffer, ULONG Length)
}
#endif
static NTSTATUS
NpfsReadFromPipe(PNPFS_CONTEXT Context);
NTSTATUS STDCALL
NpfsRead(PDEVICE_OBJECT DeviceObject,
PIRP Irp)
static VOID STDCALL
NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
IN PIRP Irp)
{
PNPFS_CONTEXT Context;
PNPFS_DEVICE_EXTENSION DeviceExt;
DPRINT1("NpfsWaitingCancelRoutine() called\n");
IoReleaseCancelSpinLock(Irp->CancelIrql);
Context = Irp->Tail.Overlay.DriverContext[0];
DeviceExt = Context->DeviceObject->DeviceExtension;
KeLockMutex(&DeviceExt->PipeListLock);
KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE);
KeUnlockMutex(&DeviceExt->PipeListLock);
}
static VOID STDCALL
NpfsWaiterThread(PVOID Context)
{
PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
ULONG CurrentCount, Count = 0;
PNPFS_CONTEXT WaitContext = NULL;
NTSTATUS Status;
BOOLEAN Terminate = FALSE;
BOOLEAN Cancel = FALSE;
KIRQL oldIrql;
KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
while (1)
{
CurrentCount = ThreadContext->Count;
KeResetEvent(&ThreadContext->Event);
KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
if (WaitContext)
{
if (Cancel)
{
WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
WaitContext->Irp->IoStatus.Information = 0;
IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
ExFreePool(WaitContext);
}
else
{
switch (WaitContext->MajorFunction)
{
case IRP_MJ_READ:
NpfsReadFromPipe(WaitContext);
break;
default:
KEBUGCHECK(0);
}
}
}
if (Terminate)
{
break;
}
Status = KeWaitForMultipleObjects(CurrentCount,
ThreadContext->WaitObjectArray,
WaitAny,
Executive,
KernelMode,
FALSE,
NULL,
ThreadContext->WaitBlockArray);
KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
if (!NT_SUCCESS(Status))
{
KEBUGCHECK(0);
}
Count = Status - STATUS_SUCCESS;
ASSERT (Count <= CurrentCount);
if (Count > 0)
{
WaitContext = ThreadContext->WaitContextArray[Count];
ThreadContext->Count--;
ThreadContext->DeviceExt->EmptyWaiterCount++;
ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count];
IoAcquireCancelSpinLock(&oldIrql);
Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL);
IoReleaseCancelSpinLock(oldIrql);
}
else
{
/* someone has add a new wait request */
WaitContext = NULL;
}
if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
{
/* it exist an other thread with empty wait slots, we can remove our thread from the list */
RemoveEntryList(&ThreadContext->ListEntry);
ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1;
Terminate = TRUE;
}
}
KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
ExFreePool(ThreadContext);
}
static NTSTATUS
NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb)
{
PLIST_ENTRY ListEntry;
PNPFS_THREAD_CONTEXT ThreadContext;
NTSTATUS Status;
HANDLE hThread;
KIRQL oldIrql;
KeLockMutex(&DeviceExt->PipeListLock);
ListEntry = DeviceExt->ThreadListHead.Flink;
while (ListEntry != &DeviceExt->ThreadListHead)
{
ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry);
if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS)
{
break;
}
ListEntry = ListEntry->Flink;
}
if (ListEntry == &DeviceExt->ThreadListHead)
{
ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT));
if (ThreadContext == NULL)
{
KeUnlockMutex(&DeviceExt->PipeListLock);
return STATUS_NO_MEMORY;
}
ThreadContext->DeviceExt = DeviceExt;
KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE);
ThreadContext->Count = 1;
ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
DPRINT("Creating a new system thread for waiting read requests\n");
Status = PsCreateSystemThread(&hThread,
THREAD_ALL_ACCESS,
NULL,
NULL,
NULL,
NpfsWaiterThread,
(PVOID)ThreadContext);
if (!NT_SUCCESS(Status))
{
ExFreePool(ThreadContext);
KeUnlockMutex(&DeviceExt->PipeListLock);
return Status;
}
InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
}
IoMarkIrpPending(Context->Irp);
Context->Irp->Tail.Overlay.DriverContext[0] = Context;
IoAcquireCancelSpinLock(&oldIrql);
if (Context->Irp->Cancel)
{
IoReleaseCancelSpinLock(oldIrql);
Status = STATUS_CANCELLED;
}
else
{
IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
IoReleaseCancelSpinLock(oldIrql);
ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event;
ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
ThreadContext->Count++;
DeviceExt->EmptyWaiterCount--;
KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
Status = STATUS_SUCCESS;
}
KeUnlockMutex(&DeviceExt->PipeListLock);
return Status;
}
static NTSTATUS
NpfsReadFromPipe(PNPFS_CONTEXT Context)
{
PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
NTSTATUS Status;
PNPFS_DEVICE_EXTENSION DeviceExt;
KIRQL OldIrql;
ULONG Information;
PNPFS_FCB Fcb;
PNPFS_FCB WriterFcb;
@ -65,23 +246,14 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
ULONG CopyLength;
ULONG TempLength;
DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp);
DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
IoStack = IoGetCurrentIrpStackLocation(Irp);
IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
FileObject = IoStack->FileObject;
Fcb = FileObject->FsContext;
Pipe = Fcb->Pipe;
WriterFcb = Fcb->OtherSide;
if (Irp->MdlAddress == NULL)
{
DPRINT("Irp->MdlAddress == NULL\n");
Status = STATUS_UNSUCCESSFUL;
Information = 0;
goto done;
}
if (Fcb->Data == NULL)
{
DPRINT("Pipe is NOT readable!\n");
@ -94,41 +266,71 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
Length = IoStack->Parameters.Read.Length;
Information = 0;
Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
ExAcquireFastMutex(&Fcb->DataListLock);
while (1)
{
/* FIXME: check if in blocking mode */
if (Fcb->ReadDataAvailable == 0)
{
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
}
KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
ExReleaseFastMutex(&Fcb->DataListLock);
if (Information > 0)
{
Status = STATUS_SUCCESS;
goto done;
}
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE &&
!(Fcb->PipeState == FILE_PIPE_LISTENING_STATE && Fcb->PipeEnd == FILE_PIPE_SERVER_END))
{
DPRINT("PipeState: %x\n", Fcb->PipeState);
Status = STATUS_PIPE_BROKEN;
goto done;
}
/* Wait for ReadEvent to become signaled */
DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
Status = KeWaitForSingleObject(&Fcb->Event,
UserRequest,
KernelMode,
FALSE,
NULL);
DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
if (IoIsOperationSynchronous(Context->Irp))
{
/* Wait for ReadEvent to become signaled */
DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
Status = KeWaitForSingleObject(&Fcb->Event,
UserRequest,
KernelMode,
FALSE,
NULL);
DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
}
else
{
PNPFS_CONTEXT NewContext;
KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT));
if (NewContext == NULL)
{
Status = STATUS_NO_MEMORY;
goto done;
}
memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
NewContext->AllocatedFromPool = TRUE;
NewContext->Fcb = Fcb;
NewContext->MajorFunction = IRP_MJ_READ;
Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb);
if (NT_SUCCESS(Status))
{
Status = STATUS_PENDING;
}
else
{
ExFreePool(NewContext);
}
goto done;
}
ExAcquireFastMutex(&Fcb->DataListLock);
}
if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
@ -217,19 +419,47 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
}
}
KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
ExReleaseFastMutex(&Fcb->DataListLock);
done:
Irp->IoStatus.Status = Status;
Irp->IoStatus.Information = Information;
Context->Irp->IoStatus.Status = Status;
Context->Irp->IoStatus.Information = Information;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
if (Status != STATUS_PENDING)
{
IoCompleteRequest(Context->Irp, IO_NO_INCREMENT);
}
if (Context->AllocatedFromPool)
{
ExFreePool(Context);
}
DPRINT("NpfsRead done (Status %lx)\n", Status);
return Status;
}
NTSTATUS STDCALL
NpfsRead(PDEVICE_OBJECT DeviceObject,
PIRP Irp)
{
NPFS_CONTEXT Context;
Context.AllocatedFromPool = FALSE;
Context.DeviceObject = DeviceObject;
Context.Irp = Irp;
if (Irp->MdlAddress == NULL)
{
DPRINT("Irp->MdlAddress == NULL\n");
Irp->IoStatus.Status = STATUS_UNSUCCESSFUL;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
return STATUS_UNSUCCESSFUL;
}
return NpfsReadFromPipe(&Context);
}
NTSTATUS STDCALL
NpfsWrite(PDEVICE_OBJECT DeviceObject,
@ -244,7 +474,6 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
NTSTATUS Status = STATUS_SUCCESS;
ULONG Length;
ULONG Offset;
KIRQL OldIrql;
ULONG Information;
ULONG CopyLength;
ULONG TempLength;
@ -296,7 +525,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
Status = STATUS_SUCCESS;
Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
ExAcquireFastMutex(&ReaderFcb->DataListLock);
#ifndef NDEBUG
DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
HexDump(Buffer, Length);
@ -307,7 +536,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
if (ReaderFcb->WriteQuotaAvailable == 0)
{
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
ExReleaseFastMutex(&ReaderFcb->DataListLock);
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
{
Status = STATUS_PIPE_BROKEN;
@ -332,7 +561,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
Status = STATUS_PIPE_BROKEN;
goto done;
}
KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql);
ExAcquireFastMutex(&ReaderFcb->DataListLock);
}
if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
@ -395,7 +624,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
}
}
KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
ExReleaseFastMutex(&ReaderFcb->DataListLock);
done:
Irp->IoStatus.Status = Status;