Compare commits

...

6 Commits

Author SHA1 Message Date
Justin Miller ffa710fcc3
Merge 53cc6a8c04 into 45aa8f8111 2024-04-26 18:13:59 +00:00
Justin Miller 53cc6a8c04 Diverge a little less 2024-04-26 11:13:49 -07:00
Justin Miller 4f67f4c53d [MEDIA] Update winesync.txt accordingly 2024-04-26 01:36:23 -07:00
Justin Miller 8f54efd379 [NTDLL] Init KeyedEvents even on NT5.2 dll_export 2024-04-26 01:32:05 -07:00
Justin Miller b97e9320ee [KERNEL32][ROSTESTS][SDK] Enable threadpooling 2024-04-26 01:15:30 -07:00
Denis Malikov abcf6bc2e8 [KERNEL32_VISTA][SDK] Import Threadpool.c from wine-9.7 2024-04-25 21:40:54 -07:00
18 changed files with 3611 additions and 1750 deletions

View File

@ -1311,45 +1311,45 @@
@ stub -version=0x600+ ShipAssertMsgA
@ stub -version=0x600+ ShipAssertMsgW
@ stub -version=0x600+ TpAllocAlpcCompletion
@ stub -version=0x600+ TpAllocCleanupGroup
@ stub -version=0x600+ TpAllocIoCompletion
@ stub -version=0x600+ TpAllocPool
@ stub -version=0x600+ TpAllocTimer
@ stub -version=0x600+ TpAllocWait
@ stub -version=0x600+ TpAllocWork
@ stub -version=0x600+ TpCallbackLeaveCriticalSectionOnCompletion
@ stub -version=0x600+ TpCallbackMayRunLong
@ stub -version=0x600+ TpCallbackReleaseMutexOnCompletion
@ stub -version=0x600+ TpCallbackReleaseSemaphoreOnCompletion
@ stub -version=0x600+ TpCallbackSetEventOnCompletion
@ stub -version=0x600+ TpCallbackUnloadDllOnCompletion
@ stub -version=0x600+ TpCancelAsyncIoOperation
@ stdcall -version=0x600+ TpAllocCleanupGroup(ptr)
@ stdcall -version=0x600+ TpAllocIoCompletion(ptr ptr ptr ptr ptr)
@ stdcall -version=0x600+ TpAllocPool(ptr ptr)
@ stdcall -version=0x600+ TpAllocTimer(ptr ptr ptr ptr)
@ stdcall -version=0x600+ TpAllocWait(ptr ptr ptr ptr)
@ stdcall -version=0x600+ TpAllocWork(ptr ptr ptr ptr)
@ stdcall -version=0x600+ TpCallbackLeaveCriticalSectionOnCompletion(ptr ptr)
@ stdcall -version=0x600+ TpCallbackMayRunLong(ptr)
@ stdcall -version=0x600+ TpCallbackReleaseMutexOnCompletion(ptr ptr)
@ stdcall -version=0x600+ TpCallbackReleaseSemaphoreOnCompletion(ptr ptr long)
@ stdcall -version=0x600+ TpCallbackSetEventOnCompletion(ptr ptr)
@ stdcall -version=0x600+ TpCallbackUnloadDllOnCompletion(ptr ptr)
@ stdcall -version=0x600+ TpCancelAsyncIoOperation(ptr)
@ stub -version=0x600+ TpCaptureCaller
@ stub -version=0x600+ TpCheckTerminateWorker
@ stub -version=0x600+ TpDbgDumpHeapUsage
@ stub -version=0x600+ TpDbgSetLogRoutine
@ stub -version=0x600+ TpDisassociateCallback
@ stub -version=0x600+ TpIsTimerSet
@ stub -version=0x600+ TpPostWork
@ stdcall -version=0x600+ TpDisassociateCallback(ptr)
@ stdcall -version=0x600+ TpIsTimerSet(ptr)
@ stdcall -version=0x600+ TpPostWork(ptr)
@ stub -version=0x600+ TpReleaseAlpcCompletion
@ stub -version=0x600+ TpReleaseCleanupGroup
@ stub -version=0x600+ TpReleaseCleanupGroupMembers
@ stub -version=0x600+ TpReleaseIoCompletion
@ stub -version=0x600+ TpReleasePool
@ stub -version=0x600+ TpReleaseTimer
@ stub -version=0x600+ TpReleaseWait
@ stub -version=0x600+ TpReleaseWork
@ stub -version=0x600+ TpSetPoolMaxThreads
@ stub -version=0x600+ TpSetPoolMinThreads
@ stub -version=0x600+ TpSetTimer
@ stub -version=0x600+ TpSetWait
@ stub -version=0x600+ TpSimpleTryPost
@ stub -version=0x600+ TpStartAsyncIoOperation
@ stdcall -version=0x600+ TpReleaseCleanupGroup(ptr)
@ stdcall -version=0x600+ TpReleaseCleanupGroupMembers(ptr long ptr)
@ stdcall -version=0x600+ TpReleaseIoCompletion(ptr)
@ stdcall -version=0x600+ TpReleasePool(ptr)
@ stdcall -version=0x600+ TpReleaseTimer(ptr)
@ stdcall -version=0x600+ TpReleaseWait(ptr)
@ stdcall -version=0x600+ TpReleaseWork(ptr)
@ stdcall -version=0x600+ TpSetPoolMaxThreads(ptr long)
@ stdcall -version=0x600+ TpSetPoolMinThreads(ptr long)
@ stdcall -version=0x600+ TpSetTimer(ptr ptr long long)
@ stdcall -version=0x600+ TpSetWait(ptr long ptr)
@ stdcall -version=0x600+ TpSimpleTryPost(ptr ptr ptr)
@ stdcall -version=0x600+ TpStartAsyncIoOperation(ptr)
@ stub -version=0x600+ TpWaitForAlpcCompletion
@ stub -version=0x600+ TpWaitForIoCompletion
@ stub -version=0x600+ TpWaitForTimer
@ stub -version=0x600+ TpWaitForWait
@ stub -version=0x600+ TpWaitForWork
@ stdcall -version=0x600+ TpWaitForIoCompletion(ptr long)
@ stdcall -version=0x600+ TpWaitForTimer(ptr long)
@ stdcall -version=0x600+ TpWaitForWait(ptr long)
@ stdcall -version=0x600+ TpWaitForWork(ptr long)
@ stdcall -ret64 VerSetConditionMask(double long long)
@ stub -version=0x600+ WerCheckEventEscalation
@ stub -version=0x600+ WerReportSQMEvent

View File

