Fixed LPC port implementation to use an internal semaphore, rather than an

event.  When using the event, if multiple messages were queued to the port
at once, the client would only get the first one, and then block rather than
read the next message.

I think that the port object should have a DISPATCHER_HEADER to make it a
full blown dispatcher object, allowing clients to wait on the port object.

svn path=/trunk/; revision=2003
This commit is contained in:
Phillip Susi 2001-06-23 19:13:33 +00:00
parent 1dfc10a465
commit 71f7e0cad5
7 changed files with 79 additions and 79 deletions

View file

@ -6,7 +6,7 @@
typedef struct _EPORT
{
KSPIN_LOCK Lock;
KEVENT Event;
KSEMAPHORE Semaphore;
ULONG State;

View file

@ -1,4 +1,4 @@
/* $Id: close.c,v 1.4 2001/03/13 16:25:54 dwelch Exp $
/* $Id: close.c,v 1.5 2001/06/23 19:13:33 phreak Exp $
*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
@ -47,17 +47,16 @@ NiClosePort (PVOID ObjectBody, ULONG HandleCount)
{
Message.MessageSize = sizeof(LPC_MESSAGE);
Message.DataSize = 0;
EiReplyOrRequestPort (Port->OtherPort,
&Message,
LPC_PORT_CLOSED,
Port);
KeSetEvent (&Port->OtherPort->Event,
IO_NO_INCREMENT,
FALSE);
Port->OtherPort->OtherPort = NULL;
Port->OtherPort->State = EPORT_DISCONNECTED;
KeReleaseSemaphore( &Port->OtherPort->Semaphore,
IO_NO_INCREMENT,
1,
FALSE );
ObDereferenceObject (Port);
}

View file

@ -1,4 +1,4 @@
/* $Id: complete.c,v 1.3 2001/01/18 15:00:08 dwelch Exp $
/* $Id: complete.c,v 1.4 2001/06/23 19:13:33 phreak Exp $
*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
@ -45,10 +45,10 @@ NtCompleteConnectPort (HANDLE PortHandle)
return (Status);
}
KeSetEvent (&OurPort->OtherPort->Event, IO_NO_INCREMENT, FALSE);
OurPort->State = EPORT_CONNECTED_SERVER;
KeReleaseSemaphore( &OurPort->OtherPort->Semaphore, IO_NO_INCREMENT, 1, FALSE );
ObDereferenceObject (OurPort);
return (STATUS_SUCCESS);

View file

@ -1,4 +1,4 @@
/* $Id: connect.c,v 1.6 2001/06/16 14:08:57 ekohl Exp $
/* $Id: connect.c,v 1.7 2001/06/23 19:13:33 phreak Exp $
*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
@ -130,14 +130,14 @@ NtConnectPort (PHANDLE ConnectedPort,
Request,
LPC_CONNECTION_REQUEST,
OurPort);
KeSetEvent (&NamedPort->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSemaphore( &NamedPort->Semaphore, IO_NO_INCREMENT, 1, FALSE );
DPRINT("Waiting for connection completion\n");
/*
* Wait for them to accept our connection
*/
KeWaitForSingleObject (&OurPort->Event,
KeWaitForSingleObject (&OurPort->Semaphore,
UserRequest,
UserMode,
FALSE,
@ -242,8 +242,9 @@ NtAcceptConnectPort (PHANDLE ServerPortHandle,
LpcMessage,
LPC_CONNECTION_REFUSED,
NamedPort);
KeSetEvent (&ConnectionRequest->Sender->Event,
KeReleaseSemaphore( &ConnectionRequest->Sender->Semaphore,
IO_NO_INCREMENT,
1,
FALSE);
ObDereferenceObject (ConnectionRequest->Sender);
ExFreePool (ConnectionRequest);

View file

@ -1,4 +1,4 @@
/* $Id: port.c,v 1.5 2001/03/07 16:48:43 dwelch Exp $
/* $Id: port.c,v 1.6 2001/06/23 19:13:33 phreak Exp $
*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
@ -93,7 +93,7 @@ NiInitializePort (
{
memset (Port, 0, sizeof(EPORT));
KeInitializeSpinLock (& Port->Lock);
KeInitializeEvent (& Port->Event, SynchronizationEvent, FALSE);
KeInitializeSemaphore( &Port->Semaphore, 0, LONG_MAX );
Port->OtherPort = NULL;
Port->QueueLength = 0;
Port->ConnectQueueLength = 0;

View file

@ -1,4 +1,4 @@
/* $Id: reply.c,v 1.6 2001/03/07 16:48:43 dwelch Exp $
/* $Id: reply.c,v 1.7 2001/06/23 19:13:33 phreak Exp $
*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
@ -111,7 +111,7 @@ NtReplyPort (IN HANDLE PortHandle,
LpcReply,
LPC_REPLY,
Port);
KeSetEvent(&Port->OtherPort->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSemaphore( &Port->OtherPort->Semaphore, IO_NO_INCREMENT, 1, FALSE );
ObDereferenceObject(Port);
@ -150,6 +150,8 @@ NtReplyWaitReceivePortEx(IN HANDLE PortHandle,
PEPORT Port;
KIRQL oldIrql;
PQUEUEDMESSAGE Request;
BOOLEAN Disconnected;
LARGE_INTEGER to;
DPRINT("NtReplyWaitReceivePortEx(PortHandle %x, LpcReply %x, "
"LpcMessage %x)\n", PortHandle, LpcReply, LpcMessage);
@ -165,26 +167,27 @@ NtReplyWaitReceivePortEx(IN HANDLE PortHandle,
DPRINT1("NtReplyWaitReceivePortEx() = %x\n", Status);
return(Status);
}
if (Port->State != EPORT_CONNECTED_CLIENT &&
Port->State != EPORT_CONNECTED_SERVER &&
LpcReply != NULL)
if( Port->State == EPORT_DISCONNECTED )
{
DPRINT1("NtReplyWaitReceivePortEx() = %x (State was %x)\n",
STATUS_PORT_DISCONNECTED, Port->State);
return(STATUS_PORT_DISCONNECTED);
// if the port is disconnected, force the timeout to be 0
// so we don't wait for new messages, because there won't be
// any, only try to remove any existing messages
Disconnected = TRUE;
to.QuadPart = 0;
Timeout = &to;
}
else Disconnected = FALSE;
/*
* Send the reply
* Send the reply, only if port is connected
*/
if (LpcReply != NULL)
if (LpcReply != NULL && !Disconnected)
{
Status = EiReplyOrRequestPort(Port->OtherPort,
LpcReply,
LPC_REPLY,
Port);
KeSetEvent(&Port->OtherPort->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSemaphore( &Port->OtherPort->Semaphore, IO_NO_INCREMENT, 1, FALSE);
if (!NT_SUCCESS(Status))
{
@ -197,16 +200,23 @@ NtReplyWaitReceivePortEx(IN HANDLE PortHandle,
/*
* Want for a message to be received
*/
do
{
Status = KeWaitForSingleObject(&Port->Event,
Status = KeWaitForSingleObject(&Port->Semaphore,
UserRequest,
UserMode,
FALSE,
NULL);
Timeout);
if( Status == STATUS_TIMEOUT )
{
// if the port is disconnected, and there are no remaining messages,
// return STATUS_PORT_DISCONNECTED
ObDereferenceObject( Port );
return Disconnected ? STATUS_PORT_DISCONNECTED : STATUS_TIMEOUT;
}
if (!NT_SUCCESS(Status))
{
DPRINT1("NtReplyWaitReceivePortEx() = %x\n", Status);
ObDereferenceObject( Port );
return(Status);
}
@ -216,17 +226,7 @@ NtReplyWaitReceivePortEx(IN HANDLE PortHandle,
KeAcquireSpinLock(&Port->Lock, &oldIrql);
Request = EiDequeueMessagePort(Port);
/*
* There is a race between the event being set and the port lock being
* taken in which another thread may dequeue the same request so
* we may need to loop.
*/
if (Request == NULL)
{
KeReleaseSpinLock(&Port->Lock, oldIrql);
}
} while(Request == NULL);
assert( Request );
memcpy(LpcMessage, &Request->Message, Request->Message.MessageSize);
if (Request->Message.MessageType == LPC_CONNECTION_REQUEST)
{

View file

@ -1,4 +1,4 @@
/* $Id: send.c,v 1.4 2001/03/18 19:35:13 dwelch Exp $
/* $Id: send.c,v 1.5 2001/06/23 19:13:33 phreak Exp $
*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel
@ -75,12 +75,12 @@ LpcSendDebugMessagePort (IN PEPORT Port,
ObDereferenceObject(Port);
return(Status);
}
KeSetEvent(&Port->OtherPort->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSemaphore( &Port->OtherPort->Semaphore, IO_NO_INCREMENT, 1, FALSE );
/*
* Wait for a reply
*/
KeWaitForSingleObject(&Port->Event,
KeWaitForSingleObject(&Port->Semaphore,
UserRequest,
UserMode,
FALSE,
@ -122,7 +122,7 @@ NTSTATUS STDCALL LpcRequestPort (IN PEPORT Port,
LpcMessage,
LPC_DATAGRAM,
Port);
KeSetEvent(&Port->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSemaphore( &Port->Semaphore, IO_NO_INCREMENT, 1, FALSE );
return(Status);
}
@ -216,12 +216,12 @@ NtRequestWaitReplyPort (IN HANDLE PortHandle,
ObDereferenceObject(Port);
return(Status);
}
KeSetEvent(&Port->OtherPort->Event, IO_NO_INCREMENT, FALSE);
KeReleaseSemaphore( &Port->OtherPort->Semaphore, IO_NO_INCREMENT, 1, FALSE);
/*
* Wait for a reply
*/
KeWaitForSingleObject(&Port->Event,
KeWaitForSingleObject(&Port->Semaphore,
UserRequest,
UserMode,
FALSE,