- Update to the latest IET r145
[mirror/scst/.git] / iscsi-scst / kernel / nthread.c
1 /*
2  *  Network threads.
3  *
4  *  Copyright (C) 2004 - 2005 FUJITA Tomonori <tomof@acm.org>
5  *  Copyright (C) 2007 Vladislav Bolkhovitin
6  *  Copyright (C) 2007 CMS Distribution Limited
7  * 
8  *  This program is free software; you can redistribute it and/or
9  *  modify it under the terms of the GNU General Public License
10  *  as published by the Free Software Foundation.
11  * 
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15  *  GNU General Public License for more details.
16  */
17
18 #include <linux/sched.h>
19 #include <linux/file.h>
20 #include <linux/kthread.h>
21 #include <asm/ioctls.h>
22 #include <linux/delay.h>
23 #include <net/tcp.h>
24
25 #include "iscsi.h"
26 #include "digest.h"
27
/*
 * Per-connection receive (rx) state machine, kept in conn->read_state and
 * driven by recv(). Each *_INIT_* state arms the read buffer for the next
 * PDU part; its non-INIT twin repeats do_recv() until all requested bytes
 * have arrived. The *_CHECK_* states verify a received digest.
 */
enum rx_state {
	RX_INIT_BHS, /* Must be zero. */
	RX_BHS,

	RX_INIT_AHS,
	RX_AHS,

	RX_INIT_HDIGEST,
	RX_HDIGEST,
	RX_CHECK_HDIGEST,

	RX_INIT_DATA,
	RX_DATA,

	RX_INIT_DDIGEST,
	RX_DDIGEST,
	RX_CHECK_DDIGEST,

	RX_END,
};
48
/*
 * Per-connection transmit (tx) state machine. TX_INIT must stay zero so a
 * zero-initialized connection starts in it. (The tx driver itself is not
 * in this part of the file.)
 */
enum tx_state {
	TX_INIT, /* Must be zero. */
	TX_BHS_DATA,
	TX_INIT_DDIGEST,
	TX_DDIGEST,
	TX_END,
};
56
#if defined(NET_PAGE_CALLBACKS_DEFINED)
/*
 * If the peer's TCP side has (half-)closed the connection, force-release
 * all tx page references the network layer still holds on this conn's
 * commands, so those commands can complete and close_conn()'s ref-count
 * wait loop can make progress. Called only from close_conn().
 */
static void iscsi_check_closewait(struct iscsi_conn *conn)
{
	struct iscsi_cmnd *cmnd;

	TRACE_ENTRY();

	/* Only act once TCP has actually entered a closing state */
	if ((conn->sock->sk->sk_state != TCP_CLOSE_WAIT) &&
	    (conn->sock->sk->sk_state != TCP_CLOSE)) {
		TRACE_CONN_CLOSE_DBG("sk_state %d, skipping",
			conn->sock->sk->sk_state);
		goto out;
	}

	/*
	 * No data are going to be sent, so all being sent buffers can be freed
	 * now. Strange that TCP doesn't do that itself.
	 */

again:
	spin_lock_bh(&conn->cmd_list_lock);
	list_for_each_entry(cmnd, &conn->cmd_list, cmd_list_entry) {
		TRACE_CONN_CLOSE_DBG("cmd %p, scst_state %x, data_waiting %d, "
			"ref_cnt %d, parent_req %p, net_ref_cnt %d, sg %p",
			cmnd, cmnd->scst_state, cmnd->data_waiting,
			atomic_read(&cmnd->ref_cnt), cmnd->parent_req,
			atomic_read(&cmnd->net_ref_cnt), cmnd->sg);
		sBUG_ON(cmnd->parent_req != NULL);
		if (cmnd->sg != NULL) {
			int sg_cnt, i, restart = 0;
			sg_cnt = get_pgcnt(cmnd->bufflen,
				cmnd->sg[0].offset);
			/* Pin cmnd so dropping its page refs can't free it under us */
			cmnd_get(cmnd);
			for(i = 0; i < sg_cnt; i++) {
				TRACE_CONN_CLOSE_DBG("page %p, net_priv %p, _count %d",
					cmnd->sg[i].page, cmnd->sg[i].page->net_priv,
					atomic_read(&cmnd->sg[i].page->_count));
				if (cmnd->sg[i].page->net_priv != NULL) {
					/*
					 * Drop cmd_list_lock before releasing the
					 * page refs; list iteration is then no
					 * longer safe, so rescan from the top
					 * ("goto again" below) once done.
					 */
					if (restart == 0) {
						spin_unlock_bh(&conn->cmd_list_lock);
						restart = 1;
					}
					while(cmnd->sg[i].page->net_priv != NULL)
						iscsi_put_page_callback(cmnd->sg[i].page);
				}
			}
			cmnd_put(cmnd);
			if (restart)
				goto again;
		}
	}
	spin_unlock_bh(&conn->cmd_list_lock);

out:
	TRACE_EXIT();
	return;
}
#else
static inline void iscsi_check_closewait(struct iscsi_conn *conn) {};
#endif
117
/*
 * No locks. Tear down @conn: abort outstanding work, wait for all
 * references to drain, restore the socket callbacks, notify user space
 * and free the connection (and the session, if this was its last conn).
 * Runs in the rd thread only.
 */
