summaryrefslogtreecommitdiff
path: root/transports
diff options
context:
space:
mode:
authorYawning Angel <yawning@torproject.org>2015-10-30 09:45:26 +0000
committerYawning Angel <yawning@torproject.org>2015-10-30 09:45:26 +0000
commit43cdc20e7e7f136c96814bf752ef1fbc9b6fec33 (patch)
treeee2571a09b8b44b9d68071defcd05800e9a0d89c /transports
parent611205be681322883a4d73dd00fcb13c4352fe53 (diff)
meek-lite: combine small writes at request dispatch time.
This dramatically improves bulk upload performance, from totally shit to just shit.
Diffstat (limited to 'transports')
-rw-r--r--transports/meeklite/meek.go70
1 files changed, 42 insertions, 28 deletions
diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index 5842704..8957ceb 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -161,33 +161,22 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
return 0, io.ErrClosedPipe
}
- if len(b) > 0 {
- // Copy the data to be written to a new slice, since
- // we return immediately after queuing and the peer can
- // happily reuse `b` before data has been sent.
- toWrite := len(b)
- b2 := make([]byte, toWrite)
- copy(b2, b)
- offset := 0
- for toWrite > 0 {
- // Chunk up the writes to keep them under the maximum
- // payload length.
- sz := toWrite
- if sz > maxPayloadLength {
- sz = maxPayloadLength
- }
+ if len(b) == 0 {
+ return 0, nil
+ }
- // Enqueue a properly sized subslice of our copy.
- if ok := c.enqueueWrite(b2[offset : offset+sz]); !ok {
- // Technically we did enqueue data, but the worker's
- // got closed out from under us.
- return 0, io.ErrClosedPipe
- }
- toWrite -= sz
- offset += sz
- runtime.Gosched()
- }
+ // Copy the data to be written to a new slice, since
+ // we return immediately after queuing and the peer can
+ // happily reuse `b` before data has been sent.
+ toWrite := len(b)
+ b2 := make([]byte, toWrite)
+ copy(b2, b)
+ if ok := c.enqueueWrite(b2); !ok {
+ // Technically we did enqueue data, but the worker's
+ // got closed out from under us.
+ return 0, io.ErrClosedPipe
}
+ runtime.Gosched()
return len(b), nil
}
@@ -269,9 +258,11 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
func (c *meekConn) ioWorker() {
interval := initPollInterval
+ var sndBuf, leftBuf []byte
loop:
+
for {
- var sndBuf []byte
+ sndBuf = nil
select {
case <-time.After(interval):
// If the poll interval has elapsed, issue a request.
@@ -281,19 +272,42 @@ loop:
break loop
}
+ // Combine short writes as long as data is available to be
+ // sent immediately and it will not put us over the max
+ // payload limit. Any excess data is stored and dispatched
+ // as the next request).
+ sndBuf = append(leftBuf, sndBuf...)
+ wrSz := len(sndBuf)
+ for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
+ b := <-c.workerWrChan
+ sndBuf = append(sndBuf, b...)
+ wrSz = len(sndBuf)
+ }
+ if wrSz > maxPayloadLength {
+ wrSz = maxPayloadLength
+ }
+
// Issue a request.
- rdBuf, err := c.roundTrip(sndBuf)
+ rdBuf, err := c.roundTrip(sndBuf[:wrSz])
if err != nil {
// Welp, something went horrifically wrong.
break loop
}
+
+ // Stash the remaining payload if any.
+ leftBuf = sndBuf[wrSz:] // Store the remaining data
+ if len(leftBuf) == 0 {
+ leftBuf = nil
+ }
+
+ // Determine the next poll interval.
if len(rdBuf) > 0 {
// Received data, enqueue the read.
c.workerRdChan <- rdBuf
// And poll immediately.
interval = 0
- } else if sndBuf != nil {
+ } else if wrSz > 0 {
// Sent data, poll immediately.
interval = 0
} else if interval == 0 {