b5bf50fa6f6189979ed5bb6c9da310c58e2e2c35
[mirror/scst/.git] / iscsi-scst / kernel / nthread.c
1 /*
2  *  Network threads.
3  *
4  *  Copyright (C) 2004 - 2005 FUJITA Tomonori <tomof@acm.org>
5  *  Copyright (C) 2007 Vladislav Bolkhovitin
6  *  Copyright (C) 2007 CMS Distribution Limited
7  * 
8  *  This program is free software; you can redistribute it and/or
9  *  modify it under the terms of the GNU General Public License
10  *  as published by the Free Software Foundation.
11  * 
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  *  GNU General Public License for more details.
16  */
17
18 #include <linux/sched.h>
19 #include <linux/file.h>
20 #include <linux/kthread.h>
21 #include <asm/ioctls.h>
22 #include <linux/delay.h>
23 #include <net/tcp.h>
24
25 #include "iscsi.h"
26 #include "digest.h"
27
/*
 * Receive-side state machine, driven by recv(). A PDU is read in stages:
 * BHS -> optional AHS -> optional header digest -> optional data segment
 * -> optional data digest. Each RX_INIT_* state prepares the read buffers
 * (via iscsi_conn_init_read() and friends); the matching non-INIT state
 * waits in do_recv() until all requested bytes have arrived.
 */
enum rx_state {
	RX_INIT_BHS, /* Must be zero. */
	RX_BHS,

	RX_INIT_AHS,
	RX_AHS,

	RX_INIT_HDIGEST,
	RX_HDIGEST,
	RX_CHECK_HDIGEST,

	RX_INIT_DATA,
	RX_DATA,

	RX_INIT_DDIGEST,
	RX_DDIGEST,
	RX_CHECK_DDIGEST,

	RX_END,
};
48
/*
 * Transmit-side state machine (used by the write path; the TX code is
 * below/outside this section). Mirrors rx_state on a smaller scale:
 * header+data are sent first, then an optional data digest.
 */
enum tx_state {
	TX_INIT, /* Must be zero. */
	TX_BHS_DATA,
	TX_INIT_DDIGEST,
	TX_DDIGEST,
	TX_END,
};
56
#if defined(NET_PAGE_CALLBACKS_DEFINED)
/*
 * Release zero-copy network references held on a dying connection.
 *
 * If the socket has reached TCP_CLOSE_WAIT or TCP_CLOSE, none of the
 * queued data will ever be transmitted, so the pages the network layer
 * still references (tracked via page->net_priv, see
 * iscsi_put_page_callback()) can be released immediately rather than
 * waiting for the stack to free its skbs.
 *
 * Called from close_conn() with no locks held; takes
 * conn->cmd_list_lock internally.
 */
static void iscsi_check_closewait(struct iscsi_conn *conn)
{
	struct iscsi_cmnd *cmnd;

	TRACE_ENTRY();

	if ((conn->sock->sk->sk_state != TCP_CLOSE_WAIT) &&
	    (conn->sock->sk->sk_state != TCP_CLOSE)) {
		TRACE_CONN_CLOSE_DBG("sk_state %d, skipping",
			conn->sock->sk->sk_state);
		goto out;
	}

	/*
	 * No data are going to be sent, so all being sent buffers can be freed
	 * now. Strange that TCP doesn't do that itself.
	 */

again:
	spin_lock_bh(&conn->cmd_list_lock);
	list_for_each_entry(cmnd, &conn->cmd_list, cmd_list_entry) {
		TRACE_CONN_CLOSE_DBG("cmd %p, scst_state %x, data_waiting %d, "
			"ref_cnt %d, parent_req %p, net_ref_cnt %d, sg %p",
			cmnd, cmnd->scst_state, cmnd->data_waiting,
			atomic_read(&cmnd->ref_cnt), cmnd->parent_req,
			atomic_read(&cmnd->net_ref_cnt), cmnd->sg);
		sBUG_ON(cmnd->parent_req != NULL);
		if (cmnd->sg != NULL) {
			int sg_cnt, i, restart = 0;
			sg_cnt = get_pgcnt(cmnd->bufflen,
				cmnd->sg[0].offset);
			/* Pin cmnd: dropping its last net ref below would free it */
			cmnd_get(cmnd);
			for(i = 0; i < sg_cnt; i++) {
				TRACE_CONN_CLOSE_DBG("page %p, net_priv %p, _count %d",
					cmnd->sg[i].page, cmnd->sg[i].page->net_priv,
					atomic_read(&cmnd->sg[i].page->_count));
				if (cmnd->sg[i].page->net_priv != NULL) {
					/*
					 * Drop the lock before releasing net
					 * refs; the whole scan is restarted
					 * from the list head afterwards.
					 * NOTE(review): the list walk continues
					 * briefly without cmd_list_lock held —
					 * presumably tolerable because cmnd is
					 * pinned and we restart anyway; verify.
					 */
					if (restart == 0) {
						spin_unlock_bh(&conn->cmd_list_lock);
						restart = 1;
					}
					while(cmnd->sg[i].page->net_priv != NULL)
						iscsi_put_page_callback(cmnd->sg[i].page);
				}
			}
			cmnd_put(cmnd);
			if (restart)
				goto again;
		}
	}
	spin_unlock_bh(&conn->cmd_list_lock);

out:
	TRACE_EXIT();
	return;
}
#else
static inline void iscsi_check_closewait(struct iscsi_conn *conn) {};
#endif
117
/*
 * Tear down a connection from the read thread.
 *
 * Sequence: stop further reception, abort outstanding commands, drop any
 * half-read PDU, then poll until every reference to the connection is
 * gone (disposing this connection's pending commands along the way),
 * restore the original socket callbacks, wait for the write thread to go
 * idle, notify user space, and finally free the connection (and the
 * session, if this was its last connection).
 *
 * No locks held on entry; must be called on the rd thread (checked by
 * iscsi_extracheck_is_rd_thread()).
 */
