recv now works every time

- Standardized on recvfrom request and reply everywhere
- Added a continuous parameter to FillWSABuffers for stream sockets
- Added function TryToSatisfyRecvRequest
- Create MDLs for the WSABUFS.  These are needed because we aren't in our
  home process when tcpip calls back with data.
- Removed extraneous and potentially confusing lock ReadRequestQueueLock
  Now both ReadRequestQueue and ReceiveQueue rely on ReceiveQueueLock

svn path=/trunk/; revision=9669
This commit is contained in:
Art Yerkes 2004-06-15 02:56:13 +00:00
parent 9725dc0746
commit 5fd39d97bb
7 changed files with 153 additions and 105 deletions

View file

@ -52,8 +52,9 @@ AfdDispatch(
* Status of the operation
*/
{
NTSTATUS Status;
NTSTATUS Status;
PIO_STACK_LOCATION IrpSp;
BOOL DoComplete = TRUE;
IrpSp = IoGetCurrentIrpStackLocation(Irp);
@ -93,6 +94,7 @@ AfdDispatch(
case IOCTL_AFD_RECV:
Status = AfdDispRecv(Irp, IrpSp);
DoComplete = FALSE;
break;
case IOCTL_AFD_SEND:
@ -115,14 +117,14 @@ AfdDispatch(
break;
}
if (Status != STATUS_PENDING) {
if (Status != STATUS_PENDING && DoComplete) {
Irp->IoStatus.Status = Status;
IoCompleteRequest(Irp, IO_NETWORK_INCREMENT);
}
AFD_DbgPrint(MAX_TRACE, ("Leaving. Status (0x%X).\n", Status));
return Status;
return Status;
}

View file

@ -29,52 +29,55 @@ NTSTATUS AfdpDispRecv(
PAFD_READ_REQUEST ReadRequest;
NTSTATUS Status;
KIRQL OldIrql;
ULONG Count;
PMDL Mdl;
UINT i;
KeAcquireSpinLock(&FCB->ReceiveQueueLock, &OldIrql);
if (IsListEmpty(&FCB->ReceiveQueue)) {
KeReleaseSpinLock(&FCB->ReceiveQueueLock, OldIrql);
/* Queue a read request and return STATUS_PENDING */
AFD_DbgPrint(MAX_TRACE, ("Queueing read request.\n"));
/*ReadRequest = (PAFD_READ_REQUEST)ExAllocateFromNPagedLookasideList(
&ReadRequestLookasideList);*/
ReadRequest = (PAFD_READ_REQUEST)ExAllocatePool(
/* Queue a read request and return STATUS_PENDING */
AFD_DbgPrint(MAX_TRACE, ("Queueing read request.\n"));
/*ReadRequest = (PAFD_READ_REQUEST)ExAllocateFromNPagedLookasideList(
&ReadRequestLookasideList);*/
ReadRequest = (PAFD_READ_REQUEST)ExAllocatePool(
NonPagedPool,
sizeof(AFD_READ_REQUEST));
if (ReadRequest) {
if (ReadRequest) {
ReadRequest->Irp = Irp;
ReadRequest->RecvFromRequest = Request;
ReadRequest->RecvFromReply = Reply;
AFD_DbgPrint(MAX_TRACE,("Reply to %x (%x)\n", Reply,
ReadRequest->RecvFromReply));
for( i = 0;
i < ReadRequest->RecvFromRequest->BufferCount;
i++ ) {
/* These will be cleaned up in routines.c:FillWSABuffers */
Mdl = IoAllocateMdl( ReadRequest->RecvFromRequest->Buffers[i].buf,
ReadRequest->RecvFromRequest->Buffers[i].len,
FALSE,
FALSE,
Irp );
MmProbeAndLockPages( Mdl, KernelMode, IoWriteAccess );
ReadRequest->RecvFromRequest->Buffers[i].buf = (PCHAR)Mdl;
}
InsertTailList( &FCB->ReadRequestQueue, &ReadRequest->ListEntry );
ExInterlockedInsertTailList(
&FCB->ReadRequestQueue,
&ReadRequest->ListEntry,
&FCB->ReadRequestQueueLock);
Status = STATUS_PENDING;
} else {
Status = STATUS_INSUFFICIENT_RESOURCES;
}
} else {
AFD_DbgPrint(MAX_TRACE, ("Satisfying read request.\n"));
/* Satisfy the request at once */
Status = FillWSABuffers(
FCB,
Request->Buffers,
Request->BufferCount,
&Count,
Continuous); /* I.E. Packets are exhausted on short recv if not */
KeReleaseSpinLock(&FCB->ReceiveQueueLock, OldIrql);
Reply->NumberOfBytesRecvd = Count;
Reply->Status = NO_ERROR;
AFD_DbgPrint(MAX_TRACE, ("Bytes received (0x%X).\n", Count));
Status = STATUS_INSUFFICIENT_RESOURCES;
}
TryToSatisfyRecvRequest( FCB, Continuous );
if (IsListEmpty(&FCB->ReadRequestQueue)) /* All recv requests handled */
Status = STATUS_SUCCESS;
else
IoMarkIrpPending( Irp );
KeReleaseSpinLock(&FCB->ReceiveQueueLock, OldIrql);
return Status;
}
@ -767,8 +770,8 @@ NTSTATUS AfdDispRecv(
NTSTATUS Status;
UINT InputBufferLength;
UINT OutputBufferLength;
PFILE_REQUEST_RECV Request;
PFILE_REPLY_RECV Reply;
PFILE_REQUEST_RECVFROM Request;
PFILE_REPLY_RECVFROM Reply;
DWORD NumberOfBytesRecvd;
PAFDFCB FCB;
@ -782,14 +785,14 @@ NTSTATUS AfdDispRecv(
(OutputBufferLength >= sizeof(FILE_REPLY_RECV))) {
FCB = IrpSp->FileObject->FsContext;
Request = (PFILE_REQUEST_RECV)Irp->AssociatedIrp.SystemBuffer;
Reply = (PFILE_REPLY_RECV)Irp->AssociatedIrp.SystemBuffer;
Request = (PFILE_REQUEST_RECVFROM)Irp->AssociatedIrp.SystemBuffer;
Reply = (PFILE_REPLY_RECVFROM)Irp->AssociatedIrp.SystemBuffer;
Status = AfdpDispRecv(
Irp,
FCB,
(PFILE_REQUEST_RECVFROM)Request,
(PFILE_REPLY_RECVFROM)Reply,
Request,
Reply,
TRUE);
Reply->NumberOfBytesRecvd = NumberOfBytesRecvd;
Reply->Status = NO_ERROR;

View file

@ -45,13 +45,9 @@ NTSTATUS AfdEventReceive(
OUT PIRP *IoRequestPacket)
{
PAFDFCB FCB = (PAFDFCB)TdiEventContext;
PAFD_READ_REQUEST ReadRequest;
PVOID ReceiveBuffer;
PAFD_BUFFER Buffer;
PLIST_ENTRY Entry;
NTSTATUS Status;
KIRQL OldIrql;
ULONG Count;
AFD_DbgPrint(MAX_TRACE, ("Called.\n"));
@ -77,38 +73,13 @@ NTSTATUS AfdEventReceive(
Buffer->Buffer.buf = ReceiveBuffer;
Buffer->Offset = 0;
ExInterlockedInsertTailList(
&FCB->ReceiveQueue,
&Buffer->ListEntry,
&FCB->ReceiveQueueLock);
KeAcquireSpinLock(&FCB->ReceiveQueueLock, &OldIrql);
KeAcquireSpinLock(&FCB->ReadRequestQueueLock, &OldIrql);
InsertTailList( &FCB->ReceiveQueue, &Buffer->ListEntry );
while (!IsListEmpty(&FCB->ReadRequestQueue) &&
!IsListEmpty(&FCB->ReceiveQueue)) {
AFD_DbgPrint(MAX_TRACE, ("Satisfying read request.\n"));
TryToSatisfyRecvRequest( FCB, TRUE );
Entry = RemoveHeadList(&FCB->ReceiveQueue);
ReadRequest = CONTAINING_RECORD(Entry, AFD_READ_REQUEST, ListEntry);
Status = FillWSABuffers(
FCB,
ReadRequest->RecvFromRequest->Buffers,
ReadRequest->RecvFromRequest->BufferCount,
&Count,
TRUE ); /* Continuous */
ReadRequest->RecvFromReply->NumberOfBytesRecvd = Count;
ReadRequest->RecvFromReply->Status = NO_ERROR;
ReadRequest->Irp->IoStatus.Information = 0;
ReadRequest->Irp->IoStatus.Status = Status;
AFD_DbgPrint(MAX_TRACE, ("Completing IRP at (0x%X).\n", ReadRequest->Irp));
IoCompleteRequest(ReadRequest->Irp, IO_NETWORK_INCREMENT);
}
KeReleaseSpinLock(&FCB->ReadRequestQueueLock, OldIrql);
KeReleaseSpinLock(&FCB->ReceiveQueueLock, OldIrql);
*BytesTaken = BytesAvailable;
@ -200,7 +171,7 @@ NTSTATUS AfdEventReceiveDatagramHandler(
&Buffer->ListEntry,
&FCB->ReceiveQueueLock);
KeAcquireSpinLock(&FCB->ReadRequestQueueLock, &OldIrql);
KeAcquireSpinLock(&FCB->ReceiveQueueLock, &OldIrql);
if (!IsListEmpty(&FCB->ReadRequestQueue)) {
AFD_DbgPrint(MAX_TRACE, ("Satisfying read request.\n"));
@ -225,7 +196,7 @@ NTSTATUS AfdEventReceiveDatagramHandler(
IoCompleteRequest(ReadRequest->Irp, IO_NETWORK_INCREMENT);
}
KeReleaseSpinLock(&FCB->ReadRequestQueueLock, OldIrql);
KeReleaseSpinLock(&FCB->ReceiveQueueLock, OldIrql);
*BytesTaken = BytesAvailable;

View file

@ -42,7 +42,6 @@ PAFDFCB AfdInitializeFCB(
KeInitializeSpinLock(&NewFCB->ReceiveQueueLock);
InitializeListHead(&NewFCB->ReadRequestQueue);
KeInitializeSpinLock(&NewFCB->ReadRequestQueueLock);
InitializeListHead(&NewFCB->ListenRequestQueue);

View file

@ -10,6 +10,19 @@
#include <afd.h>
#include <debug.h>
#ifndef DONT_USE_ME_THIS_WAY_IM_LIFTED_FROM_NTOSKRNL_XXX_DO_THIS_THE_RIGHT_WAY
LONG FASTCALL
XxInterlockedExchange(PLONG Target,
LONG Value);
__asm__("\n\t.global @XxInterlockedExchange@8\n\t"
"@XxInterlockedExchange@8:\n\t"
"xchgl %edx,(%ecx)\n\t"
"movl %edx,%eax\n\t"
"ret\n\t");
#define InterlockedExchange XxInterlockedExchange
#endif
VOID DumpName(
LPSOCKADDR Name)
@ -84,6 +97,49 @@ NTSTATUS MergeWSABuffers(
return STATUS_SUCCESS;
}
VOID TryToSatisfyRecvRequest( PAFDFCB FCB, BOOL Continuous ) {
PAFD_READ_REQUEST ReadRequest;
PLIST_ENTRY Entry;
NTSTATUS Status;
ULONG Count = 0;
AFD_DbgPrint(MAX_TRACE, ("Satisfying read request.\n"));
while (!IsListEmpty(&FCB->ReadRequestQueue) &&
!IsListEmpty(&FCB->ReceiveQueue)) {
AFD_DbgPrint(MAX_TRACE, ("Satisfying read request.\n"));
Entry = RemoveHeadList(&FCB->ReadRequestQueue);
ReadRequest = CONTAINING_RECORD(Entry, AFD_READ_REQUEST, ListEntry);
AFD_DbgPrint(MAX_TRACE,("ReadRequest: (li) %x %x %x\n",
ReadRequest->Irp,
ReadRequest->RecvFromRequest,
ReadRequest->RecvFromReply));
Status = FillWSABuffers(
FCB,
ReadRequest->RecvFromRequest->Buffers,
ReadRequest->RecvFromRequest->BufferCount,
&Count,
Continuous );
ReadRequest->RecvFromReply->NumberOfBytesRecvd = Count;
ReadRequest->RecvFromReply->Status = NO_ERROR;
ReadRequest->Irp->IoStatus.Information =
sizeof(*ReadRequest->RecvFromReply);
ReadRequest->Irp->IoStatus.Status = Status;
AFD_DbgPrint(MAX_TRACE, ("Completing IRP at (0x%X).\n", ReadRequest->Irp));
IoSetCancelRoutine(ReadRequest->Irp, NULL);
IoCompleteRequest(ReadRequest->Irp, IO_NETWORK_INCREMENT);
}
AFD_DbgPrint(MAX_TRACE, ("Bytes received (0x%X).\n", Count));
}
/*
* NOTES: ReceiveQueueLock must be acquired for the FCB when called
*/
@ -98,6 +154,7 @@ NTSTATUS FillWSABuffers(
UINT DstSize, SrcSize;
UINT Count, Total;
PAFD_BUFFER SrcBuffer;
PMDL Mdl;
PLIST_ENTRY Entry;
*BytesCopied = 0;
@ -112,7 +169,9 @@ NTSTATUS FillWSABuffers(
SrcData = SrcBuffer->Buffer.buf + SrcBuffer->Offset;
SrcSize = SrcBuffer->Buffer.len - SrcBuffer->Offset;
DstData = Buffers->buf;
/* First buffer: map the pages so we can access them */
Mdl = (PMDL)Buffers->buf;
DstData = MmMapLockedPages( Mdl, KernelMode );
DstSize = Buffers->len;
/* Copy the data */
@ -157,20 +216,27 @@ NTSTATUS FillWSABuffers(
BufferCount--;
if (BufferCount < 1)
break;
/* And cleanup the pages. */
MmUnmapLockedPages( DstData, Mdl );
MmUnlockPages( Mdl );
IoFreeMdl( Mdl );
Buffers++;
DstData = Buffers->buf;
Mdl = (PMDL)Buffers->buf;
DstData = MmMapLockedPages( Mdl, KernelMode );
DstSize = Buffers->len;
}
}
if (SrcSize > 0) {
SrcBuffer->Offset += Total;
InsertHeadList(&FCB->ReceiveQueue, Entry);
} else if (SrcBuffer != NULL) {
ExFreePool(SrcBuffer->Buffer.buf);
ExFreePool(SrcBuffer);
}
SrcBuffer->Offset += Total;
*BytesCopied = Total;
return STATUS_SUCCESS;

View file

@ -71,7 +71,6 @@ typedef struct _AFDFCB {
LIST_ENTRY ReceiveQueue;
KSPIN_LOCK ReceiveQueueLock;
LIST_ENTRY ReadRequestQueue;
KSPIN_LOCK ReadRequestQueueLock;
LIST_ENTRY ListenRequestQueue;
/* For WSAEventSelect() */
WSANETWORKEVENTS NetworkEvents;
@ -310,6 +309,9 @@ NTSTATUS STDCALL AfdWrite(
VOID DumpName(
LPSOCKADDR Name);
/* Requires caller to hold the recv queue lock */
VOID TryToSatisfyRecvRequest( PAFDFCB FCB, BOOL Continuous );
ULONG WSABufferSize(
LPWSABUF Buffers,
DWORD BufferCount);

View file

@ -38,8 +38,8 @@ WSPRecv(
IN LPWSATHREADID lpThreadId,
OUT LPINT lpErrno)
{
PFILE_REQUEST_RECV Request;
FILE_REPLY_RECV Reply;
PFILE_REQUEST_RECVFROM Request;
FILE_REPLY_RECVFROM Reply;
IO_STATUS_BLOCK Iosb;
NTSTATUS Status;
DWORD Size;
@ -48,8 +48,8 @@ WSPRecv(
Size = dwBufferCount * sizeof(WSABUF);
Request = (PFILE_REQUEST_RECV)HeapAlloc(
GlobalHeap, 0, sizeof(FILE_REQUEST_RECV) + Size);
Request = (PFILE_REQUEST_RECVFROM)HeapAlloc(
GlobalHeap, 0, sizeof(FILE_REQUEST_RECVFROM) + Size);
if (!Request) {
AFD_DbgPrint(MIN_TRACE, ("Insufficient resources.\n"));
*lpErrno = WSAENOBUFS;
@ -71,9 +71,9 @@ WSPRecv(
&Iosb,
IOCTL_AFD_RECV,
Request,
sizeof(FILE_REQUEST_RECV) + Size,
sizeof(FILE_REQUEST_RECVFROM) + Size,
&Reply,
sizeof(FILE_REPLY_RECV));
sizeof(FILE_REPLY_RECVFROM));
HeapFree(GlobalHeap, 0, Request);
@ -90,7 +90,7 @@ WSPRecv(
}
AFD_DbgPrint(MAX_TRACE, ("Receive successful (0x%X).\n",
Reply.NumberOfBytesRecvd));
Reply.NumberOfBytesRecvd));
*lpNumberOfBytesRecvd = Reply.NumberOfBytesRecvd;
//*lpFlags = 0;
@ -215,40 +215,45 @@ WSPSend(
AFD_DbgPrint(MAX_TRACE, ("Called.\n"));
Size = dwBufferCount * sizeof(WSABUF);
CP
Request = (PFILE_REQUEST_SENDTO)HeapAlloc(
GlobalHeap, 0, sizeof(FILE_REQUEST_SEND) + Size);
GlobalHeap, 0, sizeof(FILE_REQUEST_SENDTO) + Size);
if (!Request) {
*lpErrno = WSAENOBUFS;
return SOCKET_ERROR;
}
CP
/* Put buffer pointers after request structure */
Request->Buffers = (LPWSABUF)(Request + 1);
Request->BufferCount = dwBufferCount;
Request->Flags = dwFlags;
CP
RtlCopyMemory(Request->Buffers, lpBuffers, Size);
CP
Status = NtDeviceIoControlFile(
(HANDLE)s,
NULL,
NULL,
NULL,
&Iosb,
IOCTL_AFD_SEND,
Request,
sizeof(FILE_REQUEST_SEND) + Size,
&Reply,
sizeof(FILE_REPLY_SEND));
HeapFree(GlobalHeap, 0, Request);
Status = NtDeviceIoControlFile
( (HANDLE)s,
NULL,
NULL,
NULL,
&Iosb,
IOCTL_AFD_SEND,
Request,
sizeof(FILE_REQUEST_SENDTO) + Size,
&Reply,
sizeof(FILE_REPLY_SENDTO));
CP
/* HeapFree(GlobalHeap, 0, Request); */
CP
if (Status == STATUS_PENDING) {
AFD_DbgPrint(MAX_TRACE, ("Waiting on transport.\n"));
/* FIXME: Wait only for blocking sockets */
Status = NtWaitForSingleObject((HANDLE)s, FALSE, NULL);
}
CP
if (!NT_SUCCESS(Status)) {
AFD_DbgPrint(MAX_TRACE, ("Status (0x%X).\n", Status));