2 * Copyright (c) 2005 SilverStorm Technologies. All rights reserved.
\r
4 * This software is available to you under the OpenIB.org BSD license
\r
7 * Redistribution and use in source and binary forms, with or
\r
8 * without modification, are permitted provided that the following
\r
9 * conditions are met:
\r
11 * - Redistributions of source code must retain the above
\r
12 * copyright notice, this list of conditions and the following
\r
15 * - Redistributions in binary form must reproduce the above
\r
16 * copyright notice, this list of conditions and the following
\r
17 * disclaimer in the documentation and/or other materials
\r
18 * provided with the distribution.
\r
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
\r
21 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
\r
22 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
\r
23 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
\r
24 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
\r
25 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
26 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
\r
32 #include "ibspdll.h"
\r
34 #ifdef PERFMON_ENABLED
\r
35 #include "ibsp_perfmon.h"
\r
39 typedef struct _io_comp_info
\r
41 struct ibsp_socket_info *p_socket;
\r
42 LPWSAOVERLAPPED p_ov;
\r
47 /* Work queue entry completion routine. */
\r
50 IN const ib_wc_t *wc,
\r
51 OUT io_comp_info_t *p_io_info )
\r
53 struct _wr *wr = NULL;
\r
54 struct _recv_wr *p_recv_wr = NULL;
\r
55 LPWSAOVERLAPPED lpOverlapped = NULL;
\r
56 struct ibsp_socket_info *socket_info = NULL;
\r
58 IBSP_ENTER( IBSP_DBG_IO );
\r
60 wr = (struct _wr * __ptr64)wc->wr_id;
\r
61 p_recv_wr = (struct _recv_wr * __ptr64)wc->wr_id;
\r
65 socket_info = wr->socket_info;
\r
66 p_io_info->p_socket = socket_info;
\r
68 lpOverlapped = wr->lpOverlapped;
\r
70 IBSP_TRACE4( IBSP_DBG_IO,
\r
71 ("socket %p, ov %p, work completion status=%s, wc_type=%s\n",
\r
72 socket_info, lpOverlapped, ib_get_wc_status_str( wc->status ),
\r
73 ib_get_wc_type_str( wc->wc_type )) );
\r
75 /* Set the Windows error code. It is hard to find a direct
\r
76 * correspondence between the IBAL error codes and windows error
\r
77 * codes; but it probably does not matter, as long as it returns an
\r
79 switch( wc->status )
\r
81 case IB_WCS_SUCCESS:
\r
83 * Set the length of the operation. Under Infiniband, the work
\r
84 * completion length is only valid for a receive
\r
85 * operation. Fortunately we had already set the length during the
\r
88 * lpWPUCompleteOverlappedRequest is supposed to store the length
\r
89 * into InternalHigh, however it will not be called if the low
\r
90 * order bit of lpOverlapped->hEvent is set. So we do it and hope
\r
93 * NOTE: Without a valid length, the switch doesn't seem to call
\r
94 * GetOverlappedResult() even if we call lpWPUCompleteOverlappedRequest()
\r
96 switch ( wc->wc_type )
\r
99 lpOverlapped->InternalHigh = wc->length;
\r
100 #ifdef IBSP_LOGGING
\r
101 cl_spinlock_acquire( &socket_info->recv_lock );
\r
102 DataLogger_WriteData(&socket_info->RecvDataLogger,
\r
103 p_recv_wr->idx, (void * __ptr64)p_recv_wr->ds_array[0].vaddr,
\r
105 cl_spinlock_release( &socket_info->recv_lock );
\r
107 #ifdef PERFMON_ENABLED
\r
108 InterlockedIncrement64( &g_pm_stat.pdata[COMP_RECV] );
\r
109 InterlockedExchangeAdd64( &g_pm_stat.pdata[BYTES_RECV],
\r
110 lpOverlapped->InternalHigh );
\r
113 cl_atomic_inc(&g_ibsp.total_recv_compleated);
\r
116 #ifdef PERFMON_ENABLED
\r
118 case IB_WC_RDMA_READ:
\r
119 lpOverlapped->InternalHigh = wc->length;
\r
120 InterlockedIncrement64( &g_pm_stat.pdata[COMP_RECV] );
\r
121 InterlockedExchangeAdd64( &g_pm_stat.pdata[BYTES_READ],
\r
122 lpOverlapped->InternalHigh );
\r
126 InterlockedIncrement64( &g_pm_stat.pdata[COMP_SEND] );
\r
127 InterlockedExchangeAdd64( &g_pm_stat.pdata[BYTES_SEND],
\r
128 lpOverlapped->InternalHigh );
\r
131 case IB_WC_RDMA_WRITE:
\r
132 InterlockedIncrement64( &g_pm_stat.pdata[COMP_SEND] );
\r
133 InterlockedExchangeAdd64( &g_pm_stat.pdata[BYTES_WRITE],
\r
134 lpOverlapped->InternalHigh );
\r
135 #endif /* PERFMON_ENABLED */
\r
141 lpOverlapped->OffsetHigh = 0;
\r
144 case IB_WCS_WR_FLUSHED_ERR:
\r
145 cl_spinlock_acquire( &socket_info->mutex );
\r
147 if( socket_info->socket_state == IBSP_DUPLICATING_REMOTE &&
\r
148 wc->wc_type == IB_WC_RECV )
\r
151 * Take the wr off the wr_list, and place onto the
\r
152 * dup_wr_list. We will post them later on the new QP.
\r
154 cl_spinlock_acquire( &socket_info->recv_lock );
\r
156 /* Copy to the duplicate WR array. */
\r
157 socket_info->dup_wr[socket_info->dup_idx] = *p_recv_wr;
\r
159 #if QP_ATTRIB_RQ_DEPTH == 256 || QP_ATTRIB_RQ_DEPTH == 128 || \
\r
160 QP_ATTRIB_RQ_DEPTH == 64 || QP_ATTRIB_RQ_DEPTH == 32 || \
\r
161 QP_ATTRIB_RQ_DEPTH == 16 || QP_ATTRIB_RQ_DEPTH == 8
\r
162 socket_info->dup_idx++;
\r
163 socket_info->dup_idx &= (QP_ATTRIB_RQ_DEPTH - 1);
\r
165 if( ++socket_info->dup_idx == QP_ATTRIB_RQ_DEPTH )
\r
166 socket_info->dup_idx = 0;
\r
169 cl_atomic_inc( &socket_info->dup_cnt );
\r
170 /* ib_cq_comp will decrement the receive count. */
\r
171 cl_atomic_dec( &socket_info->recv_cnt );
\r
173 cl_spinlock_release( &socket_info->recv_lock );
\r
175 cl_spinlock_release( &socket_info->mutex );
\r
176 p_io_info->p_ov = NULL;
\r
177 IBSP_EXIT( IBSP_DBG_IO );
\r
181 /* Check for flushing the receive buffers on purpose. */
\r
182 if( socket_info->socket_state == IBSP_DUPLICATING_OLD )
\r
183 wr->lpOverlapped->OffsetHigh = 0;
\r
185 wr->lpOverlapped->OffsetHigh = WSA_OPERATION_ABORTED;
\r
187 cl_spinlock_release( &socket_info->mutex );
\r
189 /* Override the length, as per the WSD specs. */
\r
190 wr->lpOverlapped->InternalHigh = 0;
\r
193 case IB_WCS_LOCAL_LEN_ERR:
\r
194 case IB_WCS_LOCAL_OP_ERR:
\r
195 case IB_WCS_LOCAL_PROTECTION_ERR:
\r
196 case IB_WCS_MEM_WINDOW_BIND_ERR:
\r
197 case IB_WCS_REM_ACCESS_ERR:
\r
198 case IB_WCS_REM_OP_ERR:
\r
199 case IB_WCS_RNR_RETRY_ERR:
\r
200 case IB_WCS_TIMEOUT_RETRY_ERR:
\r
201 case IB_WCS_REM_INVALID_REQ_ERR:
\r
203 IBSP_ERROR( ("%s error: %s\n",
\r
204 ib_get_wc_type_str( wc->wc_type ),
\r
205 ib_get_wc_status_str( wc->status )) );
\r
206 lpOverlapped->OffsetHigh = WSAECONNABORTED;
\r
207 wr->lpOverlapped->InternalHigh = 0;
\r
208 socket_info->qp_error = WSAECONNABORTED;
\r
212 #ifdef PERFMON_ENABLED
\r
213 InterlockedIncrement64( &g_pm_stat.pdata[COMP_TOTAL] );
\r
217 if( wc->wc_type == IB_WC_RECV )
\r
219 // This code requires the recv count to be decremented here, but it needs
\r
220 // to be decremented after any callbacks are invoked so socket destruction
\r
221 // gets delayed until all callbacks have been invoked.
\r
225 // cl_spinlock_acquire( &socket_info->recv_lock );
\r
226 // idx = socket_info->recv_idx - (uint8_t)socket_info->recv_cnt;
\r
227 // if( idx >= QP_ATTRIB_RQ_DEPTH )
\r
228 // idx += QP_ATTRIB_RQ_DEPTH;
\r
230 // CL_ASSERT( wc->wr_id == (uint64_t)(void* __ptr64)&socket_info->recv_wr[idx] );
\r
231 // cl_atomic_dec( &socket_info->recv_cnt );
\r
232 // cl_spinlock_release( &socket_info->recv_lock );
\r
235 if( wc->status == IB_SUCCESS && p_recv_wr->ds_array[0].length >= 40 )
\r
237 debug_dump_buffer( IBSP_DBG_WQ | IBSP_DBG_LEVEL4, "RECV",
\r
238 (void * __ptr64)p_recv_wr->ds_array[0].vaddr, 40 );
\r
241 cl_atomic_dec( &g_ibsp.recv_count );
\r
242 cl_atomic_inc( &socket_info->recv_comp );
\r
244 memset( p_recv_wr, 0x33, sizeof(struct _recv_wr) );
\r
248 // This code requires the send count to be decremented here, but it needs
\r
249 // to be decremented after any callbacks are invoked so socket destruction
\r
250 // gets delayed until all callbacks have been invoked.
\r
254 // cl_spinlock_acquire( &socket_info->send_lock );
\r
255 // idx = socket_info->send_idx - (uint8_t)socket_info->send_cnt;
\r
256 // if( idx >= QP_ATTRIB_SQ_DEPTH )
\r
257 // idx += QP_ATTRIB_SQ_DEPTH;
\r
258 // CL_ASSERT( wc->wr_id == (uint64_t)(void* __ptr64)&socket_info->send_wr[idx] );
\r
259 // cl_atomic_dec( &socket_info->send_cnt );
\r
260 // cl_spinlock_release( &socket_info->send_lock );
\r
263 if( wc->wc_type == IB_WC_SEND )
\r
265 cl_atomic_dec( &g_ibsp.send_count );
\r
266 cl_atomic_inc( &socket_info->send_comp );
\r
268 fzprint(("%s():%d:0x%x:0x%x: send_count=%d\n",
\r
270 __LINE__, GetCurrentProcessId(), GetCurrentThreadId(), g_ibsp.send_count));
\r
273 memset( wr, 0x33, sizeof(struct _wr) );
\r
277 IBSP_TRACE4( IBSP_DBG_IO,
\r
278 ("overlapped=%p, InternalHigh=%d, hEvent=%x\n",
\r
279 lpOverlapped, lpOverlapped->InternalHigh,
\r
280 (uintptr_t) lpOverlapped->hEvent) );
\r
282 /* Don't notify the switch for that completion only if:
\r
283 * - the switch doesn't want a notification
\r
284 * - the wq completed with success
\r
285 * - the socket is still connected
\r
287 if( ((uintptr_t) lpOverlapped->hEvent) & 0x00000001 )
\r
289 /* Indicate this operation is complete. The switch will poll
\r
290 * with calls to WSPGetOverlappedResult(). */
\r
293 cl_atomic_dec( &g_ibsp.overlap_h1_comp_count );
\r
295 fzprint(("%s():%d:0x%x:0x%x: ov=0x%p h0=%d h1=%d h1_c=%d send=%d recv=%d\n",
\r
296 __FUNCTION__, __LINE__, GetCurrentProcessId(),
\r
297 GetCurrentThreadId(), lpOverlapped,
\r
298 g_ibsp.overlap_h0_count, g_ibsp.overlap_h1_count,
\r
299 g_ibsp.overlap_h1_comp_count, g_ibsp.send_count, g_ibsp.recv_count));
\r
302 IBSP_TRACE1( IBSP_DBG_IO,
\r
303 ("Not calling lpWPUCompleteOverlappedRequest: "
\r
304 "socket=%p, ov=%p OffsetHigh=%d, InternalHigh=%d hEvent=%p\n",
\r
305 socket_info, lpOverlapped, lpOverlapped->OffsetHigh,
\r
306 lpOverlapped->InternalHigh, lpOverlapped->hEvent) );
\r
308 lpOverlapped->Internal = 0;
\r
309 p_io_info->p_ov = NULL;
\r
314 cl_atomic_dec( &g_ibsp.overlap_h0_count );
\r
316 fzprint(("%s():%d:0x%x:0x%x: ov=0x%p h0=%d h1=%d h1_c=%d send=%d recv=%d\n",
\r
317 __FUNCTION__, __LINE__, GetCurrentProcessId(),
\r
318 GetCurrentThreadId(), lpOverlapped,
\r
319 g_ibsp.overlap_h0_count, g_ibsp.overlap_h1_count,
\r
320 g_ibsp.overlap_h1_comp_count, g_ibsp.send_count, g_ibsp.recv_count));
\r
323 p_io_info->p_ov = lpOverlapped;
\r
324 cl_atomic_inc( &socket_info->ref_cnt );
\r
327 if( wc->wc_type == IB_WC_RECV )
\r
329 cl_atomic_dec( &socket_info->recv_cnt );
\r
333 cl_atomic_dec( &socket_info->send_cnt );
\r
336 IBSP_EXIT( IBSP_DBG_IO );
\r
340 /* CQ completion handler. */
\r
345 struct cq_thread_info *cq_tinfo = cq_context;
\r
346 ib_api_status_t status;
\r
347 ib_wc_t wclist[WC_LIST_SIZE];
\r
348 ib_wc_t *free_wclist;
\r
349 ib_wc_t *done_wclist;
\r
350 io_comp_info_t info[WC_LIST_SIZE];
\r
358 IBSP_ENTER( IBSP_DBG_WQ );
\r
360 CL_ASSERT( WC_LIST_SIZE >= 1 );
\r
364 /* Try to retrieve up to WC_LIST_SIZE completions at a time. */
\r
365 for( i = 0; i < (WC_LIST_SIZE - 1); i++ )
\r
367 wclist[i].p_next = &wclist[i + 1];
\r
369 wclist[(WC_LIST_SIZE - 1)].p_next = NULL;
\r
371 free_wclist = &wclist[0];
\r
372 done_wclist = NULL;
\r
374 status = ib_poll_cq( cq_tinfo->cq, &free_wclist, &done_wclist );
\r
376 IBSP_TRACE( IBSP_DBG_WQ,
\r
377 ("%s():%d:0x%x:0x%x: poll CQ got status %d, free=%p, done=%p\n",
\r
378 __FUNCTION__, __LINE__, GetCurrentProcessId(), GetCurrentThreadId(),
\r
379 status, free_wclist, done_wclist) );
\r
387 case IB_INVALID_CQ_HANDLE:
\r
388 /* This happens when the switch closes the socket while the
\r
389 * execution thread was calling lpWPUCompleteOverlappedRequest. */
\r
391 ("ib_poll_cq returned IB_INVLALID_CQ_HANDLE\n") );
\r
396 ("ib_poll_cq failed returned %s\n", ib_get_err_str( status )) );
\r
404 /* We have some completions. */
\r
406 while( done_wclist )
\r
411 complete_wq( done_wclist, &info[cb_idx++] );
\r
413 done_wclist = done_wclist->p_next;
\r
416 for( i = 0; i < cb_idx; i++ )
\r
423 IBSP_TRACE1( IBSP_DBG_IO,
\r
424 ("Calling lpWPUCompleteOverlappedRequest: "
\r
425 "socket=%p, ov=%p OffsetHigh=%d "
\r
426 "InternalHigh=%d hEvent=%p\n",
\r
427 info[i].p_socket, info[i].p_ov, info[i].p_ov->OffsetHigh,
\r
428 info[i].p_ov->InternalHigh, info[i].p_ov->hEvent) );
\r
430 ret = g_ibsp.up_call_table.lpWPUCompleteOverlappedRequest(
\r
431 info[i].p_socket->switch_socket, info[i].p_ov,
\r
432 info[i].p_ov->OffsetHigh,
\r
433 (DWORD)info[i].p_ov->InternalHigh, &error );
\r
436 IBSP_ERROR( ("WPUCompleteOverlappedRequest for ov=%p "
\r
437 "returned %d err %d\n", info[i].p_ov, ret, error) );
\r
439 deref_socket_info( info[i].p_socket );
\r
446 if( comp_count > g_ibsp.max_comp_count )
\r
448 g_ibsp.max_comp_count = comp_count;
\r
451 } while( !free_wclist );
\r
456 fzprint(("%s():%d:0x%x:0x%x: overlap_h0_count=%d overlap_h1_count=%d\n",
\r
458 __LINE__, GetCurrentProcessId(),
\r
459 GetCurrentThreadId(), g_ibsp.overlap_h0_count, g_ibsp.overlap_h1_count));
\r
462 IBSP_EXIT( IBSP_DBG_WQ );
\r
467 /* IB completion thread */
\r
468 static DWORD WINAPI
\r
470 LPVOID lpParameter )
\r
472 struct cq_thread_info *cq_tinfo = (struct cq_thread_info *)lpParameter;
\r
473 cl_status_t cl_status;
\r
474 ib_api_status_t status;
\r
477 IBSP_ENTER( IBSP_DBG_HW );
\r
479 fzprint(("%s():%d:0x%x:0x%x: cq_tinfo=0x%p\n", __FUNCTION__,
\r
480 __LINE__, GetCurrentProcessId(), GetCurrentThreadId(), cq_tinfo));
\r
484 cl_status = cl_waitobj_wait_on( cq_tinfo->cq_waitobj, EVENT_NO_TIMEOUT, TRUE );
\r
485 if( cl_status != CL_SUCCESS )
\r
488 ("cl_waitobj_wait_on() (%d)\n", cl_status) );
\r
492 * TODO: By rearranging thread creation and cq creation, this check
\r
493 * may be eliminated.
\r
495 if( cq_tinfo->cq != NULL )
\r
497 fzprint(("%s():%d:0x%x:0x%x: Calling ib_cq_comp().\n", __FUNCTION__,
\r
498 __LINE__, GetCurrentProcessId(), GetCurrentThreadId()));
\r
500 #ifdef PERFMON_ENABLED
\r
501 InterlockedIncrement64( &g_pm_stat.pdata[INTR_TOTAL] );
\r
506 if( ib_cq_comp( cq_tinfo ) )
\r
511 fzprint(("%s():%d:0x%x:0x%x: Done calling ib_cq_comp().\n", __FUNCTION__,
\r
512 __LINE__, GetCurrentProcessId(), GetCurrentThreadId()));
\r
514 status = ib_rearm_cq( cq_tinfo->cq, FALSE );
\r
515 if( status != IB_SUCCESS )
\r
518 ("ib_rearm_cq returned %s)\n", ib_get_err_str( status )) );
\r
522 } while( !cq_tinfo->ib_cq_thread_exit_wanted );
\r
524 cl_status = cl_waitobj_destroy( cq_tinfo->cq_waitobj );
\r
525 if( cl_status != CL_SUCCESS )
\r
528 ("cl_waitobj_destroy() returned %s\n", CL_STATUS_MSG(cl_status)) );
\r
530 HeapFree( g_ibsp.heap, 0, cq_tinfo );
\r
532 /* No special exit code, even on errors. */
\r
533 IBSP_EXIT( IBSP_DBG_HW );
\r
538 /* Called with the HCA's CQ lock held. */
\r
539 static struct cq_thread_info *
\r
541 struct ibsp_hca *hca )
\r
543 struct cq_thread_info *cq_tinfo = NULL;
\r
544 ib_cq_create_t cq_create;
\r
545 ib_api_status_t status;
\r
546 cl_status_t cl_status;
\r
548 IBSP_ENTER( IBSP_DBG_HW );
\r
550 cq_tinfo = HeapAlloc(
\r
551 g_ibsp.heap, HEAP_ZERO_MEMORY, sizeof(struct cq_thread_info) );
\r
555 IBSP_ERROR_EXIT( ("HeapAlloc() Failed.\n") );
\r
559 cl_status = cl_waitobj_create( FALSE, &cq_tinfo->cq_waitobj );
\r
560 if( cl_status != CL_SUCCESS )
\r
562 cq_tinfo->cq_waitobj = NULL;
\r
563 ib_destroy_cq_tinfo( cq_tinfo );
\r
565 ("cl_waitobj_create() returned %s\n", CL_STATUS_MSG(cl_status)) );
\r
569 cq_tinfo->hca = hca;
\r
570 cq_tinfo->ib_cq_thread_exit_wanted = FALSE;
\r
572 cq_tinfo->ib_cq_thread = CreateThread( NULL, 0, ib_cq_thread, cq_tinfo, 0,
\r
573 (LPDWORD)&cq_tinfo->ib_cq_thread_id );
\r
575 if( cq_tinfo->ib_cq_thread == NULL )
\r
577 ib_destroy_cq_tinfo( cq_tinfo );
\r
578 IBSP_ERROR_EXIT( ("CreateThread failed (%d)", GetLastError()) );
\r
582 STAT_INC( thread_num );
\r
584 /* Completion queue */
\r
585 cq_create.size = IB_INIT_CQ_SIZE;
\r
587 cq_create.pfn_comp_cb = NULL;
\r
588 cq_create.h_wait_obj = cq_tinfo->cq_waitobj;
\r
590 status = ib_create_cq( hca->hca_handle, &cq_create, cq_tinfo,
\r
591 NULL, &cq_tinfo->cq );
\r
594 ib_destroy_cq_tinfo( cq_tinfo );
\r
596 ("ib_create_cq returned %s\n", ib_get_err_str( status )) );
\r
600 STAT_INC( cq_num );
\r
602 status = ib_rearm_cq( cq_tinfo->cq, FALSE );
\r
605 ib_destroy_cq_tinfo( cq_tinfo );
\r
607 ("ib_rearm_cq returned %s\n", ib_get_err_str( status )) );
\r
611 cq_tinfo->cqe_size = cq_create.size;
\r
613 if( hca->cq_tinfo )
\r
615 __cl_primitive_insert(
\r
616 &hca->cq_tinfo->list_item, &cq_tinfo->list_item );
\r
620 /* Setup the list entry to point to itself. */
\r
621 cq_tinfo->list_item.p_next = &cq_tinfo->list_item;
\r
622 cq_tinfo->list_item.p_prev = &cq_tinfo->list_item;
\r
625 /* We will be assigned to a QP - set the QP count. */
\r
626 cq_tinfo->qp_count = 1;
\r
628 /* Upon allocation, the new CQ becomes the primary. */
\r
629 hca->cq_tinfo = cq_tinfo;
\r
631 IBSP_EXIT( IBSP_DBG_HW );
\r
637 ib_destroy_cq_tinfo(
\r
638 struct cq_thread_info *cq_tinfo )
\r
641 ib_wc_t *free_wclist;
\r
642 ib_wc_t *done_wclist;
\r
643 ib_api_status_t status;
\r
644 HANDLE h_cq_thread;
\r
645 DWORD cq_thread_id;
\r
647 IBSP_ENTER( IBSP_DBG_HW );
\r
649 CL_ASSERT( cq_tinfo );
\r
650 CL_ASSERT( cq_tinfo->qp_count == 0 );
\r
654 wclist.p_next = NULL;
\r
655 free_wclist = &wclist;
\r
658 cq_tinfo->cq, &free_wclist, &done_wclist ) == IB_SUCCESS )
\r
660 IBSP_TRACE1( IBSP_DBG_WQ,
\r
661 ("free=%p, done=%p\n", free_wclist, done_wclist) );
\r
664 IBSP_TRACE4( IBSP_DBG_WQ, ("ib_destroy_cq() start..\n") );
\r
667 * Called from cleanup thread, okay to block.
\r
669 status = ib_destroy_cq( cq_tinfo->cq, ib_sync_destroy );
\r
673 ("ib_destroy_cq returned %s\n", ib_get_err_str( status )) );
\r
677 IBSP_TRACE4( IBSP_DBG_WQ, ("ib_destroy_cq() finished.\n") );
\r
679 cq_tinfo->cq = NULL;
\r
681 STAT_DEC( cq_num );
\r
685 if( cq_tinfo->ib_cq_thread )
\r
687 /* ib_cq_thread() will release the cq_tinfo before exit. Don't
\r
688 reference cq_tinfo after signaling */
\r
689 h_cq_thread = cq_tinfo->ib_cq_thread;
\r
690 cq_tinfo->ib_cq_thread = NULL;
\r
691 cq_thread_id = cq_tinfo->ib_cq_thread_id;
\r
693 cq_tinfo->ib_cq_thread_exit_wanted = TRUE;
\r
694 cl_waitobj_signal( cq_tinfo->cq_waitobj );
\r
696 /* Wait for ib_cq_thread to die, if we are not running on it */
\r
697 if( GetCurrentThreadId() != cq_thread_id )
\r
699 fzprint(("%s():%d:0x%x:0x%x: Waiting for ib_cq_thread=0x%x to die\n",
\r
700 __FUNCTION__, __LINE__, GetCurrentProcessId(), GetCurrentThreadId(),
\r
702 if( WaitForSingleObject( h_cq_thread, INFINITE ) != WAIT_OBJECT_0 )
\r
704 IBSP_ERROR( ("WaitForSingleObject failed\n") );
\r
708 STAT_DEC( thread_num );
\r
713 fzprint(("%s():%d:0x%x:0x%x: Currently on ib_cq_thread.\n", __FUNCTION__,
\r
714 __LINE__, GetCurrentProcessId(), GetCurrentThreadId()));
\r
715 STAT_DEC( thread_num );
\r
717 CloseHandle( h_cq_thread );
\r
721 /* There was no thread created, destroy cq_waitobj and
\r
723 if( cq_tinfo->cq_waitobj )
\r
725 cl_waitobj_destroy( cq_tinfo->cq_waitobj );
\r
726 cq_tinfo->cq_waitobj = NULL;
\r
728 HeapFree( g_ibsp.heap, 0, cq_tinfo );
\r
731 IBSP_EXIT( IBSP_DBG_HW );
\r
735 static struct cq_thread_info *
\r
736 ib_acquire_cq_tinfo(
\r
737 struct ibsp_hca *hca )
\r
739 struct cq_thread_info *cq_tinfo = NULL, *cq_end;
\r
741 ib_api_status_t status;
\r
743 IBSP_ENTER( IBSP_DBG_HW );
\r
745 cl_spinlock_acquire( &hca->cq_lock );
\r
747 if( !hca->cq_tinfo )
\r
749 cq_tinfo = ib_alloc_cq_tinfo( hca );
\r
751 IBSP_ERROR( ("ib_alloc_cq_tinfo() failed\n") );
\r
752 cl_spinlock_release( &hca->cq_lock );
\r
753 IBSP_EXIT( IBSP_DBG_HW );
\r
757 cq_tinfo = hca->cq_tinfo;
\r
759 cqe_size = (cq_tinfo->qp_count + 1) * IB_CQ_SIZE;
\r
763 if( cq_tinfo->cqe_size >= cqe_size )
\r
765 cq_tinfo->qp_count++;
\r
766 cl_spinlock_release( &hca->cq_lock );
\r
767 IBSP_EXIT( IBSP_DBG_HW );
\r
771 status = ib_modify_cq( cq_tinfo->cq, &cqe_size );
\r
775 cq_tinfo->cqe_size = cqe_size;
\r
776 cq_tinfo->qp_count++;
\r
781 ("ib_modify_cq() returned %s\n", ib_get_err_str(status)) );
\r
782 case IB_INVALID_CQ_SIZE:
\r
783 case IB_UNSUPPORTED:
\r
784 cq_tinfo = PARENT_STRUCT(
\r
785 cl_qlist_next( &cq_tinfo->list_item ), struct cq_thread_info,
\r
787 cqe_size = (cq_tinfo->qp_count + 1) * IB_CQ_SIZE;
\r
790 } while( cq_tinfo != cq_end );
\r
792 if( cq_tinfo == cq_end )
\r
793 cq_tinfo = ib_alloc_cq_tinfo( hca );
\r
795 cl_spinlock_release( &hca->cq_lock );
\r
796 IBSP_EXIT( IBSP_DBG_HW );
\r
801 ib_release_cq_tinfo(
\r
802 struct cq_thread_info *cq_tinfo )
\r
804 IBSP_ENTER( IBSP_DBG_HW );
\r
806 CL_ASSERT( cq_tinfo );
\r
807 CL_ASSERT( cq_tinfo->hca );
\r
809 cl_spinlock_acquire( &cq_tinfo->hca->cq_lock );
\r
810 /* If this CQ now has fewer QPs than the primary, make it the primary. */
\r
811 if( --cq_tinfo->qp_count < cq_tinfo->hca->cq_tinfo->qp_count )
\r
812 cq_tinfo->hca->cq_tinfo = cq_tinfo;
\r
813 cl_spinlock_release( &cq_tinfo->hca->cq_lock );
\r
815 IBSP_EXIT( IBSP_DBG_HW );
\r
819 /* Release IB resources. */
\r
823 cl_fmap_item_t *p_item;
\r
825 IBSP_ENTER( IBSP_DBG_HW );
\r
827 if( g_ibsp.al_handle )
\r
829 cl_list_item_t *item;
\r
830 ib_api_status_t status;
\r
834 while( (item = cl_qlist_head( &g_ibsp.hca_list )) != cl_qlist_end( &g_ibsp.hca_list ) )
\r
836 struct ibsp_hca *hca = PARENT_STRUCT(item, struct ibsp_hca, item);
\r
838 pnp_ca_remove( hca );
\r
841 fzprint(("%s():%d:0x%x:0x%x: Calling ib_close_al...\n", __FUNCTION__,
\r
842 __LINE__, GetCurrentProcessId(), GetCurrentThreadId()));
\r
844 status = ib_close_al( g_ibsp.al_handle );
\r
846 fzprint(("%s():%d:0x%x:0x%x: Done calling ib_close_al, status=%d.\n",
\r
847 __FUNCTION__, __LINE__, GetCurrentProcessId(), GetCurrentThreadId(),
\r
849 if( status != IB_SUCCESS )
\r
852 ("ib_close_al returned %s\n", ib_get_err_str( status )) );
\r
856 IBSP_TRACE( IBSP_DBG_HW, ("ib_close_al success\n") );
\r
857 STAT_DEC( al_num );
\r
859 g_ibsp.al_handle = NULL;
\r
862 for( p_item = cl_fmap_head( &g_ibsp.ip_map );
\r
863 p_item != cl_fmap_end( &g_ibsp.ip_map );
\r
864 p_item = cl_fmap_head( &g_ibsp.ip_map ) )
\r
866 cl_fmap_remove_item( &g_ibsp.ip_map, p_item );
\r
868 HeapFree( g_ibsp.heap, 0,
\r
869 PARENT_STRUCT(p_item, struct ibsp_ip_addr, item) );
\r
872 IBSP_EXIT( IBSP_DBG_HW );
\r
876 /* Initialize IB resources. */
\r
878 ibsp_initialize(void)
\r
880 ib_api_status_t status;
\r
883 IBSP_ENTER( IBSP_DBG_HW );
\r
885 CL_ASSERT( g_ibsp.al_handle == NULL );
\r
886 CL_ASSERT( cl_qlist_count( &g_ibsp.hca_list ) == 0 );
\r
888 /* Open the IB library */
\r
889 status = ib_open_al( &g_ibsp.al_handle );
\r
891 IBSP_TRACE( IBSP_DBG_HW, ("open is %d %p\n", status, g_ibsp.al_handle) );
\r
893 if( status != IB_SUCCESS )
\r
895 IBSP_ERROR( ("ib_open_al failed (%d)\n", status) );
\r
896 ret = WSAEPROVIDERFAILEDINIT;
\r
900 STAT_INC( al_num );
\r
902 /* Register for PNP events */
\r
903 status = register_pnp();
\r
906 IBSP_ERROR( ("register_pnp failed (%d)\n", status) );
\r
907 ret = WSAEPROVIDERFAILEDINIT;
\r
911 STAT_INC( thread_num );
\r
917 /* Free up resources. */
\r
921 IBSP_EXIT( IBSP_DBG_HW );
\r
927 /* Destroys the InfiniBand resources of a socket. */
\r
930 IN OUT struct ibsp_socket_info *socket_info )
\r
932 ib_api_status_t status;
\r
934 IBSP_ENTER( IBSP_DBG_EP );
\r
936 if( socket_info->qp )
\r
938 cl_atomic_inc( &socket_info->ref_cnt );
\r
939 status = ib_destroy_qp( socket_info->qp, deref_socket_info );
\r
940 if( status != IB_SUCCESS )
\r
942 IBSP_ERROR( ("ib_destroy_qp returned %s\n",
\r
943 ib_get_err_str( status )) );
\r
944 deref_socket_info( socket_info );
\r
947 ib_release_cq_tinfo( socket_info->cq_tinfo );
\r
949 socket_info->qp = NULL;
\r
952 IBSP_EXIT( IBSP_DBG_EP );
\r
957 * Creates the necessary IB resources for a socket
\r
961 IN OUT struct ibsp_socket_info *socket_info)
\r
963 ib_qp_create_t qp_create;
\r
964 ib_api_status_t status;
\r
965 ib_qp_attr_t qp_attr;
\r
967 IBSP_ENTER( IBSP_DBG_EP );
\r
969 CL_ASSERT( socket_info != NULL );
\r
970 CL_ASSERT( socket_info->port != NULL );
\r
971 CL_ASSERT( socket_info->qp == NULL );
\r
973 socket_info->hca_pd = socket_info->port->hca->pd;
\r
975 /* Get the completion queue and thread info for this socket */
\r
976 socket_info->cq_tinfo = ib_acquire_cq_tinfo( socket_info->port->hca );
\r
977 if( !socket_info->cq_tinfo )
\r
979 IBSP_ERROR_EXIT( ("ib_acquire_cq_tinfo failed\n") );
\r
984 qp_create.qp_type = IB_QPT_RELIABLE_CONN;
\r
985 qp_create.sq_depth = QP_ATTRIB_SQ_DEPTH;
\r
986 qp_create.rq_depth = QP_ATTRIB_RQ_DEPTH;
\r
987 qp_create.sq_sge = QP_ATTRIB_SQ_SGE;
\r
988 qp_create.rq_sge = 1;
\r
989 qp_create.h_rq_cq = socket_info->cq_tinfo->cq;
\r
990 qp_create.h_sq_cq = socket_info->cq_tinfo->cq;
\r
991 qp_create.sq_signaled = TRUE;
\r
993 status = ib_create_qp( socket_info->hca_pd, &qp_create, socket_info, /* context */
\r
994 NULL, /* async handler */
\r
995 &socket_info->qp );
\r
998 ib_release_cq_tinfo( socket_info->cq_tinfo );
\r
1000 ("ib_create_qp returned %s\n", ib_get_err_str( status )) );
\r
1001 return WSAENOBUFS;
\r
1004 status = ib_query_qp( socket_info->qp, &qp_attr );
\r
1005 if( status == IB_SUCCESS )
\r
1007 socket_info->max_inline = min( g_max_inline, qp_attr.sq_max_inline );
\r
1011 IBSP_ERROR( ("ib_query_qp returned %s\n", ib_get_err_str( status )) );
\r
1012 socket_info->max_inline = 0;
\r
1015 STAT_INC( qp_num );
\r
1017 IBSP_EXIT( IBSP_DBG_EP );
\r
1024 IN OUT struct ibsp_socket_info *socket_info )
\r
1026 IBSP_ENTER( IBSP_DBG_EP );
\r
1028 if( socket_info->cq_tinfo == NULL )
\r
1030 IBSP_EXIT( IBSP_DBG_EP );
\r
1034 /* Wait for the QP to be drained. */
\r
1035 while( socket_info->send_cnt || socket_info->recv_cnt )
\r
1037 fzprint(("%s():%d:0x%x:0x%x: socket=0x%p wr_list_count=%d qp state=%d\n",
\r
1038 __FUNCTION__, __LINE__, GetCurrentProcessId(), GetCurrentThreadId(),
\r
1039 socket_info, cl_qlist_count(&socket_info->wr_list)));
\r
1044 IBSP_EXIT( IBSP_DBG_EP );
\r
1049 ibsp_dup_overlap_abort(
\r
1050 IN OUT struct ibsp_socket_info *socket_info )
\r
1052 LPWSAOVERLAPPED lpOverlapped = NULL;
\r
1057 IBSP_ENTER( IBSP_DBG_EP );
\r
1058 CL_ASSERT( !socket_info->send_cnt && !socket_info->recv_cnt );
\r
1060 /* Browse the list of all posted overlapped structures
\r
1061 * to mark them as aborted. */
\r
1062 idx = socket_info->dup_idx - (uint8_t)socket_info->dup_cnt;
\r
1063 if( idx >= QP_ATTRIB_RQ_DEPTH )
\r
1064 idx += QP_ATTRIB_RQ_DEPTH;
\r
1066 while( socket_info->dup_cnt )
\r
1068 lpOverlapped = socket_info->dup_wr[idx].wr.lpOverlapped;
\r
1070 fzprint(("%s():%d:0x%x:0x%x: socket=0x%p wr=0x%p overlapped=0x%p Internal=%d InternalHigh=%d hEvent=%d\n",
\r
1071 __FUNCTION__, __LINE__, GetCurrentProcessId(), GetCurrentThreadId(), socket_info, &socket_info->dup_wr[idx], lpOverlapped, lpOverlapped->Internal, lpOverlapped->InternalHigh, lpOverlapped->hEvent));
\r
1073 lpOverlapped->OffsetHigh = WSAECONNABORTED;
\r
1074 lpOverlapped->InternalHigh = 0;
\r
1076 if( ((uintptr_t) lpOverlapped->hEvent) & 0x00000001 )
\r
1078 /* Indicate this operation is complete. The switch will poll
\r
1079 * with calls to WSPGetOverlappedResult(). */
\r
1081 cl_atomic_dec(&g_ibsp.overlap_h1_comp_count);
\r
1083 fzprint(("%s():%d:0x%x:0x%x: ov=0x%p h0=%d h1=%d h1_c=%d send=%d recv=%d\n",
\r
1084 __FUNCTION__, __LINE__, GetCurrentProcessId(),
\r
1085 GetCurrentThreadId(), lpOverlapped,
\r
1086 g_ibsp.overlap_h0_count, g_ibsp.overlap_h1_count,
\r
1087 g_ibsp.overlap_h1_comp_count, g_ibsp.send_count, g_ibsp.recv_count));
\r
1090 CL_TRACE(IBSP_DBG_WQ, gdbg_lvl,
\r
1091 ("%s: set internal overlapped=0x%p Internal=%d OffsetHigh=%d\n",
\r
1092 __FUNCTION__, lpOverlapped, lpOverlapped->Internal,
\r
1093 lpOverlapped->OffsetHigh));
\r
1095 lpOverlapped->Internal = 0;
\r
1100 cl_atomic_dec(&g_ibsp.overlap_h0_count);
\r
1103 fzprint(("%s():%d:0x%x:0x%x: ov=0x%p h0=%d h1=%d h1_c=%d send=%d recv=%d\n",
\r
1104 __FUNCTION__, __LINE__, GetCurrentProcessId(),
\r
1105 GetCurrentThreadId(), lpOverlapped,
\r
1106 g_ibsp.overlap_h0_count, g_ibsp.overlap_h1_count,
\r
1107 g_ibsp.overlap_h1_comp_count, g_ibsp.send_count, g_ibsp.recv_count));
\r
1109 CL_TRACE(IBSP_DBG_WQ, gdbg_lvl,
\r
1110 ("%s: calls lpWPUCompleteOverlappedRequest, overlapped=0x%p OffsetHigh=%d InternalHigh=%d hEvent=%d\n",
\r
1111 __FUNCTION__, lpOverlapped, lpOverlapped->OffsetHigh,
\r
1112 lpOverlapped->InternalHigh, lpOverlapped->hEvent));
\r
1114 ret = g_ibsp.up_call_table.lpWPUCompleteOverlappedRequest
\r
1115 (socket_info->switch_socket,
\r
1117 lpOverlapped->OffsetHigh, (DWORD) lpOverlapped->InternalHigh, &error);
\r
1121 CL_ERROR(IBSP_DBG_EP, gdbg_lvl,
\r
1122 ("lpWPUCompleteOverlappedRequest failed with %d/%d\n", ret,
\r
1126 cl_atomic_dec( &socket_info->dup_cnt );
\r
1129 IBSP_EXIT( IBSP_DBG_EP );
\r
1133 /* Closes a connection and releases its resources. */
\r
1135 shutdown_and_destroy_socket_info(
\r
1136 IN OUT struct ibsp_socket_info *socket_info )
\r
1138 enum ibsp_socket_state old_state;
\r
1140 IBSP_ENTER( IBSP_DBG_EP );
\r
1142 cl_spinlock_acquire( &socket_info->mutex );
\r
1143 old_state = socket_info->socket_state;
\r
1144 IBSP_CHANGE_SOCKET_STATE( socket_info, IBSP_CLOSED );
\r
1145 cl_spinlock_release( &socket_info->mutex );
\r
1147 if( socket_info->listen.handle )
\r
1149 /* Stop listening and reject queued connections. */
\r
1150 ib_listen_cancel( socket_info );
\r
1153 cl_spinlock_acquire( &g_ibsp.socket_info_mutex );
\r
1154 cl_qlist_remove_item( &g_ibsp.socket_info_list, &socket_info->item );
\r
1156 switch( old_state )
\r
1160 /* Nothing to do. */
\r
1163 case IBSP_CONNECTED:
\r
1165 struct disconnect_reason reason;
\r
1167 memset( &reason, 0, sizeof(reason) );
\r
1168 reason.type = DISC_SHUTDOWN;
\r
1169 ib_disconnect( socket_info, &reason );
\r
1171 /* Fall through. */
\r
1173 case IBSP_CONNECT:
\r
1174 case IBSP_DISCONNECTED:
\r
1175 /* We changed the state - remove from connection map. */
\r
1176 CL_ASSERT( socket_info->conn_item.p_map );
\r
1177 cl_rbmap_remove_item( &g_ibsp.conn_map, &socket_info->conn_item );
\r
1180 cl_spinlock_release( &g_ibsp.socket_info_mutex );
\r
1182 /* Flush all completions. */
\r
1183 if( socket_info->dup_cnt )
\r
1184 ibsp_dup_overlap_abort( socket_info );
\r
1186 while( socket_info->send_cnt || socket_info->recv_cnt )
\r
1187 ib_cq_comp( socket_info->cq_tinfo );
\r
1189 ibsp_dereg_socket( socket_info );
\r
1191 ib_destroy_socket( socket_info );
\r
1193 #ifdef IBSP_LOGGING
\r
1194 DataLogger_Shutdown(&socket_info->SendDataLogger);
\r
1195 DataLogger_Shutdown(&socket_info->RecvDataLogger);
\r
1198 /* Release the initial reference and clean up. */
\r
1199 deref_socket_info( socket_info );
\r
1201 IBSP_EXIT( IBSP_DBG_EP );
\r
1207 IN struct ibsp_socket_info *s )
\r
1209 struct ibsp_socket_info *p_sock;
\r
1210 cl_rbmap_item_t *p_item, *p_insert_at;
\r
1211 boolean_t left = TRUE;
\r
1213 cl_spinlock_acquire( &g_ibsp.socket_info_mutex );
\r
1214 p_item = cl_rbmap_root( &g_ibsp.conn_map );
\r
1215 p_insert_at = p_item;
\r
1217 CL_ASSERT( !s->conn_item.p_map );
\r
1218 while( p_item != cl_rbmap_end( &g_ibsp.conn_map ) )
\r
1220 p_insert_at = p_item;
\r
1221 p_sock = PARENT_STRUCT( p_item, struct ibsp_socket_info, conn_item );
\r
1222 if( p_sock->local_addr.sin_family < s->local_addr.sin_family )
\r
1223 p_item = cl_rbmap_left( p_item ), left = TRUE;
\r
1224 else if( p_sock->local_addr.sin_family > s->local_addr.sin_family )
\r
1225 p_item = cl_rbmap_right( p_item ), left = FALSE;
\r
1226 else if( p_sock->local_addr.sin_addr.S_un.S_addr < s->local_addr.sin_addr.S_un.S_addr )
\r
1227 p_item = cl_rbmap_left( p_item ), left = TRUE;
\r
1228 else if( p_sock->local_addr.sin_addr.S_un.S_addr > s->local_addr.sin_addr.S_un.S_addr )
\r
1229 p_item = cl_rbmap_right( p_item ), left = FALSE;
\r
1230 else if( p_sock->local_addr.sin_port < s->local_addr.sin_port )
\r
1231 p_item = cl_rbmap_left( p_item ), left = TRUE;
\r
1232 else if( p_sock->local_addr.sin_port > s->local_addr.sin_port )
\r
1233 p_item = cl_rbmap_right( p_item ), left = FALSE;
\r
1234 else if( p_sock->peer_addr.sin_family < s->peer_addr.sin_family )
\r
1235 p_item = cl_rbmap_left( p_item ), left = TRUE;
\r
1236 else if( p_sock->peer_addr.sin_family > s->peer_addr.sin_family )
\r
1237 p_item = cl_rbmap_right( p_item ), left = FALSE;
\r
1238 else if( p_sock->peer_addr.sin_addr.S_un.S_addr < s->peer_addr.sin_addr.S_un.S_addr )
\r
1239 p_item = cl_rbmap_left( p_item ), left = TRUE;
\r
1240 else if( p_sock->peer_addr.sin_addr.S_un.S_addr > s->peer_addr.sin_addr.S_un.S_addr )
\r
1241 p_item = cl_rbmap_right( p_item ), left = FALSE;
\r
1242 else if( p_sock->peer_addr.sin_port < s->peer_addr.sin_port )
\r
1243 p_item = cl_rbmap_left( p_item ), left = TRUE;
\r
1244 else if( p_sock->peer_addr.sin_port > s->peer_addr.sin_port )
\r
1245 p_item = cl_rbmap_right( p_item ), left = FALSE;
\r
1250 cl_rbmap_insert( &g_ibsp.conn_map, p_insert_at, &s->conn_item, left );
\r
1253 cl_spinlock_release( &g_ibsp.socket_info_mutex );
\r
1254 return p_item == cl_rbmap_end( &g_ibsp.conn_map );
\r
1260 IN struct ibsp_socket_info *s )
\r
1262 cl_spinlock_acquire( &g_ibsp.socket_info_mutex );
\r
1263 CL_ASSERT( s->conn_item.p_map );
\r
1264 cl_rbmap_remove_item( &g_ibsp.conn_map, &s->conn_item );
\r
1265 cl_spinlock_release( &g_ibsp.socket_info_mutex );
\r