429357416ab115f0bd40c128138b9b0c3508fd17
[mirror/scst/.git] / iscsi-scst / kernel / nthread.c
1 /*
2  *  Network threads.
3  *
4  *  Copyright (C) 2004 - 2005 FUJITA Tomonori <tomof@acm.org>
5  *  Copyright (C) 2007 Vladislav Bolkhovitin
6  *  Copyright (C) 2007 CMS Distribution Limited
7  * 
8  *  This program is free software; you can redistribute it and/or
9  *  modify it under the terms of the GNU General Public License
10  *  as published by the Free Software Foundation.
11  * 
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  *  GNU General Public License for more details.
16  */
17
18 #include <linux/sched.h>
19 #include <linux/file.h>
20 #include <linux/kthread.h>
21 #include <asm/ioctls.h>
22 #include <linux/delay.h>
23 #include <net/tcp.h>
24
25 #include "iscsi.h"
26 #include "digest.h"
27
/*
 * States of the per-connection receive (RX) state machine.  A PDU is
 * received in stages: basic header segment (BHS), optional additional
 * header segments (AHS), optional header digest, data segment, optional
 * data digest.  Each stage has an INIT state that sets up the read and a
 * state that waits for the read to complete.
 */
enum rx_state {
        RX_INIT_BHS, /* Must be zero. */
        RX_BHS,

        RX_INIT_AHS,
        RX_AHS,

        RX_INIT_HDIGEST,
        RX_HDIGEST,
        RX_CHECK_HDIGEST,

        RX_INIT_DATA,
        RX_DATA,

        RX_INIT_DDIGEST,
        RX_DDIGEST,
        RX_CHECK_DDIGEST,

        RX_END,
};
48
/* States of the per-connection transmit (TX) state machine. */
enum tx_state {
        TX_INIT, /* Must be zero. */
        TX_BHS_DATA,
        TX_INIT_DDIGEST,
        TX_DDIGEST,
        TX_END,
};
56
/*
 * Tear down a connection: abort outstanding commands, wait until all
 * references on the connection are dropped, restore the saved socket
 * callbacks, notify user space, and free the connection (and the
 * session, if this was its last connection).
 *
 * No locks held on entry; must run in the connection's rd thread.
 */
