Commit subject: Fixes hang in TCP CLOSE/CLOSE_WAIT stages
Source file: iscsi-scst/kernel/nthread.c (mirror/scst.git)
1 /*
2  *  Network threads.
3  *
4  *  Copyright (C) 2004 - 2005 FUJITA Tomonori <tomof@acm.org>
5  *  Copyright (C) 2007 Vladislav Bolkhovitin
6  *  Copyright (C) 2007 CMS Distribution Limited
7  * 
8  *  This program is free software; you can redistribute it and/or
9  *  modify it under the terms of the GNU General Public License
10  *  as published by the Free Software Foundation.
11  * 
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  *  GNU General Public License for more details.
16  */
17
18 #include <linux/sched.h>
19 #include <linux/file.h>
20 #include <linux/kthread.h>
21 #include <asm/ioctls.h>
22 #include <linux/delay.h>
23 #include <net/tcp.h>
24
25 #include "iscsi.h"
26 #include "digest.h"
27
/*
 * Read-side (RX) states, driven by recv(). A PDU arrives in pieces:
 * Basic Header Segment -> optional Additional Header Segment ->
 * optional header digest -> data -> optional data digest. Each
 * RX_INIT_* state sets up the read buffer for the next piece; the
 * following state consumes it.
 */
enum rx_state {
        RX_INIT_BHS, /* Must be zero. */
        RX_BHS,

        RX_INIT_AHS,
        RX_AHS,

        RX_INIT_HDIGEST,
        RX_HDIGEST,
        RX_CHECK_HDIGEST,

        RX_INIT_DATA,
        RX_DATA,

        RX_INIT_DDIGEST,
        RX_DDIGEST,
        RX_CHECK_DDIGEST,

        RX_END, /* Whole PDU received */
};
48
/* Write-side (TX) states: send BHS+data, then the optional data digest. */
enum tx_state {
        TX_INIT, /* Must be zero. */
        TX_BHS_DATA,
        TX_INIT_DDIGEST,
        TX_DDIGEST,
        TX_END,
};
56
#if defined(NET_PAGE_CALLBACKS_DEFINED)
/*
 * If the connection's TCP socket has reached CLOSE/CLOSE_WAIT, no more
 * data will be transmitted, so drop every network-held page reference
 * (page->net_priv) for commands still on conn->cmd_list. Without this,
 * net_ref_cnt would never drain and the conn_ref_cnt wait loop in
 * close_conn() would hang — the fix for the CLOSE/CLOSE_WAIT hang.
 */
static void iscsi_check_closewait(struct iscsi_conn *conn)
{
        struct iscsi_cmnd *cmnd;

        TRACE_ENTRY();

        if ((conn->sock->sk->sk_state != TCP_CLOSE_WAIT) &&
            (conn->sock->sk->sk_state != TCP_CLOSE)) {
                TRACE_CONN_CLOSE_DBG("sk_state %d, skipping",
                        conn->sock->sk->sk_state);
                goto out;
        }

        /*
         * No data are going to be sent, so all being sent buffers can be freed
         * now. Strange that TCP doesn't do that itself.
         */

again:
        spin_lock_bh(&conn->cmd_list_lock);
        list_for_each_entry(cmnd, &conn->cmd_list, cmd_list_entry) {
                TRACE_CONN_CLOSE_DBG("cmd %p, scst_state %x, data_waiting %d, "
                        "ref_cnt %d, parent_req %p, net_ref_cnt %d, sg %p",
                        cmnd, cmnd->scst_state, cmnd->data_waiting,
                        atomic_read(&cmnd->ref_cnt), cmnd->parent_req,
                        atomic_read(&cmnd->net_ref_cnt), cmnd->sg);
                sBUG_ON(cmnd->parent_req != NULL);
                if (cmnd->sg != NULL) {
                        int sg_cnt, i, restart = 0;
                        sg_cnt = get_pgcnt(cmnd->bufflen,
                                cmnd->sg[0].offset);
                        /* Pin cmnd so it survives while the lock is dropped */
                        cmnd_get(cmnd);
                        for(i = 0; i < sg_cnt; i++) {
                                TRACE_CONN_CLOSE_DBG("page %p, net_priv %p, _count %d",
                                        cmnd->sg[i].page, cmnd->sg[i].page->net_priv,
                                        atomic_read(&cmnd->sg[i].page->_count));
                                if (cmnd->sg[i].page->net_priv != NULL) {
                                        if (restart == 0) {
                                                /*
                                                 * Release references outside the
                                                 * spinlock; since the list can
                                                 * change meanwhile, rescan it
                                                 * from scratch afterwards.
                                                 */
                                                spin_unlock_bh(&conn->cmd_list_lock);
                                                restart = 1;
                                        }
                                        while(cmnd->sg[i].page->net_priv != NULL)
                                                iscsi_put_page_callback(cmnd->sg[i].page);
                                }
                        }
                        cmnd_put(cmnd);
                        if (restart)
                                goto again;
                }
        }
        spin_unlock_bh(&conn->cmd_list_lock);

out:
        TRACE_EXIT();
        return;
}
#else
/* Without net page callbacks there is nothing to release: no-op.
 * (Also removed a stray ';' after the body — a file-scope empty
 * declaration rejected by strict compilers.) */
static inline void iscsi_check_closewait(struct iscsi_conn *conn) {}
#endif
117
/*
 * Tear down a connection. Runs in the rd thread with no locks held:
 * shut the socket down for reading, abort outstanding commands, wait
 * for conn_ref_cnt to drain and the wr thread to go idle, restore the
 * socket's original callbacks, notify user space (E_CONN_CLOSE) and
 * free the connection (and its session, if this was the last conn).
 */
