From 2f0c09875737a31fd937b31310a99e1f51c696d2 Mon Sep 17 00:00:00 2001 From: Hartmut Birr Date: Sat, 21 Jun 2003 19:55:55 +0000 Subject: [PATCH] - Allocate the pipe buffer at creation time. - Interprete the given buffer size. - Interprete the given direction (inbound, outbound or both). - The handling of read/write request in message mode is possible incorrect. svn path=/trunk/; revision=4941 --- reactos/drivers/fs/np/Makefile | 6 +- reactos/drivers/fs/np/create.c | 174 ++++++++++++----- reactos/drivers/fs/np/npfs.c | 16 +- reactos/drivers/fs/np/npfs.h | 31 +-- reactos/drivers/fs/np/rw.c | 333 ++++++++++++++++++--------------- 5 files changed, 327 insertions(+), 233 deletions(-) diff --git a/reactos/drivers/fs/np/Makefile b/reactos/drivers/fs/np/Makefile index 4c6e39fbd19..6c8e33ab2a8 100644 --- a/reactos/drivers/fs/np/Makefile +++ b/reactos/drivers/fs/np/Makefile @@ -1,4 +1,4 @@ -# $Id: Makefile,v 1.12 2001/08/21 20:13:13 chorns Exp $ +# $Id: Makefile,v 1.13 2003/06/21 19:55:55 hbirr Exp $ PATH_TO_TOP = ../../.. @@ -14,6 +14,10 @@ TARGET_OBJECTS = \ rw.o \ volume.o +DEP_OBJECTS = $(TARGET_OBJECTS) + include $(PATH_TO_TOP)/rules.mak include $(TOOLS_PATH)/helper.mk + +include $(TOOLS_PATH)/depend.mk diff --git a/reactos/drivers/fs/np/create.c b/reactos/drivers/fs/np/create.c index a5e04dc262b..1739dd7e449 100644 --- a/reactos/drivers/fs/np/create.c +++ b/reactos/drivers/fs/np/create.c @@ -1,4 +1,4 @@ -/* $Id: create.c,v 1.15 2003/01/19 01:13:04 gvg Exp $ +/* $Id: create.c,v 1.16 2003/06/21 19:55:55 hbirr Exp $ * * COPYRIGHT: See COPYING in the top level directory * PROJECT: ReactOS kernel @@ -98,14 +98,39 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject, ClientFcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; /* initialize data list */ - InitializeListHead(&ClientFcb->DataListHead); + if (Pipe->InboundQuota) + { + ClientFcb->Data = ExAllocatePool(NonPagedPool, Pipe->InboundQuota); + if (ClientFcb->Data == NULL) + { + ExFreePool(ClientFcb); + KeUnlockMutex(&DeviceExt->PipeListLock); + + Irp->IoStatus.Status = STATUS_NO_MEMORY; + Irp->IoStatus.Information = 0; + + IoCompleteRequest(Irp, IO_NO_INCREMENT); + DPRINT("No memory!\n"); + + return(STATUS_NO_MEMORY); + } + } + else + { + ClientFcb->Data = NULL; + } + ClientFcb->ReadPtr = ClientFcb->Data; + ClientFcb->WritePtr = ClientFcb->Data; + ClientFcb->ReadDataAvailable = 0; + ClientFcb->WriteQuotaAvailable = Pipe->InboundQuota; + ClientFcb->MaxDataLength = Pipe->InboundQuota; KeInitializeSpinLock(&ClientFcb->DataListLock); KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE); - KeInitializeEvent(&ClientFcb->ReadEvent, + KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE); @@ -259,9 +284,85 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, Pipe->MaximumInstances = Buffer->MaxInstances; Pipe->CurrentInstances = 0; Pipe->TimeOut = Buffer->TimeOut; - Pipe->InboundQuota = Buffer->InBufferSize; - Pipe->OutboundQuota = Buffer->OutBufferSize; - + if (!(IoStack->Parameters.Create.Options & FILE_PIPE_OUTBOUND) || + IoStack->Parameters.Create.Options & FILE_PIPE_FULL_DUPLEX) + { + if (Buffer->InBufferSize == 0) + { + Pipe->InboundQuota = DeviceExt->DefaultQuota; + } + else + { + Pipe->InboundQuota = PAGE_ROUND_UP(Buffer->InBufferSize); + if (Pipe->InboundQuota < DeviceExt->MinQuota) + { + Pipe->InboundQuota = DeviceExt->MinQuota; + } + else if (Pipe->InboundQuota > DeviceExt->MaxQuota) + { + Pipe->InboundQuota = DeviceExt->MaxQuota; + } + } + } + else + { + Pipe->InboundQuota = 0; + } + if (IoStack->Parameters.Create.Options & (FILE_PIPE_FULL_DUPLEX|FILE_PIPE_OUTBOUND)) + { + if (Buffer->OutBufferSize == 0) + { + Pipe->OutboundQuota = DeviceExt->DefaultQuota; + } + else + { + Pipe->OutboundQuota = PAGE_ROUND_UP(Buffer->OutBufferSize); + if (Pipe->OutboundQuota < DeviceExt->MinQuota) + { + Pipe->OutboundQuota = DeviceExt->MinQuota; + } + else if (Pipe->OutboundQuota > DeviceExt->MaxQuota) + { + Pipe->OutboundQuota = DeviceExt->MaxQuota; + } + } + } + else + { + Pipe->OutboundQuota = 0; + } + + if (Pipe->OutboundQuota) + { + Fcb->Data = ExAllocatePool(NonPagedPool, Pipe->OutboundQuota); + if (Fcb->Data == NULL) + { + ExFreePool(Fcb); + RtlFreeUnicodeString(&Pipe->PipeName); + ExFreePool(Pipe); + + Irp->IoStatus.Status = STATUS_NO_MEMORY; + Irp->IoStatus.Information = 0; + + IoCompleteRequest(Irp, IO_NO_INCREMENT); + + return(STATUS_NO_MEMORY); + } + + } + else + { + Fcb->Data = NULL; + } + + Fcb->ReadPtr = Fcb->Data; + Fcb->WritePtr = Fcb->Data; + Fcb->ReadDataAvailable = 0; + Fcb->WriteQuotaAvailable = Pipe->OutboundQuota; + Fcb->MaxDataLength = Pipe->OutboundQuota; + KeInitializeSpinLock(&Fcb->DataListLock); + + KeLockMutex(&DeviceExt->PipeListLock); current_entry = DeviceExt->PipeListHead.Flink; while (current_entry != &DeviceExt->PipeListHead) @@ -300,18 +401,12 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, Fcb->PipeEnd = FILE_PIPE_SERVER_END; Fcb->OtherSide = NULL; Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; - Fcb->ReadDataAvailable = 0; - Fcb->WriteQuotaAvailable = 0; - - /* initialize data list */ - InitializeListHead(&Fcb->DataListHead); - KeInitializeSpinLock(&Fcb->DataListLock); KeInitializeEvent(&Fcb->ConnectEvent, SynchronizationEvent, FALSE); - KeInitializeEvent(&Fcb->ReadEvent, + KeInitializeEvent(&Fcb->Event, SynchronizationEvent, FALSE); @@ -339,9 +434,6 @@ NpfsClose(PDEVICE_OBJECT DeviceObject, PNPFS_FCB Fcb; PNPFS_PIPE Pipe; KIRQL oldIrql; - PLIST_ENTRY CurrentEntry; - PNPFS_PIPE_DATA Current; - DPRINT("NpfsClose(DeviceObject %p Irp %p)\n", DeviceObject, Irp); @@ -374,13 +466,17 @@ NpfsClose(PDEVICE_OBJECT DeviceObject, DPRINT("Server\n"); Pipe->CurrentInstances--; if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) - { - if (Fcb->OtherSide) - { + { + if (Fcb->OtherSide) + { Fcb->OtherSide->PipeState = FILE_PIPE_CLOSING_STATE; - } - Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; - } + /* Signaling the write event. If is possible that an other + * thread waits of an empty buffer. + */ + KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE); + } + Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; + } } Pipe->ReferenceCount--; @@ -396,7 +492,7 @@ NpfsClose(PDEVICE_OBJECT DeviceObject, /* Signaling the read event. If is possible that an other * thread waits of read data. */ - KeSetEvent(&Fcb->OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); + KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE); } Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; } @@ -414,27 +510,17 @@ NpfsClose(PDEVICE_OBJECT DeviceObject, RemoveEntryList(&Fcb->FcbListEntry); KeReleaseSpinLock(&Pipe->FcbListLock, oldIrql); if (Fcb->OtherSide) - { - KeAcquireSpinLock(&Fcb->OtherSide->DataListLock, &oldIrql); - while (!IsListEmpty(&Fcb->OtherSide->DataListHead)) - { - CurrentEntry = RemoveHeadList(&Fcb->OtherSide->DataListHead); - Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry); - - NpfsFreePipeData(Current); - } - KeReleaseSpinLock(&Fcb->OtherSide->DataListLock, oldIrql); - ExFreePool(Fcb->OtherSide); - } - - KeAcquireSpinLock(&Fcb->DataListLock, &oldIrql); - while (!IsListEmpty(&Fcb->DataListHead)) - { - CurrentEntry = RemoveHeadList(&Fcb->DataListHead); - Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry); - NpfsFreePipeData(Current); - } - KeReleaseSpinLock(&Fcb->DataListLock, oldIrql); + { + if (Fcb->OtherSide->Data) + { + ExFreePool(Fcb->OtherSide->Data); + } + ExFreePool(Fcb->OtherSide); + } + if (Fcb->Data) + { + ExFreePool(Fcb->Data); + } ExFreePool(Fcb); RtlFreeUnicodeString(&Pipe->PipeName); RemoveEntryList(&Pipe->PipeListEntry); diff --git a/reactos/drivers/fs/np/npfs.c b/reactos/drivers/fs/np/npfs.c index 4c8364aa83a..d04ed8d70f6 100644 --- a/reactos/drivers/fs/np/npfs.c +++ b/reactos/drivers/fs/np/npfs.c @@ -1,4 +1,4 @@ -/* $Id: npfs.c,v 1.5 2002/09/08 10:22:11 chorns Exp $ +/* $Id: npfs.c,v 1.6 2003/06/21 19:55:55 hbirr Exp $ * * COPYRIGHT: See COPYING in the top level directory * PROJECT: ReactOS kernel @@ -15,8 +15,6 @@ #define NDEBUG #include -NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList; - /* FUNCTIONS *****************************************************************/ NTSTATUS STDCALL @@ -78,14 +76,10 @@ DriverEntry(PDRIVER_OBJECT DriverObject, KeInitializeMutex(&DeviceExtension->PipeListLock, 0); - ExInitializeNPagedLookasideList( - &NpfsPipeDataLookasideList, - NULL, - NULL, - 0, - sizeof(NPFS_PIPE_DATA), - TAG('N', 'P', 'D', 'A'), - 0); + /* set the size quotas */ + DeviceExtension->MinQuota = PAGE_SIZE; + DeviceExtension->DefaultQuota = 8 * PAGE_SIZE; + DeviceExtension->MaxQuota = 64 * PAGE_SIZE; return(STATUS_SUCCESS); } diff --git a/reactos/drivers/fs/np/npfs.h b/reactos/drivers/fs/np/npfs.h index 46222787ccb..e2f463b0b2f 100644 --- a/reactos/drivers/fs/np/npfs.h +++ b/reactos/drivers/fs/np/npfs.h @@ -1,4 +1,4 @@ -/* $Id: npfs.h,v 1.13 2002/09/08 10:22:11 chorns Exp $ */ +/* $Id: npfs.h,v 1.14 2003/06/21 19:55:55 hbirr Exp $ */ #ifndef __SERVICES_FS_NP_NPFS_H #define __SERVICES_FS_NP_NPFS_H @@ -8,16 +8,11 @@ typedef struct { LIST_ENTRY PipeListHead; KMUTEX PipeListLock; + ULONG MinQuota; + ULONG DefaultQuota; + ULONG MaxQuota; } NPFS_DEVICE_EXTENSION, *PNPFS_DEVICE_EXTENSION; -typedef struct -{ - LIST_ENTRY ListEntry; - ULONG Size; - PVOID Data; - ULONG Offset; -} NPFS_PIPE_DATA, *PNPFS_PIPE_DATA; - typedef struct { UNICODE_STRING PipeName; @@ -44,13 +39,17 @@ typedef struct _NPFS_FCB struct _NPFS_FCB* OtherSide; PNPFS_PIPE Pipe; KEVENT ConnectEvent; - KEVENT ReadEvent; + KEVENT Event; ULONG PipeEnd; ULONG PipeState; ULONG ReadDataAvailable; ULONG WriteQuotaAvailable; - LIST_ENTRY DataListHead; /* Data queue */ + PVOID Data; + PVOID ReadPtr; + PVOID WritePtr; + ULONG MaxDataLength; + KSPIN_LOCK DataListLock; /* Data queue lock */ } NPFS_FCB, *PNPFS_FCB; @@ -68,16 +67,6 @@ extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList; #define CP DPRINT("\n"); -static inline VOID -NpfsFreePipeData(PNPFS_PIPE_DATA PipeData) -{ - if (PipeData->Data) - { - ExFreePool(PipeData->Data); - } - ExFreeToNPagedLookasideList(&NpfsPipeDataLookasideList, PipeData); -} - NTSTATUS STDCALL NpfsCreate(PDEVICE_OBJECT DeviceObject, PIRP Irp); NTSTATUS STDCALL NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, PIRP Irp); diff --git a/reactos/drivers/fs/np/rw.c b/reactos/drivers/fs/np/rw.c index e722e9d3936..fd3f6e83f96 100644 --- a/reactos/drivers/fs/np/rw.c +++ b/reactos/drivers/fs/np/rw.c @@ -1,4 +1,4 @@ -/* $Id: rw.c,v 1.8 2002/09/08 10:22:11 chorns Exp $ +/* $Id: rw.c,v 1.9 2003/06/21 19:55:55 hbirr Exp $ * * COPYRIGHT: See COPYING in the top level directory * PROJECT: ReactOS kernel @@ -18,51 +18,6 @@ /* FUNCTIONS *****************************************************************/ -static inline PNPFS_PIPE_DATA -NpfsAllocatePipeData(PVOID Data, - ULONG Size) -{ - PNPFS_PIPE_DATA PipeData; - - PipeData = ExAllocateFromNPagedLookasideList(&NpfsPipeDataLookasideList); - if (!PipeData) - { - return NULL; - } - - PipeData->Data = Data; - PipeData->Size = Size; - PipeData->Offset = 0; - - return PipeData; -} - - -static inline PNPFS_PIPE_DATA -NpfsInitializePipeData( - PVOID Data, - ULONG Size) -{ - PNPFS_PIPE_DATA PipeData; - PVOID Buffer; - - Buffer = ExAllocatePool(NonPagedPool, Size); - if (!Buffer) - { - return NULL; - } - - RtlMoveMemory(Buffer, Data, Size); - - PipeData = NpfsAllocatePipeData(Buffer, Size); - if (!PipeData) - { - ExFreePool(Buffer); - } - - return PipeData; -} - NTSTATUS STDCALL NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp) @@ -71,10 +26,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp) PFILE_OBJECT FileObject; NTSTATUS Status; PNPFS_DEVICE_EXTENSION DeviceExt; - PWSTR PipeName; KIRQL OldIrql; - PLIST_ENTRY CurrentEntry; - PNPFS_PIPE_DATA Current; ULONG Information; PNPFS_FCB Fcb; PNPFS_FCB ReadFcb; @@ -82,6 +34,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp) ULONG Length; PVOID Buffer; ULONG CopyLength; + ULONG TempLength; DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp); @@ -107,6 +60,15 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp) Information = 0; goto done; } + + if (ReadFcb->Data == NULL) + { + DPRINT("Pipe is NOT readable!\n"); + Status = STATUS_UNSUCCESSFUL; + Information = 0; + goto done; + } + Status = STATUS_SUCCESS; Length = IoStack->Parameters.Read.Length; @@ -117,108 +79,96 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp) KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql); while (1) - { - /* FIXME: check if in blocking mode */ - if (IsListEmpty(&ReadFcb->DataListHead)) - { - KeResetEvent(&Fcb->ReadEvent); - KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql); - if (Information > 0) - { - Status = STATUS_SUCCESS; - goto done; + { + /* FIXME: check if in blocking mode */ + if (ReadFcb->ReadDataAvailable == 0) + { + KeResetEvent(&Fcb->Event); + KeSetEvent(&ReadFcb->Event, IO_NO_INCREMENT, FALSE); + KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql); + if (Information > 0) + { + Status = STATUS_SUCCESS; + goto done; + } + if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) + { + Status = STATUS_PIPE_BROKEN; + goto done; + } + /* Wait for ReadEvent to become signaled */ + DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer); + Status = KeWaitForSingleObject(&Fcb->Event, + UserRequest, + KernelMode, + FALSE, + NULL); + DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); + KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql); } - if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) - { - Status = STATUS_PIPE_BROKEN; - goto done; - } - /* Wait for ReadEvent to become signaled */ - DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer); - Status = KeWaitForSingleObject(&Fcb->ReadEvent, - UserRequest, - KernelMode, - FALSE, - NULL); - DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); - KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql); - } if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE) - { - DPRINT("Byte stream mode\n"); - - /* Byte stream mode */ - CurrentEntry = NULL; - while (Length > 0 && !IsListEmpty(&ReadFcb->DataListHead)) - { - CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead); - Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry); - - DPRINT("Took pipe data at %p off the queue\n", Current); - - CopyLength = RtlMin(Current->Size, Length); - RtlCopyMemory(Buffer, - ((PVOID)((PVOID)Current->Data + Current->Offset)), - CopyLength); - Buffer += CopyLength; - Length -= CopyLength; - Information += CopyLength; - - /* Update the data buffer */ - Current->Offset += CopyLength; - Current->Size -= CopyLength; - if (Current->Size == 0) + { + DPRINT("Byte stream mode\n"); + /* Byte stream mode */ + while (Length > 0 && ReadFcb->ReadDataAvailable > 0) { - NpfsFreePipeData(Current); - CurrentEntry = NULL; + CopyLength = RtlMin(ReadFcb->ReadDataAvailable, Length); + if (ReadFcb->ReadPtr + CopyLength <= ReadFcb->Data + ReadFcb->MaxDataLength) + { + memcpy(Buffer, ReadFcb->ReadPtr, CopyLength); + ReadFcb->ReadPtr += CopyLength; + if (ReadFcb->ReadPtr == ReadFcb->Data + ReadFcb->MaxDataLength) + { + ReadFcb->ReadPtr = ReadFcb->Data; + } + } + else + { + TempLength = ReadFcb->Data + ReadFcb->MaxDataLength - ReadFcb->ReadPtr; + memcpy(Buffer, ReadFcb->ReadPtr, TempLength); + memcpy(Buffer + TempLength, ReadFcb->Data, CopyLength - TempLength); + ReadFcb->ReadPtr = ReadFcb->Data + CopyLength - TempLength; + } + + Buffer += CopyLength; + Length -= CopyLength; + Information += CopyLength; + + ReadFcb->ReadDataAvailable -= CopyLength; + ReadFcb->WriteQuotaAvailable += CopyLength; } - } - - if (CurrentEntry && Current->Size > 0) - { - DPRINT("Putting pipe data at %p back in queue\n", Current); - - /* The caller's buffer could not contain the complete message, - so put it back on the queue */ - InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry); - } if (Length == 0) - { - break; - } - } + { + KeSetEvent(&ReadFcb->Event, IO_NO_INCREMENT, FALSE); + break; + } + } else - { - DPRINT("Message mode\n"); + { + DPRINT("Message mode\n"); - /* Message mode */ - if (!IsListEmpty(&ReadFcb->DataListHead)) - { - CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead); - Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry); + /* Message mode */ + if (ReadFcb->ReadDataAvailable) + { + /* Truncate the message if the receive buffer is too small */ + CopyLength = RtlMin(ReadFcb->ReadDataAvailable, Length); + memcpy(Buffer, ReadFcb->Data, CopyLength); - DPRINT("Took pipe data at %p off the queue\n", Current); - - /* Truncate the message if the receive buffer is too small */ - CopyLength = RtlMin(Current->Size, Length); - RtlCopyMemory(Buffer, Current->Data, CopyLength); - Information = CopyLength; - - Current->Offset += CopyLength; - NpfsFreePipeData(Current); - } - if (Information > 0) - { - break; - } - } - } - /* reset ReaderEvent */ + Information = CopyLength; + ReadFcb->ReadDataAvailable = 0; + ReadFcb->WriteQuotaAvailable = ReadFcb->MaxDataLength; + } + if (Information > 0) + { + KeSetEvent(&ReadFcb->Event, IO_NO_INCREMENT, FALSE); + break; + } + } + } KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql); - done: Irp->IoStatus.Status = Status; Irp->IoStatus.Information = Information; @@ -242,7 +192,9 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, ULONG Length; ULONG Offset; KIRQL OldIrql; - PNPFS_PIPE_DATA PipeData; + ULONG Information; + ULONG CopyLength; + ULONG TempLength; DPRINT("NpfsWrite()\n"); @@ -256,6 +208,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, Length = IoStack->Parameters.Write.Length; Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart; + Information = 0; if (Irp->MdlAddress == NULL) { @@ -272,36 +225,104 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject, Length = 0; goto done; } + + if (Fcb->Data == NULL) + { + DPRINT("Pipe is NOT writable!\n"); + Status = STATUS_UNSUCCESSFUL; + Length = 0; + goto done; + } + Status = STATUS_SUCCESS; Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress); DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset); - PipeData = NpfsInitializePipeData(Buffer, Length); - if (PipeData) + KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql); + while(1) { - DPRINT("Attaching pipe data at %p (%d bytes)\n", PipeData, Length); + if (Fcb->WriteQuotaAvailable == 0) + { + KeResetEvent(&Fcb->Event); + KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE); + KeReleaseSpinLock(&Fcb->DataListLock, OldIrql); + if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) + { + Status = STATUS_PIPE_BROKEN; + goto done; + } + DPRINT("Waiting for buffer space (%S)\n", Pipe->PipeName.Buffer); + Status = KeWaitForSingleObject(&Fcb->Event, + UserRequest, + KernelMode, + FALSE, + NULL); + DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status); + KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql); + } + if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE) + { + DPRINT("Byte stream mode\n"); + while (Length > 0 && Fcb->WriteQuotaAvailable > 0) + { + CopyLength = RtlMin(Length, Fcb->WriteQuotaAvailable); + if (Fcb->WritePtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength) + { + memcpy(Fcb->WritePtr, Buffer, CopyLength); + Fcb->WritePtr += CopyLength; + if (Fcb->WritePtr == Fcb->Data + Fcb->MaxDataLength) + { + Fcb->WritePtr = Fcb->Data; + } + } + else + { + TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->WritePtr; + memcpy(Fcb->WritePtr, Buffer, TempLength); + memcpy(Fcb->Data, Buffer + TempLength, CopyLength - TempLength); + Fcb->WritePtr = Fcb->Data + CopyLength - TempLength; + } + + Buffer += CopyLength; + Length -= CopyLength; + Information += CopyLength; - KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql); - InsertTailList(&Fcb->DataListHead, &PipeData->ListEntry); + Fcb->ReadDataAvailable += CopyLength; + Fcb->WriteQuotaAvailable -= CopyLength; + } - /* signal the readers ReadEvent */ - KeSetEvent(&Fcb->OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); - - KeReleaseSpinLock(&Fcb->DataListLock, OldIrql); + if (Length == 0) + { + KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE); + break; + } + } + else + { + if (Length > 0) + { + CopyLength = RtlMin(Length, Fcb->WriteQuotaAvailable); + memcpy(Buffer, Fcb->Data, CopyLength); + Information = CopyLength; + Fcb->ReadDataAvailable = CopyLength; + Fcb->WriteQuotaAvailable = 0; + } + if (Information > 0) + { + KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE); + break; + } + } } - else - { - Length = 0; - Status = STATUS_INSUFFICIENT_RESOURCES; - } + KeReleaseSpinLock(&Fcb->DataListLock, OldIrql); done: Irp->IoStatus.Status = Status; - Irp->IoStatus.Information = Length; + Irp->IoStatus.Information = Information; IoCompleteRequest(Irp, IO_NO_INCREMENT); - + return(Status); }