[tcp] Handle out-of-order received packets
authorMichael Brown <mcb30@ipxe.org>
Tue, 20 Jul 2010 22:17:30 +0000 (23:17 +0100)
committerMarty Connor <mdc@etherboot.org>
Sun, 1 Aug 2010 20:13:35 +0000 (16:13 -0400)
Maintain a queue of received packets, so that lost packets need not
result in retransmission of the entire TCP window.

Increase the TCP window to 8kB, in order that we can potentially
transmit enough duplicate ACKs to trigger Fast Retransmission at the
sender.

Using a 10MB HTTP download in qemu-kvm with an artificial drop rate of
1 in 64 packets, this reduces the download time from around 26s to
around 4s.

Signed-off-by: Michael Brown <mcb30@ipxe.org>
Signed-off-by: Marty Connor <mdc@etherboot.org>
src/include/gpxe/tcp.h
src/net/tcp.c

index 7ae7eab..be76172 100644 (file)
@@ -287,7 +287,7 @@ struct tcp_options {
  * that payloads remain dword-aligned.
  */
 //#define TCP_MAX_WINDOW_SIZE  ( 65536 - 4 )
-#define TCP_MAX_WINDOW_SIZE    4096
+#define TCP_MAX_WINDOW_SIZE    8192
 
 /**
  * Path MTU
@@ -313,6 +313,35 @@ struct tcp_options {
  */
 #define TCP_MSL ( 2 * 60 * TICKS_PER_SEC )
 
+/**
+ * Compare TCP sequence numbers
+ *
+ * @v seq1             Sequence number 1
+ * @v seq2             Sequence number 2
+ * @ret diff           Sequence difference
+ *
+ * Analogous to memcmp(), returns an integer less than, equal to, or
+ * greater than zero if @c seq1 is found, respectively, to be before,
+ * equal to, or after @c seq2.
+ */
+static inline __attribute__ (( always_inline )) int32_t
+tcp_cmp ( uint32_t seq1, uint32_t seq2 ) {
+       return ( ( int32_t ) ( seq1 - seq2 ) );
+}
+
+/**
+ * Check if TCP sequence number lies within window
+ *
+ * @v seq              Sequence number
+ * @v start            Start of window
+ * @v len              Length of window
+ * @ret in_window      Sequence number is within window
+ */
+static inline int tcp_in_window ( uint32_t seq, uint32_t start,
+                                 uint32_t len ) {
+       return ( ( seq - start ) < len );
+}
+
 extern struct tcpip_protocol tcp_protocol;
 
 #endif /* _GPXE_TCP_H */
index 8df80a8..76fa5a3 100644 (file)
@@ -80,7 +80,9 @@ struct tcp_connection {
        uint32_t ts_recent;
 
        /** Transmit queue */
-       struct list_head queue;
+       struct list_head tx_queue;
+       /** Receive queue */
+       struct list_head rx_queue;
        /** Retransmission timer */
        struct retry_timer timer;
        /** Shutdown (TIME_WAIT) timer */
@@ -97,6 +99,29 @@ enum tcp_flags {
        TCP_ACK_PENDING = 0x0004,
 };
 
+/** TCP internal header
+ *
+ * This is the header that replaces the TCP header for packets
+ * enqueued on the receive queue.
+ */
+struct tcp_rx_queued_header {
+       /** SEQ value, in host-endian order
+        *
+        * This represents the SEQ value at the time the packet is
+        * enqueued, and so excludes the SYN, if present.
+        */
+       uint32_t seq;
+       /** Flags
+        *
+        * Only FIN is valid within this flags byte; all other flags
+        * have already been processed by the time the packet is
+        * enqueued.
+        */
+       uint8_t flags;
+       /** Reserved */
+       uint8_t reserved[3];
+};
+
 /**
  * List of registered TCP connections
  */
@@ -245,7 +270,8 @@ static int tcp_open ( struct xfer_interface *xfer, struct sockaddr *peer,
        tcp->tcp_state = TCP_STATE_SENT ( TCP_SYN );
        tcp_dump_state ( tcp );
        tcp->snd_seq = random();
-       INIT_LIST_HEAD ( &tcp->queue );
+       INIT_LIST_HEAD ( &tcp->tx_queue );
+       INIT_LIST_HEAD ( &tcp->rx_queue );
        memcpy ( &tcp->peer, st_peer, sizeof ( tcp->peer ) );
 
        /* Bind to local port */
@@ -296,8 +322,14 @@ static void tcp_close ( struct tcp_connection *tcp, int rc ) {
                tcp->tcp_state = TCP_CLOSED;
                tcp_dump_state ( tcp );
 
+               /* Free any unprocessed I/O buffers */
+               list_for_each_entry_safe ( iobuf, tmp, &tcp->rx_queue, list ) {
+                       list_del ( &iobuf->list );
+                       free_iob ( iobuf );
+               }
+
                /* Free any unsent I/O buffers */
-               list_for_each_entry_safe ( iobuf, tmp, &tcp->queue, list ) {
+               list_for_each_entry_safe ( iobuf, tmp, &tcp->tx_queue, list ) {
                        list_del ( &iobuf->list );
                        free_iob ( iobuf );
                }
@@ -318,7 +350,7 @@ static void tcp_close ( struct tcp_connection *tcp, int rc ) {
                tcp_rx_ack ( tcp, ( tcp->snd_seq + 1 ), 0 );
 
        /* If we have no data remaining to send, start sending FIN */
-       if ( list_empty ( &tcp->queue ) ) {
+       if ( list_empty ( &tcp->tx_queue ) ) {
                tcp->tcp_state |= TCP_STATE_SENT ( TCP_FIN );
                tcp_dump_state ( tcp );
        }
@@ -366,14 +398,14 @@ static size_t tcp_xmit_win ( struct tcp_connection *tcp ) {
  * (if provided) and, if @c remove is true, removed from the transmit
  * queue.
  */
-static size_t tcp_process_queue ( struct tcp_connection *tcp, size_t max_len,
-                                 struct io_buffer *dest, int remove ) {
+static size_t tcp_process_tx_queue ( struct tcp_connection *tcp, size_t max_len,
+                                    struct io_buffer *dest, int remove ) {
        struct io_buffer *iobuf;
        struct io_buffer *tmp;
        size_t frag_len;
        size_t len = 0;
 
-       list_for_each_entry_safe ( iobuf, tmp, &tcp->queue, list ) {
+       list_for_each_entry_safe ( iobuf, tmp, &tcp->tx_queue, list ) {
                frag_len = iob_len ( iobuf );
                if ( frag_len > max_len )
                        frag_len = max_len;
@@ -426,8 +458,8 @@ static int tcp_xmit ( struct tcp_connection *tcp ) {
         * lengths that we wish to transmit.
         */
        if ( TCP_CAN_SEND_DATA ( tcp->tcp_state ) ) {
-               len = tcp_process_queue ( tcp, tcp_xmit_win ( tcp ),
-                                         NULL, 0 );
+               len = tcp_process_tx_queue ( tcp, tcp_xmit_win ( tcp ),
+                                            NULL, 0 );
        }
        seq_len = len;
        flags = TCP_FLAGS_SENDING ( tcp->tcp_state );
@@ -461,7 +493,7 @@ static int tcp_xmit ( struct tcp_connection *tcp ) {
        iob_reserve ( iobuf, MAX_HDR_LEN );
 
        /* Fill data payload from transmit queue */
-       tcp_process_queue ( tcp, len, iobuf, 0 );
+       tcp_process_tx_queue ( tcp, len, iobuf, 0 );
 
        /* Expand receive window if possible */
        max_rcv_win = ( ( freemem * 3 ) / 4 );
@@ -735,7 +767,7 @@ static int tcp_rx_syn ( struct tcp_connection *tcp, uint32_t seq,
        }
 
        /* Ignore duplicate SYN */
-       if ( ( tcp->rcv_ack - seq ) > 0 )
+       if ( seq != tcp->rcv_ack )
                return 0;
 
        /* Acknowledge SYN */
@@ -806,14 +838,14 @@ static int tcp_rx_ack ( struct tcp_connection *tcp, uint32_t ack,
        tcp->snd_win = win;
 
        /* Remove any acknowledged data from transmit queue */
-       tcp_process_queue ( tcp, len, NULL, 1 );
+       tcp_process_tx_queue ( tcp, len, NULL, 1 );
                
        /* Mark SYN/FIN as acknowledged if applicable. */
        if ( acked_flags )
                tcp->tcp_state |= TCP_STATE_ACKED ( acked_flags );
 
        /* Start sending FIN if we've had all possible data ACKed */
-       if ( list_empty ( &tcp->queue ) && ( tcp->flags & TCP_XFER_CLOSED ) )
+       if ( list_empty ( &tcp->tx_queue ) && ( tcp->flags & TCP_XFER_CLOSED ) )
                tcp->tcp_state |= TCP_STATE_SENT ( TCP_FIN );
 
        return 0;
@@ -868,7 +900,7 @@ static int tcp_rx_data ( struct tcp_connection *tcp, uint32_t seq,
 static int tcp_rx_fin ( struct tcp_connection *tcp, uint32_t seq ) {
 
        /* Ignore duplicate or out-of-order FIN */
-       if ( ( tcp->rcv_ack - seq ) > 0 )
+       if ( seq != tcp->rcv_ack )
                return 0;
 
        /* Acknowledge FIN */
@@ -898,7 +930,7 @@ static int tcp_rx_rst ( struct tcp_connection *tcp, uint32_t seq ) {
         * ACKed.
         */
        if ( tcp->tcp_state & TCP_STATE_RCVD ( TCP_SYN ) ) {
-               if ( ( seq - tcp->rcv_ack ) >= tcp->rcv_win )
+               if ( ! tcp_in_window ( seq, tcp->rcv_ack, tcp->rcv_win ) )
                        return 0;
        } else {
                if ( ! ( tcp->tcp_state & TCP_STATE_ACKED ( TCP_SYN ) ) )
@@ -914,6 +946,95 @@ static int tcp_rx_rst ( struct tcp_connection *tcp, uint32_t seq ) {
        return -ECONNRESET;
 }
 
+/**
+ * Enqueue received TCP packet
+ *
+ * @v tcp              TCP connection
+ * @v seq              SEQ value (in host-endian order)
+ * @v flags            TCP flags
+ * @v iobuf            I/O buffer
+ */
+static void tcp_rx_enqueue ( struct tcp_connection *tcp, uint32_t seq,
+                            uint8_t flags, struct io_buffer *iobuf ) {
+       struct tcp_rx_queued_header *tcpqhdr;
+       struct io_buffer *queued;
+       size_t len;
+       uint32_t seq_len;
+
+       /* Calculate remaining flags and sequence length.  Note that
+        * SYN, if present, has already been processed by this point.
+        */
+       flags &= TCP_FIN;
+       len = iob_len ( iobuf );
+       seq_len = ( len + ( flags ? 1 : 0 ) );
+
+       /* Discard immediately (to save memory) if:
+        *
+        * a) we have not yet received a SYN (and so have no defined
+        *    receive window), or
+        * b) the packet lies entirely outside the receive window, or
+        * c) there is no further content to process.
+        */
+       if ( ( ! ( tcp->tcp_state & TCP_STATE_RCVD ( TCP_SYN ) ) ) ||
+            ( tcp_cmp ( seq, tcp->rcv_ack + tcp->rcv_win ) >= 0 ) ||
+            ( tcp_cmp ( seq + seq_len, tcp->rcv_ack ) < 0 ) ||
+            ( seq_len == 0 ) ) {
+               free_iob ( iobuf );
+               return;
+       }
+
+       /* Add internal header */
+       tcpqhdr = iob_push ( iobuf, sizeof ( *tcpqhdr ) );
+       tcpqhdr->seq = seq;
+       tcpqhdr->flags = flags;
+
+       /* Add to RX queue */
+       list_for_each_entry ( queued, &tcp->rx_queue, list ) {
+               tcpqhdr = queued->data;
+               if ( tcp_cmp ( seq, tcpqhdr->seq ) < 0 )
+                       break;
+       }
+       list_add_tail ( &iobuf->list, &queued->list );
+}
+
+/**
+ * Process receive queue
+ *
+ * @v tcp              TCP connection
+ */
+static void tcp_process_rx_queue ( struct tcp_connection *tcp ) {
+       struct io_buffer *iobuf;
+       struct io_buffer *tmp;
+       struct tcp_rx_queued_header *tcpqhdr;
+       uint32_t seq;
+       unsigned int flags;
+       size_t len;
+
+       /* Process all applicable received buffers */
+       list_for_each_entry_safe ( iobuf, tmp, &tcp->rx_queue, list ) {
+               tcpqhdr = iobuf->data;
+               if ( tcp_cmp ( tcpqhdr->seq, tcp->rcv_ack ) > 0 )
+                       break;
+
+               /* Strip internal header and remove from RX queue */
+               list_del ( &iobuf->list );
+               seq = tcpqhdr->seq;
+               flags = tcpqhdr->flags;
+               iob_pull ( iobuf, sizeof ( *tcpqhdr ) );
+               len = iob_len ( iobuf );
+
+               /* Handle new data, if any */
+               tcp_rx_data ( tcp, seq, iob_disown ( iobuf ) );
+               seq += len;
+
+               /* Handle FIN, if present */
+               if ( flags & TCP_FIN ) {
+                       tcp_rx_fin ( tcp, seq );
+                       seq++;
+               }
+       }
+}
+
 /**
  * Process received packet
  *
@@ -935,9 +1056,9 @@ static int tcp_rx ( struct io_buffer *iobuf,
        uint32_t seq;
        uint32_t ack;
        uint32_t win;
-       uint32_t ts_recent;
        unsigned int flags;
        size_t len;
+       uint32_t seq_len;
        int rc;
 
        /* Sanity check packet */
@@ -977,17 +1098,16 @@ static int tcp_rx ( struct io_buffer *iobuf,
        flags = tcphdr->flags;
        tcp_rx_opts ( tcp, ( ( ( void * ) tcphdr ) + sizeof ( *tcphdr ) ),
                      ( hlen - sizeof ( *tcphdr ) ), &options );
-       ts_recent = ( options.tsopt ?
-                     ntohl ( options.tsopt->tsval ) : tcp->ts_recent );
        iob_pull ( iobuf, hlen );
        len = iob_len ( iobuf );
+       seq_len = ( len + ( ( flags & TCP_SYN ) ? 1 : 0 ) +
+                   ( ( flags & TCP_FIN ) ? 1 : 0 ) );
 
        /* Dump header */
        DBGC2 ( tcp, "TCP %p RX %d<-%d           %08x %08x..%08zx %4zd",
                tcp, ntohs ( tcphdr->dest ), ntohs ( tcphdr->src ),
                ntohl ( tcphdr->ack ), ntohl ( tcphdr->seq ),
-               ( ntohl ( tcphdr->seq ) + len +
-                 ( ( tcphdr->flags & ( TCP_SYN | TCP_FIN ) ) ? 1 : 0 )), len);
+               ( ntohl ( tcphdr->seq ) + seq_len ), len );
        tcp_dump_flags ( tcp, tcphdr->flags );
        DBGC2 ( tcp, "\n" );
 
@@ -998,6 +1118,10 @@ static int tcp_rx ( struct io_buffer *iobuf,
                goto discard;
        }
 
+       /* Update timestamp, if applicable */
+       if ( options.tsopt && tcp_in_window ( tcp->rcv_ack, seq, seq_len ) )
+               tcp->ts_recent = ntohl ( options.tsopt->tsval );
+
        /* Handle ACK, if present */
        if ( flags & TCP_ACK ) {
                if ( ( rc = tcp_rx_ack ( tcp, ack, win ) ) != 0 ) {
@@ -1024,19 +1148,11 @@ static int tcp_rx ( struct io_buffer *iobuf,
                        goto discard;
        }
 
-       /* Handle new data, if any */
-       tcp_rx_data ( tcp, seq, iob_disown ( iobuf ) );
-       seq += len;
+       /* Enqueue received data */
+       tcp_rx_enqueue ( tcp, seq, flags, iob_disown ( iobuf ) );
 
-       /* Handle FIN, if present */
-       if ( flags & TCP_FIN ) {
-               tcp_rx_fin ( tcp, seq );
-               seq++;
-       }
-
-       /* Update timestamp, if applicable */
-       if ( seq == tcp->rcv_ack )
-               tcp->ts_recent = ts_recent;
+       /* Process receive queue */
+       tcp_process_rx_queue ( tcp );
 
        /* Dump out any state change as a result of the received packet */
        tcp_dump_state ( tcp );
@@ -1105,7 +1221,7 @@ static size_t tcp_xfer_window ( struct xfer_interface *xfer ) {
         * of only one unACKed packet in the TX queue at any time; we
         * do this to conserve memory usage.
         */
-       if ( ! list_empty ( &tcp->queue ) )
+       if ( ! list_empty ( &tcp->tx_queue ) )
                return 0;
 
        /* Return TCP window length */
@@ -1127,7 +1243,7 @@ static int tcp_xfer_deliver_iob ( struct xfer_interface *xfer,
                container_of ( xfer, struct tcp_connection, xfer );
 
        /* Enqueue packet */
-       list_add_tail ( &iobuf->list, &tcp->queue );
+       list_add_tail ( &iobuf->list, &tcp->tx_queue );
 
        /* Transmit data, if possible */
        tcp_xmit ( tcp );