- Put the wait entry into the DriverContext of the irp instead to allocated it from pool.

- Lock the data list on both ends of the pipe, if we disconnect the pipe.    
- Implemented a read and a write event on each end of the pipe.  
- Implemented a list for read requests to deliver the requests in the correct sequence.  
- Do not end a read request if the pipe was connected and if the buffer wasn't filled completely.

svn path=/trunk/; revision=14372
This commit is contained in:
Hartmut Birr 2005-03-28 18:42:53 +00:00
parent 9b5646acd0
commit a48a01798c
5 changed files with 514 additions and 310 deletions

View file

@ -50,20 +50,21 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
PLIST_ENTRY CurrentEntry;
PNPFS_WAITER_ENTRY Waiter;
KIRQL oldIrql;
PIRP Irp;
CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->WaiterListHead)
{
Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE &&
!Waiter->Irp->Cancel)
Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext);
if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
{
DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
IoAcquireCancelSpinLock(&oldIrql);
if (!Waiter->Irp->Cancel)
if (!Irp->Cancel)
{
IoSetCancelRoutine(Waiter->Irp, NULL);
IoSetCancelRoutine(Irp, NULL);
IoReleaseCancelSpinLock(oldIrql);
return Waiter->Fcb;
}
@ -83,6 +84,7 @@ NpfsSignalAndRemoveListeningServerInstance(PNPFS_PIPE Pipe,
{
PLIST_ENTRY CurrentEntry;
PNPFS_WAITER_ENTRY Waiter;
PIRP Irp;
CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->WaiterListHead)
@ -92,13 +94,12 @@ NpfsSignalAndRemoveListeningServerInstance(PNPFS_PIPE Pipe,
{
DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
Waiter->Irp->IoStatus.Status = STATUS_PIPE_CONNECTED;
Waiter->Irp->IoStatus.Information = 0;
IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT);
RemoveEntryList(&Waiter->Entry);
ExFreePool(Waiter);
return;
Irp = CONTAINING_RECORD(Waiter, IRP, Tail.Overlay.DriverContext);
Irp->IoStatus.Status = STATUS_PIPE_CONNECTED;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
break;
}
CurrentEntry = CurrentEntry->Flink;
}
@ -175,6 +176,9 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
ClientFcb->PipeEnd = FILE_PIPE_CLIENT_END;
ClientFcb->OtherSide = NULL;
ClientFcb->PipeState = SpecialAccess ? 0 : FILE_PIPE_DISCONNECTED_STATE;
InitializeListHead(&ClientFcb->ReadRequestListHead);
DPRINT("Fcb: %x\n", ClientFcb);
/* Initialize data list. */
if (Pipe->OutboundQuota)
@ -202,7 +206,9 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
ClientFcb->MaxDataLength = Pipe->OutboundQuota;
ExInitializeFastMutex(&ClientFcb->DataListLock);
KeInitializeEvent(&ClientFcb->ConnectEvent, SynchronizationEvent, FALSE);
KeInitializeEvent(&ClientFcb->Event, SynchronizationEvent, FALSE);
KeInitializeEvent(&ClientFcb->ReadEvent, SynchronizationEvent, FALSE);
KeInitializeEvent(&ClientFcb->WriteEvent, SynchronizationEvent, FALSE);
/*
* Step 3. Search for listening server FCB.
@ -489,6 +495,7 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Pipe->InboundQuota;
Fcb->MaxDataLength = Pipe->InboundQuota;
InitializeListHead(&Fcb->ReadRequestListHead);
ExInitializeFastMutex(&Fcb->DataListLock);
Pipe->CurrentInstances++;
@ -498,13 +505,11 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
Fcb->PipeState = FILE_PIPE_LISTENING_STATE;
Fcb->OtherSide = NULL;
KeInitializeEvent(&Fcb->ConnectEvent,
SynchronizationEvent,
FALSE);
DPRINT("Fcb: %x\n", Fcb);
KeInitializeEvent(&Fcb->Event,
SynchronizationEvent,
FALSE);
KeInitializeEvent(&Fcb->ConnectEvent, SynchronizationEvent, FALSE);
KeInitializeEvent(&Fcb->ReadEvent, SynchronizationEvent, FALSE);
KeInitializeEvent(&Fcb->WriteEvent, SynchronizationEvent, FALSE);
KeLockMutex(&Pipe->FcbListLock);
InsertTailList(&Pipe->ServerFcbListHead, &Fcb->FcbListEntry);
@ -528,7 +533,7 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject,
PNPFS_DEVICE_EXTENSION DeviceExt;
PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
PNPFS_FCB Fcb;
PNPFS_FCB Fcb, OtherSide;
PNPFS_PIPE Pipe;
BOOL Server;
@ -566,18 +571,37 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject,
{
DPRINT("Client\n");
}
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
if (Fcb->OtherSide)
OtherSide = Fcb->OtherSide;
/* Lock the server first */
if (Server)
{
Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
Fcb->OtherSide->OtherSide = NULL;
/*
* Signaling the write event. If is possible that an other
* thread waits for an empty buffer.
*/
KeSetEvent(&Fcb->OtherSide->Event, IO_NO_INCREMENT, FALSE);
ExAcquireFastMutex(&Fcb->DataListLock);
ExAcquireFastMutex(&OtherSide->DataListLock);
}
else
{
ExAcquireFastMutex(&OtherSide->DataListLock);
ExAcquireFastMutex(&Fcb->DataListLock);
}
OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
OtherSide->OtherSide = NULL;
/*
* Signaling the write event. If is possible that an other
* thread waits for an empty buffer.
*/
KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
if (Server)
{
ExReleaseFastMutex(&Fcb->DataListLock);
ExReleaseFastMutex(&OtherSide->DataListLock);
}
else
{
ExReleaseFastMutex(&OtherSide->DataListLock);
ExReleaseFastMutex(&Fcb->DataListLock);
}
}
else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
@ -586,6 +610,7 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject,
PNPFS_WAITER_ENTRY WaitEntry = NULL;
BOOLEAN Complete = FALSE;
KIRQL oldIrql;
PIRP tmpIrp;
Entry = Fcb->Pipe->WaiterListHead.Flink;
while (Entry != &Fcb->Pipe->WaiterListHead)
@ -594,28 +619,25 @@ NpfsCleanup(PDEVICE_OBJECT DeviceObject,
if (WaitEntry->Fcb == Fcb)
{
RemoveEntryList(Entry);
tmpIrp = CONTAINING_RECORD(WaitEntry, IRP, Tail.Overlay.DriverContext);
IoAcquireCancelSpinLock(&oldIrql);
if (!Irp->Cancel)
if (!tmpIrp->Cancel)
{
IoSetCancelRoutine(WaitEntry->Irp, NULL);
IoSetCancelRoutine(tmpIrp, NULL);
Complete = TRUE;
}
IoReleaseCancelSpinLock(oldIrql);
if (Complete)
{
tmpIrp->IoStatus.Status = STATUS_PIPE_BROKEN;
tmpIrp->IoStatus.Information = 0;
IoCompleteRequest(tmpIrp, IO_NO_INCREMENT);
}
break;
}
Entry = Entry->Flink;
}
if (Entry != &Fcb->Pipe->WaiterListHead)
{
if (Complete)
{
WaitEntry->Irp->IoStatus.Status = STATUS_PIPE_BROKEN;
WaitEntry->Irp->IoStatus.Information = 0;
IoCompleteRequest(WaitEntry->Irp, IO_NO_INCREMENT);
}
ExFreePool(WaitEntry);
}
}
Fcb->PipeState = FILE_PIPE_CLOSING_STATE;

