-/*
- * This Software is licensed under one of the following licenses:
- *
- * 1) under the terms of the "Common Public License 1.0" a copy of which is
- * available from the Open Source Initiative, see
- * http://www.opensource.org/licenses/cpl.php.
- *
- * 2) under the terms of the "The BSD License" a copy of which is
- * available from the Open Source Initiative, see
- * http://www.opensource.org/licenses/bsd-license.php.
- *
- * 3) under the terms of the "GNU General Public License (GPL) Version 2" a
- * copy of which is available from the Open Source Initiative, see
- * http://www.opensource.org/licenses/gpl-license.php.
- *
- * Licensee has the right to choose one of the above licenses.
- *
- * Redistributions of source code must retain the above copyright
- * notice and one of the license notices.
- *
- * Redistributions in binary form must reproduce both the above copyright
- * notice, one of the license notices in the documentation
- * and/or other materials provided with the distribution.
- */
-
-/***************************************************************************
- *
- * Module: uDAPL
- *
- * Filename: dapl_ib_cm.c
- *
- * Author: Arlin Davis
- *
- * Created: 3/10/2005
- *
- * Description:
- *
- * The uDAPL openib provider - connection management
- *
- ****************************************************************************
- * Source Control System Information
- *
- * $Id: $
- *
- * Copyright (c) 2005 Intel Corporation. All rights reserved.
- *
- **************************************************************************/
-
-#if defined(_WIN32)
-#define FD_SETSIZE 1024
-#define DAPL_FD_SETSIZE FD_SETSIZE
-#endif
-
-#include "dapl.h"
-#include "dapl_adapter_util.h"
-#include "dapl_evd_util.h"
-#include "dapl_cr_util.h"
-#include "dapl_name_service.h"
-#include "dapl_ib_util.h"
-#include "dapl_ep_util.h"
-#include "dapl_osd.h"
-
-#if defined(_WIN32) || defined(_WIN64)
-enum DAPL_FD_EVENTS {
- DAPL_FD_READ = 0x1,
- DAPL_FD_WRITE = 0x2,
- DAPL_FD_ERROR = 0x4
-};
-
-static int dapl_config_socket(DAPL_SOCKET s)
-{
- unsigned long nonblocking = 1;
- int ret, opt = 1;
-
- ret = ioctlsocket(s, FIONBIO, &nonblocking);
-
- /* no delay for small packets */
- if (!ret)
- ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
- (char *)&opt, sizeof(opt));
- return ret;
-}
-
-static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,
- int addrlen)
-{
- int err;
-
- err = connect(s, addr, addrlen);
- if (err == SOCKET_ERROR)
- err = WSAGetLastError();
- return (err == WSAEWOULDBLOCK) ? EAGAIN : err;
-}
-
-struct dapl_fd_set {
- struct fd_set set[3];
-};
-
-static struct dapl_fd_set *dapl_alloc_fd_set(void)
-{
- return dapl_os_alloc(sizeof(struct dapl_fd_set));
-}
-
-static void dapl_fd_zero(struct dapl_fd_set *set)
-{
- FD_ZERO(&set->set[0]);
- FD_ZERO(&set->set[1]);
- FD_ZERO(&set->set[2]);
-}
-
-static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
- enum DAPL_FD_EVENTS event)
-{
- FD_SET(s, &set->set[(event == DAPL_FD_READ) ? 0 : 1]);
- FD_SET(s, &set->set[2]);
- return 0;
-}
-
-static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
-{
- struct fd_set rw_fds;
- struct fd_set err_fds;
- struct timeval tv;
- int ret;
-
- FD_ZERO(&rw_fds);
- FD_ZERO(&err_fds);
- FD_SET(s, &rw_fds);
- FD_SET(s, &err_fds);
-
- tv.tv_sec = 0;
- tv.tv_usec = 0;
-
- if (event == DAPL_FD_READ)
- ret = select(1, &rw_fds, NULL, &err_fds, &tv);
- else
- ret = select(1, NULL, &rw_fds, &err_fds, &tv);
-
- if (ret == 0)
- return 0;
- else if (ret == SOCKET_ERROR)
- return DAPL_FD_ERROR;
- else if (FD_ISSET(s, &rw_fds))
- return event;
- else
- return DAPL_FD_ERROR;
-}
-
-static int dapl_select(struct dapl_fd_set *set)
-{
- int ret;
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep\n");
- ret = select(0, &set->set[0], &set->set[1], &set->set[2], NULL);
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup\n");
-
- if (ret == SOCKET_ERROR)
- dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
- " dapl_select: error 0x%x\n", WSAGetLastError());
-
- return ret;
-}
-
-static int dapl_socket_errno(void)
-{
- int err;
-
- err = WSAGetLastError();
- switch (err) {
- case WSAEACCES:
- case WSAEADDRINUSE:
- return EADDRINUSE;
- case WSAECONNRESET:
- return ECONNRESET;
- default:
- return err;
- }
-}
-#else // _WIN32 || _WIN64
-enum DAPL_FD_EVENTS {
- DAPL_FD_READ = POLLIN,
- DAPL_FD_WRITE = POLLOUT,
- DAPL_FD_ERROR = POLLERR
-};
-
-static int dapl_config_socket(DAPL_SOCKET s)
-{
- int ret, opt = 1;
-
- /* non-blocking */
- ret = fcntl(s, F_GETFL);
- if (ret >= 0)
- ret = fcntl(s, F_SETFL, ret | O_NONBLOCK);
-
- /* no delay for small packets */
- if (!ret)
- ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY,
- (char *)&opt, sizeof(opt));
- return ret;
-}
-
-static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,
- int addrlen)
-{
- int ret;
-
- ret = connect(s, addr, addrlen);
-
- return (errno == EINPROGRESS) ? EAGAIN : ret;
-}
-
-struct dapl_fd_set {
- int index;
- struct pollfd set[DAPL_FD_SETSIZE];
-};
-
-static struct dapl_fd_set *dapl_alloc_fd_set(void)
-{
- return dapl_os_alloc(sizeof(struct dapl_fd_set));
-}
-
-static void dapl_fd_zero(struct dapl_fd_set *set)
-{
- set->index = 0;
-}
-
-static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,
- enum DAPL_FD_EVENTS event)
-{
- if (set->index == DAPL_FD_SETSIZE - 1) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",
- set->index + 1);
- return -1;
- }
-
- set->set[set->index].fd = s;
- set->set[set->index].revents = 0;
- set->set[set->index++].events = event;
- return 0;
-}
-
-static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)
-{
- struct pollfd fds;
- int ret;
-
- fds.fd = s;
- fds.events = event;
- fds.revents = 0;
- ret = poll(&fds, 1, 0);
- dapl_log(DAPL_DBG_TYPE_THREAD, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n",
- s, ret, fds.revents);
- if (ret == 0)
- return 0;
- else if (ret < 0 || (fds.revents & (POLLERR | POLLHUP | POLLNVAL)))
- return DAPL_FD_ERROR;
- else
- return event;
-}
-
-static int dapl_select(struct dapl_fd_set *set)
-{
- int ret;
-
- dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: sleep, fds=%d\n", set->index);
- ret = poll(set->set, set->index, -1);
- dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: wakeup, ret=0x%x\n", ret);
- return ret;
-}
-
-#define dapl_socket_errno() errno
-#endif
-
-static void dapli_cm_thread_signal(dp_ib_cm_handle_t cm_ptr)
-{
- send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);
-}
-
-static void dapli_cm_free(dp_ib_cm_handle_t cm_ptr)
-{
- dapl_os_lock(&cm_ptr->lock);
- cm_ptr->state = DCM_FREE;
- dapl_os_unlock(&cm_ptr->lock);
- dapli_cm_thread_signal(cm_ptr);
-}
-
-static void dapli_cm_dealloc(dp_ib_cm_handle_t cm_ptr)
-{
- dapl_os_assert(!cm_ptr->ref_count);
-
- if (cm_ptr->socket != DAPL_INVALID_SOCKET) {
- shutdown(cm_ptr->socket, SHUT_RDWR);
- closesocket(cm_ptr->socket);
- }
- if (cm_ptr->ah)
- ibv_destroy_ah(cm_ptr->ah);
-
- dapl_os_lock_destroy(&cm_ptr->lock);
- dapl_os_wait_object_destroy(&cm_ptr->event);
- dapl_os_free(cm_ptr, sizeof(*cm_ptr));
-}
-
-void dapls_cm_acquire(dp_ib_cm_handle_t cm_ptr)
-{
- dapl_os_lock(&cm_ptr->lock);
- cm_ptr->ref_count++;
- dapl_os_unlock(&cm_ptr->lock);
-}
-
-void dapls_cm_release(dp_ib_cm_handle_t cm_ptr)
-{
- dapl_os_lock(&cm_ptr->lock);
+/*\r
+ * This Software is licensed under one of the following licenses:\r
+ *\r
+ * 1) under the terms of the "Common Public License 1.0" a copy of which is\r
+ * available from the Open Source Initiative, see\r
+ * http://www.opensource.org/licenses/cpl.php.\r
+ *\r
+ * 2) under the terms of the "The BSD License" a copy of which is\r
+ * available from the Open Source Initiative, see\r
+ * http://www.opensource.org/licenses/bsd-license.php.\r
+ *\r
+ * 3) under the terms of the "GNU General Public License (GPL) Version 2" a\r
+ * copy of which is available from the Open Source Initiative, see\r
+ * http://www.opensource.org/licenses/gpl-license.php.\r
+ *\r
+ * Licensee has the right to choose one of the above licenses.\r
+ *\r
+ * Redistributions of source code must retain the above copyright\r
+ * notice and one of the license notices.\r
+ *\r
+ * Redistributions in binary form must reproduce both the above copyright\r
+ * notice, one of the license notices in the documentation\r
+ * and/or other materials provided with the distribution.\r
+ */\r
+\r
+/***************************************************************************\r
+ *\r
+ * Module: uDAPL\r
+ *\r
+ * Filename: dapl_ib_cm.c\r
+ *\r
+ * Author: Arlin Davis\r
+ *\r
+ * Created: 3/10/2005\r
+ *\r
+ * Description: \r
+ *\r
+ * The uDAPL openib provider - connection management\r
+ *\r
+ ****************************************************************************\r
+ * Source Control System Information\r
+ *\r
+ * $Id: $\r
+ *\r
+ * Copyright (c) 2005 Intel Corporation. All rights reserved.\r
+ *\r
+ **************************************************************************/\r
+\r
+#if defined(_WIN32)\r
+#define FD_SETSIZE 1024\r
+#define DAPL_FD_SETSIZE FD_SETSIZE\r
+#endif\r
+\r
+#include "dapl.h"\r
+#include "dapl_adapter_util.h"\r
+#include "dapl_evd_util.h"\r
+#include "dapl_cr_util.h"\r
+#include "dapl_name_service.h"\r
+#include "dapl_ib_util.h"\r
+#include "dapl_ep_util.h"\r
+#include "dapl_osd.h"\r
+\r
+#if defined(_WIN32) || defined(_WIN64)\r
+enum DAPL_FD_EVENTS {\r
+ DAPL_FD_READ = 0x1,\r
+ DAPL_FD_WRITE = 0x2,\r
+ DAPL_FD_ERROR = 0x4\r
+};\r
+\r
+static int dapl_config_socket(DAPL_SOCKET s)\r
+{\r
+ unsigned long nonblocking = 1;\r
+ int ret, opt = 1;\r
+\r
+ ret = ioctlsocket(s, FIONBIO, &nonblocking);\r
+\r
+ /* no delay for small packets */\r
+ if (!ret)\r
+ ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, \r
+ (char *)&opt, sizeof(opt));\r
+ return ret;\r
+}\r
+\r
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,\r
+ int addrlen)\r
+{\r
+ int err;\r
+\r
+ err = connect(s, addr, addrlen);\r
+ if (err == SOCKET_ERROR)\r
+ err = WSAGetLastError();\r
+ return (err == WSAEWOULDBLOCK) ? EAGAIN : err;\r
+}\r
+\r
+struct dapl_fd_set {\r
+ struct fd_set set[3];\r
+};\r
+\r
+static struct dapl_fd_set *dapl_alloc_fd_set(void)\r
+{\r
+ return dapl_os_alloc(sizeof(struct dapl_fd_set));\r
+}\r
+\r
+static void dapl_fd_zero(struct dapl_fd_set *set)\r
+{\r
+ FD_ZERO(&set->set[0]);\r
+ FD_ZERO(&set->set[1]);\r
+ FD_ZERO(&set->set[2]);\r
+}\r
+\r
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,\r
+ enum DAPL_FD_EVENTS event)\r
+{\r
+ FD_SET(s, &set->set[(event == DAPL_FD_READ) ? 0 : 1]);\r
+ FD_SET(s, &set->set[2]);\r
+ return 0;\r
+}\r
+\r
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)\r
+{\r
+ struct fd_set rw_fds;\r
+ struct fd_set err_fds;\r
+ struct timeval tv;\r
+ int ret;\r
+\r
+ FD_ZERO(&rw_fds);\r
+ FD_ZERO(&err_fds);\r
+ FD_SET(s, &rw_fds);\r
+ FD_SET(s, &err_fds);\r
+\r
+ tv.tv_sec = 0;\r
+ tv.tv_usec = 0;\r
+\r
+ if (event == DAPL_FD_READ)\r
+ ret = select(1, &rw_fds, NULL, &err_fds, &tv);\r
+ else\r
+ ret = select(1, NULL, &rw_fds, &err_fds, &tv);\r
+\r
+ if (ret == 0)\r
+ return 0;\r
+ else if (ret == SOCKET_ERROR)\r
+ return DAPL_FD_ERROR;\r
+ else if (FD_ISSET(s, &rw_fds))\r
+ return event;\r
+ else\r
+ return DAPL_FD_ERROR;\r
+}\r
+\r
+static int dapl_select(struct dapl_fd_set *set)\r
+{\r
+ int ret;\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: sleep\n");\r
+ ret = select(0, &set->set[0], &set->set[1], &set->set[2], NULL);\r
+ dapl_dbg_log(DAPL_DBG_TYPE_CM, " dapl_select: wakeup\n");\r
+\r
+ if (ret == SOCKET_ERROR)\r
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD,\r
+ " dapl_select: error 0x%x\n", WSAGetLastError());\r
+\r
+ return ret;\r
+}\r
+\r
+static int dapl_socket_errno(void)\r
+{\r
+ int err;\r
+\r
+ err = WSAGetLastError();\r
+ switch (err) {\r
+ case WSAEACCES:\r
+ case WSAEADDRINUSE:\r
+ return EADDRINUSE;\r
+ case WSAECONNRESET:\r
+ return ECONNRESET;\r
+ default:\r
+ return err;\r
+ }\r
+}\r
+#else // _WIN32 || _WIN64\r
+enum DAPL_FD_EVENTS {\r
+ DAPL_FD_READ = POLLIN,\r
+ DAPL_FD_WRITE = POLLOUT,\r
+ DAPL_FD_ERROR = POLLERR\r
+};\r
+\r
+static int dapl_config_socket(DAPL_SOCKET s)\r
+{\r
+ int ret, opt = 1;\r
+\r
+ /* non-blocking */\r
+ ret = fcntl(s, F_GETFL);\r
+ if (ret >= 0)\r
+ ret = fcntl(s, F_SETFL, ret | O_NONBLOCK);\r
+\r
+ /* no delay for small packets */\r
+ if (!ret)\r
+ ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, \r
+ (char *)&opt, sizeof(opt));\r
+ return ret;\r
+}\r
+\r
+static int dapl_connect_socket(DAPL_SOCKET s, struct sockaddr *addr,\r
+			       int addrlen)\r
+{\r
+	int ret;\r
+\r
+	ret = connect(s, addr, addrlen);\r
+	/* errno is only meaningful when connect() fails; ignore a stale value on success */\r
+	return (ret && errno == EINPROGRESS) ? EAGAIN : ret;\r
+}\r
+\r
+struct dapl_fd_set {\r
+ int index;\r
+ struct pollfd set[DAPL_FD_SETSIZE];\r
+};\r
+\r
+static struct dapl_fd_set *dapl_alloc_fd_set(void)\r
+{\r
+ return dapl_os_alloc(sizeof(struct dapl_fd_set));\r
+}\r
+\r
+static void dapl_fd_zero(struct dapl_fd_set *set)\r
+{\r
+ set->index = 0;\r
+}\r
+\r
+static int dapl_fd_set(DAPL_SOCKET s, struct dapl_fd_set *set,\r
+ enum DAPL_FD_EVENTS event)\r
+{\r
+ if (set->index == DAPL_FD_SETSIZE - 1) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ "SCM ERR: cm_thread exceeded FD_SETSIZE %d\n",\r
+ set->index + 1);\r
+ return -1;\r
+ }\r
+\r
+ set->set[set->index].fd = s;\r
+ set->set[set->index].revents = 0;\r
+ set->set[set->index++].events = event;\r
+ return 0;\r
+}\r
+\r
+static enum DAPL_FD_EVENTS dapl_poll(DAPL_SOCKET s, enum DAPL_FD_EVENTS event)\r
+{\r
+ struct pollfd fds;\r
+ int ret;\r
+\r
+ fds.fd = s;\r
+ fds.events = event;\r
+ fds.revents = 0;\r
+ ret = poll(&fds, 1, 0);\r
+ dapl_log(DAPL_DBG_TYPE_THREAD, " dapl_poll: fd=%d ret=%d, evnts=0x%x\n",\r
+ s, ret, fds.revents);\r
+ if (ret == 0)\r
+ return 0;\r
+ else if (ret < 0 || (fds.revents & (POLLERR | POLLHUP | POLLNVAL))) \r
+ return DAPL_FD_ERROR;\r
+ else \r
+ return event;\r
+}\r
+\r
+static int dapl_select(struct dapl_fd_set *set)\r
+{\r
+ int ret;\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: sleep, fds=%d\n", set->index);\r
+ ret = poll(set->set, set->index, -1);\r
+ dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " dapl_select: wakeup, ret=0x%x\n", ret);\r
+ return ret;\r
+}\r
+\r
+#define dapl_socket_errno() errno\r
+#endif\r
+\r
+static void dapli_cm_thread_signal(dp_ib_cm_handle_t cm_ptr) \r
+{\r
+ send(cm_ptr->hca->ib_trans.scm[1], "w", sizeof "w", 0);\r
+}\r
+\r
+static void dapli_cm_free(dp_ib_cm_handle_t cm_ptr) \r
+{\r
+ dapl_os_lock(&cm_ptr->lock);\r
+ cm_ptr->state = DCM_FREE;\r
+ dapl_os_unlock(&cm_ptr->lock);\r
+ dapli_cm_thread_signal(cm_ptr);\r
+}\r
+\r
+static void dapli_cm_dealloc(dp_ib_cm_handle_t cm_ptr) \r
+{\r
+ dapl_os_assert(!cm_ptr->ref_count);\r
+ \r
+ if (cm_ptr->socket != DAPL_INVALID_SOCKET) {\r
+ shutdown(cm_ptr->socket, SHUT_RDWR);\r
+ closesocket(cm_ptr->socket);\r
+ }\r
+ if (cm_ptr->ah) \r
+ ibv_destroy_ah(cm_ptr->ah);\r
+ \r
+ dapl_os_lock_destroy(&cm_ptr->lock);\r
+ dapl_os_wait_object_destroy(&cm_ptr->event);\r
+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));\r
+}\r
+\r
+void dapls_cm_acquire(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+ dapl_os_lock(&cm_ptr->lock);\r
+ cm_ptr->ref_count++;\r
+ dapl_os_unlock(&cm_ptr->lock);\r
+}\r
+\r
+void dapls_cm_release(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+ dapl_os_lock(&cm_ptr->lock);\r
cm_ptr->ref_count--;\r
if (cm_ptr->ref_count) {\r
dapl_os_unlock(&cm_ptr->lock);\r
}\r
dapl_os_unlock(&cm_ptr->lock);\r
dapli_cm_dealloc(cm_ptr);\r
-}
-
-static dp_ib_cm_handle_t dapli_cm_alloc(DAPL_EP *ep_ptr)
-{
- dp_ib_cm_handle_t cm_ptr;
-
- /* Allocate CM, init lock, and initialize */
- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)
- return NULL;
-
- (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));
- if (dapl_os_lock_init(&cm_ptr->lock))
- goto bail;
-
- if (dapl_os_wait_object_init(&cm_ptr->event)) {
- dapl_os_lock_destroy(&cm_ptr->lock);
- goto bail;
- }
- dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->list_entry);
- dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->local_entry);
-
- cm_ptr->msg.ver = htons(DCM_VER);
- cm_ptr->socket = DAPL_INVALID_SOCKET;
- dapls_cm_acquire(cm_ptr);
-
- /* Link EP and CM */
- if (ep_ptr != NULL) {
- dapl_ep_link_cm(ep_ptr, cm_ptr); /* ref++ */
- cm_ptr->ep = ep_ptr;
- cm_ptr->hca = ((DAPL_IA *)ep_ptr->param.ia_handle)->hca_ptr;
- }
- return cm_ptr;
-bail:
- dapl_os_free(cm_ptr, sizeof(*cm_ptr));
- return NULL;
-}
-
-/* queue socket for processing CM work */
-static void dapli_cm_queue(dp_ib_cm_handle_t cm_ptr)
-{
- /* add to work queue for cr thread processing */
- dapl_os_lock(&cm_ptr->hca->ib_trans.lock);
- dapls_cm_acquire(cm_ptr);
- dapl_llist_add_tail(&cm_ptr->hca->ib_trans.list,
- (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry, cm_ptr);
- dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);
- dapli_cm_thread_signal(cm_ptr);
-}
-
-/* called with local LIST lock */
-static void dapli_cm_dequeue(dp_ib_cm_handle_t cm_ptr)
-{
- /* Remove from work queue, cr thread processing */
- dapl_llist_remove_entry(&cm_ptr->hca->ib_trans.list,
- (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry);
- dapls_cm_release(cm_ptr);
-}
-
-/* BLOCKING: called from dapl_ep_free, EP link will be last ref */
-void dapls_cm_free(dp_ib_cm_handle_t cm_ptr)
-{
- dapl_log(DAPL_DBG_TYPE_CM,
- " cm_free: cm %p %s ep %p refs=%d\n",
- cm_ptr, dapl_cm_state_str(cm_ptr->state),
- cm_ptr->ep, cm_ptr->ref_count);
-
- /* free from internal workq, wait until EP is last ref */
- dapl_os_lock(&cm_ptr->lock);
- cm_ptr->state = DCM_FREE;
- while (cm_ptr->ref_count != 1) {
- dapl_os_unlock(&cm_ptr->lock);
- dapl_os_sleep_usec(10000);
- dapl_os_lock(&cm_ptr->lock);
- }
- dapl_os_unlock(&cm_ptr->lock);
-
- /* unlink, dequeue from EP. Final ref so release will destroy */
- dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);
-}
-
-/*
- * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect
- * or from ep_free.
- */
-DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)
-{
- DAT_UINT32 disc_data = htonl(0xdead);
-
- dapl_os_lock(&cm_ptr->lock);
- if (cm_ptr->state != DCM_CONNECTED ||
- cm_ptr->state == DCM_DISCONNECTED) {
- dapl_os_unlock(&cm_ptr->lock);
- return DAT_SUCCESS;
- }
- cm_ptr->state = DCM_DISCONNECTED;
- dapl_os_unlock(&cm_ptr->lock);
-
- /* send disc date, close socket, schedule destroy */
- dapls_modify_qp_state(cm_ptr->ep->qp_handle, IBV_QPS_ERR, 0,0,0);
- send(cm_ptr->socket, (char *)&disc_data, sizeof(disc_data), 0);
-
- /* disconnect events for RC's only */
- if (cm_ptr->ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) {
- if (cm_ptr->ep->cr_ptr) {
- dapls_cr_callback(cm_ptr,
- IB_CME_DISCONNECTED,
- NULL, 0, cm_ptr->sp);
- } else {
- dapl_evd_connection_callback(cm_ptr,
- IB_CME_DISCONNECTED,
- NULL, 0, cm_ptr->ep);
- }
- }
-
- /* release from workq */
- dapli_cm_free(cm_ptr);
-
- /* scheduled destroy via disconnect clean in callback */
- return DAT_SUCCESS;
-}
-
-/*
- * ACTIVE: socket connected, send QP information to peer
- */
-static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)
-{
- int len, exp;
- struct iovec iov[2];
- struct dapl_ep *ep_ptr = cm_ptr->ep;
-
- if (err) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_PENDING: %s ERR %s -> %s %d\n",
- err == -1 ? "POLL" : "SOCKOPT",
- err == -1 ? strerror(errno) : strerror(err),
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->addr)->sin_addr),
- ntohs(((struct sockaddr_in *)
- &cm_ptr->addr)->sin_port));
- goto bail;
- }
-
- cm_ptr->state = DCM_REP_PENDING;
-
- /* send qp info and pdata to remote peer */
- exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
- iov[0].iov_base = (void *)&cm_ptr->msg;
- iov[0].iov_len = exp;
- if (cm_ptr->msg.p_size) {
- iov[1].iov_base = cm_ptr->msg.p_data;
- iov[1].iov_len = ntohs(cm_ptr->msg.p_size);
- len = writev(cm_ptr->socket, iov, 2);
- } else {
- len = writev(cm_ptr->socket, iov, 1);
- }
-
- if (len != (exp + ntohs(cm_ptr->msg.p_size))) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_PENDING len ERR %s, wcnt=%d(%d) -> %s\n",
- strerror(errno), len,
- exp + ntohs(cm_ptr->msg.p_size),
- inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->sin_addr));
- goto bail;
- }
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " CONN_PENDING: sending SRC lid=0x%x,"
- " qpn=0x%x, psize=%d\n",
- ntohs(cm_ptr->msg.saddr.ib.lid),
- ntohl(cm_ptr->msg.saddr.ib.qpn),
- ntohs(cm_ptr->msg.p_size));
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " CONN_PENDING: SRC GID subnet %016llx id %016llx\n",
- (unsigned long long)
- htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[0]),
- (unsigned long long)
- htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[8]));
- return;
-
-bail:
- /* mark CM object for cleanup */
- dapli_cm_free(cm_ptr);
- dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, 0, ep_ptr);
-}
-
-/*
- * ACTIVE: Create socket, connect, defer exchange QP information to CR thread
- * to avoid blocking.
- */
-static DAT_RETURN
-dapli_socket_connect(DAPL_EP * ep_ptr,
- DAT_IA_ADDRESS_PTR r_addr,
- DAT_CONN_QUAL r_qual, DAT_COUNT p_size, DAT_PVOID p_data)
-{
- dp_ib_cm_handle_t cm_ptr;
- int ret;
- socklen_t sl;
- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- DAT_RETURN dat_ret = DAT_INSUFFICIENT_RESOURCES;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n",
- r_qual, p_size);
-
- cm_ptr = dapli_cm_alloc(ep_ptr);
- if (cm_ptr == NULL)
- return dat_ret;
-
- /* create, connect, sockopt, and exchange QP information */
- if ((cm_ptr->socket =
- socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " connect: socket create ERR %s\n", strerror(errno));
- goto bail;
- }
-
- ret = dapl_config_socket(cm_ptr->socket);
- if (ret < 0) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " connect: config socket %d ERR %d %s\n",
- cm_ptr->socket, ret, strerror(dapl_socket_errno()));
- dat_ret = DAT_INTERNAL_ERROR;
- goto bail;
- }
-
- /* save remote address */
- dapl_os_memcpy(&cm_ptr->addr, r_addr, sizeof(*r_addr));
-
-#ifdef DAPL_DBG
- /* DBG: Active PID [0], PASSIVE PID [2]*/
- *(uint16_t*)&cm_ptr->msg.resv[0] = htons((uint16_t)dapl_os_getpid());
- *(uint16_t*)&cm_ptr->msg.resv[2] = ((struct sockaddr_in *)&cm_ptr->addr)->sin_port;
-#endif
- ((struct sockaddr_in *)&cm_ptr->addr)->sin_port = htons(r_qual + 1000);
- ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *)&cm_ptr->addr,
- sizeof(cm_ptr->addr));
- if (ret && ret != EAGAIN) {
- dat_ret = DAT_INVALID_ADDRESS;
- goto bail;
- }
-
- /* REQ: QP info in msg.saddr, IA address in msg.daddr, and pdata */
- cm_ptr->hca = ia_ptr->hca_ptr;
- cm_ptr->msg.op = ntohs(DCM_REQ);
- cm_ptr->msg.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);
- cm_ptr->msg.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;
- cm_ptr->msg.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
- dapl_os_memcpy(&cm_ptr->msg.saddr.ib.gid[0],
- &ia_ptr->hca_ptr->ib_trans.gid, 16);
-
- /* get local address information from socket */
- sl = sizeof(cm_ptr->msg.daddr.so);
- if (getsockname(cm_ptr->socket, (struct sockaddr *)&cm_ptr->msg.daddr.so, &sl)) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " connect getsockname ERROR: %s -> %s r_qual %d\n",
- strerror(errno),
- inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
- (unsigned int)r_qual);;
- }
-
- if (p_size) {
- cm_ptr->msg.p_size = htons(p_size);
- dapl_os_memcpy(cm_ptr->msg.p_data, p_data, p_size);
- }
-
- /* connected or pending, either way results via async event */
- if (ret == 0)
- dapli_socket_connected(cm_ptr, 0);
- else
- cm_ptr->state = DCM_CONN_PENDING;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: p_data=%p %p\n",
- cm_ptr->msg.p_data, cm_ptr->msg.p_data);
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " connect: %s r_qual %d pending, p_sz=%d, %d %d ...\n",
- inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr),
- (unsigned int)r_qual, ntohs(cm_ptr->msg.p_size),
- cm_ptr->msg.p_data[0], cm_ptr->msg.p_data[1]);
-
- /* queue up on work thread */
- dapli_cm_queue(cm_ptr);
- return DAT_SUCCESS;
-bail:
- dapl_log(DAPL_DBG_TYPE_ERR,
- " connect ERROR: %s -> %s r_qual %d\n",
- strerror(errno),
- inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),
- (unsigned int)r_qual);
-
- /* Never queued, destroy */
- dapls_cm_release(cm_ptr);
- return dat_ret;
-}
-
-/*
- * ACTIVE: exchange QP information, called from CR thread
- */
-static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)
-{
- DAPL_EP *ep_ptr = cm_ptr->ep;
- int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
- ib_cm_events_t event = IB_CME_LOCAL_FAILURE;
- socklen_t sl;
-
- /* read DST information into cm_ptr, overwrite SRC info */
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: recv peer QP data\n");
-
- len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, exp, 0);
- if (len != exp || ntohs(cm_ptr->msg.ver) != DCM_VER) {
- dapl_log(DAPL_DBG_TYPE_WARN,
- " CONN_RTU read: sk %d ERR %s, rcnt=%d, v=%d -> %s PORT L-%x R-%x PID L-%x R-%x\n",
- cm_ptr->socket, strerror(errno), len, ntohs(cm_ptr->msg.ver),
- inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr),
- ntohs(((struct sockaddr_in *)&cm_ptr->msg.daddr.so)->sin_port),
- ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port),
- ntohs(*(uint16_t*)&cm_ptr->msg.resv[0]),
- ntohs(*(uint16_t*)&cm_ptr->msg.resv[2]));
-
- /* Retry; corner case where server tcp stack resets under load */
- if (dapl_socket_errno() == ECONNRESET) {
- closesocket(cm_ptr->socket);
- cm_ptr->socket = DAPL_INVALID_SOCKET;
- dapli_socket_connect(cm_ptr->ep, (DAT_IA_ADDRESS_PTR)&cm_ptr->addr,
- ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port) - 1000,
- ntohs(cm_ptr->msg.p_size), &cm_ptr->msg.p_data);
- dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);
- dapli_cm_free(cm_ptr);
- return;
- }
- goto bail;
- }
-
- /* keep the QP, address info in network order */
-
- /* save remote address information, in msg.daddr */
- dapl_os_memcpy(&ep_ptr->remote_ia_address,
- &cm_ptr->msg.daddr.so,
- sizeof(union dcm_addr));
-
- /* save local address information from socket */
- sl = sizeof(cm_ptr->addr);
- getsockname(cm_ptr->socket,(struct sockaddr *)&cm_ptr->addr, &sl);
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " CONN_RTU: DST %s %d lid=0x%x,"
- " qpn=0x%x, qp_type=%d, psize=%d\n",
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr),
- ntohs(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_port),
- ntohs(cm_ptr->msg.saddr.ib.lid),
- ntohl(cm_ptr->msg.saddr.ib.qpn),
- cm_ptr->msg.saddr.ib.qp_type,
- ntohs(cm_ptr->msg.p_size));
-
- /* validate private data size before reading */
- if (ntohs(cm_ptr->msg.p_size) > DCM_MAX_PDATA_SIZE) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU read: psize (%d) wrong -> %s\n",
- ntohs(cm_ptr->msg.p_size),
- inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->sin_addr));
- goto bail;
- }
-
- /* read private data into cm_handle if any present */
- dapl_dbg_log(DAPL_DBG_TYPE_EP," CONN_RTU: read private data\n");
- exp = ntohs(cm_ptr->msg.p_size);
- if (exp) {
- len = recv(cm_ptr->socket, cm_ptr->msg.p_data, exp, 0);
- if (len != exp) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",
- strerror(errno), len,
- inet_ntoa(((struct sockaddr_in *)
- ep_ptr->param.
- remote_ia_address_ptr)->sin_addr));
- goto bail;
- }
- }
-
- /* check for consumer or protocol stack reject */
- if (ntohs(cm_ptr->msg.op) == DCM_REP)
- event = IB_CME_CONNECTED;
- else if (ntohs(cm_ptr->msg.op) == DCM_REJ_USER)
- event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;
- else
- event = IB_CME_DESTINATION_REJECT;
-
- if (event != IB_CME_CONNECTED) {
- dapl_log(DAPL_DBG_TYPE_CM,
- " CONN_RTU: reject from %s %x\n",
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr),
- ntohs(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_port));
- goto bail;
- }
-
- /* modify QP to RTR and then to RTS with remote info */
- if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTR,
- cm_ptr->msg.saddr.ib.qpn,
- cm_ptr->msg.saddr.ib.lid,
- (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU: QPS_RTR ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",
- strerror(errno), ep_ptr->qp_handle->qp_type,
- ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,
- ntohl(cm_ptr->msg.saddr.ib.qpn),
- ntohs(cm_ptr->msg.saddr.ib.lid),
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr),
- ntohs(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_port));
- goto bail;
- }
- if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTS,
- cm_ptr->msg.saddr.ib.qpn,
- cm_ptr->msg.saddr.ib.lid,
- NULL) != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU: QPS_RTS ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",
- strerror(errno), ep_ptr->qp_handle->qp_type,
- ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,
- ntohl(cm_ptr->msg.saddr.ib.qpn),
- ntohs(cm_ptr->msg.saddr.ib.lid),
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr),
- ntohs(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_port));
- goto bail;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: send RTU\n");
-
- /* complete handshake after final QP state change, Just ver+op */
- cm_ptr->state = DCM_CONNECTED;
- cm_ptr->msg.op = ntohs(DCM_RTU);
- if (send(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0) == -1) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " CONN_RTU: write error = %s\n", strerror(errno));
- goto bail;
- }
- /* post the event with private data */
- event = IB_CME_CONNECTED;
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");
-
-#ifdef DAT_EXTENSIONS
-ud_bail:
- if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
- DAT_IB_EXTENSION_EVENT_DATA xevent;
- ib_pd_handle_t pd_handle =
- ((DAPL_PZ *)ep_ptr->param.pz_handle)->pd_handle;
-
- if (event == IB_CME_CONNECTED) {
- cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
- ep_ptr->qp_handle,
- cm_ptr->msg.saddr.ib.lid,
- NULL);
- if (cm_ptr->ah) {
- /* post UD extended EVENT */
- xevent.status = 0;
- xevent.type = DAT_IB_UD_REMOTE_AH;
- xevent.remote_ah.ah = cm_ptr->ah;
- xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
- dapl_os_memcpy(&xevent.remote_ah.ia_addr,
- &ep_ptr->remote_ia_address,
- sizeof(union dcm_addr));
- event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
-
- dapl_log(DAPL_DBG_TYPE_CM,
- " CONN_RTU: UD AH %p for lid 0x%x"
- " qpn 0x%x\n",
- cm_ptr->ah,
- ntohs(cm_ptr->msg.saddr.ib.lid),
- ntohl(cm_ptr->msg.saddr.ib.qpn));
-
- } else
- event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
-
- } else if (event == IB_CME_LOCAL_FAILURE) {
- event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
- } else
- event = DAT_IB_UD_CONNECTION_REJECT_EVENT;
-
- dapls_evd_post_connection_event_ext(
- (DAPL_EVD *) ep_ptr->param.connect_evd_handle,
- event,
- (DAT_EP_HANDLE) ep_ptr,
- (DAT_COUNT) exp,
- (DAT_PVOID *) cm_ptr->msg.p_data,
- (DAT_PVOID *) &xevent);
-
- /* cleanup and release from local list */
- dapli_cm_free(cm_ptr);
-
- } else
-#endif
- {
- dapl_evd_connection_callback(cm_ptr, event, cm_ptr->msg.p_data,
- DCM_MAX_PDATA_SIZE, ep_ptr);
- }
- return;
-
-bail:
-
-#ifdef DAT_EXTENSIONS
- if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD)
- goto ud_bail;
-#endif
- /* close socket, and post error event */
- cm_ptr->state = DCM_REJECTED;
- dapl_evd_connection_callback(NULL, event, cm_ptr->msg.p_data,
- DCM_MAX_PDATA_SIZE, ep_ptr);
- dapli_cm_free(cm_ptr);
-}
-
-/*
- * PASSIVE: Create socket, listen, accept, exchange QP information
- */
-DAT_RETURN
-dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)
-{
- struct sockaddr_in addr;
- ib_cm_srvc_handle_t cm_ptr = NULL;
- DAT_RETURN dat_status = DAT_SUCCESS;
+}\r
+\r
+static dp_ib_cm_handle_t dapli_cm_alloc(DAPL_EP *ep_ptr)\r
+{\r
+ dp_ib_cm_handle_t cm_ptr;\r
+\r
+ /* Allocate CM, init lock, and initialize */\r
+ if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL)\r
+ return NULL;\r
+\r
+ (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr));\r
+ if (dapl_os_lock_init(&cm_ptr->lock))\r
+ goto bail;\r
+\r
+ if (dapl_os_wait_object_init(&cm_ptr->event)) {\r
+ dapl_os_lock_destroy(&cm_ptr->lock);\r
+ goto bail;\r
+ }\r
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->list_entry);\r
+ dapl_llist_init_entry((DAPL_LLIST_ENTRY *)&cm_ptr->local_entry);\r
+\r
+ cm_ptr->msg.ver = htons(DCM_VER);\r
+ cm_ptr->socket = DAPL_INVALID_SOCKET;\r
+ dapls_cm_acquire(cm_ptr);\r
+ \r
+ /* Link EP and CM */\r
+ if (ep_ptr != NULL) {\r
+ dapl_ep_link_cm(ep_ptr, cm_ptr); /* ref++ */\r
+ cm_ptr->ep = ep_ptr;\r
+ cm_ptr->hca = ((DAPL_IA *)ep_ptr->param.ia_handle)->hca_ptr;\r
+ }\r
+ return cm_ptr;\r
+bail:\r
+ dapl_os_free(cm_ptr, sizeof(*cm_ptr));\r
+ return NULL;\r
+}\r
+\r
+/* queue socket for processing CM work */\r
+static void dapli_cm_queue(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+ /* add to work queue for cr thread processing */\r
+ dapl_os_lock(&cm_ptr->hca->ib_trans.lock);\r
+ dapls_cm_acquire(cm_ptr);\r
+ dapl_llist_add_tail(&cm_ptr->hca->ib_trans.list,\r
+ (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry, cm_ptr);\r
+ dapl_os_unlock(&cm_ptr->hca->ib_trans.lock);\r
+ dapli_cm_thread_signal(cm_ptr);\r
+}\r
+\r
+/* called with local LIST lock */\r
+static void dapli_cm_dequeue(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+ /* Remove from work queue, cr thread processing */\r
+ dapl_llist_remove_entry(&cm_ptr->hca->ib_trans.list,\r
+ (DAPL_LLIST_ENTRY *)&cm_ptr->local_entry);\r
+ dapls_cm_release(cm_ptr);\r
+}\r
+\r
+/* BLOCKING: called from dapl_ep_free, EP link will be last ref */\r
+void dapls_cm_free(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+ dapl_log(DAPL_DBG_TYPE_CM,\r
+ " cm_free: cm %p %s ep %p refs=%d\n", \r
+ cm_ptr, dapl_cm_state_str(cm_ptr->state),\r
+ cm_ptr->ep, cm_ptr->ref_count);\r
+ \r
+ /* free from internal workq, wait until EP is last ref */\r
+ dapl_os_lock(&cm_ptr->lock);\r
+ cm_ptr->state = DCM_FREE;\r
+ while (cm_ptr->ref_count != 1) {\r
+ dapl_os_unlock(&cm_ptr->lock);\r
+ dapl_os_sleep_usec(10000);\r
+ dapl_os_lock(&cm_ptr->lock);\r
+ }\r
+ dapl_os_unlock(&cm_ptr->lock);\r
+\r
+ /* unlink, dequeue from EP. Final ref so release will destroy */\r
+ dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);\r
+}\r
+\r
+/*\r
+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect\r
+ * or from ep_free. \r
+ */\r
+DAT_RETURN dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+ DAT_UINT32 disc_data = htonl(0xdead);\r
+\r
+ dapl_os_lock(&cm_ptr->lock);\r
+ if (cm_ptr->state != DCM_CONNECTED || \r
+ cm_ptr->state == DCM_DISCONNECTED) {\r
+ dapl_os_unlock(&cm_ptr->lock);\r
+ return DAT_SUCCESS;\r
+ }\r
+ cm_ptr->state = DCM_DISCONNECTED;\r
+ dapl_os_unlock(&cm_ptr->lock);\r
+ \r
+ /* send disc date, close socket, schedule destroy */\r
+ dapls_modify_qp_state(cm_ptr->ep->qp_handle, IBV_QPS_ERR, 0,0,0);\r
+ send(cm_ptr->socket, (char *)&disc_data, sizeof(disc_data), 0);\r
+\r
+ /* disconnect events for RC's only */\r
+ if (cm_ptr->ep->param.ep_attr.service_type == DAT_SERVICE_TYPE_RC) {\r
+ if (cm_ptr->ep->cr_ptr) {\r
+ dapls_cr_callback(cm_ptr,\r
+ IB_CME_DISCONNECTED,\r
+ NULL, 0, cm_ptr->sp);\r
+ } else {\r
+ dapl_evd_connection_callback(cm_ptr,\r
+ IB_CME_DISCONNECTED,\r
+ NULL, 0, cm_ptr->ep);\r
+ }\r
+ }\r
+ \r
+ /* release from workq */\r
+ dapli_cm_free(cm_ptr);\r
+\r
+ /* scheduled destroy via disconnect clean in callback */\r
+ return DAT_SUCCESS;\r
+}\r
+\r
+/*\r
+ * ACTIVE: socket connected, send QP information to peer \r
+ */\r
+static void dapli_socket_connected(dp_ib_cm_handle_t cm_ptr, int err)\r
+{\r
+ int len, exp;\r
+ struct iovec iov[2];\r
+ struct dapl_ep *ep_ptr = cm_ptr->ep;\r
+\r
+ if (err) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " CONN_PENDING: %s ERR %s -> %s %d\n",\r
+ err == -1 ? "POLL" : "SOCKOPT",\r
+ err == -1 ? strerror(errno) : strerror(err), \r
+ inet_ntoa(((struct sockaddr_in *)\r
+ &cm_ptr->addr)->sin_addr), \r
+ ntohs(((struct sockaddr_in *)\r
+ &cm_ptr->addr)->sin_port));\r
+ goto bail;\r
+ }\r
+\r
+ cm_ptr->state = DCM_REP_PENDING;\r
+\r
+ /* send qp info and pdata to remote peer */\r
+ exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;\r
+ iov[0].iov_base = (void *)&cm_ptr->msg;\r
+ iov[0].iov_len = exp;\r
+ if (cm_ptr->msg.p_size) {\r
+ iov[1].iov_base = cm_ptr->msg.p_data;\r
+ iov[1].iov_len = ntohs(cm_ptr->msg.p_size);\r
+ len = writev(cm_ptr->socket, iov, 2);\r
+ } else {\r
+ len = writev(cm_ptr->socket, iov, 1);\r
+ }\r
+\r
+ if (len != (exp + ntohs(cm_ptr->msg.p_size))) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " CONN_PENDING len ERR %s, wcnt=%d(%d) -> %s\n",\r
+ strerror(errno), len, \r
+ exp + ntohs(cm_ptr->msg.p_size), \r
+ inet_ntoa(((struct sockaddr_in *)\r
+ ep_ptr->param.\r
+ remote_ia_address_ptr)->sin_addr));\r
+ goto bail;\r
+ }\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+ " CONN_PENDING: sending SRC lid=0x%x,"\r
+ " qpn=0x%x, psize=%d\n",\r
+ ntohs(cm_ptr->msg.saddr.ib.lid),\r
+ ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+ ntohs(cm_ptr->msg.p_size));\r
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+ " CONN_PENDING: SRC GID subnet %016llx id %016llx\n",\r
+ (unsigned long long)\r
+ htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[0]),\r
+ (unsigned long long)\r
+ htonll(*(uint64_t*)&cm_ptr->msg.saddr.ib.gid[8]));\r
+ return;\r
+\r
+bail:\r
+ /* mark CM object for cleanup */\r
+ dapli_cm_free(cm_ptr);\r
+ dapl_evd_connection_callback(NULL, IB_CME_LOCAL_FAILURE, NULL, 0, ep_ptr);\r
+}\r
+\r
+/*\r
+ * ACTIVE: Create socket, connect, defer exchange QP information to CR thread\r
+ * to avoid blocking. \r
+ */\r
+static DAT_RETURN\r
+dapli_socket_connect(DAPL_EP * ep_ptr,\r
+ DAT_IA_ADDRESS_PTR r_addr,\r
+ DAT_CONN_QUAL r_qual, DAT_COUNT p_size, DAT_PVOID p_data)\r
+{\r
+ dp_ib_cm_handle_t cm_ptr;\r
+ int ret;\r
+ socklen_t sl;\r
+ DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;\r
+ DAT_RETURN dat_ret = DAT_INSUFFICIENT_RESOURCES;\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d p_size=%d\n",\r
+ r_qual, p_size);\r
+\r
+ cm_ptr = dapli_cm_alloc(ep_ptr);\r
+ if (cm_ptr == NULL)\r
+ return dat_ret;\r
+\r
+ /* create, connect, sockopt, and exchange QP information */\r
+ if ((cm_ptr->socket =\r
+ socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " connect: socket create ERR %s\n", strerror(errno));\r
+ goto bail;\r
+ }\r
+\r
+ ret = dapl_config_socket(cm_ptr->socket);\r
+ if (ret < 0) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " connect: config socket %d ERR %d %s\n",\r
+ cm_ptr->socket, ret, strerror(dapl_socket_errno()));\r
+ dat_ret = DAT_INTERNAL_ERROR;\r
+ goto bail;\r
+ }\r
+\r
+ /* save remote address */\r
+ dapl_os_memcpy(&cm_ptr->addr, r_addr, sizeof(*r_addr));\r
+\r
+#ifdef DAPL_DBG\r
+ /* DBG: Active PID [0], PASSIVE PID [2]*/\r
+ *(uint16_t*)&cm_ptr->msg.resv[0] = htons((uint16_t)dapl_os_getpid()); \r
+ *(uint16_t*)&cm_ptr->msg.resv[2] = ((struct sockaddr_in *)&cm_ptr->addr)->sin_port;\r
+#endif\r
+ ((struct sockaddr_in *)&cm_ptr->addr)->sin_port = htons(r_qual + 1000);\r
+ ret = dapl_connect_socket(cm_ptr->socket, (struct sockaddr *)&cm_ptr->addr,\r
+ sizeof(cm_ptr->addr));\r
+ if (ret && ret != EAGAIN) {\r
+ dat_ret = DAT_INVALID_ADDRESS;\r
+ goto bail;\r
+ }\r
+\r
+ /* REQ: QP info in msg.saddr, IA address in msg.daddr, and pdata */\r
+ cm_ptr->hca = ia_ptr->hca_ptr;\r
+ cm_ptr->msg.op = ntohs(DCM_REQ);\r
+ cm_ptr->msg.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);\r
+ cm_ptr->msg.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;\r
+ cm_ptr->msg.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;\r
+ dapl_os_memcpy(&cm_ptr->msg.saddr.ib.gid[0], \r
+ &ia_ptr->hca_ptr->ib_trans.gid, 16);\r
+ \r
+ /* get local address information from socket */\r
+ sl = sizeof(cm_ptr->msg.daddr.so);\r
+ if (getsockname(cm_ptr->socket, (struct sockaddr *)&cm_ptr->msg.daddr.so, &sl)) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " connect getsockname ERROR: %s -> %s r_qual %d\n",\r
+ strerror(errno), \r
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),\r
+ (unsigned int)r_qual);;\r
+ }\r
+\r
+ if (p_size) {\r
+ cm_ptr->msg.p_size = htons(p_size);\r
+ dapl_os_memcpy(cm_ptr->msg.p_data, p_data, p_size);\r
+ }\r
+\r
+ /* connected or pending, either way results via async event */\r
+ if (ret == 0)\r
+ dapli_socket_connected(cm_ptr, 0);\r
+ else\r
+ cm_ptr->state = DCM_CONN_PENDING;\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: p_data=%p %p\n",\r
+ cm_ptr->msg.p_data, cm_ptr->msg.p_data);\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,\r
+ " connect: %s r_qual %d pending, p_sz=%d, %d %d ...\n",\r
+ inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr), \r
+ (unsigned int)r_qual, ntohs(cm_ptr->msg.p_size),\r
+ cm_ptr->msg.p_data[0], cm_ptr->msg.p_data[1]);\r
+\r
+ /* queue up on work thread */\r
+ dapli_cm_queue(cm_ptr);\r
+ return DAT_SUCCESS;\r
+bail:\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " connect ERROR: %s -> %s r_qual %d\n",\r
+ strerror(errno), \r
+ inet_ntoa(((struct sockaddr_in *)r_addr)->sin_addr),\r
+ (unsigned int)r_qual);\r
+\r
+ /* Never queued, destroy */\r
+ dapls_cm_release(cm_ptr);\r
+ return dat_ret;\r
+}\r
+\r
+/*\r
+ * ACTIVE: exchange QP information, called from CR thread\r
+ */\r
+static void dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr)\r
+{\r
+ DAPL_EP *ep_ptr = cm_ptr->ep;\r
+ int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;\r
+ ib_cm_events_t event = IB_CME_LOCAL_FAILURE;\r
+ socklen_t sl;\r
+\r
+ /* read DST information into cm_ptr, overwrite SRC info */\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: recv peer QP data\n");\r
+\r
+ len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, exp, 0);\r
+ if (len != exp || ntohs(cm_ptr->msg.ver) != DCM_VER) {\r
+ dapl_log(DAPL_DBG_TYPE_WARN,\r
+ " CONN_RTU read: sk %d ERR %s, rcnt=%d, v=%d -> %s PORT L-%x R-%x PID L-%x R-%x\n",\r
+ cm_ptr->socket, strerror(errno), len, ntohs(cm_ptr->msg.ver),\r
+ inet_ntoa(((struct sockaddr_in *)&cm_ptr->addr)->sin_addr),\r
+ ntohs(((struct sockaddr_in *)&cm_ptr->msg.daddr.so)->sin_port),\r
+ ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port),\r
+ ntohs(*(uint16_t*)&cm_ptr->msg.resv[0]),\r
+ ntohs(*(uint16_t*)&cm_ptr->msg.resv[2]));\r
+\r
+ /* Retry; corner case where server tcp stack resets under load */\r
+ if (dapl_socket_errno() == ECONNRESET) {\r
+ closesocket(cm_ptr->socket);\r
+ cm_ptr->socket = DAPL_INVALID_SOCKET;\r
+ dapli_socket_connect(cm_ptr->ep, (DAT_IA_ADDRESS_PTR)&cm_ptr->addr, \r
+ ntohs(((struct sockaddr_in *)&cm_ptr->addr)->sin_port) - 1000,\r
+ ntohs(cm_ptr->msg.p_size), &cm_ptr->msg.p_data);\r
+ dapl_ep_unlink_cm(cm_ptr->ep, cm_ptr);\r
+ dapli_cm_free(cm_ptr);\r
+ return;\r
+ }\r
+ goto bail;\r
+ }\r
+\r
+ /* keep the QP, address info in network order */\r
+ \r
+ /* save remote address information, in msg.daddr */\r
+ dapl_os_memcpy(&ep_ptr->remote_ia_address,\r
+ &cm_ptr->msg.daddr.so,\r
+ sizeof(union dcm_addr));\r
+\r
+ /* save local address information from socket */\r
+ sl = sizeof(cm_ptr->addr);\r
+ getsockname(cm_ptr->socket,(struct sockaddr *)&cm_ptr->addr, &sl);\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP,\r
+ " CONN_RTU: DST %s %d lid=0x%x,"\r
+ " qpn=0x%x, qp_type=%d, psize=%d\n",\r
+ inet_ntoa(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_addr),\r
+ ntohs(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_port),\r
+ ntohs(cm_ptr->msg.saddr.ib.lid),\r
+ ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+ cm_ptr->msg.saddr.ib.qp_type, \r
+ ntohs(cm_ptr->msg.p_size));\r
+\r
+ /* validate private data size before reading */\r
+ if (ntohs(cm_ptr->msg.p_size) > DCM_MAX_PDATA_SIZE) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " CONN_RTU read: psize (%d) wrong -> %s\n",\r
+ ntohs(cm_ptr->msg.p_size), \r
+ inet_ntoa(((struct sockaddr_in *)\r
+ ep_ptr->param.\r
+ remote_ia_address_ptr)->sin_addr));\r
+ goto bail;\r
+ }\r
+\r
+ /* read private data into cm_handle if any present */\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP," CONN_RTU: read private data\n");\r
+ exp = ntohs(cm_ptr->msg.p_size);\r
+ if (exp) {\r
+ len = recv(cm_ptr->socket, cm_ptr->msg.p_data, exp, 0);\r
+ if (len != exp) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " CONN_RTU read pdata: ERR %s, rcnt=%d -> %s\n",\r
+ strerror(errno), len,\r
+ inet_ntoa(((struct sockaddr_in *)\r
+ ep_ptr->param.\r
+ remote_ia_address_ptr)->sin_addr));\r
+ goto bail;\r
+ }\r
+ }\r
+\r
+ /* check for consumer or protocol stack reject */\r
+ if (ntohs(cm_ptr->msg.op) == DCM_REP)\r
+ event = IB_CME_CONNECTED;\r
+ else if (ntohs(cm_ptr->msg.op) == DCM_REJ_USER) \r
+ event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA;\r
+ else \r
+ event = IB_CME_DESTINATION_REJECT;\r
+ \r
+ if (event != IB_CME_CONNECTED) {\r
+ dapl_log(DAPL_DBG_TYPE_CM,\r
+ " CONN_RTU: reject from %s %x\n",\r
+ inet_ntoa(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_addr),\r
+ ntohs(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_port));\r
+ goto bail;\r
+ }\r
+\r
+ /* modify QP to RTR and then to RTS with remote info */\r
+ if (dapls_modify_qp_state(ep_ptr->qp_handle,\r
+ IBV_QPS_RTR, \r
+ cm_ptr->msg.saddr.ib.qpn,\r
+ cm_ptr->msg.saddr.ib.lid,\r
+ (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " CONN_RTU: QPS_RTR ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",\r
+ strerror(errno), ep_ptr->qp_handle->qp_type,\r
+ ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,\r
+ ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+ ntohs(cm_ptr->msg.saddr.ib.lid),\r
+ inet_ntoa(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_addr),\r
+ ntohs(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_port));\r
+ goto bail;\r
+ }\r
+ if (dapls_modify_qp_state(ep_ptr->qp_handle,\r
+ IBV_QPS_RTS, \r
+ cm_ptr->msg.saddr.ib.qpn,\r
+ cm_ptr->msg.saddr.ib.lid,\r
+ NULL) != DAT_SUCCESS) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " CONN_RTU: QPS_RTS ERR %s (%d,%d,%x,%x,%x) -> %s %x\n",\r
+ strerror(errno), ep_ptr->qp_handle->qp_type,\r
+ ep_ptr->qp_state, ep_ptr->qp_handle->qp_num,\r
+ ntohl(cm_ptr->msg.saddr.ib.qpn), \r
+ ntohs(cm_ptr->msg.saddr.ib.lid),\r
+ inet_ntoa(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_addr),\r
+ ntohs(((struct sockaddr_in *)\r
+ &cm_ptr->msg.daddr.so)->sin_port));\r
+ goto bail;\r
+ }\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect_rtu: send RTU\n");\r
+\r
+ /* complete handshake after final QP state change, Just ver+op */\r
+ cm_ptr->state = DCM_CONNECTED;\r
+ cm_ptr->msg.op = ntohs(DCM_RTU);\r
+ if (send(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0) == -1) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR,\r
+ " CONN_RTU: write error = %s\n", strerror(errno));\r
+ goto bail;\r
+ }\r
+ /* post the event with private data */\r
+ event = IB_CME_CONNECTED;\r
+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " ACTIVE: connected!\n");\r
+\r
+#ifdef DAT_EXTENSIONS\r
+ud_bail:\r
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {\r
+ DAT_IB_EXTENSION_EVENT_DATA xevent;\r
+ ib_pd_handle_t pd_handle = \r
+ ((DAPL_PZ *)ep_ptr->param.pz_handle)->pd_handle;\r
+\r
+ if (event == IB_CME_CONNECTED) {\r
+ cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,\r
+ ep_ptr->qp_handle,\r
+ cm_ptr->msg.saddr.ib.lid, \r
+ NULL);\r
+ if (cm_ptr->ah) {\r
+ /* post UD extended EVENT */\r
+ xevent.status = 0;\r
+ xevent.type = DAT_IB_UD_REMOTE_AH;\r
+ xevent.remote_ah.ah = cm_ptr->ah;\r
+ xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);\r
+ dapl_os_memcpy(&xevent.remote_ah.ia_addr,\r
+ &ep_ptr->remote_ia_address,\r
+ sizeof(union dcm_addr));\r
+ event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;\r
+\r
+ dapl_log(DAPL_DBG_TYPE_CM, \r
+ " CONN_RTU: UD AH %p for lid 0x%x"\r
+ " qpn 0x%x\n", \r
+ cm_ptr->ah, \r
+ ntohs(cm_ptr->msg.saddr.ib.lid),\r
+ ntohl(cm_ptr->msg.saddr.ib.qpn));\r
+ \r
+ } else \r
+ event = DAT_IB_UD_CONNECTION_ERROR_EVENT;\r
+ \r
+ } else if (event == IB_CME_LOCAL_FAILURE) {\r
+ event = DAT_IB_UD_CONNECTION_ERROR_EVENT;\r
+ } else \r
+ event = DAT_IB_UD_CONNECTION_REJECT_EVENT;\r
+\r
+ dapls_evd_post_connection_event_ext(\r
+ (DAPL_EVD *) ep_ptr->param.connect_evd_handle,\r
+ event,\r
+ (DAT_EP_HANDLE) ep_ptr,\r
+ (DAT_COUNT) exp,\r
+ (DAT_PVOID *) cm_ptr->msg.p_data,\r
+ (DAT_PVOID *) &xevent);\r
+\r
+ /* cleanup and release from local list */\r
+ dapli_cm_free(cm_ptr);\r
+ \r
+ } else\r
+#endif\r
+ {\r
+ dapl_evd_connection_callback(cm_ptr, event, cm_ptr->msg.p_data,\r
+ DCM_MAX_PDATA_SIZE, ep_ptr);\r
+ }\r
+ return;\r
+\r
+bail:\r
+\r
+#ifdef DAT_EXTENSIONS\r
+ if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) \r
+ goto ud_bail;\r
+#endif\r
+ /* close socket, and post error event */\r
+ cm_ptr->state = DCM_REJECTED;\r
+ dapl_evd_connection_callback(NULL, event, cm_ptr->msg.p_data,\r
+ DCM_MAX_PDATA_SIZE, ep_ptr);\r
+ dapli_cm_free(cm_ptr);\r
+}\r
+\r
+/*\r
+ * PASSIVE: Create socket, listen, accept, exchange QP information \r
+ */\r
+DAT_RETURN\r
+dapli_socket_listen(DAPL_IA * ia_ptr, DAT_CONN_QUAL serviceID, DAPL_SP * sp_ptr)\r
+{\r
+ struct sockaddr_in addr;\r
+ ib_cm_srvc_handle_t cm_ptr = NULL;\r
+ DAT_RETURN dat_status = DAT_SUCCESS;\r
int opt = 1;\r
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " setup listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",
- ia_ptr, serviceID, sp_ptr);
-
- cm_ptr = dapli_cm_alloc(NULL);
- if (cm_ptr == NULL)
- return DAT_INSUFFICIENT_RESOURCES;
-
- cm_ptr->sp = sp_ptr;
- cm_ptr->hca = ia_ptr->hca_ptr;
-
- /* bind, listen, set sockopt, accept, exchange data */
- if ((cm_ptr->socket =
- socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {
- dapl_log(DAPL_DBG_TYPE_ERR, " ERR: listen socket create: %s\n",
- strerror(errno));
- dat_status = DAT_INSUFFICIENT_RESOURCES;
- goto bail;
- }
-
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+ " setup listen(ia_ptr %p ServiceID %d sp_ptr %p)\n",\r
+ ia_ptr, serviceID, sp_ptr);\r
+\r
+ cm_ptr = dapli_cm_alloc(NULL);\r
+ if (cm_ptr == NULL)\r
+ return DAT_INSUFFICIENT_RESOURCES;\r
+\r
+ cm_ptr->sp = sp_ptr;\r
+ cm_ptr->hca = ia_ptr->hca_ptr;\r
+\r
+ /* bind, listen, set sockopt, accept, exchange data */\r
+ if ((cm_ptr->socket =\r
+ socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) == DAPL_INVALID_SOCKET) {\r
+ dapl_log(DAPL_DBG_TYPE_ERR, " ERR: listen socket create: %s\n",\r
+ strerror(errno));\r
+ dat_status = DAT_INSUFFICIENT_RESOURCES;\r
+ goto bail;\r
+ }\r
+\r
setsockopt(cm_ptr->socket, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt));\r
- addr.sin_port = htons(serviceID + 1000);
- addr.sin_family = AF_INET;
- addr.sin_addr = ((struct sockaddr_in *) &ia_ptr->hca_ptr->hca_address)->sin_addr;
-
- if ((bind(cm_ptr->socket, (struct sockaddr *)&addr, sizeof(addr)) < 0)
- || (listen(cm_ptr->socket, 128) < 0)) {
- dapl_log(DAPL_DBG_TYPE_CM,
- " listen: ERROR %s on port %d\n",
- strerror(errno), serviceID + 1000);
- if (dapl_socket_errno() == EADDRINUSE)
- dat_status = DAT_CONN_QUAL_IN_USE;
- else
- dat_status = DAT_CONN_QUAL_UNAVAILABLE;
- goto bail;
- }
-
- /* set cm_handle for this service point, save listen socket */
- sp_ptr->cm_srvc_handle = cm_ptr;
- dapl_os_memcpy(&cm_ptr->addr, &addr, sizeof(addr));
-
- /* queue up listen socket to process inbound CR's */
- cm_ptr->state = DCM_LISTEN;
- dapli_cm_queue(cm_ptr);
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " setup listen: port %d cr %p s_fd %d\n",
- serviceID + 1000, cm_ptr, cm_ptr->socket);
-
- return dat_status;
-bail:
- /* Never queued, destroy here */
- dapls_cm_release(cm_ptr);
- return dat_status;
-}
-
-/*
- * PASSIVE: accept socket
- */
-static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
-{
- dp_ib_cm_handle_t acm_ptr;
- int ret, len, opt = 1;
- socklen_t sl;
-
- /*
- * Accept all CR's on this port to avoid half-connection (SYN_RCV)
- * stalls with many to one connection storms
- */
- do {
- /* Allocate accept CM and initialize */
- if ((acm_ptr = dapli_cm_alloc(NULL)) == NULL)
- return;
-
- acm_ptr->sp = cm_ptr->sp;
- acm_ptr->hca = cm_ptr->hca;
-
- len = sizeof(union dcm_addr);
- acm_ptr->socket = accept(cm_ptr->socket,
- (struct sockaddr *)
- &acm_ptr->msg.daddr.so,
- (socklen_t *) &len);
- if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT: ERR %s on FD %d l_cr %p\n",
- strerror(errno), cm_ptr->socket, cm_ptr);
- dapls_cm_release(acm_ptr);
- return;
- }
- dapl_dbg_log(DAPL_DBG_TYPE_CM, " accepting from %s %x\n",
- inet_ntoa(((struct sockaddr_in *)
- &acm_ptr->msg.daddr.so)->sin_addr),
- ntohs(((struct sockaddr_in *)
- &acm_ptr->msg.daddr.so)->sin_port));
-
- /* no delay for small packets */
- ret = setsockopt(acm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
- (char *)&opt, sizeof(opt));
- if (ret)
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT: NODELAY setsockopt: 0x%x 0x%x %s\n",
- ret, dapl_socket_errno(), strerror(dapl_socket_errno()));
-
- /* get local address information from socket */
- sl = sizeof(acm_ptr->addr);
- getsockname(acm_ptr->socket, (struct sockaddr *)&acm_ptr->addr, &sl);
- acm_ptr->state = DCM_ACCEPTING;
- dapli_cm_queue(acm_ptr);
-
- } while (dapl_poll(cm_ptr->socket, DAPL_FD_READ) == DAPL_FD_READ);
-}
-
-/*
- * PASSIVE: receive peer QP information, private data, post cr_event
- */
-static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
-{
- int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
- void *p_data = NULL;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket accepted, read QP data\n");
-
- /* read in DST QP info, IA address. check for private data */
- len = recv(acm_ptr->socket, (char *)&acm_ptr->msg, exp, 0);
- if (len != exp || ntohs(acm_ptr->msg.ver) != DCM_VER) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT read: ERR %s, rcnt=%d, ver=%d\n",
- strerror(errno), len, ntohs(acm_ptr->msg.ver));
- goto bail;
- }
-
- /* keep the QP, address info in network order */
-
- /* validate private data size before reading */
- exp = ntohs(acm_ptr->msg.p_size);
- if (exp > DCM_MAX_PDATA_SIZE) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " accept read: psize (%d) wrong\n",
- acm_ptr->msg.p_size);
- goto bail;
- }
-
- /* read private data into cm_handle if any present */
- if (exp) {
- len = recv(acm_ptr->socket, acm_ptr->msg.p_data, exp, 0);
- if (len != exp) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " accept read pdata: ERR %s, rcnt=%d\n",
- strerror(errno), len);
- goto bail;
- }
- p_data = acm_ptr->msg.p_data;
- }
-
- acm_ptr->state = DCM_ACCEPTING_DATA;
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ACCEPT: DST %s %x lid=0x%x, qpn=0x%x, psz=%d\n",
- inet_ntoa(((struct sockaddr_in *)
- &acm_ptr->msg.daddr.so)->sin_addr),
- ntohs(((struct sockaddr_in *)
- &acm_ptr->msg.daddr.so)->sin_port),
- ntohs(acm_ptr->msg.saddr.ib.lid),
- ntohl(acm_ptr->msg.saddr.ib.qpn), exp);
-
-#ifdef DAT_EXTENSIONS
- if (acm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
- DAT_IB_EXTENSION_EVENT_DATA xevent;
-
- /* post EVENT, modify_qp created ah */
- xevent.status = 0;
- xevent.type = DAT_IB_UD_CONNECT_REQUEST;
-
- dapls_evd_post_cr_event_ext(acm_ptr->sp,
- DAT_IB_UD_CONNECTION_REQUEST_EVENT,
- acm_ptr,
- (DAT_COUNT) exp,
- (DAT_PVOID *) acm_ptr->msg.p_data,
- (DAT_PVOID *) &xevent);
- } else
-#endif
- /* trigger CR event and return SUCCESS */
- dapls_cr_callback(acm_ptr,
- IB_CME_CONNECTION_REQUEST_PENDING,
- p_data, exp, acm_ptr->sp);
- return;
-bail:
- /* mark for destroy, active will see socket close as rej */
- dapli_cm_free(acm_ptr);
- return;
-}
-
-/*
- * PASSIVE: consumer accept, send local QP information, private data,
- * queue on work thread to receive RTU information to avoid blocking
- * user thread.
- */
-static DAT_RETURN
-dapli_socket_accept_usr(DAPL_EP * ep_ptr,
- DAPL_CR * cr_ptr, DAT_COUNT p_size, DAT_PVOID p_data)
-{
- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
- dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
- ib_cm_msg_t local;
- struct iovec iov[2];
- int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
- DAT_RETURN ret = DAT_INTERNAL_ERROR;
- socklen_t sl;
-
- if (p_size > DCM_MAX_PDATA_SIZE) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " accept_usr: psize(%d) too large\n", p_size);
- return DAT_LENGTH_ERROR;
- }
-
- /* must have a accepted socket */
- if (cm_ptr->socket == DAPL_INVALID_SOCKET) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " accept_usr: cm socket invalid\n");
- goto bail;
- }
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ACCEPT_USR: remote lid=0x%x"
- " qpn=0x%x qp_type %d, psize=%d\n",
- ntohs(cm_ptr->msg.saddr.ib.lid),
- ntohl(cm_ptr->msg.saddr.ib.qpn),
- cm_ptr->msg.saddr.ib.qp_type,
- ntohs(cm_ptr->msg.p_size));
-
-#ifdef DAT_EXTENSIONS
- if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD &&
- ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_USR: ERR remote QP is UD,"
- ", but local QP is not\n");
- ret = (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
- goto bail;
- }
-#endif
-
- /* modify QP to RTR and then to RTS with remote info already read */
- if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTR,
- cm_ptr->msg.saddr.ib.qpn,
- cm_ptr->msg.saddr.ib.lid,
- (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_USR: QPS_RTR ERR %s -> %s\n",
- strerror(errno),
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr));
- goto bail;
- }
- if (dapls_modify_qp_state(ep_ptr->qp_handle,
- IBV_QPS_RTS,
- cm_ptr->msg.saddr.ib.qpn,
- cm_ptr->msg.saddr.ib.lid,
- NULL) != DAT_SUCCESS) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_USR: QPS_RTS ERR %s -> %s\n",
- strerror(errno),
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr));
- goto bail;
- }
-
- /* save remote address information */
- dapl_os_memcpy(&ep_ptr->remote_ia_address,
- &cm_ptr->msg.daddr.so,
- sizeof(union dcm_addr));
-
- /* send our QP info, IA address, pdata. Don't overwrite dst data */
- local.ver = htons(DCM_VER);
- local.op = htons(DCM_REP);
- local.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);
- local.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;
- local.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
- dapl_os_memcpy(&local.saddr.ib.gid[0],
- &ia_ptr->hca_ptr->ib_trans.gid, 16);
-
- /* Get local address information from socket */
- sl = sizeof(local.daddr.so);
- getsockname(cm_ptr->socket, (struct sockaddr *)&local.daddr.so, &sl);
-
-#ifdef DAPL_DBG
- /* DBG: Active PID [0], PASSIVE PID [2] */
- *(uint16_t*)&cm_ptr->msg.resv[2] = htons((uint16_t)dapl_os_getpid());
- dapl_os_memcpy(local.resv, cm_ptr->msg.resv, 4);
-#endif
- cm_ptr->hca = ia_ptr->hca_ptr;
- cm_ptr->state = DCM_ACCEPTED;
-
- local.p_size = htons(p_size);
- iov[0].iov_base = (void *)&local;
- iov[0].iov_len = exp;
-
- if (p_size) {
- iov[1].iov_base = p_data;
- iov[1].iov_len = p_size;
- len = writev(cm_ptr->socket, iov, 2);
- } else
- len = writev(cm_ptr->socket, iov, 1);
-
- if (len != (p_size + exp)) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
- strerror(errno), len,
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr));
- goto bail;
- }
-
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ACCEPT_USR: local lid=0x%x qpn=0x%x psz=%d\n",
- ntohs(local.saddr.ib.lid),
- ntohl(local.saddr.ib.qpn), ntohs(local.p_size));
- dapl_dbg_log(DAPL_DBG_TYPE_CM,
- " ACCEPT_USR: SRC GID subnet %016llx id %016llx\n",
- (unsigned long long)
- htonll(*(uint64_t*)&local.saddr.ib.gid[0]),
- (unsigned long long)
- htonll(*(uint64_t*)&local.saddr.ib.gid[8]));
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: accepted!\n");
-
- /* Link CM to EP, already queued on work thread */
- dapl_ep_link_cm(ep_ptr, cm_ptr);
- cm_ptr->ep = ep_ptr;
- return DAT_SUCCESS;
-bail:
- /* schedule cleanup from workq */
- dapli_cm_free(cm_ptr);
- return ret;
-}
-
-/*
- * PASSIVE: read RTU from active peer, post CONN event
- */
-static void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
-{
- int len;
- ib_cm_events_t event = IB_CME_CONNECTED;
-
- /* complete handshake after final QP state change, VER and OP */
- len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0);
- if (len != 4 || ntohs(cm_ptr->msg.op) != DCM_RTU) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " ACCEPT_RTU: rcv ERR, rcnt=%d op=%x\n",
- len, ntohs(cm_ptr->msg.op),
- inet_ntoa(((struct sockaddr_in *)
- &cm_ptr->msg.daddr.so)->sin_addr));
- event = IB_CME_DESTINATION_REJECT;
- goto bail;
- }
-
- /* save state and reference to EP, queue for disc event */
- cm_ptr->state = DCM_CONNECTED;
-
- /* final data exchange if remote QP state is good to go */
- dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: connected!\n");
-
-#ifdef DAT_EXTENSIONS
-ud_bail:
- if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
- DAT_IB_EXTENSION_EVENT_DATA xevent;
-
- ib_pd_handle_t pd_handle =
- ((DAPL_PZ *)cm_ptr->ep->param.pz_handle)->pd_handle;
-
- if (event == IB_CME_CONNECTED) {
- cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
- cm_ptr->ep->qp_handle,
- cm_ptr->msg.saddr.ib.lid,
- NULL);
- if (cm_ptr->ah) {
- /* post EVENT, modify_qp created ah */
- xevent.status = 0;
- xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;
- xevent.remote_ah.ah = cm_ptr->ah;
- xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
- dapl_os_memcpy(&xevent.remote_ah.ia_addr,
- &cm_ptr->msg.daddr.so,
- sizeof(union dcm_addr));
- event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
- } else
- event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
- } else
- event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
-
- dapl_log(DAPL_DBG_TYPE_CM,
- " CONN_RTU: UD AH %p for lid 0x%x qpn 0x%x\n",
- cm_ptr->ah, ntohs(cm_ptr->msg.saddr.ib.lid),
- ntohl(cm_ptr->msg.saddr.ib.qpn));
-
- dapls_evd_post_connection_event_ext(
- (DAPL_EVD *)
- cm_ptr->ep->param.connect_evd_handle,
- event,
- (DAT_EP_HANDLE) cm_ptr->ep,
- (DAT_COUNT) ntohs(cm_ptr->msg.p_size),
- (DAT_PVOID *) cm_ptr->msg.p_data,
- (DAT_PVOID *) &xevent);
-
- /* cleanup and release from local list, still on EP list */
- dapli_cm_free(cm_ptr);
-
- } else
-#endif
- {
- dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);
- }
- return;
-
-bail:
-#ifdef DAT_EXTENSIONS
- if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD)
- goto ud_bail;
-#endif
- cm_ptr->state = DCM_REJECTED;
- dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);
- dapli_cm_free(cm_ptr);
-}
-
-/*
- * dapls_ib_connect
- *
- * Initiate a connection with the passive listener on another node
- *
- * Input:
- * ep_handle,
- * remote_ia_address,
- * remote_conn_qual,
- * prd_size size of private data and structure
- * prd_prt pointer to private data structure
- *
- * Output:
- * none
- *
- * Returns:
- * DAT_SUCCESS
- * DAT_INSUFFICIENT_RESOURCES
- * DAT_INVALID_PARAMETER
- *
- */
-DAT_RETURN
-dapls_ib_connect(IN DAT_EP_HANDLE ep_handle,
- IN DAT_IA_ADDRESS_PTR remote_ia_address,
- IN DAT_CONN_QUAL remote_conn_qual,
- IN DAT_COUNT private_data_size, IN void *private_data)
-{
- DAPL_EP *ep_ptr = (DAPL_EP *) ep_handle;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " connect(ep_handle %p ....)\n", ep_handle);
-
- return (dapli_socket_connect(ep_ptr, remote_ia_address,
- remote_conn_qual,
- private_data_size, private_data));
-}
-
-/*
- * dapls_ib_disconnect
- *
- * Disconnect an EP
- *
- * Input:
- * ep_handle,
- * disconnect_flags
- *
- * Output:
- * none
- *
- * Returns:
- * DAT_SUCCESS
- */
-DAT_RETURN
-dapls_ib_disconnect(IN DAPL_EP * ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
-{
- dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr);
-
- if (ep_ptr->param.ep_state == DAT_EP_STATE_DISCONNECTED ||
- ep_ptr->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC) {
- return DAT_SUCCESS;
- }
-
- /* RC. Transition to error state to flush queue */
- dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
-
- return (dapli_socket_disconnect(cm_ptr));
-}
-
-/*
- * dapls_ib_disconnect_clean
- *
- * Clean up outstanding connection data. This routine is invoked
- * after the final disconnect callback has occurred. Only on the
- * ACTIVE side of a connection. It is also called if dat_ep_connect
- * times out using the consumer supplied timeout value.
- *
- * Input:
- * ep_ptr DAPL_EP
- * active Indicates active side of connection
- *
- * Output:
- * none
- *
- * Returns:
- * void
- *
- */
-void
-dapls_ib_disconnect_clean(IN DAPL_EP * ep_ptr,
- IN DAT_BOOLEAN active,
- IN const ib_cm_events_t ib_cm_event)
-{
- if (ib_cm_event == IB_CME_TIMEOUT) {
- dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr);
-
- dapl_log(DAPL_DBG_TYPE_WARN,
- "dapls_ib_disc_clean: CONN_TIMEOUT ep %p cm %p %s\n",
- ep_ptr, cm_ptr, dapl_cm_state_str(cm_ptr->state));
-
- /* schedule release of socket and local resources */
- dapli_cm_free(cm_ptr);
- }
-}
-
-/*
- * dapl_ib_setup_conn_listener
- *
- * Have the CM set up a connection listener.
- *
- * Input:
- * ibm_hca_handle HCA handle
- * qp_handle QP handle
- *
- * Output:
- * none
- *
- * Returns:
- * DAT_SUCCESS
- * DAT_INSUFFICIENT_RESOURCES
- * DAT_INTERNAL_ERROR
- * DAT_CONN_QUAL_UNAVAILBLE
- * DAT_CONN_QUAL_IN_USE
- *
- */
-DAT_RETURN
-dapls_ib_setup_conn_listener(IN DAPL_IA * ia_ptr,
- IN DAT_UINT64 ServiceID, IN DAPL_SP * sp_ptr)
-{
- return (dapli_socket_listen(ia_ptr, ServiceID, sp_ptr));
-}
-
-/*
- * dapl_ib_remove_conn_listener
- *
- * Have the CM remove a connection listener.
- *
- * Input:
- * ia_handle IA handle
- * ServiceID IB Channel Service ID
- *
- * Output:
- * none
- *
- * Returns:
- * DAT_SUCCESS
- * DAT_INVALID_STATE
- *
- */
-DAT_RETURN
-dapls_ib_remove_conn_listener(IN DAPL_IA * ia_ptr, IN DAPL_SP * sp_ptr)
-{
- ib_cm_srvc_handle_t cm_ptr = sp_ptr->cm_srvc_handle;
-
- /* free cm_srvc_handle, release will cleanup */
- if (cm_ptr != NULL) {
- /* cr_thread will free */
- sp_ptr->cm_srvc_handle = NULL;
- dapli_cm_free(cm_ptr);
- }
- return DAT_SUCCESS;
-}
-
-/*
- * dapls_ib_accept_connection
- *
- * Perform necessary steps to accept a connection
- *
- * Input:
- * cr_handle
- * ep_handle
- * private_data_size
- * private_data
- *
- * Output:
- * none
- *
- * Returns:
- * DAT_SUCCESS
- * DAT_INSUFFICIENT_RESOURCES
- * DAT_INTERNAL_ERROR
- *
- */
-DAT_RETURN
-dapls_ib_accept_connection(IN DAT_CR_HANDLE cr_handle,
- IN DAT_EP_HANDLE ep_handle,
- IN DAT_COUNT p_size, IN const DAT_PVOID p_data)
-{
- DAPL_CR *cr_ptr;
- DAPL_EP *ep_ptr;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- "dapls_ib_accept_connection(cr %p ep %p prd %p,%d)\n",
- cr_handle, ep_handle, p_data, p_size);
-
- cr_ptr = (DAPL_CR *) cr_handle;
- ep_ptr = (DAPL_EP *) ep_handle;
-
- /* allocate and attach a QP if necessary */
- if (ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED) {
- DAT_RETURN status;
- status = dapls_ib_qp_alloc(ep_ptr->header.owner_ia,
- ep_ptr, ep_ptr);
- if (status != DAT_SUCCESS)
- return status;
- }
- return (dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data));
-}
-
-/*
- * dapls_ib_reject_connection
- *
- * Reject a connection
- *
- * Input:
- * cr_handle
- *
- * Output:
- * none
- *
- * Returns:
- * DAT_SUCCESS
- * DAT_INTERNAL_ERROR
- *
- */
-DAT_RETURN
-dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
- IN int reason,
- IN DAT_COUNT psize, IN const DAT_PVOID pdata)
-{
- struct iovec iov[2];
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- " reject(cm %p reason %x, pdata %p, psize %d)\n",
- cm_ptr, reason, pdata, psize);
-
- if (psize > DCM_MAX_PDATA_SIZE)
- return DAT_LENGTH_ERROR;
-
- /* write reject data to indicate reject */
- cm_ptr->msg.op = htons(DCM_REJ_USER);
- cm_ptr->msg.p_size = htons(psize);
-
- iov[0].iov_base = (void *)&cm_ptr->msg;
- iov[0].iov_len = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
- if (psize) {
- iov[1].iov_base = pdata;
- iov[1].iov_len = psize;
- writev(cm_ptr->socket, iov, 2);
- } else {
- writev(cm_ptr->socket, iov, 1);
- }
-
- /* release and cleanup CM object */
- dapli_cm_free(cm_ptr);
- return DAT_SUCCESS;
-}
-
-/*
- * dapls_ib_cm_remote_addr
- *
- * Obtain the remote IP address given a connection
- *
- * Input:
- * cr_handle
- *
- * Output:
- * remote_ia_address: where to place the remote address
- *
- * Returns:
- * DAT_SUCCESS
- * DAT_INVALID_HANDLE
- *
- */
-DAT_RETURN
-dapls_ib_cm_remote_addr(IN DAT_HANDLE dat_handle,
- OUT DAT_SOCK_ADDR6 * remote_ia_address)
-{
- DAPL_HEADER *header;
- dp_ib_cm_handle_t conn;
-
- dapl_dbg_log(DAPL_DBG_TYPE_EP,
- "dapls_ib_cm_remote_addr(dat_handle %p, ....)\n",
- dat_handle);
-
- header = (DAPL_HEADER *) dat_handle;
-
- if (header->magic == DAPL_MAGIC_EP)
- conn = dapl_get_cm_from_ep((DAPL_EP *) dat_handle);
- else if (header->magic == DAPL_MAGIC_CR)
- conn = ((DAPL_CR *) dat_handle)->ib_cm_handle;
- else
- return DAT_INVALID_HANDLE;
-
- dapl_os_memcpy(remote_ia_address,
- &conn->msg.daddr.so, sizeof(DAT_SOCK_ADDR6));
-
- return DAT_SUCCESS;
-}
-
-int dapls_ib_private_data_size(
- IN DAPL_HCA *hca_ptr)
-{
- return DCM_MAX_PDATA_SIZE;
-}
-
-/* outbound/inbound CR processing thread to avoid blocking applications */
-void cr_thread(void *arg)
-{
- struct dapl_hca *hca_ptr = arg;
- dp_ib_cm_handle_t cr, next_cr;
- int opt, ret;
- socklen_t opt_len;
- char rbuf[2];
- struct dapl_fd_set *set;
- enum DAPL_FD_EVENTS event;
-
- dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cr_thread: ENTER hca %p\n", hca_ptr);
- set = dapl_alloc_fd_set();
- if (!set)
- goto out;
-
- dapl_os_lock(&hca_ptr->ib_trans.lock);
- hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
-
- while (1) {
- dapl_fd_zero(set);
- dapl_fd_set(hca_ptr->ib_trans.scm[0], set, DAPL_FD_READ);
-
- if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
- next_cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
- else
- next_cr = NULL;
-
- while (next_cr) {
- cr = next_cr;
- next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY *)
- &cr->local_entry);
- dapls_cm_acquire(cr); /* hold thread ref */
- dapl_os_lock(&cr->lock);
- if (cr->state == DCM_FREE ||
- hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
- dapl_log(DAPL_DBG_TYPE_CM,
- " CM FREE: %p ep=%p st=%s sck=%d refs=%d\n",
- cr, cr->ep, dapl_cm_state_str(cr->state),
- cr->socket, cr->ref_count);
-
- if (cr->socket != DAPL_INVALID_SOCKET) {
- shutdown(cr->socket, SHUT_RDWR);
- closesocket(cr->socket);
- cr->socket = DAPL_INVALID_SOCKET;
- }
- dapl_os_unlock(&cr->lock);
- dapls_cm_release(cr); /* release alloc ref */
- dapli_cm_dequeue(cr); /* release workq ref */
- dapls_cm_release(cr); /* release thread ref */
- continue;
- }
-
- event = (cr->state == DCM_CONN_PENDING) ?
- DAPL_FD_WRITE : DAPL_FD_READ;
-
- if (dapl_fd_set(cr->socket, set, event)) {
- dapl_log(DAPL_DBG_TYPE_ERR,
- " cr_thread: fd_set ERR st=%d fd %d"
- " -> %s\n", cr->state, cr->socket,
- inet_ntoa(((struct sockaddr_in *)
- &cr->msg.daddr.so)->sin_addr));
- dapl_os_unlock(&cr->lock);
- dapls_cm_release(cr); /* release ref */
- continue;
- }
- dapl_os_unlock(&cr->lock);
- dapl_os_unlock(&hca_ptr->ib_trans.lock);
-
- ret = dapl_poll(cr->socket, event);
-
- dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
- " poll ret=0x%x %s sck=%d\n",
- ret, dapl_cm_state_str(cr->state),
- cr->socket);
-
- /* data on listen, qp exchange, and on disc req */
- if ((ret == DAPL_FD_READ) ||
- (cr->state != DCM_CONN_PENDING && ret == DAPL_FD_ERROR)) {
- if (cr->socket != DAPL_INVALID_SOCKET) {
- switch (cr->state) {
- case DCM_LISTEN:
- dapli_socket_accept(cr);
- break;
- case DCM_ACCEPTING:
- dapli_socket_accept_data(cr);
- break;
- case DCM_ACCEPTED:
- dapli_socket_accept_rtu(cr);
- break;
- case DCM_REP_PENDING:
- dapli_socket_connect_rtu(cr);
- break;
- case DCM_CONNECTED:
- dapli_socket_disconnect(cr);
- break;
- default:
- break;
- }
- }
- /* ASYNC connections, writable, readable, error; check status */
- } else if (ret == DAPL_FD_WRITE ||
- (cr->state == DCM_CONN_PENDING &&
- ret == DAPL_FD_ERROR)) {
-
- if (ret == DAPL_FD_ERROR)
- dapl_log(DAPL_DBG_TYPE_ERR, " CONN_PENDING - FD_ERROR\n");
-
- opt = 0;
- opt_len = sizeof(opt);
- ret = getsockopt(cr->socket, SOL_SOCKET,
- SO_ERROR, (char *)&opt,
- &opt_len);
- if (!ret && !opt)
- dapli_socket_connected(cr, opt);
- else
- dapli_socket_connected(cr, opt ? opt : dapl_socket_errno());
- }
-
- dapls_cm_release(cr); /* release ref */
- dapl_os_lock(&hca_ptr->ib_trans.lock);
- }
-
- /* set to exit and all resources destroyed */
- if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&
- (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))
- break;
-
- dapl_os_unlock(&hca_ptr->ib_trans.lock);
- dapl_select(set);
-
- /* if pipe used to wakeup, consume */
- while (dapl_poll(hca_ptr->ib_trans.scm[0],
- DAPL_FD_READ) == DAPL_FD_READ) {
- if (recv(hca_ptr->ib_trans.scm[0], rbuf, 2, 0) == -1)
- dapl_log(DAPL_DBG_TYPE_THREAD,
- " cr_thread: read pipe error = %s\n",
- strerror(errno));
- }
- dapl_os_lock(&hca_ptr->ib_trans.lock);
-
- /* set to exit and all resources destroyed */
- if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&
- (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))
- break;
- }
-
- dapl_os_unlock(&hca_ptr->ib_trans.lock);
- dapl_os_free(set, sizeof(struct dapl_fd_set));
-out:
- hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
- dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " cr_thread(hca %p) exit\n", hca_ptr);
-}
-
-
-#ifdef DAPL_COUNTERS
-/* Debug aid: List all Connections in process and state */
-void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
-{
- /* Print in process CR's for this IA, if debug type set */
- int i = 0;
- dp_ib_cm_handle_t cr, next_cr;
-
- dapl_os_lock(&ia_ptr->hca_ptr->ib_trans.lock);
- if (!dapl_llist_is_empty((DAPL_LLIST_HEAD*)
- &ia_ptr->hca_ptr->ib_trans.list))
- next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*)
- &ia_ptr->hca_ptr->ib_trans.list);
- else
- next_cr = NULL;
-
- printf("\n DAPL IA CONNECTIONS IN PROCESS:\n");
- while (next_cr) {
- cr = next_cr;
- next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*)
- &ia_ptr->hca_ptr->ib_trans.list,
- (DAPL_LLIST_ENTRY*)&cr->local_entry);
-
- printf( " CONN[%d]: sp %p ep %p sock %d %s %s %s %s %s %s PORT L-%x R-%x PID L-%x R-%x\n",
- i, cr->sp, cr->ep, cr->socket,
- cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
- dapl_cm_state_str(cr->state), dapl_cm_op_str(ntohs(cr->msg.op)),
- ntohs(cr->msg.op) == DCM_REQ ? /* local address */
- inet_ntoa(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_addr) :
- inet_ntoa(((struct sockaddr_in *)&cr->addr)->sin_addr),
- cr->sp ? "<-" : "->",
- ntohs(cr->msg.op) == DCM_REQ ? /* remote address */
- inet_ntoa(((struct sockaddr_in *)&cr->addr)->sin_addr) :
- inet_ntoa(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_addr),
-
- ntohs(cr->msg.op) == DCM_REQ ? /* local port */
- ntohs(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_port) :
- ntohs(((struct sockaddr_in *)&cr->addr)->sin_port),
-
- ntohs(cr->msg.op) == DCM_REQ ? /* remote port */
- ntohs(((struct sockaddr_in *)&cr->addr)->sin_port) :
- ntohs(((struct sockaddr_in *)&cr->msg.daddr.so)->sin_port),
-
- cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[2]) : ntohs(*(uint16_t*)&cr->msg.resv[0]),
- cr->sp ? ntohs(*(uint16_t*)&cr->msg.resv[0]) : ntohs(*(uint16_t*)&cr->msg.resv[2]));
-
- i++;
- }
- printf("\n");
- dapl_os_unlock(&ia_ptr->hca_ptr->ib_trans.lock);
-}
-#endif
+ addr.sin_port = htons(serviceID + 1000);\r
+ addr.sin_family = AF_INET;\r
+ addr.sin_addr = ((struct sockaddr_in *) &ia_ptr->hca_ptr->hca_address)->sin_addr;\r
+\r
+ if ((bind(cm_ptr->socket, (struct sockaddr *)&addr, sizeof(addr)) < 0)\r
+ || (listen(cm_ptr->socket, 128) < 0)) {\r
+ dapl_log(DAPL_DBG_TYPE_CM,\r
+ " listen: ERROR %s on port %d\n",\r
+ strerror(errno), serviceID + 1000);\r
+ if (dapl_socket_errno() == EADDRINUSE)\r
+ dat_status = DAT_CONN_QUAL_IN_USE;\r
+ else\r
+ dat_status = DAT_CONN_QUAL_UNAVAILABLE;\r
+ goto bail;\r
+ }\r
+\r
+ /* set cm_handle for this service point, save listen socket */\r
+ sp_ptr->cm_srvc_handle = cm_ptr;\r
+ dapl_os_memcpy(&cm_ptr->addr, &addr, sizeof(addr)); \r
+\r
+ /* queue up listen socket to process inbound CR's */\r
+ cm_ptr->state = DCM_LISTEN;\r
+ dapli_cm_queue(cm_ptr);\r
+\r
+ dapl_dbg_log(DAPL_DBG_TYPE_CM,\r
+ " setup listen: port %d cr %p s_fd %d\n",\r
+ serviceID + 1000, cm_ptr, cm_ptr->socket);\r
+\r
+ return dat_status;\r
+bail:\r
+ /* Never queued, destroy here */\r
+ dapls_cm_release(cm_ptr);\r
+ return dat_status;\r
+}\r
+\r
+/*
+ * PASSIVE: accept socket(s) pending on a listen CM.
+ *
+ * For each inbound connection a new CM is allocated, inherits the
+ * listener's SP and HCA, gets TCP_NODELAY and its local address recorded,
+ * and is queued in state DCM_ACCEPTING so dapli_socket_accept_data() runs
+ * next. Returns early if CM allocation or accept() fails.
+ */
+static void dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr)
+{
+	dp_ib_cm_handle_t acm_ptr;
+	int ret, opt = 1;
+	socklen_t sl;
+
+	/*
+	 * Accept all CR's on this port to avoid half-connection (SYN_RCV)
+	 * stalls with many to one connection storms
+	 */
+	do {
+		/* Allocate accept CM and initialize */
+		if ((acm_ptr = dapli_cm_alloc(NULL)) == NULL)
+			return;
+
+		acm_ptr->sp = cm_ptr->sp;
+		acm_ptr->hca = cm_ptr->hca;
+
+		/* accept() wants a socklen_t in/out length; pass a real one
+		 * instead of casting an int's address to socklen_t * */
+		sl = sizeof(union dcm_addr);
+		acm_ptr->socket = accept(cm_ptr->socket,
+					 (struct sockaddr *)
+					 &acm_ptr->msg.daddr.so, &sl);
+		if (acm_ptr->socket == DAPL_INVALID_SOCKET) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " ACCEPT: ERR %s on FD %d l_cr %p\n",
+				 strerror(errno), cm_ptr->socket, cm_ptr);
+			dapls_cm_release(acm_ptr);
+			return;
+		}
+		dapl_dbg_log(DAPL_DBG_TYPE_CM, " accepting from %s %x\n",
+			     inet_ntoa(((struct sockaddr_in *)
+					&acm_ptr->msg.daddr.so)->sin_addr),
+			     ntohs(((struct sockaddr_in *)
+				    &acm_ptr->msg.daddr.so)->sin_port));
+
+		/* no delay for small packets */
+		ret = setsockopt(acm_ptr->socket, IPPROTO_TCP, TCP_NODELAY,
+				 (char *)&opt, sizeof(opt));
+		if (ret)
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " ACCEPT: NODELAY setsockopt: 0x%x 0x%x %s\n",
+				 ret, dapl_socket_errno(),
+				 strerror(dapl_socket_errno()));
+
+		/* get local address information from socket */
+		sl = sizeof(acm_ptr->addr);
+		getsockname(acm_ptr->socket, (struct sockaddr *)&acm_ptr->addr,
+			    &sl);
+		acm_ptr->state = DCM_ACCEPTING;
+		dapli_cm_queue(acm_ptr);
+
+	} while (dapl_poll(cm_ptr->socket, DAPL_FD_READ) == DAPL_FD_READ);
+}
+\r
+/*
+ * PASSIVE: receive peer QP information and private data, post cr_event.
+ *
+ * Reads the fixed part of ib_cm_msg_t (everything except the private data
+ * area) and checks the protocol version, then reads msg.p_size bytes of
+ * private data (at most DCM_MAX_PDATA_SIZE). QP and address fields are kept
+ * in network byte order. On any read/validation error the CM is scheduled
+ * for release; the active side sees the socket close as a reject.
+ */
+static void dapli_socket_accept_data(ib_cm_srvc_handle_t acm_ptr)
+{
+	int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+	void *p_data = NULL;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket accepted, read QP data\n");
+
+	/* read in DST QP info, IA address. check for private data */
+	len = recv(acm_ptr->socket, (char *)&acm_ptr->msg, exp, 0);
+	if (len != exp || ntohs(acm_ptr->msg.ver) != DCM_VER) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " ACCEPT read: ERR %s, rcnt=%d, ver=%d\n",
+			 strerror(errno), len, ntohs(acm_ptr->msg.ver));
+		goto bail;
+	}
+
+	/* keep the QP, address info in network order */
+
+	/* validate private data size (decoded to host order) before reading */
+	exp = ntohs(acm_ptr->msg.p_size);
+	if (exp > DCM_MAX_PDATA_SIZE) {
+		/* log the decoded size, not the raw network-order field */
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " accept read: psize (%d) wrong\n", exp);
+		goto bail;
+	}
+
+	/* read private data into cm_handle if any present */
+	if (exp) {
+		len = recv(acm_ptr->socket, acm_ptr->msg.p_data, exp, 0);
+		if (len != exp) {
+			dapl_log(DAPL_DBG_TYPE_ERR,
+				 " accept read pdata: ERR %s, rcnt=%d\n",
+				 strerror(errno), len);
+			goto bail;
+		}
+		p_data = acm_ptr->msg.p_data;
+	}
+
+	acm_ptr->state = DCM_ACCEPTING_DATA;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " ACCEPT: DST %s %x lid=0x%x, qpn=0x%x, psz=%d\n",
+		     inet_ntoa(((struct sockaddr_in *)
+				&acm_ptr->msg.daddr.so)->sin_addr),
+		     ntohs(((struct sockaddr_in *)
+			    &acm_ptr->msg.daddr.so)->sin_port),
+		     ntohs(acm_ptr->msg.saddr.ib.lid),
+		     ntohl(acm_ptr->msg.saddr.ib.qpn), exp);
+
+#ifdef DAT_EXTENSIONS
+	if (acm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
+		DAT_IB_EXTENSION_EVENT_DATA xevent;
+
+		/* only status/type are set below; clear the rest so the
+		 * consumer never sees uninitialized fields */
+		dapl_os_memzero(&xevent, sizeof(xevent));
+
+		/* post EVENT, modify_qp created ah */
+		xevent.status = 0;
+		xevent.type = DAT_IB_UD_CONNECT_REQUEST;
+
+		dapls_evd_post_cr_event_ext(acm_ptr->sp,
+					    DAT_IB_UD_CONNECTION_REQUEST_EVENT,
+					    acm_ptr,
+					    (DAT_COUNT) exp,
+					    (DAT_PVOID *) acm_ptr->msg.p_data,
+					    (DAT_PVOID *) &xevent);
+	} else
+#endif
+		/* trigger CR event and return SUCCESS */
+		dapls_cr_callback(acm_ptr,
+				  IB_CME_CONNECTION_REQUEST_PENDING,
+				  p_data, exp, acm_ptr->sp);
+	return;
+bail:
+	/* mark for destroy, active will see socket close as rej */
+	dapli_cm_free(acm_ptr);
+	return;
+}
+\r
+/*
+ * PASSIVE: consumer accept, send local QP information, private data,
+ * queue on work thread to receive RTU information to avoid blocking
+ * user thread.
+ *
+ * Moves the EP's QP to RTR and then RTS using the peer QP information
+ * already read into cm_ptr->msg, links the CM to the EP, and writes a
+ * DCM_REP message (local QP info, local socket address, private data) to
+ * the accepted socket. The CM is left in state DCM_ACCEPTED; it is already
+ * queued on the work thread, which reads the RTU.
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	DAT_LENGTH_ERROR	p_size > DCM_MAX_PDATA_SIZE (CM left untouched)
+ *	DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP
+ *				remote QP is UD but local is not
+ *				(DAT_EXTENSIONS only)
+ *	DAT_INTERNAL_ERROR	any other failure
+ * On every failure after the size check, the CM is scheduled for cleanup.
+ */
+static DAT_RETURN
+dapli_socket_accept_usr(DAPL_EP * ep_ptr,
+			DAPL_CR * cr_ptr, DAT_COUNT p_size, DAT_PVOID p_data)
+{
+	DAPL_IA *ia_ptr = ep_ptr->header.owner_ia;
+	dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle;
+	ib_cm_msg_t local;
+	struct iovec iov[2];
+	/* exp: size of the message header without the private data area */
+	int len, exp = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+	DAT_RETURN ret = DAT_INTERNAL_ERROR;
+	socklen_t sl;
+
+	if (p_size > DCM_MAX_PDATA_SIZE) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " accept_usr: psize(%d) too large\n", p_size);
+		return DAT_LENGTH_ERROR;
+	}
+
+	/* must have a accepted socket */
+	if (cm_ptr->socket == DAPL_INVALID_SOCKET) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " accept_usr: cm socket invalid\n");
+		goto bail;
+	}
+
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " ACCEPT_USR: remote lid=0x%x"
+		     " qpn=0x%x qp_type %d, psize=%d\n",
+		     ntohs(cm_ptr->msg.saddr.ib.lid),
+		     ntohl(cm_ptr->msg.saddr.ib.qpn),
+		     cm_ptr->msg.saddr.ib.qp_type,
+		     ntohs(cm_ptr->msg.p_size));
+
+#ifdef DAT_EXTENSIONS
+	/* a UD peer can only be accepted on a UD endpoint */
+	if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD &&
+	    ep_ptr->qp_handle->qp_type != IBV_QPT_UD) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " ACCEPT_USR: ERR remote QP is UD,"
+			 ", but local QP is not\n");
+		ret = (DAT_INVALID_HANDLE | DAT_INVALID_HANDLE_EP);
+		goto bail;
+	}
+#endif
+
+	/* modify QP to RTR and then to RTS with remote info already read */
+	/* (remote qpn/lid are passed as received, in network byte order) */
+	if (dapls_modify_qp_state(ep_ptr->qp_handle,
+				  IBV_QPS_RTR,
+				  cm_ptr->msg.saddr.ib.qpn,
+				  cm_ptr->msg.saddr.ib.lid,
+				  (ib_gid_handle_t)cm_ptr->msg.saddr.ib.gid) != DAT_SUCCESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " ACCEPT_USR: QPS_RTR ERR %s -> %s\n",
+			 strerror(errno),
+			 inet_ntoa(((struct sockaddr_in *)
+				    &cm_ptr->msg.daddr.so)->sin_addr));
+		goto bail;
+	}
+	if (dapls_modify_qp_state(ep_ptr->qp_handle,
+				  IBV_QPS_RTS,
+				  cm_ptr->msg.saddr.ib.qpn,
+				  cm_ptr->msg.saddr.ib.lid,
+				  NULL) != DAT_SUCCESS) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " ACCEPT_USR: QPS_RTS ERR %s -> %s\n",
+			 strerror(errno),
+			 inet_ntoa(((struct sockaddr_in *)
+				    &cm_ptr->msg.daddr.so)->sin_addr));
+		goto bail;
+	}
+
+	/* save remote address information */
+	dapl_os_memcpy(&ep_ptr->remote_ia_address,
+		       &cm_ptr->msg.daddr.so,
+		       sizeof(union dcm_addr));
+
+	/* send our QP info, IA address, pdata. Don't overwrite dst data */
+	/* (the reply is built in 'local' so cm_ptr->msg keeps peer info) */
+	local.ver = htons(DCM_VER);
+	local.op = htons(DCM_REP);
+	local.saddr.ib.qpn = htonl(ep_ptr->qp_handle->qp_num);
+	local.saddr.ib.qp_type = ep_ptr->qp_handle->qp_type;
+	local.saddr.ib.lid = ia_ptr->hca_ptr->ib_trans.lid;
+	dapl_os_memcpy(&local.saddr.ib.gid[0],
+		       &ia_ptr->hca_ptr->ib_trans.gid, 16);
+
+	/* Get local address information from socket */
+	sl = sizeof(local.daddr.so);
+	getsockname(cm_ptr->socket, (struct sockaddr *)&local.daddr.so, &sl);
+
+#ifdef DAPL_DBG
+	/* DBG: Active PID [0], PASSIVE PID [2] */
+	*(uint16_t*)&cm_ptr->msg.resv[2] = htons((uint16_t)dapl_os_getpid());
+	dapl_os_memcpy(local.resv, cm_ptr->msg.resv, 4);
+#endif
+	cm_ptr->hca = ia_ptr->hca_ptr;
+	/* cr_thread dispatches DCM_ACCEPTED to dapli_socket_accept_rtu */
+	cm_ptr->state = DCM_ACCEPTED;
+
+	/* Link CM to EP, already queued on work thread */
+	dapl_ep_link_cm(ep_ptr, cm_ptr);
+	cm_ptr->ep = ep_ptr;
+
+	local.p_size = htons(p_size);
+	iov[0].iov_base = (void *)&local;
+	iov[0].iov_len = exp;
+
+	if (p_size) {
+		iov[1].iov_base = p_data;
+		iov[1].iov_len = p_size;
+		len = writev(cm_ptr->socket, iov, 2);
+	} else
+		len = writev(cm_ptr->socket, iov, 1);
+
+	/* a short or failed write undoes the EP link before cleanup */
+	if (len != (p_size + exp)) {
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " ACCEPT_USR: ERR %s, wcnt=%d -> %s\n",
+			 strerror(errno), len,
+			 inet_ntoa(((struct sockaddr_in *)
+				    &cm_ptr->msg.daddr.so)->sin_addr));
+		dapl_ep_unlink_cm(ep_ptr, cm_ptr);
+		cm_ptr->ep = NULL;
+		goto bail;
+	}
+
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " ACCEPT_USR: local lid=0x%x qpn=0x%x psz=%d\n",
+		     ntohs(local.saddr.ib.lid),
+		     ntohl(local.saddr.ib.qpn), ntohs(local.p_size));
+	dapl_dbg_log(DAPL_DBG_TYPE_CM,
+		     " ACCEPT_USR: SRC GID subnet %016llx id %016llx\n",
+		     (unsigned long long)
+		     htonll(*(uint64_t*)&local.saddr.ib.gid[0]),
+		     (unsigned long long)
+		     htonll(*(uint64_t*)&local.saddr.ib.gid[8]));
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: accepted!\n");
+
+	return DAT_SUCCESS;
+bail:
+	/* schedule cleanup from workq */
+	dapli_cm_free(cm_ptr);
+	return ret;
+}
+\r
+/*
+ * PASSIVE: read RTU from active peer, post CONN event.
+ *
+ * Expects the first 4 bytes of a CM message (version and op) with op equal
+ * to DCM_RTU. On success the CM moves to DCM_CONNECTED and the connection
+ * event is delivered. For UD QPs (DAT_EXTENSIONS) an address handle is
+ * created, an extended connection event is posted and the CM is scheduled
+ * for free. A short read or unexpected op is reported as
+ * IB_CME_DESTINATION_REJECT, or as a UD connection error event for UD QPs.
+ */
+static void dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr)
+{
+	int len;
+	ib_cm_events_t event = IB_CME_CONNECTED;
+
+	/* complete handshake after final QP state change, VER and OP */
+	len = recv(cm_ptr->socket, (char *)&cm_ptr->msg, 4, 0);
+	if (len != 4 || ntohs(cm_ptr->msg.op) != DCM_RTU) {
+		/* the format now consumes the peer address argument */
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " ACCEPT_RTU: rcv ERR, rcnt=%d op=%x -> %s\n",
+			 len, ntohs(cm_ptr->msg.op),
+			 inet_ntoa(((struct sockaddr_in *)
+				    &cm_ptr->msg.daddr.so)->sin_addr));
+		event = IB_CME_DESTINATION_REJECT;
+		goto bail;
+	}
+
+	/* save state and reference to EP, queue for disc event */
+	cm_ptr->state = DCM_CONNECTED;
+
+	/* final data exchange if remote QP state is good to go */
+	dapl_dbg_log(DAPL_DBG_TYPE_EP, " PASSIVE: connected!\n");
+
+#ifdef DAT_EXTENSIONS
+ud_bail:
+	if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD) {
+		DAT_IB_EXTENSION_EVENT_DATA xevent;
+		ib_pd_handle_t pd_handle =
+		    ((DAPL_PZ *)cm_ptr->ep->param.pz_handle)->pd_handle;
+
+		/* xevent is posted on the error paths too, where nothing
+		 * below fills it in: never hand out uninitialized fields */
+		dapl_os_memzero(&xevent, sizeof(xevent));
+
+		if (event == IB_CME_CONNECTED) {
+			cm_ptr->ah = dapls_create_ah(cm_ptr->hca, pd_handle,
+						     cm_ptr->ep->qp_handle,
+						     cm_ptr->msg.saddr.ib.lid,
+						     NULL);
+			if (cm_ptr->ah) {
+				/* post EVENT, modify_qp created ah */
+				xevent.status = 0;
+				xevent.type = DAT_IB_UD_PASSIVE_REMOTE_AH;
+				xevent.remote_ah.ah = cm_ptr->ah;
+				xevent.remote_ah.qpn = ntohl(cm_ptr->msg.saddr.ib.qpn);
+				dapl_os_memcpy(&xevent.remote_ah.ia_addr,
+					       &cm_ptr->msg.daddr.so,
+					       sizeof(union dcm_addr));
+				event = DAT_IB_UD_CONNECTION_EVENT_ESTABLISHED;
+			} else
+				event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
+		} else
+			event = DAT_IB_UD_CONNECTION_ERROR_EVENT;
+
+		dapl_log(DAPL_DBG_TYPE_CM,
+			 " CONN_RTU: UD AH %p for lid 0x%x qpn 0x%x\n",
+			 cm_ptr->ah, ntohs(cm_ptr->msg.saddr.ib.lid),
+			 ntohl(cm_ptr->msg.saddr.ib.qpn));
+
+		dapls_evd_post_connection_event_ext(
+				(DAPL_EVD *)
+				cm_ptr->ep->param.connect_evd_handle,
+				event,
+				(DAT_EP_HANDLE) cm_ptr->ep,
+				(DAT_COUNT) ntohs(cm_ptr->msg.p_size),
+				(DAT_PVOID *) cm_ptr->msg.p_data,
+				(DAT_PVOID *) &xevent);
+
+		/* cleanup and release from local list, still on EP list */
+		dapli_cm_free(cm_ptr);
+
+	} else
+#endif
+	{
+		dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);
+	}
+	return;
+
+bail:
+#ifdef DAT_EXTENSIONS
+	/* UD errors are reported through the extended event path above */
+	if (cm_ptr->msg.saddr.ib.qp_type == IBV_QPT_UD)
+		goto ud_bail;
+#endif
+	cm_ptr->state = DCM_REJECTED;
+	dapls_cr_callback(cm_ptr, event, NULL, 0, cm_ptr->sp);
+	dapli_cm_free(cm_ptr);
+}
+\r
+/*
+ * dapls_ib_connect
+ *
+ * Start an active-side connection to a passive listener on another node.
+ * This is a thin entry point; all of the work is done by
+ * dapli_socket_connect().
+ *
+ * Input:
+ *	ep_handle		endpoint to connect
+ *	remote_ia_address	address of the remote IA
+ *	remote_conn_qual	remote connection qualifier
+ *	private_data_size	number of private data bytes
+ *	private_data		pointer to the private data
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	DAT_INSUFFICIENT_RESOURCES
+ *	DAT_INVALID_PARAMETER
+ */
+DAT_RETURN
+dapls_ib_connect(IN DAT_EP_HANDLE ep_handle,
+		 IN DAT_IA_ADDRESS_PTR remote_ia_address,
+		 IN DAT_CONN_QUAL remote_conn_qual,
+		 IN DAT_COUNT private_data_size, IN void *private_data)
+{
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     " connect(ep_handle %p ....)\n", ep_handle);
+
+	return dapli_socket_connect((DAPL_EP *) ep_handle,
+				    remote_ia_address, remote_conn_qual,
+				    private_data_size, private_data);
+}
+\r
+/*
+ * dapls_ib_disconnect
+ *
+ * Disconnect an EP.
+ *
+ * Does nothing for EPs that are already disconnected or are not RC. For
+ * an RC EP the QP is moved to the error state to flush outstanding work.
+ * The socket-level disconnect is then started if a CM is still linked.
+ *
+ * Input:
+ *	ep_ptr		endpoint to disconnect
+ *	close_flags	not used by this provider
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ *	DAT_SUCCESS, or the result of dapli_socket_disconnect()
+ */
+DAT_RETURN
+dapls_ib_disconnect(IN DAPL_EP * ep_ptr, IN DAT_CLOSE_FLAGS close_flags)
+{
+	dp_ib_cm_handle_t cm_ptr = dapl_get_cm_from_ep(ep_ptr);
+
+	if (ep_ptr->param.ep_state == DAT_EP_STATE_DISCONNECTED ||
+	    ep_ptr->param.ep_attr.service_type != DAT_SERVICE_TYPE_RC) {
+		return DAT_SUCCESS;
+	}
+
+	/* RC. Transition to error state to flush queue */
+	dapls_modify_qp_state(ep_ptr->qp_handle, IBV_QPS_ERR, 0, 0, 0);
+
+	/* no CM linked to this EP: nothing to tear down at socket level,
+	 * and dapli_socket_disconnect() must not be handed a NULL CM */
+	if (cm_ptr == NULL)
+		return DAT_SUCCESS;
+
+	return (dapli_socket_disconnect(cm_ptr));
+}
+\r
+/*
+ * dapls_ib_disconnect_clean
+ *
+ * Clean up outstanding connection data. This routine is invoked
+ * after the final disconnect callback has occurred. Only on the
+ * ACTIVE side of a connection. It is also called if dat_ep_connect
+ * times out using the consumer supplied timeout value.
+ *
+ * Only IB_CME_TIMEOUT triggers work here: the CM linked to the EP, if
+ * any, is scheduled for release. Other events are ignored.
+ *
+ * Input:
+ *	ep_ptr		DAPL_EP
+ *	active		Indicates active side of connection (unused)
+ *	ib_cm_event	event that triggered the cleanup
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ *	void
+ */
+void
+dapls_ib_disconnect_clean(IN DAPL_EP * ep_ptr,
+			  IN DAT_BOOLEAN active,
+			  IN const ib_cm_events_t ib_cm_event)
+{
+	dp_ib_cm_handle_t cm_ptr;
+
+	if (ib_cm_event != IB_CME_TIMEOUT)
+		return;
+
+	/* the EP may no longer have a CM linked; nothing to release then */
+	cm_ptr = dapl_get_cm_from_ep(ep_ptr);
+	if (cm_ptr == NULL)
+		return;
+
+	dapl_log(DAPL_DBG_TYPE_WARN,
+		 "dapls_ib_disc_clean: CONN_TIMEOUT ep %p cm %p %s\n",
+		 ep_ptr, cm_ptr, dapl_cm_state_str(cm_ptr->state));
+
+	/* schedule release of socket and local resources */
+	dapli_cm_free(cm_ptr);
+}
+\r
+/*
+ * dapls_ib_setup_conn_listener
+ *
+ * Ask the socket CM to create a connection listener for a service point.
+ * The work is done by dapli_socket_listen().
+ *
+ * Input:
+ *	ia_ptr		owning IA
+ *	ServiceID	connection qualifier to listen on
+ *	sp_ptr		service point that will own the listener
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	DAT_INSUFFICIENT_RESOURCES
+ *	DAT_INTERNAL_ERROR
+ *	DAT_CONN_QUAL_UNAVAILABLE
+ *	DAT_CONN_QUAL_IN_USE
+ */
+DAT_RETURN
+dapls_ib_setup_conn_listener(IN DAPL_IA * ia_ptr,
+			     IN DAT_UINT64 ServiceID, IN DAPL_SP * sp_ptr)
+{
+	DAT_RETURN status;
+
+	status = dapli_socket_listen(ia_ptr, ServiceID, sp_ptr);
+	return status;
+}
+\r
+/*
+ * dapls_ib_remove_conn_listener
+ *
+ * Detach the listener CM from a service point and schedule it for release.
+ * The final free is performed by the work thread (cr_thread).
+ *
+ * Input:
+ *	ia_ptr		owning IA (unused)
+ *	sp_ptr		service point whose listener is removed
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ *	DAT_SUCCESS (always, including when no listener is attached)
+ */
+DAT_RETURN
+dapls_ib_remove_conn_listener(IN DAPL_IA * ia_ptr, IN DAPL_SP * sp_ptr)
+{
+	ib_cm_srvc_handle_t listener = sp_ptr->cm_srvc_handle;
+
+	if (listener == NULL)
+		return DAT_SUCCESS;
+
+	/* clear the SP's reference first; cr_thread does the final free */
+	sp_ptr->cm_srvc_handle = NULL;
+	dapli_cm_free(listener);
+	return DAT_SUCCESS;
+}
+\r
+/*
+ * dapls_ib_accept_connection
+ *
+ * Accept a pending connection request on the given endpoint. If the EP
+ * has no QP attached yet, one is allocated first. The socket-level accept
+ * is then done by dapli_socket_accept_usr().
+ *
+ * Input:
+ *	cr_handle	connection request being accepted
+ *	ep_handle	endpoint to bind the connection to
+ *	p_size		private data size
+ *	p_data		private data
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	DAT_INSUFFICIENT_RESOURCES
+ *	DAT_INTERNAL_ERROR
+ *	or any error from QP allocation / dapli_socket_accept_usr()
+ */
+DAT_RETURN
+dapls_ib_accept_connection(IN DAT_CR_HANDLE cr_handle,
+			   IN DAT_EP_HANDLE ep_handle,
+			   IN DAT_COUNT p_size, IN const DAT_PVOID p_data)
+{
+	DAPL_CR *cr_ptr = (DAPL_CR *) cr_handle;
+	DAPL_EP *ep_ptr = (DAPL_EP *) ep_handle;
+	DAT_RETURN status;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     "dapls_ib_accept_connection(cr %p ep %p prd %p,%d)\n",
+		     cr_handle, ep_handle, p_data, p_size);
+
+	/* create and attach a QP on demand */
+	if (ep_ptr->qp_state == DAPL_QP_STATE_UNATTACHED) {
+		status = dapls_ib_qp_alloc(ep_ptr->header.owner_ia,
+					   ep_ptr, ep_ptr);
+		if (status != DAT_SUCCESS)
+			return status;
+	}
+
+	return dapli_socket_accept_usr(ep_ptr, cr_ptr, p_size, p_data);
+}
+\r
+/*
+ * dapls_ib_reject_connection
+ *
+ * Reject a connection: send a DCM_REJ_USER message (optionally carrying
+ * private data) on the CM's socket, then schedule the CM for release.
+ *
+ * Input:
+ *	cm_ptr		CM of the connection request being rejected
+ *	reason		reject reason (only logged)
+ *	psize		private data size, 0..DCM_MAX_PDATA_SIZE
+ *	pdata		private data
+ *
+ * Output:
+ *	none
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	DAT_LENGTH_ERROR	psize negative or too large (CM left untouched)
+ */
+DAT_RETURN
+dapls_ib_reject_connection(IN dp_ib_cm_handle_t cm_ptr,
+			   IN int reason,
+			   IN DAT_COUNT psize, IN const DAT_PVOID pdata)
+{
+	struct iovec iov[2];
+	int iovcnt = 1;
+	int len;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     " reject(cm %p reason %x, pdata %p, psize %d)\n",
+		     cm_ptr, reason, pdata, psize);
+
+	/* DAT_COUNT is signed: a negative size would become a huge iov_len */
+	if (psize < 0 || psize > DCM_MAX_PDATA_SIZE)
+		return DAT_LENGTH_ERROR;
+
+	/* write reject data to indicate reject */
+	cm_ptr->msg.op = htons(DCM_REJ_USER);
+	cm_ptr->msg.p_size = htons(psize);
+
+	iov[0].iov_base = (void *)&cm_ptr->msg;
+	iov[0].iov_len = sizeof(ib_cm_msg_t) - DCM_MAX_PDATA_SIZE;
+	if (psize) {
+		iov[1].iov_base = pdata;
+		iov[1].iov_len = psize;
+		iovcnt = 2;
+	}
+
+	/* best effort: the CM is released whatever the write result,
+	 * but a failed or short write is no longer silent */
+	len = writev(cm_ptr->socket, iov, iovcnt);
+	if (len != (int)(iov[0].iov_len + psize))
+		dapl_log(DAPL_DBG_TYPE_ERR,
+			 " reject: write ERR %s, wcnt=%d\n",
+			 strerror(errno), len);
+
+	/* release and cleanup CM object */
+	dapli_cm_free(cm_ptr);
+	return DAT_SUCCESS;
+}
+\r
+/*
+ * dapls_ib_cm_remote_addr
+ *
+ * Obtain the remote IP address given a connection
+ *
+ * Input:
+ *	dat_handle	an EP or CR handle
+ *
+ * Output:
+ *	remote_ia_address: where to place the remote address
+ *
+ * Returns:
+ *	DAT_SUCCESS
+ *	DAT_INVALID_HANDLE	not an EP/CR handle, or no CM is associated
+ */
+DAT_RETURN
+dapls_ib_cm_remote_addr(IN DAT_HANDLE dat_handle,
+			OUT DAT_SOCK_ADDR6 * remote_ia_address)
+{
+	DAPL_HEADER *header;
+	dp_ib_cm_handle_t conn;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_EP,
+		     "dapls_ib_cm_remote_addr(dat_handle %p, ....)\n",
+		     dat_handle);
+
+	header = (DAPL_HEADER *) dat_handle;
+
+	if (header->magic == DAPL_MAGIC_EP)
+		conn = dapl_get_cm_from_ep((DAPL_EP *) dat_handle);
+	else if (header->magic == DAPL_MAGIC_CR)
+		conn = ((DAPL_CR *) dat_handle)->ib_cm_handle;
+	else
+		return DAT_INVALID_HANDLE;
+
+	/* an EP with no linked CM has no remote address to report */
+	if (conn == NULL)
+		return DAT_INVALID_HANDLE;
+
+	dapl_os_memcpy(remote_ia_address,
+		       &conn->msg.daddr.so, sizeof(DAT_SOCK_ADDR6));
+
+	return DAT_SUCCESS;
+}
+\r
+/*
+ * dapls_ib_private_data_size
+ *
+ * Report the maximum private data length carried in a socket CM message.
+ * The value is fixed for this provider; hca_ptr is not consulted.
+ */
+int dapls_ib_private_data_size(IN DAPL_HCA *hca_ptr)
+{
+	int max_pdata = DCM_MAX_PDATA_SIZE;
+
+	return max_pdata;
+}
+\r
+/* outbound/inbound CR processing thread to avoid blocking applications */
+/*
+ * arg is the struct dapl_hca whose ib_trans.list of CM objects is serviced.
+ * Each pass does the following:
+ *   - rebuild an fd set holding the wakeup socket scm[0] plus each live
+ *     CM's socket;
+ *   - dispatch every CM whose socket dapl_poll() reports as ready;
+ *   - wait in dapl_select() on that set (presumably blocking - verify
+ *     against dapl_select).
+ * The thread leaves its loop once ib_trans.cr_state is no longer
+ * IB_THREAD_RUN and the list is empty, then sets cr_state to
+ * IB_THREAD_EXIT.
+ *
+ * Locking: ib_trans.lock is held while walking the list, and is dropped
+ * around each per-CM poll/handler call. cr->lock is held while the CM's
+ * state is checked and its socket is added to the set.
+ */
+void cr_thread(void *arg)
+{
+	struct dapl_hca *hca_ptr = arg;
+	dp_ib_cm_handle_t cr, next_cr;
+	int opt, ret;
+	socklen_t opt_len;
+	char rbuf[2];
+	struct dapl_fd_set *set;
+	enum DAPL_FD_EVENTS event;
+
+	dapl_dbg_log(DAPL_DBG_TYPE_UTIL, " cr_thread: ENTER hca %p\n", hca_ptr);
+	set = dapl_alloc_fd_set();
+	if (!set)
+		goto out;
+
+	dapl_os_lock(&hca_ptr->ib_trans.lock);
+	hca_ptr->ib_trans.cr_state = IB_THREAD_RUN;
+
+	while (1) {
+		/* always watch the wakeup socket so producers can kick us */
+		dapl_fd_zero(set);
+		dapl_fd_set(hca_ptr->ib_trans.scm[0], set, DAPL_FD_READ);
+
+		if (!dapl_llist_is_empty(&hca_ptr->ib_trans.list))
+			next_cr = dapl_llist_peek_head(&hca_ptr->ib_trans.list);
+		else
+			next_cr = NULL;
+
+		while (next_cr) {
+			cr = next_cr;
+			/* fetch the successor first: cr may be dequeued below */
+			next_cr = dapl_llist_next_entry(&hca_ptr->ib_trans.list,
+							(DAPL_LLIST_ENTRY *)
+							&cr->local_entry);
+			dapls_cm_acquire(cr); /* hold thread ref */
+			dapl_os_lock(&cr->lock);
+			/* CM marked free, or thread stopping: close the socket
+			 * and drop the alloc, work-queue and thread refs */
+			if (cr->state == DCM_FREE ||
+			    hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) {
+				dapl_log(DAPL_DBG_TYPE_CM,
+					 " CM FREE: %p ep=%p st=%s sck=%d refs=%d\n",
+					 cr, cr->ep, dapl_cm_state_str(cr->state),
+					 cr->socket, cr->ref_count);
+
+				if (cr->socket != DAPL_INVALID_SOCKET) {
+					shutdown(cr->socket, SHUT_RDWR);
+					closesocket(cr->socket);
+					cr->socket = DAPL_INVALID_SOCKET;
+				}
+				dapl_os_unlock(&cr->lock);
+				dapls_cm_release(cr); /* release alloc ref */
+				dapli_cm_dequeue(cr); /* release workq ref */
+				dapls_cm_release(cr); /* release thread ref */
+				continue;
+			}
+
+			/* a pending (async) connect completes when writable;
+			 * every other state waits for input */
+			event = (cr->state == DCM_CONN_PENDING) ?
+			    DAPL_FD_WRITE : DAPL_FD_READ;
+
+			/* socket could not be added to the set: skip this CM
+			 * for this pass */
+			if (dapl_fd_set(cr->socket, set, event)) {
+				dapl_log(DAPL_DBG_TYPE_ERR,
+					 " cr_thread: fd_set ERR st=%d fd %d"
+					 " -> %s\n", cr->state, cr->socket,
+					 inet_ntoa(((struct sockaddr_in *)
+						    &cr->msg.daddr.so)->sin_addr));
+				dapl_os_unlock(&cr->lock);
+				dapls_cm_release(cr); /* release ref */
+				continue;
+			}
+			dapl_os_unlock(&cr->lock);
+			/* list lock is dropped while this CM is serviced */
+			dapl_os_unlock(&hca_ptr->ib_trans.lock);
+
+			ret = dapl_poll(cr->socket, event);
+
+			dapl_dbg_log(DAPL_DBG_TYPE_THREAD,
+				     " poll ret=0x%x %s sck=%d\n",
+				     ret, dapl_cm_state_str(cr->state),
+				     cr->socket);
+
+			/* data on listen, qp exchange, and on disc req */
+			if ((ret == DAPL_FD_READ) ||
+			    (cr->state != DCM_CONN_PENDING && ret == DAPL_FD_ERROR)) {
+				if (cr->socket != DAPL_INVALID_SOCKET) {
+					/* dispatch on CM state machine */
+					switch (cr->state) {
+					case DCM_LISTEN:
+						dapli_socket_accept(cr);
+						break;
+					case DCM_ACCEPTING:
+						dapli_socket_accept_data(cr);
+						break;
+					case DCM_ACCEPTED:
+						dapli_socket_accept_rtu(cr);
+						break;
+					case DCM_REP_PENDING:
+						dapli_socket_connect_rtu(cr);
+						break;
+					case DCM_CONNECTED:
+						dapli_socket_disconnect(cr);
+						break;
+					default:
+						break;
+					}
+				}
+			/* ASYNC connections, writable, readable, error; check status */
+			} else if (ret == DAPL_FD_WRITE ||
+				   (cr->state == DCM_CONN_PENDING &&
+				    ret == DAPL_FD_ERROR)) {
+
+				if (ret == DAPL_FD_ERROR)
+					dapl_log(DAPL_DBG_TYPE_ERR, " CONN_PENDING - FD_ERROR\n");
+
+				/* SO_ERROR holds the outcome of the pending
+				 * connect; 0 means it succeeded */
+				opt = 0;
+				opt_len = sizeof(opt);
+				ret = getsockopt(cr->socket, SOL_SOCKET,
+						 SO_ERROR, (char *)&opt,
+						 &opt_len);
+				if (!ret && !opt)
+					dapli_socket_connected(cr, opt);
+				else
+					dapli_socket_connected(cr, opt ? opt : dapl_socket_errno());
+			}
+
+			dapls_cm_release(cr); /* release ref */
+			dapl_os_lock(&hca_ptr->ib_trans.lock);
+		}
+
+		/* set to exit and all resources destroyed */
+		if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&
+		    (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))
+			break;
+
+		dapl_os_unlock(&hca_ptr->ib_trans.lock);
+		dapl_select(set);
+
+		/* if pipe used to wakeup, consume */
+		while (dapl_poll(hca_ptr->ib_trans.scm[0],
+				 DAPL_FD_READ) == DAPL_FD_READ) {
+			if (recv(hca_ptr->ib_trans.scm[0], rbuf, 2, 0) == -1)
+				dapl_log(DAPL_DBG_TYPE_THREAD,
+					 " cr_thread: read pipe error = %s\n",
+					 strerror(errno));
+		}
+		dapl_os_lock(&hca_ptr->ib_trans.lock);
+
+		/* set to exit and all resources destroyed */
+		if ((hca_ptr->ib_trans.cr_state != IB_THREAD_RUN) &&
+		    (dapl_llist_is_empty(&hca_ptr->ib_trans.list)))
+			break;
+	}
+
+	dapl_os_unlock(&hca_ptr->ib_trans.lock);
+	dapl_os_free(set, sizeof(struct dapl_fd_set));
+out:
+	hca_ptr->ib_trans.cr_state = IB_THREAD_EXIT;
+	dapl_dbg_log(DAPL_DBG_TYPE_THREAD, " cr_thread(hca %p) exit\n", hca_ptr);
+}
+\r
+\r
+#ifdef DAPL_COUNTERS\r
+/*
+ * Debug aid: list every connection object on the HCA transport list
+ * (ia_ptr->hca_ptr->ib_trans.list) together with its state and addressing.
+ *
+ * Each entry prints: index, sp and ep pointers, socket, QP type (RC/UD),
+ * CM state, last CM op, local and remote IPv4 address (arrow is "<-" when
+ * the entry has an sp, "->" otherwise), local and remote port, and the two
+ * 16-bit values stored at msg.resv[0] / msg.resv[2], reported as the
+ * local/remote PID.
+ *
+ * ib_trans.lock (the same lock the CR thread holds while walking this
+ * list) is held for the entire walk.
+ *
+ * inet_ntoa() returns a pointer to one static buffer, so two calls inside
+ * a single printf() argument list would print the same address twice.
+ * The local and remote addresses are therefore formatted by separate
+ * printf() calls; the combined output line is unchanged.
+ */
+void dapls_print_cm_list(IN DAPL_IA *ia_ptr)
+{
+	DAPL_LLIST_HEAD *list =
+	    (DAPL_LLIST_HEAD *)&ia_ptr->hca_ptr->ib_trans.list;
+	dp_ib_cm_handle_t cr, next_cr;
+	int i = 0;
+
+	dapl_os_lock(&ia_ptr->hca_ptr->ib_trans.lock);
+	if (!dapl_llist_is_empty(list))
+		next_cr = dapl_llist_peek_head(list);
+	else
+		next_cr = NULL;
+
+	printf("\n DAPL IA CONNECTIONS IN PROCESS:\n");
+	while (next_cr) {
+		struct sockaddr_in *msg_addr, *cr_addr, *local, *remote;
+		uint16_t resv0, resv2, lpid, rpid;
+
+		cr = next_cr;
+		next_cr = dapl_llist_next_entry(list,
+				(DAPL_LLIST_ENTRY *)&cr->local_entry);
+
+		msg_addr = (struct sockaddr_in *)&cr->msg.daddr.so;
+		cr_addr = (struct sockaddr_in *)&cr->addr;
+
+		/* on a REQ, msg.daddr is the local side and cr->addr the remote */
+		if (ntohs(cr->msg.op) == DCM_REQ) {
+			local = msg_addr;
+			remote = cr_addr;
+		} else {
+			local = cr_addr;
+			remote = msg_addr;
+		}
+
+		/* memcpy avoids a type-punned (possibly unaligned) uint16_t load */
+		memcpy(&resv0, &cr->msg.resv[0], sizeof(resv0));
+		memcpy(&resv2, &cr->msg.resv[2], sizeof(resv2));
+		lpid = ntohs(cr->sp ? resv2 : resv0);
+		rpid = ntohs(cr->sp ? resv0 : resv2);
+
+		/* one inet_ntoa() per printf(): it reuses a static buffer */
+		printf(" CONN[%d]: sp %p ep %p sock %d %s %s %s %s ",
+		       i, cr->sp, cr->ep, cr->socket,
+		       cr->msg.saddr.ib.qp_type == IBV_QPT_RC ? "RC" : "UD",
+		       dapl_cm_state_str(cr->state),
+		       dapl_cm_op_str(ntohs(cr->msg.op)),
+		       inet_ntoa(local->sin_addr));
+		printf("%s %s PORT L-%x R-%x PID L-%x R-%x\n",
+		       cr->sp ? "<-" : "->",
+		       inet_ntoa(remote->sin_addr),
+		       ntohs(local->sin_port), ntohs(remote->sin_port),
+		       lpid, rpid);
+		i++;
+	}
+	printf("\n");
+	dapl_os_unlock(&ia_ptr->hca_ptr->ib_trans.lock);
+}
+#endif\r