static void close_conn(struct iscsi_conn *conn)
{
        struct iscsi_session *session = conn->session;
        struct iscsi_target *target = conn->target;

        TRACE_ENTRY();

        TRACE_CONN_CLOSE("Closing connection %p (conn_ref_cnt=%d)", conn,
                atomic_read(&conn->conn_ref_cnt));

        iscsi_extracheck_is_rd_thread(conn);

        /* We want all our already send operations to complete */
        conn->sock->ops->shutdown(conn->sock, RCV_SHUTDOWN);

        conn_abort(conn);

        /* Drop a partially received PDU, if any */
        if (conn->read_state != RX_INIT_BHS) {
                req_cmnd_release_force(conn->read_cmnd, 0);
                conn->read_cmnd = NULL;
                conn->read_state = RX_INIT_BHS;
        }

        /* ToDo: not the best way to wait */
        while(atomic_read(&conn->conn_ref_cnt) != 0) {
                struct iscsi_cmnd *cmnd;

                /* Dispose of this conn's commands parked on the pending list */
                if (!list_empty(&session->pending_list)) {
                        struct list_head *pending_list = &session->pending_list;
                        struct iscsi_cmnd *tmp;

                        TRACE_CONN_CLOSE("Disposing pending commands on "
                                "connection %p (conn_ref_cnt=%d)", conn,
                                atomic_read(&conn->conn_ref_cnt));

                        list_for_each_entry_safe(cmnd, tmp, pending_list,
                                                pending_list_entry) {
                                if (cmnd->conn == conn) {
                                        TRACE_CONN_CLOSE("Freeing pending cmd %p",
                                                cmnd);
                                        list_del(&cmnd->pending_list_entry);
                                        cmnd->pending = 0;
                                        req_cmnd_release_force(cmnd, 0);
                                }
                        }
                }

                /* NOTE(review): presumably wakes write processing for this
                 * conn so queued responses can drain — confirm in callee */
                iscsi_make_conn_wr_active(conn);
                msleep(50);

                TRACE_CONN_CLOSE("conn %p, conn_ref_cnt %d left, wr_state %d",
                        conn, atomic_read(&conn->conn_ref_cnt), conn->wr_state);
#ifdef DEBUG
                /* Debug builds: dump every command (and response) still held */
                {
#ifdef NET_PAGE_CALLBACKS_DEFINED
                        struct iscsi_cmnd *rsp;
#endif
                        spin_lock_bh(&conn->cmd_list_lock);
                        list_for_each_entry(cmnd, &conn->cmd_list, cmd_list_entry) {
                                TRACE_CONN_CLOSE_DBG("cmd %p, scst_state %x, data_waiting "
                                        "%d, ref_cnt %d, parent_req %p", cmnd,
                                        cmnd->scst_state, cmnd->data_waiting,
                                        atomic_read(&cmnd->ref_cnt), cmnd->parent_req);
#ifdef NET_PAGE_CALLBACKS_DEFINED
                                TRACE_CONN_CLOSE_DBG("net_ref_cnt %d, sg %p",
                                        atomic_read(&cmnd->net_ref_cnt), cmnd->sg);
                                if (cmnd->sg != NULL) {
                                        int sg_cnt, i;
                                        sg_cnt = get_pgcnt(cmnd->bufflen,
                                                cmnd->sg[0].offset);
                                        for(i = 0; i < sg_cnt; i++) {
                                                TRACE_CONN_CLOSE_DBG("page %p, net_priv %p, _count %d",
                                                        cmnd->sg[i].page, cmnd->sg[i].page->net_priv,
                                                        atomic_read(&cmnd->sg[i].page->_count));
                                        }
                                }

                                sBUG_ON(cmnd->parent_req != NULL);

                                spin_lock_bh(&cmnd->rsp_cmd_lock);
                                list_for_each_entry(rsp, &cmnd->rsp_cmd_list, rsp_cmd_list_entry) {
                                        TRACE_CONN_CLOSE_DBG("  rsp %p, ref_cnt %d, net_ref_cnt %d, "
                                                "sg %p", rsp, atomic_read(&rsp->ref_cnt),
                                                atomic_read(&rsp->net_ref_cnt), rsp->sg);
                                        if ((rsp->sg != cmnd->sg) && (rsp->sg != NULL)) {
                                                int sg_cnt, i;
                                                sg_cnt = get_pgcnt(rsp->bufflen,
                                                        rsp->sg[0].offset);
                                                sBUG_ON(rsp->sg_cnt != sg_cnt);
                                                for(i = 0; i < sg_cnt; i++) {
                                                        TRACE_CONN_CLOSE_DBG("    page %p, net_priv %p, "
                                                                "_count %d", rsp->sg[i].page,
                                                                rsp->sg[i].page->net_priv,
                                                                atomic_read(&rsp->sg[i].page->_count));
                                                }
                                        }
                                }
                                spin_unlock_bh(&cmnd->rsp_cmd_lock);
#endif
                        }
                        spin_unlock_bh(&conn->cmd_list_lock);
                }
#endif
                /* Release pages stuck in a dead TCP connection, see above */
                iscsi_check_closewait(conn);
        }

        /* Restore the socket's original callbacks */
        write_lock_bh(&conn->sock->sk->sk_callback_lock);
        conn->sock->sk->sk_state_change = conn->old_state_change;
        conn->sock->sk->sk_data_ready = conn->old_data_ready;
        conn->sock->sk->sk_write_space = conn->old_write_space;
        write_unlock_bh(&conn->sock->sk->sk_callback_lock);

        /* Wait for the wr thread to finish with this connection */
        while(conn->wr_state != ISCSI_CONN_WR_STATE_IDLE) {
                TRACE_CONN_CLOSE("Waiting for wr thread (conn %p), wr_state %x",
                        conn, conn->wr_state);
                msleep(50);
        }

        TRACE_CONN_CLOSE("Notifying user space about closing connection %p", conn);
        event_send(target->tid, session->sid, conn->cid, E_CONN_CLOSE, 0);

        mutex_lock(&target->target_mutex);
        conn_free(conn);
        /* Last connection gone: delete the session as well */
        if (list_empty(&session->conn_list))
                session_del(target, session->sid);
        mutex_unlock(&target->target_mutex);

        TRACE_EXIT();
        return;
}
249
250 static inline void iscsi_conn_init_read(struct iscsi_conn *conn, void *data, size_t len)
251 {
252         len = (len + 3) & -4; // XXX ???
253         conn->read_iov[0].iov_base = data;
254         conn->read_iov[0].iov_len = len;
255         conn->read_msg.msg_iov = conn->read_iov;
256         conn->read_msg.msg_iovlen = 1;
257         conn->read_size = (len + 3) & -4;
258 }
259
/*
 * Allocate a buffer for the PDU's Additional Header Segment and arm the
 * connection to read ahssize bytes into it. Allocation cannot fail
 * (__GFP_NOFAIL), hence the sBUG_ON rather than error handling.
 */