static void close_conn(struct iscsi_conn *conn)
{
	struct iscsi_session *session = conn->session;
	struct iscsi_target *target = conn->target;
#ifdef DEBUG
	unsigned long start_waiting = jiffies;
#endif

	TRACE_ENTRY();

	TRACE_CONN_CLOSE("Closing connection %p (conn_ref_cnt=%d)", conn,
		atomic_read(&conn->conn_ref_cnt));

	iscsi_extracheck_is_rd_thread(conn);

	/* We want all our already send operations to complete */
	conn->sock->ops->shutdown(conn->sock, RCV_SHUTDOWN);

	conn_abort(conn);

	/* Drop a partially received PDU, if reception was in progress */
	if (conn->read_state != RX_INIT_BHS) {
		req_cmnd_release_force(conn->read_cmnd, 0);
		conn->read_cmnd = NULL;
		conn->read_state = RX_INIT_BHS;
	}

	/* ToDo: not the best way to wait */
	while(atomic_read(&conn->conn_ref_cnt) != 0) {
		struct iscsi_cmnd *cmnd;

		/* Free this connection's commands still on the session's pending list */
		if (!list_empty(&session->pending_list)) {
			struct list_head *pending_list = &session->pending_list;
			struct iscsi_cmnd *tmp;

			TRACE_CONN_CLOSE("Disposing pending commands on "
				"connection %p (conn_ref_cnt=%d)", conn,
				atomic_read(&conn->conn_ref_cnt));
 
			list_for_each_entry_safe(cmnd, tmp, pending_list,
						pending_list_entry) {
				if (cmnd->conn == conn) {
					TRACE_CONN_CLOSE("Freeing pending cmd %p",
						cmnd);
					list_del(&cmnd->pending_list_entry);
					cmnd->pending = 0;
					req_cmnd_release_force(cmnd, 0);
				}
			}
		}

		/* Kick the write side so it can drain and drop its refs */
		iscsi_make_conn_wr_active(conn);
		msleep(50);

		TRACE_CONN_CLOSE("conn %p, conn_ref_cnt %d left, wr_state %d",
			conn, atomic_read(&conn->conn_ref_cnt), conn->wr_state);
#ifdef DEBUG
		{
#ifdef NET_PAGE_CALLBACKS_DEFINED
			struct iscsi_cmnd *rsp;
#endif
			/* If closing takes > 10s, turn on verbose close tracing */
			if (time_after(jiffies, start_waiting+10*HZ))
				trace_flag |= TRACE_CONN_OC_DBG;

			spin_lock_bh(&conn->cmd_list_lock);
			list_for_each_entry(cmnd, &conn->cmd_list, cmd_list_entry) {
				TRACE_CONN_CLOSE_DBG("cmd %p, scst_state %x, data_waiting "
					"%d, ref_cnt %d, parent_req %p", cmnd,
					cmnd->scst_state, cmnd->data_waiting,
					atomic_read(&cmnd->ref_cnt), cmnd->parent_req);
#ifdef NET_PAGE_CALLBACKS_DEFINED
				TRACE_CONN_CLOSE_DBG("net_ref_cnt %d, sg %p",
					atomic_read(&cmnd->net_ref_cnt), cmnd->sg);
				if (cmnd->sg != NULL) {
					int sg_cnt, i;
					sg_cnt = get_pgcnt(cmnd->bufflen,
						cmnd->sg[0].offset);
					for(i = 0; i < sg_cnt; i++) {
						TRACE_CONN_CLOSE_DBG("page %p, net_priv %p, _count %d",
							cmnd->sg[i].page, cmnd->sg[i].page->net_priv,
							atomic_read(&cmnd->sg[i].page->_count));
					}
				}

				sBUG_ON(cmnd->parent_req != NULL);
				
				spin_lock_bh(&cmnd->rsp_cmd_lock);
				list_for_each_entry(rsp, &cmnd->rsp_cmd_list, rsp_cmd_list_entry) {
					TRACE_CONN_CLOSE_DBG("  rsp %p, ref_cnt %d, net_ref_cnt %d, "
						"sg %p", rsp, atomic_read(&rsp->ref_cnt),
						atomic_read(&rsp->net_ref_cnt), rsp->sg);
					if ((rsp->sg != cmnd->sg) && (rsp->sg != NULL)) {
						int sg_cnt, i;
						sg_cnt = get_pgcnt(rsp->bufflen,
							rsp->sg[0].offset);
						sBUG_ON(rsp->sg_cnt != sg_cnt);
						for(i = 0; i < sg_cnt; i++) {
							TRACE_CONN_CLOSE_DBG("    page %p, net_priv %p, "
								"_count %d", rsp->sg[i].page,
								rsp->sg[i].page->net_priv,
								atomic_read(&rsp->sg[i].page->_count));
						}
					}
				}
				spin_unlock_bh(&cmnd->rsp_cmd_lock);
#endif
			}
			spin_unlock_bh(&conn->cmd_list_lock);
		}
#endif
		iscsi_check_closewait(conn);
	}

	/* Restore the socket callbacks we hooked at connection setup */
	write_lock_bh(&conn->sock->sk->sk_callback_lock);
	conn->sock->sk->sk_state_change = conn->old_state_change;
	conn->sock->sk->sk_data_ready = conn->old_data_ready;
	conn->sock->sk->sk_write_space = conn->old_write_space;
	write_unlock_bh(&conn->sock->sk->sk_callback_lock);

	while(conn->wr_state != ISCSI_CONN_WR_STATE_IDLE) {
		TRACE_CONN_CLOSE("Waiting for wr thread (conn %p), wr_state %x",
			conn, conn->wr_state);
		msleep(50);
	}

	TRACE_CONN_CLOSE("Notifying user space about closing connection %p", conn);
	event_send(target->tid, session->sid, conn->cid, E_CONN_CLOSE, 0);

	mutex_lock(&target->target_mutex);
	conn_free(conn);
	if (list_empty(&session->conn_list))
		session_del(target, session->sid);
	mutex_unlock(&target->target_mutex);

	TRACE_EXIT();
	return;
}
255
256 static inline void iscsi_conn_init_read(struct iscsi_conn *conn, void *data, size_t len)
257 {
258         len = (len + 3) & -4; // XXX ???
259         conn->read_iov[0].iov_base = data;
260         conn->read_iov[0].iov_len = len;
261         conn->read_msg.msg_iov = conn->read_iov;
262         conn->read_msg.msg_iovlen = 1;
263         conn->read_size = (len + 3) & -4;
264 }
265
/*
 * Allocate a buffer for the PDU's Additional Header Segment (AHS) and
 * set up the connection to read ahssize bytes into it. The allocation
 * cannot fail (__GFP_NOFAIL); the sBUG_ON is a belt-and-braces check.
 */
