1. added basic reference counting for message queues

2. fixed memory leak when a thread terminates while dispatching a message
3. wake the sender's thread in case of 2.

svn path=/trunk/; revision=9470
This commit is contained in:
Thomas Bluemel 2004-05-22 16:48:50 +00:00
parent 4307e3115e
commit c6bdf3f977
3 changed files with 129 additions and 23 deletions

View file

@ -42,6 +42,9 @@ typedef struct _USER_SENT_MESSAGE_NOTIFY
typedef struct _USER_MESSAGE_QUEUE typedef struct _USER_MESSAGE_QUEUE
{ {
/* Reference counter, only access this variable with interlocked functions! */
LONG References;
/* Owner of the message queue */ /* Owner of the message queue */
struct _ETHREAD *Thread; struct _ETHREAD *Thread;
/* Queue of messages sent to the queue. */ /* Queue of messages sent to the queue. */
@ -99,6 +102,8 @@ typedef struct _USER_MESSAGE_QUEUE
/* messages that are currently dispatched by other threads */ /* messages that are currently dispatched by other threads */
LIST_ENTRY DispatchingMessagesHead; LIST_ENTRY DispatchingMessagesHead;
/* messages that are currently dispatched by this message queue, required for cleanup */
LIST_ENTRY LocalDispatchingMessagesHead;
} USER_MESSAGE_QUEUE, *PUSER_MESSAGE_QUEUE; } USER_MESSAGE_QUEUE, *PUSER_MESSAGE_QUEUE;
BOOL FASTCALL BOOL FASTCALL
@ -127,7 +132,7 @@ MsqFindMessage(IN PUSER_MESSAGE_QUEUE MessageQueue,
VOID FASTCALL VOID FASTCALL
MsqInitializeMessageQueue(struct _ETHREAD *Thread, PUSER_MESSAGE_QUEUE MessageQueue); MsqInitializeMessageQueue(struct _ETHREAD *Thread, PUSER_MESSAGE_QUEUE MessageQueue);
VOID FASTCALL VOID FASTCALL
MsqFreeMessageQueue(PUSER_MESSAGE_QUEUE MessageQueue); MsqCleanupMessageQueue(PUSER_MESSAGE_QUEUE MessageQueue);
PUSER_MESSAGE_QUEUE FASTCALL PUSER_MESSAGE_QUEUE FASTCALL
MsqCreateMessageQueue(struct _ETHREAD *Thread); MsqCreateMessageQueue(struct _ETHREAD *Thread);
VOID FASTCALL VOID FASTCALL
@ -197,20 +202,32 @@ LPARAM FASTCALL MsqSetMessageExtraInfo(LPARAM lParam);
LPARAM FASTCALL MsqGetMessageExtraInfo(VOID); LPARAM FASTCALL MsqGetMessageExtraInfo(VOID);
#define IntLockMessageQueue(MsgQueue) \ #define IntLockMessageQueue(MsgQueue) \
ExAcquireFastMutex(&MsgQueue->Lock) ExAcquireFastMutex(&(MsgQueue)->Lock)
#define IntUnLockMessageQueue(MsgQueue) \ #define IntUnLockMessageQueue(MsgQueue) \
ExReleaseFastMutex(&MsgQueue->Lock) ExReleaseFastMutex(&(MsgQueue)->Lock)
#define IntLockHardwareMessageQueue(MsgQueue) \ #define IntLockHardwareMessageQueue(MsgQueue) \
KeWaitForMutexObject(&MsgQueue->HardwareLock, UserRequest, KernelMode, FALSE, NULL) KeWaitForMutexObject(&(MsgQueue)->HardwareLock, UserRequest, KernelMode, FALSE, NULL)
#define IntUnLockHardwareMessageQueue(MsgQueue) \ #define IntUnLockHardwareMessageQueue(MsgQueue) \
KeReleaseMutex(&MsgQueue->HardwareLock, FALSE) KeReleaseMutex(&(MsgQueue)->HardwareLock, FALSE)
#define IntReferenceMessageQueue(MsgQueue) \
InterlockedIncrement(&(MsgQueue)->References)
#define IntDereferenceMessageQueue(MsgQueue) \
do { \
if(InterlockedDecrement(&(MsgQueue)->References) == 0) \
{ \
DPRINT("Free message queue 0x%x\n", (MsgQueue)); \
ExFreePool((MsgQueue)); \
} \
} while(0)
/* check the queue status */ /* check the queue status */
#define MsqIsSignaled(MsgQueue) \ #define MsqIsSignaled(MsgQueue) \
((MsgQueue->WakeBits & MsgQueue->WakeMask) || (MsgQueue->ChangedBits & MsgQueue->ChangedMask)) (((MsgQueue)->WakeBits & (MsgQueue)->WakeMask) || ((MsgQueue)->ChangedBits & (MsgQueue)->ChangedMask))
#define IS_BTN_MESSAGE(message,code) \ #define IS_BTN_MESSAGE(message,code) \
((message) == WM_LBUTTON##code || \ ((message) == WM_LBUTTON##code || \

View file

@ -16,7 +16,7 @@
* along with this program; if not, write to the Free Software * along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/ */
/* $Id: dllmain.c,v 1.74 2004/05/21 10:09:31 weiden Exp $ /* $Id: dllmain.c,v 1.75 2004/05/22 16:48:50 weiden Exp $
* *
* Entry Point for win32k.sys * Entry Point for win32k.sys
*/ */
@ -181,6 +181,7 @@ Win32kThreadCallback (struct _ETHREAD *Thread,
UnregisterThreadHotKeys(Thread); UnregisterThreadHotKeys(Thread);
DestroyThreadWindows(Thread); DestroyThreadWindows(Thread);
IntBlockInput(Win32Thread, FALSE); IntBlockInput(Win32Thread, FALSE);
MsqDestroyMessageQueue(Win32Thread->MessageQueue);
} }
return STATUS_SUCCESS; return STATUS_SUCCESS;

View file

@ -16,7 +16,7 @@
* along with this program; if not, write to the Free Software * along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/ */
/* $Id: msgqueue.c,v 1.98 2004/05/20 21:48:41 weiden Exp $ /* $Id: msgqueue.c,v 1.99 2004/05/22 16:48:50 weiden Exp $
* *
* COPYRIGHT: See COPYING in the top level directory * COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS kernel * PROJECT: ReactOS kernel
@ -420,8 +420,6 @@ MsqPeekHardwareMessage(PUSER_MESSAGE_QUEUE MessageQueue, HWND hWnd,
return FALSE; return FALSE;
} }
DesktopWindow = IntGetWindowObject(IntGetDesktopWindow());
WaitObjects[1] = &MessageQueue->NewMessages; WaitObjects[1] = &MessageQueue->NewMessages;
WaitObjects[0] = &HardwareMessageQueueLock; WaitObjects[0] = &HardwareMessageQueueLock;
do do
@ -435,6 +433,8 @@ MsqPeekHardwareMessage(PUSER_MESSAGE_QUEUE MessageQueue, HWND hWnd,
} }
while (NT_SUCCESS(WaitStatus) && STATUS_WAIT_0 != WaitStatus); while (NT_SUCCESS(WaitStatus) && STATUS_WAIT_0 != WaitStatus);
DesktopWindow = IntGetWindowObject(IntGetDesktopWindow());
/* Process messages in the message queue itself. */ /* Process messages in the message queue itself. */
IntLockHardwareMessageQueue(MessageQueue); IntLockHardwareMessageQueue(MessageQueue);
CurrentEntry = MessageQueue->HardwareMessagesListHead.Flink; CurrentEntry = MessageQueue->HardwareMessagesListHead.Flink;
@ -729,6 +729,7 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
PUSER_SENT_MESSAGE Message; PUSER_SENT_MESSAGE Message;
PLIST_ENTRY Entry; PLIST_ENTRY Entry;
LRESULT Result; LRESULT Result;
BOOL Freed;
PUSER_SENT_MESSAGE_NOTIFY NotifyMessage; PUSER_SENT_MESSAGE_NOTIFY NotifyMessage;
IntLockMessageQueue(MessageQueue); IntLockMessageQueue(MessageQueue);
@ -737,8 +738,16 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
IntUnLockMessageQueue(MessageQueue); IntUnLockMessageQueue(MessageQueue);
return(FALSE); return(FALSE);
} }
/* remove it from the list of pending messages */
Entry = RemoveHeadList(&MessageQueue->SentMessagesListHead); Entry = RemoveHeadList(&MessageQueue->SentMessagesListHead);
Message = CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, ListEntry); Message = CONTAINING_RECORD(Entry, USER_SENT_MESSAGE, ListEntry);
/* insert it to the list of messages that are currently dispatched by this
message queue */
InsertTailList(&MessageQueue->LocalDispatchingMessagesHead,
&Message->ListEntry);
IntUnLockMessageQueue(MessageQueue); IntUnLockMessageQueue(MessageQueue);
/* Call the window procedure. */ /* Call the window procedure. */
@ -747,9 +756,17 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
Message->Msg.wParam, Message->Msg.wParam,
Message->Msg.lParam); Message->Msg.lParam);
/* remove the message from the local dispatching list, because it doesn't need
to be cleaned up on thread termination anymore */
IntLockMessageQueue(MessageQueue);
RemoveEntryList(&Message->ListEntry);
IntUnLockMessageQueue(MessageQueue);
/* remove the message from the dispatching list, so lock the sender's message queue */ /* remove the message from the dispatching list, so lock the sender's message queue */
IntLockMessageQueue(Message->SenderQueue); IntLockMessageQueue(Message->SenderQueue);
if(Message->DispatchingListEntry.Flink != NULL)
Freed = (Message->DispatchingListEntry.Flink == NULL);
if(!Freed)
{ {
/* only remove it from the dispatching list if not already removed by a timeout */ /* only remove it from the dispatching list if not already removed by a timeout */
RemoveEntryList(&Message->DispatchingListEntry); RemoveEntryList(&Message->DispatchingListEntry);
@ -773,7 +790,7 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
IntUnLockMessageQueue(Message->SenderQueue); IntUnLockMessageQueue(Message->SenderQueue);
/* Notify the sender if they specified a callback. */ /* Notify the sender if they specified a callback. */
if (Message->CompletionCallback != NULL) if (!Freed && Message->CompletionCallback != NULL)
{ {
if(!(NotifyMessage = ExAllocatePoolWithTag(NonPagedPool, if(!(NotifyMessage = ExAllocatePoolWithTag(NonPagedPool,
sizeof(USER_SENT_MESSAGE_NOTIFY), TAG_USRMSG))) sizeof(USER_SENT_MESSAGE_NOTIFY), TAG_USRMSG)))
@ -792,7 +809,13 @@ MsqDispatchOneSentMessage(PUSER_MESSAGE_QUEUE MessageQueue)
} }
Notified: Notified:
/* FIXME - decrease reference counter of sender's message queue here */ if(!Freed)
{
/* only dereference our message queue if the message has not been timed out */
IntDereferenceMessageQueue(MessageQueue);
}
/* only free the message if not freed already */
ExFreePool(Message); ExFreePool(Message);
return(TRUE); return(TRUE);
} }
@ -846,6 +869,8 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue,
Message->SenderQueue = ThreadQueue; Message->SenderQueue = ThreadQueue;
Message->CompletionCallback = NULL; Message->CompletionCallback = NULL;
IntReferenceMessageQueue(MessageQueue);
/* add it to the list of pending messages */ /* add it to the list of pending messages */
IntLockMessageQueue(ThreadQueue); IntLockMessageQueue(ThreadQueue);
InsertTailList(&ThreadQueue->DispatchingMessagesHead, &Message->DispatchingListEntry); InsertTailList(&ThreadQueue->DispatchingMessagesHead, &Message->DispatchingListEntry);
@ -902,6 +927,7 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue,
Message->CompletionEvent = NULL; Message->CompletionEvent = NULL;
Message->Result = NULL; Message->Result = NULL;
RemoveEntryList(&Message->DispatchingListEntry); RemoveEntryList(&Message->DispatchingListEntry);
IntDereferenceMessageQueue(MessageQueue);
break; break;
} }
Entry = Entry->Flink; Entry = Entry->Flink;
@ -959,6 +985,7 @@ MsqSendMessage(PUSER_MESSAGE_QUEUE MessageQueue,
Message->CompletionEvent = NULL; Message->CompletionEvent = NULL;
Message->Result = NULL; Message->Result = NULL;
RemoveEntryList(&Message->DispatchingListEntry); RemoveEntryList(&Message->DispatchingListEntry);
IntDereferenceMessageQueue(MessageQueue);
break; break;
} }
Entry = Entry->Flink; Entry = Entry->Flink;
@ -1085,6 +1112,7 @@ MsqInitializeMessageQueue(struct _ETHREAD *Thread, PUSER_MESSAGE_QUEUE MessageQu
InitializeListHead(&MessageQueue->SentMessagesListHead); InitializeListHead(&MessageQueue->SentMessagesListHead);
InitializeListHead(&MessageQueue->HardwareMessagesListHead); InitializeListHead(&MessageQueue->HardwareMessagesListHead);
InitializeListHead(&MessageQueue->DispatchingMessagesHead); InitializeListHead(&MessageQueue->DispatchingMessagesHead);
InitializeListHead(&MessageQueue->LocalDispatchingMessagesHead);
KeInitializeMutex(&MessageQueue->HardwareLock, 0); KeInitializeMutex(&MessageQueue->HardwareLock, 0);
ExInitializeFastMutex(&MessageQueue->Lock); ExInitializeFastMutex(&MessageQueue->Lock);
MessageQueue->QuitPosted = FALSE; MessageQueue->QuitPosted = FALSE;
@ -1098,19 +1126,75 @@ MsqInitializeMessageQueue(struct _ETHREAD *Thread, PUSER_MESSAGE_QUEUE MessageQu
} }
VOID FASTCALL VOID FASTCALL
MsqFreeMessageQueue(PUSER_MESSAGE_QUEUE MessageQueue) MsqCleanupMessageQueue(PUSER_MESSAGE_QUEUE MessageQueue)
{ {
PLIST_ENTRY CurrentEntry; PLIST_ENTRY CurrentEntry;
PUSER_MESSAGE CurrentMessage; PUSER_MESSAGE CurrentMessage;
PUSER_SENT_MESSAGE CurrentSentMessage;
CurrentEntry = MessageQueue->PostedMessagesListHead.Flink; IntLockMessageQueue(MessageQueue);
while (CurrentEntry != &MessageQueue->PostedMessagesListHead)
/* cleanup posted messages */
while (!IsListEmpty(&MessageQueue->PostedMessagesListHead))
{ {
CurrentEntry = RemoveHeadList(&MessageQueue->PostedMessagesListHead);
CurrentMessage = CONTAINING_RECORD(CurrentEntry, USER_MESSAGE, CurrentMessage = CONTAINING_RECORD(CurrentEntry, USER_MESSAGE,
ListEntry); ListEntry);
CurrentEntry = CurrentEntry->Flink;
MsqDestroyMessage(CurrentMessage); MsqDestroyMessage(CurrentMessage);
} }
/* remove the messages that have not yet been dispatched */
while (!IsListEmpty(&MessageQueue->SentMessagesListHead))
{
CurrentEntry = RemoveHeadList(&MessageQueue->SentMessagesListHead);
CurrentSentMessage = CONTAINING_RECORD(CurrentEntry, USER_SENT_MESSAGE,
ListEntry);
DPRINT("Notify the sender and remove a message from the queue that had not been dispatched\n");
/* remove the message from the dispatching list */
if(CurrentSentMessage->DispatchingListEntry.Flink != NULL)
{
RemoveEntryList(&CurrentSentMessage->DispatchingListEntry);
}
/* wake the sender's thread */
if (CurrentSentMessage->CompletionEvent != NULL)
{
KeSetEvent(CurrentSentMessage->CompletionEvent, IO_NO_INCREMENT, FALSE);
}
/* dereference our message queue */
IntDereferenceMessageQueue(MessageQueue);
/* free the message */
ExFreePool(CurrentSentMessage);
}
/* notify senders of dispatching messages. This needs to be cleaned up if e.g.
ExitThread() was called in a SendMessage() umode callback */
while (!IsListEmpty(&MessageQueue->LocalDispatchingMessagesHead))
{
CurrentEntry = RemoveHeadList(&MessageQueue->LocalDispatchingMessagesHead);
CurrentSentMessage = CONTAINING_RECORD(CurrentEntry, USER_SENT_MESSAGE,
ListEntry);
DPRINT("Notify the sender, the thread has been terminated while dispatching a message!\n");
/* wake the sender's thread */
if (CurrentSentMessage->CompletionEvent != NULL)
{
KeSetEvent(CurrentSentMessage->CompletionEvent, IO_NO_INCREMENT, FALSE);
}
/* dereference our message queue */
IntDereferenceMessageQueue(MessageQueue);
/* free the message */
ExFreePool(CurrentSentMessage);
}
DbgPrint("Done cleaning up message queue...\n");
IntUnLockMessageQueue(MessageQueue);
} }
PUSER_MESSAGE_QUEUE FASTCALL PUSER_MESSAGE_QUEUE FASTCALL
@ -1128,6 +1212,9 @@ MsqCreateMessageQueue(struct _ETHREAD *Thread)
} }
RtlZeroMemory(MessageQueue, sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO)); RtlZeroMemory(MessageQueue, sizeof(USER_MESSAGE_QUEUE) + sizeof(THRDCARETINFO));
/* hold at least one reference until it'll be destroyed */
IntReferenceMessageQueue(MessageQueue);
/* initialize the queue */
MsqInitializeMessageQueue(Thread, MessageQueue); MsqInitializeMessageQueue(Thread, MessageQueue);
return MessageQueue; return MessageQueue;
@ -1136,8 +1223,9 @@ MsqCreateMessageQueue(struct _ETHREAD *Thread)
VOID FASTCALL VOID FASTCALL
MsqDestroyMessageQueue(PUSER_MESSAGE_QUEUE MessageQueue) MsqDestroyMessageQueue(PUSER_MESSAGE_QUEUE MessageQueue)
{ {
MsqFreeMessageQueue(MessageQueue); MsqCleanupMessageQueue(MessageQueue);
ExFreePool(MessageQueue); /* decrease the reference counter, if it hits zero, the queue will be freed */
IntDereferenceMessageQueue(MessageQueue);
} }
PHOOKTABLE FASTCALL PHOOKTABLE FASTCALL