static void iscsi_conn_read_ahs(struct iscsi_conn *conn, struct iscsi_cmnd *cmnd)
{
        /* ToDo: __GFP_NOFAIL ?? */
        cmnd->pdu.ahs = kmalloc(cmnd->pdu.ahssize, __GFP_NOFAIL|GFP_KERNEL);
        sBUG_ON(cmnd->pdu.ahs == NULL);
        iscsi_conn_init_read(conn, cmnd->pdu.ahs, cmnd->pdu.ahssize);
}
267
268 static struct iscsi_cmnd *iscsi_get_send_cmnd(struct iscsi_conn *conn)
269 {
270         struct iscsi_cmnd *cmnd = NULL;
271
272         spin_lock(&conn->write_list_lock);
273         if (!list_empty(&conn->write_list)) {
274                 cmnd = list_entry(conn->write_list.next, struct iscsi_cmnd,
275                                 write_list_entry);
276                 cmd_del_from_write_list(cmnd);
277                 cmnd->write_processing_started = 1;
278         }
279         spin_unlock(&conn->write_list_lock);
280
281         return cmnd;
282 }
283
/*
 * Receive pending bytes for the current read into the iovec prepared in
 * conn->read_msg. Non-blocking (MSG_DONTWAIT). When conn->read_size
 * reaches zero the connection advances to 'state'. Returns the byte
 * count received or a negative error; anything other than
 * -EAGAIN/-ERESTARTSYS (including 0 = peer closed) marks the
 * connection closed.
 */
static int do_recv(struct iscsi_conn *conn, int state)
{
        mm_segment_t oldfs;
        struct msghdr msg;
        int res, first_len;

        if (unlikely(conn->closing)) {
                res = -EIO;
                goto out;
        }

        /* Work on a local msghdr; progress is tracked via conn->read_msg */
        memset(&msg, 0, sizeof(msg));
        msg.msg_iov = conn->read_msg.msg_iov;
        msg.msg_iovlen = conn->read_msg.msg_iovlen;
        first_len = msg.msg_iov->iov_len;

        oldfs = get_fs();
        set_fs(get_ds());
        res = sock_recvmsg(conn->sock, &msg, conn->read_size, MSG_DONTWAIT | MSG_NOSIGNAL);
        set_fs(oldfs);

        if (res <= 0) {
                switch (res) {
                case -EAGAIN:
                case -ERESTARTSYS:
                        TRACE_DBG("EAGAIN or ERESTARTSYS (%d) received for "
                                "conn %p", res, conn);
                        break;
                default:
                        /* res == 0 (orderly shutdown) lands here too */
                        PRINT_ERROR("sock_recvmsg() failed: %d", res);
                        mark_conn_closed(conn);
                        break;
                }
        } else {
                /*
                 * To save some considerable effort and CPU power we suppose
                 * that TCP functions adjust conn->read_msg.msg_iov and
                 * conn->read_msg.msg_iovlen on amount of copied data. This
                 * BUG_ON is intended to catch if it is changed in the future.
                 */
                sBUG_ON((res >= first_len) &&
                        (conn->read_msg.msg_iov->iov_len != 0));
                conn->read_size -= res;
                if (conn->read_size) {
                        if (res >= first_len) {
                                /*
                                 * Skip fully consumed entries. NOTE(review):
                                 * assumes entries after the first are
                                 * PAGE_SIZE each — confirm against the iovec
                                 * setup in the data-read path.
                                 */
                                int done = 1 + ((res - first_len) >> PAGE_SHIFT);
                                conn->read_msg.msg_iov += done;
                                conn->read_msg.msg_iovlen -= done;
                        }
                } else
                        conn->read_state = state;
        }

out:
        TRACE_EXIT_RES(res);
        return res;
}
341
342 static int rx_hdigest(struct iscsi_conn *conn)
343 {
344         struct iscsi_cmnd *cmnd = conn->read_cmnd;
345         int res = digest_rx_header(cmnd);
346
347         if (unlikely(res != 0)) {
348                 PRINT_ERROR("rx header digest for initiator %s failed "
349                         "(%d)", conn->session->initiator_name, res);
350                 mark_conn_closed(conn);
351         }
352         return res;
353 }
354
355 static struct iscsi_cmnd *create_cmnd(struct iscsi_conn *conn)
356 {
357         struct iscsi_cmnd *cmnd;
358
359         cmnd = cmnd_alloc(conn, NULL);
360         iscsi_conn_init_read(cmnd->conn, &cmnd->pdu.bhs, sizeof(cmnd->pdu.bhs));
361         conn->read_state = RX_BHS;
362
363         return cmnd;
364 }
365
/*
 * Incrementally receive and parse one iSCSI PDU, driven by
 * conn->read_state. Each call resumes where the previous one stopped,
 * letting the rd thread multiplex connections without blocking. The
 * switch deliberately falls through from each state to the next while
 * data keeps arriving.
 *
 * Returns >0 for success (in progress), <=0 for error or successful
 * finish of a whole PDU.
 */