static void iscsi_conn_read_ahs(struct iscsi_conn *conn, struct iscsi_cmnd *cmnd)
{
	/* ToDo: __GFP_NOFAIL ?? */
	cmnd->pdu.ahs = kmalloc(cmnd->pdu.ahssize, __GFP_NOFAIL|GFP_KERNEL);
	sBUG_ON(cmnd->pdu.ahs == NULL);
	iscsi_conn_init_read(conn, cmnd->pdu.ahs, cmnd->pdu.ahssize);
}
273
274 static struct iscsi_cmnd *iscsi_get_send_cmnd(struct iscsi_conn *conn)
275 {
276         struct iscsi_cmnd *cmnd = NULL;
277
278         spin_lock(&conn->write_list_lock);
279         if (!list_empty(&conn->write_list)) {
280                 cmnd = list_entry(conn->write_list.next, struct iscsi_cmnd,
281                                 write_list_entry);
282                 cmd_del_from_write_list(cmnd);
283                 cmnd->write_processing_started = 1;
284         }
285         spin_unlock(&conn->write_list_lock);
286
287         return cmnd;
288 }
289
/*
 * Receive up to conn->read_size bytes (non-blocking) into the iovec
 * prepared in conn->read_msg. When the full amount has arrived,
 * conn->read_state is advanced to 'state'.
 *
 * Returns the number of bytes received, or a negative error. -EAGAIN and
 * -ERESTARTSYS are expected transient results; any other error marks the
 * connection closed.
 */