static void close_conn(struct iscsi_conn *conn)
{
        struct iscsi_session *session = conn->session;
        struct iscsi_target *target = conn->target;

        TRACE_ENTRY();

        TRACE_CONN_CLOSE("Closing connection %p (conn_ref_cnt=%d)", conn,
                atomic_read(&conn->conn_ref_cnt));

        iscsi_extracheck_is_rd_thread(conn);

        /* We want all our already send operations to complete */
        conn->sock->ops->shutdown(conn->sock, RCV_SHUTDOWN);

        conn_abort(conn);

        /* Release a command whose reception was interrupted mid-PDU */
        if (conn->read_state != RX_INIT_BHS) {
                req_cmnd_release_force(conn->read_cmnd, 0);
                conn->read_cmnd = NULL;
                conn->read_state = RX_INIT_BHS;
        }

        /* ToDo: not the best way to wait */
        while(atomic_read(&conn->conn_ref_cnt) != 0) {
                struct iscsi_cmnd *cmnd;

                /*
                 * Commands still on the session's pending list keep
                 * references alive; force-release the ones belonging to
                 * this connection so conn_ref_cnt can reach zero.
                 */
                if (!list_empty(&session->pending_list)) {
                        struct list_head *pending_list = &session->pending_list;
                        struct iscsi_cmnd *tmp;

                        TRACE_CONN_CLOSE("Disposing pending commands on "
                                "connection %p (conn_ref_cnt=%d)", conn,
                                atomic_read(&conn->conn_ref_cnt));

                        list_for_each_entry_safe(cmnd, tmp, pending_list,
                                                pending_list_entry) {
                                if (cmnd->conn == conn) {
                                        TRACE_CONN_CLOSE("Freeing pending cmd %p",
                                                cmnd);
                                        list_del(&cmnd->pending_list_entry);
                                        cmnd->pending = 0;
                                        req_cmnd_release_force(cmnd, 0);
                                }
                        }
                }

                /* Kick the write thread so it can drop its references too */
                iscsi_make_conn_wr_active(conn);
                msleep(50);

                TRACE_CONN_CLOSE("conn %p, conn_ref_cnt %d left, wr_state %d",
                        conn, atomic_read(&conn->conn_ref_cnt), conn->wr_state);
#ifdef DEBUG
                /* Debug builds: dump every command still referencing us */
                {
#ifdef NET_PAGE_CALLBACKS_DEFINED
                        struct iscsi_cmnd *rsp;
#endif
                        spin_lock_bh(&conn->cmd_list_lock);
                        list_for_each_entry(cmnd, &conn->cmd_list, cmd_list_entry) {
                                TRACE_DBG("cmd %p, scst_state %x, data_waiting "
                                        "%d, ref_cnt %d, parent_req %p", cmnd,
                                        cmnd->scst_state, cmnd->data_waiting,
                                        atomic_read(&cmnd->ref_cnt), cmnd->parent_req);
#ifdef NET_PAGE_CALLBACKS_DEFINED
                                TRACE_DBG("net_ref_cnt %d, sg %p",
                                        atomic_read(&cmnd->net_ref_cnt), cmnd->sg);
                                if (cmnd->sg != NULL) {
                                        int sg_cnt, i;
                                        sg_cnt = get_pgcnt(cmnd->bufflen,
                                                cmnd->sg[0].offset);
                                        for(i = 0; i < sg_cnt; i++) {
                                                TRACE_DBG("page %p, net_priv %p, _count %d",
                                                        cmnd->sg[i].page, cmnd->sg[i].page->net_priv,
                                                        atomic_read(&cmnd->sg[i].page->_count));
                                        }
                                }

                                sBUG_ON(cmnd->parent_req != NULL);

                                spin_lock_bh(&cmnd->rsp_cmd_lock);
                                list_for_each_entry(rsp, &cmnd->rsp_cmd_list, rsp_cmd_list_entry) {
                                        TRACE_DBG("  rsp %p, ref_cnt %d, net_ref_cnt %d, "
                                                "sg %p", rsp, atomic_read(&rsp->ref_cnt),
                                                atomic_read(&rsp->net_ref_cnt), rsp->sg);
                                        if ((rsp->sg != cmnd->sg) && (rsp->sg != NULL)) {
                                                int sg_cnt, i;
                                                sg_cnt = get_pgcnt(rsp->bufflen,
                                                        rsp->sg[0].offset);
                                                sBUG_ON(rsp->sg_cnt != sg_cnt);
                                                for(i = 0; i < sg_cnt; i++) {
                                                        TRACE_DBG("    page %p, net_priv %p, "
                                                                "_count %d", rsp->sg[i].page,
                                                                rsp->sg[i].page->net_priv,
                                                                atomic_read(&rsp->sg[i].page->_count));
                                                }
                                        }
                                }
                                spin_unlock_bh(&cmnd->rsp_cmd_lock);
#endif
                        }
                        spin_unlock_bh(&conn->cmd_list_lock);
                }
#endif
        }

        /* Restore the socket callbacks saved when the conn was set up */
        write_lock_bh(&conn->sock->sk->sk_callback_lock);
        conn->sock->sk->sk_state_change = conn->old_state_change;
        conn->sock->sk->sk_data_ready = conn->old_data_ready;
        conn->sock->sk->sk_write_space = conn->old_write_space;
        write_unlock_bh(&conn->sock->sk->sk_callback_lock);

        /* Wait until the write thread is completely done with this conn */
        while(conn->wr_state != ISCSI_CONN_WR_STATE_IDLE) {
                TRACE_CONN_CLOSE("Waiting for wr thread (conn %p), wr_state %x",
                        conn, conn->wr_state);
                msleep(50);
        }

        TRACE_CONN_CLOSE("Notifying user space about closing connection %p", conn);
        event_send(target->tid, session->sid, conn->cid, E_CONN_CLOSE, 0);

        mutex_lock(&target->target_mutex);
        conn_free(conn);
        /* Last connection gone -> delete the session as well */
        if (list_empty(&session->conn_list))
                session_del(target, session->sid);
        mutex_unlock(&target->target_mutex);

        TRACE_EXIT();
        return;
}
187
188 static inline void iscsi_conn_init_read(struct iscsi_conn *conn, void *data, size_t len)
189 {
190         len = (len + 3) & -4; // XXX ???
191         conn->read_iov[0].iov_base = data;
192         conn->read_iov[0].iov_len = len;
193         conn->read_msg.msg_iov = conn->read_iov;
194         conn->read_msg.msg_iovlen = 1;
195         conn->read_size = (len + 3) & -4;
196 }
197
/*
 * Allocate the additional-header-segment (AHS) buffer for cmnd and queue
 * a socket read for it.  Allocation failure is treated as fatal here.
 */
static void iscsi_conn_read_ahs(struct iscsi_conn *conn, struct iscsi_cmnd *cmnd)
{
        /* ToDo: __GFP_NOFAIL ?? */
        cmnd->pdu.ahs = kmalloc(cmnd->pdu.ahssize, __GFP_NOFAIL|GFP_KERNEL);
        sBUG_ON(cmnd->pdu.ahs == NULL);
        iscsi_conn_init_read(conn, cmnd->pdu.ahs, cmnd->pdu.ahssize);
}
205
206 static struct iscsi_cmnd *iscsi_get_send_cmnd(struct iscsi_conn *conn)
207 {
208         struct iscsi_cmnd *cmnd = NULL;
209
210         spin_lock(&conn->write_list_lock);
211         if (!list_empty(&conn->write_list)) {
212                 cmnd = list_entry(conn->write_list.next, struct iscsi_cmnd,
213                                 write_list_entry);
214                 cmd_del_from_write_list(cmnd);
215                 cmnd->write_processing_started = 1;
216         }
217         spin_unlock(&conn->write_list_lock);
218
219         return cmnd;
220 }
221
/*
 * Receive up to conn->read_size bytes from the socket into the iovec
 * prepared in conn->read_msg.  When the read completes in full,
 * conn->read_state is advanced to 'state'; on a partial read the iovec
 * pointers are advanced so the next call continues where this one
 * stopped.
 *
 * Returns the number of bytes received or a negative error.
 * -EAGAIN/-ERESTARTSYS are expected and leave the connection intact;
 * any other error marks the connection closed.
 */
