etc/work_queue: abstraction to manage a small pool of IO_WORKITEMs
author shefty <shefty@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Wed, 27 May 2009 15:59:36 +0000 (15:59 +0000)
committer shefty <shefty@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Wed, 27 May 2009 15:59:36 +0000 (15:59 +0000)
Create an abstraction for managing a small pool of IO_WORKITEMs that
can be used to process a queue of work requests at passive level.

To prevent starvation of other work items and ensure fairness of system
threads, only a single work request is processed each time a work
item is queued.  If more work remains, the work item is requeued.

Using a pool of work items, rather than a single work item, allows for
some parallelism of tasks.

Signed-off-by: Sean Hefty <sean.hefty@intel.com>
git-svn-id: svn://openib.tc.cornell.edu/gen1/trunk@2203 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86

etc/kernel/work_queue.c [new file with mode: 0644]
inc/kernel/work_queue.h [new file with mode: 0644]

diff --git a/etc/kernel/work_queue.c b/etc/kernel/work_queue.c
new file mode 100644 (file)
index 0000000..24af064
--- /dev/null
@@ -0,0 +1,146 @@
+/*\r
+ * Copyright (c) 2009 Intel Corporation. All rights reserved.\r
+ *\r
+ * This software is available to you under the OpenIB.org BSD license\r
+ * below:\r
+ *\r
+ *     Redistribution and use in source and binary forms, with or\r
+ *     without modification, are permitted provided that the following\r
+ *     conditions are met:\r
+ *\r
+ *      - Redistributions of source code must retain the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer.\r
+ *\r
+ *      - Redistributions in binary form must reproduce the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer in the documentation and/or other materials\r
+ *        provided with the distribution.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,\r
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF\r
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND\r
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS\r
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN\r
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE\r
+ * SOFTWARE.\r
+ */\r
+\r
+#include <ntddk.h>\r
+#include "work_queue.h"\r
+\r
+\r
+/*\r
+ * One slot of a work queue's task array.  Slots with Index > 0 each own an\r
+ * IO_WORKITEM; slot 0 has no work item and its Next field serves as the\r
+ * head of the list of idle tasks.\r
+ */\r
+typedef struct _WORK_QUEUE_TASK\r
+{\r
+       /* Work queue this task belongs to */\r
+       WORK_QUEUE                      *pWorkQueue;\r
+       /* Work item used to run the task; not allocated for slot 0 */\r
+       PIO_WORKITEM            pWorkItem;\r
+       /* Index of the next idle task; 0 marks the end of the idle list */\r
+       int                                     Next;\r
+       /* Position of this task within the queue's TaskArray */\r
+       int                                     Index;\r
+\r
+}      WORK_QUEUE_TASK;\r
+\r
+\r
+/*\r
+ * When building for a target older than _WIN32_WINNT_WIN6, substitute\r
+ * KeNumberProcessors for KeQueryActiveProcessorCount().  The macro\r
+ * discards its argument.\r
+ */\r
+#if (WINVER < _WIN32_WINNT_WIN6)\r
+#define KeQueryActiveProcessorCount(x) KeNumberProcessors\r
+#endif\r
+\r
+\r
+NTSTATUS WorkQueueInit(WORK_QUEUE *pWorkQueue, PDEVICE_OBJECT Device, int TaskCount)\r
+{\r
+       WORK_QUEUE_TASK *task;\r
+       KAFFINITY procs;\r
+       int i;\r
+\r
+       if (TaskCount == 0) {\r
+               TaskCount = KeQueryActiveProcessorCount(&procs);\r
+       }\r
+\r
+       KeInitializeSpinLock(&pWorkQueue->Lock);\r
+       InitializeListHead(&pWorkQueue->List);\r
+       pWorkQueue->TaskCount = TaskCount;\r
+       pWorkQueue->TaskArray = ExAllocatePoolWithTag(NonPagedPool,\r
+                                                                                                 sizeof(WORK_QUEUE_TASK) * (TaskCount + 1),\r
+                                                                                                 'ktqw');\r
+       if (pWorkQueue->TaskArray == NULL) {\r
+               return STATUS_INSUFFICIENT_RESOURCES;\r
+       }\r
+\r
+       for (i = 0; i <= TaskCount; i++) {\r
+               task = &pWorkQueue->TaskArray[i];\r
+               task->pWorkQueue = pWorkQueue;\r
+               task->Index = i;\r
+               task->Next = i + 1;\r
+               if (i > 0) {\r
+                       task->pWorkItem = IoAllocateWorkItem(Device);\r
+                       if (task->pWorkItem == NULL) {\r
+                               goto err;\r
+                       }\r
+               }\r
+       }\r
+       task->Next = 0;\r
+       return STATUS_SUCCESS;\r
+\r
+err:\r
+       while (--i > 0) {\r
+               IoFreeWorkItem(pWorkQueue->TaskArray[i].pWorkItem);\r
+       }\r
+       ExFreePool(pWorkQueue->TaskArray);\r
+       return STATUS_INSUFFICIENT_RESOURCES;\r
+}\r
+\r
+/*\r
+ * Release the resources of a work queue: the work item of every task in\r
+ * slots TaskCount down to 1, then the task array itself.  TaskCount is\r
+ * counted down to 0 as the work items are freed.\r
+ */\r
+void WorkQueueDestroy(WORK_QUEUE *pWorkQueue)\r
+{\r
+       WORK_QUEUE_TASK *tasks = pWorkQueue->TaskArray;\r
+\r
+       /* Slot 0 owns no work item, so stop before reaching it */\r
+       for (; pWorkQueue->TaskCount > 0; pWorkQueue->TaskCount--) {\r
+               IoFreeWorkItem(tasks[pWorkQueue->TaskCount].pWorkItem);\r
+       }\r
+\r
+       ExFreePool(tasks);\r
+}\r
+\r
+/*\r
+ * Work item callback queued through IoQueueWorkItem.  Context is the\r
+ * WORK_QUEUE_TASK whose work item was queued.\r
+ *\r
+ * Processes at most one work entry per invocation.  If entries remain\r
+ * afterwards, the task's work item is queued again; otherwise the task is\r
+ * put back on the queue's idle list.\r
+ */\r
+static VOID WorkQueueHandler(PDEVICE_OBJECT pDevice, void *Context)\r
+{\r
+       WORK_QUEUE *wq;\r
+       WORK_QUEUE_TASK *task = (WORK_QUEUE_TASK *) Context;\r
+       WORK_ENTRY *work;\r
+       LIST_ENTRY *entry;\r
+       KLOCK_QUEUE_HANDLE lockqueue;\r
+       UNREFERENCED_PARAMETER(pDevice);\r
+\r
+       wq = task->pWorkQueue;\r
+       KeAcquireInStackQueuedSpinLock(&wq->Lock, &lockqueue);\r
+\r
+       if (!IsListEmpty(&wq->List)) {\r
+               /* Take the entry at the head of the list */\r
+               entry = RemoveHeadList(&wq->List);\r
+               /* Run the caller's handler without holding the queue lock */\r
+               KeReleaseInStackQueuedSpinLock(&lockqueue);\r
+\r
+               work = CONTAINING_RECORD(entry, WORK_ENTRY, Entry);\r
+               work->WorkHandler(work);\r
+\r
+               KeAcquireInStackQueuedSpinLock(&wq->Lock, &lockqueue);\r
+               if (!IsListEmpty(&wq->List)) {\r
+                       /* More work is pending: requeue this task instead of looping */\r
+                       KeReleaseInStackQueuedSpinLock(&lockqueue);\r
+                       IoQueueWorkItem(task->pWorkItem, WorkQueueHandler, DelayedWorkQueue, task);\r
+                       return;\r
+               }\r
+       }\r
+\r
+       /* Lock is held here: push this task onto the idle list headed by slot 0 */\r
+       task->Next = wq->TaskArray[0].Next;\r
+       wq->TaskArray[0].Next = task->Index;\r
+       KeReleaseInStackQueuedSpinLock(&lockqueue);\r
+}\r
+\r
+void WorkQueueInsert(WORK_QUEUE *pWorkQueue, WORK_ENTRY *pWork)\r
+{\r
+       WORK_QUEUE_TASK *task;\r
+       KLOCK_QUEUE_HANDLE lockqueue;\r
+\r
+       KeAcquireInStackQueuedSpinLock(&pWorkQueue->Lock, &lockqueue);\r
+       InsertHeadList(&pWorkQueue->List, &pWork->Entry);\r
+       task = &pWorkQueue->TaskArray[pWorkQueue->TaskArray[0].Next];\r
+       pWorkQueue->TaskArray[0].Next = task->Next;\r
+       KeReleaseInStackQueuedSpinLock(&lockqueue);\r
+\r
+       if (task->Index != 0) {\r
+               IoQueueWorkItem(task->pWorkItem, WorkQueueHandler, DelayedWorkQueue, task);\r
+       }\r
+}\r
diff --git a/inc/kernel/work_queue.h b/inc/kernel/work_queue.h
new file mode 100644 (file)
index 0000000..8873442
--- /dev/null
@@ -0,0 +1,71 @@
+/*\r
+ * Copyright (c) 2009 Intel Corporation. All rights reserved.\r
+ *\r
+ * This software is available to you under the OpenIB.org BSD license\r
+ * below:\r
+ *\r
+ *     Redistribution and use in source and binary forms, with or\r
+ *     without modification, are permitted provided that the following\r
+ *     conditions are met:\r
+ *\r
+ *      - Redistributions of source code must retain the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer.\r
+ *\r
+ *      - Redistributions in binary form must reproduce the above\r
+ *        copyright notice, this list of conditions and the following\r
+ *        disclaimer in the documentation and/or other materials\r
+ *        provided with the distribution.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,\r
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF\r
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND\r
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS\r
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN\r
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE\r
+ * SOFTWARE.\r
+ */\r
+\r
+#pragma once\r
+\r
+#ifndef _WORK_QUEUE_H_\r
+#define _WORK_QUEUE_H_\r
+\r
+#include <ntddk.h>\r
+\r
+// A single unit of work for a work queue.\r
+// Laid out so that it can be overlaid on IRP.Tail.Overlay.DriverContext\r
+// (see WorkEntryFromIrp).\r
+typedef struct _WORK_ENTRY\r
+{\r
+       // Links the entry into a work queue's list\r
+       LIST_ENTRY                      Entry;\r
+       // Routine called with this entry when the work is processed\r
+       void                            (*WorkHandler)(struct _WORK_ENTRY *Work);\r
+       // Caller-supplied value stored by WorkEntryInit\r
+       void                            *Context;\r
+\r
+}      WORK_ENTRY;\r
+\r
+static void WorkEntryInit(WORK_ENTRY *pWork,\r
+                                                 void (*WorkHandler)(struct _WORK_ENTRY *Work), void *Context)\r
+{\r
+       pWork->WorkHandler = WorkHandler;\r
+       pWork->Context = Context;\r
+}\r
+\r
+// Treat the IRP's Tail.Overlay.DriverContext storage as a WORK_ENTRY\r
+#define WorkEntryFromIrp(pIrp) ((WORK_ENTRY *) (pIrp)->Tail.Overlay.DriverContext)\r
+\r
+// Defined in work_queue.c; only referenced by pointer here\r
+struct _WORK_QUEUE_TASK;\r
+\r
+// A queue of WORK_ENTRYs serviced by a small pool of tasks\r
+typedef struct _WORK_QUEUE\r
+{\r
+       // Pending work entries, linked through WORK_ENTRY.Entry\r
+       LIST_ENTRY                                      List;\r
+       // Protects List and the idle-task chain kept in TaskArray\r
+       KSPIN_LOCK                                      Lock;\r
+       // Number of tasks; TaskArray holds TaskCount + 1 slots\r
+       int                                                     TaskCount;\r
+       struct _WORK_QUEUE_TASK         *TaskArray;     // TaskArray[0] is for internal use\r
+\r
+}      WORK_QUEUE;\r
+\r
+// Initialize pWorkQueue with TaskCount tasks whose work items are allocated\r
+// against Device.  A TaskCount of 0 uses the processor count instead.\r
+// Returns STATUS_SUCCESS or a failure status if allocation fails.\r
+NTSTATUS WorkQueueInit(WORK_QUEUE *pWorkQueue, PDEVICE_OBJECT Device,\r
+                                          int TaskCount);\r
+// Free the work items and the task array of an initialized work queue.\r
+void WorkQueueDestroy(WORK_QUEUE *pWorkQueue);\r
+// Add pWork to the queue and schedule an idle task, if any, to process it.\r
+void WorkQueueInsert(WORK_QUEUE *pWorkQueue, WORK_ENTRY *pWork);\r
+\r
+#endif // _WORK_QUEUE_H_\r