static int recv(struct iscsi_conn *conn)
{
        struct iscsi_cmnd *cmnd = conn->read_cmnd;
        int hdigest, ddigest, res = 1, rc;

        TRACE_ENTRY();

        /* Non-zero iff the corresponding digest is negotiated on */
        hdigest = conn->hdigest_type & DIGEST_NONE ? 0 : 1;
        ddigest = conn->ddigest_type & DIGEST_NONE ? 0 : 1;

        switch (conn->read_state) {
        case RX_INIT_BHS:
                sBUG_ON(cmnd != NULL);
                cmnd = conn->read_cmnd = create_cmnd(conn);
                /* fall through */
        case RX_BHS:
                res = do_recv(conn, RX_INIT_AHS);
                if (res <= 0 || conn->read_state != RX_INIT_AHS)
                        break;
                /* fall through */
        case RX_INIT_AHS:
                iscsi_cmnd_get_length(&cmnd->pdu);
                if (cmnd->pdu.ahssize) {
                        iscsi_conn_read_ahs(conn, cmnd);
                        conn->read_state = RX_AHS;
                } else
                        conn->read_state = hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA;

                if (conn->read_state != RX_AHS)
                        break;
                /* fall through */
        case RX_AHS:
                res = do_recv(conn, hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA);
                if (res <= 0 || conn->read_state != RX_INIT_HDIGEST)
                        break;
                /* fall through */
        case RX_INIT_HDIGEST:
                iscsi_conn_init_read(conn, &cmnd->hdigest, sizeof(u32));
                conn->read_state = RX_HDIGEST;
                /* fall through */
        case RX_HDIGEST:
                res = do_recv(conn, RX_CHECK_HDIGEST);
                if (res <= 0 || conn->read_state != RX_CHECK_HDIGEST)
                        break;
                /* fall through */
        case RX_CHECK_HDIGEST:
                rc = rx_hdigest(conn);
                if (likely(rc == 0))
                        conn->read_state = RX_INIT_DATA;
                else {
                        res = rc;
                        break;
                }
                /* fall through */
        case RX_INIT_DATA:
                rc = cmnd_rx_start(cmnd);
                if (unlikely(rc != 0)) {
                        sBUG_ON(!conn->closing);
                        conn->read_state = RX_END;
                        res = rc;
                        /* cmnd will be freed in close_conn() */
                        goto out;
                }
                conn->read_state = cmnd->pdu.datasize ? RX_DATA : RX_END;
                if (conn->read_state != RX_DATA)
                        break;
                /* fall through */
        case RX_DATA:
                res = do_recv(conn, ddigest ? RX_INIT_DDIGEST : RX_END);
                if (res <= 0 || conn->read_state != RX_INIT_DDIGEST)
                        break;
                /* fall through */
        case RX_INIT_DDIGEST:
                iscsi_conn_init_read(conn, &cmnd->ddigest, sizeof(u32));
                conn->read_state = RX_DDIGEST;
                /* fall through */
        case RX_DDIGEST:
                res = do_recv(conn, RX_CHECK_DDIGEST);
                if (res <= 0 || conn->read_state != RX_CHECK_DDIGEST)
                        break;
                /* fall through */
        case RX_CHECK_DDIGEST:
                conn->read_state = RX_END;
                if (cmnd_opcode(cmnd) == ISCSI_OP_SCSI_CMD) {
                        /*
                         * SCSI commands: queue the cmnd on its own rx ddigest
                         * list with an extra ref — NOTE(review): presumably
                         * checked later outside this path; confirm in
                         * cmnd_rx_end()/callers.
                         */
                        TRACE_DBG("Adding RX ddigest cmd %p to digest list "
                                "of self", cmnd);
                        list_add_tail(&cmnd->rx_ddigest_cmd_list_entry,
                                &cmnd->rx_ddigest_cmd_list);
                        cmnd_get(cmnd);
                        conn->read_state = RX_END;
                } else if (cmnd_opcode(cmnd) != ISCSI_OP_SCSI_DATA_OUT) {
                        /*
                         * We could get here only for NOP-Out. ISCSI RFC doesn't
                         * specify how to deal with digest errors in this case.
                         * Is closing connection correct?
                         */
                        TRACE_DBG("cmnd %p, opcode %x: checking RX "
                                "ddigest inline", cmnd, cmnd_opcode(cmnd));
                        rc = digest_rx_data(cmnd);
                        if (unlikely(rc != 0)) {
                                conn->read_state = RX_CHECK_DDIGEST;
                                mark_conn_closed(conn);
                        }
                }
                break;
        default:
                PRINT_ERROR("%d %x", conn->read_state, cmnd_opcode(cmnd));
                sBUG();
        }

        if (res <= 0)
                goto out;

        if (conn->read_state != RX_END)
                goto out;

        if (conn->read_size) {
                PRINT_ERROR("%d %x %d", res, cmnd_opcode(cmnd), conn->read_size);
                sBUG();
        }

        /* Whole PDU received: hand it off and reset for the next BHS */
        cmnd_rx_end(cmnd);

        sBUG_ON(conn->read_size != 0);

        conn->read_cmnd = NULL;
        conn->read_state = RX_INIT_BHS;
        res = 0;

out:
        TRACE_EXIT_RES(res);
        return res;
}
489
490 /* No locks, conn is rd processing */
491 static int process_read_io(struct iscsi_conn *conn, int *closed)
492 {
493         int res;
494
495         do {
496                 res = recv(conn);
497                 if (unlikely(conn->closing)) {
498                         close_conn(conn);
499                         *closed = 1;
500                         break;
501                 }
502         } while(res > 0);
503
504         TRACE_EXIT_RES(res);
505         return res;
506 }
507
/*
 * Called under iscsi_rd_lock and BHs disabled, but will drop it inside,
 * then reacquire.
 *
 * Service every connection queued on iscsi_rd_list: mark it PROCESSING,
 * drop the lock, run its receive path, then either requeue it (work
 * remaining or new data arrived while unlocked) or mark it IDLE.
 */