static int do_recv(struct iscsi_conn *conn, int state)
{
        mm_segment_t oldfs;
        struct msghdr msg;
        int res, first_len;

        if (unlikely(conn->closing)) {
                res = -EIO;
                goto out;
        }

        /* Local msghdr copy; the kernel updates the iov in read_msg */
        memset(&msg, 0, sizeof(msg));
        msg.msg_iov = conn->read_msg.msg_iov;
        msg.msg_iovlen = conn->read_msg.msg_iovlen;
        first_len = msg.msg_iov->iov_len;

        oldfs = get_fs();
        set_fs(get_ds());
        res = sock_recvmsg(conn->sock, &msg, conn->read_size, MSG_DONTWAIT | MSG_NOSIGNAL);
        set_fs(oldfs);

        if (res <= 0) {
                switch (res) {
                case -EAGAIN:
                case -ERESTARTSYS:
                        TRACE_DBG("EAGAIN or ERESTARTSYS (%d) received for "
                                "conn %p", res, conn);
                        break;
                default:
                        PRINT_ERROR("sock_recvmsg() failed: %d", res);
                        mark_conn_closed(conn);
                        break;
                }
        } else {
                /*
                 * To save some considerable effort and CPU power we suppose
                 * that TCP functions adjust conn->read_msg.msg_iov and
                 * conn->read_msg.msg_iovlen on amount of copied data. This
                 * BUG_ON is intended to catch if it is changed in the future.
                 */
                sBUG_ON((res >= first_len) &&
                        (conn->read_msg.msg_iov->iov_len != 0));
                conn->read_size -= res;
                if (conn->read_size) {
                        /* Partial read: skip fully-consumed iovec entries */
                        if (res >= first_len) {
                                int done = 1 + ((res - first_len) >> PAGE_SHIFT);
                                conn->read_msg.msg_iov += done;
                                conn->read_msg.msg_iovlen -= done;
                        }
                } else
                        conn->read_state = state;
        }

out:
        TRACE_EXIT_RES(res);
        return res;
}
279
280 static int rx_hdigest(struct iscsi_conn *conn)
281 {
282         struct iscsi_cmnd *cmnd = conn->read_cmnd;
283         int res = digest_rx_header(cmnd);
284
285         if (unlikely(res != 0)) {
286                 PRINT_ERROR("rx header digest for initiator %s failed "
287                         "(%d)", conn->session->initiator_name, res);
288                 mark_conn_closed(conn);
289         }
290         return res;
291 }
292
293 static struct iscsi_cmnd *create_cmnd(struct iscsi_conn *conn)
294 {
295         struct iscsi_cmnd *cmnd;
296
297         cmnd = cmnd_alloc(conn, NULL);
298         iscsi_conn_init_read(cmnd->conn, &cmnd->pdu.bhs, sizeof(cmnd->pdu.bhs));
299         conn->read_state = RX_BHS;
300
301         return cmnd;
302 }
303
/*
 * Advance the per-connection RX state machine through as many stages as
 * the socket allows.
 *
 * Returns >0 for success, <=0 for error or successful finish of the
 * current PDU.  The switch cases deliberately fall through: whenever a
 * stage completes without exhausting the socket, processing continues
 * directly with the next stage.
 */
