Modified reading of pipes.

Fixed some bugs.

svn path=/trunk/; revision=2936
This commit is contained in:
Hartmut Birr 2002-05-07 22:41:22 +00:00
parent 311a376750
commit 0ef49e3484
2 changed files with 107 additions and 84 deletions

View file

@ -1,3 +1,5 @@
/* $Id: npfs.h,v 1.11 2002/05/07 22:41:22 hbirr Exp $ */
#ifndef __SERVICES_FS_NP_NPFS_H
#define __SERVICES_FS_NP_NPFS_H
@ -66,6 +68,17 @@ extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList;
#define CP DPRINT("\n");
static inline VOID
NpfsFreePipeData(PNPFS_PIPE_DATA PipeData)
{
if (PipeData->Data)
{
ExFreePool(PipeData->Data);
}
ExFreeToNPagedLookasideList(&NpfsPipeDataLookasideList, PipeData);
}
NTSTATUS STDCALL NpfsCreate(PDEVICE_OBJECT DeviceObject, PIRP Irp);
NTSTATUS STDCALL NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject, PIRP Irp);
NTSTATUS STDCALL NpfsClose(PDEVICE_OBJECT DeviceObject, PIRP Irp);

View file

@ -1,4 +1,4 @@
/* $Id: rw.c,v 1.5 2001/11/20 20:34:29 ekohl Exp $
/* $Id: rw.c,v 1.6 2002/05/07 22:41:22 hbirr Exp $
*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
@ -18,18 +18,6 @@
/* FUNCTIONS *****************************************************************/
static inline VOID
NpfsFreePipeData(PNPFS_PIPE_DATA PipeData)
{
if (PipeData->Data)
{
ExFreePool(PipeData->Data);
}
ExFreeToNPagedLookasideList(&NpfsPipeDataLookasideList, PipeData);
}
static inline PNPFS_PIPE_DATA
NpfsAllocatePipeData(PVOID Data,
ULONG Size)
@ -94,7 +82,6 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
ULONG Length;
PVOID Buffer;
ULONG CopyLength;
BOOLEAN DataListEmpty;
DPRINT("NpfsRead(DeviceObject %p Irp %p)\n", DeviceObject, Irp);
@ -109,7 +96,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
{
DPRINT("Pipe is NOT connected!\n");
Status = STATUS_UNSUCCESSFUL;
Length = 0;
Information = 0;
goto done;
}
@ -117,99 +104,120 @@ NpfsRead(PDEVICE_OBJECT DeviceObject, PIRP Irp)
{
DPRINT("Irp->MdlAddress == NULL\n");
Status = STATUS_UNSUCCESSFUL;
Length = 0;
Information = 0;
goto done;
}
Status = STATUS_SUCCESS;
Length = IoStack->Parameters.Read.Length;
Information = 0;
Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
DPRINT("Length %d Buffer %x\n",Length,Buffer);
KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
DataListEmpty = IsListEmpty(&ReadFcb->DataListHead);
KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
/* FIXME: check if in blocking mode */
if (DataListEmpty == TRUE)
{
/* Wait for ReadEvent to become signaled */
DPRINT("Waiting for readable data\n");
Status = KeWaitForSingleObject(&Fcb->ReadEvent,
UserRequest,
KernelMode,
FALSE,
NULL);
DPRINT("Finished waiting! Status: %x\n", Status);
}
KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
if (Pipe->PipeReadMode & FILE_PIPE_BYTE_STREAM_MODE)
{
DPRINT("Byte stream mode\n");
/* Byte stream mode */
Information = 0;
CurrentEntry = ReadFcb->DataListHead.Flink;
while ((Length > 0) && (CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead)))
while (1)
{
/* FIXME: check if in blocking mode */
if (IsListEmpty(&ReadFcb->DataListHead))
{
KeResetEvent(&Fcb->ReadEvent);
KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
if (Information > 0)
{
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
Status = STATUS_SUCCESS;
goto done;
}
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
{
Status = STATUS_PIPE_BROKEN;
goto done;
}
/* Wait for ReadEvent to become signaled */
DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
Status = KeWaitForSingleObject(&Fcb->ReadEvent,
UserRequest,
KernelMode,
FALSE,
NULL);
DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
KeAcquireSpinLock(&ReadFcb->DataListLock, &OldIrql);
}
DPRINT("Took pipe data at %p off the queue\n", Current);
if (Pipe->PipeReadMode == FILE_PIPE_BYTE_STREAM_MODE)
{
DPRINT("Byte stream mode\n");
CopyLength = RtlMin(Current->Size, Length);
RtlCopyMemory(Buffer,
((PVOID)((ULONG_PTR)Current->Data + Current->Offset)),
CopyLength);
Buffer += CopyLength;
Length -= CopyLength;
Information += CopyLength;
/* Byte stream mode */
CurrentEntry = NULL;
while (Length > 0 && !IsListEmpty(&ReadFcb->DataListHead))
{
CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead);
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
/* Update the data buffer */
Current->Offset += CopyLength;
Current->Size -= CopyLength;
DPRINT("Took pipe data at %p off the queue\n", Current);
CurrentEntry = CurrentEntry->Flink;
CopyLength = RtlMin(Current->Size, Length);
RtlCopyMemory(Buffer,
((PVOID)((PVOID)Current->Data + Current->Offset)),
CopyLength);
Buffer += CopyLength;
Length -= CopyLength;
Information += CopyLength;
/* Update the data buffer */
Current->Offset += CopyLength;
Current->Size -= CopyLength;
if (Current->Size == 0)
{
NpfsFreePipeData(Current);
CurrentEntry = NULL;
}
}
if ((CurrentEntry != &ReadFcb->DataListHead) && (Current->Offset != Current->Size))
if (CurrentEntry && Current->Size > 0)
{
DPRINT("Putting pipe data at %p back in queue\n", Current);
DPRINT("Putting pipe data at %p back in queue\n", Current);
/* The caller's buffer could not contain the complete message,
so put it back on the queue */
InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry);
/* The caller's buffer could not contain the complete message,
so put it back on the queue */
InsertHeadList(&ReadFcb->DataListHead, &Current->ListEntry);
}
}
else
{
DPRINT("Message mode\n");
/* Message mode */
CurrentEntry = ReadFcb->DataListHead.Flink;
if (CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead))
if (Length == 0)
{
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
DPRINT("Took pipe data at %p off the queue\n", Current);
/* Truncate the message if the receive buffer is too small */
CopyLength = RtlMin(Current->Size, Length);
RtlCopyMemory(Buffer, Current->Data, CopyLength);
Information = CopyLength;
Current->Offset += CopyLength;
CurrentEntry = CurrentEntry->Flink;
break;
}
}
}
else
{
DPRINT("Message mode\n");
KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
/* Message mode */
if (!IsListEmpty(&ReadFcb->DataListHead))
{
CurrentEntry = RemoveHeadList(&ReadFcb->DataListHead);
Current = CONTAINING_RECORD(CurrentEntry, NPFS_PIPE_DATA, ListEntry);
DPRINT("Took pipe data at %p off the queue\n", Current);
/* Truncate the message if the receive buffer is too small */
CopyLength = RtlMin(Current->Size, Length);
RtlCopyMemory(Buffer, Current->Data, CopyLength);
Information = CopyLength;
Current->Offset += CopyLength;
NpfsFreePipeData(Current);
}
if (Information > 0)
{
break;
}
}
}
/* reset ReaderEvent */
KeResetEvent(&Fcb->ReadEvent);
KeReleaseSpinLock(&ReadFcb->DataListLock, OldIrql);
done:
Irp->IoStatus.Status = Status;
@ -275,10 +283,12 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
KeAcquireSpinLock(&Fcb->DataListLock, &OldIrql);
InsertTailList(&Fcb->DataListHead, &PipeData->ListEntry);
KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
/* signal the readers ReadEvent */
KeSetEvent(&Fcb->OtherSide->ConnectEvent, IO_NO_INCREMENT, FALSE);
KeSetEvent(&Fcb->OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
KeReleaseSpinLock(&Fcb->DataListLock, OldIrql);
}
else
{