From 1f14756983c1a601a235e0c372459658e2b79631 Mon Sep 17 00:00:00 2001 From: Thomas Bluemel Date: Wed, 19 May 2004 18:45:31 +0000 Subject: [PATCH] attempt to fix handling of timed out sending of messages to be thread-safe svn path=/trunk/; revision=9441 --- reactos/subsys/win32k/include/msgqueue.h | 6 +- reactos/subsys/win32k/ntuser/msgqueue.c | 150 ++++++++++++++++++++--- 2 files changed, 137 insertions(+), 19 deletions(-) diff --git a/reactos/subsys/win32k/include/msgqueue.h b/reactos/subsys/win32k/include/msgqueue.h index 7c0bbc833bb..ce25369b9da 100644 --- a/reactos/subsys/win32k/include/msgqueue.h +++ b/reactos/subsys/win32k/include/msgqueue.h @@ -23,9 +23,11 @@ typedef struct _USER_SENT_MESSAGE MSG Msg; PKEVENT CompletionEvent; LRESULT* Result; - struct _USER_MESSAGE_QUEUE* CompletionQueue; + struct _USER_MESSAGE_QUEUE* SenderQueue; SENDASYNCPROC CompletionCallback; ULONG_PTR CompletionCallbackContext; + /* entry in the dispatching list of the sender's message queue */ + LIST_ENTRY DispatchingListEntry; } USER_SENT_MESSAGE, *PUSER_SENT_MESSAGE; typedef struct _USER_SENT_MESSAGE_NOTIFY @@ -95,6 +97,8 @@ typedef struct _USER_MESSAGE_QUEUE /* extra message information */ LPARAM ExtraInfo; + /* messages that are currently dispatched by other threads */ + LIST_ENTRY DispatchingMessagesHead; } USER_MESSAGE_QUEUE, *PUSER_MESSAGE_QUEUE; BOOL FASTCALL diff --git a/reactos/subsys/win32k/ntuser/msgqueue.c b/reactos/subsys/win32k/ntuser/msgqueue.c index 6a9c218faed..c41edefc483 100644 --- a/reactos/subsys/win32k/ntuser/msgqueue.c +++ b/reactos/subsys/win32k/ntuser/msgqueue.c @@ -16,7 +16,7 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -/* $Id: msgqueue.c,v 1.96 2004/05/15 16:17:59 gvg Exp $ +/* $Id: msgqueue.c,v 1.97 2004/05/19 18:45:31 weiden Exp $ * * COPYRIGHT: See COPYING in the top level directory * PROJECT: ReactOS kernel @@ -698,9 +698,9 @@ MsqDispatchSentNotifyMessages(PUSER_MESSAGE_QUEUE MessageQueue) PLIST_ENTRY ListEntry; PUSER_SENT_MESSAGE_NOTIFY Message; + IntLockMessageQueue(MessageQueue); while (!IsListEmpty(&MessageQueue->SentMessagesListHead)) { - IntLockMessageQueue(MessageQueue); ListEntry = RemoveHeadList(&MessageQueue->SentMessagesListHead); Message = CONTAINING_RECORD(ListEntry, USER_SENT_MESSAGE_NOTIFY, ListEntry); @@ -711,7 +711,10 @@ MsqDispatchSentNotifyMessages(PUSER_MESSAGE_QUEUE MessageQueue) Message->Msg, Message->CompletionCallbackContext, Message->Result); + + IntLockMessageQueue(MessageQueue); } + IntUnLockMessageQueue(MessageQueue); } BOOLEAN FASTCALL @@ -744,6 +747,16 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue) Message->Msg.wParam, Message->Msg.lParam); + /* remove the message from the dispatching list */ + IntLockMessageQueue(Message->SenderQueue); + if(Message->DispatchingListEntry.Flink != NULL) + { + /* only remove it from the dispatching list if not already removed by a timeout */ + RemoveEntryList(&Message->DispatchingListEntry); + } + /* still keep the sender's message queue locked, so the sender can't exit the + MsqSendMessage() function (if timed out) */ + /* Let the sender know the result. */ if (Message->Result != NULL) { @@ -756,11 +769,18 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue) KeSetEvent(Message->CompletionEvent, IO_NO_INCREMENT, FALSE); } + /* unlock the sender's message queue, the safe operation is done */ + IntUnLockMessageQueue(Message->SenderQueue); + /* Notify the sender if they specified a callback. */ if (Message->CompletionCallback != NULL) { - NotifyMessage = ExAllocatePoolWithTag(NonPagedPool, - sizeof(USER_SENT_MESSAGE_NOTIFY), TAG_USRMSG); + if(!(NotifyMessage = ExAllocatePoolWithTag(NonPagedPool, + sizeof(USER_SENT_MESSAGE_NOTIFY), TAG_USRMSG))) + { + DPRINT1("MsqDispatchOneSentMessage(): Not enough memory to create a callback notify message\n"); + goto Notified; + } NotifyMessage->CompletionCallback = Message->CompletionCallback; NotifyMessage->CompletionCallbackContext = @@ -768,9 +788,11 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue) NotifyMessage->Result = Result; NotifyMessage->hWnd = Message->Msg.hwnd; NotifyMessage->Msg = Message->Msg.message; - MsqSendNotifyMessage(Message->CompletionQueue, NotifyMessage); + MsqSendNotifyMessage(Message->SenderQueue, NotifyMessage); } +Notified: + /* FIXME - decrease reference counter of sender's message queue here */ ExFreePool(Message); return(TRUE); } @@ -799,31 +821,96 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue, LARGE_INTEGER Timeout; PLIST_ENTRY Entry; + if(!(Message = ExAllocatePoolWithTag(PagedPool, sizeof(USER_SENT_MESSAGE), TAG_USRMSG))) + { + DPRINT1("MsqSendMessage(): Not enough memory to allocate a message"); + return STATUS_INSUFFICIENT_RESOURCES; + } + KeInitializeEvent(&CompletionEvent, NotificationEvent, FALSE); - - Message = ExAllocatePoolWithTag(PagedPool, sizeof(USER_SENT_MESSAGE), TAG_USRMSG); + + ThreadQueue = PsGetWin32Thread()->MessageQueue; + ASSERT(ThreadQueue != MessageQueue); + + /* FIXME - increase reference counter of sender's message queue here */ + + Result = 0; Message->Msg.hwnd = Wnd; Message->Msg.message = Msg; Message->Msg.wParam = wParam; Message->Msg.lParam = lParam; Message->CompletionEvent = &CompletionEvent; Message->Result = &Result; - Message->CompletionQueue = NULL; + Message->SenderQueue = ThreadQueue; Message->CompletionCallback = NULL; - + + /* add it to the list of pending messages */ + IntLockMessageQueue(ThreadQueue); + InsertTailList(&MessageQueue->DispatchingMessagesHead, &Message->DispatchingListEntry); + IntUnLockMessageQueue(ThreadQueue); + + /* queue it in the destination's message queue */ IntLockMessageQueue(MessageQueue); InsertTailList(&MessageQueue->SentMessagesListHead, &Message->ListEntry); IntUnLockMessageQueue(MessageQueue); + KeSetEvent(&MessageQueue->NewMessages, IO_NO_INCREMENT, FALSE); + /* we can't access the Message anymore since it could have already been deleted! */ + Timeout.QuadPart = uTimeout * -10000; - ThreadQueue = PsGetWin32Thread()->MessageQueue; if(Block) { /* don't process messages sent to the thread */ WaitStatus = KeWaitForSingleObject(&CompletionEvent, UserRequest, UserMode, FALSE, (uTimeout ? &Timeout : NULL)); + if(WaitStatus == STATUS_TIMEOUT) + { + /* look up if the message has already been dispatched, if so + make sure it can't pass a result and it must not set the completion event anymore */ + IntLockMessageQueue(MessageQueue); + Entry = MessageQueue->SentMessagesListHead.Flink; + while (Entry != &MessageQueue->SentMessagesListHead) + { + if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, ListEntry) + == Message) + { + /* we can access Message here, it's secure because the message queue is locked + and the message is still hasn't been dispatched */ + Message->CompletionEvent = NULL; + Message->Result = NULL; + break; + } + Entry = Entry->Flink; + } + IntUnLockMessageQueue(MessageQueue); + + /* remove from the local dispatching list so the other thread knows, + it can't pass a result and it must not set the completion event anymore */ + IntLockMessageQueue(ThreadQueue); + Entry = ThreadQueue->DispatchingMessagesHead.Flink; + while (Entry != &ThreadQueue->DispatchingMessagesHead) + { + if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, DispatchingListEntry) + == Message) + { + /* we can access Message here, it's secure because the sender's message is locked + and the message has definitely not yet been destroyed, otherwise it would + have been removed from this list by the dispatching routine right after + dispatching the message */ + Message->CompletionEvent = NULL; + Message->Result = NULL; + RemoveEntryList(&Message->DispatchingListEntry); + break; + } + Entry = Entry->Flink; + } + IntUnLockMessageQueue(ThreadQueue); + + DPRINT("MsqSendMessage (blocked) timed out\n"); + } + while (MsqDispatchOneSentMessage(ThreadQueue)); } else { @@ -837,26 +924,51 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue, UserMode, FALSE, (uTimeout ? &Timeout : NULL), NULL); if(WaitStatus == STATUS_TIMEOUT) { - IntLockMessageQueue(MessageQueue); + /* look up if the message has already been dispatched, if so + make sure it can't pass a result and it must not set the completion event anymore */ + IntLockMessageQueue(MessageQueue); Entry = MessageQueue->SentMessagesListHead.Flink; while (Entry != &MessageQueue->SentMessagesListHead) { if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, ListEntry) == Message) { - Message->CompletionEvent = NULL; + /* we can access Message here, it's secure because the message queue is locked + and the message is still hasn't been dispatched */ + Message->CompletionEvent = NULL; + Message->Result = NULL; break; } Entry = Entry->Flink; } IntUnLockMessageQueue(MessageQueue); - DPRINT("MsqSendMessage timed out\n"); + + /* remove from the local dispatching list so the other thread knows, + it can't pass a result and it must not set the completion event anymore */ + IntLockMessageQueue(ThreadQueue); + Entry = ThreadQueue->DispatchingMessagesHead.Flink; + while (Entry != &ThreadQueue->DispatchingMessagesHead) + { + if ((PUSER_SENT_MESSAGE) CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, DispatchingListEntry) + == Message) + { + /* we can access Message here, it's secure because the sender's message is locked + and the message has definitely not yet been destroyed, otherwise it would + have been removed from this list by the dispatching routine right after + dispatching the message */ + Message->CompletionEvent = NULL; + Message->Result = NULL; + RemoveEntryList(&Message->DispatchingListEntry); + break; + } + Entry = Entry->Flink; + } + IntUnLockMessageQueue(ThreadQueue); + + DPRINT("MsqSendMessage timed out\n"); break; } - while (MsqDispatchOneSentMessage(ThreadQueue)) - { - ; - } + while (MsqDispatchOneSentMessage(ThreadQueue)); } while (NT_SUCCESS(WaitStatus) && STATUS_WAIT_0 != WaitStatus); } @@ -972,6 +1084,7 @@ MsqInitializeMessageQueue(struct _ETHREAD *Thread, PUSER_MESSAGE_QUEUE MessageQu InitializeListHead(&MessageQueue->PostedMessagesListHead); InitializeListHead(&MessageQueue->SentMessagesListHead); InitializeListHead(&MessageQueue->HardwareMessagesListHead); + InitializeListHead(&MessageQueue->DispatchingMessagesHead); KeInitializeMutex(&MessageQueue->HardwareLock, 0); ExInitializeFastMutex(&MessageQueue->Lock); MessageQueue->QuitPosted = FALSE; @@ -1008,12 +1121,13 @@ MsqCreateMessageQueue(struct _ETHREAD *Thread) MessageQueue = (PUSER_MESSAGE_QUEUE)ExAllocatePoolWithTag(PagedPool, sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO), TAG_MSGQ); - RtlZeroMemory(MessageQueue, sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO)); + if (!MessageQueue) { return NULL; } + RtlZeroMemory(MessageQueue, sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO)); MsqInitializeMessageQueue(Thread, MessageQueue); return MessageQueue;