From 43cdc20e7e7f136c96814bf752ef1fbc9b6fec33 Mon Sep 17 00:00:00 2001 From: Yawning Angel Date: Fri, 30 Oct 2015 09:45:26 +0000 Subject: meek-lite: combine small writes at request dispatch time. This dramatically improves bulk upload performance, from totally shit to just shit. --- transports/meeklite/meek.go | 70 +++++++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 28 deletions(-) (limited to 'transports') diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go index 5842704..8957ceb 100644 --- a/transports/meeklite/meek.go +++ b/transports/meeklite/meek.go @@ -161,33 +161,22 @@ func (c *meekConn) Write(b []byte) (n int, err error) { return 0, io.ErrClosedPipe } - if len(b) > 0 { - // Copy the data to be written to a new slice, since - // we return immediately after queuing and the peer can - // happily reuse `b` before data has been sent. - toWrite := len(b) - b2 := make([]byte, toWrite) - copy(b2, b) - offset := 0 - for toWrite > 0 { - // Chunk up the writes to keep them under the maximum - // payload length. - sz := toWrite - if sz > maxPayloadLength { - sz = maxPayloadLength - } + if len(b) == 0 { + return 0, nil + } - // Enqueue a properly sized subslice of our copy. - if ok := c.enqueueWrite(b2[offset : offset+sz]); !ok { - // Technically we did enqueue data, but the worker's - // got closed out from under us. - return 0, io.ErrClosedPipe - } - toWrite -= sz - offset += sz - runtime.Gosched() - } + // Copy the data to be written to a new slice, since + // we return immediately after queuing and the peer can + // happily reuse `b` before data has been sent. + toWrite := len(b) + b2 := make([]byte, toWrite) + copy(b2, b) + if ok := c.enqueueWrite(b2); !ok { + // Technically we did enqueue data, but the worker's + // got closed out from under us. + return 0, io.ErrClosedPipe } + runtime.Gosched() return len(b), nil } @@ -269,9 +258,11 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) { func (c *meekConn) ioWorker() { interval := initPollInterval + var sndBuf, leftBuf []byte loop: + for { - var sndBuf []byte + sndBuf = nil select { case <-time.After(interval): // If the poll interval has elapsed, issue a request. @@ -281,19 +272,42 @@ loop: break loop } + // Combine short writes as long as data is available to be + // sent immediately and it will not put us over the max + // payload limit. Any excess data is stored and dispatched + // as the next request). + sndBuf = append(leftBuf, sndBuf...) + wrSz := len(sndBuf) + for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength { + b := <-c.workerWrChan + sndBuf = append(sndBuf, b...) + wrSz = len(sndBuf) + } + if wrSz > maxPayloadLength { + wrSz = maxPayloadLength + } + // Issue a request. - rdBuf, err := c.roundTrip(sndBuf) + rdBuf, err := c.roundTrip(sndBuf[:wrSz]) if err != nil { // Welp, something went horrifically wrong. break loop } + + // Stash the remaining payload if any. + leftBuf = sndBuf[wrSz:] // Store the remaining data + if len(leftBuf) == 0 { + leftBuf = nil + } + + // Determine the next poll interval. if len(rdBuf) > 0 { // Received data, enqueue the read. c.workerRdChan <- rdBuf // And poll immediately. interval = 0 - } else if sndBuf != nil { + } else if wrSz > 0 { // Sent data, poll immediately. interval = 0 } else if interval == 0 { -- cgit v1.2.3