static void close_conn(struct iscsi_conn *conn)
{
	struct iscsi_session *session = conn->session;
	struct iscsi_target *target = conn->target;
#ifdef DEBUG
	unsigned long start_waiting = jiffies;
#endif

	TRACE_ENTRY();

	TRACE_CONN_CLOSE("Closing connection %p (conn_ref_cnt=%d)", conn,
		atomic_read(&conn->conn_ref_cnt));

	iscsi_extracheck_is_rd_thread(conn);

	/* We want all our already send operations to complete */
	conn->sock->ops->shutdown(conn->sock, RCV_SHUTDOWN);

	conn_abort(conn);

	/* Drop a TM response that was delayed on this connection, if any */
	mutex_lock(&target->target_mutex);
	spin_lock(&session->sn_lock);
	if ((session->tm_rsp != NULL) && (session->tm_rsp->conn == conn)) {
		struct iscsi_cmnd *tm_rsp = session->tm_rsp;
		TRACE(TRACE_MGMT_MINOR, "Dropping delayed TM rsp %p", tm_rsp);
		session->tm_rsp = NULL;
		session->tm_active = 0;
		spin_unlock(&session->sn_lock);
		mutex_unlock(&target->target_mutex);

		rsp_cmnd_release(tm_rsp);
	} else {
		spin_unlock(&session->sn_lock);
		mutex_unlock(&target->target_mutex);
	}

	/* Release a partially received PDU, if the rx machine was mid-read */
	if (conn->read_state != RX_INIT_BHS) {
		req_cmnd_release_force(conn->read_cmnd, 0);
		conn->read_cmnd = NULL;
		conn->read_state = RX_INIT_BHS;
	}

	/* ToDo: not the best way to wait */
	while(atomic_read(&conn->conn_ref_cnt) != 0) {
		struct iscsi_cmnd *cmnd;

		/* Free this conn's commands still parked on the pending list */
		if (!list_empty(&session->pending_list)) {
			struct list_head *pending_list = &session->pending_list;
			struct iscsi_cmnd *tmp;

			TRACE_CONN_CLOSE("Disposing pending commands on "
				"connection %p (conn_ref_cnt=%d)", conn,
				atomic_read(&conn->conn_ref_cnt));

			list_for_each_entry_safe(cmnd, tmp, pending_list,
						pending_list_entry) {
				if (cmnd->conn == conn) {
					TRACE_CONN_CLOSE("Freeing pending cmd %p",
						cmnd);
					list_del(&cmnd->pending_list_entry);
					cmnd->pending = 0;
					req_cmnd_release_force(cmnd, 0);
				}
			}
		}

		/* Kick the wr thread so queued responses get flushed/released */
		iscsi_make_conn_wr_active(conn);
		msleep(50);

		TRACE_CONN_CLOSE("conn %p, conn_ref_cnt %d left, wr_state %d",
			conn, atomic_read(&conn->conn_ref_cnt), conn->wr_state);
#ifdef DEBUG
		/*
		 * Debug-only diagnostics: after 10s of waiting, enable extra
		 * close tracing and dump every command (and response) still
		 * holding references on this connection.
		 */
		{
#ifdef NET_PAGE_CALLBACKS_DEFINED
			struct iscsi_cmnd *rsp;
#endif
			if (time_after(jiffies, start_waiting+10*HZ))
				trace_flag |= TRACE_CONN_OC_DBG;

			spin_lock_bh(&conn->cmd_list_lock);
			list_for_each_entry(cmnd, &conn->cmd_list, cmd_list_entry) {
				TRACE_CONN_CLOSE_DBG("cmd %p, scst_state %x, scst_cmd "
					"state %d, data_waiting %d, ref_cnt %d, "
					"parent_req %p", cmnd, cmnd->scst_state,
					(cmnd->scst_cmd != NULL) ? cmnd->scst_cmd->state : -1,
					cmnd->data_waiting, atomic_read(&cmnd->ref_cnt),
					cmnd->parent_req);
#ifdef NET_PAGE_CALLBACKS_DEFINED
				TRACE_CONN_CLOSE_DBG("net_ref_cnt %d, sg %p",
					atomic_read(&cmnd->net_ref_cnt), cmnd->sg);
				if (cmnd->sg != NULL) {
					int sg_cnt, i;
					sg_cnt = get_pgcnt(cmnd->bufflen,
						cmnd->sg[0].offset);
					for(i = 0; i < sg_cnt; i++) {
						TRACE_CONN_CLOSE_DBG("page %p, net_priv %p, _count %d",
							cmnd->sg[i].page, cmnd->sg[i].page->net_priv,
							atomic_read(&cmnd->sg[i].page->_count));
					}
				}

				sBUG_ON(cmnd->parent_req != NULL);

				spin_lock_bh(&cmnd->rsp_cmd_lock);
				list_for_each_entry(rsp, &cmnd->rsp_cmd_list, rsp_cmd_list_entry) {
					TRACE_CONN_CLOSE_DBG("  rsp %p, ref_cnt %d, net_ref_cnt %d, "
						"sg %p", rsp, atomic_read(&rsp->ref_cnt),
						atomic_read(&rsp->net_ref_cnt), rsp->sg);
					if ((rsp->sg != cmnd->sg) && (rsp->sg != NULL)) {
						int sg_cnt, i;
						sg_cnt = get_pgcnt(rsp->bufflen,
							rsp->sg[0].offset);
						sBUG_ON(rsp->sg_cnt != sg_cnt);
						for(i = 0; i < sg_cnt; i++) {
							TRACE_CONN_CLOSE_DBG("    page %p, net_priv %p, "
								"_count %d", rsp->sg[i].page,
								rsp->sg[i].page->net_priv,
								atomic_read(&rsp->sg[i].page->_count));
						}
					}
				}
				spin_unlock_bh(&cmnd->rsp_cmd_lock);
#endif
			}
			spin_unlock_bh(&conn->cmd_list_lock);
		}
#endif
		iscsi_check_closewait(conn);
	}

	/* All refs gone: restore the original socket callbacks */
	write_lock_bh(&conn->sock->sk->sk_callback_lock);
	conn->sock->sk->sk_state_change = conn->old_state_change;
	conn->sock->sk->sk_data_ready = conn->old_data_ready;
	conn->sock->sk->sk_write_space = conn->old_write_space;
	write_unlock_bh(&conn->sock->sk->sk_callback_lock);

	/* Wait for the wr thread to fully let go of this conn */
	while(conn->wr_state != ISCSI_CONN_WR_STATE_IDLE) {
		TRACE_CONN_CLOSE("Waiting for wr thread (conn %p), wr_state %x",
			conn, conn->wr_state);
		msleep(50);
	}

	TRACE_CONN_CLOSE("Notifying user space about closing connection %p", conn);
	event_send(target->tid, session->sid, conn->cid, E_CONN_CLOSE, 0);

	mutex_lock(&target->target_mutex);
	conn_free(conn);
	if (list_empty(&session->conn_list))
		session_del(target, session->sid);
	mutex_unlock(&target->target_mutex);

	TRACE_EXIT();
	return;
}
273
274 static inline void iscsi_conn_init_read(struct iscsi_conn *conn, void *data, size_t len)
275 {
276         len = (len + 3) & -4; // XXX ???
277         conn->read_iov[0].iov_base = data;
278         conn->read_iov[0].iov_len = len;
279         conn->read_msg.msg_iov = conn->read_iov;
280         conn->read_msg.msg_iovlen = 1;
281         conn->read_size = (len + 3) & -4;
282 }
283
/*
 * Allocate a buffer for the PDU's Additional Header Segment and arm the
 * connection to receive cmnd->pdu.ahssize bytes into it.
 */
