diff --git a/reactos/drivers/fs/np/create.c b/reactos/drivers/fs/np/create.c index 43292453775..b42f0e881bd 100644 --- a/reactos/drivers/fs/np/create.c +++ b/reactos/drivers/fs/np/create.c @@ -50,20 +50,21 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe) PLIST_ENTRY CurrentEntry; PNPFS_WAITER_ENTRY Waiter; KIRQL oldIrql; + PIRP Irp; CurrentEntry = Pipe->WaiterListHead.Flink; while (CurrentEntry != &Pipe->WaiterListHead) { Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry); - if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE && - !Waiter->Irp->Cancel) + Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext); + if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE) { DPRINT("Server found! Fcb %p\n", Waiter->Fcb); IoAcquireCancelSpinLock(&oldIrql); - if (!Waiter->Irp->Cancel) + if (!Irp->Cancel) { - IoSetCancelRoutine(Waiter->Irp, NULL); + IoSetCancelRoutine(Irp, NULL); IoReleaseCancelSpinLock(oldIrql); return Waiter->Fcb; } @@ -83,6 +84,7 @@ NpfsSignalAndRemoveListeningServerInstance(PNPFS_PIPE Pipe, { PLIST_ENTRY CurrentEntry; PNPFS_WAITER_ENTRY Waiter; + PIRP Irp; CurrentEntry = Pipe->WaiterListHead.Flink; while (CurrentEntry != &Pipe->WaiterListHead) @@ -92,13 +94,12 @@ NpfsSignalAndRemoveListeningServerInstance(PNPFS_PIPE Pipe, { DPRINT("Server found! Fcb %p\n", Waiter->Fcb); - Waiter->Irp->IoStatus.Status = STATUS_PIPE_CONNECTED; - Waiter->Irp->IoStatus.Information = 0; - IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT); - RemoveEntryList(&Waiter->Entry); - ExFreePool(Waiter); - return; + Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext); + Irp->IoStatus.Status = STATUS_PIPE_CONNECTED; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); + break; } CurrentEntry = CurrentEntry->Flink; } @@ -175,6 +176,9 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject, ClientFcb->PipeEnd = FILE_PIPE_CLIENT_END; ClientFcb->OtherSide = NULL; ClientFcb->PipeState = SpecialAccess ? 0 : FILE_PIPE_DISCONNECTED_STATE; + InitializeListHead(&ClientFcb->ReadRequestListHead); + + DPRINT("Fcb: %x\n", ClientFcb); /* Initialize data list. */ if (Pipe->OutboundQuota) @@ -202,7 +206,9 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject, ClientFcb->MaxDataLength = Pipe->OutboundQuota; ExInitializeFastMutex(&ClientFcb->DataListLock); KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE); - KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE); + KeInitializeEvent(&ClientFcb->ReadEvent, SynchronizationEvent, FALSE); + KeInitializeEvent(&ClientFcb->WriteEvent, SynchronizationEvent, FALSE); + /* * Step 3. Search for listening server FCB. @@ -489,6 +495,7 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, Fcb->ReadDataAvailable = 0; Fcb->WriteQuotaAvailable = Pipe->InboundQuota; Fcb->MaxDataLength = Pipe->InboundQuota; + InitializeListHead(&Fcb->ReadRequestListHead); ExInitializeFastMutex(&Fcb->DataListLock); Pipe->CurrentInstances++; @@ -498,13 +505,11 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, Fcb->PipeState = FILE_PIPE_LISTENING_STATE; Fcb->OtherSide = NULL; - KeInitializeEvent(&Fcb->ConnectEvent, - SynchronizationEvent, - FALSE); + DPRINT("Fcb: %x\n", Fcb); - KeInitializeEvent(&Fcb->Event, - SynchronizationEvent, - FALSE); + KeInitializeEvent(&Fcb->ConnectEvent, SynchronizationEvent, FALSE); + KeInitializeEvent(&Fcb->ReadEvent, SynchronizationEvent, FALSE); + KeInitializeEvent(&Fcb->WriteEvent, SynchronizationEvent, FALSE); KeLockMutex(&Pipe->FcbListLock); InsertTailList(&Pipe->ServerFcbListHead, &Fcb->FcbListEntry); @@ -528,7 +533,7 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject, PNPFS_DEVICE_EXTENSION DeviceExt; PIO_STACK_LOCATION IoStack; PFILE_OBJECT FileObject; - PNPFS_FCB Fcb; + PNPFS_FCB Fcb, OtherSide; PNPFS_PIPE Pipe; BOOL Server; @@ -566,18 +571,37 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject, { DPRINT("Client\n"); } - if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) { - if (Fcb->OtherSide) + OtherSide = Fcb->OtherSide; + /* Lock the server first */ + if (Server) { - Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; - Fcb->OtherSide->OtherSide = NULL; - /* - * Signaling the write event. If is possible that an other - * thread waits for an empty buffer. - */ - KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE); + ExAcquireFastMutex(&Fcb->DataListLock); + ExAcquireFastMutex(&OtherSide->DataListLock); + } + else + { + ExAcquireFastMutex(&OtherSide->DataListLock); + ExAcquireFastMutex(&Fcb->DataListLock); + } + OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; + OtherSide->OtherSide = NULL; + /* + * Signaling the write event. If is possible that an other + * thread waits for an empty buffer. + */ + KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); + KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + if (Server) + { + ExReleaseFastMutex(&Fcb->DataListLock); + ExReleaseFastMutex(&OtherSide->DataListLock); + } + else + { + ExReleaseFastMutex(&OtherSide->DataListLock); + ExReleaseFastMutex(&Fcb->DataListLock); } } else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE) @@ -586,6 +610,7 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject, PNPFS_WAITER_ENTRY WaitEntry = NULL; BOOLEAN Complete = FALSE; KIRQL oldIrql; + PIRP tmpIrp; Entry = Fcb->Pipe->WaiterListHead.Flink; while (Entry != &Fcb->Pipe->WaiterListHead) @@ -594,28 +619,25 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject, if (WaitEntry->Fcb == Fcb) { RemoveEntryList(Entry); + tmpIrp = CONTAINING_RECORD(WaitEntry, IRP, Tail.Overlay.DriverContext); IoAcquireCancelSpinLock(&oldIrql); - if (!Irp->Cancel) + if (!tmpIrp->Cancel) { - IoSetCancelRoutine(WaitEntry->Irp, NULL); + IoSetCancelRoutine(tmpIrp, NULL); Complete = TRUE; } IoReleaseCancelSpinLock(oldIrql); + if (Complete) + { + tmpIrp->IoStatus.Status = STATUS_PIPE_BROKEN; + tmpIrp->IoStatus.Information = 0; + IoCompleteRequest(tmpIrp, IO_NO_INCREMENT); + } break; } Entry = Entry->Flink; } - if (Entry != &Fcb->Pipe->WaiterListHead) - { - if (Complete) - { - WaitEntry->Irp->IoStatus.Status = STATUS_PIPE_BROKEN; - WaitEntry->Irp->IoStatus.Information = 0; - IoCompleteRequest(WaitEntry->Irp, IO_NO_INCREMENT); - } - ExFreePool(WaitEntry); - } } Fcb->PipeState = FILE_PIPE_CLOSING_STATE; diff --git a/reactos/drivers/fs/np/fsctrl.c b/reactos/drivers/fs/np/fsctrl.c index 715b84af6bc..1413589d5a1 100644 --- a/reactos/drivers/fs/np/fsctrl.c +++ b/reactos/drivers/fs/np/fsctrl.c @@ -26,18 +26,18 @@ NpfsListeningCancelRoutine(IN PDEVICE_OBJECT DeviceObject, DPRINT1("NpfsListeningCancelRoutine() called\n"); + Waiter = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext; + IoReleaseCancelSpinLock(Irp->CancelIrql); - Waiter = Irp->Tail.Overlay.DriverContext[0]; - KeLockMutex(&Waiter->Pipe->FcbListLock); + KeLockMutex(&Waiter->Fcb->Pipe->FcbListLock); RemoveEntryList(&Waiter->Entry); - KeUnlockMutex(&Waiter->Pipe->FcbListLock); + KeUnlockMutex(&Waiter->Fcb->Pipe->FcbListLock); Irp->IoStatus.Status = STATUS_CANCELLED; Irp->IoStatus.Information = 0; IoCompleteRequest(Irp, IO_NO_INCREMENT); - ExFreePool(Waiter); } @@ -48,18 +48,13 @@ NpfsAddListeningServerInstance(PIRP Irp, PNPFS_WAITER_ENTRY Entry; KIRQL oldIrql; - Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY)); - if (Entry == NULL) - return STATUS_INSUFFICIENT_RESOURCES; + Entry = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext; - Entry->Irp = Irp; Entry->Fcb = Fcb; - Entry->Pipe = Fcb->Pipe; KeLockMutex(&Fcb->Pipe->FcbListLock); IoMarkIrpPending(Irp); - Irp->Tail.Overlay.DriverContext[0] = Entry; InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry); IoAcquireCancelSpinLock(&oldIrql); @@ -78,7 +73,6 @@ NpfsAddListeningServerInstance(PIRP Irp, Irp->IoStatus.Information = 0; IoCompleteRequest(Irp, IO_NO_INCREMENT); KeUnlockMutex(&Fcb->Pipe->FcbListLock); - ExFreePool(Entry); return STATUS_CANCELLED; } @@ -173,38 +167,100 @@ NpfsConnectPipe(PIRP Irp, static NTSTATUS NpfsDisconnectPipe(PNPFS_FCB Fcb) { - DPRINT("NpfsDisconnectPipe()\n"); + NTSTATUS Status; + PNPFS_FCB OtherSide; + PNPFS_PIPE Pipe; + BOOL Server; - if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE) - return STATUS_SUCCESS; + DPRINT("NpfsDisconnectPipe()\n"); - if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) - { - Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; - /* FIXME: Shouldn't this be FILE_PIPE_CLOSING_STATE? */ - Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; + Pipe = Fcb->Pipe; + KeLockMutex(&Pipe->FcbListLock); - /* FIXME: remove data queue(s) */ - - Fcb->OtherSide->OtherSide = NULL; + if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE) + { + DPRINT("Pipe is already disconnected\n"); + Status = STATUS_SUCCESS; + } + else if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) + { + Server = (Fcb->PipeEnd == FILE_PIPE_SERVER_END); + OtherSide = Fcb->OtherSide; Fcb->OtherSide = NULL; + /* Lock the server first */ + if (Server) + { + ExAcquireFastMutex(&Fcb->DataListLock); + ExAcquireFastMutex(&OtherSide->DataListLock); + } + else + { + ExAcquireFastMutex(&OtherSide->DataListLock); + ExAcquireFastMutex(&Fcb->DataListLock); + } + OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE; + OtherSide->OtherSide = NULL; + /* + * Signaling the write event. If is possible that an other + * thread waits for an empty buffer. + */ + KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); + KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + if (Server) + { + ExReleaseFastMutex(&Fcb->DataListLock); + ExReleaseFastMutex(&OtherSide->DataListLock); + } + else + { + ExReleaseFastMutex(&OtherSide->DataListLock); + ExReleaseFastMutex(&OtherSide->DataListLock); + } + Status = STATUS_SUCCESS; + } + else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE) + { + PLIST_ENTRY Entry; + PNPFS_WAITER_ENTRY WaitEntry = NULL; + BOOLEAN Complete = FALSE; + PIRP Irp = NULL; - DPRINT("Pipe disconnected\n"); - return STATUS_SUCCESS; - } + Entry = Fcb->Pipe->WaiterListHead.Flink; + while (Entry != &Fcb->Pipe->WaiterListHead) + { + WaitEntry = CONTAINING_RECORD(Entry, NPFS_WAITER_ENTRY, Entry); + if (WaitEntry->Fcb == Fcb) + { + RemoveEntryList(Entry); + Irp = CONTAINING_RECORD(Entry, IRP, Tail.Overlay.DriverContext); + Complete = (NULL == IoSetCancelRoutine(Irp, NULL)); + break; + } + Entry = Entry->Flink; + } - if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE) - { + if (Irp) + { + if (Complete) + { + Irp->IoStatus.Status = STATUS_PIPE_BROKEN; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); + } + } Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; - Fcb->OtherSide = NULL; - - /* FIXME: remove data queue(s) */ - - DPRINT("Pipe disconnected\n"); - return STATUS_SUCCESS; - } - - return STATUS_UNSUCCESSFUL; + Status = STATUS_SUCCESS; + } + else if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE) + { + Status = STATUS_PIPE_CLOSING; + } + else + { + Status = STATUS_UNSUCCESSFUL; + } + KeUnlockMutex(&Pipe->FcbListLock); + return Status; } diff --git a/reactos/drivers/fs/np/npfs.c b/reactos/drivers/fs/np/npfs.c index d7be7ee294d..753f5d13cb5 100644 --- a/reactos/drivers/fs/np/npfs.c +++ b/reactos/drivers/fs/np/npfs.c @@ -27,6 +27,9 @@ DriverEntry(PDRIVER_OBJECT DriverObject, NTSTATUS Status; DPRINT("Named Pipe FSD 0.0.2\n"); + + ASSERT (sizeof(NPFS_CONTEXT) <= sizeof (((PIRP)NULL)->Tail.Overlay.DriverContext)); + ASSERT (sizeof(NPFS_WAITER_ENTRY) <= sizeof(((PIRP)NULL)->Tail.Overlay.DriverContext)); DriverObject->MajorFunction[IRP_MJ_CREATE] = NpfsCreate; DriverObject->MajorFunction[IRP_MJ_CREATE_NAMED_PIPE] = @@ -74,8 +77,7 @@ DriverEntry(PDRIVER_OBJECT DriverObject, DeviceExtension = DeviceObject->DeviceExtension; InitializeListHead(&DeviceExtension->PipeListHead); InitializeListHead(&DeviceExtension->ThreadListHead); - KeInitializeMutex(&DeviceExtension->PipeListLock, - 0); + KeInitializeMutex(&DeviceExtension->PipeListLock, 0); DeviceExtension->EmptyWaiterCount = 0; /* set the size quotas */ diff --git a/reactos/drivers/fs/np/npfs.h b/reactos/drivers/fs/np/npfs.h index b01053451a2..052b79f3be8 100644 --- a/reactos/drivers/fs/np/npfs.h +++ b/reactos/drivers/fs/np/npfs.h @@ -42,12 +42,15 @@ typedef struct _NPFS_FCB struct ETHREAD *Thread; PNPFS_PIPE Pipe; KEVENT ConnectEvent; - KEVENT Event; + KEVENT ReadEvent; + KEVENT WriteEvent; ULONG PipeEnd; ULONG PipeState; ULONG ReadDataAvailable; ULONG WriteQuotaAvailable; + LIST_ENTRY ReadRequestListHead; + PVOID Data; PVOID ReadPtr; PVOID WritePtr; @@ -58,11 +61,8 @@ typedef struct _NPFS_FCB typedef struct _NPFS_CONTEXT { - PDEVICE_OBJECT DeviceObject; - PIRP Irp; - PNPFS_FCB Fcb; - UCHAR MajorFunction; - BOOLEAN AllocatedFromPool; + LIST_ENTRY ListEntry; + PKEVENT WaitEvent; } NPFS_CONTEXT, *PNPFS_CONTEXT; typedef struct _NPFS_THREAD_CONTEXT @@ -73,14 +73,12 @@ typedef struct _NPFS_THREAD_CONTEXT LIST_ENTRY ListEntry; PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS]; KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS]; - PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS]; + PIRP WaitIrpArray[MAXIMUM_WAIT_OBJECTS]; } NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT; typedef struct _NPFS_WAITER_ENTRY { LIST_ENTRY Entry; - PIRP Irp; - PNPFS_PIPE Pipe; PNPFS_FCB Fcb; } NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY; diff --git a/reactos/drivers/fs/np/rw.c b/reactos/drivers/fs/np/rw.c index c0797b6c16b..d07796b4035 100644 --- a/reactos/drivers/fs/np/rw.c +++ b/reactos/drivers/fs/np/rw.c @@ -46,61 +46,90 @@ VOID HexDump(PUCHAR Buffer, ULONG Length) } #endif -static NTSTATUS -NpfsReadFromPipe(PNPFS_CONTEXT Context); - static VOID STDCALL -NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject, - IN PIRP Irp) +NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject, + IN PIRP Irp) { PNPFS_CONTEXT Context; PNPFS_DEVICE_EXTENSION DeviceExt; + PIO_STACK_LOCATION IoStack; + PNPFS_FCB Fcb; + BOOLEAN Complete = FALSE; - DPRINT1("NpfsWaitingCancelRoutine() called\n"); + DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %x, Irp %x)\n", DeviceObject, Irp); IoReleaseCancelSpinLock(Irp->CancelIrql); - Context = Irp->Tail.Overlay.DriverContext[0]; - DeviceExt = Context->DeviceObject->DeviceExtension; + Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension; + IoStack = IoGetCurrentIrpStackLocation(Irp); + Fcb = IoStack->FileObject->FsContext; KeLockMutex(&DeviceExt->PipeListLock); - KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE); + ExAcquireFastMutex(&Fcb->DataListLock); + switch(IoStack->MajorFunction) + { + case IRP_MJ_READ: + if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + /* we are not the first in the list, remove an complete us */ + RemoveEntryList(&Context->ListEntry); + Complete = TRUE; + } + else + { + KeSetEvent(&Fcb->ReadEvent, IO_NO_INCREMENT, FALSE); + } + break; + default: + KEBUGCHECK(0); + } + ExReleaseFastMutex(&Fcb->DataListLock); KeUnlockMutex(&DeviceExt->PipeListLock); + if (Complete) + { + Irp->IoStatus.Status = STATUS_CANCELLED; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); + } } static VOID STDCALL -NpfsWaiterThread(PVOID Context) +NpfsWaiterThread(PVOID InitContext) { - PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context; - ULONG CurrentCount, Count = 0; - PNPFS_CONTEXT WaitContext = NULL; + PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext; + ULONG CurrentCount; + ULONG Count = 0; + PIRP Irp = NULL; + PIRP NextIrp; NTSTATUS Status; BOOLEAN Terminate = FALSE; BOOLEAN Cancel = FALSE; - KIRQL oldIrql; + PIO_STACK_LOCATION IoStack = NULL; + PNPFS_CONTEXT Context; + PNPFS_CONTEXT NextContext; + PNPFS_FCB Fcb; KeLockMutex(&ThreadContext->DeviceExt->PipeListLock); while (1) { CurrentCount = ThreadContext->Count; - KeResetEvent(&ThreadContext->Event); KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock); - if (WaitContext) + if (Irp) { if (Cancel) { - WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED; - WaitContext->Irp->IoStatus.Information = 0; - IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT); - ExFreePool(WaitContext); + Irp->IoStatus.Status = STATUS_CANCELLED; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); } else { - switch (WaitContext->MajorFunction) + switch (IoStack->MajorFunction) { case IRP_MJ_READ: - NpfsReadFromPipe(WaitContext); + NpfsRead(IoStack->DeviceObject, Irp); break; default: KEBUGCHECK(0); @@ -119,30 +148,56 @@ NpfsWaiterThread(PVOID Context) FALSE, NULL, ThreadContext->WaitBlockArray); - KeLockMutex(&ThreadContext->DeviceExt->PipeListLock); if (!NT_SUCCESS(Status)) { KEBUGCHECK(0); } + KeLockMutex(&ThreadContext->DeviceExt->PipeListLock); Count = Status - STATUS_SUCCESS; - ASSERT (Count <= CurrentCount); + ASSERT (Count < CurrentCount); if (Count > 0) - { - WaitContext = ThreadContext->WaitContextArray[Count]; - ThreadContext->Count--; - ThreadContext->DeviceExt->EmptyWaiterCount++; - ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count]; - ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count]; - IoAcquireCancelSpinLock(&oldIrql); - Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL); - IoReleaseCancelSpinLock(oldIrql); - } - else - { + { + Irp = ThreadContext->WaitIrpArray[Count]; + ThreadContext->Count--; + ThreadContext->DeviceExt->EmptyWaiterCount++; + ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count]; + ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count]; + + Cancel = (NULL == IoSetCancelRoutine(Irp, NULL)); + Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + IoStack = IoGetCurrentIrpStackLocation(Irp); + + if (Cancel) + { + Fcb = IoStack->FileObject->FsContext; + ExAcquireFastMutex(&Fcb->DataListLock); + RemoveEntryList(&Context->ListEntry); + switch (IoStack->MajorFunction) + { + case IRP_MJ_READ: + if (!IsListEmpty(&Fcb->ReadRequestListHead)) + { + /* put the next request on the wait list */ + NextContext = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry); + ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent; + NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext); + ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp; + ThreadContext->Count++; + ThreadContext->DeviceExt->EmptyWaiterCount--; + } + break; + default: + KEBUGCHECK(0); + } + ExReleaseFastMutex(&Fcb->DataListLock); + } + } + else + { /* someone has add a new wait request */ - WaitContext = NULL; - } - if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS) + Irp = NULL; + } + if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS) { /* it exist an other thread with empty wait slots, we can remove our thread from the list */ RemoveEntryList(&ThreadContext->ListEntry); @@ -155,14 +210,20 @@ NpfsWaiterThread(PVOID Context) } static NTSTATUS -NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb) +NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject, + IN PIRP Irp) { PLIST_ENTRY ListEntry; - PNPFS_THREAD_CONTEXT ThreadContext; + PNPFS_THREAD_CONTEXT ThreadContext = NULL; NTSTATUS Status; HANDLE hThread; KIRQL oldIrql; + PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;; + + DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp); + KeLockMutex(&DeviceExt->PipeListLock); ListEntry = DeviceExt->ThreadListHead.Flink; @@ -184,12 +245,12 @@ NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PN return STATUS_NO_MEMORY; } ThreadContext->DeviceExt = DeviceExt; - KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE); + KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE); ThreadContext->Count = 1; ThreadContext->WaitObjectArray[0] = &ThreadContext->Event; - DPRINT("Creating a new system thread for waiting read requests\n"); + DPRINT("Creating a new system thread for waiting read/write requests\n"); Status = PsCreateSystemThread(&hThread, THREAD_ALL_ACCESS, @@ -207,21 +268,20 @@ NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PN InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry); DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1; } - IoMarkIrpPending(Context->Irp); - Context->Irp->Tail.Overlay.DriverContext[0] = Context; + IoMarkIrpPending(Irp); IoAcquireCancelSpinLock(&oldIrql); - if (Context->Irp->Cancel) + if (Irp->Cancel) { IoReleaseCancelSpinLock(oldIrql); Status = STATUS_CANCELLED; } else { - IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine); + IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine); IoReleaseCancelSpinLock(oldIrql); - ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event; - ThreadContext->WaitContextArray[ThreadContext->Count] = Context; + ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent; + ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp; ThreadContext->Count++; DeviceExt->EmptyWaiterCount--; KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE); @@ -231,130 +291,182 @@ NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PN return Status; } -static NTSTATUS -NpfsReadFromPipe(PNPFS_CONTEXT Context) +NTSTATUS STDCALL +NpfsRead(IN PDEVICE_OBJECT DeviceObject, + IN PIRP Irp) { - PIO_STACK_LOCATION IoStack; PFILE_OBJECT FileObject; NTSTATUS Status; - ULONG Information; + NTSTATUS OriginalStatus = STATUS_SUCCESS; PNPFS_FCB Fcb; - PNPFS_FCB WriterFcb; - PNPFS_PIPE Pipe; + PNPFS_CONTEXT Context; + KEVENT Event; ULONG Length; - PVOID Buffer; + ULONG Information; ULONG CopyLength; ULONG TempLength; + BOOLEAN IsOriginalRequest = TRUE; + PVOID Buffer; - DPRINT("NpfsReadFromPipe(Context %p)\n", Context); + DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp); - IoStack = IoGetCurrentIrpStackLocation(Context->Irp); - FileObject = IoStack->FileObject; + if (Irp->MdlAddress == NULL) + { + DPRINT("Irp->MdlAddress == NULL\n"); + Status = STATUS_UNSUCCESSFUL; + Irp->IoStatus.Information = 0; + goto done; + } + + FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject; Fcb = FileObject->FsContext; - Pipe = Fcb->Pipe; - WriterFcb = Fcb->OtherSide; + Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; if (Fcb->Data == NULL) - { - DPRINT("Pipe is NOT readable!\n"); - Status = STATUS_UNSUCCESSFUL; - Information = 0; - goto done; - } + { + DPRINT1("Pipe is NOT readable!\n"); + Status = STATUS_UNSUCCESSFUL; + Irp->IoStatus.Information = 0; + goto done; + } - Status = STATUS_SUCCESS; - Length = IoStack->Parameters.Read.Length; - Information = 0; - - Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress); ExAcquireFastMutex(&Fcb->DataListLock); - while (1) - { - if (Fcb->ReadDataAvailable == 0) - { - if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) - { - KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); - } - ExReleaseFastMutex(&Fcb->DataListLock); - if (Information > 0) - { - Status = STATUS_SUCCESS; - goto done; - } - if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) - { + if (IoIsOperationSynchronous(Irp)) + { + InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry); + if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + KeInitializeEvent(&Event, SynchronizationEvent, FALSE); + Context->WaitEvent = &Event; + ExReleaseFastMutex(&Fcb->DataListLock); + Status = KeWaitForSingleObject(&Event, + Executive, + KernelMode, + FALSE, + NULL); + if (!NT_SUCCESS(Status)) + { + KEBUGCHECK(0); + } + ExAcquireFastMutex(&Fcb->DataListLock); + } + Irp->IoStatus.Information = 0; + } + else + { + KIRQL oldIrql; + if (IsListEmpty(&Fcb->ReadRequestListHead) || + Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + /* this is a new request */ + Irp->IoStatus.Information = 0; + Context->WaitEvent = &Fcb->ReadEvent; + InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry); + if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry) + { + /* there was already a request on the list */ + IoAcquireCancelSpinLock(&oldIrql); + if (Irp->Cancel) + { + IoReleaseCancelSpinLock(oldIrql); + RemoveEntryList(&Context->ListEntry); + ExReleaseFastMutex(&Fcb->DataListLock); + Status = STATUS_CANCELLED; + goto done; + } + IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine); + IoReleaseCancelSpinLock(oldIrql); + ExReleaseFastMutex(&Fcb->DataListLock); + IoMarkIrpPending(Irp); + Status = STATUS_PENDING; + goto done; + } + } + } + + while (1) + { + Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress); + Information = Irp->IoStatus.Information; + Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length; + ASSERT (Information <= Length); + Buffer += Information; + Length -= Information; + Status = STATUS_SUCCESS; + + while (1) + { + if (Fcb->ReadDataAvailable == 0) + { + if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) + { + KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + } + if (Information > 0 && + (Fcb->Pipe->ReadMode != FILE_PIPE_BYTE_STREAM_MODE || + Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)) + { + break; + } + if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) + { DPRINT("PipeState: %x\n", Fcb->PipeState); Status = STATUS_PIPE_BROKEN; - goto done; - } - - if (IoIsOperationSynchronous(Context->Irp)) - { + break; + } + ExReleaseFastMutex(&Fcb->DataListLock); + if (IoIsOperationSynchronous(Irp)) + { /* Wait for ReadEvent to become signaled */ - DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer); - Status = KeWaitForSingleObject(&Fcb->Event, + + DPRINT("Waiting for readable data (%wZ)\n", &Fcb->Pipe->PipeName); + Status = KeWaitForSingleObject(&Fcb->ReadEvent, UserRequest, KernelMode, FALSE, NULL); - DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); - } - else - { - PNPFS_CONTEXT NewContext; - - NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT)); - if (NewContext == NULL) - { - Status = STATUS_NO_MEMORY; - goto done; - } - memcpy(NewContext, Context, sizeof(NPFS_CONTEXT)); - NewContext->AllocatedFromPool = TRUE; - NewContext->Fcb = Fcb; - NewContext->MajorFunction = IRP_MJ_READ; - - Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb); + DPRINT("Finished waiting (%wZ)! Status: %x\n", &Fcb->Pipe->PipeName, Status); + ExAcquireFastMutex(&Fcb->DataListLock); + } + else + { + PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext; + + Context->WaitEvent = &Fcb->ReadEvent; + Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp); if (NT_SUCCESS(Status)) - { - Status = STATUS_PENDING; - } - else - { - ExFreePool(NewContext); - } - goto done; - } - - ExAcquireFastMutex(&Fcb->DataListLock); - } - - if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE) - { - DPRINT("Byte stream mode\n"); - /* Byte stream mode */ - while (Length > 0 && Fcb->ReadDataAvailable > 0) - { + { + Status = STATUS_PENDING; + } + ExAcquireFastMutex(&Fcb->DataListLock); + break; + } + } + if (Fcb->Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE) + { + DPRINT("Byte stream mode\n"); + /* Byte stream mode */ + while (Length > 0 && Fcb->ReadDataAvailable > 0) + { CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length); if (Fcb->ReadPtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength) - { - memcpy(Buffer, Fcb->ReadPtr, CopyLength); - Fcb->ReadPtr += CopyLength; - if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength) - { - Fcb->ReadPtr = Fcb->Data; - } - } - else - { - TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr; - memcpy(Buffer, Fcb->ReadPtr, TempLength); - memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength); - Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength; - } + { + memcpy(Buffer, Fcb->ReadPtr, CopyLength); + Fcb->ReadPtr += CopyLength; + if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength) + { + Fcb->ReadPtr = Fcb->Data; + } + } + else + { + TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr; + memcpy(Buffer, Fcb->ReadPtr, TempLength); + memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength); + Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength; + } Buffer += CopyLength; Length -= CopyLength; @@ -362,25 +474,25 @@ NpfsReadFromPipe(PNPFS_CONTEXT Context) Fcb->ReadDataAvailable -= CopyLength; Fcb->WriteQuotaAvailable += CopyLength; - } + } - if (Length == 0) - { + if (Length == 0) + { if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) - { - KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); - } - KeResetEvent(&Fcb->Event); - break; - } - } - else - { - DPRINT("Message mode\n"); + { + KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + } + KeResetEvent(&Fcb->ReadEvent); + break; + } + } + else + { + DPRINT("Message mode\n"); - /* Message mode */ - if (Fcb->ReadDataAvailable) - { + /* Message mode */ + if (Fcb->ReadDataAvailable) + { /* Truncate the message if the receive buffer is too small */ CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length); memcpy(Buffer, Fcb->Data, CopyLength); @@ -393,73 +505,85 @@ NpfsReadFromPipe(PNPFS_CONTEXT Context) Information = CopyLength; if (Fcb->ReadDataAvailable > Length) - { - memmove(Fcb->Data, Fcb->Data + Length, - Fcb->ReadDataAvailable - Length); - Fcb->ReadDataAvailable -= Length; - Status = STATUS_MORE_ENTRIES; - } + { + memmove(Fcb->Data, Fcb->Data + Length, + Fcb->ReadDataAvailable - Length); + Fcb->ReadDataAvailable -= Length; + Status = STATUS_MORE_ENTRIES; + } else - { - KeResetEvent(&Fcb->Event); - if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) - { - KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); - } - Fcb->ReadDataAvailable = 0; - Fcb->WriteQuotaAvailable = Fcb->MaxDataLength; - } - } + { + KeResetEvent(&Fcb->ReadEvent); + if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) + { + KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE); + } + Fcb->ReadDataAvailable = 0; + Fcb->WriteQuotaAvailable = Fcb->MaxDataLength; + } + } - if (Information > 0) - { - break; - } + if (Information > 0) + { + break; + } + } + } + Irp->IoStatus.Information = Information; + Irp->IoStatus.Status = Status; + + if (IoIsOperationSynchronous(Irp)) + { + RemoveEntryList(&Context->ListEntry); + if (!IsListEmpty(&Fcb->ReadRequestListHead)) + { + Context = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry); + KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE); } - } + ExReleaseFastMutex(&Fcb->DataListLock); + IoCompleteRequest(Irp, IO_NO_INCREMENT); - ExReleaseFastMutex(&Fcb->DataListLock); + DPRINT("NpfsRead done (Status %lx)\n", Status); + return Status; + } + else + { + if (IsOriginalRequest) + { + IsOriginalRequest = FALSE; + OriginalStatus = Status; + } + if (Status == STATUS_PENDING) + { + ExReleaseFastMutex(&Fcb->DataListLock); + DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus); + return OriginalStatus; + } + RemoveEntryList(&Context->ListEntry); + IoCompleteRequest(Irp, IO_NO_INCREMENT); + if (IsListEmpty(&Fcb->ReadRequestListHead)) + { + ExReleaseFastMutex(&Fcb->DataListLock); + DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus); + return OriginalStatus; + } + Context = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry); + Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext); + } + } done: - Context->Irp->IoStatus.Status = Status; - Context->Irp->IoStatus.Information = Information; + Irp->IoStatus.Status = Status; if (Status != STATUS_PENDING) { - IoCompleteRequest(Context->Irp, IO_NO_INCREMENT); - } - - if (Context->AllocatedFromPool) - { - ExFreePool(Context); + IoCompleteRequest(Irp, IO_NO_INCREMENT); } DPRINT("NpfsRead done (Status %lx)\n", Status); return Status; } -NTSTATUS STDCALL -NpfsRead(PDEVICE_OBJECT DeviceObject, - PIRP Irp) -{ - NPFS_CONTEXT Context; - - Context.AllocatedFromPool = FALSE; - Context.DeviceObject = DeviceObject; - Context.Irp = Irp; - - if (Irp->MdlAddress == NULL) - { - DPRINT("Irp->MdlAddress == NULL\n"); - Irp->IoStatus.Status = STATUS_UNSUCCESSFUL; - Irp->IoStatus.Information = 0; - IoCompleteRequest(Irp, IO_NO_INCREMENT); - return STATUS_UNSUCCESSFUL; - } - - return NpfsReadFromPipe(&Context); -} - NTSTATUS STDCALL NpfsWrite(PDEVICE_OBJECT DeviceObject, PIRP Irp) @@ -534,22 +658,24 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, { if (ReaderFcb->WriteQuotaAvailable == 0) { - KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE); - ExReleaseFastMutex(&ReaderFcb->DataListLock); + KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE); if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) { Status = STATUS_PIPE_BROKEN; + ExReleaseFastMutex(&ReaderFcb->DataListLock); goto done; } + ExReleaseFastMutex(&ReaderFcb->DataListLock); DPRINT("Waiting for buffer space (%S)\n", Pipe->PipeName.Buffer); - Status = KeWaitForSingleObject(&Fcb->Event, - UserRequest, + Status = KeWaitForSingleObject(&Fcb->WriteEvent, + UserRequest, KernelMode, FALSE, NULL); DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); + ExAcquireFastMutex(&ReaderFcb->DataListLock); /* * It's possible that the event was signaled because the * other side of pipe was closed. @@ -558,9 +684,9 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, { DPRINT("PipeState: %x\n", Fcb->PipeState); Status = STATUS_PIPE_BROKEN; + ExReleaseFastMutex(&ReaderFcb->DataListLock); goto done; } - ExAcquireFastMutex(&ReaderFcb->DataListLock); } if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE) @@ -596,8 +722,8 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, if (Length == 0) { - KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE); - KeResetEvent(&Fcb->Event); + KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE); + KeResetEvent(&Fcb->WriteEvent); break; } } @@ -616,8 +742,8 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, if (Information > 0) { - KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE); - KeResetEvent(&Fcb->Event); + KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE); + KeResetEvent(&Fcb->WriteEvent); break; } }