static int recv(struct iscsi_conn *conn)
{
        struct iscsi_cmnd *cmnd = conn->read_cmnd;
        int hdigest, ddigest, res = 1, rc;

        TRACE_ENTRY();

        /* Digest stages are skipped if the digest was negotiated off */
        hdigest = conn->hdigest_type & DIGEST_NONE ? 0 : 1;
        ddigest = conn->ddigest_type & DIGEST_NONE ? 0 : 1;

        switch (conn->read_state) {
        case RX_INIT_BHS:
                sBUG_ON(cmnd != NULL);
                cmnd = conn->read_cmnd = create_cmnd(conn);
                /* fall through */
        case RX_BHS:
                res = do_recv(conn, RX_INIT_AHS);
                if (res <= 0 || conn->read_state != RX_INIT_AHS)
                        break;
                /* fall through */
        case RX_INIT_AHS:
                iscsi_cmnd_get_length(&cmnd->pdu);
                if (cmnd->pdu.ahssize) {
                        iscsi_conn_read_ahs(conn, cmnd);
                        conn->read_state = RX_AHS;
                } else
                        conn->read_state = hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA;

                if (conn->read_state != RX_AHS)
                        break;
                /* fall through */
        case RX_AHS:
                res = do_recv(conn, hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA);
                if (res <= 0 || conn->read_state != RX_INIT_HDIGEST)
                        break;
                /* fall through */
        case RX_INIT_HDIGEST:
                iscsi_conn_init_read(conn, &cmnd->hdigest, sizeof(u32));
                conn->read_state = RX_HDIGEST;
                /* fall through */
        case RX_HDIGEST:
                res = do_recv(conn, RX_CHECK_HDIGEST);
                if (res <= 0 || conn->read_state != RX_CHECK_HDIGEST)
                        break;
                /* fall through */
        case RX_CHECK_HDIGEST:
                rc = rx_hdigest(conn);
                if (likely(rc == 0))
                        conn->read_state = RX_INIT_DATA;
                else {
                        res = rc;
                        break;
                }
                /* fall through */
        case RX_INIT_DATA:
                rc = cmnd_rx_start(cmnd);
                if (unlikely(rc != 0)) {
                        sBUG_ON(!conn->closing);
                        conn->read_state = RX_END;
                        res = rc;
                        /* cmnd will be freed in close_conn() */
                        goto out;
                }
                conn->read_state = cmnd->pdu.datasize ? RX_DATA : RX_END;
                if (conn->read_state != RX_DATA)
                        break;
                /* fall through */
        case RX_DATA:
                res = do_recv(conn, ddigest ? RX_INIT_DDIGEST : RX_END);
                if (res <= 0 || conn->read_state != RX_INIT_DDIGEST)
                        break;
                /* fall through */
        case RX_INIT_DDIGEST:
                iscsi_conn_init_read(conn, &cmnd->ddigest, sizeof(u32));
                conn->read_state = RX_DDIGEST;
                /* fall through */
        case RX_DDIGEST:
                res = do_recv(conn, RX_CHECK_DDIGEST);
                if (res <= 0 || conn->read_state != RX_CHECK_DDIGEST)
                        break;
                /* fall through */
        case RX_CHECK_DDIGEST:
                conn->read_state = RX_END;
                if (cmnd_opcode(cmnd) == ISCSI_OP_SCSI_CMD) {
                        /*
                         * The ddigest check is deferred: the cmd is queued on
                         * its own rx_ddigest list (presumably verified later
                         * during command processing — confirm in iscsi.c).
                         */
                        TRACE_DBG("Adding RX ddigest cmd %p to digest list "
                                "of self", cmnd);
                        list_add_tail(&cmnd->rx_ddigest_cmd_list_entry,
                                &cmnd->rx_ddigest_cmd_list);
                        cmnd_get(cmnd);
                        conn->read_state = RX_END;
                } else if (cmnd_opcode(cmnd) != ISCSI_OP_SCSI_DATA_OUT) {
                        /*
                         * We could get here only for NOP-Out. ISCSI RFC doesn't
                         * specify how to deal with digest errors in this case.
                         * Is closing connection correct?
                         */
                        TRACE_DBG("cmnd %p, opcode %x: checking RX "
                                "ddigest inline", cmnd, cmnd_opcode(cmnd));
                        rc = digest_rx_data(cmnd);
                        if (unlikely(rc != 0)) {
                                conn->read_state = RX_CHECK_DDIGEST;
                                mark_conn_closed(conn);
                        }
                }
                break;
        default:
                PRINT_ERROR("%d %x", conn->read_state, cmnd_opcode(cmnd));
                sBUG();
        }

        if (res <= 0)
                goto out;

        if (conn->read_state != RX_END)
                goto out;

        if (conn->read_size) {
                PRINT_ERROR("%d %x %d", res, cmnd_opcode(cmnd), conn->read_size);
                sBUG();
        }

        /* Whole PDU received: hand it off and reset for the next one */
        cmnd_rx_end(cmnd);

        sBUG_ON(conn->read_size != 0);

        conn->read_cmnd = NULL;
        conn->read_state = RX_INIT_BHS;
        res = 0;

out:
        TRACE_EXIT_RES(res);
        return res;
}
427
428 /* No locks, conn is rd processing */
429 static int process_read_io(struct iscsi_conn *conn, int *closed)
430 {
431         int res;
432
433         do {
434                 res = recv(conn);
435                 if (unlikely(conn->closing)) {
436                         close_conn(conn);
437                         *closed = 1;
438                         break;
439                 }
440         } while(res > 0);
441
442         TRACE_EXIT_RES(res);
443         return res;
444 }
445
/*
 * Called under iscsi_rd_lock and BHs disabled, but will drop it inside,
 * then reacquire.
 *
 * Pops connections off iscsi_rd_list one at a time, processes their
 * pending read I/O with the lock dropped, and re-queues them at the
 * tail if more work is outstanding.
 */