static void iscsi_conn_read_ahs(struct iscsi_conn *conn, struct iscsi_cmnd *cmnd)
{
	/* ToDo: __GFP_NOFAIL ?? */
	cmnd->pdu.ahs = kmalloc(cmnd->pdu.ahssize, __GFP_NOFAIL|GFP_KERNEL);
	sBUG_ON(cmnd->pdu.ahs == NULL);
	iscsi_conn_init_read(conn, cmnd->pdu.ahs, cmnd->pdu.ahssize);
}
291
292 static struct iscsi_cmnd *iscsi_get_send_cmnd(struct iscsi_conn *conn)
293 {
294         struct iscsi_cmnd *cmnd = NULL;
295
296         spin_lock(&conn->write_list_lock);
297         if (!list_empty(&conn->write_list)) {
298                 cmnd = list_entry(conn->write_list.next, struct iscsi_cmnd,
299                                 write_list_entry);
300                 cmd_del_from_write_list(cmnd);
301                 cmnd->write_processing_started = 1;
302         }
303         spin_unlock(&conn->write_list_lock);
304
305         return cmnd;
306 }
307
/*
 * Non-blocking receive of up to conn->read_size bytes into the iovec set
 * up by iscsi_conn_init_read(). On completing the whole read, advance
 * conn->read_state to @state. Returns the sock_recvmsg() result: > 0 is
 * bytes received this call; <= 0 is an error (-EAGAIN/-ERESTARTSYS are
 * transient, anything else marks the connection closed). -EIO if the
 * connection is already closing.
 */
