- Allocate the pipe buffer at creation time.

- Interprete the given buffer size.
- Interprete the given direction (inbound, outbound or both).
- The handling of read/write request in message mode is possible incorrect.

svn path=/trunk/; revision=4941
This commit is contained in:
Hartmut Birr 2003-06-21 19:55:55 +00:00
parent 246369270a
commit 2f0c098757
5 changed files with 327 additions and 233 deletions

View file

@ -1,4 +1,4 @@
# $Id: Makefile,v 1.12 2001/08/21 20:13:13 chorns Exp $ # $Id: Makefile,v 1.13 2003/06/21 19:55:55 hbirr Exp $
PATH_TO_TOP = ../../.. PATH_TO_TOP = ../../..
@ -14,6 +14,10 @@ TARGET_OBJECTS = \
rw.o \ rw.o \
volume.o volume.o
DEP_OBJECTS = $(TARGET_OBJECTS)
include $(PATH_TO_TOP)/rules.mak include $(PATH_TO_TOP)/rules.mak
include $(TOOLS_PATH)/helper.mk include $(TOOLS_PATH)/helper.mk
include $(TOOLS_PATH)/depend.mk

View file

@ -1,4 +1,4 @@
/* $Id: create.c,v 1.15 2003/01/19 01:13:04 gvg Exp $ /* $Id: create.c,v 1.16 2003/06/21 19:55:55 hbirr Exp $
* *
* COPYRIGHT: See COPYING in the top level directory * COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel * PROJECT: ReactOS kernel
@ -98,14 +98,39 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
ClientFcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; ClientFcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
/* initialize data list */ /* initialize data list */
InitializeListHead(&ClientFcb->DataListHead); if (Pipe->InboundQuota)
{
ClientFcb->Data = ExAllocatePool(NonPagedPool, Pipe->InboundQuota);
if (ClientFcb->Data == NULL)
{
ExFreePool(ClientFcb);
KeUnlockMutex(&DeviceExt->PipeListLock);
Irp->IoStatus.Status = STATUS_NO_MEMORY;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
DPRINT("No memory!\n");
return(STATUS_NO_MEMORY);
}
}
else
{
ClientFcb->Data = NULL;
}
ClientFcb->ReadPtr = ClientFcb->Data;
ClientFcb->WritePtr = ClientFcb->Data;
ClientFcb->ReadDataAvailable = 0;
ClientFcb->WriteQuotaAvailable = Pipe->InboundQuota;
ClientFcb->MaxDataLength = Pipe->InboundQuota;
KeInitializeSpinLock(&ClientFcb->DataListLock); KeInitializeSpinLock(&ClientFcb->DataListLock);
KeInitializeEvent(&ClientFcb->ConnectEvent, KeInitializeEvent(&ClientFcb->ConnectEvent,
SynchronizationEvent, SynchronizationEvent,
FALSE); FALSE);
KeInitializeEvent(&ClientFcb->ReadEvent, KeInitializeEvent(&ClientFcb->Event,
SynchronizationEvent, SynchronizationEvent,
FALSE); FALSE);
@ -259,8 +284,84 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
Pipe->MaximumInstances = Buffer->MaxInstances; Pipe->MaximumInstances = Buffer->MaxInstances;
Pipe->CurrentInstances = 0; Pipe->CurrentInstances = 0;
Pipe->TimeOut = Buffer->TimeOut; Pipe->TimeOut = Buffer->TimeOut;
Pipe->InboundQuota = Buffer->InBufferSize; if (!(IoStack->Parameters.Create.Options & FILE_PIPE_OUTBOUND) ||
Pipe->OutboundQuota = Buffer->OutBufferSize; IoStack->Parameters.Create.Options & FILE_PIPE_FULL_DUPLEX)
{
if (Buffer->InBufferSize == 0)
{
Pipe->InboundQuota = DeviceExt->DefaultQuota;
}
else
{
Pipe->InboundQuota = PAGE_ROUND_UP(Buffer->InBufferSize);
if (Pipe->InboundQuota < DeviceExt->MinQuota)
{
Pipe->InboundQuota = DeviceExt->MinQuota;
}
else if (Pipe->InboundQuota > DeviceExt->MaxQuota)
{
Pipe->InboundQuota = DeviceExt->MaxQuota;
}
}
}
else
{
Pipe->InboundQuota = 0;
}
if (IoStack->Parameters.Create.Options & (FILE_PIPE_FULL_DUPLEX|FILE_PIPE_OUTBOUND))
{
if (Buffer->OutBufferSize == 0)
{
Pipe->OutboundQuota = DeviceExt->DefaultQuota;
}
else
{
Pipe->OutboundQuota = PAGE_ROUND_UP(Buffer->OutBufferSize);
if (Pipe->OutboundQuota < DeviceExt->MinQuota)
{
Pipe->OutboundQuota = DeviceExt->MinQuota;
}
else if (Pipe->OutboundQuota > DeviceExt->MaxQuota)
{
Pipe->OutboundQuota = DeviceExt->MaxQuota;
}
}
}
else
{
Pipe->OutboundQuota = 0;
}
if (Pipe->OutboundQuota)
{
Fcb->Data = ExAllocatePool(NonPagedPool, Pipe->OutboundQuota);
if (Fcb->Data == NULL)
{
ExFreePool(Fcb);
RtlFreeUnicodeString(&Pipe->PipeName);
ExFreePool(Pipe);
Irp->IoStatus.Status = STATUS_NO_MEMORY;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
return(STATUS_NO_MEMORY);
}
}
else
{
Fcb->Data = NULL;
}
Fcb->ReadPtr = Fcb->Data;
Fcb->WritePtr = Fcb->Data;
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Pipe->OutboundQuota;
Fcb->MaxDataLength = Pipe->OutboundQuota;
KeInitializeSpinLock(&Fcb->DataListLock);
KeLockMutex(&DeviceExt->PipeListLock); KeLockMutex(&DeviceExt->PipeListLock);
current_entry = DeviceExt->PipeListHead.Flink; current_entry = DeviceExt->PipeListHead.Flink;
@ -300,18 +401,12 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
Fcb->PipeEnd = FILE_PIPE_SERVER_END; Fcb->PipeEnd = FILE_PIPE_SERVER_END;
Fcb->OtherSide = NULL; Fcb->OtherSide = NULL;
Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = 0;
/* initialize data list */
InitializeListHead(&Fcb->DataListHead);
KeInitializeSpinLock(&Fcb->DataListLock);
KeInitializeEvent(&Fcb->ConnectEvent, KeInitializeEvent(&Fcb->ConnectEvent,
SynchronizationEvent, SynchronizationEvent,
FALSE); FALSE);
KeInitializeEvent(&Fcb->ReadEvent, KeInitializeEvent(&Fcb->Event,
SynchronizationEvent, SynchronizationEvent,
FALSE); FALSE);
@ -339,9 +434,6 @@ NpfsClose(PDEVICE_OBJECT DeviceObject,
PNPFS_FCB Fcb; PNPFS_FCB Fcb;
PNPFS_PIPE Pipe; PNPFS_PIPE Pipe;
KIRQL oldIrql; KIRQL oldIrql;
PLIST_ENTRY CurrentEntry;
PNPFS_PIPE_DATA Current;
DPRINT("NpfsClose(DeviceObject %p Irp %p)\n", DeviceObject, Irp); DPRINT("NpfsClose(DeviceObject %p Irp %p)\n", DeviceObject, Irp);
@ -378,6 +470,10 @@ NpfsClose(PDEVICE_OBJECT DeviceObject,
if (Fcb->OtherSide) if (Fcb->OtherSide)
{ {
Fcb->OtherSide->PipeState = FILE_PIPE_CLOSING_STATE; Fcb->OtherSide->PipeState = FILE_PIPE_CLOSING_STATE;
/* Signaling the write event. If is possible that an other
* thread waits of an empty buffer.
*/
KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
} }
Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
} }
@ -396,7 +492,7 @@ NpfsClose(PDEVICE_OBJECT DeviceObject,
/* Signaling the read event. If is possible that an other /* Signaling the read event. If is possible that an other
* thread waits of read data. * thread waits of read data.
*/ */
KeSetEvent(&Fcb->OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
} }
Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE; Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
} }
@ -415,26 +511,16 @@ NpfsClose(PDEVICE_OBJECT DeviceObject,
KeReleaseSpinLock(&Pipe->FcbListLock, oldIrql); KeReleaseSpinLock(&Pipe->FcbListLock, oldIrql);
if (Fcb->OtherSide) if (Fcb->OtherSide)
{ {
KeAcquireSpinLock(&Fcb->OtherSide->DataListLock, &oldIrql); if (Fcb->OtherSide->Data)
while (!IsListEmpty(&Fcb->OtherSide->DataListHead))
{ {
CurrentEntry = RemoveHeadList(&Fcb->OtherSide->DataListHead); ExFreePool(Fcb->OtherSide->Data);
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
NpfsFreePipeData(Current);
} }
KeReleaseSpinLock(&Fcb->OtherSide->DataListLock, oldIrql);
ExFreePool(Fcb->OtherSide); ExFreePool(Fcb->OtherSide);
} }
if (Fcb->Data)
KeAcquireSpinLock(&Fcb->DataListLock, &oldIrql);
while (!IsListEmpty(&Fcb->DataListHead))
{ {
CurrentEntry = RemoveHeadList(&Fcb->DataListHead); ExFreePool(Fcb->Data);
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
NpfsFreePipeData(Current);
} }
KeReleaseSpinLock(&Fcb->DataListLock, oldIrql);
ExFreePool(Fcb); ExFreePool(Fcb);
RtlFreeUnicodeString(&Pipe->PipeName); RtlFreeUnicodeString(&Pipe->PipeName);
RemoveEntryList(&Pipe->PipeListEntry); RemoveEntryList(&Pipe->PipeListEntry);

View file

@ -1,4 +1,4 @@
/* $Id: npfs.c,v 1.5 2002/09/08 10:22:11 chorns Exp $ /* $Id: npfs.c,v 1.6 2003/06/21 19:55:55 hbirr Exp $
* *
* COPYRIGHT: See COPYING in the top level directory * COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel * PROJECT: ReactOS kernel
@ -15,8 +15,6 @@
#define NDEBUG #define NDEBUG
#include <debug.h> #include <debug.h>
NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList;
/* FUNCTIONS *****************************************************************/ /* FUNCTIONS *****************************************************************/
NTSTATUS STDCALL NTSTATUS STDCALL
@ -78,14 +76,10 @@ DriverEntry(PDRIVER_OBJECT DriverObject,
KeInitializeMutex(&DeviceExtension->PipeListLock, KeInitializeMutex(&DeviceExtension->PipeListLock,
0); 0);
ExInitializeNPagedLookasideList( /* set the size quotas */
&NpfsPipeDataLookasideList, DeviceExtension->MinQuota = PAGE_SIZE;
NULL, DeviceExtension->DefaultQuota = 8 * PAGE_SIZE;
NULL, DeviceExtension->MaxQuota = 64 * PAGE_SIZE;
0,
sizeof(NPFS_PIPE_DATA),
TAG('N', 'P', 'D', 'A'),
0);
return(STATUS_SUCCESS); return(STATUS_SUCCESS);
} }