static void scst_do_job_rd(void)
{
        TRACE_ENTRY();

        /* We delete/add to tail connections to maintain fairness between them */

        while(!list_empty(&iscsi_rd_list)) {
                int rc, closed = 0;
                struct iscsi_conn *conn = list_entry(iscsi_rd_list.next,
                        typeof(*conn), rd_list_entry);

                list_del(&conn->rd_list_entry);

                sBUG_ON(conn->rd_state == ISCSI_CONN_RD_STATE_PROCESSING);
                conn->rd_data_ready = 0;
                conn->rd_state = ISCSI_CONN_RD_STATE_PROCESSING;
#ifdef EXTRACHECKS
                conn->rd_task = current;
#endif
                /* Drop the lock while doing the actual socket I/O */
                spin_unlock_bh(&iscsi_rd_lock);

                rc = process_read_io(conn, &closed);

                spin_lock_bh(&iscsi_rd_lock);

                /* Connection was freed inside close_conn(); don't touch it */
                if (closed)
                        continue;

#ifdef EXTRACHECKS
                conn->rd_task = NULL;
#endif
                /* Re-queue if the PDU is incomplete or more data arrived */
                if ((rc == 0) || conn->rd_data_ready) {
                        list_add_tail(&conn->rd_list_entry, &iscsi_rd_list);
                        conn->rd_state = ISCSI_CONN_RD_STATE_IN_LIST;
                } else
                        conn->rd_state = ISCSI_CONN_RD_STATE_IDLE;
        }

        TRACE_EXIT();
        return;
}
491
492 static inline int test_rd_list(void)
493 {
494         int res = !list_empty(&iscsi_rd_list) ||
495                   unlikely(kthread_should_stop());
496         return res;
497 }
498
/*
 * Main loop of a read (rd) kernel thread: sleep until a connection
 * appears on iscsi_rd_list or the thread is asked to stop, then process
 * the queued connections.  Runs with iscsi_rd_lock held except while
 * actually sleeping.
 */
int istrd(void *arg)
{
        TRACE_ENTRY();

        /* Network threads must never be frozen on system suspend */
        current->flags |= PF_NOFREEZE;

        spin_lock_bh(&iscsi_rd_lock);
        while(!kthread_should_stop()) {
                wait_queue_t wait;
                init_waitqueue_entry(&wait, current);

                if (!test_rd_list()) {
                        add_wait_queue_exclusive(&iscsi_rd_waitQ, &wait);
                        /* Open-coded wait_event(): lock must be dropped
                         * around schedule() and retaken afterwards. */
                        for (;;) {
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (test_rd_list())
                                        break;
                                spin_unlock_bh(&iscsi_rd_lock);
                                schedule();
                                spin_lock_bh(&iscsi_rd_lock);
                        }
                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&iscsi_rd_waitQ, &wait);
                }
                scst_do_job_rd();
        }
        spin_unlock_bh(&iscsi_rd_lock);

        /*
         * If kthread_should_stop() is true, we are guaranteed to be
         * on the module unload, so iscsi_rd_list must be empty.
         */
        sBUG_ON(!list_empty(&iscsi_rd_list));

        TRACE_EXIT();
        return 0;
}
536
537 #ifdef NET_PAGE_CALLBACKS_DEFINED
538 void iscsi_get_page_callback(struct page *page)
539 {
540         struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;
541         int v;
542
543         TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
544                 cmd, page, atomic_read(&page->_count),
545                 atomic_read(&cmd->net_ref_cnt)+1);
546
547         v = atomic_inc_return(&cmd->net_ref_cnt);
548         if (v == 1) {
549                 TRACE_NET_PAGE("getting cmd %p for page %p", cmd, page);
550                 cmnd_get(cmd);
551         }
552 }
553
/*
 * Network stack put_page() hook: drops the reference taken in
 * iscsi_get_page_callback().  When the last network reference goes
 * away, clear net_priv on all of the command's pages and release the
 * command reference held on behalf of the network stack.
 */
void iscsi_put_page_callback(struct page *page)
{
        struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;

        TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
                cmd, page, atomic_read(&page->_count),
                atomic_read(&cmd->net_ref_cnt)-1);

        if (atomic_dec_and_test(&cmd->net_ref_cnt)) {
                int i, sg_cnt = get_pgcnt(cmd->bufflen, cmd->sg[0].offset);
                for(i = 0; i < sg_cnt; i++) {
                        TRACE_NET_PAGE("Clearing page %p", cmd->sg[i].page);
                        cmd->sg[i].page->net_priv = NULL;
                }
                cmnd_put(cmd);
        }
}
571
/*
 * If the network stack never took a page reference during sendpage()
 * (net_ref_cnt still zero), the net_priv link set before sending is
 * stale — clear it so the page isn't misattributed later.
 */
static void check_net_priv(struct iscsi_cmnd *cmd, struct page *page)
{
        if (atomic_read(&cmd->net_ref_cnt) == 0) {
                TRACE_DBG("%s", "sendpage() not called get_page(), "
                        "zeroing net_priv");
                page->net_priv = NULL;
        }
}
#else
/* Without net page callbacks there is no net_priv to clean up. */
static inline void check_net_priv(struct iscsi_cmnd *cmd, struct page *page) {}
#endif
583
/*
 * Send the current write command to the socket.  This is partially
 * taken from the Ardis code.
 *
 * Phase 1 flushes the queued header/digest iovecs (conn->write_iop)
 * with vfs_writev(); phase 2 streams the data pages with sendpage().
 * Progress is saved in conn->write_iop/write_offset/write_size so the
 * function can resume after -EAGAIN.
 *
 * Returns the number of bytes sent, -EAGAIN when nothing could be sent
 * on a full socket, or another negative error.
 */
