diff --git a/reactos/drivers/fs/np/create.c b/reactos/drivers/fs/np/create.c index 4e2c6330fd2..3ab8deb8c44 100644 --- a/reactos/drivers/fs/np/create.c +++ b/reactos/drivers/fs/np/create.c @@ -49,6 +49,7 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe) { PLIST_ENTRY CurrentEntry; PNPFS_WAITER_ENTRY Waiter; + KIRQL oldIrql; CurrentEntry = Pipe->WaiterListHead.Flink; while (CurrentEntry != &Pipe->WaiterListHead) @@ -58,11 +59,15 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe) !Waiter->Irp->Cancel) { DPRINT("Server found! Fcb %p\n", Waiter->Fcb); - - if (IoSetCancelRoutine(Waiter->Irp, NULL) != NULL) - { + + IoAcquireCancelSpinLock(&oldIrql); + if (!Waiter->Irp->Cancel) + { + IoSetCancelRoutine(Waiter->Irp, NULL); + IoReleaseCancelSpinLock(oldIrql); return Waiter->Fcb; } + IoReleaseCancelSpinLock(oldIrql); } CurrentEntry = CurrentEntry->Flink; @@ -174,7 +179,7 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject, /* Initialize data list. */ if (Pipe->OutboundQuota) { - ClientFcb->Data = ExAllocatePool(NonPagedPool, Pipe->OutboundQuota); + ClientFcb->Data = ExAllocatePool(PagedPool, Pipe->OutboundQuota); if (ClientFcb->Data == NULL) { DPRINT("No memory!\n"); @@ -195,7 +200,7 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject, ClientFcb->ReadDataAvailable = 0; ClientFcb->WriteQuotaAvailable = Pipe->OutboundQuota; ClientFcb->MaxDataLength = Pipe->OutboundQuota; - KeInitializeSpinLock(&ClientFcb->DataListLock); + ExInitializeFastMutex(&ClientFcb->DataListLock); KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE); KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE); @@ -455,13 +460,17 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, if (Pipe->InboundQuota) { - Fcb->Data = ExAllocatePool(NonPagedPool, Pipe->InboundQuota); + Fcb->Data = ExAllocatePool(PagedPool, Pipe->InboundQuota); if (Fcb->Data == NULL) { ExFreePool(Fcb); if (NewPipe) { + /* + * FIXME: + * Lock the pipelist and remove the pipe from the list. + */ RtlFreeUnicodeString(&Pipe->PipeName); ExFreePool(Pipe); } @@ -481,7 +490,7 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, Fcb->ReadDataAvailable = 0; Fcb->WriteQuotaAvailable = Pipe->InboundQuota; Fcb->MaxDataLength = Pipe->InboundQuota; - KeInitializeSpinLock(&Fcb->DataListLock); + ExInitializeFastMutex(&Fcb->DataListLock); Pipe->CurrentInstances++; diff --git a/reactos/drivers/fs/np/fsctrl.c b/reactos/drivers/fs/np/fsctrl.c index cc6ed883072..715b84af6bc 100644 --- a/reactos/drivers/fs/np/fsctrl.c +++ b/reactos/drivers/fs/np/fsctrl.c @@ -46,6 +46,7 @@ NpfsAddListeningServerInstance(PIRP Irp, PNPFS_FCB Fcb) { PNPFS_WAITER_ENTRY Entry; + KIRQL oldIrql; Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY)); if (Entry == NULL) @@ -61,13 +62,15 @@ NpfsAddListeningServerInstance(PIRP Irp, Irp->Tail.Overlay.DriverContext[0] = Entry; InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry); - IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine); - + IoAcquireCancelSpinLock(&oldIrql); if (!Irp->Cancel) { + IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine); + IoReleaseCancelSpinLock(oldIrql); KeUnlockMutex(&Fcb->Pipe->FcbListLock); return STATUS_PENDING; } + IoReleaseCancelSpinLock(oldIrql); RemoveEntryList(&Entry->Entry); diff --git a/reactos/drivers/fs/np/npfs.c b/reactos/drivers/fs/np/npfs.c index 9075ac7d577..8ac5c4853fe 100644 --- a/reactos/drivers/fs/np/npfs.c +++ b/reactos/drivers/fs/np/npfs.c @@ -73,8 +73,10 @@ DriverEntry(PDRIVER_OBJECT DriverObject, /* initialize the device extension */ DeviceExtension = DeviceObject->DeviceExtension; InitializeListHead(&DeviceExtension->PipeListHead); + InitializeListHead(&DeviceExtension->ThreadListHead); KeInitializeMutex(&DeviceExtension->PipeListLock, 0); + DeviceExtension->EmptyWaiterCount = 0; /* set the size quotas */ DeviceExtension->MinQuota = PAGE_SIZE; diff --git a/reactos/drivers/fs/np/npfs.h b/reactos/drivers/fs/np/npfs.h index 1038411e2fb..a7a63602ca4 100644 --- a/reactos/drivers/fs/np/npfs.h +++ b/reactos/drivers/fs/np/npfs.h @@ -6,7 +6,9 @@ typedef struct _NPFS_DEVICE_EXTENSION { LIST_ENTRY PipeListHead; + LIST_ENTRY ThreadListHead; KMUTEX PipeListLock; + ULONG EmptyWaiterCount; ULONG MinQuota; ULONG DefaultQuota; ULONG MaxQuota; @@ -20,6 +22,7 @@ typedef struct _NPFS_PIPE LIST_ENTRY ServerFcbListHead; LIST_ENTRY ClientFcbListHead; LIST_ENTRY WaiterListHead; + LIST_ENTRY EmptyBufferListHead; ULONG PipeType; ULONG ReadMode; ULONG WriteMode; @@ -50,9 +53,29 @@ typedef struct _NPFS_FCB PVOID WritePtr; ULONG MaxDataLength; - KSPIN_LOCK DataListLock; /* Data queue lock */ + FAST_MUTEX DataListLock; /* Data queue lock */ } NPFS_FCB, *PNPFS_FCB; +typedef struct _NPFS_CONTEXT +{ + PDEVICE_OBJECT DeviceObject; + PIRP Irp; + PNPFS_FCB Fcb; + UCHAR MajorFunction; + BOOLEAN AllocatedFromPool; +} NPFS_CONTEXT, *PNPFS_CONTEXT; + +typedef struct _NPFS_THREAD_CONTEXT +{ + ULONG Count; + KEVENT Event; + PNPFS_DEVICE_EXTENSION DeviceExt; + LIST_ENTRY ListEntry; + PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS]; + KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS]; + PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS]; +} NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT; + typedef struct _NPFS_WAITER_ENTRY { LIST_ENTRY Entry; diff --git a/reactos/drivers/fs/np/rw.c b/reactos/drivers/fs/np/rw.c index 254f4b1ba47..10b085c067b 100644 --- a/reactos/drivers/fs/np/rw.c +++ b/reactos/drivers/fs/np/rw.c @@ -46,16 +46,197 @@ VOID HexDump(PUCHAR Buffer, ULONG Length) } #endif +static NTSTATUS +NpfsReadFromPipe(PNPFS_CONTEXT Context); -NTSTATUS STDCALL -NpfsRead(PDEVICE_OBJECT DeviceObject, - PIRP Irp) +static VOID STDCALL +NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject, + IN PIRP Irp) +{ + PNPFS_CONTEXT Context; + PNPFS_DEVICE_EXTENSION DeviceExt; + + DPRINT1("NpfsWaitingCancelRoutine() called\n"); + + IoReleaseCancelSpinLock(Irp->CancelIrql); + + Context = Irp->Tail.Overlay.DriverContext[0]; + DeviceExt = Context->DeviceObject->DeviceExtension; + + KeLockMutex(&DeviceExt->PipeListLock); + KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE); + KeUnlockMutex(&DeviceExt->PipeListLock); +} + +static VOID STDCALL +NpfsWaiterThread(PVOID Context) +{ + PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context; + ULONG CurrentCount, Count = 0; + PNPFS_CONTEXT WaitContext = NULL; + NTSTATUS Status; + BOOLEAN Terminate = FALSE; + BOOLEAN Cancel = FALSE; + KIRQL oldIrql; + + KeLockMutex(&ThreadContext->DeviceExt->PipeListLock); + + while (1) + { + CurrentCount = ThreadContext->Count; + KeResetEvent(&ThreadContext->Event); + KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock); + if (WaitContext) + { + if (Cancel) + { + WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED; + WaitContext->Irp->IoStatus.Information = 0; + IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT); + ExFreePool(WaitContext); + } + else + { + switch (WaitContext->MajorFunction) + { + case IRP_MJ_READ: + NpfsReadFromPipe(WaitContext); + break; + default: + KEBUGCHECK(0); + } + } + } + if (Terminate) + { + break; + } + Status = KeWaitForMultipleObjects(CurrentCount, + ThreadContext->WaitObjectArray, + WaitAny, + Executive, + KernelMode, + FALSE, + NULL, + ThreadContext->WaitBlockArray); + KeLockMutex(&ThreadContext->DeviceExt->PipeListLock); + if (!NT_SUCCESS(Status)) + { + KEBUGCHECK(0); + } + Count = Status - STATUS_SUCCESS; + ASSERT (Count <= CurrentCount); + if (Count > 0) + { + WaitContext = ThreadContext->WaitContextArray[Count]; + ThreadContext->Count--; + ThreadContext->DeviceExt->EmptyWaiterCount++; + ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count]; + ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count]; + IoAcquireCancelSpinLock(&oldIrql); + Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL); + IoReleaseCancelSpinLock(oldIrql); + } + else + { + /* someone has add a new wait request */ + WaitContext = NULL; + } + if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS) + { + /* it exist an other thread with empty wait slots, we can remove our thread from the list */ + RemoveEntryList(&ThreadContext->ListEntry); + ThreadContext->DeviceExt->EmptyWaiterCount -= MAXIMUM_WAIT_OBJECTS - 1; + Terminate = TRUE; + } + } + KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock); + ExFreePool(ThreadContext); +} + +static NTSTATUS +NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb) +{ + PLIST_ENTRY ListEntry; + PNPFS_THREAD_CONTEXT ThreadContext; + NTSTATUS Status; + HANDLE hThread; + KIRQL oldIrql; + + KeLockMutex(&DeviceExt->PipeListLock); + + ListEntry = DeviceExt->ThreadListHead.Flink; + while (ListEntry != &DeviceExt->ThreadListHead) + { + ThreadContext = CONTAINING_RECORD(ListEntry, NPFS_THREAD_CONTEXT, ListEntry); + if (ThreadContext->Count < MAXIMUM_WAIT_OBJECTS) + { + break; + } + ListEntry = ListEntry->Flink; + } + if (ListEntry == &DeviceExt->ThreadListHead) + { + ThreadContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_THREAD_CONTEXT)); + if (ThreadContext == NULL) + { + KeUnlockMutex(&DeviceExt->PipeListLock); + return STATUS_NO_MEMORY; + } + ThreadContext->DeviceExt = DeviceExt; + KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE); + ThreadContext->Count = 1; + ThreadContext->WaitObjectArray[0] = &ThreadContext->Event; + + + DPRINT("Creating a new system thread for waiting read requests\n"); + + Status = PsCreateSystemThread(&hThread, + THREAD_ALL_ACCESS, + NULL, + NULL, + NULL, + NpfsWaiterThread, + (PVOID)ThreadContext); + if (!NT_SUCCESS(Status)) + { + ExFreePool(ThreadContext); + KeUnlockMutex(&DeviceExt->PipeListLock); + return Status; + } + InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry); + DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1; + } + IoMarkIrpPending(Context->Irp); + Context->Irp->Tail.Overlay.DriverContext[0] = Context; + + IoAcquireCancelSpinLock(&oldIrql); + if (Context->Irp->Cancel) + { + IoReleaseCancelSpinLock(oldIrql); + Status = STATUS_CANCELLED; + } + else + { + IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine); + IoReleaseCancelSpinLock(oldIrql); + ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event; + ThreadContext->WaitContextArray[ThreadContext->Count] = Context; + ThreadContext->Count++; + DeviceExt->EmptyWaiterCount--; + KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE); + Status = STATUS_SUCCESS; + } + KeUnlockMutex(&DeviceExt->PipeListLock); + return Status; +} + +static NTSTATUS +NpfsReadFromPipe(PNPFS_CONTEXT Context) { PIO_STACK_LOCATION IoStack; PFILE_OBJECT FileObject; NTSTATUS Status; - PNPFS_DEVICE_EXTENSION DeviceExt; - KIRQL OldIrql; ULONG Information; PNPFS_FCB Fcb; PNPFS_FCB WriterFcb; @@ -65,23 +246,14 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, ULONG CopyLength; ULONG TempLength; - DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp); + DPRINT("NpfsReadFromPipe(Context %p)\n", Context); - DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension; - IoStack = IoGetCurrentIrpStackLocation(Irp); + IoStack = IoGetCurrentIrpStackLocation(Context->Irp); FileObject = IoStack->FileObject; Fcb = FileObject->FsContext; Pipe = Fcb->Pipe; WriterFcb = Fcb->OtherSide; - if (Irp->MdlAddress == NULL) - { - DPRINT("Irp->MdlAddress == NULL\n"); - Status = STATUS_UNSUCCESSFUL; - Information = 0; - goto done; - } - if (Fcb->Data == NULL) { DPRINT("Pipe is NOT readable!\n"); @@ -94,41 +266,71 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, Length = IoStack->Parameters.Read.Length; Information = 0; - Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress); - KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql); + Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress); + ExAcquireFastMutex(&Fcb->DataListLock); while (1) { - /* FIXME: check if in blocking mode */ if (Fcb->ReadDataAvailable == 0) { if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) { KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); } - KeReleaseSpinLock(&Fcb->DataListLock, OldIrql); + ExReleaseFastMutex(&Fcb->DataListLock); if (Information > 0) { Status = STATUS_SUCCESS; goto done; } - if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) + if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE && + !(Fcb->PipeState == FILE_PIPE_LISTENING_STATE && Fcb->PipeEnd == FILE_PIPE_SERVER_END)) { DPRINT("PipeState: %x\n", Fcb->PipeState); Status = STATUS_PIPE_BROKEN; goto done; } - /* Wait for ReadEvent to become signaled */ - DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer); - Status = KeWaitForSingleObject(&Fcb->Event, - UserRequest, - KernelMode, - FALSE, - NULL); - DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); + if (IoIsOperationSynchronous(Context->Irp)) + { + /* Wait for ReadEvent to become signaled */ + DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer); + Status = KeWaitForSingleObject(&Fcb->Event, + UserRequest, + KernelMode, + FALSE, + NULL); + DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); + } + else + { + PNPFS_CONTEXT NewContext; - KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql); + NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT)); + if (NewContext == NULL) + { + Status = STATUS_NO_MEMORY; + goto done; + } + memcpy(NewContext, Context, sizeof(NPFS_CONTEXT)); + NewContext->AllocatedFromPool = TRUE; + NewContext->Fcb = Fcb; + NewContext->MajorFunction = IRP_MJ_READ; + + Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb); + + if (NT_SUCCESS(Status)) + { + Status = STATUS_PENDING; + } + else + { + ExFreePool(NewContext); + } + goto done; + } + + ExAcquireFastMutex(&Fcb->DataListLock); } if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE) @@ -217,19 +419,47 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, } } - KeReleaseSpinLock(&Fcb->DataListLock, OldIrql); + ExReleaseFastMutex(&Fcb->DataListLock); done: - Irp->IoStatus.Status = Status; - Irp->IoStatus.Information = Information; + Context->Irp->IoStatus.Status = Status; + Context->Irp->IoStatus.Information = Information; - IoCompleteRequest(Irp, IO_NO_INCREMENT); + if (Status != STATUS_PENDING) + { + IoCompleteRequest(Context->Irp, IO_NO_INCREMENT); + } + if (Context->AllocatedFromPool) + { + ExFreePool(Context); + } DPRINT("NpfsRead done (Status %lx)\n", Status); return Status; } +NTSTATUS STDCALL +NpfsRead(PDEVICE_OBJECT DeviceObject, + PIRP Irp) +{ + NPFS_CONTEXT Context; + + Context.AllocatedFromPool = FALSE; + Context.DeviceObject = DeviceObject; + Context.Irp = Irp; + + if (Irp->MdlAddress == NULL) + { + DPRINT("Irp->MdlAddress == NULL\n"); + Irp->IoStatus.Status = STATUS_UNSUCCESSFUL; + Irp->IoStatus.Information = 0; + IoCompleteRequest(Irp, IO_NO_INCREMENT); + return STATUS_UNSUCCESSFUL; + } + + return NpfsReadFromPipe(&Context); +} NTSTATUS STDCALL NpfsWrite(PDEVICE_OBJECT DeviceObject, @@ -244,7 +474,6 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, NTSTATUS Status = STATUS_SUCCESS; ULONG Length; ULONG Offset; - KIRQL OldIrql; ULONG Information; ULONG CopyLength; ULONG TempLength; @@ -296,7 +525,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, Status = STATUS_SUCCESS; Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress); - KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql); + ExAcquireFastMutex(&ReaderFcb->DataListLock); #ifndef NDEBUG DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset); HexDump(Buffer, Length); @@ -307,7 +536,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, if (ReaderFcb->WriteQuotaAvailable == 0) { KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE); - KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql); + ExReleaseFastMutex(&ReaderFcb->DataListLock); if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) { Status = STATUS_PIPE_BROKEN; @@ -332,7 +561,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, Status = STATUS_PIPE_BROKEN; goto done; } - KeAcquireSpinLock(&ReaderFcb->DataListLock, &OldIrql); + ExAcquireFastMutex(&ReaderFcb->DataListLock); } if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE) @@ -395,7 +624,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, } } - KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql); + ExReleaseFastMutex(&ReaderFcb->DataListLock); done: Irp->IoStatus.Status = Status;