View file

@ -1,4 +1,4 @@
/* $Id: npfs.h,v 1.13 2002/09/08 10:22:11 chorns Exp $ */ /* $Id: npfs.h,v 1.14 2003/06/21 19:55:55 hbirr Exp $ */
#ifndef __SERVICES_FS_NP_NPFS_H #ifndef __SERVICES_FS_NP_NPFS_H
#define __SERVICES_FS_NP_NPFS_H #define __SERVICES_FS_NP_NPFS_H
@ -8,16 +8,11 @@ typedef struct
{ {
LIST_ENTRY PipeListHead; LIST_ENTRY PipeListHead;
KMUTEX PipeListLock; KMUTEX PipeListLock;
ULONG MinQuota;
ULONG DefaultQuota;
ULONG MaxQuota;
} NPFS_DEVICE_EXTENSION, *PNPFS_DEVICE_EXTENSION; } NPFS_DEVICE_EXTENSION, *PNPFS_DEVICE_EXTENSION;
typedef struct
{
LIST_ENTRY ListEntry;
ULONG Size;
PVOID Data;
ULONG Offset;
} NPFS_PIPE_DATA, *PNPFS_PIPE_DATA;
typedef struct typedef struct
{ {
UNICODE_STRING PipeName; UNICODE_STRING PipeName;
@ -44,13 +39,17 @@ typedef struct _NPFS_FCB
struct _NPFS_FCB* OtherSide; struct _NPFS_FCB* OtherSide;
PNPFS_PIPE Pipe; PNPFS_PIPE Pipe;
KEVENT ConnectEvent; KEVENT ConnectEvent;
KEVENT ReadEvent; KEVENT Event;
ULONG PipeEnd; ULONG PipeEnd;
ULONG PipeState; ULONG PipeState;
ULONG ReadDataAvailable; ULONG ReadDataAvailable;
ULONG WriteQuotaAvailable; ULONG WriteQuotaAvailable;
LIST_ENTRY DataListHead; /* Data queue */ PVOID Data;
PVOID ReadPtr;
PVOID WritePtr;
ULONG MaxDataLength;
KSPIN_LOCK DataListLock; /* Data queue lock */ KSPIN_LOCK DataListLock; /* Data queue lock */
} NPFS_FCB, *PNPFS_FCB; } NPFS_FCB, *PNPFS_FCB;
@ -68,16 +67,6 @@ extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList;
#define CP DPRINT("\n"); #define CP DPRINT("\n");
static inline VOID
NpfsFreePipeData(PNPFS_PIPE_DATA PipeData)
{
if (PipeData->Data)
{
ExFreePool(PipeData->Data);
}
ExFreeToNPagedLookasideList(&NpfsPipeDataLookasideList, PipeData);
}
NTSTATUS STDCALL NpfsCreate(PDEVICE_OBJECT DeviceObject, PIRP Irp); NTSTATUS STDCALL NpfsCreate(PDEVICE_OBJECT DeviceObject, PIRP Irp);
NTSTATUS STDCALL NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, PIRP Irp); NTSTATUS STDCALL NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, PIRP Irp);