static int write_data(struct iscsi_conn *conn)
{
        mm_segment_t oldfs;
        struct file *file;
        struct socket *sock;
        ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
        struct iscsi_cmnd *write_cmnd = conn->write_cmnd;
        struct iscsi_cmnd *ref_cmd;
        struct scatterlist *sg;
        struct iovec *iop;
        int saved_size, size, sendsize;
        int offset, idx;
        int flags, res, count;

        iscsi_extracheck_is_wr_thread(conn);

        /* The sg owner is the command whose pages get net_priv-tagged */
        if (write_cmnd->own_sg == 0)
                ref_cmd = write_cmnd->parent_req;
        else
                ref_cmd = write_cmnd;

        file = conn->file;
        saved_size = size = conn->write_size;
        iop = conn->write_iop;
        count = conn->write_iop_used;

        /* Phase 1: flush the queued header iovecs, if any */
        if (iop) while (1) {
                loff_t off = 0;
                int rest;

                sBUG_ON(count > sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
retry:
                oldfs = get_fs();
                set_fs(KERNEL_DS);
                res = vfs_writev(file, (struct iovec __user *)iop, count, &off);
                set_fs(oldfs);
                TRACE(TRACE_D_WRITE, "%#Lx:%u: %d(%ld)",
                        (unsigned long long) conn->session->sid, conn->cid,
                        res, (long) iop->iov_len);
                if (unlikely(res <= 0)) {
                        if (res == -EAGAIN) {
                                /* Socket full: save progress for next call */
                                conn->write_iop = iop;
                                conn->write_iop_used = count;
                                goto out_iov;
                        } else if (res == -EINTR)
                                goto retry;
                        goto err;
                }

                /* Skip the iovec entries that were fully written */
                rest = res;
                size -= res;
                while (iop->iov_len <= rest && rest) {
                        rest -= iop->iov_len;
                        iop++;
                        count--;
                }
                if (count == 0) {
                        conn->write_iop = NULL;
                        conn->write_iop_used = 0;
                        if (size)
                                break;
                        goto out_iov;
                }
                sBUG_ON(iop > conn->write_iov +
                        sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
                /* Adjust the partially-written entry */
                iop->iov_base += rest;
                iop->iov_len -= rest;
        }

        sg = write_cmnd->sg;
        if (sg == NULL) {
                PRINT_ERROR("%s", "warning data missing!");
                return 0;
        }
        /* Resume point inside the scatterlist from the previous call */
        offset = conn->write_offset;
        idx = offset >> PAGE_SHIFT;
        offset &= ~PAGE_MASK;

        sock = conn->sock;

#ifdef NET_PAGE_CALLBACKS_DEFINED
        sendpage = sock->ops->sendpage;
#else
        /*
         * Buffers allocated outside iSCSI can't be tracked by the page
         * callbacks, so use the copying sock_no_sendpage() for them.
         */
        if ((write_cmnd->parent_req->scst_cmd != NULL) &&
            scst_cmd_get_data_buff_alloced(write_cmnd->parent_req->scst_cmd))
                sendpage = sock_no_sendpage;
        else
                sendpage = sock->ops->sendpage;
#endif

        flags = MSG_DONTWAIT;

        /* Phase 2: send the data pages */
        while (1) {
#ifdef NET_PAGE_CALLBACKS_DEFINED
                if (unlikely((sg[idx].page->net_priv != NULL) &&
                                (sg[idx].page->net_priv != ref_cmd))) {
                        PRINT_ERROR("net_priv isn't NULL and != ref_cmd "
                                "(write_cmnd %p, ref_cmd %p, sg %p, idx %d, "
                                "net_priv %p)", write_cmnd, ref_cmd, sg, idx,
                                sg[idx].page->net_priv);
                        sBUG();
                }
                sg[idx].page->net_priv = ref_cmd;
#endif
                sendsize = PAGE_SIZE - offset;
                if (size <= sendsize) {
                        /* Final chunk: no MSG_MORE */
retry2:
                        res = sendpage(sock, sg[idx].page, offset, size, flags);
                        TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
                                sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
                                (unsigned long long)conn->session->sid, conn->cid,
                                res, sg[idx].page->index, offset, size);
                        if (unlikely(res <= 0)) {
                                if (res == -EINTR)
                                        goto retry2;
                                else
                                        goto out_res;
                        }
                        check_net_priv(ref_cmd, sg[idx].page);
                        if (res == size) {
                                conn->write_size = 0;
                                return saved_size;
                        }
                        offset += res;
                        size -= res;
                        continue;
                }

retry1:
                res = sendpage(sock, sg[idx].page, offset, sendsize,
                        flags | MSG_MORE);
                TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
                        sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
                        (unsigned long long ) conn->session->sid, conn->cid,
                        res, sg[idx].page->index, offset, sendsize);
                if (unlikely(res <= 0)) {
                        if (res == -EINTR)
                                goto retry1;
                        else
                                goto out_res;
                }
                check_net_priv(ref_cmd, sg[idx].page);
                if (res == sendsize) {
                        idx++;
                        offset = 0;
                } else
                        offset += res;
                size -= res;
        }
out:
        /* Remember how far we got for the next call */
        conn->write_offset = (idx << PAGE_SHIFT) + offset;
out_iov:
        conn->write_size = size;
        if ((saved_size == size) && res == -EAGAIN)
                return res;

        return saved_size - size;

out_res:
        check_net_priv(ref_cmd, sg[idx].page);
        if (res == -EAGAIN)
                goto out;
        /* else go through */

err:
#ifndef DEBUG
        if (!conn->closing)
#endif
        {
                PRINT_ERROR("error %d at sid:cid %#Lx:%u, cmnd %p", res,
                        (unsigned long long)conn->session->sid, conn->cid,
                        conn->write_cmnd);
        }
        return res;
}
760
761 static int exit_tx(struct iscsi_conn *conn, int res)
762 {
763         iscsi_extracheck_is_wr_thread(conn);
764
765         switch (res) {
766         case -EAGAIN:
767         case -ERESTARTSYS:
768                 res = 0;
769                 break;
770         default:
771 #ifndef DEBUG
772                 if (!conn->closing)
773 #endif
774                 {
775                         PRINT_ERROR("Sending data failed: initiator %s, "
776                                 "write_size %d, write_state %d, res %d",
777                                 conn->session->initiator_name, conn->write_size,
778                                 conn->write_state, res);
779                 }
780                 conn->write_state = TX_END;
781                 conn->write_size = 0;
782                 mark_conn_closed(conn);
783                 break;
784         }
785         return res;
786 }
787
788 static int tx_ddigest(struct iscsi_cmnd *cmnd, int state)
789 {
790         int res, rest = cmnd->conn->write_size;
791         struct msghdr msg = {.msg_flags = MSG_NOSIGNAL | MSG_DONTWAIT};
792         struct kvec iov;
793
794         iscsi_extracheck_is_wr_thread(cmnd->conn);
795
796         TRACE_DBG("Sending data digest %x (cmd %p)", cmnd->ddigest, cmnd);
797
798         iov.iov_base = (char *) (&cmnd->ddigest) + (sizeof(u32) - rest);
799         iov.iov_len = rest;
800
801         res = kernel_sendmsg(cmnd->conn->sock, &msg, &iov, 1, rest);
802         if (res > 0) {
803                 cmnd->conn->write_size -= res;
804                 if (!cmnd->conn->write_size)
805                         cmnd->conn->write_state = state;
806         } else
807                 res = exit_tx(cmnd->conn, res);
808
809         return res;
810 }
811
812 static void init_tx_hdigest(struct iscsi_cmnd *cmnd)
813 {
814         struct iscsi_conn *conn = cmnd->conn;
815         struct iovec *iop;
816
817         iscsi_extracheck_is_wr_thread(conn);
818
819         digest_tx_header(cmnd);
820
821         sBUG_ON(conn->write_iop_used >= sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
822         iop = &conn->write_iop[conn->write_iop_used];
823         conn->write_iop_used++;
824         iop->iov_base = &(cmnd->hdigest);
825         iop->iov_len = sizeof(u32);
826         conn->write_size += sizeof(u32);
827
828         return;
829 }
830
831 static int iscsi_do_send(struct iscsi_conn *conn, int state)
832 {
833         int res;
834
835         iscsi_extracheck_is_wr_thread(conn);
836
837         res = write_data(conn);
838         if (res > 0) {
839                 if (!conn->write_size)
840                         conn->write_state = state;
841         } else
842                 res = exit_tx(conn, res);
843
844         return res;
845 }
846
847 /* 
848  * No locks, conn is wr processing.
849  *
850  * IMPORTANT! Connection conn must be protected by additional conn_get()
851  * upon entrance in this function, because otherwise it could be destroyed
852  * inside as a result of cmnd release.
853  */
854 int iscsi_send(struct iscsi_conn *conn)
855 {
856         struct iscsi_cmnd *cmnd = conn->write_cmnd;
857         int ddigest, res = 0;
858
859         TRACE_ENTRY();
860
861         TRACE_DBG("conn %p, write_cmnd %p", conn, cmnd);
862
863         iscsi_extracheck_is_wr_thread(conn);
864
865         ddigest = conn->ddigest_type != DIGEST_NONE ? 1 : 0;
866
867         switch (conn->write_state) {
868         case TX_INIT:
869                 sBUG_ON(cmnd != NULL);
870                 cmnd = conn->write_cmnd = iscsi_get_send_cmnd(conn);
871                 if (!cmnd)
872                         goto out;
873                 cmnd_tx_start(cmnd);
874                 if (!(conn->hdigest_type & DIGEST_NONE))
875                     init_tx_hdigest(cmnd);
876                 conn->write_state = TX_BHS_DATA;
877         case TX_BHS_DATA:
878                 res = iscsi_do_send(conn, ddigest && cmnd->pdu.datasize ? 
879                                         TX_INIT_DDIGEST : TX_END);
880                 if (res <= 0 || conn->write_state != TX_INIT_DDIGEST)
881                         break;
882         case TX_INIT_DDIGEST:
883                 cmnd->conn->write_size = sizeof(u32);
884                 conn->write_state = TX_DDIGEST;
885         case TX_DDIGEST:
886                 res = tx_ddigest(cmnd, TX_END);
887                 break;
888         default:
889                 PRINT_ERROR("%d %d %x", res, conn->write_state,
890                         cmnd_opcode(cmnd));
891                 sBUG();
892         }
893
894         if (res == 0)
895                 goto out;
896
897         if (conn->write_state != TX_END)
898                 goto out;
899
900         if (conn->write_size) {
901                 PRINT_ERROR("%d %x %u", res, cmnd_opcode(cmnd),
902                         conn->write_size);
903                 sBUG();
904         }
905         cmnd_tx_end(cmnd);
906
907         rsp_cmnd_release(cmnd);
908
909         conn->write_cmnd = NULL;
910         conn->write_state = TX_INIT;
911
912 out:
913         TRACE_EXIT_RES(res);
914         return res;
915 }
916
/* No locks, conn is wr processing.
 *
 * IMPORTANT! The caller must hold an extra conn_get() reference on conn,
 * since iscsi_send() releases sent commands and the connection could
 * otherwise be destroyed underneath us.
 */
static int process_write_queue(struct iscsi_conn *conn)
{
	int res;

	TRACE_ENTRY();

	if (unlikely(!test_write_ready(conn)))
		res = 0;
	else
		res = iscsi_send(conn);

	TRACE_EXIT_RES(res);
	return res;
}
935
936 /*
937  * Called under iscsi_wr_lock and BHs disabled, but will drop it inside,
938  * then reaquire.
939  */
940 static void scst_do_job_wr(void)
941 {
942         TRACE_ENTRY();
943
944         /* We delete/add to tail connections to maintain fairness between them */
945
946         while(!list_empty(&iscsi_wr_list)) {
947                 int rc;
948                 struct iscsi_conn *conn = list_entry(iscsi_wr_list.next,
949                         typeof(*conn), wr_list_entry);
950
951                 TRACE_DBG("conn %p, wr_state %x, wr_space_ready %d, "
952                         "write ready %d", conn, conn->wr_state,
953                         conn->wr_space_ready, test_write_ready(conn));
954
955                 list_del(&conn->wr_list_entry);
956
957                 sBUG_ON(conn->wr_state == ISCSI_CONN_WR_STATE_PROCESSING);
958
959                 conn->wr_state = ISCSI_CONN_WR_STATE_PROCESSING;
960                 conn->wr_space_ready = 0;
961 #ifdef EXTRACHECKS
962                 conn->wr_task = current;
963 #endif
964                 spin_unlock_bh(&iscsi_wr_lock);
965
966                 conn_get(conn);
967
968                 rc = process_write_queue(conn);
969
970                 spin_lock_bh(&iscsi_wr_lock);
971 #ifdef EXTRACHECKS
972                 conn->wr_task = NULL;
973 #endif
974                 if ((rc == -EAGAIN) && !conn->wr_space_ready) {
975                         conn->wr_state = ISCSI_CONN_WR_STATE_SPACE_WAIT;
976                         goto cont;
977                 }
978
979                 if (test_write_ready(conn)) {
980                         list_add_tail(&conn->wr_list_entry, &iscsi_wr_list);
981                         conn->wr_state = ISCSI_CONN_WR_STATE_IN_LIST;
982                 } else
983                         conn->wr_state = ISCSI_CONN_WR_STATE_IDLE;
984
985 cont:
986                 conn_put(conn);
987         }
988
989         TRACE_EXIT();
990         return;
991 }
992
993 static inline int test_wr_list(void)
994 {
995         int res = !list_empty(&iscsi_wr_list) ||
996                   unlikely(kthread_should_stop());
997         return res;
998 }
999
/* Main loop of a write (TX) network kernel thread */
int istwr(void *arg)
{
	TRACE_ENTRY();

	/* Keep this thread runnable across suspend/freeze */
	current->flags |= PF_NOFREEZE;

	spin_lock_bh(&iscsi_wr_lock);
	while(!kthread_should_stop()) {
		wait_queue_t wait;
		init_waitqueue_entry(&wait, current);

		if (!test_wr_list()) {
			add_wait_queue_exclusive(&iscsi_wr_waitQ, &wait);
			for (;;) {
				/*
				 * Set state before rechecking under the lock,
				 * so a concurrent wakeup is never lost.
				 */
				set_current_state(TASK_INTERRUPTIBLE);
				if (test_wr_list())
					break;
				spin_unlock_bh(&iscsi_wr_lock);
				schedule();
				spin_lock_bh(&iscsi_wr_lock);
			}
			set_current_state(TASK_RUNNING);
			remove_wait_queue(&iscsi_wr_waitQ, &wait);
		}
		/* Process all queued connections; drops/retakes the lock */
		scst_do_job_wr();
	}
	spin_unlock_bh(&iscsi_wr_lock);

	/*
	 * If kthread_should_stop() is true, we are guaranteed to be
	 * on the module unload, so iscsi_wr_list must be empty.
	 */
	sBUG_ON(!list_empty(&iscsi_wr_list));

	TRACE_EXIT();
	return 0;
}