static int do_recv(struct iscsi_conn *conn, int state)
{
	mm_segment_t oldfs;
	struct msghdr msg;
	int res, first_len;

	if (unlikely(conn->closing)) {
		res = -EIO;
		goto out;
	}

	/* Use a scratch msghdr; the saved one tracks overall progress */
	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = conn->read_msg.msg_iov;
	msg.msg_iovlen = conn->read_msg.msg_iovlen;
	first_len = msg.msg_iov->iov_len;

	oldfs = get_fs();
	set_fs(get_ds());
	res = sock_recvmsg(conn->sock, &msg, conn->read_size, MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(oldfs);

	if (res <= 0) {
		switch (res) {
		case -EAGAIN:
		case -ERESTARTSYS:
			TRACE_DBG("EAGAIN or ERESTARTSYS (%d) received for "
				"conn %p", res, conn);
			break;
		default:
			PRINT_ERROR("sock_recvmsg() failed: %d", res);
			mark_conn_closed(conn);
			break;
		}
	} else {
		/*
		 * To save some considerable effort and CPU power we suppose
		 * that TCP functions adjust conn->read_msg.msg_iov and
		 * conn->read_msg.msg_iovlen on amount of copied data. This
		 * BUG_ON is intended to catch if it is changed in the future.
		 */
		sBUG_ON((res >= first_len) &&
			(conn->read_msg.msg_iov->iov_len != 0));
		conn->read_size -= res;
		if (conn->read_size) {
			/* Partial read: skip iovec entries that were consumed */
			if (res >= first_len) {
				int done = 1 + ((res - first_len) >> PAGE_SHIFT);
				conn->read_msg.msg_iov += done;
				conn->read_msg.msg_iovlen -= done;
			}
		} else
			conn->read_state = state;
	}

out:
	TRACE_EXIT_RES(res);
	return res;
}
365
366 static int rx_hdigest(struct iscsi_conn *conn)
367 {
368         struct iscsi_cmnd *cmnd = conn->read_cmnd;
369         int res = digest_rx_header(cmnd);
370
371         if (unlikely(res != 0)) {
372                 PRINT_ERROR("rx header digest for initiator %s failed "
373                         "(%d)", conn->session->initiator_name, res);
374                 mark_conn_closed(conn);
375         }
376         return res;
377 }
378
379 static struct iscsi_cmnd *create_cmnd(struct iscsi_conn *conn)
380 {
381         struct iscsi_cmnd *cmnd;
382
383         cmnd = cmnd_alloc(conn, NULL);
384         iscsi_conn_init_read(cmnd->conn, &cmnd->pdu.bhs, sizeof(cmnd->pdu.bhs));
385         conn->read_state = RX_BHS;
386
387         return cmnd;
388 }
389
/*
 * Returns >0 for success, <=0 for error or successful finish.
 *
 * One step of the rx state machine: resumes at conn->read_state and
 * advances as far as the available socket data allows. Each case
 * deliberately falls through to the next state when the previous one
 * completed within this call. When a full PDU (RX_END) has been
 * assembled, hands it to cmnd_rx_end() and resets to RX_INIT_BHS.
 */
static int recv(struct iscsi_conn *conn)
{
	struct iscsi_cmnd *cmnd = conn->read_cmnd;
	int hdigest, ddigest, res = 1, rc;

	TRACE_ENTRY();

	hdigest = conn->hdigest_type & DIGEST_NONE ? 0 : 1;
	ddigest = conn->ddigest_type & DIGEST_NONE ? 0 : 1;

	switch (conn->read_state) {
	case RX_INIT_BHS:
		sBUG_ON(cmnd != NULL);
		cmnd = conn->read_cmnd = create_cmnd(conn);
		/* fallthrough */
	case RX_BHS:
		res = do_recv(conn, RX_INIT_AHS);
		if (res <= 0 || conn->read_state != RX_INIT_AHS)
			break;
		/* fallthrough */
	case RX_INIT_AHS:
		iscsi_cmnd_get_length(&cmnd->pdu);
		if (cmnd->pdu.ahssize) {
			iscsi_conn_read_ahs(conn, cmnd);
			conn->read_state = RX_AHS;
		} else
			conn->read_state = hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA;

		if (conn->read_state != RX_AHS)
			break;
		/* fallthrough */
	case RX_AHS:
		res = do_recv(conn, hdigest ? RX_INIT_HDIGEST : RX_INIT_DATA);
		if (res <= 0 || conn->read_state != RX_INIT_HDIGEST)
			break;
		/* fallthrough */
	case RX_INIT_HDIGEST:
		iscsi_conn_init_read(conn, &cmnd->hdigest, sizeof(u32));
		conn->read_state = RX_HDIGEST;
		/* fallthrough */
	case RX_HDIGEST:
		res = do_recv(conn, RX_CHECK_HDIGEST);
		if (res <= 0 || conn->read_state != RX_CHECK_HDIGEST)
			break;
		/* fallthrough */
	case RX_CHECK_HDIGEST:
		rc = rx_hdigest(conn);
		if (likely(rc == 0))
			conn->read_state = RX_INIT_DATA;
		else {
			res = rc;
			break;
		}
		/* fallthrough */
	case RX_INIT_DATA:
		rc = cmnd_rx_start(cmnd);
		if (unlikely(rc != 0)) {
			sBUG_ON(!conn->closing);
			conn->read_state = RX_END;
			res = rc;
			/* cmnd will be freed in close_conn() */
			goto out;
		}
		conn->read_state = cmnd->pdu.datasize ? RX_DATA : RX_END;
		if (conn->read_state != RX_DATA)
			break;
		/* fallthrough */
	case RX_DATA:
		res = do_recv(conn, ddigest ? RX_INIT_DDIGEST : RX_END);
		if (res <= 0 || conn->read_state != RX_INIT_DDIGEST)
			break;
		/* fallthrough */
	case RX_INIT_DDIGEST:
		iscsi_conn_init_read(conn, &cmnd->ddigest, sizeof(u32));
		conn->read_state = RX_DDIGEST;
		/* fallthrough */
	case RX_DDIGEST:
		res = do_recv(conn, RX_CHECK_DDIGEST);
		if (res <= 0 || conn->read_state != RX_CHECK_DDIGEST)
			break;
		/* fallthrough */
	case RX_CHECK_DDIGEST:
		conn->read_state = RX_END;
		if (cmnd_opcode(cmnd) == ISCSI_OP_SCSI_CMD) {
			/*
			 * Data digest of a SCSI command is verified later;
			 * queue the cmnd on its own ddigest list and take a
			 * ref so it survives until then.
			 */
			TRACE_DBG("Adding RX ddigest cmd %p to digest list "
				"of self", cmnd);
			list_add_tail(&cmnd->rx_ddigest_cmd_list_entry,
				&cmnd->rx_ddigest_cmd_list);
			cmnd_get(cmnd);
			conn->read_state = RX_END;
		} else if (cmnd_opcode(cmnd) != ISCSI_OP_SCSI_DATA_OUT) {
			/*
			 * We could get here only for NOP-Out. ISCSI RFC doesn't
			 * specify how to deal with digest errors in this case.
			 * Is closing connection correct?
			 */
			TRACE_DBG("cmnd %p, opcode %x: checking RX "
				"ddigest inline", cmnd, cmnd_opcode(cmnd));
			rc = digest_rx_data(cmnd);
			if (unlikely(rc != 0)) {
				conn->read_state = RX_CHECK_DDIGEST;
				mark_conn_closed(conn);
			}
		}
		break;
	default:
		PRINT_ERROR("%d %x", conn->read_state, cmnd_opcode(cmnd));
		sBUG();
	}

	if (res <= 0)
		goto out;

	if (conn->read_state != RX_END)
		goto out;

	/* Whole PDU received: read_size must have been fully consumed */
	if (conn->read_size) {
		PRINT_ERROR("%d %x %d", res, cmnd_opcode(cmnd), conn->read_size);
		sBUG();
	}

	cmnd_rx_end(cmnd);

	sBUG_ON(conn->read_size != 0);

	conn->read_cmnd = NULL;
	conn->read_state = RX_INIT_BHS;
	res = 0;

out:
	TRACE_EXIT_RES(res);
	return res;
}
513
514 /* No locks, conn is rd processing */
515 static int process_read_io(struct iscsi_conn *conn, int *closed)
516 {
517         int res;
518
519         do {
520                 res = recv(conn);
521                 if (unlikely(conn->closing)) {
522                         close_conn(conn);
523                         *closed = 1;
524                         break;
525                 }
526         } while(res > 0);
527
528         TRACE_EXIT_RES(res);
529         return res;
530 }
531
/*
 * Called under iscsi_rd_lock and BHs disabled, but will drop it inside,
 * then reacquire.
 *
 * Drains iscsi_rd_list: each connection is taken off the list, processed
 * with the lock dropped, then either re-queued at the tail (more work:
 * rc == 0 or new data arrived) or parked idle.
 */
static void scst_do_job_rd(void)
{
	TRACE_ENTRY();

	/* We delete/add to tail connections to maintain fairness between them */

	while(!list_empty(&iscsi_rd_list)) {
		int rc, closed = 0;
		struct iscsi_conn *conn = list_entry(iscsi_rd_list.next,
			typeof(*conn), rd_list_entry);

		list_del(&conn->rd_list_entry);

		sBUG_ON(conn->rd_state == ISCSI_CONN_RD_STATE_PROCESSING);
		conn->rd_data_ready = 0;
		conn->rd_state = ISCSI_CONN_RD_STATE_PROCESSING;
#ifdef EXTRACHECKS
		conn->rd_task = current;
#endif
		spin_unlock_bh(&iscsi_rd_lock);

		rc = process_read_io(conn, &closed);

		spin_lock_bh(&iscsi_rd_lock);

		/* conn was freed inside close_conn(); don't touch it */
		if (closed)
			continue;

#ifdef EXTRACHECKS
		conn->rd_task = NULL;
#endif
		if ((rc == 0) || conn->rd_data_ready) {
			list_add_tail(&conn->rd_list_entry, &iscsi_rd_list);
			conn->rd_state = ISCSI_CONN_RD_STATE_IN_LIST;
		} else
			conn->rd_state = ISCSI_CONN_RD_STATE_IDLE;
	}

	TRACE_EXIT();
	return;
}
577
578 static inline int test_rd_list(void)
579 {
580         int res = !list_empty(&iscsi_rd_list) ||
581                   unlikely(kthread_should_stop());
582         return res;
583 }
584
/*
 * Main loop of an iSCSI read (rx) kernel thread. Sleeps on iscsi_rd_waitQ
 * until connections appear on iscsi_rd_list, then processes them via
 * scst_do_job_rd(). @arg is unused. Returns 0 when stopped via
 * kthread_stop() at module unload.
 */
int istrd(void *arg)
{
	TRACE_ENTRY();

	PRINT_INFO("Read thread started, PID %d", current->pid);

	current->flags |= PF_NOFREEZE;

	spin_lock_bh(&iscsi_rd_lock);
	while(!kthread_should_stop()) {
		wait_queue_t wait;
		init_waitqueue_entry(&wait, current);

		if (!test_rd_list()) {
			/*
			 * Classic prepare-to-wait loop: re-check the condition
			 * after setting TASK_INTERRUPTIBLE so a wakeup between
			 * the check and schedule() can't be lost. The lock is
			 * dropped only across schedule().
			 */
			add_wait_queue_exclusive(&iscsi_rd_waitQ, &wait);
			for (;;) {
				set_current_state(TASK_INTERRUPTIBLE);
				if (test_rd_list())
					break;
				spin_unlock_bh(&iscsi_rd_lock);
				schedule();
				spin_lock_bh(&iscsi_rd_lock);
			}
			set_current_state(TASK_RUNNING);
			remove_wait_queue(&iscsi_rd_waitQ, &wait);
		}
		scst_do_job_rd();
	}
	spin_unlock_bh(&iscsi_rd_lock);

	/*
	 * If kthread_should_stop() is true, we are guaranteed to be
	 * on the module unload, so iscsi_rd_list must be empty.
	 */
	sBUG_ON(!list_empty(&iscsi_rd_list));

	PRINT_INFO("Read thread PID %d finished", current->pid);

	TRACE_EXIT();
	return 0;
}
626
#ifdef NET_PAGE_CALLBACKS_DEFINED
/*
 * Network-layer callback: a reference was taken on a page we handed to
 * sendpage(). page->net_priv points at the owning command; the first net
 * reference also takes a command reference so the command (and its
 * buffers) stay alive until the network is done transmitting.
 */
void iscsi_get_page_callback(struct page *page)
{
	struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;
	int v;

	TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
		cmd, page, atomic_read(&page->_count),
		atomic_read(&cmd->net_ref_cnt)+1);

	v = atomic_inc_return(&cmd->net_ref_cnt);
	if (v == 1) {
		/* First net ref: pin the command */
		TRACE_NET_PAGE("getting cmd %p for page %p", cmd, page);
		cmnd_get(cmd);
	}
}