View file

@ -1,4 +1,4 @@
/* $Id: rw.c,v 1.8 2002/09/08 10:22:11 chorns Exp $ /* $Id: rw.c,v 1.9 2003/06/21 19:55:55 hbirr Exp $
* *
* COPYRIGHT: See COPYING in the top level directory * COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel * PROJECT: ReactOS kernel
@ -18,51 +18,6 @@
/* FUNCTIONS *****************************************************************/ /* FUNCTIONS *****************************************************************/
static inline PNPFS_PIPE_DATA
NpfsAllocatePipeData(PVOID Data,
ULONG Size)
{
PNPFS_PIPE_DATA PipeData;
PipeData = ExAllocateFromNPagedLookasideList(&NpfsPipeDataLookasideList);
if (!PipeData)
{
return NULL;
}
PipeData->Data = Data;
PipeData->Size = Size;
PipeData->Offset = 0;
return PipeData;
}
static inline PNPFS_PIPE_DATA
NpfsInitializePipeData(
PVOID Data,
ULONG Size)
{
PNPFS_PIPE_DATA PipeData;
PVOID Buffer;
Buffer = ExAllocatePool(NonPagedPool, Size);
if (!Buffer)
{
return NULL;
}
RtlMoveMemory(Buffer, Data, Size);
PipeData = NpfsAllocatePipeData(Buffer, Size);
if (!PipeData)
{
ExFreePool(Buffer);
}
return PipeData;
}
NTSTATUS STDCALL NTSTATUS STDCALL
NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp) NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
@ -71,10 +26,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
PFILE_OBJECT FileObject; PFILE_OBJECT FileObject;
NTSTATUS Status; NTSTATUS Status;
PNPFS_DEVICE_EXTENSION DeviceExt; PNPFS_DEVICE_EXTENSION DeviceExt;
PWSTR PipeName;
KIRQL OldIrql; KIRQL OldIrql;
PLIST_ENTRY CurrentEntry;
PNPFS_PIPE_DATA Current;
ULONG Information; ULONG Information;
PNPFS_FCB Fcb; PNPFS_FCB Fcb;
PNPFS_FCB ReadFcb; PNPFS_FCB ReadFcb;
@ -82,6 +34,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
ULONG Length; ULONG Length;
PVOID Buffer; PVOID Buffer;
ULONG CopyLength; ULONG CopyLength;
ULONG TempLength;
DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp); DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp);
@ -108,6 +61,15 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
goto done; goto done;
} }
if (ReadFcb->Data == NULL)
{
DPRINT("Pipe is NOT readable!\n");
Status = STATUS_UNSUCCESSFUL;
Information = 0;
goto done;
}
Status = STATUS_SUCCESS; Status = STATUS_SUCCESS;
Length = IoStack->Parameters.Read.Length; Length = IoStack->Parameters.Read.Length;
Information = 0; Information = 0;
@ -119,9 +81,10 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
while (1) while (1)
{ {
/* FIXME: check if in blocking mode */ /* FIXME: check if in blocking mode */
if (IsListEmpty(&ReadFcb->DataListHead)) if (ReadFcb->ReadDataAvailable == 0)
{ {
KeResetEvent(&Fcb->ReadEvent); KeResetEvent(&Fcb->Event);
KeSetEvent(&ReadFcb->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql); KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
if (Information > 0) if (Information > 0)
{ {
@ -135,7 +98,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
} }
/* Wait for ReadEvent to become signaled */ /* Wait for ReadEvent to become signaled */
DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer); DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
Status = KeWaitForSingleObject(&Fcb->ReadEvent, Status = KeWaitForSingleObject(&Fcb->Event,
UserRequest, UserRequest,
KernelMode, KernelMode,
FALSE, FALSE,
@ -147,45 +110,38 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE) if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE)
{ {
DPRINT("Byte stream mode\n"); DPRINT("Byte stream mode\n");
/* Byte stream mode */ /* Byte stream mode */
CurrentEntry = NULL; while (Length > 0 && ReadFcb->ReadDataAvailable > 0)
while (Length > 0 && !IsListEmpty(&ReadFcb->DataListHead))
{ {
CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead); CopyLength = RtlMin(ReadFcb->ReadDataAvailable, Length);
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry); if (ReadFcb->ReadPtr + CopyLength <= ReadFcb->Data + ReadFcb->MaxDataLength)
{
memcpy(Buffer, ReadFcb->ReadPtr, CopyLength);
ReadFcb->ReadPtr += CopyLength;
if (ReadFcb->ReadPtr == ReadFcb->Data + ReadFcb->MaxDataLength)
{
ReadFcb->ReadPtr = ReadFcb->Data;
}
}
else
{
TempLength = ReadFcb->Data + ReadFcb->MaxDataLength - ReadFcb->ReadPtr;
memcpy(Buffer, ReadFcb->ReadPtr, TempLength);
memcpy(Buffer + TempLength, ReadFcb->Data, CopyLength - TempLength);
ReadFcb->ReadPtr = ReadFcb->Data + CopyLength - TempLength;
}
DPRINT("Took pipe data at %p off the queue\n", Current);
CopyLength = RtlMin(Current->Size, Length);
RtlCopyMemory(Buffer,
((PVOID)((PVOID)Current->Data + Current->Offset)),
CopyLength);
Buffer += CopyLength; Buffer += CopyLength;
Length -= CopyLength; Length -= CopyLength;
Information += CopyLength; Information += CopyLength;
/* Update the data buffer */ ReadFcb->ReadDataAvailable -= CopyLength;
Current->Offset += CopyLength; ReadFcb->WriteQuotaAvailable += CopyLength;
Current->Size -= CopyLength;
if (Current->Size == 0)
{
NpfsFreePipeData(Current);
CurrentEntry = NULL;
}
}
if (CurrentEntry && Current->Size > 0)
{
DPRINT("Putting pipe data at %p back in queue\n", Current);
/* The caller's buffer could not contain the complete message,
so put it back on the queue */
InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry);
} }
if (Length == 0) if (Length == 0)
{ {
KeSetEvent(&ReadFcb->Event, IO_NO_INCREMENT, FALSE);
break; break;
} }
} }
@ -194,31 +150,25 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
DPRINT("Message mode\n"); DPRINT("Message mode\n");
/* Message mode */ /* Message mode */
if (!IsListEmpty(&ReadFcb->DataListHead)) if (ReadFcb->ReadDataAvailable)
{ {
CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead);
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
DPRINT("Took pipe data at %p off the queue\n", Current);
/* Truncate the message if the receive buffer is too small */ /* Truncate the message if the receive buffer is too small */
CopyLength = RtlMin(Current->Size, Length); CopyLength = RtlMin(ReadFcb->ReadDataAvailable, Length);
RtlCopyMemory(Buffer, Current->Data, CopyLength); memcpy(Buffer, ReadFcb->Data, CopyLength);
Information = CopyLength;
Current->Offset += CopyLength; Information = CopyLength;
NpfsFreePipeData(Current); ReadFcb->ReadDataAvailable = 0;
ReadFcb->WriteQuotaAvailable = ReadFcb->MaxDataLength;
} }
if (Information > 0) if (Information > 0)
{ {
KeSetEvent(&ReadFcb->Event, IO_NO_INCREMENT, FALSE);
break; break;
} }
} }
} }
/* reset ReaderEvent */
KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql); KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
done: done:
Irp->IoStatus.Status = Status; Irp->IoStatus.Status = Status;
Irp->IoStatus.Information = Information; Irp->IoStatus.Information = Information;
@ -242,7 +192,9 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
ULONG Length; ULONG Length;
ULONG Offset; ULONG Offset;
KIRQL OldIrql; KIRQL OldIrql;
PNPFS_PIPE_DATA PipeData; ULONG Information;
ULONG CopyLength;
ULONG TempLength;
DPRINT("NpfsWrite()\n"); DPRINT("NpfsWrite()\n");
@ -256,6 +208,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
Length = IoStack->Parameters.Write.Length; Length = IoStack->Parameters.Write.Length;
Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart; Offset = IoStack->Parameters.Write.ByteOffset.u.LowPart;
Information = 0;
if (Irp->MdlAddress == NULL) if (Irp->MdlAddress == NULL)
{ {
@ -273,32 +226,100 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
goto done; goto done;
} }
if (Fcb->Data == NULL)
{
DPRINT("Pipe is NOT writable!\n");
Status = STATUS_UNSUCCESSFUL;
Length = 0;
goto done;
}
Status = STATUS_SUCCESS;
Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress); Buffer = MmGetSystemAddressForMdl (Irp->MdlAddress);
DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset); DPRINT("Length %d Buffer %x Offset %x\n",Length,Buffer,Offset);
PipeData = NpfsInitializePipeData(Buffer, Length);
if (PipeData)
{
DPRINT("Attaching pipe data at %p (%d bytes)\n", PipeData, Length);
KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql); KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
InsertTailList(&Fcb->DataListHead, &PipeData->ListEntry); while(1)
{
/* signal the readers ReadEvent */ if (Fcb->WriteQuotaAvailable == 0)
KeSetEvent(&Fcb->OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE); {
KeResetEvent(&Fcb->Event);
KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSpinLock(&Fcb->DataListLock, OldIrql); KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
{
Status = STATUS_PIPE_BROKEN;
goto done;
}
DPRINT("Waiting for buffer space (%S)\n", Pipe->PipeName.Buffer);
Status = KeWaitForSingleObject(&Fcb->Event,
UserRequest,
KernelMode,
FALSE,
NULL);
DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
}
if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE)
{
DPRINT("Byte stream mode\n");
while (Length > 0 && Fcb->WriteQuotaAvailable > 0)
{
CopyLength = RtlMin(Length, Fcb->WriteQuotaAvailable);
if (Fcb->WritePtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength)
{
memcpy(Fcb->WritePtr, Buffer, CopyLength);
Fcb->WritePtr += CopyLength;
if (Fcb->WritePtr == Fcb->Data + Fcb->MaxDataLength)
{
Fcb->WritePtr = Fcb->Data;
}
} }
else else
{ {
Length = 0; TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->WritePtr;
Status = STATUS_INSUFFICIENT_RESOURCES; memcpy(Fcb->WritePtr, Buffer, TempLength);
memcpy(Fcb->Data, Buffer + TempLength, CopyLength - TempLength);
Fcb->WritePtr = Fcb->Data + CopyLength - TempLength;
} }
Buffer += CopyLength;
Length -= CopyLength;
Information += CopyLength;
Fcb->ReadDataAvailable += CopyLength;
Fcb->WriteQuotaAvailable -= CopyLength;
}
if (Length == 0)
{
KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
break;
}
}
else
{
if (Length > 0)
{
CopyLength = RtlMin(Length, Fcb->WriteQuotaAvailable);
memcpy(Buffer, Fcb->Data, CopyLength);
Information = CopyLength;
Fcb->ReadDataAvailable = CopyLength;
Fcb->WriteQuotaAvailable = 0;
}
if (Information > 0)
{
KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
break;
}
}
}
KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
done: done:
Irp->IoStatus.Status = Status; Irp->IoStatus.Status = Status;
Irp->IoStatus.Information = Length; Irp->IoStatus.Information = Information;
IoCompleteRequest(Irp, IO_NO_INCREMENT); IoCompleteRequest(Irp, IO_NO_INCREMENT);