summaryrefslogtreecommitdiff
path: root/transports/meeklite/meek.go
diff options
context:
space:
mode:
Diffstat (limited to 'transports/meeklite/meek.go')
-rw-r--r--transports/meeklite/meek.go373
1 files changed, 0 insertions, 373 deletions
diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
deleted file mode 100644
index 79012ac..0000000
--- a/transports/meeklite/meek.go
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Copyright (c) 2015, Yawning Angel <yawning at torproject dot org>
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * * Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- *
- * * Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-package meeklite
-
-import (
- "bytes"
- "crypto/rand"
- "crypto/sha256"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "net"
- "net/http"
- gourl "net/url"
- "runtime"
- "sync"
- "time"
-
- "git.torproject.org/pluggable-transports/goptlib.git"
- "github.com/OperatorFoundation/obfs4/transports/base"
-)
-
-const (
- urlArg = "url"
- frontArg = "front"
-
- maxChanBacklog = 16
-
- // Constants shamelessly stolen from meek-client.go...
- maxPayloadLength = 0x10000
- initPollInterval = 100 * time.Millisecond
- maxPollInterval = 5 * time.Second
- pollIntervalMultiplier = 1.5
- maxRetries = 10
- retryDelay = 30 * time.Second
-)
-
-var (
- // ErrNotSupported is the error returned for a unsupported operation.
- ErrNotSupported = errors.New("meek_lite: operation not supported")
-
- loopbackAddr = net.IPv4(127, 0, 0, 1)
-)
-
-type meekClientArgs struct {
- url *gourl.URL
- front string
-}
-
-func (ca *meekClientArgs) Network() string {
- return transportName
-}
-
-func (ca *meekClientArgs) String() string {
- return transportName + ":" + ca.front + ":" + ca.url.String()
-}
-
-func newClientArgs(args *pt.Args) (ca *meekClientArgs, err error) {
- ca = &meekClientArgs{}
-
- // Parse the URL argument.
- str, ok := args.Get(urlArg)
- if !ok {
- return nil, fmt.Errorf("missing argument '%s'", urlArg)
- }
- ca.url, err = gourl.Parse(str)
- if err != nil {
- return nil, fmt.Errorf("malformed url: '%s'", str)
- }
- switch ca.url.Scheme {
- case "http", "https":
- default:
- return nil, fmt.Errorf("invalid scheme: '%s'", ca.url.Scheme)
- }
-
- // Parse the (optional) front argument.
- ca.front, _ = args.Get(frontArg)
-
- return ca, nil
-}
-
-type meekConn struct {
- sync.Mutex
-
- args *meekClientArgs
- sessionID string
- transport *http.Transport
-
- workerRunning bool
- workerWrChan chan []byte
- workerRdChan chan []byte
- workerCloseChan chan bool
- rdBuf *bytes.Buffer
-}
-
-func (c *meekConn) Read(p []byte) (n int, err error) {
- // If there is data left over from the previous read,
- // service the request using the buffered data.
- if c.rdBuf != nil {
- if c.rdBuf.Len() == 0 {
- panic("empty read buffer")
- }
- n, err = c.rdBuf.Read(p)
- if c.rdBuf.Len() == 0 {
- c.rdBuf = nil
- }
- return
- }
-
- // Wait for the worker to enqueue more incoming data.
- b, ok := <-c.workerRdChan
- if !ok {
- // Close() was called and the worker's shutting down.
- return 0, io.ErrClosedPipe
- }
-
- // Ew, an extra copy, but who am I kidding, it's meek.
- buf := bytes.NewBuffer(b)
- n, err = buf.Read(p)
- if buf.Len() > 0 {
- // If there's data pending, stash the buffer so the next
- // Read() call will use it to fulfuill the Read().
- c.rdBuf = buf
- }
- return
-}
-
-func (c *meekConn) Write(b []byte) (n int, err error) {
- // Check to see if the connection is actually open.
- c.Lock()
- closed := !c.workerRunning
- c.Unlock()
- if closed {
- return 0, io.ErrClosedPipe
- }
-
- if len(b) == 0 {
- return 0, nil
- }
-
- // Copy the data to be written to a new slice, since
- // we return immediately after queuing and the peer can
- // happily reuse `b` before data has been sent.
- toWrite := len(b)
- b2 := make([]byte, toWrite)
- copy(b2, b)
- if ok := c.enqueueWrite(b2); !ok {
- // Technically we did enqueue data, but the worker's
- // got closed out from under us.
- return 0, io.ErrClosedPipe
- }
- runtime.Gosched()
- return len(b), nil
-}
-
-func (c *meekConn) Close() error {
- // Ensure that we do this once and only once.
- c.Lock()
- defer c.Unlock()
- if !c.workerRunning {
- return nil
- }
-
- // Tear down the worker.
- c.workerRunning = false
- c.workerCloseChan <- true
-
- return nil
-}
-
-func (c *meekConn) LocalAddr() net.Addr {
- return &net.IPAddr{IP: loopbackAddr}
-}
-
-func (c *meekConn) RemoteAddr() net.Addr {
- return c.args
-}
-
-func (c *meekConn) SetDeadline(t time.Time) error {
- return ErrNotSupported
-}
-
-func (c *meekConn) SetReadDeadline(t time.Time) error {
- return ErrNotSupported
-}
-
-func (c *meekConn) SetWriteDeadline(t time.Time) error {
- return ErrNotSupported
-}
-
-func (c *meekConn) enqueueWrite(b []byte) (ok bool) {
- defer func() { recover() }()
- c.workerWrChan <- b
- return true
-}
-
-func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
- var req *http.Request
- var resp *http.Response
-
- for retries := 0; retries < maxRetries; retries++ {
- url := *c.args.url
- host := url.Host
- if c.args.front != "" {
- url.Host = c.args.front
- }
- req, err = http.NewRequest("POST", url.String(), bytes.NewReader(sndBuf))
- if err != nil {
- return nil, err
- }
- if c.args.front != "" {
- req.Host = host
- }
- req.Header.Set("X-Session-Id", c.sessionID)
- req.Header.Set("User-Agent", "")
-
- resp, err = c.transport.RoundTrip(req)
- if err != nil {
- return nil, err
- }
- if resp.StatusCode != http.StatusOK {
- err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
- time.Sleep(retryDelay)
- } else {
- defer resp.Body.Close()
- recvBuf, err = ioutil.ReadAll(io.LimitReader(resp.Body, maxPayloadLength))
- return
- }
- }
- return
-}
-
-func (c *meekConn) ioWorker() {
- interval := initPollInterval
- var sndBuf, leftBuf []byte
-loop:
-
- for {
- sndBuf = nil
- select {
- case <-time.After(interval):
- // If the poll interval has elapsed, issue a request.
- case sndBuf = <-c.workerWrChan:
- // If there is data pending a send, issue a request.
- case _ = <-c.workerCloseChan:
- break loop
- }
-
- // Combine short writes as long as data is available to be
- // sent immediately and it will not put us over the max
- // payload limit. Any excess data is stored and dispatched
- // as the next request).
- sndBuf = append(leftBuf, sndBuf...)
- wrSz := len(sndBuf)
- for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
- b := <-c.workerWrChan
- sndBuf = append(sndBuf, b...)
- wrSz = len(sndBuf)
- }
- if wrSz > maxPayloadLength {
- wrSz = maxPayloadLength
- }
-
- // Issue a request.
- rdBuf, err := c.roundTrip(sndBuf[:wrSz])
- if err != nil {
- // Welp, something went horrifically wrong.
- break loop
- }
-
- // Stash the remaining payload if any.
- leftBuf = sndBuf[wrSz:] // Store the remaining data
- if len(leftBuf) == 0 {
- leftBuf = nil
- }
-
- // Determine the next poll interval.
- if len(rdBuf) > 0 {
- // Received data, enqueue the read.
- c.workerRdChan <- rdBuf
-
- // And poll immediately.
- interval = 0
- } else if wrSz > 0 {
- // Sent data, poll immediately.
- interval = 0
- } else if interval == 0 {
- // Neither sent nor received data, initialize the delay.
- interval = initPollInterval
- } else {
- // Apply a multiplicative backoff.
- interval = time.Duration(float64(interval) * pollIntervalMultiplier)
- if interval > maxPollInterval {
- interval = maxPollInterval
- }
- }
-
- runtime.Gosched()
- }
-
- // Unblock callers waiting in Read() for data that will never arrive,
- // and callers waiting in Write() for data that will never get sent.
- close(c.workerRdChan)
- close(c.workerWrChan)
-
- // In case the close was done on an error condition, update the state
- // variable so that further calls to Write() will fail.
- c.Lock()
- defer c.Unlock()
- c.workerRunning = false
-}
-
-func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) (net.Conn, error) {
- id, err := newSessionID()
- if err != nil {
- return nil, err
- }
-
- tr := &http.Transport{Dial: dialFn}
- conn := &meekConn{
- args: ca,
- sessionID: id,
- transport: tr,
- workerRunning: true,
- workerWrChan: make(chan []byte, maxChanBacklog),
- workerRdChan: make(chan []byte, maxChanBacklog),
- workerCloseChan: make(chan bool),
- }
-
- // Start the I/O worker.
- go conn.ioWorker()
-
- return conn, nil
-}
-
-func newSessionID() (string, error) {
- var b [64]byte
- if _, err := rand.Read(b[:]); err != nil {
- return "", err
- }
- h := sha256.Sum256(b[:])
- return hex.EncodeToString(h[:16]), nil
-}
-
-var _ net.Conn = (*meekConn)(nil)
-var _ net.Addr = (*meekClientArgs)(nil)