static void scst_do_job_rd(void)
{
        TRACE_ENTRY();

        /* We delete/add to tail connections to maintain fairness between them */

        while(!list_empty(&iscsi_rd_list)) {
                int rc, closed = 0;
                struct iscsi_conn *conn = list_entry(iscsi_rd_list.next,
                        typeof(*conn), rd_list_entry);

                list_del(&conn->rd_list_entry);

                sBUG_ON(conn->rd_state == ISCSI_CONN_RD_STATE_PROCESSING);
                conn->rd_data_ready = 0;
                conn->rd_state = ISCSI_CONN_RD_STATE_PROCESSING;
#ifdef EXTRACHECKS
                conn->rd_task = current;
#endif
                spin_unlock_bh(&iscsi_rd_lock);

                rc = process_read_io(conn, &closed);

                spin_lock_bh(&iscsi_rd_lock);

                /* close_conn() freed conn — must not touch it any more */
                if (closed)
                        continue;

#ifdef EXTRACHECKS
                conn->rd_task = NULL;
#endif
                if ((rc == 0) || conn->rd_data_ready) {
                        list_add_tail(&conn->rd_list_entry, &iscsi_rd_list);
                        conn->rd_state = ISCSI_CONN_RD_STATE_IN_LIST;
                } else
                        conn->rd_state = ISCSI_CONN_RD_STATE_IDLE;
        }

        TRACE_EXIT();
        return;
}
553
554 static inline int test_rd_list(void)
555 {
556         int res = !list_empty(&iscsi_rd_list) ||
557                   unlikely(kthread_should_stop());
558         return res;
559 }
560
/*
 * Main loop of a network read (rd) kernel thread: sleep on
 * iscsi_rd_waitQ until a connection appears on iscsi_rd_list (or a stop
 * is requested), then service the list via scst_do_job_rd().
 * Returns 0 on thread exit.
 */
int istrd(void *arg)
{
        TRACE_ENTRY();

        /* Exempt this kthread from the suspend freezer */
        current->flags |= PF_NOFREEZE;

        spin_lock_bh(&iscsi_rd_lock);
        while(!kthread_should_stop()) {
                wait_queue_t wait;
                init_waitqueue_entry(&wait, current);

                if (!test_rd_list()) {
                        /*
                         * Open-coded waitqueue sleep so iscsi_rd_lock can be
                         * dropped across schedule() and retaken afterwards.
                         */
                        add_wait_queue_exclusive(&iscsi_rd_waitQ, &wait);
                        for (;;) {
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (test_rd_list())
                                        break;
                                spin_unlock_bh(&iscsi_rd_lock);
                                schedule();
                                spin_lock_bh(&iscsi_rd_lock);
                        }
                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&iscsi_rd_waitQ, &wait);
                }
                scst_do_job_rd();
        }
        spin_unlock_bh(&iscsi_rd_lock);

        /*
         * If kthread_should_stop() is true, we are guaranteed to be
         * on the module unload, so iscsi_rd_list must be empty.
         */
        sBUG_ON(!list_empty(&iscsi_rd_list));

        TRACE_EXIT();
        return 0;
}
598
599 #ifdef NET_PAGE_CALLBACKS_DEFINED
/*
 * Page get callback (NET_PAGE_CALLBACKS_DEFINED builds): invoked when a
 * reference is taken on one of our data pages during zero-copy send.
 * The page's owning cmnd is recorded in page->net_priv (set in
 * write_data()). The first such reference also pins the cmnd itself via
 * cmnd_get(), keeping its pages alive until transmission completes.
 */
void iscsi_get_page_callback(struct page *page)
{
        struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;
        int v;

        TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
                cmd, page, atomic_read(&page->_count),
                atomic_read(&cmd->net_ref_cnt)+1);

        v = atomic_inc_return(&cmd->net_ref_cnt);
        if (v == 1) {
                /* First network reference: pin the owning cmnd */
                TRACE_NET_PAGE("getting cmd %p for page %p", cmd, page);
                cmnd_get(cmd);
        }
}
615
/*
 * Counterpart of iscsi_get_page_callback(): drop one network reference
 * on the cmnd owning this page. When the last reference is released,
 * clear net_priv on all of the cmnd's pages and release the cmnd
 * reference taken by the first get.
 */
void iscsi_put_page_callback(struct page *page)
{
        struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;

        TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
                cmd, page, atomic_read(&page->_count),
                atomic_read(&cmd->net_ref_cnt)-1);

        if (atomic_dec_and_test(&cmd->net_ref_cnt)) {
                int i, sg_cnt = get_pgcnt(cmd->bufflen, cmd->sg[0].offset);
                for(i = 0; i < sg_cnt; i++) {
                        TRACE_NET_PAGE("Clearing page %p", cmd->sg[i].page);
                        cmd->sg[i].page->net_priv = NULL;
                }
                cmnd_put(cmd);
        }
}
633
/*
 * After a sendpage() call: if the network stack took no page references
 * (net_ref_cnt still 0), clear the page's net_priv back-pointer set by
 * write_data() so a later transfer doesn't see a stale owner.
 */
