[DAPL] match WinOF 2.2 release.
author stansmith <stansmith@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Thu, 18 Mar 2010 19:30:11 +0000 (19:30 +0000)
committer stansmith <stansmith@ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86>
Thu, 18 Mar 2010 19:30:11 +0000 (19:30 +0000)
git-svn-id: svn://openib.tc.cornell.edu/gen1/trunk@2754 ad392aa1-c5ef-ae45-8dd8-e69d62a5ef86

ulp/dapl2/dapl/openib_scm/cm.c

index 6958b67..69f3181 100644 (file)
-/*
- * This Software is licensed under one of the following licenses:
- *
- * 1) under the terms of the "Common Public License 1.0" a copy of which is
- *    available from the Open Source Initiative, see
- *    http://www.opensource.org/licenses/cpl.php.
- *
- * 2) under the terms of the "The BSD License" a copy of which is
- *    available from the Open Source Initiative, see
- *    http://www.opensource.org/licenses/bsd-license.php.
- *
- * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
- *    copy of which is available from the Open Source Initiative, see
- *    http://www.opensource.org/licenses/gpl-license.php.
- *
- * Licensee has the right to choose one of the above licenses.
- *
- * Redistributions of source code must retain the above copyright
- * notice and one of the license notices.
- *
- * Redistributions in binary form must reproduce both the above copyright
- * notice, one of the license notices in the documentation
- * and/or other materials provided with the distribution.
- */
-
-/***************************************************************************
- *
- *   Module:            uDAPL
- *
- *   Filename:          dapl_ib_cm.c
- *
- *   Author:            Arlin Davis
- *
- *   Created:           3/10/2005
- *
- *   Description: 
- *
- *   The uDAPL openib provider - connection management
- *
- ****************************************************************************
- *                Source Control System Information
- *
- *    $Id: $
- *
- *     Copyright (c) 2005 Intel Corporation.  All rights reserved.
- *
- **************************************************************************/
-
-#if defined(_WIN32)
-#define FD_SETSIZE 1024
-#define DAPL_FD_SETSIZE FD_SETSIZE
-#endif
-
-#include "dapl.h"
-#include "dapl_adapter_util.h"
-#include "dapl_evd_util.h"
-#include "dapl_cr_util.h"
-#include "dapl_name_service.h"
-#include "dapl_ib_util.h"
-#include "dapl_ep_util.h"
-#include "dapl_osd.h"
-
-#if defined(_WIN32) || defined(_WIN64)
-enum DAPL_FD_EVENTS {
-       DAPL_FD_READ = 0x1,
-       DAPL_FD_WRITE = 0x2,
-       DAPL_FD_ERROR = 0x4
-};
-
-static int dapl_config_socket(DAPL_SOCKET s)
-{
-       unsigned long nonblocking = 1;
-       int ret, opt = 1;
-
-       ret = ioctlsocket(s, FIONBIO, &nonblocking);
-
-       /* no delay for small packets */
-       if (!ret)
-               ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, 
-                                (char *)&opt, sizeof(opt));
-       return ret;
-}
-
-static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,
-                              int addrlen)
-{
-       int err;
-
-       err = connect(s, addr, addrlen);
-       if (err == SOCKET_ERROR)
-               err = WSAGetLastError();
-       return (err == WSAEWOULDBLOCK) ? EAGAIN : err;
-}
-
-struct dapl_fd_set {
-       struct fd_set set[3];
-};
-
-static struct dapl_fd_set *dapl_alloc_fd_set(void)
-{
-       return dapl_os_alloc(sizeof(struct dapl_fd_set));
-}
-
-static void dapl_fd_zero(struct dapl_fd_set *set)
-{
-       FD_ZERO(&set->set[0]);
-       FD_ZERO(&set->set[1]);
-       FD_ZERO(&set->set[2]);
-}
-
-static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
-                      enum DAPL_FD_EVENTS event)
-{
-       FD_SET(s, &set->set[(event == DAPL_FD_READ) ? 0 : 1]);
-       FD_SET(s, &set->set[2]);
-       return 0;
-}
-
-static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
-{
-       struct fd_set rw_fds;
-       struct fd_set err_fds;
-       struct timeval tv;
-       int ret;
-
-       FD_ZERO(&rw_fds);
-       FD_ZERO(&err_fds);
-       FD_SET(s, &rw_fds);
-       FD_SET(s, &err_fds);
-
-       tv.tv_sec = 0;
-       tv.tv_usec = 0;
-
-       if (event == DAPL_FD_READ)
-               ret = select(1, &rw_fds, NULL, &err_fds, &tv);
-       else
-               ret = select(1, NULL, &rw_fds, &err_fds, &tv);
-
-       if (ret == 0)
-               return 0;
-       else if (ret == SOCKET_ERROR)
-               return DAPL_FD_ERROR;
-       else if (FD_ISSET(s, &rw_fds))
-               return event;
-       else
-               return DAPL_FD_ERROR;
-}
-
-static int dapl_select(struct dapl_fd_set *set)
-{
-       int ret;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep\n");
-       ret = select(0, &set->set[0], &set->set[1], &set->set[2], NULL);
-       dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup\n");
-
-       if (ret == SOCKET_ERROR)
-               dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
-                            " dapl_select: error 0x%x\n", WSAGetLastError());
-
-       return ret;
-}
-
-static int dapl_socket_errno(void)
-{
-       int err;
-
-       err = WSAGetLastError();
-       switch (err) {
-       case WSAEACCES:
-       case WSAEADDRINUSE:
-               return EADDRINUSE;
-       case WSAECONNRESET:
-               return ECONNRESET;
-       default:
-               return err;
-       }
-}
-#else                          // _WIN32 || _WIN64
-enum DAPL_FD_EVENTS {
-       DAPL_FD_READ = POLLIN,
-       DAPL_FD_WRITE = POLLOUT,
-       DAPL_FD_ERROR = POLLERR
-};
-
-static int dapl_config_socket(DAPL_SOCKET s)
-{
-       int ret, opt = 1;
-
-       /* non-blocking */
-       ret = fcntl(s, F_GETFL);
-       if (ret >= 0)
-               ret = fcntl(s, F_SETFL, ret | O_NONBLOCK);
-
-       /* no delay for small packets */
-       if (!ret)
-               ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, 
-                                (char *)&opt, sizeof(opt));
-       return ret;
-}
-
-static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,
-                              int addrlen)
-{
-       int ret;
-
-       ret = connect(s, addr, addrlen);
-
-       return (errno == EINPROGRESS) ? EAGAIN : ret;
-}
-
-struct dapl_fd_set {
-       int index;
-       struct pollfd set[DAPL_FD_SETSIZE];
-};
-
-static struct dapl_fd_set *dapl_alloc_fd_set(void)
-{
-       return dapl_os_alloc(sizeof(struct dapl_fd_set));
-}
-
-static void dapl_fd_zero(struct dapl_fd_set *set)
-{
-       set->index = 0;
-}
-
-static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
-                      enum DAPL_FD_EVENTS event)
-{
-       if (set->index == DAPL_FD_SETSIZE - 1) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",
-                        set->index + 1);
-               return -1;
-       }
-
-       set->set[set->index].fd = s;
-       set->set[set->index].revents = 0;
-       set->set[set->index++].events = event;
-       return 0;
-}
-
-static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
-{
-       struct pollfd fds;
-       int ret;
-
-       fds.fd = s;
-       fds.events = event;
-       fds.revents = 0;
-       ret = poll(&fds, 1, 0);
-       dapl_log(DAPL_DBG_TYPE_THREAD, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n",
-                s, ret, fds.revents);
-       if (ret == 0)
-               return 0;
-       else if (ret < 0 || (fds.revents & (POLLERR | POLLHUP | POLLNVAL))) 
-               return DAPL_FD_ERROR;
-       else 
-               return event;
-}
-
-static int dapl_select(struct dapl_fd_set *set)
-{
-       int ret;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: sleep, fds=%d\n", set->index);
-       ret = poll(set->set, set->index, -1);
-       dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: wakeup, ret=0x%x\n", ret);
-       return ret;
-}
-
-#define dapl_socket_errno() errno
-#endif
-
-static void dapli_cm_thread_signal(dp_ib_cm_handle_t cm_ptr) 
-{
-       send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);
-}
-
-static void dapli_cm_free(dp_ib_cm_handle_t cm_ptr) 
-{
-       dapl_os_lock(&cm_ptr->lock);
-       cm_ptr->state = DCM_FREE;
-       dapl_os_unlock(&cm_ptr->lock);
-       dapli_cm_thread_signal(cm_ptr);
-}
-
-static void dapli_cm_dealloc(dp_ib_cm_handle_t cm_ptr) 
-{
-       dapl_os_assert(!cm_ptr->ref_count);
-       
-       if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
-               shutdown(cm_ptr->socket, SHUT_RDWR);
-               closesocket(cm_ptr->socket);
-       }
-       if (cm_ptr->ah) 
-               ibv_destroy_ah(cm_ptr->ah);
-       
-       dapl_os_lock_destroy(&cm_ptr->lock);
-       dapl_os_wait_object_destroy(&cm_ptr->event);
-       dapl_os_free(cm_ptr, sizeof(*cm_ptr));
-}
-
-void dapls_cm_acquire(dp_ib_cm_handle_t cm_ptr)
-{
-       dapl_os_lock(&cm_ptr->lock);
-       cm_ptr->ref_count++;
-       dapl_os_unlock(&cm_ptr->lock);
-}
-
-void dapls_cm_release(dp_ib_cm_handle_t cm_ptr)
-{
-       dapl_os_lock(&cm_ptr->lock);
+/*\r
+ * This Software is licensed under one of the following licenses:\r
+ *\r
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is\r
+ *    available from the Open Source Initiative, see\r
+ *    http://www.opensource.org/licenses/cpl.php.\r
+ *\r
+ * 2) under the terms of the "The BSD License" a copy of which is\r
+ *    available from the Open Source Initiative, see\r
+ *    http://www.opensource.org/licenses/bsd-license.php.\r
+ *\r
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a\r
+ *    copy of which is available from the Open Source Initiative, see\r
+ *    http://www.opensource.org/licenses/gpl-license.php.\r
+ *\r
+ * Licensee has the right to choose one of the above licenses.\r
+ *\r
+ * Redistributions of source code must retain the above copyright\r
+ * notice and one of the license notices.\r
+ *\r
+ * Redistributions in binary form must reproduce both the above copyright\r
+ * notice, one of the license notices in the documentation\r
+ * and/or other materials provided with the distribution.\r
+ */\r
+\r
+/***************************************************************************\r
+ *\r
+ *   Module:            uDAPL\r
+ *\r
+ *   Filename:          dapl_ib_cm.c\r
+ *\r
+ *   Author:            Arlin Davis\r
+ *\r
+ *   Created:           3/10/2005\r
+ *\r
+ *   Description: \r
+ *\r
+ *   The uDAPL openib provider - connection management\r
+ *\r
+ ****************************************************************************\r
+ *                Source Control System Information\r
+ *\r
+ *    $Id: $\r
+ *\r
+ *     Copyright (c) 2005 Intel Corporation.  All rights reserved.\r
+ *\r
+ **************************************************************************/\r
+\r
+#if defined(_WIN32)\r
+#define FD_SETSIZE 1024\r
+#define DAPL_FD_SETSIZE FD_SETSIZE\r
+#endif\r
+\r
+#include "dapl.h"\r
+#include "dapl_adapter_util.h"\r
+#include "dapl_evd_util.h"\r
+#include "dapl_cr_util.h"\r
+#include "dapl_name_service.h"\r
+#include "dapl_ib_util.h"\r
+#include "dapl_ep_util.h"\r
+#include "dapl_osd.h"\r
+\r
+#if defined(_WIN32) || defined(_WIN64)\r
+enum DAPL_FD_EVENTS { /* Windows build: event selectors handed to dapl_fd_set()\r
+                       * and dapl_poll(); each has its own bit value. */\r
+       DAPL_FD_READ = 0x1,\r
+       DAPL_FD_WRITE = 0x2,\r
+       DAPL_FD_ERROR = 0x4\r
+};\r
+\r
+static int dapl_config_socket(DAPL_SOCKET s)\r
+{ /* Windows build: switch socket s to non-blocking mode via FIONBIO and, if\r
+   * that returned 0, set TCP_NODELAY on it. Returns 0 when both calls\r
+   * succeed, otherwise the non-zero result of the call that failed. */\r
+       unsigned long nonblocking = 1;\r
+       int ret, opt = 1;\r
+\r
+       ret = ioctlsocket(s, FIONBIO, &nonblocking);\r
+\r
+       /* no delay for small packets */\r
+       if (!ret)\r
+               ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, \r
+                                (char *)&opt, sizeof(opt));\r
+       return ret;\r
+}\r
+\r
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,\r
+                              int addrlen)\r
+{ /* Windows build: start a connect on s. Returns 0 when connect() returns 0,\r
+   * EAGAIN when connect() fails with WSAEWOULDBLOCK (presumably the normal\r
+   * "still connecting" case for a non-blocking socket - confirm), and the\r
+   * raw WSAGetLastError() code for any other failure. */\r
+       int err;\r
+\r
+       err = connect(s, addr, addrlen);\r
+       if (err == SOCKET_ERROR)\r
+               err = WSAGetLastError();\r
+       return (err == WSAEWOULDBLOCK) ? EAGAIN : err;\r
+}\r
+\r
+struct dapl_fd_set { /* Windows build: three select() sets. dapl_fd_set() fills\r
+                      * [0] for DAPL_FD_READ, [1] for any other event and\r
+                      * always [2]; dapl_select() passes them to select() as\r
+                      * the read, write and exception sets. */\r
+       struct fd_set set[3];\r
+};\r
+\r
+static struct dapl_fd_set *dapl_alloc_fd_set(void)\r
+{ /* Allocate one struct dapl_fd_set with dapl_os_alloc() and return the\r
+   * result unchanged. Nothing is initialized here; callers presumably run\r
+   * dapl_fd_zero() before use - confirm at the call site. */\r
+       return dapl_os_alloc(sizeof(struct dapl_fd_set));\r
+}\r
+\r
+static void dapl_fd_zero(struct dapl_fd_set *set)\r
+{ /* Windows build: empty all three fd_sets held in *set. */\r
+       FD_ZERO(&set->set[0]);\r
+       FD_ZERO(&set->set[1]);\r
+       FD_ZERO(&set->set[2]);\r
+}\r
+\r
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,\r
+                      enum DAPL_FD_EVENTS event)\r
+{ /* Windows build: register socket s in *set. DAPL_FD_READ selects set[0];\r
+   * every other event value selects set[1]. The socket is always added to\r
+   * set[2] as well. Always returns 0. */\r
+       FD_SET(s, &set->set[(event == DAPL_FD_READ) ? 0 : 1]);\r
+       FD_SET(s, &set->set[2]);\r
+       return 0;\r
+}\r
+\r
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)\r
+{ /* Windows build: check a single socket with select() and a zero timeout.\r
+   * Returns 0 when select() reports nothing ready, `event` when s is set in\r
+   * the read/write set after the call, and DAPL_FD_ERROR when select()\r
+   * fails or s is reported only through the exception set.\r
+   * NOTE(review): the first select() argument is 1 here; Winsock documents\r
+   * that argument as ignored - confirm. */\r
+       struct fd_set rw_fds;\r
+       struct fd_set err_fds;\r
+       struct timeval tv;\r
+       int ret;\r
+\r
+       FD_ZERO(&rw_fds);\r
+       FD_ZERO(&err_fds);\r
+       FD_SET(s, &rw_fds);\r
+       FD_SET(s, &err_fds);\r
+\r
+       tv.tv_sec = 0;\r
+       tv.tv_usec = 0;\r
+\r
+       if (event == DAPL_FD_READ)\r
+               ret = select(1, &rw_fds, NULL, &err_fds, &tv);\r
+       else\r
+               ret = select(1, NULL, &rw_fds, &err_fds, &tv);\r
+\r
+       if (ret == 0)\r
+               return 0;\r
+       else if (ret == SOCKET_ERROR)\r
+               return DAPL_FD_ERROR;\r
+       else if (FD_ISSET(s, &rw_fds))\r
+               return event;\r
+       else\r
+               return DAPL_FD_ERROR;\r
+}\r
+\r
+static int dapl_select(struct dapl_fd_set *set)\r
+{ /* Windows build: call select() on the three sets in *set with a NULL\r
+   * timeout, logging before and after the call. On SOCKET_ERROR the\r
+   * WSAGetLastError() code is logged. Returns select()'s result as is. */\r
+       int ret;\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep\n");\r
+       ret = select(0, &set->set[0], &set->set[1], &set->set[2], NULL);\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup\n");\r
+\r
+       if (ret == SOCKET_ERROR)\r
+               dapl_dbg_log(DAPL_DBG_TYPE_THREAD,\r
+                            " dapl_select: error 0x%x\n", WSAGetLastError());\r
+\r
+       return ret;\r
+}\r
+\r
+static int dapl_socket_errno(void)\r
+{ /* Windows build: translate the last Winsock error into an errno value.\r
+   * WSAEACCES and WSAEADDRINUSE both become EADDRINUSE, WSAECONNRESET\r
+   * becomes ECONNRESET, and every other code is returned untranslated. */\r
+       int err;\r
+\r
+       err = WSAGetLastError();\r
+       switch (err) {\r
+       case WSAEACCES:\r
+       case WSAEADDRINUSE:\r
+               return EADDRINUSE;\r
+       case WSAECONNRESET:\r
+               return ECONNRESET;\r
+       default:\r
+               return err;\r
+       }\r
+}\r
+#else                          // _WIN32 || _WIN64\r
+enum DAPL_FD_EVENTS { /* Non-Windows build: aliases for the poll() event bits;\r
+                       * dapl_fd_set() and dapl_poll() store these values\r
+                       * directly in pollfd.events. */\r
+       DAPL_FD_READ = POLLIN,\r
+       DAPL_FD_WRITE = POLLOUT,\r
+       DAPL_FD_ERROR = POLLERR\r
+};\r
+\r
+static int dapl_config_socket(DAPL_SOCKET s)\r
+{ /* Non-Windows build: add O_NONBLOCK to the socket's file status flags\r
+   * and, if F_SETFL returned 0, set TCP_NODELAY. Returns the negative\r
+   * fcntl() result if reading the flags fails, the non-zero F_SETFL result\r
+   * if setting them fails, otherwise the setsockopt() result. */\r
+       int ret, opt = 1;\r
+\r
+       /* non-blocking */\r
+       ret = fcntl(s, F_GETFL);\r
+       if (ret >= 0)\r
+               ret = fcntl(s, F_SETFL, ret | O_NONBLOCK);\r
+\r
+       /* no delay for small packets */\r
+       if (!ret)\r
+               ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, \r
+                                (char *)&opt, sizeof(opt));\r
+       return ret;\r
+}\r
+\r
+/*\r
+ * Start a connect on socket s (non-Windows build).\r
+ *\r
+ * Returns 0 when connect() completes immediately, EAGAIN when connect()\r
+ * fails with EINPROGRESS (connection still being established), and\r
+ * connect()'s own return value for any other failure.\r
+ */\r
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,\r
+                              int addrlen)\r
+{\r
+       int ret;\r
+\r
+       ret = connect(s, addr, addrlen);\r
+\r
+       /* errno is only meaningful after a failure; a stale EINPROGRESS left\r
+        * by an earlier call must not turn a completed connect into EAGAIN */\r
+       return (ret && errno == EINPROGRESS) ? EAGAIN : ret;\r
+}\r
+\r
+struct dapl_fd_set { /* Non-Windows build: pollfd array plus fill count. */\r
+       int index; /* entries of set[] in use; handed to poll() as the count */\r
+       struct pollfd set[DAPL_FD_SETSIZE];\r
+};\r
+\r
+static struct dapl_fd_set *dapl_alloc_fd_set(void)\r
+{ /* Allocate one struct dapl_fd_set with dapl_os_alloc() and return the\r
+   * result unchanged. `index` is not set here; callers presumably run\r
+   * dapl_fd_zero() before use - confirm at the call site. */\r
+       return dapl_os_alloc(sizeof(struct dapl_fd_set));\r
+}\r
+\r
+static void dapl_fd_zero(struct dapl_fd_set *set)\r
+{ /* Non-Windows build: mark the set empty by resetting the fill count;\r
+   * the pollfd entries themselves are left as they are. */\r
+       set->index = 0;\r
+}\r
+\r
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,\r
+                      enum DAPL_FD_EVENTS event)\r
+{ /* Non-Windows build: append socket s with the requested poll event to\r
+   * *set. Returns 0 on success, or -1 after logging an error once the fill\r
+   * count has reached DAPL_FD_SETSIZE - 1.\r
+   * NOTE(review): that limit leaves the last array slot unused. */\r
+       if (set->index == DAPL_FD_SETSIZE - 1) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",\r
+                        set->index + 1);\r
+               return -1;\r
+       }\r
+\r
+       set->set[set->index].fd = s;\r
+       set->set[set->index].revents = 0;\r
+       set->set[set->index++].events = event;\r
+       return 0;\r
+}\r
+\r
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)\r
+{ /* Non-Windows build: check a single socket with poll() and a zero\r
+   * timeout, logging the result. Returns 0 when poll() reports nothing,\r
+   * DAPL_FD_ERROR when poll() fails or revents carries POLLERR, POLLHUP or\r
+   * POLLNVAL, and `event` otherwise. */\r
+       struct pollfd fds;\r
+       int ret;\r
+\r
+       fds.fd = s;\r
+       fds.events = event;\r
+       fds.revents = 0;\r
+       ret = poll(&fds, 1, 0);\r
+       dapl_log(DAPL_DBG_TYPE_THREAD, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n",\r
+                s, ret, fds.revents);\r
+       if (ret == 0)\r
+               return 0;\r
+       else if (ret < 0 || (fds.revents & (POLLERR | POLLHUP | POLLNVAL))) \r
+               return DAPL_FD_ERROR;\r
+       else \r
+               return event;\r
+}\r
+\r
+static int dapl_select(struct dapl_fd_set *set)\r
+{ /* Non-Windows build: poll() the first set->index entries of *set with a\r
+   * timeout of -1, logging before and after the call. Returns poll()'s\r
+   * result as is. */\r
+       int ret;\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: sleep, fds=%d\n", set->index);\r
+       ret = poll(set->set, set->index, -1);\r
+       dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: wakeup, ret=0x%x\n", ret);\r
+       return ret;\r
+}\r
+\r
+#define dapl_socket_errno() errno /* non-Windows build: socket errors are plain errno */\r
+#endif\r
+\r
+static void dapli_cm_thread_signal(dp_ib_cm_handle_t cm_ptr) \r
+{ /* Write the string "w" to the socket hca->ib_trans.scm[1]. sizeof "w" is\r
+   * 2, so the terminating NUL is sent too. The send() result is ignored.\r
+   * Presumably the peer end wakes the CM thread - confirm in that loop. */\r
+       send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);\r
+}\r
+\r
+static void dapli_cm_free(dp_ib_cm_handle_t cm_ptr) \r
+{ /* Set the CM object's state to DCM_FREE under its lock, then signal the\r
+   * CM thread. Nothing is released here; presumably the thread acts on the\r
+   * DCM_FREE state - confirm in the thread loop. */\r
+       dapl_os_lock(&cm_ptr->lock);\r
+       cm_ptr->state = DCM_FREE;\r
+       dapl_os_unlock(&cm_ptr->lock);\r
+       dapli_cm_thread_signal(cm_ptr);\r
+}\r
+\r
+static void dapli_cm_dealloc(dp_ib_cm_handle_t cm_ptr) \r
+{ /* Destroy a CM object whose reference count is zero (asserted): shut\r
+   * down and close its socket if one is open, destroy its address handle\r
+   * if set, destroy the lock and wait object, then free the memory.\r
+   * cm_ptr must not be used after this returns. */\r
+       dapl_os_assert(!cm_ptr->ref_count);\r
+       \r
+       if (cm_ptr->socket != DAPL_INVALID_SOCKET) {\r
+               shutdown(cm_ptr->socket, SHUT_RDWR);\r
+               closesocket(cm_ptr->socket);\r
+       }\r
+       if (cm_ptr->ah) \r
+               ibv_destroy_ah(cm_ptr->ah);\r
+       \r
+       dapl_os_lock_destroy(&cm_ptr->lock);\r
+       dapl_os_wait_object_destroy(&cm_ptr->event);\r
+       dapl_os_free(cm_ptr, sizeof(*cm_ptr));\r
+}\r
+\r
+void dapls_cm_acquire(dp_ib_cm_handle_t cm_ptr)\r
+{ /* Take one reference on the CM object: increment ref_count while\r
+   * holding the object's lock. */\r
+       dapl_os_lock(&cm_ptr->lock);\r
+       cm_ptr->ref_count++;\r
+       dapl_os_unlock(&cm_ptr->lock);\r
+}\r
+\r
+void dapls_cm_release(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+       dapl_os_lock(&cm_ptr->lock);\r
        cm_ptr->ref_count--;\r
        if (cm_ptr->ref_count) {\r
                 dapl_os_unlock(&cm_ptr->lock);\r
@@ -318,1472 +318,1475 @@ void dapls_cm_release(dp_ib_cm_handle_t cm_ptr)
        }\r
        dapl_os_unlock(&cm_ptr->lock);\r
        dapli_cm_dealloc(cm_ptr);\r
-}
-
-static dp_ib_cm_handle_t dapli_cm_alloc(DAPL_EP *ep_ptr)
-{
-       dp_ib_cm_handle_t cm_ptr;
-
-       /* Allocate CM, init lock, and initialize */
-       if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
-               return NULL;
-
-       (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
-       if (dapl_os_lock_init(&cm_ptr->lock))
-               goto bail;
-
-       if (dapl_os_wait_object_init(&cm_ptr->event)) {
-               dapl_os_lock_destroy(&cm_ptr->lock);
-               goto bail;
-       }
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->list_entry);
-       dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->local_entry);
-
-       cm_ptr->msg.ver = htons(DCM_VER);
-       cm_ptr->socket = DAPL_INVALID_SOCKET;
-       dapls_cm_acquire(cm_ptr);
-               
-       /* Link EP and CM */
-       if (ep_ptr != NULL) {
-               dapl_ep_link_cm(ep_ptr, cm_ptr); /* ref++ */
-               cm_ptr->ep = ep_ptr;
-               cm_ptr->hca = ((DAPL_IA *)ep_ptr->param.ia_handle)->hca_ptr;
-       }
-       return cm_ptr;
-bail:
-       dapl_os_free(cm_ptr, sizeof(*cm_ptr));
-       return NULL;
-}
-
-/* queue socket for processing CM work */
-static void dapli_cm_queue(dp_ib_cm_handle_t cm_ptr)
-{
-       /* add to work queue for cr thread processing */
-       dapl_os_lock(&cm_ptr->hca->ib_trans.lock);
-       dapls_cm_acquire(cm_ptr);
-       dapl_llist_add_tail(&cm_ptr->hca->ib_trans.list,
-                           (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry, cm_ptr);
-       dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
-       dapli_cm_thread_signal(cm_ptr);
-}
-
-/* called with local LIST lock */
-static void dapli_cm_dequeue(dp_ib_cm_handle_t cm_ptr)
-{
-       /* Remove from work queue, cr thread processing */
-       dapl_llist_remove_entry(&cm_ptr->hca->ib_trans.list,
-                               (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry);
-       dapls_cm_release(cm_ptr);
-}
-
-/* BLOCKING: called from dapl_ep_free, EP link will be last ref */
-void dapls_cm_free(dp_ib_cm_handle_t cm_ptr)
-{
-       dapl_log(DAPL_DBG_TYPE_CM,
-                " cm_free: cm %p %s ep %p refs=%d\n", 
-                cm_ptr, dapl_cm_state_str(cm_ptr->state),
-                cm_ptr->ep, cm_ptr->ref_count);
-       
-       /* free from internal workq, wait until EP is last ref */
-       dapl_os_lock(&cm_ptr->lock);
-       cm_ptr->state = DCM_FREE;
-       while (cm_ptr->ref_count != 1) {
-               dapl_os_unlock(&cm_ptr->lock);
-               dapl_os_sleep_usec(10000);
-               dapl_os_lock(&cm_ptr->lock);
-       }
-       dapl_os_unlock(&cm_ptr->lock);
-
-       /* unlink, dequeue from EP. Final ref so release will destroy */
-       dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);
-}
-
-/*
- * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
- *                 or from ep_free. 
- */
-DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
-{
-       DAT_UINT32 disc_data = htonl(0xdead);
-
-       dapl_os_lock(&cm_ptr->lock);
-       if (cm_ptr->state != DCM_CONNECTED || 
-           cm_ptr->state == DCM_DISCONNECTED) {
-               dapl_os_unlock(&cm_ptr->lock);
-               return DAT_SUCCESS;
-       }
-       cm_ptr->state = DCM_DISCONNECTED;
-       dapl_os_unlock(&cm_ptr->lock);
-       
-       /* send disc date, close socket, schedule destroy */
-       dapls_modify_qp_state(cm_ptr->ep->qp_handle, IBV_QPS_ERR, 0,0,0);
-       send(cm_ptr->socket, (char *)&disc_data, sizeof(disc_data), 0);
-
-       /* disconnect events for RC's only */
-       if (cm_ptr->ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) {
-               if (cm_ptr->ep->cr_ptr) {
-                       dapls_cr_callback(cm_ptr,
-                                         IB_CME_DISCONNECTED,
-                                         NULL, 0, cm_ptr->sp);
-               } else {
-                       dapl_evd_connection_callback(cm_ptr,
-                                                    IB_CME_DISCONNECTED,
-                                                    NULL, 0, cm_ptr->ep);
-               }
-       }
-       
-       /* release from workq */
-       dapli_cm_free(cm_ptr);
-
-       /* scheduled destroy via disconnect clean in callback */
-       return DAT_SUCCESS;
-}
-
-/*
- * ACTIVE: socket connected, send QP information to peer 
- */
-static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
-{
-       int len, exp;
-       struct iovec iov[2];
-       struct dapl_ep *ep_ptr = cm_ptr->ep;
-
-       if (err) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " CONN_PENDING: %s ERR %s -> %s %d\n",
-                        err == -1 ? "POLL" : "SOCKOPT",
-                        err == -1 ? strerror(errno) : strerror(err), 
-                        inet_ntoa(((struct sockaddr_in *)
-                               &cm_ptr->addr)->sin_addr), 
-                        ntohs(((struct sockaddr_in *)
-                               &cm_ptr->addr)->sin_port));
-               goto bail;
-       }
-
-       cm_ptr->state = DCM_REP_PENDING;
-
-       /* send qp info and pdata to remote peer */
-       exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
-       iov[0].iov_base = (void *)&cm_ptr->msg;
-       iov[0].iov_len = exp;
-       if (cm_ptr->msg.p_size) {
-               iov[1].iov_base = cm_ptr->msg.p_data;
-               iov[1].iov_len = ntohs(cm_ptr->msg.p_size);
-               len = writev(cm_ptr->socket, iov, 2);
-       } else {
-               len = writev(cm_ptr->socket, iov, 1);
-       }
-
-       if (len != (exp + ntohs(cm_ptr->msg.p_size))) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " CONN_PENDING len ERR %s, wcnt=%d(%d) -> %s\n",
-                        strerror(errno), len, 
-                        exp + ntohs(cm_ptr->msg.p_size), 
-                        inet_ntoa(((struct sockaddr_in *)
-                                  ep_ptr->param.
-                                  remote_ia_address_ptr)->sin_addr));
-               goto bail;
-       }
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " CONN_PENDING: sending SRC lid=0x%x,"
-                    " qpn=0x%x, psize=%d\n",
-                    ntohs(cm_ptr->msg.saddr.ib.lid),
-                    ntohl(cm_ptr->msg.saddr.ib.qpn), 
-                    ntohs(cm_ptr->msg.p_size));
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " CONN_PENDING: SRC GID subnet %016llx id %016llx\n",
-                    (unsigned long long)
-                    htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[0]),
-                    (unsigned long long)
-                    htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[8]));
-       return;
-
-bail:
-       /* mark CM object for cleanup */
-       dapli_cm_free(cm_ptr);
-       dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, 0, ep_ptr);
-}
-
-/*
- * ACTIVE: Create socket, connect, defer exchange QP information to CR thread
- * to avoid blocking. 
- */
-static DAT_RETURN
-dapli_socket_connect(DAPL_EP * ep_ptr,
-                    DAT_IA_ADDRESS_PTR r_addr,
-                    DAT_CONN_QUAL r_qual, DAT_COUNT p_size, DAT_PVOID p_data)
-{
-       dp_ib_cm_handle_t cm_ptr;
-       int ret;
-       socklen_t sl;
-       DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
-       DAT_RETURN dat_ret = DAT_INSUFFICIENT_RESOURCES;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n",
-                    r_qual, p_size);
-
-       cm_ptr = dapli_cm_alloc(ep_ptr);
-       if (cm_ptr == NULL)
-               return dat_ret;
-
-       /* create, connect, sockopt, and exchange QP information */
-       if ((cm_ptr->socket =
-            socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " connect: socket create ERR %s\n", strerror(errno));
-               goto bail;
-       }
-
-       ret = dapl_config_socket(cm_ptr->socket);
-       if (ret < 0) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " connect: config socket %d ERR %d %s\n",
-                        cm_ptr->socket, ret, strerror(dapl_socket_errno()));
-               dat_ret = DAT_INTERNAL_ERROR;
-               goto bail;
-       }
-
-       /* save remote address */
-       dapl_os_memcpy(&cm_ptr->addr, r_addr, sizeof(*r_addr));
-
-#ifdef DAPL_DBG
-       /* DBG: Active PID [0], PASSIVE PID [2]*/
-       *(uint16_t*)&cm_ptr->msg.resv[0] = htons((uint16_t)dapl_os_getpid()); 
-       *(uint16_t*)&cm_ptr->msg.resv[2] = ((struct sockaddr_in *)&cm_ptr->addr)->sin_port;
-#endif
-       ((struct sockaddr_in *)&cm_ptr->addr)->sin_port = htons(r_qual + 1000);
-       ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *)&cm_ptr->addr,
-                                 sizeof(cm_ptr->addr));
-       if (ret && ret != EAGAIN) {
-               dat_ret = DAT_INVALID_ADDRESS;
-               goto bail;
-       }
-
-       /* REQ: QP info in msg.saddr, IA address in msg.daddr, and pdata */
-       cm_ptr->hca = ia_ptr->hca_ptr;
-       cm_ptr->msg.op = ntohs(DCM_REQ);
-       cm_ptr->msg.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);
-       cm_ptr->msg.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;
-       cm_ptr->msg.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
-       dapl_os_memcpy(&cm_ptr->msg.saddr.ib.gid[0], 
-                      &ia_ptr->hca_ptr->ib_trans.gid, 16);
-       
-       /* get local address information from socket */
-       sl = sizeof(cm_ptr->msg.daddr.so);
-       if (getsockname(cm_ptr->socket, (struct sockaddr *)&cm_ptr->msg.daddr.so, &sl)) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                       " connect getsockname ERROR: %s -> %s r_qual %d\n",
-                       strerror(errno), 
-                       inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
-                       (unsigned int)r_qual);;
-       }
-
-       if (p_size) {
-               cm_ptr->msg.p_size = htons(p_size);
-               dapl_os_memcpy(cm_ptr->msg.p_data, p_data, p_size);
-       }
-
-       /* connected or pending, either way results via async event */
-       if (ret == 0)
-               dapli_socket_connected(cm_ptr, 0);
-       else
-               cm_ptr->state = DCM_CONN_PENDING;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: p_data=%p %p\n",
-                    cm_ptr->msg.p_data, cm_ptr->msg.p_data);
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    " connect: %s r_qual %d pending, p_sz=%d, %d %d ...\n",
-                    inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr), 
-                    (unsigned int)r_qual, ntohs(cm_ptr->msg.p_size),
-                    cm_ptr->msg.p_data[0], cm_ptr->msg.p_data[1]);
-
-       /* queue up on work thread */
-       dapli_cm_queue(cm_ptr);
-       return DAT_SUCCESS;
-bail:
-       dapl_log(DAPL_DBG_TYPE_ERR,
-                " connect ERROR: %s -> %s r_qual %d\n",
-                strerror(errno), 
-                inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
-                (unsigned int)r_qual);
-
-       /* Never queued, destroy */
-       dapls_cm_release(cm_ptr);
-       return dat_ret;
-}
-
-/*
- * ACTIVE: exchange QP information, called from CR thread
- */
-static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
-{
-       DAPL_EP *ep_ptr = cm_ptr->ep;
-       int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
-       ib_cm_events_t event = IB_CME_LOCAL_FAILURE;
-       socklen_t sl;
-
-       /* read DST information into cm_ptr, overwrite SRC info */
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: recv peer QP data\n");
-
-       len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, exp, 0);
-       if (len != exp || ntohs(cm_ptr->msg.ver) != DCM_VER) {
-               dapl_log(DAPL_DBG_TYPE_WARN,
-                        " CONN_RTU read: sk %d ERR %s, rcnt=%d, v=%d -> %s PORT L-%x R-%x PID L-%x R-%x\n",
-                        cm_ptr->socket, strerror(errno), len, ntohs(cm_ptr->msg.ver),
-                        inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr),
-                        ntohs(((struct sockaddr_in *)&cm_ptr->msg.daddr.so)->sin_port),
-                        ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port),
-                        ntohs(*(uint16_t*)&cm_ptr->msg.resv[0]),
-                        ntohs(*(uint16_t*)&cm_ptr->msg.resv[2]));
-
-               /* Retry; corner case where server tcp stack resets under load */
-               if (dapl_socket_errno() == ECONNRESET) {
-                       closesocket(cm_ptr->socket);
-                       cm_ptr->socket = DAPL_INVALID_SOCKET;
-                       dapli_socket_connect(cm_ptr->ep, (DAT_IA_ADDRESS_PTR)&cm_ptr->addr, 
-                                            ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port) - 1000,
-                                            ntohs(cm_ptr->msg.p_size), &cm_ptr->msg.p_data);
-                       dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);
-                       dapli_cm_free(cm_ptr);
-                       return;
-               }
-               goto bail;
-       }
-
-       /* keep the QP, address info in network order */
-       
-       /* save remote address information, in msg.daddr */
-       dapl_os_memcpy(&ep_ptr->remote_ia_address,
-                      &cm_ptr->msg.daddr.so,
-                      sizeof(union dcm_addr));
-
-       /* save local address information from socket */
-       sl = sizeof(cm_ptr->addr);
-       getsockname(cm_ptr->socket,(struct sockaddr *)&cm_ptr->addr, &sl);
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    " CONN_RTU: DST %s %d lid=0x%x,"
-                    " qpn=0x%x, qp_type=%d, psize=%d\n",
-                    inet_ntoa(((struct sockaddr_in *)
-                               &cm_ptr->msg.daddr.so)->sin_addr),
-                    ntohs(((struct sockaddr_in *)
-                               &cm_ptr->msg.daddr.so)->sin_port),
-                    ntohs(cm_ptr->msg.saddr.ib.lid),
-                    ntohl(cm_ptr->msg.saddr.ib.qpn), 
-                    cm_ptr->msg.saddr.ib.qp_type, 
-                    ntohs(cm_ptr->msg.p_size));
-
-       /* validate private data size before reading */
-       if (ntohs(cm_ptr->msg.p_size) > DCM_MAX_PDATA_SIZE) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " CONN_RTU read: psize (%d) wrong -> %s\n",
-                        ntohs(cm_ptr->msg.p_size), 
-                        inet_ntoa(((struct sockaddr_in *)
-                                  ep_ptr->param.
-                                  remote_ia_address_ptr)->sin_addr));
-               goto bail;
-       }
-
-       /* read private data into cm_handle if any present */
-       dapl_dbg_log(DAPL_DBG_TYPE_EP," CONN_RTU: read private data\n");
-       exp = ntohs(cm_ptr->msg.p_size);
-       if (exp) {
-               len = recv(cm_ptr->socket, cm_ptr->msg.p_data, exp, 0);
-               if (len != exp) {
-                       dapl_log(DAPL_DBG_TYPE_ERR,
-                                " CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",
-                                strerror(errno), len,
-                                inet_ntoa(((struct sockaddr_in *)
-                                           ep_ptr->param.
-                                           remote_ia_address_ptr)->sin_addr));
-                       goto bail;
-               }
-       }
-
-       /* check for consumer or protocol stack reject */
-       if (ntohs(cm_ptr->msg.op) == DCM_REP)
-               event = IB_CME_CONNECTED;
-       else if (ntohs(cm_ptr->msg.op) == DCM_REJ_USER) 
-               event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
-       else  
-               event = IB_CME_DESTINATION_REJECT;
-       
-       if (event != IB_CME_CONNECTED) {
-               dapl_log(DAPL_DBG_TYPE_CM,
-                        " CONN_RTU: reject from %s %x\n",
-                        inet_ntoa(((struct sockaddr_in *)
-                                   &cm_ptr->msg.daddr.so)->sin_addr),
-                        ntohs(((struct sockaddr_in *)
-                                &cm_ptr->msg.daddr.so)->sin_port));
-               goto bail;
-       }
-
-       /* modify QP to RTR and then to RTS with remote info */
-       if (dapls_modify_qp_state(ep_ptr->qp_handle,
-                                 IBV_QPS_RTR, 
-                                 cm_ptr->msg.saddr.ib.qpn,
-                                 cm_ptr->msg.saddr.ib.lid,
-                                 (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " CONN_RTU: QPS_RTR ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",
-                        strerror(errno), ep_ptr->qp_handle->qp_type,
-                        ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,
-                        ntohl(cm_ptr->msg.saddr.ib.qpn), 
-                        ntohs(cm_ptr->msg.saddr.ib.lid),
-                        inet_ntoa(((struct sockaddr_in *)
-                                   &cm_ptr->msg.daddr.so)->sin_addr),
-                        ntohs(((struct sockaddr_in *)
-                                &cm_ptr->msg.daddr.so)->sin_port));
-               goto bail;
-       }
-       if (dapls_modify_qp_state(ep_ptr->qp_handle,
-                                 IBV_QPS_RTS, 
-                                 cm_ptr->msg.saddr.ib.qpn,
-                                 cm_ptr->msg.saddr.ib.lid,
-                                 NULL) != DAT_SUCCESS) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " CONN_RTU: QPS_RTS ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",
-                        strerror(errno), ep_ptr->qp_handle->qp_type,
-                        ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,
-                        ntohl(cm_ptr->msg.saddr.ib.qpn), 
-                        ntohs(cm_ptr->msg.saddr.ib.lid),
-                        inet_ntoa(((struct sockaddr_in *)
-                                   &cm_ptr->msg.daddr.so)->sin_addr),
-                        ntohs(((struct sockaddr_in *)
-                                &cm_ptr->msg.daddr.so)->sin_port));
-               goto bail;
-       }
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: send RTU\n");
-
-       /* complete handshake after final QP state change, Just ver+op */
-       cm_ptr->state = DCM_CONNECTED;
-       cm_ptr->msg.op = ntohs(DCM_RTU);
-       if (send(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0) == -1) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " CONN_RTU: write error = %s\n", strerror(errno));
-               goto bail;
-       }
-       /* post the event with private data */
-       event = IB_CME_CONNECTED;
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");
-
-#ifdef DAT_EXTENSIONS
-ud_bail:
-       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
-               DAT_IB_EXTENSION_EVENT_DATA xevent;
-               ib_pd_handle_t pd_handle = 
-                       ((DAPL_PZ *)ep_ptr->param.pz_handle)->pd_handle;
-
-               if (event == IB_CME_CONNECTED) {
-                       cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
-                                                    ep_ptr->qp_handle,
-                                                    cm_ptr->msg.saddr.ib.lid, 
-                                                    NULL);
-                       if (cm_ptr->ah) {
-                               /* post UD extended EVENT */
-                               xevent.status = 0;
-                               xevent.type = DAT_IB_UD_REMOTE_AH;
-                               xevent.remote_ah.ah = cm_ptr->ah;
-                               xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
-                               dapl_os_memcpy(&xevent.remote_ah.ia_addr,
-                                               &ep_ptr->remote_ia_address,
-                                               sizeof(union dcm_addr));
-                               event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
-
-                               dapl_log(DAPL_DBG_TYPE_CM, 
-                                       " CONN_RTU: UD AH %p for lid 0x%x"
-                                       " qpn 0x%x\n", 
-                                       cm_ptr->ah, 
-                                       ntohs(cm_ptr->msg.saddr.ib.lid),
-                                       ntohl(cm_ptr->msg.saddr.ib.qpn));
-       
-                       } else 
-                               event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
-                       
-               } else if (event == IB_CME_LOCAL_FAILURE) {
-                       event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
-               } else  
-                       event = DAT_IB_UD_CONNECTION_REJECT_EVENT;
-
-               dapls_evd_post_connection_event_ext(
-                               (DAPL_EVD *) ep_ptr->param.connect_evd_handle,
-                               event,
-                               (DAT_EP_HANDLE) ep_ptr,
-                               (DAT_COUNT) exp,
-                               (DAT_PVOID *) cm_ptr->msg.p_data,
-                               (DAT_PVOID *) &xevent);
-
-               /* cleanup and release from local list */
-               dapli_cm_free(cm_ptr);
-       
-       } else
-#endif
-       {
-               dapl_evd_connection_callback(cm_ptr, event, cm_ptr->msg.p_data,
-                                            DCM_MAX_PDATA_SIZE, ep_ptr);
-       }
-       return;
-
-bail:
-
-#ifdef DAT_EXTENSIONS
-       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) 
-               goto ud_bail;
-#endif
-       /* close socket, and post error event */
-       cm_ptr->state = DCM_REJECTED;
-       dapl_evd_connection_callback(NULL, event, cm_ptr->msg.p_data,
-                                    DCM_MAX_PDATA_SIZE, ep_ptr);
-       dapli_cm_free(cm_ptr);
-}
-
-/*
- * PASSIVE: Create socket, listen, accept, exchange QP information 
- */
-DAT_RETURN
-dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)
-{
-       struct sockaddr_in addr;
-       ib_cm_srvc_handle_t cm_ptr = NULL;
-       DAT_RETURN dat_status = DAT_SUCCESS;
+}\r
+\r
+/*\r
+ * Allocate and initialize a CM object, optionally attached to an EP.\r
+ *\r
+ * The object is zeroed, its lock and wait object are initialized, both list\r
+ * entries are reset, the message version is preset to DCM_VER (network\r
+ * order) and the socket is marked invalid. A reference is taken with\r
+ * dapls_cm_acquire(). When ep_ptr is not NULL the CM is linked to the EP\r
+ * and picks up the HCA of the EP's IA.\r
+ *\r
+ * Returns the new CM handle, or NULL when the allocation or the lock/wait\r
+ * object initialization fails.\r
+ */\r
+static dp_ib_cm_handle_t dapli_cm_alloc(DAPL_EP *ep_ptr)\r
+{\r
+       dp_ib_cm_handle_t cm_ptr;\r
+\r
+       cm_ptr = dapl_os_alloc(sizeof(*cm_ptr));\r
+       if (cm_ptr == NULL)\r
+               return NULL;\r
+\r
+       (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));\r
+\r
+       if (dapl_os_lock_init(&cm_ptr->lock)) {\r
+               dapl_os_free(cm_ptr, sizeof(*cm_ptr));\r
+               return NULL;\r
+       }\r
+\r
+       if (dapl_os_wait_object_init(&cm_ptr->event)) {\r
+               /* undo the lock before giving the memory back */\r
+               dapl_os_lock_destroy(&cm_ptr->lock);\r
+               dapl_os_free(cm_ptr, sizeof(*cm_ptr));\r
+               return NULL;\r
+       }\r
+\r
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->list_entry);\r
+       dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->local_entry);\r
+\r
+       cm_ptr->msg.ver = htons(DCM_VER);\r
+       cm_ptr->socket = DAPL_INVALID_SOCKET;\r
+       dapls_cm_acquire(cm_ptr);\r
+\r
+       /* no EP (e.g. a listening CM): nothing to link */\r
+       if (ep_ptr == NULL)\r
+               return cm_ptr;\r
+\r
+       dapl_ep_link_cm(ep_ptr, cm_ptr); /* ref++ */\r
+       cm_ptr->ep = ep_ptr;\r
+       cm_ptr->hca = ((DAPL_IA *)ep_ptr->param.ia_handle)->hca_ptr;\r
+       return cm_ptr;\r
+}\r
+\r
+/*\r
+ * Put a CM object on its HCA's work list so the CR thread processes it.\r
+ * A reference is acquired for the list while the list lock is held, then\r
+ * the CM thread is signalled.\r
+ */\r
+static void dapli_cm_queue(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+       DAPL_LLIST_ENTRY *entry = (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry;\r
+\r
+       dapl_os_lock(&cm_ptr->hca->ib_trans.lock);\r
+       dapls_cm_acquire(cm_ptr);\r
+       dapl_llist_add_tail(&cm_ptr->hca->ib_trans.list, entry, cm_ptr);\r
+       dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);\r
+\r
+       /* wake the thread that services the work list */\r
+       dapli_cm_thread_signal(cm_ptr);\r
+}\r
+\r
+/*\r
+ * Take a CM object off its HCA's work list and drop one reference.\r
+ * The caller must already hold the local list lock.\r
+ */\r
+static void dapli_cm_dequeue(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+       DAPL_LLIST_ENTRY *entry = (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry;\r
+\r
+       dapl_llist_remove_entry(&cm_ptr->hca->ib_trans.list, entry);\r
+       dapls_cm_release(cm_ptr);\r
+}\r
+\r
+/*\r
+ * BLOCKING: called from dapl_ep_free.\r
+ *\r
+ * Marks the CM as DCM_FREE and then polls, sleeping 10000 usec between\r
+ * checks with the CM lock dropped, until ref_count is down to 1 (the EP\r
+ * link). The CM is then unlinked from its EP.\r
+ */\r
+void dapls_cm_free(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+       dapl_log(DAPL_DBG_TYPE_CM,\r
+                " cm_free: cm %p %s ep %p refs=%d\n",\r
+                cm_ptr, dapl_cm_state_str(cm_ptr->state),\r
+                cm_ptr->ep, cm_ptr->ref_count);\r
+\r
+       dapl_os_lock(&cm_ptr->lock);\r
+       cm_ptr->state = DCM_FREE;\r
+       for (;;) {\r
+               /* only the EP link reference is left: done waiting */\r
+               if (cm_ptr->ref_count == 1)\r
+                       break;\r
+\r
+               /* sleep unlocked so other holders can release */\r
+               dapl_os_unlock(&cm_ptr->lock);\r
+               dapl_os_sleep_usec(10000);\r
+               dapl_os_lock(&cm_ptr->lock);\r
+       }\r
+       dapl_os_unlock(&cm_ptr->lock);\r
+\r
+       /* unlink, dequeue from EP. Final ref so release will destroy */\r
+       dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);\r
+}\r
+\r
+/*\r
+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect\r
+ *                 or from ep_free.\r
+ *\r
+ * Only a CM in DCM_CONNECTED state is disconnected; for any other state\r
+ * (including an already disconnected CM) this is a no-op. The state test\r
+ * and the switch to DCM_DISCONNECTED are done under the CM lock.\r
+ *\r
+ * Always returns DAT_SUCCESS.\r
+ */\r
+DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+       DAT_UINT32 disc_data = htonl(0xdead);\r
+\r
+       dapl_os_lock(&cm_ptr->lock);\r
+       if (cm_ptr->state != DCM_CONNECTED) {\r
+               dapl_os_unlock(&cm_ptr->lock);\r
+               return DAT_SUCCESS;\r
+       }\r
+       cm_ptr->state = DCM_DISCONNECTED;\r
+       dapl_os_unlock(&cm_ptr->lock);\r
+\r
+       /* move QP to error, then send disc data to the peer */\r
+       dapls_modify_qp_state(cm_ptr->ep->qp_handle, IBV_QPS_ERR, 0, 0, NULL);\r
+\r
+       /* best effort: the peer may already have closed its side */\r
+       (void)send(cm_ptr->socket, (char *)&disc_data, sizeof(disc_data), 0);\r
+\r
+       /* disconnect events for RC's only */\r
+       if (cm_ptr->ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) {\r
+               if (cm_ptr->ep->cr_ptr) {\r
+                       /* EP has a CR attached: report through the SP */\r
+                       dapls_cr_callback(cm_ptr,\r
+                                         IB_CME_DISCONNECTED,\r
+                                         NULL, 0, cm_ptr->sp);\r
+               } else {\r
+                       dapl_evd_connection_callback(cm_ptr,\r
+                                                    IB_CME_DISCONNECTED,\r
+                                                    NULL, 0, cm_ptr->ep);\r
+               }\r
+       }\r
+\r
+       /* release from workq */\r
+       dapli_cm_free(cm_ptr);\r
+\r
+       /* scheduled destroy via disconnect clean in callback */\r
+       return DAT_SUCCESS;\r
+}\r
+\r
+/*\r
+ * ACTIVE: socket connected, send QP information to peer\r
+ *\r
+ * err is 0 on success; -1 is logged as a POLL failure using errno, any\r
+ * other value is logged as a SOCKOPT failure using err itself.\r
+ *\r
+ * On success the CM moves to DCM_REP_PENDING and the message header plus\r
+ * any private data is written in one writev(). On any failure the CM is\r
+ * freed and IB_CME_LOCAL_FAILURE is posted for the EP.\r
+ */\r
+static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)\r
+{\r
+       struct dapl_ep *ep_ptr = cm_ptr->ep;\r
+       struct iovec iov[2];\r
+       int len, exp, total, iov_cnt;\r
+\r
+       if (err) {\r
+               int poll_err = (err == -1);\r
+\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " CONN_PENDING: %s ERR %s -> %s %d\n",\r
+                        poll_err ? "POLL" : "SOCKOPT",\r
+                        strerror(poll_err ? errno : err),\r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                               &cm_ptr->addr)->sin_addr),\r
+                        ntohs(((struct sockaddr_in *)\r
+                               &cm_ptr->addr)->sin_port));\r
+               goto bail;\r
+       }\r
+\r
+       cm_ptr->state = DCM_REP_PENDING;\r
+\r
+       /* header always goes out; private data only when present */\r
+       exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;\r
+       iov[0].iov_base = (void *)&cm_ptr->msg;\r
+       iov[0].iov_len = exp;\r
+       iov_cnt = 1;\r
+       if (cm_ptr->msg.p_size) {\r
+               iov[1].iov_base = cm_ptr->msg.p_data;\r
+               iov[1].iov_len = ntohs(cm_ptr->msg.p_size);\r
+               iov_cnt = 2;\r
+       }\r
+       len = writev(cm_ptr->socket, iov, iov_cnt);\r
+\r
+       /* anything but the full header + pdata is treated as failure */\r
+       total = exp + ntohs(cm_ptr->msg.p_size);\r
+       if (len != total) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " CONN_PENDING len ERR %s, wcnt=%d(%d) -> %s\n",\r
+                        strerror(errno), len,\r
+                        total,\r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                  ep_ptr->param.\r
+                                  remote_ia_address_ptr)->sin_addr));\r
+               goto bail;\r
+       }\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " CONN_PENDING: sending SRC lid=0x%x,"\r
+                    " qpn=0x%x, psize=%d\n",\r
+                    ntohs(cm_ptr->msg.saddr.ib.lid),\r
+                    ntohl(cm_ptr->msg.saddr.ib.qpn),\r
+                    ntohs(cm_ptr->msg.p_size));\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " CONN_PENDING: SRC GID subnet %016llx id %016llx\n",\r
+                    (unsigned long long)\r
+                    htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[0]),\r
+                    (unsigned long long)\r
+                    htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[8]));\r
+       return;\r
+\r
+bail:\r
+       /* mark CM object for cleanup, then tell the consumer */\r
+       dapli_cm_free(cm_ptr);\r
+       dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, 0, ep_ptr);\r
+}\r
+\r
+/*\r
+ * ACTIVE: Create socket, connect, defer exchange QP information to CR thread\r
+ * to avoid blocking.\r
+ *\r
+ * ep_ptr  - endpoint to connect; a new CM object is allocated and linked to it\r
+ * r_addr  - remote IA address, copied into the CM\r
+ * r_qual  - remote connection qualifier; the TCP port used is r_qual + 1000\r
+ * p_size  - private data length, 0..DCM_MAX_PDATA_SIZE\r
+ * p_data  - private data, copied into the REQ message when p_size != 0\r
+ *\r
+ * Returns DAT_SUCCESS once the CM is queued for the work thread (the result\r
+ * of the connect is delivered by event), DAT_INVALID_PARAMETER for an out of\r
+ * range p_size, DAT_INSUFFICIENT_RESOURCES when the CM or socket cannot be\r
+ * created, DAT_INTERNAL_ERROR when the socket cannot be configured and\r
+ * DAT_INVALID_ADDRESS when the connect fails outright.\r
+ */\r
+static DAT_RETURN\r
+dapli_socket_connect(DAPL_EP * ep_ptr,\r
+                    DAT_IA_ADDRESS_PTR r_addr,\r
+                    DAT_CONN_QUAL r_qual, DAT_COUNT p_size, DAT_PVOID p_data)\r
+{\r
+       dp_ib_cm_handle_t cm_ptr;\r
+       int ret;\r
+       socklen_t sl;\r
+       DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;\r
+       DAT_RETURN dat_ret = DAT_INSUFFICIENT_RESOURCES;\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n",\r
+                    r_qual, p_size);\r
+\r
+       /* msg.p_data holds DCM_MAX_PDATA_SIZE bytes; never copy past it */\r
+       if (p_size < 0 || p_size > DCM_MAX_PDATA_SIZE) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " connect: invalid p_size %d (max %d)\n",\r
+                        p_size, DCM_MAX_PDATA_SIZE);\r
+               return DAT_INVALID_PARAMETER;\r
+       }\r
+\r
+       cm_ptr = dapli_cm_alloc(ep_ptr);\r
+       if (cm_ptr == NULL)\r
+               return dat_ret;\r
+\r
+       /* create, connect, sockopt, and exchange QP information */\r
+       if ((cm_ptr->socket =\r
+            socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " connect: socket create ERR %s\n", strerror(errno));\r
+               goto bail;\r
+       }\r
+\r
+       ret = dapl_config_socket(cm_ptr->socket);\r
+       if (ret < 0) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " connect: config socket %d ERR %d %s\n",\r
+                        cm_ptr->socket, ret, strerror(dapl_socket_errno()));\r
+               dat_ret = DAT_INTERNAL_ERROR;\r
+               goto bail;\r
+       }\r
+\r
+       /* save remote address */\r
+       dapl_os_memcpy(&cm_ptr->addr, r_addr, sizeof(*r_addr));\r
+\r
+#ifdef DAPL_DBG\r
+       /* DBG: Active PID [0], PASSIVE PID [2]*/\r
+       *(uint16_t*)&cm_ptr->msg.resv[0] = htons((uint16_t)dapl_os_getpid()); \r
+       *(uint16_t*)&cm_ptr->msg.resv[2] = ((struct sockaddr_in *)&cm_ptr->addr)->sin_port;\r
+#endif\r
+       /* the listener binds to service ID + 1000 */\r
+       ((struct sockaddr_in *)&cm_ptr->addr)->sin_port = htons(r_qual + 1000);\r
+       ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *)&cm_ptr->addr,\r
+                                 sizeof(cm_ptr->addr));\r
+       /* EAGAIN means the connect is still in progress, not a failure */\r
+       if (ret && ret != EAGAIN) {\r
+               dat_ret = DAT_INVALID_ADDRESS;\r
+               goto bail;\r
+       }\r
+\r
+       /* REQ: QP info in msg.saddr, IA address in msg.daddr, and pdata */\r
+       cm_ptr->hca = ia_ptr->hca_ptr;\r
+       cm_ptr->msg.op = htons(DCM_REQ);\r
+       cm_ptr->msg.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);\r
+       cm_ptr->msg.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;\r
+       cm_ptr->msg.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;\r
+       dapl_os_memcpy(&cm_ptr->msg.saddr.ib.gid[0], \r
+                      &ia_ptr->hca_ptr->ib_trans.gid, 16);\r
+\r
+       /* get local address information from socket; failure is only logged */\r
+       sl = sizeof(cm_ptr->msg.daddr.so);\r
+       if (getsockname(cm_ptr->socket, (struct sockaddr *)&cm_ptr->msg.daddr.so, &sl)) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                       " connect getsockname ERROR: %s -> %s r_qual %d\n",\r
+                       strerror(errno), \r
+                       inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),\r
+                       (unsigned int)r_qual);\r
+       }\r
+\r
+       if (p_size) {\r
+               cm_ptr->msg.p_size = htons(p_size);\r
+               dapl_os_memcpy(cm_ptr->msg.p_data, p_data, p_size);\r
+       }\r
+\r
+       /* connected or pending, either way results via async event */\r
+       if (ret == 0)\r
+               dapli_socket_connected(cm_ptr, 0);\r
+       else\r
+               cm_ptr->state = DCM_CONN_PENDING;\r
+\r
+       /* caller's buffer, then the copy held in the CM message */\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: p_data=%p %p\n",\r
+                    p_data, (void *)cm_ptr->msg.p_data);\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,\r
+                    " connect: %s r_qual %d pending, p_sz=%d, %d %d ...\n",\r
+                    inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr), \r
+                    (unsigned int)r_qual, ntohs(cm_ptr->msg.p_size),\r
+                    cm_ptr->msg.p_data[0], cm_ptr->msg.p_data[1]);\r
+\r
+       /* queue up on work thread */\r
+       dapli_cm_queue(cm_ptr);\r
+       return DAT_SUCCESS;\r
+bail:\r
+       dapl_log(DAPL_DBG_TYPE_ERR,\r
+                " connect ERROR: %s -> %s r_qual %d\n",\r
+                strerror(errno), \r
+                inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),\r
+                (unsigned int)r_qual);\r
+\r
+       /* Never queued, destroy */\r
+       dapls_cm_release(cm_ptr);\r
+       return dat_ret;\r
+}\r
+\r
+/*\r
+ * ACTIVE: exchange QP information, called from CR thread\r
+ *\r
+ * Reads the peer's reply (header, then private data if any) into\r
+ * cm_ptr->msg, overwriting the local SRC info that was sent with the REQ.\r
+ * For a DCM_REP the QP is moved to RTR and RTS, a 4 byte RTU (ver + op) is\r
+ * sent back and a connected event is posted. Any other op is reported as a\r
+ * reject. Every local failure on the way is reported as\r
+ * IB_CME_LOCAL_FAILURE.\r
+ *\r
+ * If the header read fails with ECONNRESET the connect is restarted on a\r
+ * new CM object; this CM is then unlinked from the EP and freed.\r
+ */\r
+static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+       DAPL_EP *ep_ptr = cm_ptr->ep;\r
+       /* exp: bytes expected from the next read; at ud_bail it is the\r
+        * private data length reported with the UD event */\r
+       int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;\r
+       int sock_err;\r
+       ib_cm_events_t event = IB_CME_LOCAL_FAILURE;\r
+       socklen_t sl;\r
+\r
+       /* read DST information into cm_ptr, overwrite SRC info */\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: recv peer QP data\n");\r
+\r
+       len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, exp, 0);\r
+       /* capture the socket error now, before logging can disturb it;\r
+        * it is only meaningful when recv itself failed */\r
+       sock_err = (len < 0) ? dapl_socket_errno() : 0;\r
+       if (len != exp || ntohs(cm_ptr->msg.ver) != DCM_VER) {\r
+               dapl_log(DAPL_DBG_TYPE_WARN,\r
+                        " CONN_RTU read: sk %d ERR %s, rcnt=%d, v=%d -> %s PORT L-%x R-%x PID L-%x R-%x\n",\r
+                        cm_ptr->socket, strerror(errno), len, ntohs(cm_ptr->msg.ver),\r
+                        inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr),\r
+                        ntohs(((struct sockaddr_in *)&cm_ptr->msg.daddr.so)->sin_port),\r
+                        ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port),\r
+                        ntohs(*(uint16_t*)&cm_ptr->msg.resv[0]),\r
+                        ntohs(*(uint16_t*)&cm_ptr->msg.resv[2]));\r
+\r
+               /* Retry; corner case where server tcp stack resets under load */\r
+               if (sock_err == ECONNRESET) {\r
+                       closesocket(cm_ptr->socket);\r
+                       cm_ptr->socket = DAPL_INVALID_SOCKET;\r
+                       if (dapli_socket_connect(cm_ptr->ep,\r
+                                                (DAT_IA_ADDRESS_PTR)&cm_ptr->addr,\r
+                                                ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port) - 1000,\r
+                                                ntohs(cm_ptr->msg.p_size),\r
+                                                cm_ptr->msg.p_data) == DAT_SUCCESS) {\r
+                               /* a new CM carries on; retire this one */\r
+                               dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);\r
+                               dapli_cm_free(cm_ptr);\r
+                               return;\r
+                       }\r
+                       /* retry could not be started: report the failure */\r
+               }\r
+               exp = 0; /* no private data was received */\r
+               goto bail;\r
+       }\r
+\r
+       /* keep the QP, address info in network order */\r
+\r
+       /* save remote address information, in msg.daddr */\r
+       dapl_os_memcpy(&ep_ptr->remote_ia_address,\r
+                      &cm_ptr->msg.daddr.so,\r
+                      sizeof(union dcm_addr));\r
+\r
+       /* save local address information from socket */\r
+       sl = sizeof(cm_ptr->addr);\r
+       getsockname(cm_ptr->socket,(struct sockaddr *)&cm_ptr->addr, &sl);\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,\r
+                    " CONN_RTU: DST %s %d lid=0x%x,"\r
+                    " qpn=0x%x, qp_type=%d, psize=%d\n",\r
+                    inet_ntoa(((struct sockaddr_in *)\r
+                               &cm_ptr->msg.daddr.so)->sin_addr),\r
+                    ntohs(((struct sockaddr_in *)\r
+                               &cm_ptr->msg.daddr.so)->sin_port),\r
+                    ntohs(cm_ptr->msg.saddr.ib.lid),\r
+                    ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+                    cm_ptr->msg.saddr.ib.qp_type, \r
+                    ntohs(cm_ptr->msg.p_size));\r
+\r
+       /* validate private data size before reading */\r
+       if (ntohs(cm_ptr->msg.p_size) > DCM_MAX_PDATA_SIZE) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " CONN_RTU read: psize (%d) wrong -> %s\n",\r
+                        ntohs(cm_ptr->msg.p_size), \r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                  ep_ptr->param.\r
+                                  remote_ia_address_ptr)->sin_addr));\r
+               exp = 0; /* no private data was received */\r
+               goto bail;\r
+       }\r
+\r
+       /* read private data into cm_handle if any present */\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP," CONN_RTU: read private data\n");\r
+       exp = ntohs(cm_ptr->msg.p_size);\r
+       if (exp) {\r
+               len = recv(cm_ptr->socket, cm_ptr->msg.p_data, exp, 0);\r
+               if (len != exp) {\r
+                       dapl_log(DAPL_DBG_TYPE_ERR,\r
+                                " CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",\r
+                                strerror(errno), len,\r
+                                inet_ntoa(((struct sockaddr_in *)\r
+                                           ep_ptr->param.\r
+                                           remote_ia_address_ptr)->sin_addr));\r
+                       exp = 0; /* private data is incomplete */\r
+                       goto bail;\r
+               }\r
+       }\r
+\r
+       /* check for consumer or protocol stack reject */\r
+       if (ntohs(cm_ptr->msg.op) == DCM_REP)\r
+               event = IB_CME_CONNECTED;\r
+       else if (ntohs(cm_ptr->msg.op) == DCM_REJ_USER) \r
+               event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;\r
+       else  \r
+               event = IB_CME_DESTINATION_REJECT;\r
+\r
+       if (event != IB_CME_CONNECTED) {\r
+               dapl_log(DAPL_DBG_TYPE_CM,\r
+                        " CONN_RTU: reject from %s %x\n",\r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                   &cm_ptr->msg.daddr.so)->sin_addr),\r
+                        ntohs(((struct sockaddr_in *)\r
+                                &cm_ptr->msg.daddr.so)->sin_port));\r
+               goto bail;\r
+       }\r
+\r
+       /* The peer accepted, but the handshake is not complete yet: any\r
+        * failure from here on must be reported as a local failure, never\r
+        * as a connected event. */\r
+       event = IB_CME_LOCAL_FAILURE;\r
+\r
+       /* modify QP to RTR and then to RTS with remote info */\r
+       if (dapls_modify_qp_state(ep_ptr->qp_handle,\r
+                                 IBV_QPS_RTR, \r
+                                 cm_ptr->msg.saddr.ib.qpn,\r
+                                 cm_ptr->msg.saddr.ib.lid,\r
+                                 (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " CONN_RTU: QPS_RTR ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",\r
+                        strerror(errno), ep_ptr->qp_handle->qp_type,\r
+                        ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,\r
+                        ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+                        ntohs(cm_ptr->msg.saddr.ib.lid),\r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                   &cm_ptr->msg.daddr.so)->sin_addr),\r
+                        ntohs(((struct sockaddr_in *)\r
+                                &cm_ptr->msg.daddr.so)->sin_port));\r
+               goto bail;\r
+       }\r
+       if (dapls_modify_qp_state(ep_ptr->qp_handle,\r
+                                 IBV_QPS_RTS, \r
+                                 cm_ptr->msg.saddr.ib.qpn,\r
+                                 cm_ptr->msg.saddr.ib.lid,\r
+                                 NULL) != DAT_SUCCESS) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " CONN_RTU: QPS_RTS ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",\r
+                        strerror(errno), ep_ptr->qp_handle->qp_type,\r
+                        ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,\r
+                        ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+                        ntohs(cm_ptr->msg.saddr.ib.lid),\r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                   &cm_ptr->msg.daddr.so)->sin_addr),\r
+                        ntohs(((struct sockaddr_in *)\r
+                                &cm_ptr->msg.daddr.so)->sin_port));\r
+               goto bail;\r
+       }\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: send RTU\n");\r
+\r
+       /* complete handshake after final QP state change, Just ver+op */\r
+       cm_ptr->state = DCM_CONNECTED;\r
+       cm_ptr->msg.op = htons(DCM_RTU);\r
+       if (send(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0) == -1) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " CONN_RTU: write error = %s\n", strerror(errno));\r
+               goto bail;\r
+       }\r
+       /* post the event with private data */\r
+       event = IB_CME_CONNECTED;\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");\r
+\r
+#ifdef DAT_EXTENSIONS\r
+ud_bail:\r
+       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {\r
+               DAT_IB_EXTENSION_EVENT_DATA xevent;\r
+               ib_pd_handle_t pd_handle = \r
+                       ((DAPL_PZ *)ep_ptr->param.pz_handle)->pd_handle;\r
+\r
+               /* translate the CM event into its UD extension event */\r
+               if (event == IB_CME_CONNECTED) {\r
+                       cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,\r
+                                                    ep_ptr->qp_handle,\r
+                                                    cm_ptr->msg.saddr.ib.lid, \r
+                                                    NULL);\r
+                       if (cm_ptr->ah) {\r
+                               /* post UD extended EVENT */\r
+                               xevent.status = 0;\r
+                               xevent.type = DAT_IB_UD_REMOTE_AH;\r
+                               xevent.remote_ah.ah = cm_ptr->ah;\r
+                               xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);\r
+                               dapl_os_memcpy(&xevent.remote_ah.ia_addr,\r
+                                               &ep_ptr->remote_ia_address,\r
+                                               sizeof(union dcm_addr));\r
+                               event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;\r
+\r
+                               dapl_log(DAPL_DBG_TYPE_CM, \r
+                                       " CONN_RTU: UD AH %p for lid 0x%x"\r
+                                       " qpn 0x%x\n", \r
+                                       cm_ptr->ah, \r
+                                       ntohs(cm_ptr->msg.saddr.ib.lid),\r
+                                       ntohl(cm_ptr->msg.saddr.ib.qpn));\r
+\r
+                       } else \r
+                               event = DAT_IB_UD_CONNECTION_ERROR_EVENT;\r
+\r
+               } else if (event == IB_CME_LOCAL_FAILURE) {\r
+                       event = DAT_IB_UD_CONNECTION_ERROR_EVENT;\r
+               } else  \r
+                       event = DAT_IB_UD_CONNECTION_REJECT_EVENT;\r
+\r
+               dapls_evd_post_connection_event_ext(\r
+                               (DAPL_EVD *) ep_ptr->param.connect_evd_handle,\r
+                               event,\r
+                               (DAT_EP_HANDLE) ep_ptr,\r
+                               (DAT_COUNT) exp,\r
+                               (DAT_PVOID *) cm_ptr->msg.p_data,\r
+                               (DAT_PVOID *) &xevent);\r
+\r
+               /* cleanup and release from local list */\r
+               dapli_cm_free(cm_ptr);\r
+\r
+       } else\r
+#endif\r
+       {\r
+               dapl_evd_connection_callback(cm_ptr, event, cm_ptr->msg.p_data,\r
+                                            DCM_MAX_PDATA_SIZE, ep_ptr);\r
+       }\r
+       return;\r
+\r
+bail:\r
+\r
+#ifdef DAT_EXTENSIONS\r
+       /* UD endpoints report failures through the extension event path */\r
+       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) \r
+               goto ud_bail;\r
+#endif\r
+       /* close socket, and post error event */\r
+       cm_ptr->state = DCM_REJECTED;\r
+       dapl_evd_connection_callback(NULL, event, cm_ptr->msg.p_data,\r
+                                    DCM_MAX_PDATA_SIZE, ep_ptr);\r
+       dapli_cm_free(cm_ptr);\r
+}\r
+\r
+/*\r
+ * PASSIVE: Create socket, listen, accept, exchange QP information \r
+ */\r
+DAT_RETURN\r
+dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)\r
+{\r
+       struct sockaddr_in addr;\r
+       ib_cm_srvc_handle_t cm_ptr = NULL;\r
+       DAT_RETURN dat_status = DAT_SUCCESS;\r
        int opt = 1;\r
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " setup listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",
-                    ia_ptr, serviceID, sp_ptr);
-
-       cm_ptr = dapli_cm_alloc(NULL);
-       if (cm_ptr == NULL)
-               return DAT_INSUFFICIENT_RESOURCES;
-
-       cm_ptr->sp = sp_ptr;
-       cm_ptr->hca = ia_ptr->hca_ptr;
-
-       /* bind, listen, set sockopt, accept, exchange data */
-       if ((cm_ptr->socket =
-            socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {
-               dapl_log(DAPL_DBG_TYPE_ERR, " ERR: listen socket create: %s\n",
-                        strerror(errno));
-               dat_status = DAT_INSUFFICIENT_RESOURCES;
-               goto bail;
-       }
-
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " setup listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",\r
+                    ia_ptr, serviceID, sp_ptr);\r
+\r
+       cm_ptr = dapli_cm_alloc(NULL);\r
+       if (cm_ptr == NULL)\r
+               return DAT_INSUFFICIENT_RESOURCES;\r
+\r
+       cm_ptr->sp = sp_ptr;\r
+       cm_ptr->hca = ia_ptr->hca_ptr;\r
+\r
+       /* bind, listen, set sockopt, accept, exchange data */\r
+       if ((cm_ptr->socket =\r
+            socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR, " ERR: listen socket create: %s\n",\r
+                        strerror(errno));\r
+               dat_status = DAT_INSUFFICIENT_RESOURCES;\r
+               goto bail;\r
+       }\r
+\r
        setsockopt(cm_ptr->socket, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt));\r