static int do_recv(struct iscsi_conn *conn, int state)
{
	mm_segment_t oldfs;
	struct msghdr msg;
	int res, first_len;

	if (unlikely(conn->closing)) {
		res = -EIO;
		goto out;
	}

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = conn->read_msg.msg_iov;
	msg.msg_iovlen = conn->read_msg.msg_iovlen;
	first_len = msg.msg_iov->iov_len;

	/* Kernel-space buffers: temporarily widen the address limit */
	oldfs = get_fs();
	set_fs(get_ds());
	res = sock_recvmsg(conn->sock, &msg, conn->read_size, MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(oldfs);

	if (res <= 0) {
		switch (res) {
		case -EAGAIN:
		case -ERESTARTSYS:
			TRACE_DBG("EAGAIN or ERESTARTSYS (%d) received for "
				"conn %p", res, conn);
			break;
		default:
			PRINT_ERROR("sock_recvmsg() failed: %d", res);
			mark_conn_closed(conn);
			break;
		}
	} else {
		/*
		 * To save some considerable effort and CPU power we suppose
		 * that TCP functions adjust conn->read_msg.msg_iov and
		 * conn->read_msg.msg_iovlen on amount of copied data. This
		 * BUG_ON is intended to catch if it is changed in the future.
		 */
		sBUG_ON((res >= first_len) &&
			(conn->read_msg.msg_iov->iov_len != 0));
		conn->read_size -= res;
		if (conn->read_size) {
			/*
			 * Advance past fully consumed iovec entries.
			 * NOTE(review): this assumes every entry after the
			 * first covers exactly one page (hence the
			 * PAGE_SHIFT arithmetic) — presumably guaranteed by
			 * how the data read is set up; confirm against
			 * cmnd_rx_start().
			 */
			if (res >= first_len) {
				int done = 1 + ((res - first_len) >> PAGE_SHIFT);
				conn->read_msg.msg_iov += done;
				conn->read_msg.msg_iovlen -= done;
			}
		} else
			conn->read_state = state;
	}

out:
	TRACE_EXIT_RES(res);
	return res;
}
347
348 static int rx_hdigest(struct iscsi_conn *conn)
349 {
350         struct iscsi_cmnd *cmnd = conn->read_cmnd;
351         int res = digest_rx_header(cmnd);
352
353         if (unlikely(res != 0)) {
354                 PRINT_ERROR("rx header digest for initiator %s failed "
355                         "(%d)", conn->session->initiator_name, res);
356                 mark_conn_closed(conn);
357         }
358         return res;
359 }
360
361 static struct iscsi_cmnd *create_cmnd(struct iscsi_conn *conn)
362 {
363         struct iscsi_cmnd *cmnd;
364
365         cmnd = cmnd_alloc(conn, NULL);
366         iscsi_conn_init_read(cmnd->conn, &cmnd->pdu.bhs, sizeof(cmnd->pdu.bhs));
367         conn->read_state = RX_BHS;
368
369         return cmnd;
370 }
371
/* Returns >0 for success, <=0 for error or successful finish */
/*
 * Drive the RX state machine one step (or several, via deliberate case
 * fall-through) for the connection's current PDU. Each case either
 * completes and falls through to the next stage, or breaks out when more
 * data is needed (res <= 0 / state not yet advanced). When a PDU is
 * fully received (RX_END), cmnd_rx_end() is called and the machine is
 * reset to RX_INIT_BHS with res = 0.
 */
static int recv(struct iscsi_conn *conn)
{
	struct iscsi_cmnd *cmnd = conn->read_cmnd;
	int hdigest, ddigest, res = 1, rc;

	TRACE_ENTRY();

	hdigest = conn->hdigest_type & DIGEST_NONE ? 0 : 1;
	ddigest = conn->ddigest_type & DIGEST_NONE ? 0 : 1;

	switch (conn->read_state) {
	case RX_INIT_BHS:
		sBUG_ON(cmnd != NULL);
		cmnd = conn->read_cmnd = create_cmnd(conn);
		/* fallthrough */
	case RX_BHS:
		res = do_recv(conn, RX_INIT_AHS);
		if (res <= 0 || conn->read_state != RX_INIT_AHS)
			break;
		/* fallthrough */
	case RX_INIT_AHS:
		iscsi_cmnd_get_length(&cmnd->pdu);
		if (cmnd->pdu.ahssize) {
			iscsi_conn_read_ahs(conn, cmnd);
			conn->read_state = RX_AHS;
		} else
			conn->read_state = hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA;

		if (conn->read_state != RX_AHS)
			break;
		/* fallthrough */
	case RX_AHS:
		res = do_recv(conn, hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA);
		if (res <= 0 || conn->read_state != RX_INIT_HDIGEST)
			break;
		/* fallthrough */
	case RX_INIT_HDIGEST:
		iscsi_conn_init_read(conn, &cmnd->hdigest, sizeof(u32));
		conn->read_state = RX_HDIGEST;
		/* fallthrough */
	case RX_HDIGEST:
		res = do_recv(conn, RX_CHECK_HDIGEST);
		if (res <= 0 || conn->read_state != RX_CHECK_HDIGEST)
			break;
		/* fallthrough */
	case RX_CHECK_HDIGEST:
		rc = rx_hdigest(conn);
		if (likely(rc == 0))
			conn->read_state = RX_INIT_DATA;
		else {
			res = rc;
			break;
		}
		/* fallthrough */
	case RX_INIT_DATA:
		rc = cmnd_rx_start(cmnd);
		if (unlikely(rc != 0)) {
			sBUG_ON(!conn->closing);
			conn->read_state = RX_END;
			res = rc;
			/* cmnd will be freed in close_conn() */
			goto out;
		}
		conn->read_state = cmnd->pdu.datasize ? RX_DATA : RX_END;
		if (conn->read_state != RX_DATA)
			break;
		/* fallthrough */
	case RX_DATA:
		res = do_recv(conn, ddigest ? RX_INIT_DDIGEST : RX_END);
		if (res <= 0 || conn->read_state != RX_INIT_DDIGEST)
			break;
		/* fallthrough */
	case RX_INIT_DDIGEST:
		iscsi_conn_init_read(conn, &cmnd->ddigest, sizeof(u32));
		conn->read_state = RX_DDIGEST;
		/* fallthrough */
	case RX_DDIGEST:
		res = do_recv(conn, RX_CHECK_DDIGEST);
		if (res <= 0 || conn->read_state != RX_CHECK_DDIGEST)
			break;
		/* fallthrough */
	case RX_CHECK_DDIGEST:
		conn->read_state = RX_END;
		if (cmnd_opcode(cmnd) == ISCSI_OP_SCSI_CMD) {
			/* SCSI commands: defer the data digest check */
			TRACE_DBG("Adding RX ddigest cmd %p to digest list "
				"of self", cmnd);
			list_add_tail(&cmnd->rx_ddigest_cmd_list_entry,
				&cmnd->rx_ddigest_cmd_list);
			cmnd_get(cmnd);
			conn->read_state = RX_END;
		} else if (cmnd_opcode(cmnd) != ISCSI_OP_SCSI_DATA_OUT) {
			/*
			 * We could get here only for NOP-Out. ISCSI RFC doesn't
			 * specify how to deal with digest errors in this case.
			 * Is closing connection correct?
			 */
			TRACE_DBG("cmnd %p, opcode %x: checking RX "
				"ddigest inline", cmnd, cmnd_opcode(cmnd));
			rc = digest_rx_data(cmnd);
			if (unlikely(rc != 0)) {
				conn->read_state = RX_CHECK_DDIGEST;
				mark_conn_closed(conn);
			}
		}
		break;
	default:
		PRINT_ERROR("%d %x", conn->read_state, cmnd_opcode(cmnd));
		sBUG();
	}

	if (res <= 0)
		goto out;

	if (conn->read_state != RX_END)
		goto out;

	/* A complete PDU must have consumed every requested byte */
	if (conn->read_size) {
		PRINT_ERROR("%d %x %d", res, cmnd_opcode(cmnd), conn->read_size);
		sBUG();
	}

	cmnd_rx_end(cmnd);

	sBUG_ON(conn->read_size != 0);

	/* Reset for the next PDU */
	conn->read_cmnd = NULL;
	conn->read_state = RX_INIT_BHS;
	res = 0;

out:
	TRACE_EXIT_RES(res);
	return res;
}
495
496 /* No locks, conn is rd processing */
497 static int process_read_io(struct iscsi_conn *conn, int *closed)
498 {
499         int res;
500
501         do {
502                 res = recv(conn);
503                 if (unlikely(conn->closing)) {
504                         close_conn(conn);
505                         *closed = 1;
506                         break;
507                 }
508         } while(res > 0);
509
510         TRACE_EXIT_RES(res);
511         return res;
512 }
513
514 /*
515  * Called under iscsi_rd_lock and BHs disabled, but will drop it inside,
 * then reacquire.
517  */
/*
 * Service every connection queued on iscsi_rd_list. For each connection:
 * remove it from the list, mark it PROCESSING, drop iscsi_rd_lock while
 * doing the actual socket I/O, then reacquire the lock and either
 * requeue the connection (more work or new data arrived) or mark it
 * idle. Connections closed inside process_read_io() are not touched
 * again.
 */
static void scst_do_job_rd(void)
{
	TRACE_ENTRY();

	/* We delete/add to tail connections to maintain fairness between them */

	while(!list_empty(&iscsi_rd_list)) {
		int rc, closed = 0;
		struct iscsi_conn *conn = list_entry(iscsi_rd_list.next,
			typeof(*conn), rd_list_entry);

		list_del(&conn->rd_list_entry);

		sBUG_ON(conn->rd_state == ISCSI_CONN_RD_STATE_PROCESSING);
		conn->rd_data_ready = 0;
		conn->rd_state = ISCSI_CONN_RD_STATE_PROCESSING;
#ifdef EXTRACHECKS
		conn->rd_task = current;
#endif
		/* Socket I/O may sleep; release the list lock around it */
		spin_unlock_bh(&iscsi_rd_lock);

		rc = process_read_io(conn, &closed);

		spin_lock_bh(&iscsi_rd_lock);

		/* conn was freed by close_conn() — don't touch it */
		if (closed)
			continue;

#ifdef EXTRACHECKS
		conn->rd_task = NULL;
#endif
		/* rc == 0 means a full PDU completed: requeue for fairness */
		if ((rc == 0) || conn->rd_data_ready) {
			list_add_tail(&conn->rd_list_entry, &iscsi_rd_list);
			conn->rd_state = ISCSI_CONN_RD_STATE_IN_LIST;
		} else
			conn->rd_state = ISCSI_CONN_RD_STATE_IDLE;
	}

	TRACE_EXIT();
	return;
}
559
560 static inline int test_rd_list(void)
561 {
562         int res = !list_empty(&iscsi_rd_list) ||
563                   unlikely(kthread_should_stop());
564         return res;
565 }
566
/*
 * Main loop of a network read (RX) kernel thread.
 *
 * Sleeps on iscsi_rd_waitQ until a connection is queued on iscsi_rd_list
 * or kthread_should_stop() becomes true, then drains the list via
 * scst_do_job_rd(). Uses the classic manual wait-queue pattern:
 * set_current_state() before re-checking the condition so a concurrent
 * wake-up between the check and schedule() is not lost.
 *
 * Runs with iscsi_rd_lock held except while actually sleeping (and
 * inside scst_do_job_rd(), which drops it around I/O).
 */
int istrd(void *arg)
{
	TRACE_ENTRY();

	current->flags |= PF_NOFREEZE;

	spin_lock_bh(&iscsi_rd_lock);
	while(!kthread_should_stop()) {
		wait_queue_t wait;
		init_waitqueue_entry(&wait, current);

		if (!test_rd_list()) {
			add_wait_queue_exclusive(&iscsi_rd_waitQ, &wait);
			for (;;) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (test_rd_list())
					break;
				spin_unlock_bh(&iscsi_rd_lock);
				schedule();
				spin_lock_bh(&iscsi_rd_lock);
			}
			set_current_state(TASK_RUNNING);
			remove_wait_queue(&iscsi_rd_waitQ, &wait);
		}
		scst_do_job_rd();
	}
	spin_unlock_bh(&iscsi_rd_lock);

	/*
	 * If kthread_should_stop() is true, we are guaranteed to be
	 * on the module unload, so iscsi_rd_list must be empty.
	 */
	sBUG_ON(!list_empty(&iscsi_rd_list));

	TRACE_EXIT();
	return 0;
}
604
#ifdef NET_PAGE_CALLBACKS_DEFINED
/*
 * Zero-copy page reference tracking. While the network stack holds pages
 * of a command's sg list (sent via sendpage()), page->net_priv points at
 * the owning command and cmd->net_ref_cnt counts outstanding network
 * references across all its pages. The command itself is pinned
 * (cmnd_get) for as long as any network reference exists.
 */

