etc/comp_channel: fix lost event problem
author shefty <shefty@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Thu, 23 Apr 2009 19:25:04 +0000 (19:25 +0000)
committer shefty <shefty@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Thu, 23 Apr 2009 19:25:04 +0000 (19:25 +0000)
The previous version of the completion channel was racy and would
occasionally lose events, leaving users blocked indefinitely if no
new events occurred.  The most reliable fix is to add a thread to
the completion manager that reaps events from an IO completion port
and dispatches them to the correct completion channel.  This costs
1-2% in libibverbs bandwidth tests that wait on a CQ, but, unlike the
previous version, it does not lose events.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
git-svn-id: svn://openib.tc.cornell.edu/gen1/trunk@2127 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86

etc/user/comp_channel.cpp
inc/user/comp_channel.h

index 7a3213d..613189a 100644 (file)
  */\r
 \r
 #include <comp_channel.h>\r
+#include <process.h>\r
 \r
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry);\r
 static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);\r
 \r
+\r
+/*\r
+ * Completion manager\r
+ */\r
+\r
+/*\r
+ * Reaper thread: dequeue each packet posted to the manager's completion\r
+ * port and hand it to its channel, or back to the manager for the\r
+ * CompManagerCancel entry (Channel == NULL).  Once Run is FALSE the\r
+ * loop exits after the next packet it dequeues.\r
+ */\r
+static unsigned __stdcall CompThreadPoll(void *Context)\r
+{\r
+       COMP_MANAGER *mgr = (COMP_MANAGER *) Context;\r
+       COMP_ENTRY *entry;\r
+       OVERLAPPED *overlap;\r
+       DWORD bytes;\r
+       ULONG_PTR key;\r
+\r
+       while (mgr->Run) {\r
+               GetQueuedCompletionStatus(mgr->CompQueue, &bytes, &key,\r
+                                                                 &overlap, INFINITE);\r
+               if (overlap == NULL) {\r
+                       /* Port error, or the wake-up posted by CompManagerClose. */\r
+                       continue;\r
+               }\r
+\r
+               entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);\r
+               if (entry->Channel) {\r
+                       CompChannelQueue(entry->Channel, entry);\r
+               } else {\r
+                       CompManagerQueue(mgr, entry);\r
+               }\r
+       }\r
+\r
+       _endthreadex(0);\r
+       return 0;\r
+}\r
+\r
+/* Initialize pMgr and start its reaper thread; returns 0 or a Win32 error code. */\r
 DWORD CompManagerOpen(COMP_MANAGER *pMgr)\r
 {\r
+       DWORD ret;\r
+\r
+       InitializeCriticalSection(&pMgr->Lock);\r
+       pMgr->Busy = 0;\r
+       DListInit(&pMgr->DoneList);\r
+       CompEntryInit(NULL, &pMgr->Entry);\r
+\r
        pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, -1);\r
        if (pMgr->CompQueue == NULL) {\r
-               return GetLastError();\r
+               ret = GetLastError();\r
+               goto err1;\r
        }\r
 \r
        pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL);\r
        if (pMgr->Event == NULL) {\r
-               return GetLastError();\r
+               ret = GetLastError();\r
+               goto err2;\r
        }\r
 \r
-       pMgr->Lock = 0;\r
+       pMgr->Run = TRUE;\r
+       pMgr->Thread = (HANDLE) _beginthreadex(NULL, 0, CompThreadPoll, pMgr, 0, NULL);\r
+       if (pMgr->Thread == NULL) {\r
+               ret = GetLastError();\r
+               if (ret == 0) {\r
+                       /* _beginthreadex reports CRT failures via errno; never return success here. */\r
+                       ret = ERROR_NOT_ENOUGH_MEMORY;\r
+               }\r
+               goto err3;\r
+       }\r
        return 0;\r
+\r
+err3:\r
+       CloseHandle(pMgr->Event);\r
+err2:\r
+       CloseHandle(pMgr->CompQueue);\r
+err1:\r
+       DeleteCriticalSection(&pMgr->Lock);\r
+       return ret;\r
 }\r
 \r
+/* Stop the reaper thread, then release everything CompManagerOpen created. */\r
 void CompManagerClose(COMP_MANAGER *pMgr)\r
 {\r
+       pMgr->Run = FALSE;\r
+       CompManagerCancel(pMgr);\r
+       /*\r
+        * CompManagerCancel posts nothing while an earlier cancel is still\r
+        * pending (Entry.Busy == 1), so always post a NULL-overlapped packet\r
+        * as well; CompThreadPoll skips it and then observes Run == FALSE.\r
+        */\r
+       PostQueuedCompletionStatus(pMgr->CompQueue, 0, 0, NULL);\r
+       WaitForSingleObject(pMgr->Thread, INFINITE);\r
+       CloseHandle(pMgr->Thread);\r
+\r
        CloseHandle(pMgr->CompQueue);\r
        CloseHandle(pMgr->Event);\r
+       DeleteCriticalSection(&pMgr->Lock);\r
 }\r
 \r
 DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)\r
@@ -61,38 +120,85 @@ DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)
        return (cq == NULL) ? GetLastError() : 0;\r
 }\r
 \r
+static void CompManagerQueue(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)\r
+{\r
+       EnterCriticalSection(&pMgr->Lock);\r
+       DListInsertTail(&pEntry->MgrEntry, &pMgr->DoneList);    /* append to the manager's ready list */\r
+       SetEvent(pMgr->Event);  /* wake CompManagerPoll waiters; set under Lock so no wake-up is lost */\r
+       LeaveCriticalSection(&pMgr->Lock);\r
+}\r
+/* Unlink pEntry from the manager's DoneList under the manager lock. */\r
+static void CompManagerRemoveEntry(COMP_MANAGER *pMgr, COMP_ENTRY *pEntry)\r
+{\r
+       EnterCriticalSection(&pMgr->Lock);\r
+       DListRemove(&pEntry->MgrEntry); /* NOTE(review): presumably pEntry must currently be on DoneList */\r
+       LeaveCriticalSection(&pMgr->Lock);\r
+}\r
+\r
 DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,\r
                                          COMP_CHANNEL **ppChannel)\r
 {\r
        COMP_ENTRY *entry;\r
-       OVERLAPPED *overlap;\r
-       DWORD bytes, ret;\r
-       ULONG_PTR key;\r
+       DWORD ret = 0;\r
 \r
-       if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap,\r
-                                                                 Milliseconds)) {\r
-               entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);\r
-               *ppChannel = entry->Channel;\r
-               CompChannelQueue(entry->Channel, entry);\r
-               ret = 0;\r
-       } else {\r
-               ret = GetLastError();\r
+       EnterCriticalSection(&pMgr->Lock);\r
+       while (DListEmpty(&pMgr->DoneList)) {\r
+               ResetEvent(pMgr->Event);        /* safe: CompManagerQueue sets Event under this same lock */\r
+               LeaveCriticalSection(&pMgr->Lock);\r
+               /* Wait for CompManagerQueue; a non-zero result (WAIT_TIMEOUT, WAIT_FAILED) is returned as-is. */\r
+               ret = WaitForSingleObject(pMgr->Event, Milliseconds);\r
+               if (ret) {\r
+                       return ret;\r
+               }\r
+\r
+               EnterCriticalSection(&pMgr->Lock);\r
        }\r
+       /* Report the head's channel; channel entries stay listed until CompChannelPoll/CompChannelFindRemove unlink them. */\r
+       entry = CONTAINING_RECORD(pMgr->DoneList.Next, COMP_ENTRY, MgrEntry);\r
+       *ppChannel = entry->Channel;\r
+       if (entry->Channel == NULL) {   /* CompManagerCancel's entry: consume it and allow another cancel */\r
+               DListRemove(&entry->MgrEntry);\r
+               InterlockedExchange(&entry->Busy, 0);\r
+               ret = ERROR_CANCELLED;\r
+       }\r
+       LeaveCriticalSection(&pMgr->Lock);\r
+       /* 0: *ppChannel has ready entries; ERROR_CANCELLED: CompManagerCancel wake-up (*ppChannel is NULL). */\r
        return ret;\r
 }\r
 \r
+void CompManagerCancel(COMP_MANAGER *pMgr)\r
+{\r
+       if (InterlockedCompareExchange(&pMgr->Entry.Busy, 1, 0) == 0) { /* at most one cancel in flight; CompManagerPoll clears Busy */\r
+               PostQueuedCompletionStatus(pMgr->CompQueue, 0, (ULONG_PTR) pMgr,\r
+                                                                  &pMgr->Entry.Overlap);\r
+       }       /* NOTE(review): a failed post leaves Busy == 1, so later cancels become no-ops */\r
+}\r
+\r
+\r
+/*\r
+ * Completion channel\r
+ */\r
 \r
-void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)\r
+DWORD CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)\r
 {\r
        pChannel->Manager = pMgr;\r
        pChannel->Head = NULL;\r
        pChannel->TailPtr = &pChannel->Head;\r
-       InitializeCriticalSection(&pChannel->Lock);\r
        pChannel->Milliseconds = Milliseconds;\r
+       /* Create the event first so a failure leaves nothing to undo. */\r
+       pChannel->Event = CreateEvent(NULL, TRUE, TRUE, NULL);  /* manual-reset, initially signaled */\r
+       if (pChannel->Event == NULL) {\r
+               return GetLastError();\r
+       }\r
+       /* Entry is the channel's own cancel marker, posted by CompChannelCancel. */\r
+       InitializeCriticalSection(&pChannel->Lock);\r
+       CompEntryInit(pChannel, &pChannel->Entry);\r
+       return 0;\r
 }\r
 \r
 void CompChannelCleanup(COMP_CHANNEL *pChannel)\r
 {\r
+       CloseHandle(pChannel->Event);   /* release the event created by CompChannelInit */\r
        DeleteCriticalSection(&pChannel->Lock); \r
 }\r
 \r
@@ -114,84 +220,101 @@ static COMP_ENTRY *CompChannelRemoveHead(COMP_CHANNEL *pChannel)
        return entry;\r
 }\r
 \r
+static COMP_ENTRY *CompChannelFindRemove(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+{\r
+       COMP_ENTRY **entry_ptr, *entry;\r
+       /* Unlink pEntry if it is queued on pChannel; returns pEntry if found, else NULL. */\r
+       EnterCriticalSection(&pChannel->Lock);\r
+       entry_ptr = &pChannel->Head;\r
+       while (*entry_ptr && *entry_ptr != pEntry) {\r
+               entry_ptr = &(*entry_ptr)->Next;\r
+       }\r
+       /* Capture the match under the lock: *entry_ptr is overwritten by the unlink below. */\r
+       if ((entry = *entry_ptr) != NULL) {\r
+               *entry_ptr = pEntry->Next;\r
+               if (pChannel->TailPtr == &pEntry->Next) {\r
+                       pChannel->TailPtr = entry_ptr;\r
+               }\r
+               CompManagerRemoveEntry(pChannel->Manager, pEntry);\r
+               InterlockedExchange(&pEntry->Busy, 0);\r
+       }\r
+       LeaveCriticalSection(&pChannel->Lock);\r
+       return entry;\r
+}\r
+\r
 static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
 {\r
        pEntry->Next = NULL;\r
        EnterCriticalSection(&pChannel->Lock);\r
+       CompManagerQueue(pChannel->Manager, pEntry);    /* lock order: channel lock, then manager lock */\r
        CompChannelInsertTail(pChannel, pEntry);\r
+       SetEvent(pChannel->Event);      /* wake CompChannelPoll waiters; set under the channel lock */\r
        LeaveCriticalSection(&pChannel->Lock);\r
 }\r
 \r
 DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry)\r
 {\r
-       COMP_MANAGER *mgr = pChannel->Manager;\r
-       COMP_CHANNEL *chan;\r
-       DWORD ret = 0;\r
-       ULONG locked;\r
+       COMP_ENTRY *entry;\r
+       DWORD ret;\r
 \r
        EnterCriticalSection(&pChannel->Lock);\r
        while (pChannel->Head == NULL) {\r
+               ResetEvent(pChannel->Event);    /* safe: CompChannelQueue sets Event under this same lock */\r
                LeaveCriticalSection(&pChannel->Lock);\r
 \r
-               locked = InterlockedCompareExchange(&mgr->Lock, 1, 0);\r
-               if (locked == 0) {\r
-                       ResetEvent(mgr->Event);\r
-                       ret = CompManagerPoll(mgr, pChannel->Milliseconds, &chan);\r
-                       InterlockedExchange(&mgr->Lock, 0);\r
-                       SetEvent(mgr->Event);\r
-               } else {\r
-                       ret = WaitForSingleObject(mgr->Event, pChannel->Milliseconds);\r
-               }\r
+               ret = WaitForSingleObject(pChannel->Event, pChannel->Milliseconds);\r
                if (ret) {\r
-                       goto out;\r
+                       return ret;     /* WAIT_TIMEOUT or WAIT_FAILED; *ppEntry is left untouched */\r
                }\r
 \r
                EnterCriticalSection(&pChannel->Lock);\r
        }\r
-       *ppEntry = CompChannelRemoveHead(pChannel);\r
+       entry = CompChannelRemoveHead(pChannel);\r
+       CompManagerRemoveEntry(pChannel->Manager, entry);       /* also unlink it from the manager's DoneList */\r
        LeaveCriticalSection(&pChannel->Lock);\r
 \r
-out:\r
+       InterlockedExchange(&entry->Busy, 0);   /* CompEntryPost may re-post the entry from here on */\r
+       *ppEntry = entry;\r
+       ret = (entry == &pChannel->Entry) ? ERROR_CANCELLED : 0;        /* the channel's own Entry marks a CompChannelCancel wake-up */\r
+\r
        return ret;\r
 }\r
 \r
-void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+void CompChannelCancel(COMP_CHANNEL *pChannel)\r
 {\r
-       COMP_CHANNEL *chan;\r
-       COMP_ENTRY **entry_ptr;\r
-       DWORD ret;\r
-\r
-       do {\r
-               ret = CompManagerPoll(pChannel->Manager, 0, &chan);\r
-       } while (!ret);\r
-       SetEvent(pChannel->Manager->Event);\r
-\r
-       EnterCriticalSection(&pChannel->Lock);\r
-       entry_ptr = &pChannel->Head;\r
-       while (*entry_ptr && *entry_ptr != pEntry) {\r
-               entry_ptr = &(*entry_ptr)->Next;\r
-       }\r
-\r
-       if (*entry_ptr != NULL) {\r
-               *entry_ptr = pEntry->Next;\r
-               if (pChannel->TailPtr == &pEntry->Next) {\r
-                       pChannel->TailPtr = entry_ptr;\r
-               }\r
+       if (InterlockedCompareExchange(&pChannel->Entry.Busy, 1, 0) == 0) {     /* at most one cancel in flight; CompChannelPoll clears Busy */\r
+               PostQueuedCompletionStatus(pChannel->Manager->CompQueue, 0,\r
+                                                                  (ULONG_PTR) pChannel, &pChannel->Entry.Overlap);\r
        }\r
-       LeaveCriticalSection(&pChannel->Lock);\r
 }\r
 \r
+\r
+/*\r
+ * Completion entry\r
+ */\r
+\r
 void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
 {\r
+       RtlZeroMemory(pEntry, sizeof *pEntry);  /* clears Next, MgrEntry, Overlap and Busy */\r
        pEntry->Channel = pChannel;\r
 }\r
 \r
 DWORD CompEntryPost(COMP_ENTRY *pEntry)\r
 {\r
-       if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 0,\r
-                                                                  &pEntry->Overlap)) {\r
-               return 0;\r
-       } else {\r
-               return GetLastError();\r
+       if (InterlockedCompareExchange(&pEntry->Busy, 1, 0) == 0) {     /* claim the entry; if already Busy it is in flight and this is a no-op */\r
+               if (!PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue,\r
+                                                                               0, 0, &pEntry->Overlap)) {\r
+                       InterlockedExchange(&pEntry->Busy, 0);  /* post failed: release so it can be retried */\r
+                       return GetLastError();\r
+               }\r
+       }\r
+       return 0;\r
+}\r
+/* Wait until pEntry is no longer Busy, unlinking it from its channel once queued there (pEntry->Channel must be non-NULL). */\r
+void CompEntryCancel(COMP_ENTRY *pEntry)\r
+{\r
+       while (pEntry->Busy) {  /* NOTE(review): spins with Sleep(0) while the post is still inside the completion port */\r
+               Sleep(0);\r
+               CompChannelFindRemove(pEntry->Channel, pEntry);\r
        }\r
 }\r
index f9789d5..a7a62c5 100644 (file)
@@ -33,6 +33,7 @@
 #define COMP_CHANNEL_H\r
 \r
 #include <windows.h>\r
+#include <dlist.h>\r
 \r
 #ifdef __cplusplus\r
 extern "C" {\r
@@ -41,8 +42,10 @@ extern "C" {
 typedef struct _COMP_ENTRY\r
 {\r
        struct _COMP_ENTRY              *Next;\r
+       DLIST_ENTRY                             MgrEntry;       /* link in COMP_MANAGER.DoneList */\r
        OVERLAPPED                              Overlap;\r
        struct _COMP_CHANNEL    *Channel;\r
+       LONG volatile                   Busy;   /* 1 from a successful post until the entry is consumed by a poll or cancel */\r
 \r
 }      COMP_ENTRY;\r
 \r
@@ -51,6 +54,8 @@ typedef struct _COMP_CHANNEL
        struct _COMP_MANAGER    *Manager;\r
        COMP_ENTRY                              *Head;\r
        COMP_ENTRY                              **TailPtr;\r
+       COMP_ENTRY                              Entry;\r
+       HANDLE                                  Event;\r
        CRITICAL_SECTION                Lock;\r
        DWORD                                   Milliseconds;\r
 \r
@@ -59,8 +64,13 @@ typedef struct _COMP_CHANNEL
 typedef struct _COMP_MANAGER\r
 {\r
        HANDLE                                  CompQueue;\r
+       DLIST_ENTRY                             DoneList;       /* ready entries, guarded by Lock */\r
+       COMP_ENTRY                              Entry;  /* cancel marker posted by CompManagerCancel (Channel == NULL) */\r
+       HANDLE                                  Thread; /* reaper thread draining CompQueue */\r
+       BOOL                                    Run;    /* cleared by CompManagerClose to stop Thread */\r
        HANDLE                                  Event;\r
-       LONG volatile                   Lock;\r
+       LONG volatile                   Busy;   /* NOTE(review): only initialized in CompManagerOpen in this change; cancels use Entry.Busy */\r
+       CRITICAL_SECTION                Lock;   /* guards DoneList */\r
 \r
 }      COMP_MANAGER;\r
 \r
@@ -69,15 +79,17 @@ void                CompManagerClose(COMP_MANAGER *pMgr);
 DWORD          CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key);\r
 DWORD          CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,\r
                                                        COMP_CHANNEL **ppChannel);\r
+void           CompManagerCancel(COMP_MANAGER *pMgr);\r
 \r
-void           CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,\r
+DWORD          CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,\r
                                                        DWORD Milliseconds);\r
 void           CompChannelCleanup(COMP_CHANNEL *pChannel);\r
 DWORD          CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry);\r
-void           CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);\r
+void           CompChannelCancel(COMP_CHANNEL *pChannel);\r
 \r
 void           CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);\r
 DWORD          CompEntryPost(COMP_ENTRY *pEntry);\r
+void           CompEntryCancel(COMP_ENTRY *pEntry);\r
 \r
 #ifdef __cplusplus\r
 }\r