-       addr.sin_port = htons(serviceID + 1000);
-       addr.sin_family = AF_INET;
-       addr.sin_addr = ((struct sockaddr_in *) &ia_ptr->hca_ptr->hca_address)->sin_addr;
-
-       if ((bind(cm_ptr->socket, (struct sockaddr *)&addr, sizeof(addr)) < 0)
-           || (listen(cm_ptr->socket, 128) < 0)) {
-               dapl_log(DAPL_DBG_TYPE_CM,
-                        " listen: ERROR %s on port %d\n",
-                        strerror(errno), serviceID + 1000);
-               if (dapl_socket_errno() == EADDRINUSE)
-                       dat_status = DAT_CONN_QUAL_IN_USE;
-               else
-                       dat_status = DAT_CONN_QUAL_UNAVAILABLE;
-               goto bail;
-       }
-
-       /* set cm_handle for this service point, save listen socket */
-       sp_ptr->cm_srvc_handle = cm_ptr;
-       dapl_os_memcpy(&cm_ptr->addr, &addr, sizeof(addr)); 
-
-       /* queue up listen socket to process inbound CR's */
-       cm_ptr->state = DCM_LISTEN;
-       dapli_cm_queue(cm_ptr);
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " setup listen: port %d cr %p s_fd %d\n",
-                    serviceID + 1000, cm_ptr, cm_ptr->socket);
-
-       return dat_status;
-bail:
-       /* Never queued, destroy here */
-       dapls_cm_release(cm_ptr);
-       return dat_status;
-}
-
-/*
- * PASSIVE: accept pending connection requests on a listening CM object.
- *
- * Input:
- *     cm_ptr          listening CM object; its socket is handed to accept()
- *
- * For every accepted socket a new CM object is allocated, inherits the sp
- * and hca of the listener, records the peer address in msg.daddr.so and the
- * local address in addr, is moved to DCM_ACCEPTING and is queued with
- * dapli_cm_queue(). The loop repeats for as long as dapl_poll() reports
- * the listening socket as readable.
- *
- * The function returns early, without further accepts, when a CM object
- * cannot be allocated or when accept() fails; in the latter case the error
- * is logged and the freshly allocated CM object is released.
- *
- * NOTE(review): len is an int that is handed to accept() through a
- * (socklen_t *) cast, and the result of getsockname() is not checked.
- */
-static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
-{
-       dp_ib_cm_handle_t acm_ptr;
-       int ret, len, opt = 1;
-       socklen_t sl;
-
-       /* 
-        * Accept all CRs on this port to avoid half-connection (SYN_RCV)
-        * stalls with many-to-one connection storms
-        */
-       do {
-               /* Allocate the CM object for this accepted connection */
-               if ((acm_ptr = dapli_cm_alloc(NULL)) == NULL)
-                       return;
-
-               acm_ptr->sp = cm_ptr->sp;
-               acm_ptr->hca = cm_ptr->hca;
-
-               len = sizeof(union dcm_addr);
-               acm_ptr->socket = accept(cm_ptr->socket,
-                                       (struct sockaddr *)
-                                       &acm_ptr->msg.daddr.so,
-                                       (socklen_t *) &len);
-               if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
-                       dapl_log(DAPL_DBG_TYPE_ERR,
-                               " ACCEPT: ERR %s on FD %d l_cr %p\n",
-                               strerror(errno), cm_ptr->socket, cm_ptr);
-                       dapls_cm_release(acm_ptr);
-                       return;
-               }
-               dapl_dbg_log(DAPL_DBG_TYPE_CM, " accepting from %s %x\n",
-                            inet_ntoa(((struct sockaddr_in *)
-                                       &acm_ptr->msg.daddr.so)->sin_addr),
-                            ntohs(((struct sockaddr_in *)
-                                       &acm_ptr->msg.daddr.so)->sin_port));
-
-               /* no delay for small packets; a failure here is only logged */
-               ret = setsockopt(acm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
-                          (char *)&opt, sizeof(opt));
-               if (ret)
-                       dapl_log(DAPL_DBG_TYPE_ERR,
-                                " ACCEPT: NODELAY setsockopt: 0x%x 0x%x %s\n",
-                                ret, dapl_socket_errno(), strerror(dapl_socket_errno()));
-               
-               /* get local address information from socket, then queue the CM */
-               sl = sizeof(acm_ptr->addr);
-               getsockname(acm_ptr->socket, (struct sockaddr *)&acm_ptr->addr, &sl);
-               acm_ptr->state = DCM_ACCEPTING;
-               dapli_cm_queue(acm_ptr);
-       
-       } while (dapl_poll(cm_ptr->socket, DAPL_FD_READ) == DAPL_FD_READ);
-}
-
-/*
- * PASSIVE: receive peer QP information, private data, post cr_event 
- *
- * Input:
- *     acm_ptr         CM object of an accepted socket
- *
- * Sequence:
- *   1. recv() the fixed part of the message, which is
- *      sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE bytes, into acm_ptr->msg
- *      and check the byte count and the version against DCM_VER.
- *   2. Convert msg.p_size from network order and reject anything larger
- *      than DCM_MAX_PDATA_SIZE.
- *   3. If private data is present, recv() it into msg.p_data.
- *   4. Set the state to DCM_ACCEPTING_DATA and post the request: the UD
- *      extension event when DAT_EXTENSIONS is defined and the peer QP type
- *      is IBV_QPT_UD, otherwise dapls_cr_callback() with
- *      IB_CME_CONNECTION_REQUEST_PENDING.
- *
- * Any failure in steps 1-3 is logged and the CM object is handed to
- * dapli_cm_free(); no event is posted in that case.
- *
- * NOTE(review): each recv() must deliver the full expected byte count in
- * one call, a short read is treated as an error. The psize error log
- * prints msg.p_size without ntohs() although the comparison uses the
- * converted value.
- */
-static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
-{
-       int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
-       void *p_data = NULL;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket accepted, read QP data\n");
-
-       /* read in DST QP info, IA address. check for private data */
-       len = recv(acm_ptr->socket, (char *)&acm_ptr->msg, exp, 0);
-       if (len != exp || ntohs(acm_ptr->msg.ver) != DCM_VER) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " ACCEPT read: ERR %s, rcnt=%d, ver=%d\n",
-                        strerror(errno), len, ntohs(acm_ptr->msg.ver));
-               goto bail;
-       }
-
-       /* keep the QP, address info in network order */
-
-       /* validate private data size before reading; exp is reused as that size */
-       exp = ntohs(acm_ptr->msg.p_size);
-       if (exp > DCM_MAX_PDATA_SIZE) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                            " accept read: psize (%d) wrong\n",
-                            acm_ptr->msg.p_size);
-               goto bail;
-       }
-
-       /* read private data into cm_handle if any present */
-       if (exp) {
-               len = recv(acm_ptr->socket, acm_ptr->msg.p_data, exp, 0);
-               if (len != exp) {
-                       dapl_log(DAPL_DBG_TYPE_ERR,
-                                " accept read pdata: ERR %s, rcnt=%d\n",
-                                strerror(errno), len);
-                       goto bail;
-               }
-               p_data = acm_ptr->msg.p_data;
-       }
-
-       acm_ptr->state = DCM_ACCEPTING_DATA;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " ACCEPT: DST %s %x lid=0x%x, qpn=0x%x, psz=%d\n",
-                    inet_ntoa(((struct sockaddr_in *)
-                               &acm_ptr->msg.daddr.so)->sin_addr), 
-                    ntohs(((struct sockaddr_in *)
-                            &acm_ptr->msg.daddr.so)->sin_port),
-                    ntohs(acm_ptr->msg.saddr.ib.lid), 
-                    ntohl(acm_ptr->msg.saddr.ib.qpn), exp);
-
-#ifdef DAT_EXTENSIONS
-       if (acm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
-               DAT_IB_EXTENSION_EVENT_DATA xevent;
-
-               /* post the UD connect request as an extension CR event */
-               xevent.status = 0;
-               xevent.type = DAT_IB_UD_CONNECT_REQUEST;
-
-               dapls_evd_post_cr_event_ext(acm_ptr->sp,
-                                           DAT_IB_UD_CONNECTION_REQUEST_EVENT,
-                                           acm_ptr,
-                                           (DAT_COUNT) exp,
-                                           (DAT_PVOID *) acm_ptr->msg.p_data,
-                                           (DAT_PVOID *) &xevent);
-       } else
-#endif
-               /* non-UD path: trigger the CR event on the service point */
-               dapls_cr_callback(acm_ptr,
-                                 IB_CME_CONNECTION_REQUEST_PENDING,
-                                 p_data, exp, acm_ptr->sp);
-       return;
-bail:
-       /* mark for destroy, active will see socket close as rej */
-       dapli_cm_free(acm_ptr);
-       return;
-}
-
-/*
- * PASSIVE: consumer accept, send local QP information, private data, 
- * queue on work thread to receive RTU information to avoid blocking
- * user thread. 
- *
- * Input:
- *     ep_ptr          endpoint whose QP is connected to the requester
- *     cr_ptr          connection request; cr_ptr->ib_cm_handle is the CM
- *     p_size          number of private data bytes to send with the reply
- *     p_data          private data buffer, only used when p_size != 0
- *
- * Sequence: move the QP to RTR and then RTS using the remote qpn, lid and
- * gid already stored in cm_ptr->msg, copy the remote address into
- * ep_ptr->remote_ia_address, build the reply in the stack variable local
- * (DCM_VER, DCM_REP, local qpn, qp_type, lid, gid, socket address from
- * getsockname()), set the CM state to DCM_ACCEPTED, send the reply header
- * plus optional private data with writev() and finally link the CM object
- * to the endpoint.
- *
- * Returns:
- *     DAT_SUCCESS
- *     DAT_LENGTH_ERROR        p_size exceeds DCM_MAX_PDATA_SIZE; the CM
- *                             object is left untouched in this case
- *     DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP
- *                             remote QP is UD but the local QP is not
- *                             (DAT_EXTENSIONS builds only)
- *     DAT_INTERNAL_ERROR      invalid socket, QP transition failure or
- *                             short write
- *
- * Every failure other than DAT_LENGTH_ERROR passes cm_ptr to
- * dapli_cm_free() before returning.
- *
- * NOTE(review): local is not zero-initialized; fields that are not
- * assigned below (for example resv when DAPL_DBG is not defined) appear to
- * be transmitted with whatever the stack held - confirm against the
- * ib_cm_msg_t layout. The UD mismatch log string contains a doubled comma.
- */
-static DAT_RETURN
-dapli_socket_accept_usr(DAPL_EP * ep_ptr,
-                       DAPL_CR * cr_ptr, DAT_COUNT p_size, DAT_PVOID p_data)
-{
-       DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
-       dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
-       ib_cm_msg_t local;
-       struct iovec iov[2];
-       int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
-       DAT_RETURN ret = DAT_INTERNAL_ERROR;
-       socklen_t sl;
-
-       if (p_size > DCM_MAX_PDATA_SIZE) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " accept_usr: psize(%d) too large\n", p_size);
-               return DAT_LENGTH_ERROR;
-       }
-
-       /* must have an accepted socket */
-       if (cm_ptr->socket == DAPL_INVALID_SOCKET) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " accept_usr: cm socket invalid\n");
-               goto bail;
-       }
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " ACCEPT_USR: remote lid=0x%x"
-                    " qpn=0x%x qp_type %d, psize=%d\n",
-                    ntohs(cm_ptr->msg.saddr.ib.lid),
-                    ntohl(cm_ptr->msg.saddr.ib.qpn), 
-                    cm_ptr->msg.saddr.ib.qp_type, 
-                    ntohs(cm_ptr->msg.p_size));
-
-#ifdef DAT_EXTENSIONS
-       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD &&
-           ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " ACCEPT_USR: ERR remote QP is UD,"
-                        ", but local QP is not\n");
-               ret = (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
-               goto bail;
-       }
-#endif
-
-       /* modify QP to RTR and then to RTS with remote info already read */
-       if (dapls_modify_qp_state(ep_ptr->qp_handle,
-                                 IBV_QPS_RTR, 
-                                 cm_ptr->msg.saddr.ib.qpn,
-                                 cm_ptr->msg.saddr.ib.lid,
-                                 (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " ACCEPT_USR: QPS_RTR ERR %s -> %s\n",
-                        strerror(errno), 
-                        inet_ntoa(((struct sockaddr_in *)
-                                    &cm_ptr->msg.daddr.so)->sin_addr));
-               goto bail;
-       }
-       if (dapls_modify_qp_state(ep_ptr->qp_handle,
-                                 IBV_QPS_RTS, 
-                                 cm_ptr->msg.saddr.ib.qpn,
-                                 cm_ptr->msg.saddr.ib.lid,
-                                 NULL) != DAT_SUCCESS) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " ACCEPT_USR: QPS_RTS ERR %s -> %s\n",
-                        strerror(errno), 
-                        inet_ntoa(((struct sockaddr_in *)
-                                    &cm_ptr->msg.daddr.so)->sin_addr));
-               goto bail;
-       }
-
-       /* save remote address information */
-       dapl_os_memcpy(&ep_ptr->remote_ia_address,
-                      &cm_ptr->msg.daddr.so,
-                      sizeof(union dcm_addr));
-
-       /* send our QP info, IA address, pdata. Don't overwrite dst data */
-       local.ver = htons(DCM_VER);
-       local.op = htons(DCM_REP);
-       local.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);
-       local.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;
-       local.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
-       dapl_os_memcpy(&local.saddr.ib.gid[0], 
-                      &ia_ptr->hca_ptr->ib_trans.gid, 16);
-       
-       /* Get local address information from socket */
-       sl = sizeof(local.daddr.so);
-       getsockname(cm_ptr->socket, (struct sockaddr *)&local.daddr.so, &sl);
-
-#ifdef DAPL_DBG
-       /* DBG: Active PID [0], PASSIVE PID [2] */
-       *(uint16_t*)&cm_ptr->msg.resv[2] = htons((uint16_t)dapl_os_getpid()); 
-       dapl_os_memcpy(local.resv, cm_ptr->msg.resv, 4); 
-#endif
-       cm_ptr->hca = ia_ptr->hca_ptr;
-       cm_ptr->state = DCM_ACCEPTED;
-
-       local.p_size = htons(p_size);
-       iov[0].iov_base = (void *)&local;
-       iov[0].iov_len = exp;
-       
-       if (p_size) {
-               iov[1].iov_base = p_data;
-               iov[1].iov_len = p_size;
-               len = writev(cm_ptr->socket, iov, 2);
-       } else 
-               len = writev(cm_ptr->socket, iov, 1);
-       
-       if (len != (p_size + exp)) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
-                        strerror(errno), len, 
-                        inet_ntoa(((struct sockaddr_in *)
-                                  &cm_ptr->msg.daddr.so)->sin_addr));
-               goto bail;
-       }
-
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " ACCEPT_USR: local lid=0x%x qpn=0x%x psz=%d\n",
-                    ntohs(local.saddr.ib.lid),
-                    ntohl(local.saddr.ib.qpn), ntohs(local.p_size));
-       dapl_dbg_log(DAPL_DBG_TYPE_CM,
-                    " ACCEPT_USR: SRC GID subnet %016llx id %016llx\n",
-                    (unsigned long long)
-                    htonll(*(uint64_t*)&local.saddr.ib.gid[0]),
-                    (unsigned long long)
-                    htonll(*(uint64_t*)&local.saddr.ib.gid[8]));
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: accepted!\n");
-
-       /* Link CM to EP, already queued on work thread */
-       dapl_ep_link_cm(ep_ptr, cm_ptr);
-       cm_ptr->ep = ep_ptr;
-       return DAT_SUCCESS;
-bail:
-       /* schedule cleanup from workq; ret carries the failure code */
-       dapli_cm_free(cm_ptr);
-       return ret;
-}
-
-/*
- * PASSIVE: read RTU from active peer, post CONN event
- *
- * Input:
- *     cm_ptr          CM object in the accepted state
- *
- * Reads 4 bytes (described below as VER and OP) into cm_ptr->msg. When the
- * read is not exactly 4 bytes or msg.op is not DCM_RTU, the event becomes
- * IB_CME_DESTINATION_REJECT and control goes to bail; otherwise the state
- * becomes DCM_CONNECTED and the event stays IB_CME_CONNECTED.
- *
- * Non-UD path: dapls_cr_callback() is invoked with the event. On the bail
- * path the state is first set to DCM_REJECTED and the CM object is passed
- * to dapli_cm_free() after the callback.
- *
- * UD path (DAT_EXTENSIONS and peer qp_type == IBV_QPT_UD): on success an
- * address handle is created with dapls_create_ah() and reported through
- * an extension connection event; any failure is reported as
- * DAT_IB_UD_CONNECTION_ERROR_EVENT. The bail path re-enters this code
- * through the ud_bail label. The CM object is passed to dapli_cm_free()
- * in every UD case.
- *
- * NOTE(review): the error dapl_log() passes an inet_ntoa() argument that
- * has no matching conversion in the format string. On the UD error paths
- * xevent is posted although its status and type fields were never
- * assigned.
- */
-static void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
-{
-       int len;
-       ib_cm_events_t event = IB_CME_CONNECTED;
-
-       /* complete handshake after final QP state change, VER and OP */
-       len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0);
-       if (len != 4 || ntohs(cm_ptr->msg.op) != DCM_RTU) {
-               dapl_log(DAPL_DBG_TYPE_ERR,
-                        " ACCEPT_RTU: rcv ERR, rcnt=%d op=%x\n",
-                        len, ntohs(cm_ptr->msg.op),
-                        inet_ntoa(((struct sockaddr_in *)
-                                   &cm_ptr->msg.daddr.so)->sin_addr));
-               event = IB_CME_DESTINATION_REJECT;
-               goto bail;
-       }
-
-       /* save state and reference to EP, queue for disc event */
-       cm_ptr->state = DCM_CONNECTED;
-
-       /* final data exchange if remote QP state is good to go */
-       dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: connected!\n");
-
-#ifdef DAT_EXTENSIONS
-ud_bail:
-       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
-               DAT_IB_EXTENSION_EVENT_DATA xevent;
-
-               ib_pd_handle_t pd_handle = 
-                       ((DAPL_PZ *)cm_ptr->ep->param.pz_handle)->pd_handle;
-               
-               if (event == IB_CME_CONNECTED) {
-                       cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
-                                               cm_ptr->ep->qp_handle,
-                                               cm_ptr->msg.saddr.ib.lid, 
-                                               NULL);
-                       if (cm_ptr->ah) { 
-                               /* post EVENT carrying the new AH and remote qpn */
-                               xevent.status = 0;
-                               xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;
-                               xevent.remote_ah.ah = cm_ptr->ah;
-                               xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
-                               dapl_os_memcpy(&xevent.remote_ah.ia_addr,
-                                       &cm_ptr->msg.daddr.so,
-                                       sizeof(union dcm_addr));
-                               event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
-                       } else 
-                               event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
-               } else 
-                       event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
-
-               dapl_log(DAPL_DBG_TYPE_CM, 
-                       " CONN_RTU: UD AH %p for lid 0x%x qpn 0x%x\n", 
-                       cm_ptr->ah, ntohs(cm_ptr->msg.saddr.ib.lid),
-                       ntohl(cm_ptr->msg.saddr.ib.qpn));
-
-               dapls_evd_post_connection_event_ext(
-                               (DAPL_EVD *) 
-                               cm_ptr->ep->param.connect_evd_handle,
-                               event,
-                               (DAT_EP_HANDLE) cm_ptr->ep,
-                               (DAT_COUNT) ntohs(cm_ptr->msg.p_size),
-                               (DAT_PVOID *) cm_ptr->msg.p_data,
-                               (DAT_PVOID *) &xevent);
-
-                /* cleanup and release from local list, still on EP list */
-               dapli_cm_free(cm_ptr);
-                
-       } else 
-#endif
-       {
-               dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);
-       }
-       return;
-      
-bail:
-#ifdef DAT_EXTENSIONS
-       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) 
-               goto ud_bail;
-#endif
-       cm_ptr->state = DCM_REJECTED;
-       dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);
-       dapli_cm_free(cm_ptr);
-}
-
-/*
- * dapls_ib_connect
- *
- * Initiate a connection with the passive listener on another node.
- * This is a thin wrapper: after a debug log line, all arguments are
- * forwarded unchanged to dapli_socket_connect().
- *
- * Input:
- *     ep_handle               endpoint, cast to DAPL_EP *
- *     remote_ia_address       address of the remote IA
- *     remote_conn_qual        connection qualifier of the remote listener
- *     private_data_size       size of the private data
- *     private_data            pointer to the private data
- *
- * Output:
- *     none
- *
- * Returns:
- *     the DAT_RETURN value produced by dapli_socket_connect(); the values
- *     listed here historically are
- *     DAT_SUCCESS
- *     DAT_INSUFFICIENT_RESOURCES
- *     DAT_INVALID_PARAMETER
- *
- */
-DAT_RETURN
-dapls_ib_connect(IN DAT_EP_HANDLE ep_handle,
-                IN DAT_IA_ADDRESS_PTR remote_ia_address,
-                IN DAT_CONN_QUAL remote_conn_qual,
-                IN DAT_COUNT private_data_size, IN void *private_data)
-{
-       DAPL_EP *ep_ptr = (DAPL_EP *) ep_handle;
-       
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    " connect(ep_handle %p ....)\n", ep_handle);
-
-       return (dapli_socket_connect(ep_ptr, remote_ia_address,
-                                    remote_conn_qual,
-                                    private_data_size, private_data));
-}
-
-/*
- * dapls_ib_disconnect
- *
- * Disconnect an EP. Nothing is done, and DAT_SUCCESS is returned, when
- * the EP is already in DAT_EP_STATE_DISCONNECTED or its service type is
- * not DAT_SERVICE_TYPE_RC. Otherwise the QP is moved to IBV_QPS_ERR and
- * the CM object obtained from the EP is passed to
- * dapli_socket_disconnect().
- *
- * Input:
- *     ep_ptr          endpoint to disconnect
- *     close_flags     not referenced by this function
- *
- * Output:
- *     none
- *
- * Returns:
- *     DAT_SUCCESS on the early-out path, otherwise the value returned by
- *     dapli_socket_disconnect()
- *
- * NOTE(review): the result of dapl_get_cm_from_ep() is not checked here
- * before it is handed on, and the result of dapls_modify_qp_state() is
- * ignored - confirm both are intended.
- */
-DAT_RETURN
-dapls_ib_disconnect(IN DAPL_EP * ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
-{
-       dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr);
-
-       if (ep_ptr->param.ep_state == DAT_EP_STATE_DISCONNECTED ||
-               ep_ptr->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC) {
-               return DAT_SUCCESS;
-       } 
-       
-       /* RC. Transition to error state to flush queue */
-        dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
-
-       return (dapli_socket_disconnect(cm_ptr));
-}
-
-/*
- * dapls_ib_disconnect_clean
- *
- * Clean up outstanding connection data. This routine is invoked
- * after the final disconnect callback has occurred. Only on the
- * ACTIVE side of a connection. It is also called if dat_ep_connect
- * times out using the consumer supplied timeout value.
- *
- * In this implementation work is done only for IB_CME_TIMEOUT: the CM
- * object of the EP is logged and passed to dapli_cm_free(). Every other
- * event value is a no-op.
- *
- * Input:
- *     ep_ptr          DAPL_EP
- *     active          Indicates active side of connection; not
- *                     referenced by this function
- *     ib_cm_event     event that triggered the cleanup
- *
- * Output:
- *     none
- *
- * Returns:
- *     void
- *
- * NOTE(review): the CM object returned by dapl_get_cm_from_ep() is
- * dereferenced in the log call without a NULL check - confirm the EP
- * always has a CM object linked on the timeout path.
- */
-void
-dapls_ib_disconnect_clean(IN DAPL_EP * ep_ptr,
-                         IN DAT_BOOLEAN active,
-                         IN const ib_cm_events_t ib_cm_event)
-{
-       if (ib_cm_event == IB_CME_TIMEOUT) {
-               dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr);
-
-               dapl_log(DAPL_DBG_TYPE_WARN,
-                       "dapls_ib_disc_clean: CONN_TIMEOUT ep %p cm %p %s\n",
-                       ep_ptr, cm_ptr, dapl_cm_state_str(cm_ptr->state));
-               
-               /* schedule release of socket and local resources */
-               dapli_cm_free(cm_ptr);
-       }
-}
-
-/*
- * dapls_ib_setup_conn_listener
- *
- * Have the CM set up a connection listener. Thin wrapper that forwards
- * its arguments to dapli_socket_listen().
- *
- * Input:
- *     ia_ptr                  IA on whose HCA the listener is created
- *     ServiceID               connection qualifier to listen on; passed
- *                             on as the DAT_CONN_QUAL argument
- *     sp_ptr                  service point that will own the listener
- *
- * Output:
- *     none
- *
- * Returns:
- *     the value returned by dapli_socket_listen(); the values listed
- *     here historically are
- *     DAT_SUCCESS
- *     DAT_INSUFFICIENT_RESOURCES
- *     DAT_INTERNAL_ERROR
- *     DAT_CONN_QUAL_UNAVAILABLE
- *     DAT_CONN_QUAL_IN_USE
- *
- */
-DAT_RETURN
-dapls_ib_setup_conn_listener(IN DAPL_IA * ia_ptr,
-                            IN DAT_UINT64 ServiceID, IN DAPL_SP * sp_ptr)
-{
-       return (dapli_socket_listen(ia_ptr, ServiceID, sp_ptr));
-}
-
-/*
- * dapls_ib_remove_conn_listener
- *
- * Have the CM remove a connection listener. When the service point has
- * a CM object attached, the reference in sp_ptr->cm_srvc_handle is
- * cleared first and the CM object is then passed to dapli_cm_free().
- * A service point without a CM object is left unchanged.
- *
- * Input:
- *     ia_ptr                  not referenced by this function
- *     sp_ptr                  service point whose listener is removed
- *
- * Output:
- *     none
- *
- * Returns:
- *     DAT_SUCCESS             always; DAT_INVALID_STATE was listed here
- *                             historically but is never returned
- *
- */
-DAT_RETURN
-dapls_ib_remove_conn_listener(IN DAPL_IA * ia_ptr, IN DAPL_SP * sp_ptr)
-{
-       ib_cm_srvc_handle_t cm_ptr = sp_ptr->cm_srvc_handle;
-
-       /* free cm_srvc_handle, release will cleanup */
-       if (cm_ptr != NULL) {
-               /* cr_thread will free */
-               sp_ptr->cm_srvc_handle = NULL;
-               dapli_cm_free(cm_ptr);
-       }
-       return DAT_SUCCESS;
-}
-
-/*
- * dapls_ib_accept_connection
- *
- * Perform necessary steps to accept a connection: allocate and attach
- * a QP when the endpoint is still in DAPL_QP_STATE_UNATTACHED, then
- * hand over to dapli_socket_accept_usr().
- *
- * Input:
- *     cr_handle               connection request, cast to DAPL_CR *
- *     ep_handle               endpoint, cast to DAPL_EP *
- *     p_size                  size of the private data
- *     p_data                  private data sent with the accept
- *
- * Output:
- *     none
- *
- * Returns:
- *     the failing status of dapls_ib_qp_alloc() when QP allocation is
- *     needed and fails, otherwise the value returned by
- *     dapli_socket_accept_usr(); the values listed here historically are
- *     DAT_SUCCESS
- *     DAT_INSUFFICIENT_RESOURCES
- *     DAT_INTERNAL_ERROR
- *
- */
-DAT_RETURN
-dapls_ib_accept_connection(IN DAT_CR_HANDLE cr_handle,
-                          IN DAT_EP_HANDLE ep_handle,
-                          IN DAT_COUNT p_size, IN const DAT_PVOID p_data)
-{
-       DAPL_CR *cr_ptr;
-       DAPL_EP *ep_ptr;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    "dapls_ib_accept_connection(cr %p ep %p prd %p,%d)\n",
-                    cr_handle, ep_handle, p_data, p_size);
-
-       cr_ptr = (DAPL_CR *) cr_handle;
-       ep_ptr = (DAPL_EP *) ep_handle;
-
-       /* allocate and attach a QP if necessary */
-       if (ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED) {
-               DAT_RETURN status;
-               status = dapls_ib_qp_alloc(ep_ptr->header.owner_ia,
-                                          ep_ptr, ep_ptr);
-               if (status != DAT_SUCCESS)
-                       return status;
-       }
-       return (dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data));
-}
-
-/*
- * dapls_ib_reject_connection
- *
- * Reject a connection: the message stored in the CM object is re-used,
- * its op is set to DCM_REJ_USER and its p_size to psize, and the fixed
- * header plus optional private data is written to the socket. The CM
- * object is then passed to dapli_cm_free().
- *
- * Input:
- *     cm_ptr          CM object of the request being rejected
- *     reason          only used in the debug log line
- *     psize           size of the private data
- *     pdata           private data sent with the reject, used when
- *                     psize != 0
- *
- * Output:
- *     none
- *
- * Returns:
- *     DAT_SUCCESS
- *     DAT_LENGTH_ERROR        psize exceeds DCM_MAX_PDATA_SIZE; nothing
- *                             is sent and the CM object is not freed
- *
- * NOTE(review): the writev() results are ignored, so a failed or short
- * write still yields DAT_SUCCESS. DAT_INTERNAL_ERROR was listed here
- * historically but is never returned.
- */
-DAT_RETURN
-dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
-                          IN int reason,
-                          IN DAT_COUNT psize, IN const DAT_PVOID pdata)
-{
-       struct iovec iov[2];
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    " reject(cm %p reason %x, pdata %p, psize %d)\n",
-                    cm_ptr, reason, pdata, psize);
-
-        if (psize > DCM_MAX_PDATA_SIZE)
-                return DAT_LENGTH_ERROR;
-
-       /* write reject data to indicate reject */
-       cm_ptr->msg.op = htons(DCM_REJ_USER);
-       cm_ptr->msg.p_size = htons(psize);
-       
-       iov[0].iov_base = (void *)&cm_ptr->msg;
-       iov[0].iov_len = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
-       if (psize) {
-               iov[1].iov_base = pdata;
-               iov[1].iov_len = psize;
-               writev(cm_ptr->socket, iov, 2);
-       } else {
-               writev(cm_ptr->socket, iov, 1);
-       }
-
-       /* release and cleanup CM object */
-       dapli_cm_free(cm_ptr);
-       return DAT_SUCCESS;
-}
-
-/*
- * dapls_ib_cm_remote_addr
- *
- * Obtain the remote IP address given a connection. The handle type is
- * detected from the magic field of its DAPL_HEADER: for an EP the CM
- * object comes from dapl_get_cm_from_ep(), for a CR it is the
- * ib_cm_handle member. sizeof(DAT_SOCK_ADDR6) bytes are then copied from
- * msg.daddr.so of that CM object.
- *
- * Input:
- *     dat_handle      EP or CR handle
- *
- * Output:
- *     remote_ia_address: where to place the remote address
- *
- * Returns:
- *     DAT_SUCCESS
- *     DAT_INVALID_HANDLE      handle is neither an EP nor a CR
- *
- * NOTE(review): conn is dereferenced without a NULL check - confirm an
- * EP passed here always has a CM object linked.
- */
-DAT_RETURN
-dapls_ib_cm_remote_addr(IN DAT_HANDLE dat_handle,
-                       OUT DAT_SOCK_ADDR6 * remote_ia_address)
-{
-       DAPL_HEADER *header;
-       dp_ib_cm_handle_t conn;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_EP,
-                    "dapls_ib_cm_remote_addr(dat_handle %p, ....)\n",
-                    dat_handle);
-
-       header = (DAPL_HEADER *) dat_handle;
-
-       if (header->magic == DAPL_MAGIC_EP)
-               conn = dapl_get_cm_from_ep((DAPL_EP *) dat_handle);
-       else if (header->magic == DAPL_MAGIC_CR)
-               conn = ((DAPL_CR *) dat_handle)->ib_cm_handle;
-       else
-               return DAT_INVALID_HANDLE;
-
-       dapl_os_memcpy(remote_ia_address,
-                      &conn->msg.daddr.so, sizeof(DAT_SOCK_ADDR6));
-
-       return DAT_SUCCESS;
-}
-
-int dapls_ib_private_data_size(        /* report the private data limit of this provider */
-       IN DAPL_HCA *hca_ptr)           /* not referenced: the limit is a compile-time constant */
-{
-       return DCM_MAX_PDATA_SIZE;      /* same bound the accept and reject paths check p_size against */
-}
-
-/* outbound/inbound CR processing thread to avoid blocking applications */
-void cr_thread(void *arg)
-{
-       struct dapl_hca *hca_ptr = arg;
-       dp_ib_cm_handle_t cr, next_cr;
-       int opt, ret;
-       socklen_t opt_len;
-       char rbuf[2];
-       struct dapl_fd_set *set;
-       enum DAPL_FD_EVENTS event;
-
-       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cr_thread: ENTER hca %p\n", hca_ptr);
-       set = dapl_alloc_fd_set();
-       if (!set)
-               goto out;
-
-       dapl_os_lock(&hca_ptr->ib_trans.lock);
-       hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
-
-       while (1) {
-               dapl_fd_zero(set);
-               dapl_fd_set(hca_ptr->ib_trans.scm[0], set, DAPL_FD_READ);
-
-               if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
-                       next_cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
-               else
-                       next_cr = NULL;
-
-               while (next_cr) {
-                       cr = next_cr;
-                       next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
-                                                       (DAPL_LLIST_ENTRY *) 
-                                                       &cr->local_entry);
-                       dapls_cm_acquire(cr); /* hold thread ref */
-                       dapl_os_lock(&cr->lock);
-                       if (cr->state == DCM_FREE || 
-                           hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
-                               dapl_log(DAPL_DBG_TYPE_CM, 
-                                        " CM FREE: %p ep=%p st=%s sck=%d refs=%d\n", 
-                                        cr, cr->ep, dapl_cm_state_str(cr->state), 
-                                        cr->socket, cr->ref_count);
-
-                               if (cr->socket != DAPL_INVALID_SOCKET) {
-                                       shutdown(cr->socket, SHUT_RDWR);
-                                       closesocket(cr->socket);
-                                       cr->socket = DAPL_INVALID_SOCKET;
-                               }
-                               dapl_os_unlock(&cr->lock);
-                               dapls_cm_release(cr); /* release alloc ref */
-                               dapli_cm_dequeue(cr); /* release workq ref */
-                               dapls_cm_release(cr); /* release thread ref */
-                               continue;
-                       }
-
-                       event = (cr->state == DCM_CONN_PENDING) ?
-                                       DAPL_FD_WRITE : DAPL_FD_READ;
-
-                       if (dapl_fd_set(cr->socket, set, event)) {
-                               dapl_log(DAPL_DBG_TYPE_ERR,
-                                        " cr_thread: fd_set ERR st=%d fd %d"
-                                        " -> %s\n", cr->state, cr->socket,
-                                        inet_ntoa(((struct sockaddr_in *)
-                                               &cr->msg.daddr.so)->sin_addr));
-                               dapl_os_unlock(&cr->lock);
-                               dapls_cm_release(cr); /* release ref */
-                               continue;
-                       }
-                       dapl_os_unlock(&cr->lock);
-                       dapl_os_unlock(&hca_ptr->ib_trans.lock);
-                       
-                       ret = dapl_poll(cr->socket, event);
-
-                       dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
-                                    " poll ret=0x%x %s sck=%d\n",
-                                    ret, dapl_cm_state_str(cr->state), 
-                                    cr->socket);
-
-                       /* data on listen, qp exchange, and on disc req */
-                       if ((ret == DAPL_FD_READ) || 
-                           (cr->state != DCM_CONN_PENDING && ret == DAPL_FD_ERROR)) {
-                               if (cr->socket != DAPL_INVALID_SOCKET) {
-                                       switch (cr->state) {
-                                       case DCM_LISTEN:
-                                               dapli_socket_accept(cr);
-                                               break;
-                                       case DCM_ACCEPTING:
-                                               dapli_socket_accept_data(cr);
-                                               break;
-                                       case DCM_ACCEPTED:
-                                               dapli_socket_accept_rtu(cr);
-                                               break;
-                                       case DCM_REP_PENDING:
-                                               dapli_socket_connect_rtu(cr);
-                                               break;
-                                       case DCM_CONNECTED:
-                                               dapli_socket_disconnect(cr);
-                                               break;
-                                       default:
-                                               break;
-                                       }
-                               }
-                       /* ASYNC connections, writable, readable, error; check status */
-                       } else if (ret == DAPL_FD_WRITE ||
-                                  (cr->state == DCM_CONN_PENDING && 
-                                   ret == DAPL_FD_ERROR)) {
-
-                               if (ret == DAPL_FD_ERROR)
-                                       dapl_log(DAPL_DBG_TYPE_ERR, " CONN_PENDING - FD_ERROR\n");
-                               
-                               opt = 0;
-                               opt_len = sizeof(opt);
-                               ret = getsockopt(cr->socket, SOL_SOCKET,
-                                                SO_ERROR, (char *)&opt,
-                                                &opt_len);
-                               if (!ret && !opt)
-                                       dapli_socket_connected(cr, opt);
-                               else
-                                       dapli_socket_connected(cr, opt ? opt : dapl_socket_errno());
-                       } 
-
-                       dapls_cm_release(cr); /* release ref */
-                       dapl_os_lock(&hca_ptr->ib_trans.lock);
-               }
-
-               /* set to exit and all resources destroyed */
-               if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&
-                   (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))
-                       break;
-
-               dapl_os_unlock(&hca_ptr->ib_trans.lock);
-               dapl_select(set);
-
-               /* if pipe used to wakeup, consume */
-               while (dapl_poll(hca_ptr->ib_trans.scm[0], 
-                                DAPL_FD_READ) == DAPL_FD_READ) {
-                       if (recv(hca_ptr->ib_trans.scm[0], rbuf, 2, 0) == -1)
-                               dapl_log(DAPL_DBG_TYPE_THREAD,
-                                        " cr_thread: read pipe error = %s\n",
-                                        strerror(errno));
-               }
-               dapl_os_lock(&hca_ptr->ib_trans.lock);
-               
-               /* set to exit and all resources destroyed */
-               if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&
-                   (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))
-                       break;
-       }
-
-       dapl_os_unlock(&hca_ptr->ib_trans.lock);
-       dapl_os_free(set, sizeof(struct dapl_fd_set));
-out:
-       hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
-       dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " cr_thread(hca %p) exit\n", hca_ptr);
-}
-
-
-#ifdef DAPL_COUNTERS
-/*
- * Debug aid: print every CM object currently queued on this IA's HCA
- * connection list (hca_ptr->ib_trans.list), one line per entry, to stdout.
- *
- * The list is walked with hca_ptr->ib_trans.lock held.
- *
- * Each IP address is emitted by its own printf() call.  inet_ntoa() returns
- * a pointer to one static buffer, so handing two inet_ntoa() results to a
- * single printf() would print the same address for both local and remote.
- */
-void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
-{
-       int i = 0;
-       int is_req;
-       dp_ib_cm_handle_t cr, next_cr;
-       struct sockaddr_in *msg_sin, *cr_sin, *l_sin, *r_sin;
-
-       dapl_os_lock(&ia_ptr->hca_ptr->ib_trans.lock);
-       if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)
-                                &ia_ptr->hca_ptr->ib_trans.list))
-               next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
-                               &ia_ptr->hca_ptr->ib_trans.list);
-       else
-               next_cr = NULL;
-
-       printf("\n DAPL IA CONNECTIONS IN PROCESS:\n");
-       while (next_cr) {
-               cr = next_cr;
-               next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*)
-                                &ia_ptr->hca_ptr->ib_trans.list,
-                               (DAPL_LLIST_ENTRY*)&cr->local_entry);
-
-               /*
-                * When the stored message is a REQ, msg.daddr.so is treated
-                * as the local address and cr->addr as the remote one;
-                * otherwise the roles are swapped.
-                */
-               is_req = (ntohs(cr->msg.op) == DCM_REQ);
-               msg_sin = (struct sockaddr_in *)&cr->msg.daddr.so;
-               cr_sin = (struct sockaddr_in *)&cr->addr;
-               l_sin = is_req ? msg_sin : cr_sin;
-               r_sin = is_req ? cr_sin : msg_sin;
-
-               /* handles, QP type, state, last op, local address */
-               printf( "  CONN[%d]: sp %p ep %p sock %d %s %s %s %s",
-                       i, cr->sp, cr->ep, cr->socket,
-                       cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
-                       dapl_cm_state_str(cr->state),
-                       dapl_cm_op_str(ntohs(cr->msg.op)),
-                       inet_ntoa(l_sin->sin_addr));
-
-               /* direction and remote address (separate inet_ntoa use) */
-               printf( " %s %s",
-                       cr->sp ? "<-" : "->",
-                       inet_ntoa(r_sin->sin_addr));
-
-               /* ports and the PIDs stashed in msg.resv[0] / msg.resv[2] */
-               printf( " PORT L-%x R-%x PID L-%x R-%x\n",
-                       ntohs(l_sin->sin_port),
-                       ntohs(r_sin->sin_port),
-                       cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[2]) : ntohs(*(uint16_t*)&cr->msg.resv[0]),
-                       cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[0]) : ntohs(*(uint16_t*)&cr->msg.resv[2]));
-
-               i++;
-       }
-       printf("\n");
-       dapl_os_unlock(&ia_ptr->hca_ptr->ib_trans.lock);
-}
-#endif
+       addr.sin_port = htons(serviceID + 1000);\r
+       addr.sin_family = AF_INET;\r
+       addr.sin_addr = ((struct sockaddr_in *) &ia_ptr->hca_ptr->hca_address)->sin_addr;\r
+\r
+       if ((bind(cm_ptr->socket, (struct sockaddr *)&addr, sizeof(addr)) < 0)\r
+           || (listen(cm_ptr->socket, 128) < 0)) {\r
+               dapl_log(DAPL_DBG_TYPE_CM,\r
+                        " listen: ERROR %s on port %d\n",\r
+                        strerror(errno), serviceID + 1000);\r
+               if (dapl_socket_errno() == EADDRINUSE)\r
+                       dat_status = DAT_CONN_QUAL_IN_USE;\r
+               else\r
+                       dat_status = DAT_CONN_QUAL_UNAVAILABLE;\r
+               goto bail;\r
+       }\r
+\r
+       /* set cm_handle for this service point, save listen socket */\r
+       sp_ptr->cm_srvc_handle = cm_ptr;\r
+       dapl_os_memcpy(&cm_ptr->addr, &addr, sizeof(addr)); \r
+\r
+       /* queue up listen socket to process inbound CR's */\r
+       cm_ptr->state = DCM_LISTEN;\r
+       dapli_cm_queue(cm_ptr);\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " setup listen: port %d cr %p s_fd %d\n",\r
+                    serviceID + 1000, cm_ptr, cm_ptr->socket);\r
+\r
+       return dat_status;\r
+bail:\r
+       /* Never queued, destroy here */\r
+       dapls_cm_release(cm_ptr);\r
+       return dat_status;\r
+}\r
+\r
+/*
+ * PASSIVE: accept socket 
+ *
+ * Called for a CM object in DCM_LISTEN state when its socket is readable.
+ * For every pending inbound connection a new CM object is allocated,
+ * given the listener's sp and hca, moved to DCM_ACCEPTING and queued with
+ * dapli_cm_queue().  The loop ends when the listen socket is no longer
+ * readable, when a CM object cannot be allocated, or when accept() fails.
+ */
+static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
+{
+       dp_ib_cm_handle_t acm_ptr;
+       int ret, opt = 1;
+       socklen_t len, sl;
+
+       /* 
+        * Accept all CR's on this port to avoid half-connection (SYN_RCV)
+        * stalls with many to one connection storms
+        */
+       do {
+               /* Allocate accept CM and initialize */
+               if ((acm_ptr = dapli_cm_alloc(NULL)) == NULL)
+                       return;
+
+               acm_ptr->sp = cm_ptr->sp;
+               acm_ptr->hca = cm_ptr->hca;
+
+               /* peer address lands in msg.daddr.so; len is in/out */
+               len = sizeof(union dcm_addr);
+               acm_ptr->socket = accept(cm_ptr->socket,
+                                       (struct sockaddr *)
+                                       &acm_ptr->msg.daddr.so,
+                                       &len);
+               if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
+                       dapl_log(DAPL_DBG_TYPE_ERR,
+                               " ACCEPT: ERR %s on FD %d l_cr %p\n",
+                               strerror(dapl_socket_errno()),
+                               cm_ptr->socket, cm_ptr);
+                       /* never queued, drop the allocation reference */
+                       dapls_cm_release(acm_ptr);
+                       return;
+               }
+               dapl_dbg_log(DAPL_DBG_TYPE_CM, " accepting from %s %x\n",
+                            inet_ntoa(((struct sockaddr_in *)
+                                       &acm_ptr->msg.daddr.so)->sin_addr),
+                            ntohs(((struct sockaddr_in *)
+                                       &acm_ptr->msg.daddr.so)->sin_port));
+
+               /* no delay for small packets */
+               ret = setsockopt(acm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
+                          (char *)&opt, sizeof(opt));
+               if (ret)
+                       dapl_log(DAPL_DBG_TYPE_ERR,
+                                " ACCEPT: NODELAY setsockopt: 0x%x 0x%x %s\n",
+                                ret, dapl_socket_errno(), strerror(dapl_socket_errno()));
+               
+               /* get local address information from socket (best effort) */
+               sl = sizeof(acm_ptr->addr);
+               ret = getsockname(acm_ptr->socket,
+                                 (struct sockaddr *)&acm_ptr->addr, &sl);
+               if (ret)
+                       dapl_log(DAPL_DBG_TYPE_ERR,
+                                " ACCEPT: getsockname: 0x%x 0x%x %s\n",
+                                ret, dapl_socket_errno(), strerror(dapl_socket_errno()));
+
+               acm_ptr->state = DCM_ACCEPTING;
+               dapli_cm_queue(acm_ptr);
+       
+       } while (dapl_poll(cm_ptr->socket, DAPL_FD_READ) == DAPL_FD_READ);
+}
+\r
+/*\r
+ * PASSIVE: receive peer QP information, private data, post cr_event \r
+ */\r
+static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)\r
+{\r
+       int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;\r
+       void *p_data = NULL;\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket accepted, read QP data\n");\r
+\r
+       /* read in DST QP info, IA address. check for private data */\r
+       len = recv(acm_ptr->socket, (char *)&acm_ptr->msg, exp, 0);\r
+       if (len != exp || ntohs(acm_ptr->msg.ver) != DCM_VER) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " ACCEPT read: ERR %s, rcnt=%d, ver=%d\n",\r
+                        strerror(errno), len, ntohs(acm_ptr->msg.ver));\r
+               goto bail;\r
+       }\r
+\r
+       /* keep the QP, address info in network order */\r
+\r
+       /* validate private data size before reading */\r
+       exp = ntohs(acm_ptr->msg.p_size);\r
+       if (exp > DCM_MAX_PDATA_SIZE) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                            " accept read: psize (%d) wrong\n",\r
+                            acm_ptr->msg.p_size);\r
+               goto bail;\r
+       }\r
+\r
+       /* read private data into cm_handle if any present */\r
+       if (exp) {\r
+               len = recv(acm_ptr->socket, acm_ptr->msg.p_data, exp, 0);\r
+               if (len != exp) {\r
+                       dapl_log(DAPL_DBG_TYPE_ERR,\r
+                                " accept read pdata: ERR %s, rcnt=%d\n",\r
+                                strerror(errno), len);\r
+                       goto bail;\r
+               }\r
+               p_data = acm_ptr->msg.p_data;\r
+       }\r
+\r
+       acm_ptr->state = DCM_ACCEPTING_DATA;\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " ACCEPT: DST %s %x lid=0x%x, qpn=0x%x, psz=%d\n",\r
+                    inet_ntoa(((struct sockaddr_in *)\r
+                               &acm_ptr->msg.daddr.so)->sin_addr), \r
+                    ntohs(((struct sockaddr_in *)\r
+                            &acm_ptr->msg.daddr.so)->sin_port),\r
+                    ntohs(acm_ptr->msg.saddr.ib.lid), \r
+                    ntohl(acm_ptr->msg.saddr.ib.qpn), exp);\r
+\r
+#ifdef DAT_EXTENSIONS\r
+       if (acm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {\r
+               DAT_IB_EXTENSION_EVENT_DATA xevent;\r
+\r
+               /* post EVENT, modify_qp created ah */\r
+               xevent.status = 0;\r
+               xevent.type = DAT_IB_UD_CONNECT_REQUEST;\r
+\r
+               dapls_evd_post_cr_event_ext(acm_ptr->sp,\r
+                                           DAT_IB_UD_CONNECTION_REQUEST_EVENT,\r
+                                           acm_ptr,\r
+                                           (DAT_COUNT) exp,\r
+                                           (DAT_PVOID *) acm_ptr->msg.p_data,\r
+                                           (DAT_PVOID *) &xevent);\r
+       } else\r
+#endif\r
+               /* trigger CR event and return SUCCESS */\r
+               dapls_cr_callback(acm_ptr,\r
+                                 IB_CME_CONNECTION_REQUEST_PENDING,\r
+                                 p_data, exp, acm_ptr->sp);\r
+       return;\r
+bail:\r
+       /* mark for destroy, active will see socket close as rej */\r
+       dapli_cm_free(acm_ptr);\r
+       return;\r
+}\r
+\r
+/*\r
+ * PASSIVE: consumer accept, send local QP information, private data, \r
+ * queue on work thread to receive RTU information to avoid blocking\r
+ * user thread. \r
+ */\r
+static DAT_RETURN\r
+dapli_socket_accept_usr(DAPL_EP * ep_ptr,\r
+                       DAPL_CR * cr_ptr, DAT_COUNT p_size, DAT_PVOID p_data)\r
+{\r
+       DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;\r
+       dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;\r
+       ib_cm_msg_t local;\r
+       struct iovec iov[2];\r
+       int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;\r
+       DAT_RETURN ret = DAT_INTERNAL_ERROR;\r
+       socklen_t sl;\r
+\r
+       if (p_size > DCM_MAX_PDATA_SIZE) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " accept_usr: psize(%d) too large\n", p_size);\r
+               return DAT_LENGTH_ERROR;\r
+       }\r
+\r
+       /* must have a accepted socket */\r
+       if (cm_ptr->socket == DAPL_INVALID_SOCKET) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " accept_usr: cm socket invalid\n");\r
+               goto bail;\r
+       }\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " ACCEPT_USR: remote lid=0x%x"\r
+                    " qpn=0x%x qp_type %d, psize=%d\n",\r
+                    ntohs(cm_ptr->msg.saddr.ib.lid),\r
+                    ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+                    cm_ptr->msg.saddr.ib.qp_type, \r
+                    ntohs(cm_ptr->msg.p_size));\r
+\r
+#ifdef DAT_EXTENSIONS\r
+       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD &&\r
+           ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " ACCEPT_USR: ERR remote QP is UD,"\r
+                        ", but local QP is not\n");\r
+               ret = (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);\r
+               goto bail;\r
+       }\r
+#endif\r
+\r
+       /* modify QP to RTR and then to RTS with remote info already read */\r
+       if (dapls_modify_qp_state(ep_ptr->qp_handle,\r
+                                 IBV_QPS_RTR, \r
+                                 cm_ptr->msg.saddr.ib.qpn,\r
+                                 cm_ptr->msg.saddr.ib.lid,\r
+                                 (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " ACCEPT_USR: QPS_RTR ERR %s -> %s\n",\r
+                        strerror(errno), \r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                    &cm_ptr->msg.daddr.so)->sin_addr));\r
+               goto bail;\r
+       }\r
+       if (dapls_modify_qp_state(ep_ptr->qp_handle,\r
+                                 IBV_QPS_RTS, \r
+                                 cm_ptr->msg.saddr.ib.qpn,\r
+                                 cm_ptr->msg.saddr.ib.lid,\r
+                                 NULL) != DAT_SUCCESS) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " ACCEPT_USR: QPS_RTS ERR %s -> %s\n",\r
+                        strerror(errno), \r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                    &cm_ptr->msg.daddr.so)->sin_addr));\r
+               goto bail;\r
+       }\r
+\r
+       /* save remote address information */\r
+       dapl_os_memcpy(&ep_ptr->remote_ia_address,\r
+                      &cm_ptr->msg.daddr.so,\r
+                      sizeof(union dcm_addr));\r
+\r
+       /* send our QP info, IA address, pdata. Don't overwrite dst data */\r
+       local.ver = htons(DCM_VER);\r
+       local.op = htons(DCM_REP);\r
+       local.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);\r
+       local.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;\r
+       local.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;\r
+       dapl_os_memcpy(&local.saddr.ib.gid[0], \r
+                      &ia_ptr->hca_ptr->ib_trans.gid, 16);\r
+       \r
+       /* Get local address information from socket */\r
+       sl = sizeof(local.daddr.so);\r
+       getsockname(cm_ptr->socket, (struct sockaddr *)&local.daddr.so, &sl);\r
+\r
+#ifdef DAPL_DBG\r
+       /* DBG: Active PID [0], PASSIVE PID [2] */\r
+       *(uint16_t*)&cm_ptr->msg.resv[2] = htons((uint16_t)dapl_os_getpid()); \r
+       dapl_os_memcpy(local.resv, cm_ptr->msg.resv, 4); \r
+#endif\r
+       cm_ptr->hca = ia_ptr->hca_ptr;\r
+       cm_ptr->state = DCM_ACCEPTED;\r
+\r
+       /* Link CM to EP, already queued on work thread */\r
+       dapl_ep_link_cm(ep_ptr, cm_ptr);\r
+       cm_ptr->ep = ep_ptr;\r
+\r
+       local.p_size = htons(p_size);\r
+       iov[0].iov_base = (void *)&local;\r
+       iov[0].iov_len = exp;\r
+       \r
+       if (p_size) {\r
+               iov[1].iov_base = p_data;\r
+               iov[1].iov_len = p_size;\r
+               len = writev(cm_ptr->socket, iov, 2);\r
+       } else \r
+               len = writev(cm_ptr->socket, iov, 1);\r
+       \r
+       if (len != (p_size + exp)) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",\r
+                        strerror(errno), len, \r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                  &cm_ptr->msg.daddr.so)->sin_addr));\r
+               dapl_ep_unlink_cm(ep_ptr, cm_ptr);\r
+               cm_ptr->ep = NULL;\r
+               goto bail;\r
+       }\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " ACCEPT_USR: local lid=0x%x qpn=0x%x psz=%d\n",\r
+                    ntohs(local.saddr.ib.lid),\r
+                    ntohl(local.saddr.ib.qpn), ntohs(local.p_size));\r
+       dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+                    " ACCEPT_USR: SRC GID subnet %016llx id %016llx\n",\r
+                    (unsigned long long)\r
+                    htonll(*(uint64_t*)&local.saddr.ib.gid[0]),\r
+                    (unsigned long long)\r
+                    htonll(*(uint64_t*)&local.saddr.ib.gid[8]));\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: accepted!\n");\r
+\r
+       return DAT_SUCCESS;\r
+bail:\r
+       /* schedule cleanup from workq */\r
+       dapli_cm_free(cm_ptr);\r
+       return ret;\r
+}\r
+\r
+/*\r
+ * PASSIVE: read RTU from active peer, post CONN event\r
+ */\r
+static void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+       int len;\r
+       ib_cm_events_t event = IB_CME_CONNECTED;\r
+\r
+       /* complete handshake after final QP state change, VER and OP */\r
+       len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0);\r
+       if (len != 4 || ntohs(cm_ptr->msg.op) != DCM_RTU) {\r
+               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                        " ACCEPT_RTU: rcv ERR, rcnt=%d op=%x\n",\r
+                        len, ntohs(cm_ptr->msg.op),\r
+                        inet_ntoa(((struct sockaddr_in *)\r
+                                   &cm_ptr->msg.daddr.so)->sin_addr));\r
+               event = IB_CME_DESTINATION_REJECT;\r
+               goto bail;\r
+       }\r
+\r
+       /* save state and reference to EP, queue for disc event */\r
+       cm_ptr->state = DCM_CONNECTED;\r
+\r
+       /* final data exchange if remote QP state is good to go */\r
+       dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: connected!\n");\r
+\r
+#ifdef DAT_EXTENSIONS\r
+ud_bail:\r
+       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {\r
+               DAT_IB_EXTENSION_EVENT_DATA xevent;\r
+\r
+               ib_pd_handle_t pd_handle = \r
+                       ((DAPL_PZ *)cm_ptr->ep->param.pz_handle)->pd_handle;\r
+               \r
+               if (event == IB_CME_CONNECTED) {\r
+                       cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,\r
+                                               cm_ptr->ep->qp_handle,\r
+                                               cm_ptr->msg.saddr.ib.lid, \r
+                                               NULL);\r
+                       if (cm_ptr->ah) { \r
+                               /* post EVENT, modify_qp created ah */\r
+                               xevent.status = 0;\r
+                               xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;\r
+                               xevent.remote_ah.ah = cm_ptr->ah;\r
+                               xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);\r
+                               dapl_os_memcpy(&xevent.remote_ah.ia_addr,\r
+                                       &cm_ptr->msg.daddr.so,\r
+                                       sizeof(union dcm_addr));\r
+                               event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;\r
+                       } else \r
+                               event = DAT_IB_UD_CONNECTION_ERROR_EVENT;\r
+               } else \r
+                       event = DAT_IB_UD_CONNECTION_ERROR_EVENT;\r
+\r
+               dapl_log(DAPL_DBG_TYPE_CM, \r
+                       " CONN_RTU: UD AH %p for lid 0x%x qpn 0x%x\n", \r
+                       cm_ptr->ah, ntohs(cm_ptr->msg.saddr.ib.lid),\r
+                       ntohl(cm_ptr->msg.saddr.ib.qpn));\r
+\r
+               dapls_evd_post_connection_event_ext(\r
+                               (DAPL_EVD *) \r
+                               cm_ptr->ep->param.connect_evd_handle,\r
+                               event,\r
+                               (DAT_EP_HANDLE) cm_ptr->ep,\r
+                               (DAT_COUNT) ntohs(cm_ptr->msg.p_size),\r
+                               (DAT_PVOID *) cm_ptr->msg.p_data,\r
+                               (DAT_PVOID *) &xevent);\r
+\r
+                /* cleanup and release from local list, still on EP list */\r
+               dapli_cm_free(cm_ptr);\r
+                \r
+       } else \r
+#endif\r
+       {\r
+               dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);\r
+       }\r
+       return;\r
+      \r
+bail:\r
+#ifdef DAT_EXTENSIONS\r
+       if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) \r
+               goto ud_bail;\r
+#endif\r
+       cm_ptr->state = DCM_REJECTED;\r
+       dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);\r
+       dapli_cm_free(cm_ptr);\r
+}\r
+\r
+/*
+ * dapls_ib_connect
+ *
+ * Provider entry point for an active-side connect.  Logs the request and
+ * forwards all arguments, unchanged, to dapli_socket_connect().
+ *
+ * Input:
+ *     ep_handle               endpoint to connect (used as a DAPL_EP *)
+ *     remote_ia_address       address of the passive listener
+ *     remote_conn_qual        connection qualifier of the listener
+ *     private_data_size       size of private data
+ *     private_data            pointer to private data
+ *
+ * Output:
+ *     none
+ *
+ * Returns:
+ *     the status of dapli_socket_connect(); listed by the original header as
+ *     DAT_SUCCESS
+ *     DAT_INSUFFICIENT_RESOURCES
+ *     DAT_INVALID_PARAMETER
+ *
+ */
+DAT_RETURN
+dapls_ib_connect(IN DAT_EP_HANDLE ep_handle,
+                IN DAT_IA_ADDRESS_PTR remote_ia_address,
+                IN DAT_CONN_QUAL remote_conn_qual,
+                IN DAT_COUNT private_data_size, IN void *private_data)
+{
+       DAT_RETURN status;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " connect(ep_handle %p ....)\n", ep_handle);
+
+       status = dapli_socket_connect((DAPL_EP *) ep_handle,
+                                     remote_ia_address,
+                                     remote_conn_qual,
+                                     private_data_size, private_data);
+       return status;
+}
+\r
+/*
+ * dapls_ib_disconnect
+ *
+ * Disconnect an EP.  Nothing is done for an EP that is already in
+ * DAT_EP_STATE_DISCONNECTED or whose service type is not RC.  Otherwise
+ * the QP is moved to the error state to flush its queues and the
+ * socket-level disconnect is started on the EP's CM object.
+ *
+ * Input:
+ *     ep_ptr                  endpoint to disconnect
+ *     close_flags             not examined by this routine
+ *
+ * Output:
+ *     none
+ *
+ * Returns:
+ *     DAT_SUCCESS, or the status of dapli_socket_disconnect()
+ */
+DAT_RETURN
+dapls_ib_disconnect(IN DAPL_EP * ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
+{
+       dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr);
+
+       if (ep_ptr->param.ep_state == DAT_EP_STATE_DISCONNECTED ||
+               ep_ptr->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC) {
+               return DAT_SUCCESS;
+       } 
+       
+       /* RC. Transition to error state to flush queue */
+       dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
+
+       /*
+        * No CM object linked to this EP: there is no socket to shut down,
+        * so avoid handing a NULL handle to the socket disconnect path.
+        */
+       if (cm_ptr == NULL)
+               return DAT_SUCCESS;
+
+       return (dapli_socket_disconnect(cm_ptr));
+}
+\r
+/*
+ * dapls_ib_disconnect_clean
+ *
+ * Clean up outstanding connection data. This routine is invoked
+ * after the final disconnect callback has occurred. Only on the
+ * ACTIVE side of a connection. It is also called if dat_ep_connect
+ * times out using the consumer supplied timeout value.
+ *
+ * Only the IB_CME_TIMEOUT event causes any work here: the EP's CM object
+ * is logged and handed to dapli_cm_free().  All other events are ignored.
+ *
+ * Input:
+ *     ep_ptr          DAPL_EP
+ *     active          Indicates active side of connection (not examined)
+ *     ib_cm_event     event that triggered the cleanup
+ *
+ * Output:
+ *     none
+ *
+ * Returns:
+ *     void
+ *
+ */
+void
+dapls_ib_disconnect_clean(IN DAPL_EP * ep_ptr,
+                         IN DAT_BOOLEAN active,
+                         IN const ib_cm_events_t ib_cm_event)
+{
+       dp_ib_cm_handle_t cm_ptr;
+
+       if (ib_cm_event != IB_CME_TIMEOUT)
+               return;
+
+       cm_ptr = dapl_get_cm_from_ep(ep_ptr);
+       if (cm_ptr == NULL) {
+               /* nothing linked to the EP; do not dereference for the log */
+               dapl_log(DAPL_DBG_TYPE_WARN,
+                       "dapls_ib_disc_clean: CONN_TIMEOUT ep %p, no cm\n",
+                       ep_ptr);
+               return;
+       }
+
+       dapl_log(DAPL_DBG_TYPE_WARN,
+               "dapls_ib_disc_clean: CONN_TIMEOUT ep %p cm %p %s\n",
+               ep_ptr, cm_ptr, dapl_cm_state_str(cm_ptr->state));
+
+       /* schedule release of socket and local resources */
+       dapli_cm_free(cm_ptr);
+}
+\r
+/*
+ * dapls_ib_setup_conn_listener
+ *
+ * Have the CM set up a connection listener.  Thin wrapper that forwards
+ * its arguments to dapli_socket_listen().
+ *
+ * Input:
+ *     ia_ptr                  IA the listener belongs to
+ *     ServiceID               service ID to listen on
+ *     sp_ptr                  service point for the listener
+ *
+ * Output:
+ *     none
+ *
+ * Returns:
+ *     the status of dapli_socket_listen(); listed by the original header as
+ *     DAT_SUCCESS
+ *     DAT_INSUFFICIENT_RESOURCES
+ *     DAT_INTERNAL_ERROR
+ *     DAT_CONN_QUAL_UNAVAILABLE
+ *     DAT_CONN_QUAL_IN_USE
+ *
+ */
+DAT_RETURN
+dapls_ib_setup_conn_listener(IN DAPL_IA * ia_ptr,
+                            IN DAT_UINT64 ServiceID, IN DAPL_SP * sp_ptr)
+{
+       DAT_RETURN status = dapli_socket_listen(ia_ptr, ServiceID, sp_ptr);
+
+       return status;
+}
+\r
+/*
+ * dapls_ib_remove_conn_listener
+ *
+ * Have the CM remove a connection listener.  The service point's
+ * cm_srvc_handle is cleared and the listening CM object, if there was
+ * one, is passed to dapli_cm_free().
+ *
+ * Input:
+ *     ia_ptr                  IA handle (not examined)
+ *     sp_ptr                  service point whose listener is removed
+ *
+ * Output:
+ *     none
+ *
+ * Returns:
+ *     DAT_SUCCESS (always)
+ *
+ */
+DAT_RETURN
+dapls_ib_remove_conn_listener(IN DAPL_IA * ia_ptr, IN DAPL_SP * sp_ptr)
+{
+       ib_cm_srvc_handle_t listen_cm = sp_ptr->cm_srvc_handle;
+
+       /* no listener attached, nothing to tear down */
+       if (listen_cm == NULL)
+               return DAT_SUCCESS;
+
+       /* detach from the SP first; cr_thread will free */
+       sp_ptr->cm_srvc_handle = NULL;
+       dapli_cm_free(listen_cm);
+
+       return DAT_SUCCESS;
+}
+\r
+/*
+ * dapls_ib_accept_connection
+ *
+ * Perform necessary steps to accept a connection: make sure the EP has a
+ * QP attached, then run the socket-level accept.
+ *
+ * Input:
+ *     cr_handle               connection request (used as a DAPL_CR *)
+ *     ep_handle               endpoint to accept on (used as a DAPL_EP *)
+ *     p_size                  size of private data
+ *     p_data                  private data sent with the reply
+ *
+ * Output:
+ *     none
+ *
+ * Returns:
+ *     the failing status of dapls_ib_qp_alloc(), otherwise the status of
+ *     dapli_socket_accept_usr()
+ *
+ */
+DAT_RETURN
+dapls_ib_accept_connection(IN DAT_CR_HANDLE cr_handle,
+                          IN DAT_EP_HANDLE ep_handle,
+                          IN DAT_COUNT p_size, IN const DAT_PVOID p_data)
+{
+       DAPL_CR *cr_ptr = (DAPL_CR *) cr_handle;
+       DAPL_EP *ep_ptr = (DAPL_EP *) ep_handle;
+       DAT_RETURN status;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    "dapls_ib_accept_connection(cr %p ep %p prd %p,%d)\n",
+                    cr_handle, ep_handle, p_data, p_size);
+
+       /* EP without a QP yet: allocate and attach one before accepting */
+       if (ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED) {
+               status = dapls_ib_qp_alloc(ep_ptr->header.owner_ia,
+                                          ep_ptr, ep_ptr);
+               if (status != DAT_SUCCESS)
+                       return status;
+       }
+
+       status = dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data);
+       return status;
+}
+\r
+/*
+ * dapls_ib_reject_connection
+ *
+ * Reject a connection.  The CM's stored message is re-used as the reply:
+ * its op is set to DCM_REJ_USER and its p_size to psize, then the header
+ * and optional private data are written to the CM socket.  The write is
+ * best effort: a failed or short write is logged, and the CM object is
+ * passed to dapli_cm_free() either way.
+ *
+ * Input:
+ *     cm_ptr                  CM object of the request being rejected
+ *     reason                  reject reason (only logged here)
+ *     psize                   size of private data
+ *     pdata                   private data sent with the reject
+ *
+ * Output:
+ *     none
+ *
+ * Returns:
+ *     DAT_SUCCESS
+ *     DAT_LENGTH_ERROR        psize exceeds DCM_MAX_PDATA_SIZE
+ *
+ */
+DAT_RETURN
+dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
+                          IN int reason,
+                          IN DAT_COUNT psize, IN const DAT_PVOID pdata)
+{
+       struct iovec iov[2];
+       int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    " reject(cm %p reason %x, pdata %p, psize %d)\n",
+                    cm_ptr, reason, pdata, psize);
+
+       if (psize > DCM_MAX_PDATA_SIZE)
+               return DAT_LENGTH_ERROR;
+
+       /* write reject data to indicate reject */
+       cm_ptr->msg.op = htons(DCM_REJ_USER);
+       cm_ptr->msg.p_size = htons(psize);
+       
+       iov[0].iov_base = (void *)&cm_ptr->msg;
+       iov[0].iov_len = exp;
+       if (psize) {
+               iov[1].iov_base = pdata;
+               iov[1].iov_len = psize;
+               len = writev(cm_ptr->socket, iov, 2);
+       } else {
+               len = writev(cm_ptr->socket, iov, 1);
+       }
+
+       /* peer will still see the socket close; just record the failure */
+       if (len != (psize + exp))
+               dapl_log(DAPL_DBG_TYPE_WARN,
+                        " reject: write ERR %s, wcnt=%d\n",
+                        strerror(errno), len);
+
+       /* release and cleanup CM object */
+       dapli_cm_free(cm_ptr);
+       return DAT_SUCCESS;
+}
+\r
+/*
+ * dapls_ib_cm_remote_addr
+ *
+ * Obtain the remote IP address given a connection.  The handle's header
+ * magic selects where the CM object comes from: the EP's linked CM for an
+ * EP handle, or the CR's ib_cm_handle for a CR handle.  The address is
+ * copied from that CM's msg.daddr.so.
+ *
+ * Input:
+ *     dat_handle              EP or CR handle
+ *
+ * Output:
+ *     remote_ia_address: where to place the remote address
+ *
+ * Returns:
+ *     DAT_SUCCESS
+ *     DAT_INVALID_HANDLE      handle is neither an EP nor a CR
+ *
+ */
+DAT_RETURN
+dapls_ib_cm_remote_addr(IN DAT_HANDLE dat_handle,
+                       OUT DAT_SOCK_ADDR6 * remote_ia_address)
+{
+       DAPL_HEADER *hdr = (DAPL_HEADER *) dat_handle;
+       dp_ib_cm_handle_t cm;
+
+       dapl_dbg_log(DAPL_DBG_TYPE_EP,
+                    "dapls_ib_cm_remote_addr(dat_handle %p, ....)\n",
+                    dat_handle);
+
+       /* anything that is not an EP or a CR is rejected up front */
+       if (hdr->magic != DAPL_MAGIC_EP && hdr->magic != DAPL_MAGIC_CR)
+               return DAT_INVALID_HANDLE;
+
+       if (hdr->magic == DAPL_MAGIC_EP)
+               cm = dapl_get_cm_from_ep((DAPL_EP *) dat_handle);
+       else
+               cm = ((DAPL_CR *) dat_handle)->ib_cm_handle;
+
+       dapl_os_memcpy(remote_ia_address,
+                      &cm->msg.daddr.so, sizeof(DAT_SOCK_ADDR6));
+
+       return DAT_SUCCESS;
+}
+\r
+/*
+ * dapls_ib_private_data_size
+ *
+ * Report the private data size for this provider.  The value is the
+ * compile-time constant DCM_MAX_PDATA_SIZE; hca_ptr is not consulted.
+ */
+int dapls_ib_private_data_size(
+       IN DAPL_HCA *hca_ptr)
+{
+       int max_pdata = DCM_MAX_PDATA_SIZE;
+
+       (void) hca_ptr;
+       return max_pdata;
+}
+\r
+/* outbound/inbound CR processing thread to avoid blocking applications\r
+ *\r
+ * arg is the struct dapl_hca to service.\r
+ *\r
+ * Each pass of the outer loop walks ib_trans.list and, per CM object:\r
+ *  - if it is in DCM_FREE state, or cr_state is no longer IB_THREAD_RUN,\r
+ *    closes its socket, drops the alloc/workq/thread references and\r
+ *    moves on to the next entry;\r
+ *  - otherwise adds its socket to the fd set (write event for\r
+ *    DCM_CONN_PENDING, read event for any other state), polls it and\r
+ *    dispatches on cr->state. Entries whose dapl_fd_set() fails are\r
+ *    logged and skipped.\r
+ * It then calls dapl_select() on that set (which also holds scm[0]) and\r
+ * drains scm[0] with recv().\r
+ *\r
+ * The loop ends once cr_state != IB_THREAD_RUN and the list is empty.\r
+ * cr_state is set to IB_THREAD_EXIT before returning, including when the\r
+ * fd set cannot be allocated.\r
+ *\r
+ * Locking: ib_trans.lock is held while walking the list; it is dropped\r
+ * around dapl_poll() plus the per-state handlers, and around\r
+ * dapl_select() plus the scm[0] drain.\r
+ */\r
+void cr_thread(void *arg)\r
+{\r
+       struct dapl_hca *hca_ptr = arg;\r
+       dp_ib_cm_handle_t cr, next_cr;\r
+       int opt, ret;\r
+       socklen_t opt_len;\r
+       char rbuf[2];\r
+       struct dapl_fd_set *set;\r
+       enum DAPL_FD_EVENTS event;\r
+\r
+       dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cr_thread: ENTER hca %p\n", hca_ptr);\r
+       set = dapl_alloc_fd_set();\r
+       if (!set)\r
+               goto out; /* nothing to free; only cr_state is updated */\r
+\r
+       dapl_os_lock(&hca_ptr->ib_trans.lock);\r
+       hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;\r
+\r
+       while (1) {\r
+               dapl_fd_zero(set);\r
+               dapl_fd_set(hca_ptr->ib_trans.scm[0], set, DAPL_FD_READ); /* wakeup fd, drained after select */\r
+\r
+               if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))\r
+                       next_cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);\r
+               else\r
+                       next_cr = NULL;\r
+\r
+               while (next_cr) {\r
+                       cr = next_cr; /* successor is fetched next, before cr can be dequeued */\r
+                       next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,\r
+                                                       (DAPL_LLIST_ENTRY *) \r
+                                                       &cr->local_entry);\r
+                       dapls_cm_acquire(cr); /* hold thread ref */\r
+                       dapl_os_lock(&cr->lock);\r
+                       if (cr->state == DCM_FREE || \r
+                           hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {\r
+                               dapl_log(DAPL_DBG_TYPE_CM, \r
+                                        " CM FREE: %p ep=%p st=%s sck=%d refs=%d\n", \r
+                                        cr, cr->ep, dapl_cm_state_str(cr->state), \r
+                                        cr->socket, cr->ref_count);\r
+\r
+                               if (cr->socket != DAPL_INVALID_SOCKET) {\r
+                                       shutdown(cr->socket, SHUT_RDWR);\r
+                                       closesocket(cr->socket);\r
+                                       cr->socket = DAPL_INVALID_SOCKET;\r
+                               }\r
+                               dapl_os_unlock(&cr->lock);\r
+                               dapls_cm_release(cr); /* release alloc ref */\r
+                               dapli_cm_dequeue(cr); /* release workq ref */\r
+                               dapls_cm_release(cr); /* release thread ref */\r
+                               continue; /* ib_trans.lock is still held here */\r
+                       }\r
+\r
+                       event = (cr->state == DCM_CONN_PENDING) ?\r
+                                       DAPL_FD_WRITE : DAPL_FD_READ;\r
+\r
+                       if (dapl_fd_set(cr->socket, set, event)) {\r
+                               dapl_log(DAPL_DBG_TYPE_ERR,\r
+                                        " cr_thread: fd_set ERR st=%d fd %d"\r
+                                        " -> %s\n", cr->state, cr->socket,\r
+                                        inet_ntoa(((struct sockaddr_in *)\r
+                                               &cr->msg.daddr.so)->sin_addr));\r
+                               dapl_os_unlock(&cr->lock);\r
+                               dapls_cm_release(cr); /* release thread ref; entry stays queued */\r
+                               continue;\r
+                       }\r
+                       dapl_os_unlock(&cr->lock);\r
+                       dapl_os_unlock(&hca_ptr->ib_trans.lock); /* dropped while polling and dispatching this cr */\r
+                       \r
+                       ret = dapl_poll(cr->socket, event); /* NOTE(review): cr->state and cr->socket are read below without cr->lock */\r
+\r
+                       dapl_dbg_log(DAPL_DBG_TYPE_THREAD,\r
+                                    " poll ret=0x%x %s sck=%d\n",\r
+                                    ret, dapl_cm_state_str(cr->state), \r
+                                    cr->socket);\r
+\r
+                       /* data on listen, qp exchange, and on disc req */\r
+                       if ((ret == DAPL_FD_READ) || \r
+                           (cr->state != DCM_CONN_PENDING && ret == DAPL_FD_ERROR)) {\r
+                               if (cr->socket != DAPL_INVALID_SOCKET) {\r
+                                       switch (cr->state) {\r
+                                       case DCM_LISTEN:\r
+                                               dapli_socket_accept(cr);\r
+                                               break;\r
+                                       case DCM_ACCEPTING:\r
+                                               dapli_socket_accept_data(cr);\r
+                                               break;\r
+                                       case DCM_ACCEPTED:\r
+                                               dapli_socket_accept_rtu(cr);\r
+                                               break;\r
+                                       case DCM_REP_PENDING:\r
+                                               dapli_socket_connect_rtu(cr);\r
+                                               break;\r
+                                       case DCM_CONNECTED:\r
+                                               dapli_socket_disconnect(cr);\r
+                                               break;\r
+                                       default:\r
+                                               break;\r
+                                       }\r
+                               }\r
+                       /* ASYNC connections, writable, readable, error; check status */\r
+                       } else if (ret == DAPL_FD_WRITE ||\r
+                                  (cr->state == DCM_CONN_PENDING && \r
+                                   ret == DAPL_FD_ERROR)) {\r
+\r
+                               if (ret == DAPL_FD_ERROR)\r
+                                       dapl_log(DAPL_DBG_TYPE_ERR, " CONN_PENDING - FD_ERROR\n");\r
+                               \r
+                               opt = 0;\r
+                               opt_len = sizeof(opt);\r
+                               ret = getsockopt(cr->socket, SOL_SOCKET,\r
+                                                SO_ERROR, (char *)&opt,\r
+                                                &opt_len);\r
+                               if (!ret && !opt)\r
+                                       dapli_socket_connected(cr, opt); /* opt is 0 here: no pending socket error */\r
+                               else\r
+                                       dapli_socket_connected(cr, opt ? opt : dapl_socket_errno()); /* SO_ERROR value, else the getsockopt failure code */\r
+                       } \r
+\r
+                       dapls_cm_release(cr); /* release ref */\r
+                       dapl_os_lock(&hca_ptr->ib_trans.lock); /* re-take list lock before visiting next_cr */\r
+               }\r
+\r
+               /* set to exit and all resources destroyed */\r
+               if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&\r
+                   (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))\r
+                       break; /* leaves with ib_trans.lock held; released after the loop */\r
+\r
+               dapl_os_unlock(&hca_ptr->ib_trans.lock);\r
+               dapl_select(set); /* NOTE(review): presumably blocks until an fd in set is ready - confirm */\r
+\r
+               /* if pipe used to wakeup, consume */\r
+               while (dapl_poll(hca_ptr->ib_trans.scm[0], \r
+                                DAPL_FD_READ) == DAPL_FD_READ) {\r
+                       if (recv(hca_ptr->ib_trans.scm[0], rbuf, 2, 0) == -1)\r
+                               dapl_log(DAPL_DBG_TYPE_THREAD,\r
+                                        " cr_thread: read pipe error = %s\n",\r
+                                        strerror(errno)); /* NOTE(review): uses errno, not dapl_socket_errno() as above */\r
+               }\r
+               dapl_os_lock(&hca_ptr->ib_trans.lock);\r
+               \r
+               /* set to exit and all resources destroyed */\r
+               if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&\r
+                   (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))\r
+                       break;\r
+       }\r
+\r
+       dapl_os_unlock(&hca_ptr->ib_trans.lock);\r
+       dapl_os_free(set, sizeof(struct dapl_fd_set));\r
+out:\r
+       hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT; /* written without ib_trans.lock held */\r
+       dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " cr_thread(hca %p) exit\n", hca_ptr);\r
+}\r
+\r
+\r
+#ifdef DAPL_COUNTERS\r
+/* Debug aid: List all Connections in process and state */\r
+void dapls_print_cm_list(IN DAPL_IA *ia_ptr)\r
+{\r
+       /* Print in process CR's for this IA, if debug type set */\r
+       int i = 0;\r
+       dp_ib_cm_handle_t cr, next_cr;\r
+\r
+       dapl_os_lock(&ia_ptr->hca_ptr->ib_trans.lock);\r
+       if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)\r
+                                &ia_ptr->hca_ptr->ib_trans.list))\r
+                                next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)\r
+                                &ia_ptr->hca_ptr->ib_trans.list);\r
+       else\r
+               next_cr = NULL;\r
+\r
+        printf("\n DAPL IA CONNECTIONS IN PROCESS:\n");\r
+       while (next_cr) {\r
+               cr = next_cr;\r
+               next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*)\r
+                                &ia_ptr->hca_ptr->ib_trans.list,\r
+                               (DAPL_LLIST_ENTRY*)&cr->local_entry);\r
+\r
+               printf( "  CONN[%d]: sp %p ep %p sock %d %s %s %s %s %s %s PORT L-%x R-%x PID L-%x R-%x\n",\r
+                       i, cr->sp, cr->ep, cr->socket,\r
+                       cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",\r
+                       dapl_cm_state_str(cr->state), dapl_cm_op_str(ntohs(cr->msg.op)),\r
+                       ntohs(cr->msg.op) == DCM_REQ ? /* local address */\r
+                               inet_ntoa(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_addr) :\r
+                               inet_ntoa(((struct sockaddr_in *)&cr->addr)->sin_addr),\r
+                       cr->sp ? "<-" : "->",\r
+                               ntohs(cr->msg.op) == DCM_REQ ? /* remote address */\r
+                               inet_ntoa(((struct sockaddr_in *)&cr->addr)->sin_addr) :\r
+                               inet_ntoa(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_addr),\r
+\r
+                       ntohs(cr->msg.op) == DCM_REQ ? /* local port */\r
+                               ntohs(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_port) :\r
+                               ntohs(((struct sockaddr_in *)&cr->addr)->sin_port),\r
+\r
+                       ntohs(cr->msg.op) == DCM_REQ ? /* remote port */\r
+                               ntohs(((struct sockaddr_in *)&cr->addr)->sin_port) :\r
+                               ntohs(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_port),\r
+\r
+                       cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[2]) : ntohs(*(uint16_t*)&cr->msg.resv[0]),\r
+                       cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[0]) : ntohs(*(uint16_t*)&cr->msg.resv[2]));\r
+\r
+               i++;\r
+       }\r
+       printf("\n");\r
+       dapl_os_unlock(&ia_ptr->hca_ptr->ib_trans.lock);\r
+}\r
+#endif\r