/* Called by the network stack when it takes a reference on a sent page. */
void iscsi_get_page_callback(struct page *page)
{
	struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;
	int v;

	TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
		cmd, page, atomic_read(&page->_count),
		atomic_read(&cmd->net_ref_cnt)+1);

	v = atomic_inc_return(&cmd->net_ref_cnt);
	if (v == 1) {
		/* First network reference: pin the owning command */
		TRACE_NET_PAGE("getting cmd %p for page %p", cmd, page);
		cmnd_get(cmd);
	}
}

/* Called by the network stack when it drops a reference on a sent page. */
void iscsi_put_page_callback(struct page *page)
{
	struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;

	TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
		cmd, page, atomic_read(&page->_count),
		atomic_read(&cmd->net_ref_cnt)-1);

	if (atomic_dec_and_test(&cmd->net_ref_cnt)) {
		/* Last reference gone: detach all pages and unpin the command */
		int i, sg_cnt = get_pgcnt(cmd->bufflen, cmd->sg[0].offset);
		for(i = 0; i < sg_cnt; i++) {
			TRACE_NET_PAGE("Clearing page %p", cmd->sg[i].page);
			cmd->sg[i].page->net_priv = NULL;
		}
		cmnd_put(cmd);
	}
}

/*
 * After a sendpage() call: if the network layer never took a reference
 * (net_ref_cnt still 0), clear the page's back-pointer so a stale
 * net_priv doesn't outlive this send.
 */
