Support asynchronous (aka overlapped) connect, read and write requests.

svn path=/trunk/; revision=13826
This commit is contained in:
Eric Kohl 2005-03-05 12:08:50 +00:00
parent 888cadd6ea
commit 8027ba84c5
4 changed files with 138 additions and 37 deletions

View file

@ -48,17 +48,18 @@ static PNPFS_FCB
NpfsFindListeningServerInstance(PNPFS_PIPE Pipe) NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
{ {
PLIST_ENTRY CurrentEntry; PLIST_ENTRY CurrentEntry;
PNPFS_FCB ServerFcb; PNPFS_WAITER_ENTRY Waiter;
CurrentEntry = Pipe->ServerFcbListHead.Flink; CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->ServerFcbListHead) while (CurrentEntry != &Pipe->WaiterListHead)
{ {
ServerFcb = CONTAINING_RECORD(CurrentEntry, NPFS_FCB, FcbListEntry); Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
if (ServerFcb->PipeState == FILE_PIPE_LISTENING_STATE) if (Waiter->Fcb->PipeState == FILE_PIPE_LISTENING_STATE)
{ {
DPRINT("Server found! Fcb %p\n", ServerFcb); DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
return ServerFcb; return Waiter->Fcb;
} }
CurrentEntry = CurrentEntry->Flink; CurrentEntry = CurrentEntry->Flink;
} }
@ -66,6 +67,35 @@ NpfsFindListeningServerInstance(PNPFS_PIPE Pipe)
} }
static VOID
NpfsSignalAndRemoveListeningServerInstance(PNPFS_PIPE Pipe,
PNPFS_FCB Fcb)
{
PLIST_ENTRY CurrentEntry;
PNPFS_WAITER_ENTRY Waiter;
CurrentEntry = Pipe->WaiterListHead.Flink;
while (CurrentEntry != &Pipe->WaiterListHead)
{
Waiter = CONTAINING_RECORD(CurrentEntry, NPFS_WAITER_ENTRY, Entry);
if (Waiter->Fcb == Fcb)
{
DPRINT("Server found! Fcb %p\n", Waiter->Fcb);
KeSetEvent(Waiter->Irp->UserEvent, 0, FALSE);
Waiter->Irp->UserIosb->Status = FILE_PIPE_CONNECTED_STATE;
Waiter->Irp->UserIosb->Information = 0;
IoCompleteRequest(Waiter->Irp, IO_NO_INCREMENT);
RemoveEntryList(&Waiter->Entry);
ExFreePool(Waiter);
return;
}
CurrentEntry = CurrentEntry->Flink;
}
}
NTSTATUS STDCALL NTSTATUS STDCALL
NpfsCreate(PDEVICE_OBJECT DeviceObject, NpfsCreate(PDEVICE_OBJECT DeviceObject,
PIRP Irp) PIRP Irp)
@ -206,9 +236,8 @@ NpfsCreate(PDEVICE_OBJECT DeviceObject,
ClientFcb->PipeState = FILE_PIPE_CONNECTED_STATE; ClientFcb->PipeState = FILE_PIPE_CONNECTED_STATE;
ServerFcb->PipeState = FILE_PIPE_CONNECTED_STATE; ServerFcb->PipeState = FILE_PIPE_CONNECTED_STATE;
/* Wake server thread */ /* Signal the server thread and remove it from the waiter list */
DPRINT("Setting the ConnectEvent for %x\n", ServerFcb); NpfsSignalAndRemoveListeningServerInstance(Pipe, ServerFcb);
KeSetEvent(&ServerFcb->ConnectEvent, 0, FALSE);
} }
KeUnlockMutex(&Pipe->FcbListLock); KeUnlockMutex(&Pipe->FcbListLock);
@ -318,6 +347,7 @@ NpfsCreateNamedPipe(PDEVICE_OBJECT DeviceObject,
InitializeListHead(&Pipe->ServerFcbListHead); InitializeListHead(&Pipe->ServerFcbListHead);
InitializeListHead(&Pipe->ClientFcbListHead); InitializeListHead(&Pipe->ClientFcbListHead);
InitializeListHead(&Pipe->WaiterListHead);
KeInitializeMutex(&Pipe->FcbListLock, 0); KeInitializeMutex(&Pipe->FcbListLock, 0);
Pipe->PipeType = Buffer->NamedPipeType; Pipe->PipeType = Buffer->NamedPipeType;

View file

@ -18,8 +18,66 @@
/* FUNCTIONS *****************************************************************/ /* FUNCTIONS *****************************************************************/
static VOID
NpfsListeningCancelRoutine(IN PDEVICE_OBJECT DeviceObject,
IN PIRP Irp)
{
PNPFS_WAITER_ENTRY Waiter;
DPRINT1("NpfsListeningCancelRoutine() called\n");
/* FIXME: Not tested. */
Waiter = Irp->Tail.Overlay.DriverContext[0];
RemoveEntryList(&Waiter->Entry);
ExFreePool(Waiter);
IoReleaseCancelSpinLock(Irp->CancelIrql);
Irp->IoStatus.Status = STATUS_CANCELLED;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
}
static NTSTATUS static NTSTATUS
NpfsConnectPipe(PNPFS_FCB Fcb) NpfsAddListeningServerInstance(PIRP Irp,
PNPFS_FCB Fcb)
{
PNPFS_WAITER_ENTRY Entry;
KIRQL OldIrql;
Entry = ExAllocatePool(NonPagedPool, sizeof(NPFS_WAITER_ENTRY));
if (Entry == NULL)
return STATUS_INSUFFICIENT_RESOURCES;
Entry->Irp = Irp;
Entry->Fcb = Fcb;
InsertTailList(&Fcb->Pipe->WaiterListHead, &Entry->Entry);
IoAcquireCancelSpinLock(&OldIrql);
if (!Irp->Cancel)
{
Irp->Tail.Overlay.DriverContext[0] = Entry;
IoMarkIrpPending(Irp);
IoSetCancelRoutine(Irp, NpfsListeningCancelRoutine);
IoReleaseCancelSpinLock(OldIrql);
return STATUS_PENDING;
}
/* IRP has already been cancelled */
IoReleaseCancelSpinLock(OldIrql);
DPRINT1("FIXME: Remove waiter entry!\n");
RemoveEntryList(&Entry->Entry);
ExFreePool(Entry);
return STATUS_CANCELLED;
}
static NTSTATUS
NpfsConnectPipe(PIRP Irp,
PNPFS_FCB Fcb)
{ {
PNPFS_PIPE Pipe; PNPFS_PIPE Pipe;
PLIST_ENTRY current_entry; PLIST_ENTRY current_entry;
@ -88,29 +146,18 @@ NpfsConnectPipe(PNPFS_FCB Fcb)
current_entry = current_entry->Flink; current_entry = current_entry->Flink;
} }
KeUnlockMutex(&Pipe->FcbListLock);
/* no listening client fcb found */ /* no listening client fcb found */
DPRINT("No listening client fcb found -- waiting for client\n"); DPRINT("No listening client fcb found -- waiting for client\n");
Fcb->PipeState = FILE_PIPE_LISTENING_STATE; Fcb->PipeState = FILE_PIPE_LISTENING_STATE;
Status = KeWaitForSingleObject(&Fcb->ConnectEvent, Status = NpfsAddListeningServerInstance(Irp, Fcb);
UserRequest,
KernelMode,
FALSE,
NULL);
if (!NT_SUCCESS(Status))
{
DPRINT("KeWaitForSingleObject() failed (Status %lx)\n", Status);
return Status;
}
Fcb->PipeState = FILE_PIPE_CONNECTED_STATE; KeUnlockMutex(&Pipe->FcbListLock);
DPRINT("Client Fcb: %p\n", Fcb->OtherSide); DPRINT("NpfsConnectPipe() done (Status %lx)\n", Status);
return STATUS_PIPE_CONNECTED; return Status;
} }
@ -327,7 +374,6 @@ NpfsPeekPipe(PIRP Irp,
} }
NTSTATUS STDCALL NTSTATUS STDCALL
NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject, NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject,
PIRP Irp) PIRP Irp)
@ -366,7 +412,7 @@ NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject,
case FSCTL_PIPE_LISTEN: case FSCTL_PIPE_LISTEN:
DPRINT("Connecting pipe %wZ\n", &Pipe->PipeName); DPRINT("Connecting pipe %wZ\n", &Pipe->PipeName);
Status = NpfsConnectPipe(Fcb); Status = NpfsConnectPipe(Irp, Fcb);
break; break;
case FSCTL_PIPE_PEEK: case FSCTL_PIPE_PEEK:
@ -439,12 +485,15 @@ NpfsFileSystemControl(PDEVICE_OBJECT DeviceObject,
Status = STATUS_UNSUCCESSFUL; Status = STATUS_UNSUCCESSFUL;
} }
Irp->IoStatus.Status = Status; if (Status != STATUS_PENDING)
Irp->IoStatus.Information = 0; {
Irp->IoStatus.Status = Status;
Irp->IoStatus.Information = 0;
IoCompleteRequest(Irp, IO_NO_INCREMENT);
}
IoCompleteRequest(Irp, IO_NO_INCREMENT); return Status;
return(Status);
} }