static void check_net_priv(struct iscsi_cmnd *cmd, struct page *page)
{
        if (atomic_read(&cmd->net_ref_cnt) == 0) {
                TRACE_DBG("%s", "sendpage() not called get_page(), "
                        "zeroing net_priv");
                page->net_priv = NULL;
        }
}
#else
/* Without net page callbacks net_priv is unused: no-op. */
static inline void check_net_priv(struct iscsi_cmnd *cmd, struct page *page) {}
#endif
645
/* This is partially taken from the Ardis code. */
/*
 * Send as much as possible of conn's current write command without
 * blocking: first any pending header iovecs via vfs_writev() (phase 1),
 * then the data pages via sendpage() (phase 2). Progress is saved in
 * conn->write_iop/write_iop_used/write_offset/write_size so the next
 * call resumes where this one stopped.
 *
 * Returns the number of bytes sent this call, -EAGAIN if the socket is
 * full and nothing was sent, or another negative error.
 */
static int write_data(struct iscsi_conn *conn)
{
        mm_segment_t oldfs;
        struct file *file;
        struct socket *sock;
        ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
        struct iscsi_cmnd *write_cmnd = conn->write_cmnd;
        struct iscsi_cmnd *ref_cmd;
        struct scatterlist *sg;
        struct iovec *iop;
        int saved_size, size, sendsize;
        int offset, idx;
        int flags, res, count;

        iscsi_extracheck_is_wr_thread(conn);

        /* ref_cmd is the cmnd that owns the sg (parent req if borrowed) */
        if (write_cmnd->own_sg == 0)
                ref_cmd = write_cmnd->parent_req;
        else
                ref_cmd = write_cmnd;

        file = conn->file;
        saved_size = size = conn->write_size;
        iop = conn->write_iop;
        count = conn->write_iop_used;

        /* Phase 1: flush pending header iovecs, if any */
        if (iop) while (1) {
                loff_t off = 0;
                int rest;

                sBUG_ON(count > sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
retry:
                oldfs = get_fs();
                set_fs(KERNEL_DS);
                res = vfs_writev(file, (struct iovec __user *)iop, count, &off);
                set_fs(oldfs);
                TRACE(TRACE_D_WRITE, "%#Lx:%u: %d(%ld)",
                        (unsigned long long) conn->session->sid, conn->cid,
                        res, (long) iop->iov_len);
                if (unlikely(res <= 0)) {
                        if (res == -EAGAIN) {
                                /* Socket full: save progress and bail out */
                                conn->write_iop = iop;
                                conn->write_iop_used = count;
                                goto out_iov;
                        } else if (res == -EINTR)
                                goto retry;
                        goto err;
                }

                /* Step past fully written iovec entries */
                rest = res;
                size -= res;
                while (iop->iov_len <= rest && rest) {
                        rest -= iop->iov_len;
                        iop++;
                        count--;
                }
                if (count == 0) {
                        conn->write_iop = NULL;
                        conn->write_iop_used = 0;
                        if (size)
                                break; /* headers done, data remains */
                        goto out_iov;
                }
                sBUG_ON(iop > conn->write_iov +
                        sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
                /* Partially written entry: advance within it and retry */
                iop->iov_base += rest;
                iop->iov_len -= rest;
        }

        /* Phase 2: send the data pages */
        sg = write_cmnd->sg;
        if (sg == NULL) {
                PRINT_ERROR("%s", "warning data missing!");
                return 0;
        }
        /* Resume position: split write_offset into page index + in-page offset */
        offset = conn->write_offset;
        idx = offset >> PAGE_SHIFT;
        offset &= ~PAGE_MASK;

        sock = conn->sock;

#ifdef NET_PAGE_CALLBACKS_DEFINED
        sendpage = sock->ops->sendpage;
#else
        /*
         * Without page callbacks, zero-copy sendpage is used only for
         * buffers we allocated ourselves; externally allocated buffers
         * fall back to the copying sock_no_sendpage().
         */
        if ((write_cmnd->parent_req->scst_cmd != NULL) &&
            scst_cmd_get_data_buff_alloced(write_cmnd->parent_req->scst_cmd))
                sendpage = sock_no_sendpage;
        else
                sendpage = sock->ops->sendpage;
#endif

        flags = MSG_DONTWAIT;

        while (1) {
#ifdef NET_PAGE_CALLBACKS_DEFINED
                /* Tag the page with its owner for the get/put callbacks */
                if (unlikely((sg[idx].page->net_priv != NULL) &&
                                (sg[idx].page->net_priv != ref_cmd))) {
                        PRINT_ERROR("net_priv isn't NULL and != ref_cmd "
                                "(write_cmnd %p, ref_cmd %p, sg %p, idx %d, "
                                "net_priv %p)", write_cmnd, ref_cmd, sg, idx,
                                sg[idx].page->net_priv);
                        sBUG();
                }
                sg[idx].page->net_priv = ref_cmd;
#endif
                sendsize = PAGE_SIZE - offset;
                if (size <= sendsize) {
                        /* Final chunk: no MSG_MORE */
retry2:
                        res = sendpage(sock, sg[idx].page, offset, size, flags);
                        TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
                                sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
                                (unsigned long long)conn->session->sid, conn->cid,
                                res, sg[idx].page->index, offset, size);
                        if (unlikely(res <= 0)) {
                                if (res == -EINTR)
                                        goto retry2;
                                else
                                        goto out_res;
                        }
                        check_net_priv(ref_cmd, sg[idx].page);
                        if (res == size) {
                                /* Everything sent */
                                conn->write_size = 0;
                                return saved_size;
                        }
                        offset += res;
                        size -= res;
                        continue;
                }

                /* More data follows this page: keep MSG_MORE set */
retry1:
                res = sendpage(sock, sg[idx].page, offset, sendsize,
                        flags | MSG_MORE);
                TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
                        sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
                        (unsigned long long ) conn->session->sid, conn->cid,
                        res, sg[idx].page->index, offset, sendsize);
                if (unlikely(res <= 0)) {
                        if (res == -EINTR)
                                goto retry1;
                        else
                                goto out_res;
                }
                check_net_priv(ref_cmd, sg[idx].page);
                if (res == sendsize) {
                        idx++;
                        offset = 0;
                } else
                        offset += res;
                size -= res;
        }
out:
        /* Save resume position for the next call */
        conn->write_offset = (idx << PAGE_SHIFT) + offset;
out_iov:
        conn->write_size = size;
        /* Nothing at all was sent: propagate -EAGAIN to the caller */
        if ((saved_size == size) && res == -EAGAIN)
                return res;

        return saved_size - size;

out_res:
        check_net_priv(ref_cmd, sg[idx].page);
        if (res == -EAGAIN)
                goto out;
        /* else go through */

err:
#ifndef DEBUG
        if (!conn->closing)
#endif
        {
                PRINT_ERROR("error %d at sid:cid %#Lx:%u, cmnd %p", res,
                        (unsigned long long)conn->session->sid, conn->cid,
                        conn->write_cmnd);
        }
        return res;
}
822
823 static int exit_tx(struct iscsi_conn *conn, int res)
824 {
825         iscsi_extracheck_is_wr_thread(conn);
826
827         switch (res) {
828         case -EAGAIN:
829         case -ERESTARTSYS:
830                 res = 0;
831                 break;
832         default:
833 #ifndef DEBUG
834                 if (!conn->closing)
835 #endif
836                 {
837                         PRINT_ERROR("Sending data failed: initiator %s, "
838                                 "write_size %d, write_state %d, res %d",
839                                 conn->session->initiator_name, conn->write_size,
840                                 conn->write_state, res);
841                 }
842                 conn->write_state = TX_END;
843                 conn->write_size = 0;
844                 mark_conn_closed(conn);
845                 break;
846         }
847         return res;
848 }
849
850 static int tx_ddigest(struct iscsi_cmnd *cmnd, int state)
851 {
852         int res, rest = cmnd->conn->write_size;
853         struct msghdr msg = {.msg_flags = MSG_NOSIGNAL | MSG_DONTWAIT};
854         struct kvec iov;
855
856         iscsi_extracheck_is_wr_thread(cmnd->conn);
857
858         TRACE_DBG("Sending data digest %x (cmd %p)", cmnd->ddigest, cmnd);
859
860         iov.iov_base = (char *) (&cmnd->ddigest) + (sizeof(u32) - rest);
861         iov.iov_len = rest;
862
863         res = kernel_sendmsg(cmnd->conn->sock, &msg, &iov, 1, rest);
864         if (res > 0) {
865                 cmnd->conn->write_size -= res;
866                 if (!cmnd->conn->write_size)
867                         cmnd->conn->write_state = state;
868         } else
869                 res = exit_tx(cmnd->conn, res);
870
871         return res;
872 }
873
874 static void init_tx_hdigest(struct iscsi_cmnd *cmnd)
875 {
876         struct iscsi_conn *conn = cmnd->conn;
877         struct iovec *iop;
878
879         iscsi_extracheck_is_wr_thread(conn);
880
881         digest_tx_header(cmnd);
882
883         sBUG_ON(conn->write_iop_used >= sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
884         iop = &conn->write_iop[conn->write_iop_used];
885         conn->write_iop_used++;
886         iop->iov_base = &(cmnd->hdigest);
887         iop->iov_len = sizeof(u32);
888         conn->write_size += sizeof(u32);
889
890         return;
891 }
892
893 static int iscsi_do_send(struct iscsi_conn *conn, int state)
894 {
895         int res;
896
897         iscsi_extracheck_is_wr_thread(conn);
898
899         res = write_data(conn);
900         if (res > 0) {
901                 if (!conn->write_size)
902                         conn->write_state = state;
903         } else
904                 res = exit_tx(conn, res);
905
906         return res;
907 }
908
/*
 * TX state machine: picks the next response command (if needed) and pushes
 * its BHS/AHS/data and optional data digest out of the socket, advancing
 * conn->write_state as the socket accepts data.
 *
 * No locks, conn is wr processing.
 *
 * IMPORTANT! Connection conn must be protected by additional conn_get()
 * upon entrance in this function, because otherwise it could be destroyed
 * inside as a result of cmnd release.
 *
 * Returns >0 bytes progressed, 0 when nothing (more) could be sent now.
 */
int iscsi_send(struct iscsi_conn *conn)
{
	struct iscsi_cmnd *cmnd = conn->write_cmnd;
	int ddigest, res = 0;

	TRACE_ENTRY();

	TRACE_DBG("conn %p, write_cmnd %p", conn, cmnd);

	iscsi_extracheck_is_wr_thread(conn);

	/* Non-zero if a data digest must follow the data segment */
	ddigest = conn->ddigest_type != DIGEST_NONE ? 1 : 0;

	/*
	 * Cases intentionally fall through so one call can progress through
	 * several TX stages when the socket accepts everything at once.
	 */
	switch (conn->write_state) {
	case TX_INIT:
		sBUG_ON(cmnd != NULL);
		cmnd = conn->write_cmnd = iscsi_get_send_cmnd(conn);
		if (!cmnd)
			goto out;
		cmnd_tx_start(cmnd);
		if (!(conn->hdigest_type & DIGEST_NONE))
		    init_tx_hdigest(cmnd);
		conn->write_state = TX_BHS_DATA;
		/* fallthrough */
	case TX_BHS_DATA:
		res = iscsi_do_send(conn, ddigest && cmnd->pdu.datasize ? 
					TX_INIT_DDIGEST : TX_END);
		if (res <= 0 || conn->write_state != TX_INIT_DDIGEST)
			break;
		/* fallthrough */
	case TX_INIT_DDIGEST:
		cmnd->conn->write_size = sizeof(u32);
		conn->write_state = TX_DDIGEST;
		/* fallthrough */
	case TX_DDIGEST:
		res = tx_ddigest(cmnd, TX_END);
		break;
	default:
		PRINT_ERROR("%d %d %x", res, conn->write_state,
			cmnd_opcode(cmnd));
		sBUG();
	}

	if (res == 0)
		goto out;

	if (conn->write_state != TX_END)
		goto out;

	/* At TX_END every queued byte must have been sent */
	if (conn->write_size) {
		PRINT_ERROR("%d %x %u", res, cmnd_opcode(cmnd),
			conn->write_size);
		sBUG();
	}
	cmnd_tx_end(cmnd);

	/* May free cmnd; conn itself is kept alive by the caller's conn_get() */
	rsp_cmnd_release(cmnd);

	conn->write_cmnd = NULL;
	conn->write_state = TX_INIT;

out:
	TRACE_EXIT_RES(res);
	return res;
}
978
/* No locks, conn is wr processing.
 *
 * IMPORTANT! Connection conn must be protected by additional conn_get()
 * upon entrance in this function, because otherwise it could be destroyed
 * inside as a result of iscsi_send(), which releases sent commands.
 */
static int process_write_queue(struct iscsi_conn *conn)
{
	int res;

	TRACE_ENTRY();

	/* Only attempt a send when the connection actually has work queued */
	res = likely(test_write_ready(conn)) ? iscsi_send(conn) : 0;

	TRACE_EXIT_RES(res);
	return res;
}
997
/*
 * Drain iscsi_wr_list: for each queued connection, run its write queue and
 * then requeue, park, or idle it depending on the outcome.
 *
 * Called under iscsi_wr_lock and BHs disabled, but will drop it inside,
 * then reacquire.
 */
static void scst_do_job_wr(void)
{
	TRACE_ENTRY();

	/* We delete/add to tail connections to maintain fairness between them */

	while(!list_empty(&iscsi_wr_list)) {
		int rc;
		struct iscsi_conn *conn = list_entry(iscsi_wr_list.next,
			typeof(*conn), wr_list_entry);

		TRACE_DBG("conn %p, wr_state %x, wr_space_ready %d, "
			"write ready %d", conn, conn->wr_state,
			conn->wr_space_ready, test_write_ready(conn));

		list_del(&conn->wr_list_entry);

		/* A connection must never be queued while being processed */
		sBUG_ON(conn->wr_state == ISCSI_CONN_WR_STATE_PROCESSING);

		/*
		 * Mark PROCESSING and clear the space flag before dropping the
		 * lock, so a concurrent write_space callback is detectable.
		 */
		conn->wr_state = ISCSI_CONN_WR_STATE_PROCESSING;
		conn->wr_space_ready = 0;
#ifdef EXTRACHECKS
		conn->wr_task = current;
#endif
		spin_unlock_bh(&iscsi_wr_lock);

		/* Keep conn alive: iscsi_send() may release its last command */
		conn_get(conn);

		rc = process_write_queue(conn);

		spin_lock_bh(&iscsi_wr_lock);
#ifdef EXTRACHECKS
		conn->wr_task = NULL;
#endif
		/*
		 * Socket buffer full and no space notification arrived while
		 * we were sending: park until the write_space callback fires.
		 */
		if ((rc == -EAGAIN) && !conn->wr_space_ready) {
			conn->wr_state = ISCSI_CONN_WR_STATE_SPACE_WAIT;
			goto cont;
		}

		/* Requeue at tail if more work remains (fairness), else idle */
		if (test_write_ready(conn)) {
			list_add_tail(&conn->wr_list_entry, &iscsi_wr_list);
			conn->wr_state = ISCSI_CONN_WR_STATE_IN_LIST;
		} else
			conn->wr_state = ISCSI_CONN_WR_STATE_IDLE;

cont:
		conn_put(conn);
	}

	TRACE_EXIT();
	return;
}
1054
1055 static inline int test_wr_list(void)
1056 {
1057         int res = !list_empty(&iscsi_wr_list) ||
1058                   unlikely(kthread_should_stop());
1059         return res;
1060 }
1061
/*
 * Write-thread main loop: sleeps until connections appear on iscsi_wr_list
 * (or kthread_should_stop()), then drains them via scst_do_job_wr().
 * Always returns 0 when stopped at module unload.
 */
int istwr(void *arg)
{
	TRACE_ENTRY();

	current->flags |= PF_NOFREEZE;

	spin_lock_bh(&iscsi_wr_lock);
	while(!kthread_should_stop()) {
		wait_queue_t wait;
		init_waitqueue_entry(&wait, current);

		if (!test_wr_list()) {
			/*
			 * Open-coded interruptible sleep: the queue entry is
			 * added and the task state set BEFORE re-testing the
			 * list, so a wake-up between the test and schedule()
			 * is not lost.
			 */
			add_wait_queue_exclusive(&iscsi_wr_waitQ, &wait);
			for (;;) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (test_wr_list())
					break;
				spin_unlock_bh(&iscsi_wr_lock);
				schedule();
				spin_lock_bh(&iscsi_wr_lock);
			}
			set_current_state(TASK_RUNNING);
			remove_wait_queue(&iscsi_wr_waitQ, &wait);
		}
		/* Drops and reacquires iscsi_wr_lock internally */
		scst_do_job_wr();
	}
	spin_unlock_bh(&iscsi_wr_lock);

	/*
	 * If kthread_should_stop() is true, we are guaranteed to be
	 * on the module unload, so iscsi_wr_list must be empty.
	 */
	sBUG_ON(!list_empty(&iscsi_wr_list));

	TRACE_EXIT();
	return 0;
}