static void check_net_priv(struct iscsi_cmnd *cmd, struct page *page)
{
	if (atomic_read(&cmd->net_ref_cnt) == 0) {
		TRACE_DBG("%s", "sendpage() not called get_page(), "
			"zeroing net_priv");
		page->net_priv = NULL;
	}
}
#else
static inline void check_net_priv(struct iscsi_cmnd *cmd, struct page *page) {}
#endif
651
/* This is partially taken from the Ardis code. */
/*
 * Transmit pending data for conn->write_cmnd in two phases:
 *
 *   1. Flush any remaining header iovecs (conn->write_iop) through
 *      vfs_writev() on the connection's file.
 *   2. Send the payload sg pages with sendpage() — zero-copy when
 *      NET_PAGE_CALLBACKS_DEFINED (ref_cmd tracked via page->net_priv),
 *      otherwise falling back to sock_no_sendpage() for buffers the SCSI
 *      core allocated itself.
 *
 * Returns the number of bytes sent this call (saved_size - size),
 * -EAGAIN if the socket would block before anything was sent, or a
 * negative error (delivery failure is also reported to SCST).
 *
 * Must be called on the write thread (iscsi_extracheck_is_wr_thread()).
 */
static int write_data(struct iscsi_conn *conn)
{
	mm_segment_t oldfs;
	struct file *file;
	struct socket *sock;
	ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
	struct iscsi_cmnd *write_cmnd = conn->write_cmnd;
	struct iscsi_cmnd *ref_cmd;
	struct scatterlist *sg;
	struct iovec *iop;
	int saved_size, size, sendsize;
	int offset, idx;
	int flags, res, count;

	iscsi_extracheck_is_wr_thread(conn);

	/* ref_cmd is the command that actually owns the sg pages */
	if (write_cmnd->own_sg == 0)
		ref_cmd = write_cmnd->parent_req;
	else
		ref_cmd = write_cmnd;

	file = conn->file;
	saved_size = size = conn->write_size;
	iop = conn->write_iop;
	count = conn->write_iop_used;

	/* Phase 1: flush remaining header iovecs, if any */
	if (iop) while (1) {
		loff_t off = 0;
		int rest;

		sBUG_ON(count > sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
retry:
		oldfs = get_fs();
		set_fs(KERNEL_DS);
		res = vfs_writev(file, (struct iovec __user *)iop, count, &off);
		set_fs(oldfs);
		TRACE(TRACE_D_WRITE, "%#Lx:%u: %d(%ld)",
			(unsigned long long) conn->session->sid, conn->cid,
			res, (long) iop->iov_len);
		if (unlikely(res <= 0)) {
			if (res == -EAGAIN) {
				/* Save progress so the next call resumes here */
				conn->write_iop = iop;
				conn->write_iop_used = count;
				goto out_iov;
			} else if (res == -EINTR)
				goto retry;
			goto err;
		}

		/* Skip fully written iovec entries, trim the partial one */
		rest = res;
		size -= res;
		while (iop->iov_len <= rest && rest) {
			rest -= iop->iov_len;
			iop++;
			count--;
		}
		if (count == 0) {
			conn->write_iop = NULL;
			conn->write_iop_used = 0;
			if (size)
				break;	/* headers done, payload remains */
			goto out_iov;
		}
		sBUG_ON(iop > conn->write_iov + 
			sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
		iop->iov_base += rest;
		iop->iov_len -= rest;
	}

	sg = write_cmnd->sg;
	if (sg == NULL) {
		PRINT_ERROR("%s", "warning data missing!");
		return 0;
	}
	/* Resume position within the sg pages from a previous partial send */
	offset = conn->write_offset;
	idx = offset >> PAGE_SHIFT;
	offset &= ~PAGE_MASK;

	sock = conn->sock;

#ifdef NET_PAGE_CALLBACKS_DEFINED
	sendpage = sock->ops->sendpage;
#else
	/*
	 * Buffers allocated by the SCSI core may not be sendpage-safe;
	 * fall back to the copying sock_no_sendpage() for those.
	 */
	if ((write_cmnd->parent_req->scst_cmd != NULL) &&
	    scst_cmd_get_data_buff_alloced(write_cmnd->parent_req->scst_cmd))
		sendpage = sock_no_sendpage;
	else
		sendpage = sock->ops->sendpage;
#endif

	flags = MSG_DONTWAIT;

	/* Phase 2: send payload pages */
	while (1) {
#ifdef NET_PAGE_CALLBACKS_DEFINED
		/* Tag the page with its owner for the get/put page callbacks */
		if (unlikely((sg[idx].page->net_priv != NULL) &&
				(sg[idx].page->net_priv != ref_cmd))) {
			PRINT_ERROR("net_priv isn't NULL and != ref_cmd "
				"(write_cmnd %p, ref_cmd %p, sg %p, idx %d, "
				"net_priv %p)", write_cmnd, ref_cmd, sg, idx,
				sg[idx].page->net_priv);
			sBUG();
		}
		sg[idx].page->net_priv = ref_cmd;
#endif
		sendsize = PAGE_SIZE - offset;
		if (size <= sendsize) {
			/* Last chunk: no MSG_MORE */
retry2:
			res = sendpage(sock, sg[idx].page, offset, size, flags);
			TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
				sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
				(unsigned long long)conn->session->sid, conn->cid,
				res, sg[idx].page->index, offset, size);
			if (unlikely(res <= 0)) {
				if (res == -EINTR)
					goto retry2;
				else
					goto out_res;
			}
			check_net_priv(ref_cmd, sg[idx].page);
			if (res == size) {
				conn->write_size = 0;
				return saved_size;
			}
			offset += res;
			size -= res;
			continue;
		}

retry1:
		res = sendpage(sock, sg[idx].page, offset, sendsize,
			flags | MSG_MORE);
		TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
			sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
			(unsigned long long ) conn->session->sid, conn->cid,
			res, sg[idx].page->index, offset, sendsize);
		if (unlikely(res <= 0)) {
			if (res == -EINTR)
				goto retry1;
			else
				goto out_res;
		}
		check_net_priv(ref_cmd, sg[idx].page);
		if (res == sendsize) {
			idx++;
			offset = 0;
		} else
			offset += res;
		size -= res;
	}
out:
	/* Remember where to resume within the payload */
	conn->write_offset = (idx << PAGE_SHIFT) + offset;
out_iov:
	conn->write_size = size;
	if ((saved_size == size) && res == -EAGAIN)
		return res;

	return saved_size - size;

out_res:
	check_net_priv(ref_cmd, sg[idx].page);
	if (res == -EAGAIN)
		goto out;
	/* else go through */

err:
#ifndef DEBUG
	if (!conn->closing)
#endif
	{
		PRINT_ERROR("error %d at sid:cid %#Lx:%u, cmnd %p", res,
			(unsigned long long)conn->session->sid, conn->cid,
			conn->write_cmnd);
	}
	if (ref_cmd->scst_cmd != NULL)
		scst_set_delivery_status(ref_cmd->scst_cmd,
			SCST_CMD_DELIVERY_FAILED);
	return res;
}
831
832 static int exit_tx(struct iscsi_conn *conn, int res)
833 {
834         iscsi_extracheck_is_wr_thread(conn);
835
836         switch (res) {
837         case -EAGAIN:
838         case -ERESTARTSYS:
839                 res = 0;
840                 break;
841         default:
842 #ifndef DEBUG
843                 if (!conn->closing)
844 #endif
845                 {
846                         PRINT_ERROR("Sending data failed: initiator %s, "
847                                 "write_size %d, write_state %d, res %d",
848                                 conn->session->initiator_name, conn->write_size,
849                                 conn->write_state, res);
850                 }
851                 conn->write_state = TX_END;
852                 conn->write_size = 0;
853                 mark_conn_closed(conn);
854                 break;
855         }
856         return res;
857 }
858
859 static int tx_ddigest(struct iscsi_cmnd *cmnd, int state)
860 {
861         int res, rest = cmnd->conn->write_size;
862         struct msghdr msg = {.msg_flags = MSG_NOSIGNAL | MSG_DONTWAIT};
863         struct kvec iov;
864
865         iscsi_extracheck_is_wr_thread(cmnd->conn);
866
867         TRACE_DBG("Sending data digest %x (cmd %p)", cmnd->ddigest, cmnd);
868
869         iov.iov_base = (char *) (&cmnd->ddigest) + (sizeof(u32) - rest);
870         iov.iov_len = rest;
871
872         res = kernel_sendmsg(cmnd->conn->sock, &msg, &iov, 1, rest);
873         if (res > 0) {
874                 cmnd->conn->write_size -= res;
875                 if (!cmnd->conn->write_size)
876                         cmnd->conn->write_state = state;
877         } else
878                 res = exit_tx(cmnd->conn, res);
879
880         return res;
881 }
882
883 static void init_tx_hdigest(struct iscsi_cmnd *cmnd)
884 {
885         struct iscsi_conn *conn = cmnd->conn;
886         struct iovec *iop;
887
888         iscsi_extracheck_is_wr_thread(conn);
889
890         digest_tx_header(cmnd);
891
892         sBUG_ON(conn->write_iop_used >= sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
893         iop = &conn->write_iop[conn->write_iop_used];
894         conn->write_iop_used++;
895         iop->iov_base = &(cmnd->hdigest);
896         iop->iov_len = sizeof(u32);
897         conn->write_size += sizeof(u32);
898
899         return;
900 }
901
902 static int iscsi_do_send(struct iscsi_conn *conn, int state)
903 {
904         int res;
905
906         iscsi_extracheck_is_wr_thread(conn);
907
908         res = write_data(conn);
909         if (res > 0) {
910                 if (!conn->write_size)
911                         conn->write_state = state;
912         } else
913                 res = exit_tx(conn, res);
914
915         return res;
916 }
917
/*
 * Top-level TX state machine: sends one response PDU (BHS + optional
 * header digest, data, optional data digest) on the connection.
 *
 * No locks, conn is wr processing.
 *
 * IMPORTANT! Connection conn must be protected by additional conn_get()
 * upon entrance in this function, because otherwise it could be destroyed
 * inside as a result of cmnd release.
 *
 * Returns >0 bytes sent, 0 if nothing to do / retry later, <0 on fatal
 * error (connection already marked closed by exit_tx()).
 */
int iscsi_send(struct iscsi_conn *conn)
{
	struct iscsi_cmnd *cmnd = conn->write_cmnd;
	int ddigest, res = 0;

	TRACE_ENTRY();

	TRACE_DBG("conn %p, write_cmnd %p", conn, cmnd);

	iscsi_extracheck_is_wr_thread(conn);

	ddigest = conn->ddigest_type != DIGEST_NONE ? 1 : 0;

	/*
	 * Deliberate fallthrough between cases: each state, once its work
	 * is fully done, continues directly into the next TX stage.
	 */
	switch (conn->write_state) {
	case TX_INIT:
		/* A previous PDU must have fully completed before TX_INIT */
		sBUG_ON(cmnd != NULL);
		cmnd = conn->write_cmnd = iscsi_get_send_cmnd(conn);
		if (!cmnd)
			goto out;
		cmnd_tx_start(cmnd);
		if (!(conn->hdigest_type & DIGEST_NONE))
		    init_tx_hdigest(cmnd);
		conn->write_state = TX_BHS_DATA;
		/* fallthrough */
	case TX_BHS_DATA:
		res = iscsi_do_send(conn, ddigest && cmnd->pdu.datasize ? 
					TX_INIT_DDIGEST : TX_END);
		/* Stop unless the header+data went out completely and a
		 * data digest still has to follow */
		if (res <= 0 || conn->write_state != TX_INIT_DDIGEST)
			break;
		/* fallthrough */
	case TX_INIT_DDIGEST:
		cmnd->conn->write_size = sizeof(u32);
		conn->write_state = TX_DDIGEST;
		/* fallthrough */
	case TX_DDIGEST:
		res = tx_ddigest(cmnd, TX_END);
		break;
	default:
		PRINT_ERROR("%d %d %x", res, conn->write_state,
			cmnd_opcode(cmnd));
		sBUG();
	}

	if (res == 0)
		goto out;

	if (conn->write_state != TX_END)
		goto out;

	/* Reaching TX_END with pending write_size would be a logic bug */
	if (conn->write_size) {
		PRINT_ERROR("%d %x %u", res, cmnd_opcode(cmnd),
			conn->write_size);
		sBUG();
	}
	cmnd_tx_end(cmnd);

	/* May free cmnd (and, without the caller's conn_get(), conn too) */
	rsp_cmnd_release(cmnd);

	conn->write_cmnd = NULL;
	conn->write_state = TX_INIT;

out:
	TRACE_EXIT_RES(res);
	return res;
}
987
/* No locks, conn is wr processing.
 *
 * IMPORTANT! Connection conn must be protected by additional conn_get()
 * upon entrance in this function, because otherwise it could be destroyed
 * inside as a result of iscsi_send(), which releases sent commands.
 *
 * Thin gate in front of iscsi_send(): transmits only when the
 * connection actually has something ready to write.
 */
static int process_write_queue(struct iscsi_conn *conn)
{
	int res;

	TRACE_ENTRY();

	res = likely(test_write_ready(conn)) ? iscsi_send(conn) : 0;

	TRACE_EXIT_RES(res);
	return res;
}
1006
1007 /*
1008  * Called under iscsi_wr_lock and BHs disabled, but will drop it inside,
 * then reacquire it.
1010  */
/*
 * Drain iscsi_wr_list: pop each ready connection, process its write
 * queue with the lock dropped, then requeue or park it according to
 * the outcome.  Runs on a write thread (istwr).
 */
static void scst_do_job_wr(void)
{
	TRACE_ENTRY();

	/* We delete/add to tail connections to maintain fairness between them */

	while(!list_empty(&iscsi_wr_list)) {
		int rc;
		struct iscsi_conn *conn = list_entry(iscsi_wr_list.next,
			typeof(*conn), wr_list_entry);

		TRACE_DBG("conn %p, wr_state %x, wr_space_ready %d, "
			"write ready %d", conn, conn->wr_state,
			conn->wr_space_ready, test_write_ready(conn));

		list_del(&conn->wr_list_entry);

		/* A connection must never be queued while being processed */
		sBUG_ON(conn->wr_state == ISCSI_CONN_WR_STATE_PROCESSING);

		conn->wr_state = ISCSI_CONN_WR_STATE_PROCESSING;
		/* Cleared before sending so a concurrent write_space
		 * callback after this point is not lost */
		conn->wr_space_ready = 0;
#ifdef EXTRACHECKS
		conn->wr_task = current;
#endif
		/* Drop the lock for the (possibly blocking) socket work */
		spin_unlock_bh(&iscsi_wr_lock);

		/* Extra ref: iscsi_send() may release the last command ref,
		 * which could otherwise destroy conn under us */
		conn_get(conn);

		rc = process_write_queue(conn);

		spin_lock_bh(&iscsi_wr_lock);
#ifdef EXTRACHECKS
		conn->wr_task = NULL;
#endif
		/* Socket buffer full and no space signal arrived meanwhile:
		 * park until the write_space callback requeues us */
		if ((rc == -EAGAIN) && !conn->wr_space_ready) {
			conn->wr_state = ISCSI_CONN_WR_STATE_SPACE_WAIT;
			goto cont;
		}

		if (test_write_ready(conn)) {
			/* More to send: requeue at the tail for fairness */
			list_add_tail(&conn->wr_list_entry, &iscsi_wr_list);
			conn->wr_state = ISCSI_CONN_WR_STATE_IN_LIST;
		} else
			conn->wr_state = ISCSI_CONN_WR_STATE_IDLE;

cont:
		conn_put(conn);
	}

	TRACE_EXIT();
	return;
}
1063
1064 static inline int test_wr_list(void)
1065 {
1066         int res = !list_empty(&iscsi_wr_list) ||
1067                   unlikely(kthread_should_stop());
1068         return res;
1069 }
1070
/*
 * Main loop of an iSCSI write (TX) kernel thread.
 *
 * Sleeps on iscsi_wr_waitQ until connections appear on iscsi_wr_list,
 * then drains them via scst_do_job_wr().  Exits when kthread_stop()
 * is called (module unload).  Always returns 0.
 */
int istwr(void *arg)
{
	TRACE_ENTRY();

	/* Must keep servicing I/O even during system freeze */
	current->flags |= PF_NOFREEZE;

	spin_lock_bh(&iscsi_wr_lock);
	while(!kthread_should_stop()) {
		wait_queue_t wait;
		init_waitqueue_entry(&wait, current);

		if (!test_wr_list()) {
			/* Exclusive wait: only one thread wakes per event */
			add_wait_queue_exclusive(&iscsi_wr_waitQ, &wait);
			for (;;) {
				/* Set state before re-checking the condition
				 * to avoid losing a wakeup */
				set_current_state(TASK_INTERRUPTIBLE);
				if (test_wr_list())
					break;
				spin_unlock_bh(&iscsi_wr_lock);
				schedule();
				spin_lock_bh(&iscsi_wr_lock);
			}
			set_current_state(TASK_RUNNING);
			remove_wait_queue(&iscsi_wr_waitQ, &wait);
		}
		/* Called with iscsi_wr_lock held; drops/reacquires inside */
		scst_do_job_wr();
	}
	spin_unlock_bh(&iscsi_wr_lock);

	/*
	 * If kthread_should_stop() is true, we are guaranteed to be
	 * on the module unload, so iscsi_wr_list must be empty.
	 */
	sBUG_ON(!list_empty(&iscsi_wr_list));

	TRACE_EXIT();
	return 0;
}