View file

@ -19,6 +19,7 @@ typedef struct _NPFS_PIPE
KMUTEX FcbListLock; KMUTEX FcbListLock;
LIST_ENTRY ServerFcbListHead; LIST_ENTRY ServerFcbListHead;
LIST_ENTRY ClientFcbListHead; LIST_ENTRY ClientFcbListHead;
LIST_ENTRY WaiterListHead;
ULONG PipeType; ULONG PipeType;
ULONG ReadMode; ULONG ReadMode;
ULONG WriteMode; ULONG WriteMode;
@ -52,6 +53,14 @@ typedef struct _NPFS_FCB
KSPIN_LOCK DataListLock; /* Data queue lock */ KSPIN_LOCK DataListLock; /* Data queue lock */
} NPFS_FCB, *PNPFS_FCB; } NPFS_FCB, *PNPFS_FCB;
typedef struct _NPFS_WAITER_ENTRY
{
LIST_ENTRY Entry;
PIRP Irp;
PNPFS_PIPE Pipe;
PNPFS_FCB Fcb;
} NPFS_WAITER_ENTRY, *PNPFS_WAITER_ENTRY;
extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList; extern NPAGED_LOOKASIDE_LIST NpfsPipeDataLookasideList;

View file

@ -101,7 +101,6 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
/* FIXME: check if in blocking mode */ /* FIXME: check if in blocking mode */
if (Fcb->ReadDataAvailable == 0) if (Fcb->ReadDataAvailable == 0)
{ {
KeResetEvent(&Fcb->Event);
if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE) if (Fcb->PipeState == FILE_PIPE_CONNECTED_STATE)
{ {
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
@ -167,6 +166,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
if (Length == 0) if (Length == 0)
{ {
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
KeResetEvent(&Fcb->Event);
break; break;
} }
} }
@ -187,8 +187,19 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
#endif #endif
Information = CopyLength; Information = CopyLength;
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Fcb->MaxDataLength; if (Fcb->ReadDataAvailable > Length)
{
memmove(Fcb->Data, Fcb->Data + Length,
Fcb->ReadDataAvailable - Length);
Fcb->ReadDataAvailable -= Length;
Status = STATUS_MORE_ENTRIES;
}
else
{
Fcb->ReadDataAvailable = 0;
Fcb->WriteQuotaAvailable = Fcb->MaxDataLength;
}
} }
if (Information > 0) if (Information > 0)
@ -197,6 +208,7 @@ NpfsRead(PDEVICE_OBJECT DeviceObject,
{ {
KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE); KeSetEvent(&WriterFcb->Event, IO_NO_INCREMENT, FALSE);
} }
KeResetEvent(&Fcb->Event);
break; break;
} }
} }
@ -291,7 +303,6 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
{ {
if (ReaderFcb->WriteQuotaAvailable == 0) if (ReaderFcb->WriteQuotaAvailable == 0)
{ {
KeResetEvent(&Fcb->Event);
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE); KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql); KeReleaseSpinLock(&ReaderFcb->DataListLock, OldIrql);
if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE) if (Fcb->PipeState != FILE_PIPE_CONNECTED_STATE)
@ -355,6 +366,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
if (Length == 0) if (Length == 0)
{ {
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE); KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
KeResetEvent(&Fcb->Event);
break; break;
} }
} }
@ -374,6 +386,7 @@ NpfsWrite(PDEVICE_OBJECT DeviceObject,
if (Information > 0) if (Information > 0)
{ {
KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE); KeSetEvent(&ReaderFcb->Event, IO_NO_INCREMENT, FALSE);
KeResetEvent(&Fcb->Event);
break; break;
} }
} }