reactos/dll/win32/rpcrt4/rpcrt4_ros.diff
Art Yerkes c501d8112c Create a branch for network fixes.
svn path=/branches/aicom-network-fixes/; revision=34994
2008-08-01 11:32:26 +00:00

376 lines
11 KiB
Diff

Index: rpc_server.c
===================================================================
--- rpc_server.c (working copy)
+++ rpc_server.c (working copy)
@@ -1017,22 +1017,30 @@
/***********************************************************************
* RpcMgmtServerWaitListen (RPCRT4.@)
*/
RPC_STATUS WINAPI RpcMgmtWaitServerListen( void )
{
- TRACE("()\n");
+ RpcServerProtseq *cps;
EnterCriticalSection(&listen_cs);
if (!std_listen) {
LeaveCriticalSection(&listen_cs);
return RPC_S_NOT_LISTENING;
}
+ do
+ {
LeaveCriticalSection(&listen_cs);
- FIXME("not waiting for server calls to finish\n");
+ LIST_FOR_EACH_ENTRY(cps, &protseqs, RpcServerProtseq, entry)
+ WaitForSingleObject(cps->server_ready_event, INFINITE);
+
+ EnterCriticalSection(&listen_cs);
+ } while (!std_listen);
+
+ LeaveCriticalSection(&listen_cs);
return RPC_S_OK;
}
/***********************************************************************
Index: rpc_transport.c
===================================================================
--- rpc_transport.c (working copy)
+++ rpc_transport.c (working copy)
@@ -54,10 +54,13 @@
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
#include "windef.h"
#include "winbase.h"
#include "winnls.h"
#include "winerror.h"
#include "winternl.h"
@@ -71,10 +74,12 @@
#include "rpc_binding.h"
#include "rpc_message.h"
#include "rpc_server.h"
#include "epm_towers.h"
+#include "unix_func.h"
+
#ifndef SOL_TCP
# define SOL_TCP IPPROTO_TCP
#endif
WINE_DEFAULT_DEBUG_CHANNEL(rpc);
@@ -83,11 +88,11 @@
typedef struct _RpcConnection_np
{
RpcConnection common;
HANDLE pipe;
- OVERLAPPED ovl;
+ OVERLAPPED ovl[2];
BOOL listening;
} RpcConnection_np;
static RpcConnection *rpcrt4_conn_np_alloc(void)
{
@@ -105,15 +110,15 @@
{
if (npc->listening)
return RPC_S_OK;
npc->listening = TRUE;
- if (ConnectNamedPipe(npc->pipe, &npc->ovl))
+ if (ConnectNamedPipe(npc->pipe, &npc->ovl[0]))
return RPC_S_OK;
if (GetLastError() == ERROR_PIPE_CONNECTED) {
- SetEvent(npc->ovl.hEvent);
+ SetEvent(npc->ovl[0].hEvent);
return RPC_S_OK;
}
if (GetLastError() == ERROR_IO_PENDING) {
/* will be completed in rpcrt4_protseq_np_wait_for_new_connection */
return RPC_S_OK;
@@ -126,11 +131,11 @@
static RPC_STATUS rpcrt4_conn_create_pipe(RpcConnection *Connection, LPCSTR pname)
{
RpcConnection_np *npc = (RpcConnection_np *) Connection;
TRACE("listening on %s\n", pname);
- npc->pipe = CreateNamedPipeA(pname, PIPE_ACCESS_DUPLEX,
+ npc->pipe = CreateNamedPipeA(pname, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE,
PIPE_UNLIMITED_INSTANCES,
RPC_MAX_PACKET_SIZE, RPC_MAX_PACKET_SIZE, 5000, NULL);
if (npc->pipe == INVALID_HANDLE_VALUE) {
WARN("CreateNamedPipe failed with error %d\n", GetLastError());
@@ -139,11 +144,12 @@
else
return RPC_S_CANT_CREATE_ENDPOINT;
}
memset(&npc->ovl, 0, sizeof(npc->ovl));
- npc->ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+ npc->ovl[0].hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+ npc->ovl[1].hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
/* Note: we don't call ConnectNamedPipe here because it must be done in the
* server thread as the thread must be alertable */
return RPC_S_OK;
}
@@ -202,11 +208,13 @@
/* success */
memset(&npc->ovl, 0, sizeof(npc->ovl));
/* pipe is connected; change to message-read mode. */
dwMode = PIPE_READMODE_MESSAGE;
SetNamedPipeHandleState(pipe, &dwMode, NULL, NULL);
- npc->ovl.hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+ npc->ovl[0].hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+ npc->ovl[1].hEvent = CreateEventW(NULL, TRUE, FALSE, NULL);
+
npc->pipe = pipe;
return RPC_S_OK;
}
@@ -308,11 +316,12 @@
{
/* because of the way named pipes work, we'll transfer the connected pipe
* to the child, then reopen the server binding to continue listening */
new_npc->pipe = old_npc->pipe;
- new_npc->ovl = old_npc->ovl;
+ new_npc->ovl[0] = old_npc->ovl[0];
+ new_npc->ovl[1] = old_npc->ovl[1];
old_npc->pipe = 0;
memset(&old_npc->ovl, 0, sizeof(old_npc->ovl));
old_npc->listening = FALSE;
}
@@ -359,13 +368,18 @@
unsigned int bytes_left = count;
while (bytes_left)
{
DWORD bytes_read;
- ret = ReadFile(npc->pipe, buf, bytes_left, &bytes_read, NULL);
- if (!ret || !bytes_read)
+ ret = ReadFile(npc->pipe, buf, bytes_left, &bytes_read, &npc->ovl[0]);
+ if ((!ret || !bytes_read) && (GetLastError() != ERROR_IO_PENDING))
break;
+
+ ret = GetOverlappedResult(npc->pipe, &npc->ovl[0], &bytes_read, TRUE);
+ if (!ret && GetLastError() != ERROR_MORE_DATA)
+ break;
+
bytes_left -= bytes_read;
buf += bytes_read;
}
return ret ? count : -1;
}
@@ -379,13 +393,18 @@
unsigned int bytes_left = count;
while (bytes_left)
{
DWORD bytes_written;
- ret = WriteFile(npc->pipe, buf, count, &bytes_written, NULL);
- if (!ret || !bytes_written)
+ ret = WriteFile(npc->pipe, buf, count, &bytes_written, &npc->ovl[1]);
+ if ((!ret || !bytes_written) && (GetLastError() != ERROR_IO_PENDING))
break;
+
+ ret = GetOverlappedResult(npc->pipe, &npc->ovl[1], &bytes_written, TRUE);
+ if (!ret && GetLastError() != ERROR_MORE_DATA)
+ break;
+
bytes_left -= bytes_written;
buf += bytes_written;
}
return ret ? count : -1;
}
@@ -396,14 +415,19 @@
if (npc->pipe) {
FlushFileBuffers(npc->pipe);
CloseHandle(npc->pipe);
npc->pipe = 0;
}
- if (npc->ovl.hEvent) {
- CloseHandle(npc->ovl.hEvent);
- npc->ovl.hEvent = 0;
+ if (npc->ovl[0].hEvent) {
+ CloseHandle(npc->ovl[0].hEvent);
+ npc->ovl[0].hEvent = 0;
+ }
+ if (npc->ovl[1].hEvent) {
+ CloseHandle(npc->ovl[1].hEvent);
+ npc->ovl[1].hEvent = 0;
}
+
return 0;
}
static void rpcrt4_conn_np_cancel_call(RpcConnection *Connection)
{
@@ -547,11 +571,11 @@
/* open and count connections */
*count = 1;
conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common);
while (conn) {
rpcrt4_conn_listen_pipe(conn);
- if (conn->ovl.hEvent)
+ if (conn->ovl[0].hEvent)
(*count)++;
conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common);
}
/* make array of connections */
@@ -568,11 +592,11 @@
objs[0] = npps->mgr_event;
*count = 1;
conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common);
while (conn) {
- if ((objs[*count] = conn->ovl.hEvent))
+ if ((objs[*count] = conn->ovl[0].hEvent))
(*count)++;
conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common);
}
LeaveCriticalSection(&protseq->cs);
return objs;
@@ -615,11 +639,11 @@
b_handle = objs[res - WAIT_OBJECT_0];
/* find which connection got a RPC */
EnterCriticalSection(&protseq->cs);
conn = CONTAINING_RECORD(protseq->conn, RpcConnection_np, common);
while (conn) {
- if (b_handle == conn->ovl.hEvent) break;
+ if (b_handle == conn->ovl[0].hEvent) break;
conn = CONTAINING_RECORD(conn->common.Next, RpcConnection_np, common);
}
cconn = NULL;
if (conn)
RPCRT4_SpawnConnection(&cconn, &conn->common);
@@ -713,16 +737,18 @@
RpcConnection_tcp *tcpc;
tcpc = HeapAlloc(GetProcessHeap(), 0, sizeof(RpcConnection_tcp));
if (tcpc == NULL)
return NULL;
tcpc->sock = -1;
+#ifndef __REACTOS__
if (socketpair(PF_UNIX, SOCK_STREAM, 0, tcpc->cancel_fds) < 0)
{
ERR("socketpair() failed: %s\n", strerror(errno));
HeapFree(GetProcessHeap(), 0, tcpc);
return NULL;
}
+#endif
return &tcpc->common;
}
static RPC_STATUS rpcrt4_ncacn_ip_tcp_open(RpcConnection* Connection)
{
@@ -783,12 +809,11 @@
continue;
}
/* RPC depends on having minimal latency so disable the Nagle algorithm */
val = 1;
- setsockopt(sock, SOL_TCP, TCP_NODELAY, &val, sizeof(val));
- fcntl(sock, F_SETFL, O_NONBLOCK); /* make socket nonblocking */
+ setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&val, sizeof(val));
tcpc->sock = sock;
freeaddrinfo(ai);
TRACE("connected\n");
@@ -806,10 +831,11 @@
int sock;
int ret;
struct addrinfo *ai;
struct addrinfo *ai_cur;
struct addrinfo hints;
+ u_long blocking;
RpcConnection *first_connection = NULL;
TRACE("(%p, %s)\n", protseq, endpoint);
hints.ai_flags = AI_PASSIVE /* for non-localhost addresses */;
@@ -857,11 +883,11 @@
ret = bind(sock, ai_cur->ai_addr, ai_cur->ai_addrlen);
if (ret < 0)
{
WARN("bind failed: %s\n", strerror(errno));
close(sock);
- if (errno == EADDRINUSE)
+ if (errno == WSAEADDRINUSE)
status = RPC_S_DUPLICATE_ENDPOINT;
else
status = RPC_S_CANT_CREATE_ENDPOINT;
continue;
}
@@ -886,11 +912,12 @@
}
/* need a non-blocking socket, otherwise accept() has a potential
* race-condition (poll() says it is readable, connection drops,
* and accept() blocks until the next connection comes...)
*/
- ret = fcntl(sock, F_SETFL, O_NONBLOCK);
+ blocking = 1;
+ ret = ioctlsocket(sock, FIONBIO, &blocking);
if (ret < 0)
{
WARN("couldn't make socket non-blocking, error %d\n", ret);
RPCRT4_DestroyConnection(&tcpc->common);
status = RPC_S_OUT_OF_RESOURCES;
@@ -929,10 +956,11 @@
static RPC_STATUS rpcrt4_conn_tcp_handoff(RpcConnection *old_conn, RpcConnection *new_conn)
{
int ret;
struct sockaddr_in address;
socklen_t addrsize;
+ u_long blocking;
RpcConnection_tcp *server = (RpcConnection_tcp*) old_conn;
RpcConnection_tcp *client = (RpcConnection_tcp*) new_conn;
addrsize = sizeof(address);
ret = accept(server->sock, (struct sockaddr*) &address, &addrsize);
@@ -940,11 +968,12 @@
{
ERR("Failed to accept a TCP connection: error %d\n", ret);
return RPC_S_OUT_OF_RESOURCES;
}
/* reset to blocking behaviour */
- fcntl(ret, F_SETFL, 0);
+ blocking = 0;
+ ret = ioctlsocket(ret, FIONBIO, &blocking);
client->sock = ret;
TRACE("Accepted a new TCP connection\n");
return RPC_S_OK;
}
@@ -1187,14 +1216,16 @@
{
RpcServerProtseq_sock *ps = HeapAlloc(GetProcessHeap(), 0, sizeof(*ps));
if (ps)
{
int fds[2];
+ u_long blocking;
if (!socketpair(PF_UNIX, SOCK_DGRAM, 0, fds))
{
- fcntl(fds[0], F_SETFL, O_NONBLOCK);
- fcntl(fds[1], F_SETFL, O_NONBLOCK);
+ blocking = 1;
+ ioctlsocket(fds[0], FIONBIO, &blocking);
+ ioctlsocket(fds[1], FIONBIO, &blocking);
ps->mgr_event_rcv = fds[0];
ps->mgr_event_snd = fds[1];
}
else
{