mirror of
https://github.com/reactos/reactos.git
synced 2025-08-05 20:03:12 +00:00
attempt to fix handling of timed out sending of messages to be thread-safe
svn path=/trunk/; revision=9441
This commit is contained in:
parent
90b8b7f3d6
commit
1f14756983
2 changed files with 137 additions and 19 deletions
|
@ -23,9 +23,11 @@ typedef struct _USER_SENT_MESSAGE
|
||||||
MSG Msg;
|
MSG Msg;
|
||||||
PKEVENT CompletionEvent;
|
PKEVENT CompletionEvent;
|
||||||
LRESULT* Result;
|
LRESULT* Result;
|
||||||
struct _USER_MESSAGE_QUEUE* CompletionQueue;
|
struct _USER_MESSAGE_QUEUE* SenderQueue;
|
||||||
SENDASYNCPROC CompletionCallback;
|
SENDASYNCPROC CompletionCallback;
|
||||||
ULONG_PTR CompletionCallbackContext;
|
ULONG_PTR CompletionCallbackContext;
|
||||||
|
/* entry in the dispatching list of the sender's message queue */
|
||||||
|
LIST_ENTRY DispatchingListEntry;
|
||||||
} USER_SENT_MESSAGE, *PUSER_SENT_MESSAGE;
|
} USER_SENT_MESSAGE, *PUSER_SENT_MESSAGE;
|
||||||
|
|
||||||
typedef struct _USER_SENT_MESSAGE_NOTIFY
|
typedef struct _USER_SENT_MESSAGE_NOTIFY
|
||||||
|
@ -95,6 +97,8 @@ typedef struct _USER_MESSAGE_QUEUE
|
||||||
/* extra message information */
|
/* extra message information */
|
||||||
LPARAM ExtraInfo;
|
LPARAM ExtraInfo;
|
||||||
|
|
||||||
|
/* messages that are currently dispatched by other threads */
|
||||||
|
LIST_ENTRY DispatchingMessagesHead;
|
||||||
} USER_MESSAGE_QUEUE, *PUSER_MESSAGE_QUEUE;
|
} USER_MESSAGE_QUEUE, *PUSER_MESSAGE_QUEUE;
|
||||||
|
|
||||||
BOOL FASTCALL
|
BOOL FASTCALL
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
* along with this program; if not, write to the Free Software
|
* along with this program; if not, write to the Free Software
|
||||||
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
|
||||||
*/
|
*/
|
||||||
/* $Id: msgqueue.c,v 1.96 2004/05/15 16:17:59 gvg Exp $
|
/* $Id: msgqueue.c,v 1.97 2004/05/19 18:45:31 weiden Exp $
|
||||||
*
|
*
|
||||||
* COPYRIGHT: See COPYING in the top level directory
|
* COPYRIGHT: See COPYING in the top level directory
|
||||||
* PROJECT: ReactOS kernel
|
* PROJECT: ReactOS kernel
|
||||||
|
@ -698,9 +698,9 @@ MsqDispatchSentNotifyMessages(PUSER_MESSAGE_QUEUE MessageQueue)
|
||||||
PLIST_ENTRY ListEntry;
|
PLIST_ENTRY ListEntry;
|
||||||
PUSER_SENT_MESSAGE_NOTIFY Message;
|
PUSER_SENT_MESSAGE_NOTIFY Message;
|
||||||
|
|
||||||
|
IntLockMessageQueue(MessageQueue);
|
||||||
while (!IsListEmpty(&MessageQueue->SentMessagesListHead))
|
while (!IsListEmpty(&MessageQueue->SentMessagesListHead))
|
||||||
{
|
{
|
||||||
IntLockMessageQueue(MessageQueue);
|
|
||||||
ListEntry = RemoveHeadList(&MessageQueue->SentMessagesListHead);
|
ListEntry = RemoveHeadList(&MessageQueue->SentMessagesListHead);
|
||||||
Message = CONTAINING_RECORD(ListEntry, USER_SENT_MESSAGE_NOTIFY,
|
Message = CONTAINING_RECORD(ListEntry, USER_SENT_MESSAGE_NOTIFY,
|
||||||
ListEntry);
|
ListEntry);
|
||||||
|
@ -711,7 +711,10 @@ MsqDispatchSentNotifyMessages(PUSER_MESSAGE_QUEUE MessageQueue)
|
||||||
Message->Msg,
|
Message->Msg,
|
||||||
Message->CompletionCallbackContext,
|
Message->CompletionCallbackContext,
|
||||||
Message->Result);
|
Message->Result);
|
||||||
|
|
||||||
|
IntLockMessageQueue(MessageQueue);
|
||||||
}
|
}
|
||||||
|
IntUnLockMessageQueue(MessageQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
BOOLEAN FASTCALL
|
BOOLEAN FASTCALL
|
||||||
|
@ -744,6 +747,16 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
|
||||||
Message->Msg.wParam,
|
Message->Msg.wParam,
|
||||||
Message->Msg.lParam);
|
Message->Msg.lParam);
|
||||||
|
|
||||||
|
/* remove the message from the dispatching list */
|
||||||
|
IntLockMessageQueue(Message->SenderQueue);
|
||||||
|
if(Message->DispatchingListEntry.Flink != NULL)
|
||||||
|
{
|
||||||
|
/* only remove it from the dispatching list if not already removed by a timeout */
|
||||||
|
RemoveEntryList(&Message->DispatchingListEntry);
|
||||||
|
}
|
||||||
|
/* still keep the sender's message queue locked, so the sender can't exit the
|
||||||
|
MsqSendMessage() function (if timed out) */
|
||||||
|
|
||||||
/* Let the sender know the result. */
|
/* Let the sender know the result. */
|
||||||
if (Message->Result != NULL)
|
if (Message->Result != NULL)
|
||||||
{
|
{
|
||||||
|
@ -756,11 +769,18 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
|
||||||
KeSetEvent(Message->CompletionEvent, IO_NO_INCREMENT, FALSE);
|
KeSetEvent(Message->CompletionEvent, IO_NO_INCREMENT, FALSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* unlock the sender's message queue, the safe operation is done */
|
||||||
|
IntUnLockMessageQueue(Message->SenderQueue);
|
||||||
|
|
||||||
/* Notify the sender if they specified a callback. */
|
/* Notify the sender if they specified a callback. */
|
||||||
if (Message->CompletionCallback != NULL)
|
if (Message->CompletionCallback != NULL)
|
||||||
{
|
{
|
||||||
NotifyMessage = ExAllocatePoolWithTag(NonPagedPool,
|
if(!(NotifyMessage = ExAllocatePoolWithTag(NonPagedPool,
|
||||||
sizeof(USER_SENT_MESSAGE_NOTIFY), TAG_USRMSG);
|
sizeof(USER_SENT_MESSAGE_NOTIFY), TAG_USRMSG)))
|
||||||
|
{
|
||||||
|
DPRINT1("MsqDispatchOneSentMessage(): Not enough memory to create a callback notify message\n");
|
||||||
|
goto Notified;
|
||||||
|
}
|
||||||
NotifyMessage->CompletionCallback =
|
NotifyMessage->CompletionCallback =
|
||||||
Message->CompletionCallback;
|
Message->CompletionCallback;
|
||||||
NotifyMessage->CompletionCallbackContext =
|
NotifyMessage->CompletionCallbackContext =
|
||||||
|
@ -768,9 +788,11 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
|
||||||
NotifyMessage->Result = Result;
|
NotifyMessage->Result = Result;
|
||||||
NotifyMessage->hWnd = Message->Msg.hwnd;
|
NotifyMessage->hWnd = Message->Msg.hwnd;
|
||||||
NotifyMessage->Msg = Message->Msg.message;
|
NotifyMessage->Msg = Message->Msg.message;
|
||||||
MsqSendNotifyMessage(Message->CompletionQueue, NotifyMessage);
|
MsqSendNotifyMessage(Message->SenderQueue, NotifyMessage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Notified:
|
||||||
|
/* FIXME - decrease reference counter of sender's message queue here */
|
||||||
ExFreePool(Message);
|
ExFreePool(Message);
|
||||||
return(TRUE);
|
return(TRUE);
|
||||||
}
|
}
|
||||||
|
@ -799,31 +821,96 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue,
|
||||||
LARGE_INTEGER Timeout;
|
LARGE_INTEGER Timeout;
|
||||||
PLIST_ENTRY Entry;
|
PLIST_ENTRY Entry;
|
||||||
|
|
||||||
|
if(!(Message = ExAllocatePoolWithTag(PagedPool, sizeof(USER_SENT_MESSAGE), TAG_USRMSG)))
|
||||||
|
{
|
||||||
|
DPRINT1("MsqSendMessage(): Not enough memory to allocate a message");
|
||||||
|
return STATUS_INSUFFICIENT_RESOURCES;
|
||||||
|
}
|
||||||
|
|
||||||
KeInitializeEvent(&CompletionEvent, NotificationEvent, FALSE);
|
KeInitializeEvent(&CompletionEvent, NotificationEvent, FALSE);
|
||||||
|
|
||||||
Message = ExAllocatePoolWithTag(PagedPool, sizeof(USER_SENT_MESSAGE), TAG_USRMSG);
|
ThreadQueue = PsGetWin32Thread()->MessageQueue;
|
||||||
|
ASSERT(ThreadQueue != MessageQueue);
|
||||||
|
|
||||||
|
/* FIXME - increase reference counter of sender's message queue here */
|
||||||
|
|
||||||
|
Result = 0;
|
||||||
Message->Msg.hwnd = Wnd;
|
Message->Msg.hwnd = Wnd;
|
||||||
Message->Msg.message = Msg;
|
Message->Msg.message = Msg;
|
||||||
Message->Msg.wParam = wParam;
|
Message->Msg.wParam = wParam;
|
||||||
Message->Msg.lParam = lParam;
|
Message->Msg.lParam = lParam;
|
||||||
Message->CompletionEvent = &CompletionEvent;
|
Message->CompletionEvent = &CompletionEvent;
|
||||||
Message->Result = &Result;
|
Message->Result = &Result;
|
||||||
Message->CompletionQueue = NULL;
|
Message->SenderQueue = ThreadQueue;
|
||||||
Message->CompletionCallback = NULL;
|
Message->CompletionCallback = NULL;
|
||||||
|
|
||||||
|
/* add it to the list of pending messages */
|
||||||
|
IntLockMessageQueue(ThreadQueue);
|
||||||
|
InsertTailList(&MessageQueue->DispatchingMessagesHead, &Message->DispatchingListEntry);
|
||||||
|
IntUnLockMessageQueue(ThreadQueue);
|
||||||
|
|
||||||
|
/* queue it in the destination's message queue */
|
||||||
IntLockMessageQueue(MessageQueue);
|
IntLockMessageQueue(MessageQueue);
|
||||||
InsertTailList(&MessageQueue->SentMessagesListHead, &Message->ListEntry);
|
InsertTailList(&MessageQueue->SentMessagesListHead, &Message->ListEntry);
|
||||||
IntUnLockMessageQueue(MessageQueue);
|
IntUnLockMessageQueue(MessageQueue);
|
||||||
|
|
||||||
KeSetEvent(&MessageQueue->NewMessages, IO_NO_INCREMENT, FALSE);
|
KeSetEvent(&MessageQueue->NewMessages, IO_NO_INCREMENT, FALSE);
|
||||||
|
|
||||||
|
/* we can't access the Message anymore since it could have already been deleted! */
|
||||||
|
|
||||||
Timeout.QuadPart = uTimeout * -10000;
|
Timeout.QuadPart = uTimeout * -10000;
|
||||||
|
|
||||||
ThreadQueue = PsGetWin32Thread()->MessageQueue;
|
|
||||||
if(Block)
|
if(Block)
|
||||||
{
|
{
|
||||||
/* don't process messages sent to the thread */
|
/* don't process messages sent to the thread */
|
||||||
WaitStatus = KeWaitForSingleObject(&CompletionEvent, UserRequest, UserMode,
|
WaitStatus = KeWaitForSingleObject(&CompletionEvent, UserRequest, UserMode,
|
||||||
FALSE, (uTimeout ? &Timeout : NULL));
|
FALSE, (uTimeout ? &Timeout : NULL));
|
||||||
|
if(WaitStatus == STATUS_TIMEOUT)
|
||||||
|
{
|
||||||
|
/* look up if the message has already been dispatched, if so
|
||||||
|
make sure it can't pass a result and it must not set the completion event anymore */
|
||||||
|
IntLockMessageQueue(MessageQueue);
|
||||||
|
Entry = MessageQueue->SentMessagesListHead.Flink;
|
||||||
|
while (Entry != &MessageQueue->SentMessagesListHead)
|
||||||
|
{
|
||||||
|
if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, ListEntry)
|
||||||
|
== Message)
|
||||||
|
{
|
||||||
|
/* we can access Message here, it's secure because the message queue is locked
|
||||||
|
and the message is still hasn't been dispatched */
|
||||||
|
Message->CompletionEvent = NULL;
|
||||||
|
Message->Result = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Entry = Entry->Flink;
|
||||||
|
}
|
||||||
|
IntUnLockMessageQueue(MessageQueue);
|
||||||
|
|
||||||
|
/* remove from the local dispatching list so the other thread knows,
|
||||||
|
it can't pass a result and it must not set the completion event anymore */
|
||||||
|
IntLockMessageQueue(ThreadQueue);
|
||||||
|
Entry = ThreadQueue->DispatchingMessagesHead.Flink;
|
||||||
|
while (Entry != &ThreadQueue->DispatchingMessagesHead)
|
||||||
|
{
|
||||||
|
if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, DispatchingListEntry)
|
||||||
|
== Message)
|
||||||
|
{
|
||||||
|
/* we can access Message here, it's secure because the sender's message is locked
|
||||||
|
and the message has definitely not yet been destroyed, otherwise it would
|
||||||
|
have been removed from this list by the dispatching routine right after
|
||||||
|
dispatching the message */
|
||||||
|
Message->CompletionEvent = NULL;
|
||||||
|
Message->Result = NULL;
|
||||||
|
RemoveEntryList(&Message->DispatchingListEntry);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Entry = Entry->Flink;
|
||||||
|
}
|
||||||
|
IntUnLockMessageQueue(ThreadQueue);
|
||||||
|
|
||||||
|
DPRINT("MsqSendMessage (blocked) timed out\n");
|
||||||
|
}
|
||||||
|
while (MsqDispatchOneSentMessage(ThreadQueue));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -837,6 +924,8 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue,
|
||||||
UserMode, FALSE, (uTimeout ? &Timeout : NULL), NULL);
|
UserMode, FALSE, (uTimeout ? &Timeout : NULL), NULL);
|
||||||
if(WaitStatus == STATUS_TIMEOUT)
|
if(WaitStatus == STATUS_TIMEOUT)
|
||||||
{
|
{
|
||||||
|
/* look up if the message has already been dispatched, if so
|
||||||
|
make sure it can't pass a result and it must not set the completion event anymore */
|
||||||
IntLockMessageQueue(MessageQueue);
|
IntLockMessageQueue(MessageQueue);
|
||||||
Entry = MessageQueue->SentMessagesListHead.Flink;
|
Entry = MessageQueue->SentMessagesListHead.Flink;
|
||||||
while (Entry != &MessageQueue->SentMessagesListHead)
|
while (Entry != &MessageQueue->SentMessagesListHead)
|
||||||
|
@ -844,19 +933,42 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue,
|
||||||
if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, ListEntry)
|
if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, ListEntry)
|
||||||
== Message)
|
== Message)
|
||||||
{
|
{
|
||||||
|
/* we can access Message here, it's secure because the message queue is locked
|
||||||
|
and the message is still hasn't been dispatched */
|
||||||
Message->CompletionEvent = NULL;
|
Message->CompletionEvent = NULL;
|
||||||
|
Message->Result = NULL;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Entry = Entry->Flink;
|
Entry = Entry->Flink;
|
||||||
}
|
}
|
||||||
IntUnLockMessageQueue(MessageQueue);
|
IntUnLockMessageQueue(MessageQueue);
|
||||||
|
|
||||||
|
/* remove from the local dispatching list so the other thread knows,
|
||||||
|
it can't pass a result and it must not set the completion event anymore */
|
||||||
|
IntLockMessageQueue(ThreadQueue);
|
||||||
|
Entry = ThreadQueue->DispatchingMessagesHead.Flink;
|
||||||
|
while (Entry != &ThreadQueue->DispatchingMessagesHead)
|
||||||
|
{
|
||||||
|
if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, DispatchingListEntry)
|
||||||
|
== Message)
|
||||||
|
{
|
||||||
|
/* we can access Message here, it's secure because the sender's message is locked
|
||||||
|
and the message has definitely not yet been destroyed, otherwise it would
|
||||||
|
have been removed from this list by the dispatching routine right after
|
||||||
|
dispatching the message */
|
||||||
|
Message->CompletionEvent = NULL;
|
||||||
|
Message->Result = NULL;
|
||||||
|
RemoveEntryList(&Message->DispatchingListEntry);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Entry = Entry->Flink;
|
||||||
|
}
|
||||||
|
IntUnLockMessageQueue(ThreadQueue);
|
||||||
|
|
||||||
DPRINT("MsqSendMessage timed out\n");
|
DPRINT("MsqSendMessage timed out\n");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
while (MsqDispatchOneSentMessage(ThreadQueue))
|
while (MsqDispatchOneSentMessage(ThreadQueue));
|
||||||
{
|
|
||||||
;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
while (NT_SUCCESS(WaitStatus) && STATUS_WAIT_0 != WaitStatus);
|
while (NT_SUCCESS(WaitStatus) && STATUS_WAIT_0 != WaitStatus);
|
||||||
}
|
}
|
||||||
|
@ -972,6 +1084,7 @@ MsqInitializeMessageQueue(struct _ETHREAD *Thread, PUSER_MESSAGE_QUEUE MessageQu
|
||||||
InitializeListHead(&MessageQueue->PostedMessagesListHead);
|
InitializeListHead(&MessageQueue->PostedMessagesListHead);
|
||||||
InitializeListHead(&MessageQueue->SentMessagesListHead);
|
InitializeListHead(&MessageQueue->SentMessagesListHead);
|
||||||
InitializeListHead(&MessageQueue->HardwareMessagesListHead);
|
InitializeListHead(&MessageQueue->HardwareMessagesListHead);
|
||||||
|
InitializeListHead(&MessageQueue->DispatchingMessagesHead);
|
||||||
KeInitializeMutex(&MessageQueue->HardwareLock, 0);
|
KeInitializeMutex(&MessageQueue->HardwareLock, 0);
|
||||||
ExInitializeFastMutex(&MessageQueue->Lock);
|
ExInitializeFastMutex(&MessageQueue->Lock);
|
||||||
MessageQueue->QuitPosted = FALSE;
|
MessageQueue->QuitPosted = FALSE;
|
||||||
|
@ -1008,12 +1121,13 @@ MsqCreateMessageQueue(struct _ETHREAD *Thread)
|
||||||
MessageQueue = (PUSER_MESSAGE_QUEUE)ExAllocatePoolWithTag(PagedPool,
|
MessageQueue = (PUSER_MESSAGE_QUEUE)ExAllocatePoolWithTag(PagedPool,
|
||||||
sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO),
|
sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO),
|
||||||
TAG_MSGQ);
|
TAG_MSGQ);
|
||||||
RtlZeroMemory(MessageQueue, sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO));
|
|
||||||
if (!MessageQueue)
|
if (!MessageQueue)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RtlZeroMemory(MessageQueue, sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO));
|
||||||
MsqInitializeMessageQueue(Thread, MessageQueue);
|
MsqInitializeMessageQueue(Thread, MessageQueue);
|
||||||
|
|
||||||
return MessageQueue;
|
return MessageQueue;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue