etc/comp_channel: add completion channel abstraction for scalability
authorshefty <shefty@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Wed, 11 Mar 2009 21:39:31 +0000 (21:39 +0000)
committershefty <shefty@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Wed, 11 Mar 2009 21:39:31 +0000 (21:39 +0000)
Add a new abstraction, the completion channel, capable of de-multiplexing
overlapped completion events among multiple queues.

The completion abstraction consists of 3 main components:

COMP_MANAGER
Maps to an IOCP.  The completion manager tracks all completions on any
of its associated channels.  This allows a user to 'poll' the completion
manager to receive notification when a completion event occurs on any
of the channels, similar to polling a set of fd's.

COMP_CHANNEL
Maps to a queue of completed requests.  A user can 'poll' a single channel
for completions if they are only interested in processing events on that
channel.  Internally, polling on a channel will poll the manager for
completions, but only process those associated with the specified channel.

COMP_EVENT
Maps to an overlapped structure.  Operations needing a completion event
reference this structure.  When the event occurs, it is queued to the
correct channel.

The implementation assumes that only one or a very small number of threads
will ever be trying to process events at any one time.  (Based on existing
applications, this is true.)  The abstraction itself is threadless.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
git-svn-id: svn://openib.tc.cornell.edu/gen1/trunk@2024 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86

etc/user/comp_channel.cpp [new file with mode: 0644]
inc/user/comp_channel.h [new file with mode: 0644]