/*
 * Network-layer callback: a reference on one of our pages was dropped.
 * When the last net reference goes away, clear net_priv on all of the
 * command's pages and release the command reference taken in
 * iscsi_get_page_callback().
 */
void iscsi_put_page_callback(struct page *page)
{
	struct iscsi_cmnd *cmd = (struct iscsi_cmnd*)page->net_priv;

	TRACE_NET_PAGE("cmd %p, page %p, _count %d, new net_ref_cnt %d",
		cmd, page, atomic_read(&page->_count),
		atomic_read(&cmd->net_ref_cnt)-1);

	if (atomic_dec_and_test(&cmd->net_ref_cnt)) {
		int i, sg_cnt = get_pgcnt(cmd->bufflen, cmd->sg[0].offset);
		for(i = 0; i < sg_cnt; i++) {
			TRACE_NET_PAGE("Clearing page %p", cmd->sg[i].page);
			cmd->sg[i].page->net_priv = NULL;
		}
		cmnd_put(cmd);
	}
}

/*
 * After a sendpage() call: if the network layer took no reference on the
 * page, drop our net_priv back-pointer so a stale command pointer is
 * never left behind on the page.
 */
static void check_net_priv(struct iscsi_cmnd *cmd, struct page *page)
{
	if (atomic_read(&cmd->net_ref_cnt) == 0) {
		TRACE_DBG("%s", "sendpage() not called get_page(), "
			"zeroing net_priv");
		page->net_priv = NULL;
	}
}
#else
static inline void check_net_priv(struct iscsi_cmnd *cmd, struct page *page) {}
#endif
673
674 /* This is partially taken from the Ardis code. */
675 static int write_data(struct iscsi_conn *conn)
676 {
677         mm_segment_t oldfs;
678         struct file *file;
679         struct socket *sock;
680         ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
681         struct iscsi_cmnd *write_cmnd = conn->write_cmnd;
682         struct iscsi_cmnd *ref_cmd;
683         struct scatterlist *sg;
684         struct iovec *iop;
685         int saved_size, size, sendsize;
686         int offset, idx;
687         int flags, res, count;
688
689         iscsi_extracheck_is_wr_thread(conn);
690
691         if (write_cmnd->own_sg == 0)
692                 ref_cmd = write_cmnd->parent_req;
693         else
694                 ref_cmd = write_cmnd;
695
696         file = conn->file;
697         saved_size = size = conn->write_size;
698         iop = conn->write_iop;
699         count = conn->write_iop_used;
700
701         if (iop) while (1) {
702                 loff_t off = 0;
703                 int rest;
704
705                 sBUG_ON(count > sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
706 retry:
707                 oldfs = get_fs();
708                 set_fs(KERNEL_DS);
709                 res = vfs_writev(file, (struct iovec __user *)iop, count, &off);
710                 set_fs(oldfs);
711                 TRACE(TRACE_D_WRITE, "%#Lx:%u: %d(%ld)",
712                         (unsigned long long) conn->session->sid, conn->cid,
713                         res, (long) iop->iov_len);
714                 if (unlikely(res <= 0)) {
715                         if (res == -EAGAIN) {
716                                 conn->write_iop = iop;
717                                 conn->write_iop_used = count;
718                                 goto out_iov;
719                         } else if (res == -EINTR)
720                                 goto retry;
721                         goto err;
722                 }
723
724                 rest = res;
725                 size -= res;
726                 while (iop->iov_len <= rest && rest) {
727                         rest -= iop->iov_len;
728                         iop++;
729                         count--;
730                 }
731                 if (count == 0) {
732                         conn->write_iop = NULL;
733                         conn->write_iop_used = 0;
734                         if (size)
735                                 break;
736                         goto out_iov;
737                 }
738                 sBUG_ON(iop > conn->write_iov + 
739                         sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
740                 iop->iov_base += rest;
741                 iop->iov_len -= rest;
742         }
743
744         sg = write_cmnd->sg;
745         if (sg == NULL) {
746                 PRINT_ERROR("%s", "warning data missing!");
747                 return 0;
748         }
749         offset = conn->write_offset;
750         idx = offset >> PAGE_SHIFT;
751         offset &= ~PAGE_MASK;
752
753         sock = conn->sock;
754
755 #ifdef NET_PAGE_CALLBACKS_DEFINED
756         sendpage = sock->ops->sendpage;
757 #else
758         if ((write_cmnd->parent_req->scst_cmd != NULL) &&
759             scst_cmd_get_data_buff_alloced(write_cmnd->parent_req->scst_cmd))
760                 sendpage = sock_no_sendpage;
761         else
762                 sendpage = sock->ops->sendpage;
763 #endif
764
765         flags = MSG_DONTWAIT;
766
767         while (1) {
768 #ifdef NET_PAGE_CALLBACKS_DEFINED
769                 if (unlikely((sg[idx].page->net_priv != NULL) &&
770                                 (sg[idx].page->net_priv != ref_cmd))) {
771                         PRINT_ERROR("net_priv isn't NULL and != ref_cmd "
772                                 "(write_cmnd %p, ref_cmd %p, sg %p, idx %d, "
773                                 "net_priv %p)", write_cmnd, ref_cmd, sg, idx,
774                                 sg[idx].page->net_priv);
775                         sBUG();
776                 }
777                 sg[idx].page->net_priv = ref_cmd;
778 #endif
779                 sendsize = PAGE_SIZE - offset;
780                 if (size <= sendsize) {
781 retry2:
782                         res = sendpage(sock, sg[idx].page, offset, size, flags);
783                         TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
784                                 sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
785                                 (unsigned long long)conn->session->sid, conn->cid,
786                                 res, sg[idx].page->index, offset, size);
787                         if (unlikely(res <= 0)) {
788                                 if (res == -EINTR)
789                                         goto retry2;
790                                 else
791                                         goto out_res;
792                         }
793                         check_net_priv(ref_cmd, sg[idx].page);
794                         if (res == size) {
795                                 conn->write_size = 0;
796                                 return saved_size;
797                         }
798                         offset += res;
799                         size -= res;
800                         continue;
801                 }
802
803 retry1:
804                 res = sendpage(sock, sg[idx].page, offset, sendsize,
805                         flags | MSG_MORE);
806                 TRACE(TRACE_D_WRITE, "%s %#Lx:%u: %d(%lu,%u,%u)",
807                         sock->ops->sendpage ? "sendpage" : "sock_no_sendpage",
808                         (unsigned long long ) conn->session->sid, conn->cid,
809                         res, sg[idx].page->index, offset, sendsize);
810                 if (unlikely(res <= 0)) {
811                         if (res == -EINTR)
812                                 goto retry1;
813                         else
814                                 goto out_res;
815                 }
816                 check_net_priv(ref_cmd, sg[idx].page);
817                 if (res == sendsize) {
818                         idx++;
819                         offset = 0;
820                 } else
821                         offset += res;
822                 size -= res;
823         }
824 out:
825         conn->write_offset = (idx << PAGE_SHIFT) + offset;
826 out_iov:
827         conn->write_size = size;
828         if ((saved_size == size) && res == -EAGAIN)
829                 return res;
830
831         return saved_size - size;
832
833 out_res:
834         check_net_priv(ref_cmd, sg[idx].page);
835         if (res == -EAGAIN)
836                 goto out;
837         /* else go through */
838
839 err:
840 #ifndef DEBUG
841         if (!conn->closing)
842 #endif
843         {
844                 PRINT_ERROR("error %d at sid:cid %#Lx:%u, cmnd %p", res,
845                         (unsigned long long)conn->session->sid, conn->cid,
846                         conn->write_cmnd);
847         }
848         if (ref_cmd->scst_cmd != NULL)
849                 scst_set_delivery_status(ref_cmd->scst_cmd,
850                         SCST_CMD_DELIVERY_FAILED);
851         return res;
852 }
853
854 static int exit_tx(struct iscsi_conn *conn, int res)
855 {
856         iscsi_extracheck_is_wr_thread(conn);
857
858         switch (res) {
859         case -EAGAIN:
860         case -ERESTARTSYS:
861                 res = 0;
862                 break;
863         default:
864 #ifndef DEBUG
865                 if (!conn->closing)
866 #endif
867                 {
868                         PRINT_ERROR("Sending data failed: initiator %s, "
869                                 "write_size %d, write_state %d, res %d",
870                                 conn->session->initiator_name, conn->write_size,
871                                 conn->write_state, res);
872                 }
873                 conn->write_state = TX_END;
874                 conn->write_size = 0;
875                 mark_conn_closed(conn);
876                 break;
877         }
878         return res;
879 }
880
881 static int tx_ddigest(struct iscsi_cmnd *cmnd, int state)
882 {
883         int res, rest = cmnd->conn->write_size;
884         struct msghdr msg = {.msg_flags = MSG_NOSIGNAL | MSG_DONTWAIT};
885         struct kvec iov;
886
887         iscsi_extracheck_is_wr_thread(cmnd->conn);
888
889         TRACE_DBG("Sending data digest %x (cmd %p)", cmnd->ddigest, cmnd);
890
891         iov.iov_base = (char *) (&cmnd->ddigest) + (sizeof(u32) - rest);
892         iov.iov_len = rest;
893
894         res = kernel_sendmsg(cmnd->conn->sock, &msg, &iov, 1, rest);
895         if (res > 0) {
896                 cmnd->conn->write_size -= res;
897                 if (!cmnd->conn->write_size)
898                         cmnd->conn->write_state = state;
899         } else
900                 res = exit_tx(cmnd->conn, res);
901
902         return res;
903 }
904
905 static void init_tx_hdigest(struct iscsi_cmnd *cmnd)
906 {
907         struct iscsi_conn *conn = cmnd->conn;
908         struct iovec *iop;
909
910         iscsi_extracheck_is_wr_thread(conn);
911
912         digest_tx_header(cmnd);
913
914         sBUG_ON(conn->write_iop_used >= sizeof(conn->write_iov)/sizeof(conn->write_iov[0]));
915         iop = &conn->write_iop[conn->write_iop_used];
916         conn->write_iop_used++;
917         iop->iov_base = &(cmnd->hdigest);
918         iop->iov_len = sizeof(u32);
919         conn->write_size += sizeof(u32);
920
921         return;
922 }
923
924 static int iscsi_do_send(struct iscsi_conn *conn, int state)
925 {
926         int res;
927
928         iscsi_extracheck_is_wr_thread(conn);
929
930         res = write_data(conn);
931         if (res > 0) {
932                 if (!conn->write_size)
933                         conn->write_state = state;
934         } else
935                 res = exit_tx(conn, res);
936
937         return res;
938 }
939
/*
 * No locks, conn is wr processing.
 *
 * IMPORTANT! Connection conn must be protected by additional conn_get()
 * upon entrance in this function, because otherwise it could be destroyed
 * inside as a result of cmnd release.
 *
 * TX state machine: conn->write_state advances
 * TX_INIT -> TX_BHS_DATA -> [TX_INIT_DDIGEST -> TX_DDIGEST] -> TX_END,
 * where the digest states are entered only when a data digest is
 * configured and the PDU carries data. After a PDU is fully sent, the
 * state is reset to TX_INIT for the next response. Returns 0 when
 * nothing (more) could be sent now, > 0 on progress, < 0 on error.
 */
int iscsi_send(struct iscsi_conn *conn)
{
	struct iscsi_cmnd *cmnd = conn->write_cmnd;
	int ddigest, res = 0;

	TRACE_ENTRY();

	TRACE_DBG("conn %p, write_cmnd %p", conn, cmnd);

	iscsi_extracheck_is_wr_thread(conn);

	ddigest = conn->ddigest_type != DIGEST_NONE ? 1 : 0;

	switch (conn->write_state) {
	case TX_INIT:
		sBUG_ON(cmnd != NULL);
		/* Pick the next response PDU queued for sending, if any */
		cmnd = conn->write_cmnd = iscsi_get_send_cmnd(conn);
		if (!cmnd)
			goto out;
		cmnd_tx_start(cmnd);
		if (!(conn->hdigest_type & DIGEST_NONE))
		    init_tx_hdigest(cmnd);
		conn->write_state = TX_BHS_DATA;
		/* fall through */
	case TX_BHS_DATA:
		res = iscsi_do_send(conn, ddigest && cmnd->pdu.datasize ? 
					TX_INIT_DDIGEST : TX_END);
		if (res <= 0 || conn->write_state != TX_INIT_DDIGEST)
			break;
		/* fall through */
	case TX_INIT_DDIGEST:
		/* Queue the 4-byte data digest for transmission */
		cmnd->conn->write_size = sizeof(u32);
		conn->write_state = TX_DDIGEST;
		/* fall through */
	case TX_DDIGEST:
		res = tx_ddigest(cmnd, TX_END);
		break;
	default:
		PRINT_ERROR("%d %d %x", res, conn->write_state,
			cmnd_opcode(cmnd));
		sBUG();
	}

	if (res == 0)
		goto out;

	if (conn->write_state != TX_END)
		goto out;

	/* TX_END with pending write_size would mean the state machine broke */
	if (conn->write_size) {
		PRINT_ERROR("%d %x %u", res, cmnd_opcode(cmnd),
			conn->write_size);
		sBUG();
	}
	/* PDU completely on the wire: finish and release it */
	cmnd_tx_end(cmnd);

	rsp_cmnd_release(cmnd);

	conn->write_cmnd = NULL;
	conn->write_state = TX_INIT;

out:
	TRACE_EXIT_RES(res);
	return res;
}
1009
1010 /* No locks, conn is wr processing.
1011  *
1012  * IMPORTANT! Connection conn must be protected by additional conn_get()
1013  * upon entrance in this function, because otherwise it could be destroyed
1014  * inside as a result of iscsi_send(), which releases sent commands.
1015  */
/*
 * Sends pending data on @conn if it is actually ready for writing;
 * otherwise does nothing and reports 0.
 */
static int process_write_queue(struct iscsi_conn *conn)
{
	int res;

	TRACE_ENTRY();

	res = likely(test_write_ready(conn)) ? iscsi_send(conn) : 0;

	TRACE_EXIT_RES(res);
	return res;
}
1028
/*
 * Called under iscsi_wr_lock and BHs disabled, but will drop it inside,
 * then reaquire.
 *
 * Drains iscsi_wr_list: each ready connection is taken off the list,
 * processed outside the lock, then either requeued at the tail, parked
 * waiting for socket send space, or marked idle.
 */
static void scst_do_job_wr(void)
{
	TRACE_ENTRY();

	/* We delete/add to tail connections to maintain fairness between them */

	while(!list_empty(&iscsi_wr_list)) {
		int rc;
		struct iscsi_conn *conn = list_entry(iscsi_wr_list.next,
			typeof(*conn), wr_list_entry);

		TRACE_DBG("conn %p, wr_state %x, wr_space_ready %d, "
			"write ready %d", conn, conn->wr_state,
			conn->wr_space_ready, test_write_ready(conn));

		list_del(&conn->wr_list_entry);

		sBUG_ON(conn->wr_state == ISCSI_CONN_WR_STATE_PROCESSING);

		conn->wr_state = ISCSI_CONN_WR_STATE_PROCESSING;
		/* Cleared before processing so a new space event isn't lost */
		conn->wr_space_ready = 0;
#ifdef EXTRACHECKS
		conn->wr_task = current;
#endif
		/* Drop the lock for the duration of the network send */
		spin_unlock_bh(&iscsi_wr_lock);

		/* Pin conn: processing may release its last cmnd reference */
		conn_get(conn);

		rc = process_write_queue(conn);

		spin_lock_bh(&iscsi_wr_lock);
#ifdef EXTRACHECKS
		conn->wr_task = NULL;
#endif
		/*
		 * -EAGAIN with no new space signalled meanwhile: park the
		 * connection until send space is reported again.
		 */
		if ((rc == -EAGAIN) && !conn->wr_space_ready) {
			conn->wr_state = ISCSI_CONN_WR_STATE_SPACE_WAIT;
			goto cont;
		}

		if (test_write_ready(conn)) {
			/* Still has data to send: requeue at tail (fairness) */
			list_add_tail(&conn->wr_list_entry, &iscsi_wr_list);
			conn->wr_state = ISCSI_CONN_WR_STATE_IN_LIST;
		} else
			conn->wr_state = ISCSI_CONN_WR_STATE_IDLE;

cont:
		conn_put(conn);
	}

	TRACE_EXIT();
	return;
}
1085
1086 static inline int test_wr_list(void)
1087 {
1088         int res = !list_empty(&iscsi_wr_list) ||
1089                   unlikely(kthread_should_stop());
1090         return res;
1091 }
1092
/*
 * Main function of a write (TX) kernel thread: sleeps on iscsi_wr_waitQ
 * until connections appear on iscsi_wr_list, services them via
 * scst_do_job_wr(), and loops until kthread_stop() is requested.
 */
int istwr(void *arg)
{
	TRACE_ENTRY();

	PRINT_INFO("Write thread started, PID %d", current->pid);

	/* Don't let the freezer block this thread on suspend */
	current->flags |= PF_NOFREEZE;

	spin_lock_bh(&iscsi_wr_lock);
	while(!kthread_should_stop()) {
		wait_queue_t wait;
		init_waitqueue_entry(&wait, current);

		if (!test_wr_list()) {
			add_wait_queue_exclusive(&iscsi_wr_waitQ, &wait);
			for (;;) {
				/*
				 * Standard manual-sleep pattern: mark
				 * interruptible, re-check the condition under
				 * the lock, then drop the lock and schedule.
				 */
				set_current_state(TASK_INTERRUPTIBLE);
				if (test_wr_list())
					break;
				spin_unlock_bh(&iscsi_wr_lock);
				schedule();
				spin_lock_bh(&iscsi_wr_lock);
			}
			set_current_state(TASK_RUNNING);
			remove_wait_queue(&iscsi_wr_waitQ, &wait);
		}
		scst_do_job_wr();
	}
	spin_unlock_bh(&iscsi_wr_lock);

	/*
	 * If kthread_should_stop() is true, we are guaranteed to be
	 * on the module unload, so iscsi_wr_list must be empty.
	 */
	sBUG_ON(!list_empty(&iscsi_wr_list));

	PRINT_INFO("Write thread PID %d finished", current->pid);

	TRACE_EXIT();
	return 0;
}