@ -2405,8 +2405,8 @@ LdrpInitializeProcess(IN PCONTEXT Context,
/* Check whether all static imports were properly loaded and return here */
if (!NT_SUCCESS(ImportStatus)) return ImportStatus;
#if (DLL_EXPORT_VERSION >= _WIN32_WINNT_VISTA)
/* Initialize the keyed event for condition variables */
#if 1 //(DLL_EXPORT_VERSION >= _WIN32_WINNT_VISTA)
/* Initialize the keyed event for condition variables - needed for winesync'ed rtl */
RtlpInitializeKeyedEvent();
#endif

View File

@ -3,52 +3,6 @@
#define NDEBUG
#include <debug.h>
VOID
NTAPI
RtlInitializeConditionVariable(OUT PRTL_CONDITION_VARIABLE ConditionVariable);
VOID
NTAPI
RtlWakeConditionVariable(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable);
VOID
NTAPI
RtlWakeAllConditionVariable(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable);
NTSTATUS
NTAPI
RtlSleepConditionVariableCS(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
IN OUT PRTL_CRITICAL_SECTION CriticalSection,
IN PLARGE_INTEGER TimeOut OPTIONAL);
NTSTATUS
NTAPI
RtlSleepConditionVariableSRW(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
IN OUT PRTL_SRWLOCK SRWLock,
IN PLARGE_INTEGER TimeOut OPTIONAL,
IN ULONG Flags);
VOID
NTAPI
RtlInitializeSRWLock(OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlAcquireSRWLockShared(IN OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlReleaseSRWLockShared(IN OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlAcquireSRWLockExclusive(IN OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlReleaseSRWLockExclusive(IN OUT PRTL_SRWLOCK SRWLock);
VOID
WINAPI
AcquireSRWLockExclusive(PSRWLOCK Lock)

View File

@ -263,8 +263,7 @@ check Wine current sources first as it may already be fixed.
sdk/lib/3rdparty/strmbase # Synced to WineStaging-3.3
sdk/lib/rtl/actctx.c # Partly synced with WineStaging-1.9.16
sdk/lib/rtl/timerqueue.c # Partly synced with WineStaging-1.7.55
sdk/lib/rtl/wait.c # Partly synced with WineStaging-1.7.55
sdk/lib/rtl/threadpool.c # synced with Wine-9.7
advapi32 -
dll/win32/advapi32/wine/cred.c # Synced to WineStaging-3.3

View File

@ -23,6 +23,7 @@ list(APPEND SOURCE
rtlstr.c
string.c
testlist.c
threadpool.c
time.c)
if(ARCH STREQUAL "i386")

View File

@ -23,6 +23,7 @@ extern void func_rtl(void);
extern void func_rtlbitmap(void);
extern void func_rtlstr(void);
extern void func_string(void);
extern void func_threadpool(void);
extern void func_time(void);
const struct test winetest_testlist[] =
@ -49,6 +50,7 @@ const struct test winetest_testlist[] =
{ "rtlbitmap", func_rtlbitmap },
{ "rtlstr", func_rtlstr },
{ "string", func_string },
{ "threadpool", func_threadpool},
{ "time", func_time },
{ 0, 0 }
};

View File

@ -477,7 +477,7 @@ static void CALLBACK simple2_cb(TP_CALLBACK_INSTANCE *instance, void *userdata)
static void test_tp_simple(void)
{
TP_CALLBACK_ENVIRON environment;
TP_CALLBACK_ENVIRON_V3 environment3;
//TP_CALLBACK_ENVIRON_V3 environment3;
TP_CLEANUP_GROUP *group;
HANDLE semaphore;
NTSTATUS status;
@ -512,7 +512,7 @@ static void test_tp_simple(void)
ok(!status, "TpSimpleTryPost failed with status %x\n", status);
result = WaitForSingleObject(semaphore, 1000);
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
#ifndef __REACTOS__ // Windows 7
/* test with environment version 3 */
memset(&environment3, 0, sizeof(environment3));
environment3.Version = 3;
@ -523,7 +523,7 @@ static void test_tp_simple(void)
ok(!status, "TpSimpleTryPost failed with status %x\n", status);
result = WaitForSingleObject(semaphore, 1000);
ok(result == WAIT_OBJECT_0, "WaitForSingleObject returned %u\n", result);
#endif
/* test with invalid version number */
memset(&environment, 0, sizeof(environment));
environment.Version = 9999;

View File

@ -4693,6 +4693,54 @@ BOOLEAN
NTAPI
RtlGetNtProductType(_Out_ PNT_PRODUCT_TYPE ProductType);
//
// Synchronization functions
//
VOID
NTAPI
RtlInitializeConditionVariable(OUT PRTL_CONDITION_VARIABLE ConditionVariable);
VOID
NTAPI
RtlWakeConditionVariable(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable);
VOID
NTAPI
RtlWakeAllConditionVariable(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable);
NTSTATUS
NTAPI
RtlSleepConditionVariableCS(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
IN OUT PRTL_CRITICAL_SECTION CriticalSection,
IN const PLARGE_INTEGER TimeOut OPTIONAL);
NTSTATUS
NTAPI
RtlSleepConditionVariableSRW(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
IN OUT PRTL_SRWLOCK SRWLock,
IN PLARGE_INTEGER TimeOut OPTIONAL,
IN ULONG Flags);
VOID
NTAPI
RtlInitializeSRWLock(OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlAcquireSRWLockShared(IN OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlReleaseSRWLockShared(IN OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlAcquireSRWLockExclusive(IN OUT PRTL_SRWLOCK SRWLock);
VOID
NTAPI
RtlReleaseSRWLockExclusive(IN OUT PRTL_SRWLOCK SRWLock);
//
// Secure Memory Functions
//

View File

@ -4098,6 +4098,7 @@ InitOnceExecuteOnce(
_Inout_opt_ PVOID Parameter,
_Outptr_opt_result_maybenull_ LPVOID *Context);
typedef VOID (NTAPI *PTP_WIN32_IO_CALLBACK)(PTP_CALLBACK_INSTANCE,PVOID,PVOID,ULONG,ULONG_PTR,PTP_IO);
#if defined(_SLIST_HEADER_) && !defined(_NTOS_) && !defined(_NTOSP_)

View File

@ -279,6 +279,19 @@ typedef struct _OSVERSIONINFOEXW {
UCHAR wReserved;
} OSVERSIONINFOEXW, *POSVERSIONINFOEXW, *LPOSVERSIONINFOEXW, RTL_OSVERSIONINFOEXW, *PRTL_OSVERSIONINFOEXW;
#define RTL_CONDITION_VARIABLE_INIT {0}
#define RTL_CONDITION_VARIABLE_LOCKMODE_SHARED 0x1
typedef struct _RTL_CONDITION_VARIABLE {
PVOID Ptr;
} RTL_CONDITION_VARIABLE, *PRTL_CONDITION_VARIABLE;
#define RTL_SRWLOCK_INIT {0}
typedef struct _RTL_SRWLOCK {
PVOID Ptr;
} RTL_SRWLOCK, *PRTL_SRWLOCK;
#ifdef UNICODE
typedef OSVERSIONINFOEXW OSVERSIONINFOEX;
typedef POSVERSIONINFOEXW POSVERSIONINFOEX;

View File

@ -4451,7 +4451,11 @@ DbgRaiseAssertionFailure(VOID)
typedef struct _TP_POOL TP_POOL, *PTP_POOL;
typedef struct _TP_WORK TP_WORK, *PTP_WORK;
typedef struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE, *PTP_CALLBACK_INSTANCE;
typedef struct _TP_TIMER TP_TIMER, *PTP_TIMER;
typedef struct _TP_WAIT TP_WAIT, *PTP_WAIT;
typedef struct _TP_IO TP_IO, *PTP_IO;
typedef DWORD TP_WAIT_RESULT;
typedef DWORD TP_VERSION, *PTP_VERSION;
typedef enum _TP_CALLBACK_PRIORITY {
@ -4462,6 +4466,12 @@ typedef enum _TP_CALLBACK_PRIORITY {
TP_CALLBACK_PRIORITY_COUNT = TP_CALLBACK_PRIORITY_INVALID
} TP_CALLBACK_PRIORITY;
typedef struct _TP_POOL_STACK_INFORMATION
{
SIZE_T StackReserve;
SIZE_T StackCommit;
} TP_POOL_STACK_INFORMATION,*PTP_POOL_STACK_INFORMATION;
typedef VOID
(NTAPI *PTP_WORK_CALLBACK)(
_Inout_ PTP_CALLBACK_INSTANCE Instance,
@ -4480,6 +4490,9 @@ typedef VOID
_Inout_opt_ PVOID ObjectContext,
_Inout_opt_ PVOID CleanupContext);
typedef VOID (NTAPI *PTP_TIMER_CALLBACK)(PTP_CALLBACK_INSTANCE,PVOID,PTP_TIMER);
typedef VOID (NTAPI *PTP_WAIT_CALLBACK)(PTP_CALLBACK_INSTANCE,PVOID,PTP_WAIT,TP_WAIT_RESULT);
#if (_WIN32_WINNT >= _WIN32_WINNT_WIN7)
typedef struct _TP_CALLBACK_ENVIRON_V3 {
TP_VERSION Version;

View File

@ -71,8 +71,8 @@ list(APPEND SOURCE
unicodeprefix.c
vectoreh.c
version.c
wait.c
workitem.c
threadpool.c
rtl.h)
if(ARCH STREQUAL "i386")
@ -112,6 +112,7 @@ add_library(rtl ${SOURCE} ${rtl_asm})
target_link_libraries(rtl PRIVATE pseh)
add_pch(rtl rtl.h SOURCE)
add_dependencies(rtl psdk asm)
set_source_files_properties(threadpool.c PROPERTIES COMPILE_DEFINITIONS __WINESRC__)
list(APPEND SOURCE_VISTA
condvar.c

View File

@ -499,7 +499,7 @@ NTSTATUS
NTAPI
RtlSleepConditionVariableCS(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
IN OUT PRTL_CRITICAL_SECTION CriticalSection,
IN const LARGE_INTEGER * TimeOut OPTIONAL)
IN const PLARGE_INTEGER TimeOut OPTIONAL)
{
return InternalSleep(ConditionVariable,
CriticalSection,
@ -512,7 +512,7 @@ NTSTATUS
NTAPI
RtlSleepConditionVariableSRW(IN OUT PRTL_CONDITION_VARIABLE ConditionVariable,
IN OUT PRTL_SRWLOCK SRWLock,
IN const LARGE_INTEGER * TimeOut OPTIONAL,
IN PLARGE_INTEGER TimeOut OPTIONAL,
IN ULONG Flags)
{
return InternalSleep(ConditionVariable,

View File

@ -17,4 +17,9 @@
/* Main RTL Header */
#include "rtl.h"
#define TRACE(fmt, ...)
#define WARN(fmt, ...)
#define FIXME(fmt, ...)
#define ERR(fmt, ...)
#endif /* RTL_VISTA_H */

3485
sdk/lib/rtl/threadpool.c Normal file

File diff suppressed because it is too large Load Diff

View File

@ -64,405 +64,6 @@ struct timer_queue
#define EXPIRE_NEVER (~(ULONGLONG) 0)
#define TIMER_QUEUE_MAGIC 0x516d6954 /* TimQ */
static void queue_remove_timer(struct queue_timer *t)
{
/* We MUST hold the queue cs while calling this function. This ensures
that we cannot queue another callback for this timer. The runcount
being zero makes sure we don't have any already queued. */
struct timer_queue *q = t->q;
assert(t->runcount == 0);
assert(t->destroy);
list_remove(&t->entry);
if (t->event)
NtSetEvent(t->event, NULL);
RtlFreeHeap(RtlGetProcessHeap(), 0, t);
if (q->quit && list_empty(&q->timers))
NtSetEvent(q->event, NULL);
}
static void timer_cleanup_callback(struct queue_timer *t)
{
struct timer_queue *q = t->q;
RtlEnterCriticalSection(&q->cs);
assert(0 < t->runcount);
--t->runcount;
if (t->destroy && t->runcount == 0)
queue_remove_timer(t);
RtlLeaveCriticalSection(&q->cs);
}
static VOID WINAPI timer_callback_wrapper(LPVOID p)
{
struct queue_timer *t = p;
t->callback(t->param, TRUE);
timer_cleanup_callback(t);
}
static inline ULONGLONG queue_current_time(void)
{
LARGE_INTEGER now, freq;
NtQueryPerformanceCounter(&now, &freq);
return now.QuadPart * 1000 / freq.QuadPart;
}
static void queue_add_timer(struct queue_timer *t, ULONGLONG time,
BOOL set_event)
{
/* We MUST hold the queue cs while calling this function. */
struct timer_queue *q = t->q;
struct list *ptr = &q->timers;
assert(!q->quit || (t->destroy && time == EXPIRE_NEVER));
if (time != EXPIRE_NEVER)
LIST_FOR_EACH(ptr, &q->timers)
{
struct queue_timer *cur = LIST_ENTRY(ptr, struct queue_timer, entry);
if (time < cur->expire)
break;
}
list_add_before(ptr, &t->entry);
t->expire = time;
/* If we insert at the head of the list, we need to expire sooner
than expected. */
if (set_event && &t->entry == list_head(&q->timers))
NtSetEvent(q->event, NULL);
}
static inline void queue_move_timer(struct queue_timer *t, ULONGLONG time,
BOOL set_event)
{
/* We MUST hold the queue cs while calling this function. */
list_remove(&t->entry);
queue_add_timer(t, time, set_event);
}
static void queue_timer_expire(struct timer_queue *q)
{
struct queue_timer *t = NULL;
RtlEnterCriticalSection(&q->cs);
if (list_head(&q->timers))
{
ULONGLONG now, next;
t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
if (!t->destroy && t->expire <= ((now = queue_current_time())))
{
++t->runcount;
if (t->period)
{
next = t->expire + t->period;
/* avoid trigger cascade if overloaded / hibernated */
if (next < now)
next = now + t->period;
}
else
next = EXPIRE_NEVER;
queue_move_timer(t, next, FALSE);
}
else
t = NULL;
}
RtlLeaveCriticalSection(&q->cs);
if (t)
{
if (t->flags & WT_EXECUTEINTIMERTHREAD)
timer_callback_wrapper(t);
else
{
ULONG flags
= (t->flags
& (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD
| WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION));
NTSTATUS status = RtlQueueWorkItem(timer_callback_wrapper, t, flags);
if (status != STATUS_SUCCESS)
timer_cleanup_callback(t);
}
}
}
static ULONG queue_get_timeout(struct timer_queue *q)
{
struct queue_timer *t;
ULONG timeout = INFINITE;
RtlEnterCriticalSection(&q->cs);
if (list_head(&q->timers))
{
t = LIST_ENTRY(list_head(&q->timers), struct queue_timer, entry);
assert(!t->destroy || t->expire == EXPIRE_NEVER);
if (t->expire != EXPIRE_NEVER)
{
ULONGLONG time = queue_current_time();
timeout = t->expire < time ? 0 : (ULONG)(t->expire - time);
}
}
RtlLeaveCriticalSection(&q->cs);
return timeout;
}
static DWORD WINAPI timer_queue_thread_proc(LPVOID p)
{
struct timer_queue *q = p;
ULONG timeout_ms;
timeout_ms = INFINITE;
for (;;)
{
LARGE_INTEGER timeout;
NTSTATUS status;
BOOL done = FALSE;
status = NtWaitForSingleObject(
q->event, FALSE, get_nt_timeout(&timeout, timeout_ms));
if (status == STATUS_WAIT_0)
{
/* There are two possible ways to trigger the event. Either
we are quitting and the last timer got removed, or a new
timer got put at the head of the list so we need to adjust
our timeout. */
RtlEnterCriticalSection(&q->cs);
if (q->quit && list_empty(&q->timers))
done = TRUE;
RtlLeaveCriticalSection(&q->cs);
}
else if (status == STATUS_TIMEOUT)
queue_timer_expire(q);
if (done)
break;
timeout_ms = queue_get_timeout(q);
}
NtClose(q->event);
RtlDeleteCriticalSection(&q->cs);
q->magic = 0;
RtlFreeHeap(RtlGetProcessHeap(), 0, q);
RtlpExitThreadFunc(STATUS_SUCCESS);
return 0;
}
static void queue_destroy_timer(struct queue_timer *t)
{
/* We MUST hold the queue cs while calling this function. */
t->destroy = TRUE;
if (t->runcount == 0)
/* Ensure a timer is promptly removed. If callbacks are pending,
it will be removed after the last one finishes by the callback
cleanup wrapper. */
queue_remove_timer(t);
else
/* Make sure no destroyed timer masks an active timer at the head
of the sorted list. */
queue_move_timer(t, EXPIRE_NEVER, FALSE);
}
/***********************************************************************
* RtlCreateTimerQueue (NTDLL.@)
*
* Creates a timer queue object and returns a handle to it.
*
* PARAMS
* NewTimerQueue [O] The newly created queue.
*
* RETURNS
* Success: STATUS_SUCCESS.
* Failure: Any NTSTATUS code.
*/
NTSTATUS WINAPI RtlCreateTimerQueue(PHANDLE NewTimerQueue)
{
NTSTATUS status;
struct timer_queue *q = RtlAllocateHeap(RtlGetProcessHeap(), 0, sizeof *q);
if (!q)
return STATUS_NO_MEMORY;
RtlInitializeCriticalSection(&q->cs);
list_init(&q->timers);
q->quit = FALSE;
q->magic = TIMER_QUEUE_MAGIC;
status = NtCreateEvent(&q->event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
if (status != STATUS_SUCCESS)
{
RtlFreeHeap(RtlGetProcessHeap(), 0, q);
return status;
}
status = RtlpStartThreadFunc(timer_queue_thread_proc, q, &q->thread);
if (status != STATUS_SUCCESS)
{
NtClose(q->event);
RtlFreeHeap(RtlGetProcessHeap(), 0, q);
return status;
}
NtResumeThread(q->thread, NULL);
*NewTimerQueue = q;
return STATUS_SUCCESS;
}
/***********************************************************************
* RtlDeleteTimerQueueEx (NTDLL.@)
*
* Deletes a timer queue object.
*
* PARAMS
* TimerQueue [I] The timer queue to destroy.
* CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
* wait until all timers are finished firing before
* returning. Otherwise, return immediately and set the
* event when all timers are done.
*
* RETURNS
* Success: STATUS_SUCCESS if synchronous, STATUS_PENDING if not.
* Failure: Any NTSTATUS code.
*/
NTSTATUS WINAPI RtlDeleteTimerQueueEx(HANDLE TimerQueue, HANDLE CompletionEvent)
{
struct timer_queue *q = TimerQueue;
struct queue_timer *t, *temp;
HANDLE thread;
NTSTATUS status;
if (!q || q->magic != TIMER_QUEUE_MAGIC)
return STATUS_INVALID_HANDLE;
thread = q->thread;
RtlEnterCriticalSection(&q->cs);
q->quit = TRUE;
if (list_head(&q->timers))
/* When the last timer is removed, it will signal the timer thread to
exit... */
LIST_FOR_EACH_ENTRY_SAFE(t, temp, &q->timers, struct queue_timer, entry)
queue_destroy_timer(t);
else
/* However if we have none, we must do it ourselves. */
NtSetEvent(q->event, NULL);
RtlLeaveCriticalSection(&q->cs);
if (CompletionEvent == INVALID_HANDLE_VALUE)
{
NtWaitForSingleObject(thread, FALSE, NULL);
status = STATUS_SUCCESS;
}
else
{
if (CompletionEvent)
{
DPRINT1("asynchronous return on completion event unimplemented\n");
NtWaitForSingleObject(thread, FALSE, NULL);
NtSetEvent(CompletionEvent, NULL);
}
status = STATUS_PENDING;
}
NtClose(thread);
return status;
}
static struct timer_queue *get_timer_queue(HANDLE TimerQueue)
{
static struct timer_queue *default_timer_queue;
if (TimerQueue)
return TimerQueue;
else
{
if (!default_timer_queue)
{
HANDLE q;
NTSTATUS status = RtlCreateTimerQueue(&q);
if (status == STATUS_SUCCESS)
{
PVOID p = InterlockedCompareExchangePointer(
(void **) &default_timer_queue, q, NULL);
if (p)
/* Got beat to the punch. */
RtlDeleteTimerQueueEx(q, NULL);
}
}
return default_timer_queue;
}
}
/***********************************************************************
* RtlCreateTimer (NTDLL.@)
*
* Creates a new timer associated with the given queue.
*
* PARAMS
* NewTimer [O] The newly created timer.
* TimerQueue [I] The queue to hold the timer.
* Callback [I] The callback to fire.
* Parameter [I] The argument for the callback.
* DueTime [I] The delay, in milliseconds, before first firing the
* timer.
* Period [I] The period, in milliseconds, at which to fire the timer
* after the first callback. If zero, the timer will only
* fire once. It still needs to be deleted with
* RtlDeleteTimer.
* Flags [I] Flags controlling the execution of the callback. In
* addition to the WT_* thread pool flags (see
* RtlQueueWorkItem), WT_EXECUTEINTIMERTHREAD and
* WT_EXECUTEONLYONCE are supported.
*
* RETURNS
* Success: STATUS_SUCCESS.
* Failure: Any NTSTATUS code.
*/
NTSTATUS WINAPI RtlCreateTimer(HANDLE TimerQueue, PHANDLE NewTimer,
WAITORTIMERCALLBACKFUNC Callback,
PVOID Parameter, DWORD DueTime, DWORD Period,
ULONG Flags)
{
NTSTATUS status;
struct queue_timer *t;
struct timer_queue *q = get_timer_queue(TimerQueue);
if (!q) return STATUS_NO_MEMORY;
if (q->magic != TIMER_QUEUE_MAGIC) return STATUS_INVALID_HANDLE;
t = RtlAllocateHeap(RtlGetProcessHeap(), 0, sizeof *t);
if (!t)
return STATUS_NO_MEMORY;
t->q = q;
t->runcount = 0;
t->callback = Callback;
t->param = Parameter;
t->period = Period;
t->flags = Flags;
t->destroy = FALSE;
t->event = NULL;
status = STATUS_SUCCESS;
RtlEnterCriticalSection(&q->cs);
if (q->quit)
status = STATUS_INVALID_HANDLE;
else
queue_add_timer(t, queue_current_time() + DueTime, TRUE);
RtlLeaveCriticalSection(&q->cs);
if (status == STATUS_SUCCESS)
*NewTimer = t;
else
RtlFreeHeap(RtlGetProcessHeap(), 0, t);
return status;
}
NTSTATUS
WINAPI
RtlSetTimer(
@ -483,100 +84,6 @@ RtlSetTimer(
Flags);
}
/***********************************************************************
* RtlUpdateTimer (NTDLL.@)
*
* Changes the time at which a timer expires.
*
* PARAMS
* TimerQueue [I] The queue that holds the timer.
* Timer [I] The timer to update.
* DueTime [I] The delay, in milliseconds, before next firing the timer.
* Period [I] The period, in milliseconds, at which to fire the timer
* after the first callback. If zero, the timer will not
* refire once. It still needs to be deleted with
* RtlDeleteTimer.
*
* RETURNS
* Success: STATUS_SUCCESS.
* Failure: Any NTSTATUS code.
*/
NTSTATUS WINAPI RtlUpdateTimer(HANDLE TimerQueue, HANDLE Timer,
DWORD DueTime, DWORD Period)
{
struct queue_timer *t = Timer;
struct timer_queue *q = t->q;
RtlEnterCriticalSection(&q->cs);
/* Can't change a timer if it was once-only or destroyed. */
if (t->expire != EXPIRE_NEVER)
{
t->period = Period;
queue_move_timer(t, queue_current_time() + DueTime, TRUE);
}
RtlLeaveCriticalSection(&q->cs);
return STATUS_SUCCESS;
}
/***********************************************************************
* RtlDeleteTimer (NTDLL.@)
*
* Cancels a timer-queue timer.
*
* PARAMS
* TimerQueue [I] The queue that holds the timer.
* Timer [I] The timer to update.
* CompletionEvent [I] If NULL, return immediately. If INVALID_HANDLE_VALUE,
* wait until the timer is finished firing all pending
* callbacks before returning. Otherwise, return
* immediately and set the timer is done.
*
* RETURNS
* Success: STATUS_SUCCESS if the timer is done, STATUS_PENDING if not,
or if the completion event is NULL.
* Failure: Any NTSTATUS code.
*/
NTSTATUS WINAPI RtlDeleteTimer(HANDLE TimerQueue, HANDLE Timer,
HANDLE CompletionEvent)
{
struct queue_timer *t = Timer;
struct timer_queue *q;
NTSTATUS status = STATUS_PENDING;
HANDLE event = NULL;
if (!Timer)
return STATUS_INVALID_PARAMETER_1;
q = t->q;
if (CompletionEvent == INVALID_HANDLE_VALUE)
{
status = NtCreateEvent(&event, EVENT_ALL_ACCESS, NULL, SynchronizationEvent, FALSE);
if (status == STATUS_SUCCESS)
status = STATUS_PENDING;
}
else if (CompletionEvent)
event = CompletionEvent;
RtlEnterCriticalSection(&q->cs);
t->event = event;
if (t->runcount == 0 && event)
status = STATUS_SUCCESS;
queue_destroy_timer(t);
RtlLeaveCriticalSection(&q->cs);
if (CompletionEvent == INVALID_HANDLE_VALUE && event)
{
if (status == STATUS_PENDING)
{
NtWaitForSingleObject(event, FALSE, NULL);
status = STATUS_SUCCESS;
}
NtClose(event);
}
return status;
}
/*
* @implemented
*/

View File

@ -1,275 +0,0 @@
/*
* COPYRIGHT: See COPYING in the top level directory
* PROJECT: ReactOS system libraries
* PURPOSE: Rtl user wait functions
* FILE: lib/rtl/wait.c
* PROGRAMERS:
* Alex Ionescu (alex@relsoft.net)
* Eric Kohl
* KJK::Hyperion
*/
/* INCLUDES *****************************************************************/
#include <rtl.h>
#define NDEBUG
#include <debug.h>
typedef struct _RTLP_WAIT
{
HANDLE Object;
BOOLEAN CallbackInProgress;
HANDLE CancelEvent;
LONG DeleteCount;
HANDLE CompletionEvent;
ULONG Flags;
WAITORTIMERCALLBACKFUNC Callback;
PVOID Context;
ULONG Milliseconds;
} RTLP_WAIT, *PRTLP_WAIT;
/* PRIVATE FUNCTIONS *******************************************************/
static inline PLARGE_INTEGER get_nt_timeout( PLARGE_INTEGER pTime, ULONG timeout )
{
if (timeout == INFINITE) return NULL;
pTime->QuadPart = (ULONGLONG)timeout * -10000;
return pTime;
}
static VOID
NTAPI
Wait_thread_proc(LPVOID Arg)
{
PRTLP_WAIT Wait = (PRTLP_WAIT) Arg;
NTSTATUS Status;
BOOLEAN alertable = (Wait->Flags & WT_EXECUTEINIOTHREAD) != 0;
HANDLE handles[2] = { Wait->CancelEvent, Wait->Object };
LARGE_INTEGER timeout;
HANDLE completion_event;
// TRACE("\n");
while (TRUE)
{
Status = NtWaitForMultipleObjects( 2,
handles,
WaitAny,
alertable,
get_nt_timeout( &timeout, Wait->Milliseconds ) );
if (Status == STATUS_WAIT_1 || Status == STATUS_TIMEOUT)
{
BOOLEAN TimerOrWaitFired;
if (Status == STATUS_WAIT_1)
{
// TRACE( "object %p signaled, calling callback %p with context %p\n",
// Wait->Object, Wait->Callback,
// Wait->Context );
TimerOrWaitFired = FALSE;
}
else
{
// TRACE( "wait for object %p timed out, calling callback %p with context %p\n",
// Wait->Object, Wait->Callback,
// Wait->Context );
TimerOrWaitFired = TRUE;
}
Wait->CallbackInProgress = TRUE;
Wait->Callback( Wait->Context, TimerOrWaitFired );
Wait->CallbackInProgress = FALSE;
if (Wait->Flags & WT_EXECUTEONLYONCE)
break;
}
else if (Status != STATUS_USER_APC)
break;
}
completion_event = Wait->CompletionEvent;
if (completion_event) NtSetEvent( completion_event, NULL );
if (InterlockedIncrement( &Wait->DeleteCount ) == 2 )
{
NtClose( Wait->CancelEvent );
RtlFreeHeap( RtlGetProcessHeap(), 0, Wait );
}
}
/* FUNCTIONS ***************************************************************/
/***********************************************************************
* RtlRegisterWait
*
* Registers a wait for a handle to become signaled.
*
* PARAMS
* NewWaitObject [I] Handle to the new wait object. Use RtlDeregisterWait() to free it.
* Object [I] Object to wait to become signaled.
* Callback [I] Callback function to execute when the wait times out or the handle is signaled.
* Context [I] Context to pass to the callback function when it is executed.
* Milliseconds [I] Number of milliseconds to wait before timing out.
* Flags [I] Flags. See notes.
*
* RETURNS
* Success: STATUS_SUCCESS.
* Failure: Any NTSTATUS code.
*
* NOTES
* Flags can be one or more of the following:
*|WT_EXECUTEDEFAULT - Executes the work item in a non-I/O worker thread.
*|WT_EXECUTEINIOTHREAD - Executes the work item in an I/O worker thread.
*|WT_EXECUTEINPERSISTENTTHREAD - Executes the work item in a thread that is persistent.
*|WT_EXECUTELONGFUNCTION - Hints that the execution can take a long time.
*|WT_TRANSFER_IMPERSONATION - Executes the function with the current access token.
*/
NTSTATUS
NTAPI
RtlRegisterWait(PHANDLE NewWaitObject,
HANDLE Object,
WAITORTIMERCALLBACKFUNC Callback,
PVOID Context,
ULONG Milliseconds,
ULONG Flags)
{
PRTLP_WAIT Wait;
NTSTATUS Status;
//TRACE( "(%p, %p, %p, %p, %d, 0x%x)\n", NewWaitObject, Object, Callback, Context, Milliseconds, Flags );
Wait = RtlAllocateHeap( RtlGetProcessHeap(), 0, sizeof(RTLP_WAIT) );
if (!Wait)
return STATUS_NO_MEMORY;
Wait->Object = Object;
Wait->Callback = Callback;
Wait->Context = Context;
Wait->Milliseconds = Milliseconds;
Wait->Flags = Flags;
Wait->CallbackInProgress = FALSE;
Wait->DeleteCount = 0;
Wait->CompletionEvent = NULL;
Status = NtCreateEvent( &Wait->CancelEvent,
EVENT_ALL_ACCESS,
NULL,
NotificationEvent,
FALSE );
if (Status != STATUS_SUCCESS)
{
RtlFreeHeap( RtlGetProcessHeap(), 0, Wait );
return Status;
}
Flags = Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINPERSISTENTTHREAD |
WT_EXECUTELONGFUNCTION | WT_TRANSFER_IMPERSONATION);
Status = RtlQueueWorkItem( Wait_thread_proc,
Wait,
Flags );
if (Status != STATUS_SUCCESS)
{
NtClose( Wait->CancelEvent );
RtlFreeHeap( RtlGetProcessHeap(), 0, Wait );
return Status;
}
*NewWaitObject = Wait;
return Status;
}
/***********************************************************************
* RtlDeregisterWaitEx
*
* Cancels a wait operation and frees the resources associated with calling
* RtlRegisterWait().
*
* PARAMS
* WaitObject [I] Handle to the wait object to free.
*
* RETURNS
* Success: STATUS_SUCCESS.
* Failure: Any NTSTATUS code.
*/
NTSTATUS
NTAPI
RtlDeregisterWaitEx(HANDLE WaitHandle,
HANDLE CompletionEvent)
{
PRTLP_WAIT Wait = (PRTLP_WAIT) WaitHandle;
NTSTATUS Status = STATUS_SUCCESS;
//TRACE( "(%p)\n", WaitHandle );
NtSetEvent( Wait->CancelEvent, NULL );
if (Wait->CallbackInProgress)
{
if (CompletionEvent != NULL)
{
if (CompletionEvent == INVALID_HANDLE_VALUE)
{
Status = NtCreateEvent( &CompletionEvent,
EVENT_ALL_ACCESS,
NULL,
NotificationEvent,
FALSE );
if (Status != STATUS_SUCCESS)
return Status;
(void)InterlockedExchangePointer( &Wait->CompletionEvent, CompletionEvent );
if (Wait->CallbackInProgress)
NtWaitForSingleObject( CompletionEvent, FALSE, NULL );
NtClose( CompletionEvent );
}
else
{
(void)InterlockedExchangePointer( &Wait->CompletionEvent, CompletionEvent );
if (Wait->CallbackInProgress)
Status = STATUS_PENDING;
}
}
else
Status = STATUS_PENDING;
}
if (InterlockedIncrement( &Wait->DeleteCount ) == 2 )
{
Status = STATUS_SUCCESS;
NtClose( Wait->CancelEvent );
RtlFreeHeap( RtlGetProcessHeap(), 0, Wait );
}
return Status;
}
/***********************************************************************
* RtlDeregisterWait
*
* Cancels a wait operation and frees the resources associated with calling
* RtlRegisterWait().
*
* PARAMS
* WaitObject [I] Handle to the wait object to free.
*
* RETURNS
* Success: STATUS_SUCCESS.
* Failure: Any NTSTATUS code.
*/
NTSTATUS
NTAPI
RtlDeregisterWait(HANDLE WaitHandle)
{
return RtlDeregisterWaitEx(WaitHandle, NULL);
}
/* EOF */

View File

@ -45,899 +45,6 @@ RtlpExitThread(IN NTSTATUS ExitStatus)
PRTL_START_POOL_THREAD RtlpStartThreadFunc = RtlpStartThread;
PRTL_EXIT_POOL_THREAD RtlpExitThreadFunc = RtlpExitThread;
#define MAX_WORKERTHREADS 0x100
#define WORKERTHREAD_CREATION_THRESHOLD 0x5
typedef struct _RTLP_IOWORKERTHREAD
{
LIST_ENTRY ListEntry;
HANDLE ThreadHandle;
ULONG Flags;
} RTLP_IOWORKERTHREAD, *PRTLP_IOWORKERTHREAD;
typedef struct _RTLP_WORKITEM
{
WORKERCALLBACKFUNC Function;
PVOID Context;
ULONG Flags;
HANDLE TokenHandle;
} RTLP_WORKITEM, *PRTLP_WORKITEM;
static LONG ThreadPoolInitialized = 0;
static RTL_CRITICAL_SECTION ThreadPoolLock;
static PRTLP_IOWORKERTHREAD PersistentIoThread;
static LIST_ENTRY ThreadPoolIOWorkerThreadsList;
static HANDLE ThreadPoolCompletionPort;
static LONG ThreadPoolWorkerThreads;
static LONG ThreadPoolWorkerThreadsRequests;
static LONG ThreadPoolWorkerThreadsLongRequests;
static LONG ThreadPoolIOWorkerThreads;
static LONG ThreadPoolIOWorkerThreadsRequests;
static LONG ThreadPoolIOWorkerThreadsLongRequests;
#define IsThreadPoolInitialized() (*((volatile LONG*)&ThreadPoolInitialized) == 1)
static NTSTATUS
RtlpInitializeThreadPool(VOID)
{
NTSTATUS Status = STATUS_SUCCESS;
LONG InitStatus;
do
{
InitStatus = InterlockedCompareExchange(&ThreadPoolInitialized,
2,
0);
if (InitStatus == 0)
{
/* We're the first thread to initialize the thread pool */
InitializeListHead(&ThreadPoolIOWorkerThreadsList);
PersistentIoThread = NULL;
ThreadPoolWorkerThreads = 0;
ThreadPoolWorkerThreadsRequests = 0;
ThreadPoolWorkerThreadsLongRequests = 0;
ThreadPoolIOWorkerThreads = 0;
ThreadPoolIOWorkerThreadsRequests = 0;
ThreadPoolIOWorkerThreadsLongRequests = 0;
/* Initialize the lock */
Status = RtlInitializeCriticalSection(&ThreadPoolLock);
if (!NT_SUCCESS(Status))
goto Finish;
/* Create the complection port */
Status = NtCreateIoCompletion(&ThreadPoolCompletionPort,
IO_COMPLETION_ALL_ACCESS,
NULL,
0);
if (!NT_SUCCESS(Status))
{
RtlDeleteCriticalSection(&ThreadPoolLock);
goto Finish;
}
Finish:
/* Initialization done */
InterlockedExchange(&ThreadPoolInitialized,
1);
break;
}
else if (InitStatus == 2)
{
LARGE_INTEGER Timeout;
/* Another thread is currently initializing the thread pool!
Poll after a short period of time to see if the initialization
was completed */
Timeout.QuadPart = -10000000LL; /* Wait for a second */
NtDelayExecution(FALSE,
&Timeout);
}
} while (InitStatus != 1);
return Status;
}
static NTSTATUS
RtlpGetImpersonationToken(OUT PHANDLE TokenHandle)
{
NTSTATUS Status;
Status = NtOpenThreadToken(NtCurrentThread(),
TOKEN_IMPERSONATE,
TRUE,
TokenHandle);
if (Status == STATUS_NO_TOKEN || Status == STATUS_CANT_OPEN_ANONYMOUS)
{
*TokenHandle = NULL;
Status = STATUS_SUCCESS;
}
return Status;
}
static NTSTATUS
RtlpStartWorkerThread(PTHREAD_START_ROUTINE StartRoutine)
{
NTSTATUS Status;
HANDLE ThreadHandle;
LARGE_INTEGER Timeout;
volatile LONG WorkerInitialized = 0;
Timeout.QuadPart = -10000LL; /* Wait for 100ms */
/* Start the thread */
Status = RtlpStartThreadFunc(StartRoutine, (PVOID)&WorkerInitialized, &ThreadHandle);
if (NT_SUCCESS(Status))
{
NtResumeThread(ThreadHandle, NULL);
/* Poll until the thread got a chance to initialize */
while (WorkerInitialized == 0)
{
NtDelayExecution(FALSE,
&Timeout);
}
NtClose(ThreadHandle);
}
return Status;
}
static VOID
NTAPI
RtlpExecuteWorkItem(IN OUT PVOID NormalContext,
IN OUT PVOID SystemArgument1,
IN OUT PVOID SystemArgument2)
{
NTSTATUS Status;
BOOLEAN Impersonated = FALSE;
RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
RtlFreeHeap(RtlGetProcessHeap(),
0,
SystemArgument2);
if (WorkItem.TokenHandle != NULL)
{
Status = NtSetInformationThread(NtCurrentThread(),
ThreadImpersonationToken,
&WorkItem.TokenHandle,
sizeof(HANDLE));
NtClose(WorkItem.TokenHandle);
if (NT_SUCCESS(Status))
{
Impersonated = TRUE;
}
}
_SEH2_TRY
{
DPRINT("RtlpExecuteWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
/* Execute the function */
WorkItem.Function(WorkItem.Context);
}
_SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
{
DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
}
_SEH2_END;
if (Impersonated)
{
WorkItem.TokenHandle = NULL;
Status = NtSetInformationThread(NtCurrentThread(),
ThreadImpersonationToken,
&WorkItem.TokenHandle,
sizeof(HANDLE));
if (!NT_SUCCESS(Status))
{
DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
}
}
/* update the requests counter */
InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
{
InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
}
}
static NTSTATUS
RtlpQueueWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
{
NTSTATUS Status = STATUS_SUCCESS;
InterlockedIncrement(&ThreadPoolWorkerThreadsRequests);
if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
{
InterlockedIncrement(&ThreadPoolWorkerThreadsLongRequests);
}
if (WorkItem->Flags & WT_EXECUTEINPERSISTENTTHREAD)
{
Status = RtlpInitializeTimerThread();
if (NT_SUCCESS(Status))
{
/* Queue an APC in the timer thread */
Status = NtQueueApcThread(TimerThreadHandle,
RtlpExecuteWorkItem,
NULL,
NULL,
WorkItem);
}
}
else
{
/* Queue an IO completion message */
Status = NtSetIoCompletion(ThreadPoolCompletionPort,
RtlpExecuteWorkItem,
WorkItem,
STATUS_SUCCESS,
0);
}
if (!NT_SUCCESS(Status))
{
InterlockedDecrement(&ThreadPoolWorkerThreadsRequests);
if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
{
InterlockedDecrement(&ThreadPoolWorkerThreadsLongRequests);
}
}
return Status;
}
static VOID
NTAPI
RtlpExecuteIoWorkItem(IN OUT PVOID NormalContext,
IN OUT PVOID SystemArgument1,
IN OUT PVOID SystemArgument2)
{
NTSTATUS Status;
BOOLEAN Impersonated = FALSE;
PRTLP_IOWORKERTHREAD IoThread = (PRTLP_IOWORKERTHREAD)NormalContext;
RTLP_WORKITEM WorkItem = *(volatile RTLP_WORKITEM *)SystemArgument2;
ASSERT(IoThread != NULL);
RtlFreeHeap(RtlGetProcessHeap(),
0,
SystemArgument2);
if (WorkItem.TokenHandle != NULL)
{
Status = NtSetInformationThread(NtCurrentThread(),
ThreadImpersonationToken,
&WorkItem.TokenHandle,
sizeof(HANDLE));
NtClose(WorkItem.TokenHandle);
if (NT_SUCCESS(Status))
{
Impersonated = TRUE;
}
}
_SEH2_TRY
{
DPRINT("RtlpExecuteIoWorkItem: Function: 0x%p Context: 0x%p ImpersonationToken: 0x%p\n", WorkItem.Function, WorkItem.Context, WorkItem.TokenHandle);
/* Execute the function */
WorkItem.Function(WorkItem.Context);
}
_SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
{
DPRINT1("Exception 0x%x while executing IO work item 0x%p\n", _SEH2_GetExceptionCode(), WorkItem.Function);
}
_SEH2_END;
if (Impersonated)
{
WorkItem.TokenHandle = NULL;
Status = NtSetInformationThread(NtCurrentThread(),
ThreadImpersonationToken,
&WorkItem.TokenHandle,
sizeof(HANDLE));
if (!NT_SUCCESS(Status))
{
DPRINT1("Failed to revert worker thread to self!!! Status: 0x%x\n", Status);
}
}
/* remove the long function flag */
if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
{
Status = RtlEnterCriticalSection(&ThreadPoolLock);
if (NT_SUCCESS(Status))
{
IoThread->Flags &= ~WT_EXECUTELONGFUNCTION;
RtlLeaveCriticalSection(&ThreadPoolLock);
}
}
/* update the requests counter */
InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
if (WorkItem.Flags & WT_EXECUTELONGFUNCTION)
{
InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
}
}
static NTSTATUS
RtlpQueueIoWorkerThread(IN OUT PRTLP_WORKITEM WorkItem)
{
PLIST_ENTRY CurrentEntry;
PRTLP_IOWORKERTHREAD IoThread = NULL;
NTSTATUS Status = STATUS_SUCCESS;
if (WorkItem->Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
{
if (PersistentIoThread != NULL)
{
/* We already have a persistent IO worker thread */
IoThread = PersistentIoThread;
}
else
{
/* We're not aware of any persistent IO worker thread. Search for a unused
worker thread that doesn't have a long function queued */
CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
{
IoThread = CONTAINING_RECORD(CurrentEntry,
RTLP_IOWORKERTHREAD,
ListEntry);
if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
break;
CurrentEntry = CurrentEntry->Flink;
}
if (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
{
/* Found a worker thread we can use. */
ASSERT(IoThread != NULL);
IoThread->Flags |= WT_EXECUTEINPERSISTENTIOTHREAD;
PersistentIoThread = IoThread;
}
else
{
DPRINT1("Failed to find a worker thread for the persistent IO thread!\n");
return STATUS_NO_MEMORY;
}
}
}
else
{
/* Find a worker thread that is not currently executing a long function */
CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
while (CurrentEntry != &ThreadPoolIOWorkerThreadsList)
{
IoThread = CONTAINING_RECORD(CurrentEntry,
RTLP_IOWORKERTHREAD,
ListEntry);
if (!(IoThread->Flags & WT_EXECUTELONGFUNCTION))
{
/* if we're trying to queue a long function then make sure we're not dealing
with the persistent thread */
if ((WorkItem->Flags & WT_EXECUTELONGFUNCTION) && !(IoThread->Flags & WT_EXECUTEINPERSISTENTIOTHREAD))
{
/* found a candidate */
break;
}
}
CurrentEntry = CurrentEntry->Flink;
}
if (CurrentEntry == &ThreadPoolIOWorkerThreadsList)
{
/* Couldn't find an appropriate thread, see if we can use the persistent thread (if it exists) for now */
if (ThreadPoolIOWorkerThreads == 0)
{
DPRINT1("Failed to find a worker thread for the work item 0x%p!\n", WorkItem);
ASSERT(IsListEmpty(&ThreadPoolIOWorkerThreadsList));
return STATUS_NO_MEMORY;
}
else
{
/* pick the first worker thread */
CurrentEntry = ThreadPoolIOWorkerThreadsList.Flink;
IoThread = CONTAINING_RECORD(CurrentEntry,
RTLP_IOWORKERTHREAD,
ListEntry);
/* Since this might be the persistent worker thread, don't run as a
long function */
WorkItem->Flags &= ~WT_EXECUTELONGFUNCTION;
}
}
/* Move the picked thread to the end of the list. Since we're always searching
from the beginning, this improves distribution of work items */
RemoveEntryList(&IoThread->ListEntry);
InsertTailList(&ThreadPoolIOWorkerThreadsList,
&IoThread->ListEntry);
}
ASSERT(IoThread != NULL);
InterlockedIncrement(&ThreadPoolIOWorkerThreadsRequests);
if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
{
/* We're about to queue a long function, mark the thread */
IoThread->Flags |= WT_EXECUTELONGFUNCTION;
InterlockedIncrement(&ThreadPoolIOWorkerThreadsLongRequests);
}
/* It's time to queue the work item */
Status = NtQueueApcThread(IoThread->ThreadHandle,
RtlpExecuteIoWorkItem,
IoThread,
NULL,
WorkItem);
if (!NT_SUCCESS(Status))
{
DPRINT1("Failed to queue APC for work item 0x%p\n", WorkItem->Function);
InterlockedDecrement(&ThreadPoolIOWorkerThreadsRequests);
if (WorkItem->Flags & WT_EXECUTELONGFUNCTION)
{
InterlockedDecrement(&ThreadPoolIOWorkerThreadsLongRequests);
}
}
return Status;
}
static BOOLEAN
RtlpIsIoPending(IN HANDLE ThreadHandle OPTIONAL)
{
NTSTATUS Status;
ULONG IoPending;
BOOLEAN CreatedHandle = FALSE;
BOOLEAN IsIoPending = TRUE;
if (ThreadHandle == NULL)
{
Status = NtDuplicateObject(NtCurrentProcess(),
NtCurrentThread(),
NtCurrentProcess(),
&ThreadHandle,
0,
0,
DUPLICATE_SAME_ACCESS);
if (!NT_SUCCESS(Status))
{
return IsIoPending;
}
CreatedHandle = TRUE;
}
Status = NtQueryInformationThread(ThreadHandle,
ThreadIsIoPending,
&IoPending,
sizeof(IoPending),
NULL);
if (NT_SUCCESS(Status) && IoPending == 0)
{
IsIoPending = FALSE;
}
if (CreatedHandle)
{
NtClose(ThreadHandle);
}
return IsIoPending;
}
static ULONG
NTAPI
RtlpIoWorkerThreadProc(IN PVOID Parameter)
{
volatile RTLP_IOWORKERTHREAD ThreadInfo;
LARGE_INTEGER Timeout;
BOOLEAN Terminate;
NTSTATUS Status = STATUS_SUCCESS;
if (InterlockedIncrement(&ThreadPoolIOWorkerThreads) > MAX_WORKERTHREADS)
{
/* Oops, too many worker threads... */
goto InitFailed;
}
/* Get a thread handle to ourselves */
Status = NtDuplicateObject(NtCurrentProcess(),
NtCurrentThread(),
NtCurrentProcess(),
(PHANDLE)&ThreadInfo.ThreadHandle,
0,
0,
DUPLICATE_SAME_ACCESS);
if (!NT_SUCCESS(Status))
{
DPRINT1("Failed to create handle to own thread! Status: 0x%x\n", Status);
InitFailed:
InterlockedDecrement(&ThreadPoolIOWorkerThreads);
/* Signal initialization completion */
InterlockedExchange((PLONG)Parameter,
1);
RtlpExitThreadFunc(Status);
return 0;
}
ThreadInfo.Flags = 0;
/* Insert the thread into the list */
InsertHeadList((PLIST_ENTRY)&ThreadPoolIOWorkerThreadsList,
(PLIST_ENTRY)&ThreadInfo.ListEntry);
/* Signal initialization completion */
InterlockedExchange((PLONG)Parameter,
1);
for (;;)
{
Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
Wait:
do
{
/* Perform an alertable wait, the work items are going to be executed as APCs */
Status = NtDelayExecution(TRUE,
&Timeout);
/* Loop as long as we executed an APC */
} while (Status != STATUS_SUCCESS);
/* We timed out, let's see if we're allowed to terminate */
Terminate = FALSE;
Status = RtlEnterCriticalSection(&ThreadPoolLock);
if (NT_SUCCESS(Status))
{
if (ThreadInfo.Flags & WT_EXECUTEINPERSISTENTIOTHREAD)
{
/* This thread is supposed to be persistent. Don't terminate! */
RtlLeaveCriticalSection(&ThreadPoolLock);
Timeout.QuadPart = -0x7FFFFFFFFFFFFFFFLL;
goto Wait;
}
/* FIXME - figure out an effective method to determine if it's appropriate to
lower the number of threads. For now let's always terminate if there's
at least one thread and no queued items. */
Terminate = (*((volatile LONG*)&ThreadPoolIOWorkerThreads) - *((volatile LONG*)&ThreadPoolIOWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD) &&
(*((volatile LONG*)&ThreadPoolIOWorkerThreadsRequests) == 0);
if (Terminate)
{
/* Prevent termination as long as IO is pending */
Terminate = !RtlpIsIoPending(ThreadInfo.ThreadHandle);
}
if (Terminate)
{
/* Rundown the thread and unlink it from the list */
InterlockedDecrement(&ThreadPoolIOWorkerThreads);
RemoveEntryList((PLIST_ENTRY)&ThreadInfo.ListEntry);
}
RtlLeaveCriticalSection(&ThreadPoolLock);
if (Terminate)
{
/* Break the infinite loop and terminate */
Status = STATUS_SUCCESS;
break;
}
}
else
{
DPRINT1("Failed to acquire the thread pool lock!!! Status: 0x%x\n", Status);
break;
}
}
NtClose(ThreadInfo.ThreadHandle);
RtlpExitThreadFunc(Status);
return 0;
}
static ULONG
NTAPI
RtlpWorkerThreadProc(IN PVOID Parameter)
{
LARGE_INTEGER Timeout;
BOOLEAN Terminate;
PVOID SystemArgument2;
IO_STATUS_BLOCK IoStatusBlock;
ULONG TimeoutCount = 0;
PKNORMAL_ROUTINE ApcRoutine;
NTSTATUS Status = STATUS_SUCCESS;
if (InterlockedIncrement(&ThreadPoolWorkerThreads) > MAX_WORKERTHREADS)
{
/* Signal initialization completion */
InterlockedExchange((PLONG)Parameter,
1);
/* Oops, too many worker threads... */
RtlpExitThreadFunc(Status);
return 0;
}
/* Signal initialization completion */
InterlockedExchange((PLONG)Parameter,
1);
for (;;)
{
Timeout.QuadPart = -50000000LL; /* Wait for 5 seconds by default */
/* Dequeue a completion message */
Status = NtRemoveIoCompletion(ThreadPoolCompletionPort,
(PVOID*)&ApcRoutine,
&SystemArgument2,
&IoStatusBlock,
&Timeout);
if (Status == STATUS_SUCCESS)
{
TimeoutCount = 0;
_SEH2_TRY
{
/* Call the APC routine */
ApcRoutine(NULL,
(PVOID)IoStatusBlock.Information,
SystemArgument2);
}
_SEH2_EXCEPT(EXCEPTION_EXECUTE_HANDLER)
{
(void)0;
}
_SEH2_END;
}
else
{
Terminate = FALSE;
if (!NT_SUCCESS(RtlEnterCriticalSection(&ThreadPoolLock)))
continue;
/* FIXME - this should be optimized, check if there's requests, etc */
if (Status == STATUS_TIMEOUT)
{
/* FIXME - we might want to optimize this */
if (TimeoutCount++ > 2 &&
*((volatile LONG*)&ThreadPoolWorkerThreads) - *((volatile LONG*)&ThreadPoolWorkerThreadsLongRequests) >= WORKERTHREAD_CREATION_THRESHOLD)
{
Terminate = TRUE;
}
}
else
Terminate = TRUE;
RtlLeaveCriticalSection(&ThreadPoolLock);
if (Terminate)
{
/* Prevent termination as long as IO is pending */
Terminate = !RtlpIsIoPending(NULL);
}
if (Terminate)
{
InterlockedDecrement(&ThreadPoolWorkerThreads);
Status = STATUS_SUCCESS;
break;
}
}
}
RtlpExitThreadFunc(Status);
return 0;
}
/*
* @implemented
*/
NTSTATUS
NTAPI
RtlQueueWorkItem(IN WORKERCALLBACKFUNC Function,
IN PVOID Context OPTIONAL,
IN ULONG Flags)
{
LONG FreeWorkers;
NTSTATUS Status;
PRTLP_WORKITEM WorkItem;
DPRINT("RtlQueueWorkItem(0x%p, 0x%p, 0x%x)\n", Function, Context, Flags);
/* Initialize the thread pool if not already initialized */
if (!IsThreadPoolInitialized())
{
Status = RtlpInitializeThreadPool();
if (!NT_SUCCESS(Status))
return Status;
}
/* Allocate a work item */
WorkItem = RtlAllocateHeap(RtlGetProcessHeap(),
0,
sizeof(RTLP_WORKITEM));
if (WorkItem == NULL)
return STATUS_NO_MEMORY;
WorkItem->Function = Function;
WorkItem->Context = Context;
WorkItem->Flags = Flags;
if (Flags & WT_TRANSFER_IMPERSONATION)
{
Status = RtlpGetImpersonationToken(&WorkItem->TokenHandle);
if (!NT_SUCCESS(Status))
{
DPRINT1("Failed to get impersonation token! Status: 0x%x\n", Status);
goto Cleanup;
}
}
else
WorkItem->TokenHandle = NULL;
Status = RtlEnterCriticalSection(&ThreadPoolLock);
if (NT_SUCCESS(Status))
{
if (Flags & (WT_EXECUTEINIOTHREAD | WT_EXECUTEINUITHREAD | WT_EXECUTEINPERSISTENTIOTHREAD))
{
/* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
FreeWorkers = ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsLongRequests;
if (((Flags & (WT_EXECUTEINPERSISTENTIOTHREAD | WT_EXECUTELONGFUNCTION)) == WT_EXECUTELONGFUNCTION) &&
PersistentIoThread != NULL)
{
/* We shouldn't queue a long function into the persistent IO thread */
FreeWorkers--;
}
/* See if it's a good idea to grow the pool */
if (ThreadPoolIOWorkerThreads < MAX_WORKERTHREADS &&
(FreeWorkers <= 0 || ThreadPoolIOWorkerThreads - ThreadPoolIOWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
{
/* Grow the thread pool */
Status = RtlpStartWorkerThread(RtlpIoWorkerThreadProc);
if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolIOWorkerThreads) != 0)
{
/* We failed to create the thread, but there's at least one there so
we can at least queue the request */
Status = STATUS_SUCCESS;
}
}
if (NT_SUCCESS(Status))
{
/* Queue a IO worker thread */
Status = RtlpQueueIoWorkerThread(WorkItem);
}
}
else
{
/* FIXME - We should optimize the algorithm used to determine whether to grow the thread pool! */
FreeWorkers = ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsLongRequests;
/* See if it's a good idea to grow the pool */
if (ThreadPoolWorkerThreads < MAX_WORKERTHREADS &&
(FreeWorkers <= 0 || ThreadPoolWorkerThreads - ThreadPoolWorkerThreadsRequests < WORKERTHREAD_CREATION_THRESHOLD))
{
/* Grow the thread pool */
Status = RtlpStartWorkerThread(RtlpWorkerThreadProc);
if (!NT_SUCCESS(Status) && *((volatile LONG*)&ThreadPoolWorkerThreads) != 0)
{
/* We failed to create the thread, but there's at least one there so
we can at least queue the request */
Status = STATUS_SUCCESS;
}
}
if (NT_SUCCESS(Status))
{
/* Queue a normal worker thread */
Status = RtlpQueueWorkerThread(WorkItem);
}
}
RtlLeaveCriticalSection(&ThreadPoolLock);
}
if (!NT_SUCCESS(Status))
{
if (WorkItem->TokenHandle != NULL)
{
NtClose(WorkItem->TokenHandle);
}
Cleanup:
RtlFreeHeap(RtlGetProcessHeap(),
0,
WorkItem);
}
return Status;
}
/*
* @unimplemented
*/
NTSTATUS
NTAPI
RtlSetIoCompletionCallback(IN HANDLE FileHandle,
IN PIO_APC_ROUTINE Callback,
IN ULONG Flags)
{
IO_STATUS_BLOCK IoStatusBlock;
FILE_COMPLETION_INFORMATION FileCompletionInfo;
NTSTATUS Status;
DPRINT("RtlSetIoCompletionCallback(0x%p, 0x%p, 0x%x)\n", FileHandle, Callback, Flags);
/* Initialize the thread pool if not already initialized */
if (!IsThreadPoolInitialized())
{
Status = RtlpInitializeThreadPool();
if (!NT_SUCCESS(Status))
return Status;
}
FileCompletionInfo.Port = ThreadPoolCompletionPort;
FileCompletionInfo.Key = (PVOID)Callback;
Status = NtSetInformationFile(FileHandle,
&IoStatusBlock,
&FileCompletionInfo,
sizeof(FileCompletionInfo),
FileCompletionInformation);
return Status;
}
/*
* @implemented
*/