diff --git a/etc/user/comp_channel.cpp b/etc/user/comp_channel.cpp
new file mode 100644 (file)
index 0000000..7a3213d
--- /dev/null
@@ -0,0 +1,197 @@
+/*\r
+ * Copyright (c) 2008, 2009 Intel Corporation.  All rights reserved.\r
+ *\r
+ * This software is available to you under the OpenIB.org BSD license\r
+ * below:\r
+ *\r
+ *     Redistribution and use in source and binary forms, with or\r
+ *     without modification, are permitted provided that the following\r
+ *     conditions are met:\r
+ *\r
+ *      - Redistributions of source code must retain the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer.\r
+ *\r
+ *      - Redistributions in binary form must reproduce the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer in the documentation and/or other materials\r
+ *        provided with the distribution.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,\r
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF\r
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND\r
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS\r
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN\r
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE\r
+ * SOFTWARE.\r
+ */\r
+\r
+#include <comp_channel.h>\r
+\r
+/* Forward declaration: defined later in this file, called by CompManagerPoll. */\r
+static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);\r
+\r
+DWORD CompManagerOpen(COMP_MANAGER *pMgr)\r
+{\r
+       pMgr->CompQueue = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, -1);\r
+       if (pMgr->CompQueue == NULL) {\r
+               return GetLastError();\r
+       }\r
+\r
+       pMgr->Event = CreateEvent(NULL, TRUE, TRUE, NULL);\r
+       if (pMgr->Event == NULL) {\r
+               return GetLastError();\r
+       }\r
+\r
+       pMgr->Lock = 0;\r
+       return 0;\r
+}\r
+\r
+/*\r
+ * Release the handles owned by a completion manager: first the completion\r
+ * port, then the event.\r
+ */\r
+void CompManagerClose(COMP_MANAGER *pMgr)\r
+{\r
+       HANDLE queue = pMgr->CompQueue;\r
+       HANDLE event = pMgr->Event;\r
+\r
+       CloseHandle(queue);\r
+       CloseHandle(event);\r
+}\r
+\r
+/*\r
+ * Associate a file handle with the manager's completion port so that\r
+ * overlapped completions on hFile are delivered to the manager.\r
+ *\r
+ * Key is passed through as the completion key for the association.\r
+ * Returns 0 on success, otherwise the GetLastError() value.\r
+ */\r
+DWORD CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key)\r
+{\r
+       if (CreateIoCompletionPort(hFile, pMgr->CompQueue, Key, 0) == NULL) {\r
+               return GetLastError();\r
+       }\r
+       return 0;\r
+}\r
+\r
+DWORD CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,\r
+                                         COMP_CHANNEL **ppChannel)\r
+{\r
+       COMP_ENTRY *entry;\r
+       OVERLAPPED *overlap;\r
+       DWORD bytes, ret;\r
+       ULONG_PTR key;\r
+\r
+       if (GetQueuedCompletionStatus(pMgr->CompQueue, &bytes, &key, &overlap,\r
+                                                                 Milliseconds)) {\r
+               entry = CONTAINING_RECORD(overlap, COMP_ENTRY, Overlap);\r
+               *ppChannel = entry->Channel;\r
+               CompChannelQueue(entry->Channel, entry);\r
+               ret = 0;\r
+       } else {\r
+               ret = GetLastError();\r
+       }\r
+       return ret;\r
+}\r
+\r
+\r
+/*\r
+ * Prepare a channel for use with the given manager.\r
+ *\r
+ * The queue starts empty (TailPtr refers to Head), the channel lock is\r
+ * initialized, and Milliseconds is stored as the channel's poll timeout.\r
+ */\r
+void CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel, DWORD Milliseconds)\r
+{\r
+       /* Empty list: the tail pointer designates the head slot itself. */\r
+       pChannel->Head = NULL;\r
+       pChannel->TailPtr = &pChannel->Head;\r
+       pChannel->Manager = pMgr;\r
+\r
+       InitializeCriticalSection(&pChannel->Lock);\r
+       pChannel->Milliseconds = Milliseconds;\r
+}\r
+\r
+/*\r
+ * Release the channel's critical section.  Entries still linked on the\r
+ * channel are not touched by this function.\r
+ */\r
+void CompChannelCleanup(COMP_CHANNEL *pChannel)\r
+{\r
+       DeleteCriticalSection(&pChannel->Lock); \r
+}\r
+\r
+/*\r
+ * Link pEntry at the end of the channel's list.  No locking is done here,\r
+ * and pEntry->Next is not modified.\r
+ */\r
+static void CompChannelInsertTail(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+{\r
+       COMP_ENTRY **slot = pChannel->TailPtr;\r
+\r
+       /* Fill the current tail slot, then make the new entry's link the tail. */\r
+       *slot = pEntry;\r
+       pChannel->TailPtr = &pEntry->Next;\r
+}\r
+\r
+static COMP_ENTRY *CompChannelRemoveHead(COMP_CHANNEL *pChannel)\r
+{\r
+       COMP_ENTRY *entry;\r
+\r
+       entry = pChannel->Head;\r
+       pChannel->Head = entry->Next;\r
+       if (pChannel->Head == NULL) {\r
+               pChannel->TailPtr = &pChannel->Head;\r
+       }\r
+       return entry;\r
+}\r
+\r
+/*\r
+ * Append a completed entry to the channel's queue while holding the\r
+ * channel lock.  The entry's Next link is cleared before it is linked in.\r
+ */\r
+static void CompChannelQueue(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+{\r
+       CRITICAL_SECTION *lock = &pChannel->Lock;\r
+\r
+       pEntry->Next = NULL;\r
+       EnterCriticalSection(lock);\r
+       CompChannelInsertTail(pChannel, pEntry);\r
+       LeaveCriticalSection(lock);\r
+}\r
+\r
+DWORD CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry)\r
+{\r
+       COMP_MANAGER *mgr = pChannel->Manager;\r
+       COMP_CHANNEL *chan;\r
+       DWORD ret = 0;\r
+       ULONG locked;\r
+\r
+       EnterCriticalSection(&pChannel->Lock);\r
+       while (pChannel->Head == NULL) {\r
+               LeaveCriticalSection(&pChannel->Lock);\r
+\r
+               locked = InterlockedCompareExchange(&mgr->Lock, 1, 0);\r
+               if (locked == 0) {\r
+                       ResetEvent(mgr->Event);\r
+                       ret = CompManagerPoll(mgr, pChannel->Milliseconds, &chan);\r
+                       InterlockedExchange(&mgr->Lock, 0);\r
+                       SetEvent(mgr->Event);\r
+               } else {\r
+                       ret = WaitForSingleObject(mgr->Event, pChannel->Milliseconds);\r
+               }\r
+               if (ret) {\r
+                       goto out;\r
+               }\r
+\r
+               EnterCriticalSection(&pChannel->Lock);\r
+       }\r
+       *ppEntry = CompChannelRemoveHead(pChannel);\r
+       LeaveCriticalSection(&pChannel->Lock);\r
+\r
+out:\r
+       return ret;\r
+}\r
+\r
+/*\r
+ * Remove pEntry from pChannel's queue if it is currently linked there.\r
+ *\r
+ * The manager is drained first so that a completion for pEntry that is\r
+ * still sitting in the completion port is moved onto a channel queue\r
+ * before the list is searched.  If pEntry is not on the list, the list is\r
+ * left unchanged.\r
+ */\r
+void CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+{\r
+       COMP_CHANNEL *chan;\r
+       COMP_ENTRY **entry_ptr;\r
+       DWORD ret;\r
+\r
+       /*\r
+        * Poll with a zero timeout until the manager reports an error; every\r
+        * successful poll queues one completion on its channel.\r
+        * NOTE(review): this polls without claiming the manager's Lock flag\r
+        * that CompChannelPoll uses -- confirm that is intended.\r
+        */\r
+       do {\r
+               ret = CompManagerPoll(pChannel->Manager, 0, &chan);\r
+       } while (!ret);\r
+       /* Wake threads waiting in CompChannelPoll so they re-check their queues. */\r
+       SetEvent(pChannel->Manager->Event);\r
+\r
+       EnterCriticalSection(&pChannel->Lock);\r
+       /* entry_ptr walks the link slots (Head, then each Next) until it finds pEntry. */\r
+       entry_ptr = &pChannel->Head;\r
+       while (*entry_ptr && *entry_ptr != pEntry) {\r
+               entry_ptr = &(*entry_ptr)->Next;\r
+       }\r
+\r
+       if (*entry_ptr != NULL) {\r
+               /* Unlink by making the slot that referenced pEntry skip over it. */\r
+               *entry_ptr = pEntry->Next;\r
+               /* If pEntry was the last entry, the tail slot moves back one link. */\r
+               if (pChannel->TailPtr == &pEntry->Next) {\r
+                       pChannel->TailPtr = entry_ptr;\r
+               }\r
+       }\r
+       LeaveCriticalSection(&pChannel->Lock);\r
+}\r
+\r
+/*\r
+ * Bind an entry to the channel its completions will be queued on.\r
+ * Only the Channel field is set; Next and Overlap are left as they are.\r
+ */\r
+void CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry)\r
+{\r
+       pEntry->Channel = pChannel;\r
+}\r
+\r
+DWORD CompEntryPost(COMP_ENTRY *pEntry)\r
+{\r
+       if (PostQueuedCompletionStatus(pEntry->Channel->Manager->CompQueue, 0, 0,\r
+                                                                  &pEntry->Overlap)) {\r
+               return 0;\r
+       } else {\r
+               return GetLastError();\r
+       }\r
+}\r
diff --git a/inc/user/comp_channel.h b/inc/user/comp_channel.h
new file mode 100644 (file)
index 0000000..f9789d5
--- /dev/null
@@ -0,0 +1,86 @@
+/*\r
+ * Copyright (c) 2009 Intel Corp., Inc.  All rights reserved.\r
+ *\r
+ * This software is available to you under the OpenIB.org BSD license\r
+ * below:\r
+ *\r
+ *     Redistribution and use in source and binary forms, with or\r
+ *     without modification, are permitted provided that the following\r
+ *     conditions are met:\r
+ *\r
+ *      - Redistributions of source code must retain the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer.\r
+ *\r
+ *      - Redistributions in binary form must reproduce the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer in the documentation and/or other materials\r
+ *        provided with the distribution.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,\r
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF\r
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND\r
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS\r
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN\r
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE\r
+ * SOFTWARE.\r
+ */\r
+\r
+#pragma once\r
+\r
+#ifndef COMP_CHANNEL_H\r
+#define COMP_CHANNEL_H\r
+\r
+#include <windows.h>\r
+\r
+#ifdef __cplusplus\r
+extern "C" {\r
+#endif\r
+\r
+/*\r
+ * One completion event.  The embedded OVERLAPPED is what is handed to the\r
+ * completion port; the owning COMP_ENTRY is recovered from it with\r
+ * CONTAINING_RECORD when the completion is dequeued.\r
+ */\r
+typedef struct _COMP_ENTRY\r
+{\r
+       /* Link to the next entry on the channel's queue of completed entries. */\r
+       struct _COMP_ENTRY              *Next;\r
+       /* Overlapped structure associated with the operation or posted event. */\r
+       OVERLAPPED                              Overlap;\r
+       /* Channel this entry is queued on when it completes (set by CompEntryInit). */\r
+       struct _COMP_CHANNEL    *Channel;\r
+\r
+}      COMP_ENTRY;\r
+\r
+/*\r
+ * A queue of completed entries, fed by the completion manager it is\r
+ * attached to.\r
+ */\r
+typedef struct _COMP_CHANNEL\r
+{\r
+       /* Manager whose completion port feeds this channel. */\r
+       struct _COMP_MANAGER    *Manager;\r
+       /* First completed entry, or NULL when the queue is empty. */\r
+       COMP_ENTRY                              *Head;\r
+       /* Address of the slot to link the next entry into: &Head when empty, */\r
+       /* otherwise the Next field of the last entry. */\r
+       COMP_ENTRY                              **TailPtr;\r
+       /* Protects Head and TailPtr. */\r
+       CRITICAL_SECTION                Lock;\r
+       /* Timeout used by CompChannelPoll for each poll or wait. */\r
+       DWORD                                   Milliseconds;\r
+\r
+}      COMP_CHANNEL;\r
+\r
+/*\r
+ * Completion manager: wraps one I/O completion port shared by all of its\r
+ * channels.\r
+ */\r
+typedef struct _COMP_MANAGER\r
+{\r
+       /* I/O completion port handle. */\r
+       HANDLE                                  CompQueue;\r
+       /* Manual-reset event; reset while a thread polls the port from */\r
+       /* CompChannelPoll and set again when that poll finishes. */\r
+       HANDLE                                  Event;\r
+       /* 0 when free, 1 while a thread in CompChannelPoll is polling the port. */\r
+       LONG volatile                   Lock;\r
+\r
+}      COMP_MANAGER;\r
+\r
+/*\r
+ * Completion manager operations.  Functions returning DWORD return 0 on\r
+ * success and a non-zero error code otherwise.\r
+ */\r
+DWORD          CompManagerOpen(COMP_MANAGER *pMgr);\r
+void           CompManagerClose(COMP_MANAGER *pMgr);\r
+DWORD          CompManagerMonitor(COMP_MANAGER *pMgr, HANDLE hFile, ULONG_PTR Key);\r
+DWORD          CompManagerPoll(COMP_MANAGER *pMgr, DWORD Milliseconds,\r
+                                                       COMP_CHANNEL **ppChannel);\r
+\r
+/*\r
+ * Completion channel operations.  Milliseconds given to CompChannelInit is\r
+ * the timeout later used by CompChannelPoll.\r
+ */\r
+void           CompChannelInit(COMP_MANAGER *pMgr, COMP_CHANNEL *pChannel,\r
+                                                       DWORD Milliseconds);\r
+void           CompChannelCleanup(COMP_CHANNEL *pChannel);\r
+DWORD          CompChannelPoll(COMP_CHANNEL *pChannel, COMP_ENTRY **ppEntry);\r
+void           CompChannelRemoveEntry(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);\r
+\r
+/* Completion entry operations. */\r
+void           CompEntryInit(COMP_CHANNEL *pChannel, COMP_ENTRY *pEntry);\r
+DWORD          CompEntryPost(COMP_ENTRY *pEntry);\r
+\r
+#ifdef __cplusplus\r
+}\r
+#endif\r
+\r
+#endif /* COMP_CHANNEL_H */\r