View file

@ -26,18 +26,18 @@ NpfsListeningCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
DPRINT1("NpfsListeningCancelRoutine() called\n");
Waiter = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext;
IoReleaseCancelSpinLock(Irp->CancelIrql);
Waiter = Irp->Tail.Overlay.DriverContext[0];
KeLockMutex(&Waiter->Pipe->FcbListLock);
KeLockMutex(&Waiter->Fcb->Pipe->FcbListLock);
RemoveEntryList(&Waiter->Entry);
KeUnlockMutex(&Waiter->Pipe->FcbListLock);
KeUnlockMutex(&Waiter->Fcb->Pipe->FcbListLock);
Irp->IoStatus.Status = STATUS_CANCELLED;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
ExFreePool(Waiter);
}
@ -48,18 +48,13 @@ NpfsAddListeningServerInstance(PIRP Irp,
PNPFS_WAITER_ENTRY Entry;
KIRQL oldIrql;
Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
if (Entry == NULL)
return STATUS_INSUFFICIENT_RESOURCES;
Entry = (PNPFS_WAITER_ENTRY)&Irp->Tail.Overlay.DriverContext;
Entry->Irp = Irp;
Entry->Fcb = Fcb;
Entry->Pipe = Fcb->Pipe;
KeLockMutex(&Fcb->Pipe->FcbListLock);
IoMarkIrpPending(Irp);
Irp->Tail.Overlay.DriverContext[0] = Entry;
InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
IoAcquireCancelSpinLock(&oldIrql);
@ -78,7 +73,6 @@ NpfsAddListeningServerInstance(PIRP Irp,
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
KeUnlockMutex(&Fcb->Pipe->FcbListLock);
ExFreePool(Entry);
return STATUS_CANCELLED;
}
@ -173,38 +167,100 @@ NpfsConnectPipe(PIRP Irp,
static NTSTATUS
NpfsDisconnectPipe(PNPFS_FCB Fcb)
{
DPRINT("NpfsDisconnectPipe()\n");
NTSTATUS Status;
PNPFS_FCB OtherSide;
PNPFS_PIPE Pipe;
BOOL Server;
if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
return STATUS_SUCCESS;
DPRINT("NpfsDisconnectPipe()\n");
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
/* FIXME: Shouldn't this be FILE_PIPE_CLOSING_STATE? */
Fcb->OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
Pipe = Fcb->Pipe;
KeLockMutex(&Pipe->FcbListLock);
/* FIXME: remove data queue(s) */
Fcb->OtherSide->OtherSide = NULL;
if (Fcb->PipeState == FILE_PIPE_DISCONNECTED_STATE)
{
DPRINT("Pipe is already disconnected\n");
Status = STATUS_SUCCESS;
}
else if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
Server = (Fcb->PipeEnd == FILE_PIPE_SERVER_END);
OtherSide = Fcb->OtherSide;
Fcb->OtherSide = NULL;
/* Lock the server first */
if (Server)
{
ExAcquireFastMutex(&Fcb->DataListLock);
ExAcquireFastMutex(&OtherSide->DataListLock);
}
else
{
ExAcquireFastMutex(&OtherSide->DataListLock);
ExAcquireFastMutex(&Fcb->DataListLock);
}
OtherSide->PipeState = FILE_PIPE_DISCONNECTED_STATE;
OtherSide->OtherSide = NULL;
/*
* Signaling the write event. If is possible that an other
* thread waits for an empty buffer.
*/
KeSetEvent(&OtherSide->ReadEvent, IO_NO_INCREMENT, FALSE);
KeSetEvent(&OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
if (Server)
{
ExReleaseFastMutex(&Fcb->DataListLock);
ExReleaseFastMutex(&OtherSide->DataListLock);
}
else
{
ExReleaseFastMutex(&OtherSide->DataListLock);
ExReleaseFastMutex(&OtherSide->DataListLock);
}
Status = STATUS_SUCCESS;
}
else if (Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
{
PLIST_ENTRY Entry;
PNPFS_WAITER_ENTRY WaitEntry = NULL;
BOOLEAN Complete = FALSE;
PIRP Irp = NULL;
DPRINT("Pipe disconnected\n");
return STATUS_SUCCESS;
}
Entry = Fcb->Pipe->WaiterListHead.Flink;
while (Entry != &Fcb->Pipe->WaiterListHead)
{
WaitEntry = CONTAINING_RECORD(Entry, NPFS_WAITER_ENTRY, Entry);
if (WaitEntry->Fcb == Fcb)
{
RemoveEntryList(Entry);
Irp = CONTAINING_RECORD(Entry, IRP, Tail.Overlay.DriverContext);
Complete = (NULL == IoSetCancelRoutine(Irp, NULL));
break;
}
Entry = Entry->Flink;
}
if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE)
{
if (Irp)
{
if (Complete)
{
Irp->IoStatus.Status = STATUS_PIPE_BROKEN;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
}
}
Fcb->PipeState = FILE_PIPE_DISCONNECTED_STATE;
Fcb->OtherSide = NULL;
/* FIXME: remove data queue(s) */
DPRINT("Pipe disconnected\n");
return STATUS_SUCCESS;
}
return STATUS_UNSUCCESSFUL;
Status = STATUS_SUCCESS;
}
else if (Fcb->PipeState == FILE_PIPE_CLOSING_STATE)
{
Status = STATUS_PIPE_CLOSING;
}
else
{
Status = STATUS_UNSUCCESSFUL;
}
KeUnlockMutex(&Pipe->FcbListLock);
return Status;
}

View file

@ -27,6 +27,9 @@ DriverEntry(PDRIVER_OBJECT DriverObject,
NTSTATUS Status;
DPRINT("Named Pipe FSD 0.0.2\n");
ASSERT (sizeof(NPFS_CONTEXT) <= sizeof (((PIRP)NULL)->Tail.Overlay.DriverContext));
ASSERT (sizeof(NPFS_WAITER_ENTRY) <= sizeof(((PIRP)NULL)->Tail.Overlay.DriverContext));
DriverObject->MajorFunction[IRP_MJ_CREATE] = NpfsCreate;
DriverObject->MajorFunction[IRP_MJ_CREATE_NAMED_PIPE] =
@ -74,8 +77,7 @@ DriverEntry(PDRIVER_OBJECT DriverObject,
DeviceExtension = DeviceObject->DeviceExtension;
InitializeListHead(&DeviceExtension->PipeListHead);
InitializeListHead(&DeviceExtension->ThreadListHead);
KeInitializeMutex(&DeviceExtension->PipeListLock,
0);
KeInitializeMutex(&DeviceExtension->PipeListLock, 0);
DeviceExtension->EmptyWaiterCount = 0;
/* set the size quotas */

View file

@ -42,12 +42,15 @@ typedef struct _NPFS_FCB
struct ETHREAD *Thread;
PNPFS_PIPE Pipe;
KEVENT ConnectEvent;
KEVENT Event;
KEVENT ReadEvent;
KEVENT WriteEvent;
ULONG PipeEnd;
ULONG PipeState;
ULONG ReadDataAvailable;
ULONG WriteQuotaAvailable;
LIST_ENTRY ReadRequestListHead;
PVOID Data;
PVOID ReadPtr;
PVOID WritePtr;
@ -58,11 +61,8 @@ typedef struct _NPFS_FCB
typedef struct _NPFS_CONTEXT
{
PDEVICE_OBJECT DeviceObject;
PIRP Irp;
PNPFS_FCB Fcb;
UCHAR MajorFunction;
BOOLEAN AllocatedFromPool;
LIST_ENTRY ListEntry;
PKEVENT WaitEvent;
} NPFS_CONTEXT, *PNPFS_CONTEXT;
typedef struct _NPFS_THREAD_CONTEXT
@ -73,14 +73,12 @@ typedef struct _NPFS_THREAD_CONTEXT
LIST_ENTRY ListEntry;
PVOID WaitObjectArray[MAXIMUM_WAIT_OBJECTS];
KWAIT_BLOCK WaitBlockArray[MAXIMUM_WAIT_OBJECTS];
PNPFS_CONTEXT WaitContextArray[MAXIMUM_WAIT_OBJECTS];
PIRP WaitIrpArray[MAXIMUM_WAIT_OBJECTS];
} NPFS_THREAD_CONTEXT, *PNPFS_THREAD_CONTEXT;
typedef struct _NPFS_WAITER_ENTRY
{
LIST_ENTRY Entry;
PIRP Irp;
PNPFS_PIPE Pipe;
PNPFS_FCB Fcb;
} NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY;

View file

@ -46,61 +46,90 @@ VOID HexDump(PUCHAR Buffer, ULONG Length)
}
#endif
static NTSTATUS
NpfsReadFromPipe(PNPFS_CONTEXT Context);
static VOID STDCALL
NpfsWaitingCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
IN PIRP Irp)
NpfsReadWriteCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
IN PIRP Irp)
{
PNPFS_CONTEXT Context;
PNPFS_DEVICE_EXTENSION DeviceExt;
PIO_STACK_LOCATION IoStack;
PNPFS_FCB Fcb;
BOOLEAN Complete = FALSE;
DPRINT1("NpfsWaitingCancelRoutine() called\n");
DPRINT("NpfsReadWriteCancelRoutine(DeviceObject %x, Irp %x)\n", DeviceObject, Irp);
IoReleaseCancelSpinLock(Irp->CancelIrql);
Context = Irp->Tail.Overlay.DriverContext[0];
DeviceExt = Context->DeviceObject->DeviceExtension;
Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;
IoStack = IoGetCurrentIrpStackLocation(Irp);
Fcb = IoStack->FileObject->FsContext;
KeLockMutex(&DeviceExt->PipeListLock);
KeSetEvent(&Context->Fcb->Event, IO_NO_INCREMENT, FALSE);
ExAcquireFastMutex(&Fcb->DataListLock);
switch(IoStack->MajorFunction)
{
case IRP_MJ_READ:
if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
{
/* we are not the first in the list, remove an complete us */
RemoveEntryList(&Context->ListEntry);
Complete = TRUE;
}
else
{
KeSetEvent(&Fcb->ReadEvent, IO_NO_INCREMENT, FALSE);
}
break;
default:
KEBUGCHECK(0);
}
ExReleaseFastMutex(&Fcb->DataListLock);
KeUnlockMutex(&DeviceExt->PipeListLock);
if (Complete)
{
Irp->IoStatus.Status = STATUS_CANCELLED;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
}
}
static VOID STDCALL
NpfsWaiterThread(PVOID Context)
NpfsWaiterThread(PVOID InitContext)
{
PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) Context;
ULONG CurrentCount, Count = 0;
PNPFS_CONTEXT WaitContext = NULL;
PNPFS_THREAD_CONTEXT ThreadContext = (PNPFS_THREAD_CONTEXT) InitContext;
ULONG CurrentCount;
ULONG Count = 0;
PIRP Irp = NULL;
PIRP NextIrp;
NTSTATUS Status;
BOOLEAN Terminate = FALSE;
BOOLEAN Cancel = FALSE;
KIRQL oldIrql;
PIO_STACK_LOCATION IoStack = NULL;
PNPFS_CONTEXT Context;
PNPFS_CONTEXT NextContext;
PNPFS_FCB Fcb;
KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
while (1)
{
CurrentCount = ThreadContext->Count;
KeResetEvent(&ThreadContext->Event);
KeUnlockMutex(&ThreadContext->DeviceExt->PipeListLock);
if (WaitContext)
if (Irp)
{
if (Cancel)
{
WaitContext->Irp->IoStatus.Status = STATUS_CANCELLED;
WaitContext->Irp->IoStatus.Information = 0;
IoCompleteRequest(WaitContext->Irp, IO_NO_INCREMENT);
ExFreePool(WaitContext);
Irp->IoStatus.Status = STATUS_CANCELLED;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
}
else
{
switch (WaitContext->MajorFunction)
switch (IoStack->MajorFunction)
{
case IRP_MJ_READ:
NpfsReadFromPipe(WaitContext);
NpfsRead(IoStack->DeviceObject, Irp);
break;
default:
KEBUGCHECK(0);
@ -119,30 +148,56 @@ NpfsWaiterThread(PVOID Context)
FALSE,
NULL,
ThreadContext->WaitBlockArray);
KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
if (!NT_SUCCESS(Status))
{
KEBUGCHECK(0);
}
KeLockMutex(&ThreadContext->DeviceExt->PipeListLock);
Count = Status - STATUS_SUCCESS;
ASSERT (Count <= CurrentCount);
ASSERT (Count < CurrentCount);
if (Count > 0)
{
WaitContext = ThreadContext->WaitContextArray[Count];
ThreadContext->Count--;
ThreadContext->DeviceExt->EmptyWaiterCount++;
ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
ThreadContext->WaitContextArray[Count] = ThreadContext->WaitContextArray[ThreadContext->Count];
IoAcquireCancelSpinLock(&oldIrql);
Cancel = NULL == IoSetCancelRoutine(WaitContext->Irp, NULL);
IoReleaseCancelSpinLock(oldIrql);
}
else
{
{
Irp = ThreadContext->WaitIrpArray[Count];
ThreadContext->Count--;
ThreadContext->DeviceExt->EmptyWaiterCount++;
ThreadContext->WaitObjectArray[Count] = ThreadContext->WaitObjectArray[ThreadContext->Count];
ThreadContext->WaitIrpArray[Count] = ThreadContext->WaitIrpArray[ThreadContext->Count];
Cancel = (NULL == IoSetCancelRoutine(Irp, NULL));
Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
IoStack = IoGetCurrentIrpStackLocation(Irp);
if (Cancel)
{
Fcb = IoStack->FileObject->FsContext;
ExAcquireFastMutex(&Fcb->DataListLock);
RemoveEntryList(&Context->ListEntry);
switch (IoStack->MajorFunction)
{
case IRP_MJ_READ:
if (!IsListEmpty(&Fcb->ReadRequestListHead))
{
/* put the next request on the wait list */
NextContext = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
ThreadContext->WaitObjectArray[ThreadContext->Count] = NextContext->WaitEvent;
NextIrp = CONTAINING_RECORD(NextContext, IRP, Tail.Overlay.DriverContext);
ThreadContext->WaitIrpArray[ThreadContext->Count] = NextIrp;
ThreadContext->Count++;
ThreadContext->DeviceExt->EmptyWaiterCount--;
}
break;
default:
KEBUGCHECK(0);
}
ExReleaseFastMutex(&Fcb->DataListLock);
}
}
else
{
/* someone has add a new wait request */
WaitContext = NULL;
}
if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
Irp = NULL;
}
if (ThreadContext->Count == 1 && ThreadContext->DeviceExt->EmptyWaiterCount >= MAXIMUM_WAIT_OBJECTS)
{
/* it exist an other thread with empty wait slots, we can remove our thread from the list */
RemoveEntryList(&ThreadContext->ListEntry);
@ -155,14 +210,20 @@ NpfsWaiterThread(PVOID Context)
}
static NTSTATUS
NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PNPFS_FCB Fcb)
NpfsAddWaitingReadWriteRequest(IN PDEVICE_OBJECT DeviceObject,
IN PIRP Irp)
{
PLIST_ENTRY ListEntry;
PNPFS_THREAD_CONTEXT ThreadContext;
PNPFS_THREAD_CONTEXT ThreadContext = NULL;
NTSTATUS Status;
HANDLE hThread;
KIRQL oldIrql;
PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
PNPFS_DEVICE_EXTENSION DeviceExt = (PNPFS_DEVICE_EXTENSION)DeviceObject->DeviceExtension;;
DPRINT("NpfsAddWaitingReadWriteRequest(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
KeLockMutex(&DeviceExt->PipeListLock);
ListEntry = DeviceExt->ThreadListHead.Flink;
@ -184,12 +245,12 @@ NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PN
return STATUS_NO_MEMORY;
}
ThreadContext->DeviceExt = DeviceExt;
KeInitializeEvent(&ThreadContext->Event, NotificationEvent, FALSE);
KeInitializeEvent(&ThreadContext->Event, SynchronizationEvent, FALSE);
ThreadContext->Count = 1;
ThreadContext->WaitObjectArray[0] = &ThreadContext->Event;
DPRINT("Creating a new system thread for waiting read requests\n");
DPRINT("Creating a new system thread for waiting read/write requests\n");
Status = PsCreateSystemThread(&hThread,
THREAD_ALL_ACCESS,
@ -207,21 +268,20 @@ NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PN
InsertHeadList(&DeviceExt->ThreadListHead, &ThreadContext->ListEntry);
DeviceExt->EmptyWaiterCount += MAXIMUM_WAIT_OBJECTS - 1;
}
IoMarkIrpPending(Context->Irp);
Context->Irp->Tail.Overlay.DriverContext[0] = Context;
IoMarkIrpPending(Irp);
IoAcquireCancelSpinLock(&oldIrql);
if (Context->Irp->Cancel)
if (Irp->Cancel)
{
IoReleaseCancelSpinLock(oldIrql);
Status = STATUS_CANCELLED;
}
else
{
IoSetCancelRoutine(Context->Irp, NpfsWaitingCancelRoutine);
IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
IoReleaseCancelSpinLock(oldIrql);
ThreadContext->WaitObjectArray[ThreadContext->Count] = &Fcb->Event;
ThreadContext->WaitContextArray[ThreadContext->Count] = Context;
ThreadContext->WaitObjectArray[ThreadContext->Count] = Context->WaitEvent;
ThreadContext->WaitIrpArray[ThreadContext->Count] = Irp;
ThreadContext->Count++;
DeviceExt->EmptyWaiterCount--;
KeSetEvent(&ThreadContext->Event, IO_NO_INCREMENT, FALSE);
@ -231,130 +291,182 @@ NpfsAddWaitingReader(PNPFS_DEVICE_EXTENSION DeviceExt, PNPFS_CONTEXT Context, PN
return Status;
}
static NTSTATUS
NpfsReadFromPipe(PNPFS_CONTEXT Context)
NTSTATUS STDCALL
NpfsRead(IN PDEVICE_OBJECT DeviceObject,
IN PIRP Irp)
{
PIO_STACK_LOCATION IoStack;
PFILE_OBJECT FileObject;
NTSTATUS Status;
ULONG Information;
NTSTATUS OriginalStatus = STATUS_SUCCESS;
PNPFS_FCB Fcb;
PNPFS_FCB WriterFcb;
PNPFS_PIPE Pipe;
PNPFS_CONTEXT Context;
KEVENT Event;
ULONG Length;
PVOID Buffer;
ULONG Information;
ULONG CopyLength;
ULONG TempLength;
BOOLEAN IsOriginalRequest = TRUE;
PVOID Buffer;
DPRINT("NpfsReadFromPipe(Context %p)\n", Context);
DPRINT("NpfsRead(DeviceObject %p, Irp %p)\n", DeviceObject, Irp);
IoStack = IoGetCurrentIrpStackLocation(Context->Irp);
FileObject = IoStack->FileObject;
if (Irp->MdlAddress == NULL)
{
DPRINT("Irp->MdlAddress == NULL\n");
Status = STATUS_UNSUCCESSFUL;
Irp->IoStatus.Information = 0;
goto done;
}
FileObject = IoGetCurrentIrpStackLocation(Irp)->FileObject;
Fcb = FileObject->FsContext;
Pipe = Fcb->Pipe;
WriterFcb = Fcb->OtherSide;
Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
if (Fcb->Data == NULL)
{
DPRINT("Pipe is NOT readable!\n");
Status = STATUS_UNSUCCESSFUL;
Information = 0;
goto done;
}
{
DPRINT1("Pipe is NOT readable!\n");
Status = STATUS_UNSUCCESSFUL;
Irp->IoStatus.Information = 0;
goto done;
}
Status = STATUS_SUCCESS;
Length = IoStack->Parameters.Read.Length;
Information = 0;
Buffer = MmGetSystemAddressForMdl(Context->Irp->MdlAddress);
ExAcquireFastMutex(&Fcb->DataListLock);
while (1)
{
if (Fcb->ReadDataAvailable == 0)
{
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
}
ExReleaseFastMutex(&Fcb->DataListLock);
if (Information > 0)
{
Status = STATUS_SUCCESS;
goto done;
}
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
{
if (IoIsOperationSynchronous(Irp))
{
InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
{
KeInitializeEvent(&Event, SynchronizationEvent, FALSE);
Context->WaitEvent = &Event;
ExReleaseFastMutex(&Fcb->DataListLock);
Status = KeWaitForSingleObject(&Event,
Executive,
KernelMode,
FALSE,
NULL);
if (!NT_SUCCESS(Status))
{
KEBUGCHECK(0);
}
ExAcquireFastMutex(&Fcb->DataListLock);
}
Irp->IoStatus.Information = 0;
}
else
{
KIRQL oldIrql;
if (IsListEmpty(&Fcb->ReadRequestListHead) ||
Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
{
/* this is a new request */
Irp->IoStatus.Information = 0;
Context->WaitEvent = &Fcb->ReadEvent;
InsertTailList(&Fcb->ReadRequestListHead, &Context->ListEntry);
if (Fcb->ReadRequestListHead.Flink != &Context->ListEntry)
{
/* there was already a request on the list */
IoAcquireCancelSpinLock(&oldIrql);
if (Irp->Cancel)
{
IoReleaseCancelSpinLock(oldIrql);
RemoveEntryList(&Context->ListEntry);
ExReleaseFastMutex(&Fcb->DataListLock);
Status = STATUS_CANCELLED;
goto done;
}
IoSetCancelRoutine(Irp, NpfsReadWriteCancelRoutine);
IoReleaseCancelSpinLock(oldIrql);
ExReleaseFastMutex(&Fcb->DataListLock);
IoMarkIrpPending(Irp);
Status = STATUS_PENDING;
goto done;
}
}
}
while (1)
{
Buffer = MmGetSystemAddressForMdl(Irp->MdlAddress);
Information = Irp->IoStatus.Information;
Length = IoGetCurrentIrpStackLocation(Irp)->Parameters.Read.Length;
ASSERT (Information <= Length);
Buffer += Information;
Length -= Information;
Status = STATUS_SUCCESS;
while (1)
{
if (Fcb->ReadDataAvailable == 0)
{
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
}
if (Information > 0 &&
(Fcb->Pipe->ReadMode != FILE_PIPE_BYTE_STREAM_MODE ||
Fcb->PipeState != FILE_PIPE_CONNECTED_STATE))
{
break;
}
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
{
DPRINT("PipeState: %x\n", Fcb->PipeState);
Status = STATUS_PIPE_BROKEN;
goto done;
}
if (IoIsOperationSynchronous(Context->Irp))
{
break;
}
ExReleaseFastMutex(&Fcb->DataListLock);
if (IoIsOperationSynchronous(Irp))
{
/* Wait for ReadEvent to become signaled */
DPRINT("Waiting for readable data (%S)\n", Pipe->PipeName.Buffer);
Status = KeWaitForSingleObject(&Fcb->Event,
DPRINT("Waiting for readable data (%wZ)\n", &Fcb->Pipe->PipeName);
Status = KeWaitForSingleObject(&Fcb->ReadEvent,
UserRequest,
KernelMode,
FALSE,
NULL);
DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
}
else
{
PNPFS_CONTEXT NewContext;
NewContext = ExAllocatePool(NonPagedPool, sizeof(NPFS_CONTEXT));
if (NewContext == NULL)
{
Status = STATUS_NO_MEMORY;
goto done;
}
memcpy(NewContext, Context, sizeof(NPFS_CONTEXT));
NewContext->AllocatedFromPool = TRUE;
NewContext->Fcb = Fcb;
NewContext->MajorFunction = IRP_MJ_READ;
Status = NpfsAddWaitingReader(Context->DeviceObject->DeviceExtension, NewContext, Fcb);
DPRINT("Finished waiting (%wZ)! Status: %x\n", &Fcb->Pipe->PipeName, Status);
ExAcquireFastMutex(&Fcb->DataListLock);
}
else
{
PNPFS_CONTEXT Context = (PNPFS_CONTEXT)&Irp->Tail.Overlay.DriverContext;
Context->WaitEvent = &Fcb->ReadEvent;
Status = NpfsAddWaitingReadWriteRequest(DeviceObject, Irp);
if (NT_SUCCESS(Status))
{
Status = STATUS_PENDING;
}
else
{
ExFreePool(NewContext);
}
goto done;
}
ExAcquireFastMutex(&Fcb->DataListLock);
}
if (Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
{
DPRINT("Byte stream mode\n");
/* Byte stream mode */
while (Length > 0 && Fcb->ReadDataAvailable > 0)
{
{
Status = STATUS_PENDING;
}
ExAcquireFastMutex(&Fcb->DataListLock);
break;
}
}
if (Fcb->Pipe->ReadMode == FILE_PIPE_BYTE_STREAM_MODE)
{
DPRINT("Byte stream mode\n");
/* Byte stream mode */
while (Length > 0 && Fcb->ReadDataAvailable > 0)
{
CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
if (Fcb->ReadPtr + CopyLength <= Fcb->Data + Fcb->MaxDataLength)
{
memcpy(Buffer, Fcb->ReadPtr, CopyLength);
Fcb->ReadPtr += CopyLength;
if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
{
Fcb->ReadPtr = Fcb->Data;
}
}
else
{
TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr;
memcpy(Buffer, Fcb->ReadPtr, TempLength);
memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength);
Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
}
{
memcpy(Buffer, Fcb->ReadPtr, CopyLength);
Fcb->ReadPtr += CopyLength;
if (Fcb->ReadPtr == Fcb->Data + Fcb->MaxDataLength)
{
Fcb->ReadPtr = Fcb->Data;
}
}
else
{
TempLength = Fcb->Data + Fcb->MaxDataLength - Fcb->ReadPtr;
memcpy(Buffer, Fcb->ReadPtr, TempLength);
memcpy(Buffer + TempLength, Fcb->Data, CopyLength - TempLength);
Fcb->ReadPtr = Fcb->Data + CopyLength - TempLength;
}
Buffer += CopyLength;
Length -= CopyLength;
@ -362,25 +474,25 @@ NpfsReadFromPipe(PNPFS_CONTEXT Context)
Fcb->ReadDataAvailable -= CopyLength;
Fcb->WriteQuotaAvailable += CopyLength;
}
}
if (Length == 0)
{
if (Length == 0)
{
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
}
KeResetEvent(&Fcb->Event);
break;
}
}
else
{
DPRINT("Message mode\n");
{
KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
}
KeResetEvent(&Fcb->ReadEvent);
break;
}
}
else
{
DPRINT("Message mode\n");
/* Message mode */
if (Fcb->ReadDataAvailable)
{
/* Message mode */
if (Fcb->ReadDataAvailable)
{
/* Truncate the message if the receive buffer is too small */
CopyLength = RtlRosMin(Fcb->ReadDataAvailable, Length);
memcpy(Buffer, Fcb->Data, CopyLength);
@ -393,73 +505,85 @@ NpfsReadFromPipe(PNPFS_CONTEXT Context)
Information = CopyLength;
if (Fcb->ReadDataAvailable > Length)
{
memmove(Fcb->Data, Fcb->Data + Length,
Fcb->ReadDataAvailable - Length);
Fcb->ReadDataAvailable -= Length;
Status = STATUS_MORE_ENTRIES;
}
{
memmove(Fcb->Data, Fcb->Data + Length,
Fcb->ReadDataAvailable - Length);
Fcb->ReadDataAvailable -= Length;
Status = STATUS_MORE_ENTRIES;
}
else
{
KeResetEvent(&Fcb->Event);
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
}
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
}
}
{
KeResetEvent(&Fcb->ReadEvent);
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{
KeSetEvent(&Fcb->OtherSide->WriteEvent, IO_NO_INCREMENT, FALSE);
}
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
}
}
if (Information > 0)
{
break;
}
if (Information > 0)
{
break;
}
}
}
Irp->IoStatus.Information = Information;
Irp->IoStatus.Status = Status;
if (IoIsOperationSynchronous(Irp))
{
RemoveEntryList(&Context->ListEntry);
if (!IsListEmpty(&Fcb->ReadRequestListHead))
{
Context = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
KeSetEvent(Context->WaitEvent, IO_NO_INCREMENT, FALSE);
}
}
ExReleaseFastMutex(&Fcb->DataListLock);
IoCompleteRequest(Irp, IO_NO_INCREMENT);
ExReleaseFastMutex(&Fcb->DataListLock);
DPRINT("NpfsRead done (Status %lx)\n", Status);
return Status;
}
else
{
if (IsOriginalRequest)
{
IsOriginalRequest = FALSE;
OriginalStatus = Status;
}
if (Status == STATUS_PENDING)
{
ExReleaseFastMutex(&Fcb->DataListLock);
DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
return OriginalStatus;
}
RemoveEntryList(&Context->ListEntry);
IoCompleteRequest(Irp, IO_NO_INCREMENT);
if (IsListEmpty(&Fcb->ReadRequestListHead))
{
ExReleaseFastMutex(&Fcb->DataListLock);
DPRINT("NpfsRead done (Status %lx)\n", OriginalStatus);
return OriginalStatus;
}
Context = CONTAINING_RECORD(Fcb->ReadRequestListHead.Flink, NPFS_CONTEXT, ListEntry);
Irp = CONTAINING_RECORD(Context, IRP, Tail.Overlay.DriverContext);
}
}
done:
Context->Irp->IoStatus.Status = Status;
Context->Irp->IoStatus.Information = Information;
Irp->IoStatus.Status = Status;
if (Status != STATUS_PENDING)
{
IoCompleteRequest(Context->Irp, IO_NO_INCREMENT);
}
if (Context->AllocatedFromPool)
{
ExFreePool(Context);
IoCompleteRequest(Irp, IO_NO_INCREMENT);
}
DPRINT("NpfsRead done (Status %lx)\n", Status);
return Status;
}
NTSTATUS STDCALL
NpfsRead(PDEVICE_OBJECT DeviceObject,
PIRP Irp)
{
NPFS_CONTEXT Context;
Context.AllocatedFromPool = FALSE;
Context.DeviceObject = DeviceObject;
Context.Irp = Irp;
if (Irp->MdlAddress == NULL)
{
DPRINT("Irp->MdlAddress == NULL\n");
Irp->IoStatus.Status = STATUS_UNSUCCESSFUL;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
return STATUS_UNSUCCESSFUL;
}
return NpfsReadFromPipe(&Context);
}
NTSTATUS STDCALL
NpfsWrite(PDEVICE_OBJECT DeviceObject,
PIRP Irp)
@ -534,22 +658,24 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
{
if (ReaderFcb->WriteQuotaAvailable == 0)
{
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
ExReleaseFastMutex(&ReaderFcb->DataListLock);
KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE);
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
{
Status = STATUS_PIPE_BROKEN;
ExReleaseFastMutex(&ReaderFcb->DataListLock);
goto done;
}
ExReleaseFastMutex(&ReaderFcb->DataListLock);
DPRINT("Waiting for buffer space (%S)\n", Pipe->PipeName.Buffer);
Status = KeWaitForSingleObject(&Fcb->Event,
UserRequest,
Status = KeWaitForSingleObject(&Fcb->WriteEvent,
UserRequest,
KernelMode,
FALSE,
NULL);
DPRINT("Finished waiting (%S)! Status: %x\n", Pipe->PipeName.Buffer, Status);
ExAcquireFastMutex(&ReaderFcb->DataListLock);
/*
* It's possible that the event was signaled because the
* other side of pipe was closed.
@ -558,9 +684,9 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
{
DPRINT("PipeState: %x\n", Fcb->PipeState);
Status = STATUS_PIPE_BROKEN;
ExReleaseFastMutex(&ReaderFcb->DataListLock);
goto done;
}
ExAcquireFastMutex(&ReaderFcb->DataListLock);
}
if (Pipe->WriteMode == FILE_PIPE_BYTE_STREAM_MODE)
@ -596,8 +722,8 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
if (Length == 0)
{
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
KeResetEvent(&Fcb->Event);
KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE);
KeResetEvent(&Fcb->WriteEvent);
break;
}
}
@ -616,8 +742,8 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
if (Information > 0)
{
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
KeResetEvent(&Fcb->Event);
KeSetEvent(&ReaderFcb->ReadEvent, IO_NO_INCREMENT, FALSE);
KeResetEvent(&Fcb->WriteEvent);
break;
}
}