summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrandon Wiley <brandon@blanu.net>2016-03-31 12:46:08 -0500
committerBrandon Wiley <brandon@blanu.net>2016-03-31 12:46:08 -0500
commitd3632a66253910b3dd605c42585136af7a1abf07 (patch)
treefa990ede1b43a693a28146e65f64b3f7d2cd6929
parent0326ad13feb27480d634cfcdf5ef87fe43ef6c9f (diff)
Split out proxy code into multiple different proxy packages
-rw-r--r--proxies/proxy_http/proxy_http.go158
-rw-r--r--proxies/proxy_socks4/proxy_socks4.go164
-rw-r--r--proxies/proxy_socks5/proxy_socks5.go303
-rw-r--r--proxies/proxy_transparent/proxy_transparent.go359
-rw-r--r--proxies/proxy_transparent_udp/proxy_transparent_udp.go343
5 files changed, 1327 insertions, 0 deletions
diff --git a/proxies/proxy_http/proxy_http.go b/proxies/proxy_http/proxy_http.go
new file mode 100644
index 0000000..cbca64a
--- /dev/null
+++ b/proxies/proxy_http/proxy_http.go
@@ -0,0 +1,158 @@
+/*
+ * Copyright (c) 2014, Yawning Angel <yawning at torproject dot org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package proxy_http
+
+import (
+ "bufio"
+ "fmt"
+ "net"
+ "net/http"
+ "net/http/httputil"
+ "net/url"
+ "time"
+
+ "golang.org/x/net/proxy"
+)
+
+// httpProxy is a HTTP connect proxy.
+type httpProxy struct {
+ hostPort string
+ haveAuth bool
+ username string
+ password string
+ forward proxy.Dialer
+}
+
+func newHTTP(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
+ s := new(httpProxy)
+ s.hostPort = uri.Host
+ s.forward = forward
+ if uri.User != nil {
+ s.haveAuth = true
+ s.username = uri.User.Username()
+ s.password, _ = uri.User.Password()
+ }
+
+ return s, nil
+}
+
+func (s *httpProxy) Dial(network, addr string) (net.Conn, error) {
+ // Dial and create the http client connection.
+ c, err := s.forward.Dial("tcp", s.hostPort)
+ if err != nil {
+ return nil, err
+ }
+ conn := new(httpConn)
+ conn.httpConn = httputil.NewClientConn(c, nil)
+ conn.remoteAddr, err = net.ResolveTCPAddr(network, addr)
+ if err != nil {
+ conn.httpConn.Close()
+ return nil, err
+ }
+
+ // HACK HACK HACK HACK. http.ReadRequest also does this.
+ reqURL, err := url.Parse("http://" + addr)
+ if err != nil {
+ conn.httpConn.Close()
+ return nil, err
+ }
+ reqURL.Scheme = ""
+
+ req, err := http.NewRequest("CONNECT", reqURL.String(), nil)
+ if err != nil {
+ conn.httpConn.Close()
+ return nil, err
+ }
+ req.Close = false
+ if s.haveAuth {
+ req.SetBasicAuth(s.username, s.password)
+ }
+ req.Header.Set("User-Agent", "")
+
+ resp, err := conn.httpConn.Do(req)
+ if err != nil && err != httputil.ErrPersistEOF {
+ conn.httpConn.Close()
+ return nil, err
+ }
+ if resp.StatusCode != 200 {
+ conn.httpConn.Close()
+ return nil, fmt.Errorf("proxy error: %s", resp.Status)
+ }
+
+ conn.hijackedConn, conn.staleReader = conn.httpConn.Hijack()
+ return conn, nil
+}
+
+type httpConn struct {
+ remoteAddr *net.TCPAddr
+ httpConn *httputil.ClientConn
+ hijackedConn net.Conn
+ staleReader *bufio.Reader
+}
+
+func (c *httpConn) Read(b []byte) (int, error) {
+ if c.staleReader != nil {
+ if c.staleReader.Buffered() > 0 {
+ return c.staleReader.Read(b)
+ }
+ c.staleReader = nil
+ }
+ return c.hijackedConn.Read(b)
+}
+
+func (c *httpConn) Write(b []byte) (int, error) {
+ return c.hijackedConn.Write(b)
+}
+
+func (c *httpConn) Close() error {
+ return c.hijackedConn.Close()
+}
+
+func (c *httpConn) LocalAddr() net.Addr {
+ return c.hijackedConn.LocalAddr()
+}
+
+func (c *httpConn) RemoteAddr() net.Addr {
+ return c.remoteAddr
+}
+
+func (c *httpConn) SetDeadline(t time.Time) error {
+ return c.hijackedConn.SetDeadline(t)
+}
+
+func (c *httpConn) SetReadDeadline(t time.Time) error {
+ return c.hijackedConn.SetReadDeadline(t)
+}
+
+func (c *httpConn) SetWriteDeadline(t time.Time) error {
+ return c.hijackedConn.SetWriteDeadline(t)
+}
+
+func init() {
+ proxy.RegisterDialerType("http", newHTTP)
+}
diff --git a/proxies/proxy_socks4/proxy_socks4.go b/proxies/proxy_socks4/proxy_socks4.go
new file mode 100644
index 0000000..4e67f46
--- /dev/null
+++ b/proxies/proxy_socks4/proxy_socks4.go
@@ -0,0 +1,164 @@
+/*
+ * Copyright (c) 2014, Yawning Angel <yawning at torproject dot org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * This is inspired by go.net/proxy/socks5.go:
+ *
+ * Copyright 2011 The Go Authors. All rights reserved.
+ * Use of this source code is governed by a BSD-style
+ * license that can be found in the LICENSE file.
+ */
+
+package proxy_socks4
+
+import (
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "net/url"
+ "strconv"
+
+ "golang.org/x/net/proxy"
+)
+
+// socks4Proxy is a SOCKS4 proxy.
+type socks4Proxy struct {
+ hostPort string
+ username string
+ forward proxy.Dialer
+}
+
+const (
+ socks4Version = 0x04
+ socks4CommandConnect = 0x01
+ socks4Null = 0x00
+ socks4ReplyVersion = 0x00
+
+ socks4Granted = 0x5a
+ socks4Rejected = 0x5b
+ socks4RejectedIdentdFailed = 0x5c
+ socks4RejectedIdentdMismatch = 0x5d
+)
+
+func newSOCKS4(uri *url.URL, forward proxy.Dialer) (proxy.Dialer, error) {
+ s := new(socks4Proxy)
+ s.hostPort = uri.Host
+ s.forward = forward
+ if uri.User != nil {
+ s.username = uri.User.Username()
+ }
+ return s, nil
+}
+
+func (s *socks4Proxy) Dial(network, addr string) (net.Conn, error) {
+ if network != "tcp" && network != "tcp4" {
+ return nil, errors.New("invalid network type")
+ }
+
+ // Deal with the destination address/string.
+ ipStr, portStr, err := net.SplitHostPort(addr)
+ if err != nil {
+ return nil, err
+ }
+ ip := net.ParseIP(ipStr)
+ if ip == nil {
+ return nil, errors.New("failed to parse destination IP")
+ }
+ ip4 := ip.To4()
+ if ip4 == nil {
+ return nil, errors.New("destination address is not IPv4")
+ }
+ port, err := strconv.ParseUint(portStr, 10, 16)
+ if err != nil {
+ return nil, err
+ }
+
+ // Connect to the proxy.
+ c, err := s.forward.Dial("tcp", s.hostPort)
+ if err != nil {
+ return nil, err
+ }
+
+ // Make/write the request:
+ // +----+----+----+----+----+----+----+----+----+----+....+----+
+ // | VN | CD | DSTPORT | DSTIP | USERID |NULL|
+ // +----+----+----+----+----+----+----+----+----+----+....+----+
+
+ req := make([]byte, 0, 9+len(s.username))
+ req = append(req, socks4Version)
+ req = append(req, socks4CommandConnect)
+ req = append(req, byte(port>>8), byte(port))
+ req = append(req, ip4...)
+ if s.username != "" {
+ req = append(req, s.username...)
+ }
+ req = append(req, socks4Null)
+ _, err = c.Write(req)
+ if err != nil {
+ c.Close()
+ return nil, err
+ }
+
+ // Read the response:
+ // +----+----+----+----+----+----+----+----+
+ // | VN | CD | DSTPORT | DSTIP |
+ // +----+----+----+----+----+----+----+----+
+
+ var resp [8]byte
+ _, err = io.ReadFull(c, resp[:])
+ if err != nil {
+ c.Close()
+ return nil, err
+ }
+ if resp[0] != socks4ReplyVersion {
+ c.Close()
+ return nil, errors.New("proxy returned invalid SOCKS4 version")
+ }
+ if resp[1] != socks4Granted {
+ c.Close()
+ return nil, fmt.Errorf("proxy error: %s", socks4ErrorToString(resp[1]))
+ }
+
+ return c, nil
+}
+
+func socks4ErrorToString(code byte) string {
+ switch code {
+ case socks4Rejected:
+ return "request rejected or failed"
+ case socks4RejectedIdentdFailed:
+ return "request rejected becasue SOCKS server cannot connect to identd on the client"
+ case socks4RejectedIdentdMismatch:
+ return "request rejected because the client program and identd report different user-ids"
+ default:
+ return fmt.Sprintf("unknown failure code %x", code)
+ }
+}
+
+func init() {
+ // Despite the scheme name, this really is SOCKS4.
+ proxy.RegisterDialerType("socks4a", newSOCKS4)
+}
diff --git a/proxies/proxy_socks5/proxy_socks5.go b/proxies/proxy_socks5/proxy_socks5.go
new file mode 100644
index 0000000..8037d4d
--- /dev/null
+++ b/proxies/proxy_socks5/proxy_socks5.go
@@ -0,0 +1,303 @@
+/*
+ * Copyright (c) 2014-2015, Yawning Angel <yawning at torproject dot org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// Go language Tor Pluggable Transport suite. Works only as a managed
+// client/server.
+package proxy_socks5
+
+import (
+ "io"
+ "fmt"
+ golog "log"
+ "net"
+ "net/url"
+ "sync"
+
+ "golang.org/x/net/proxy"
+
+ "git.torproject.org/pluggable-transports/goptlib.git"
+ "github.com/OperatorFoundation/obfs4/common/log"
+ "github.com/OperatorFoundation/obfs4/common/socks5"
+ "github.com/OperatorFoundation/obfs4/common/termmon"
+ "github.com/OperatorFoundation/obfs4/common/pt_extras"
+ "github.com/OperatorFoundation/obfs4/transports"
+ "github.com/OperatorFoundation/obfs4/transports/base"
+)
+
+const (
+ obfs4proxyVersion = "0.0.7-dev"
+ obfs4proxyLogFile = "obfs4proxy.log"
+ socksAddr = "127.0.0.1:0"
+)
+
+var stateDir string
+
+func ClientSetup(termMon *termmon.TermMonitor) (launched bool, listeners []net.Listener) {
+ ptClientInfo, err := pt.ClientSetup(transports.Transports())
+ if err != nil {
+ golog.Fatal(err)
+ }
+
+ ptClientProxy, err := pt_extras.PtGetProxy()
+ fmt.Println("ptclientproxy", ptClientProxy)
+ if err != nil {
+ golog.Fatal(err)
+ } else if ptClientProxy != nil {
+ pt_extras.PtProxyDone()
+ }
+
+ // Launch each of the client listeners.
+ for _, name := range ptClientInfo.MethodNames {
+ t := transports.Get(name)
+ if t == nil {
+ pt.CmethodError(name, "no such transport is supported")
+ continue
+ }
+
+ f, err := t.ClientFactory(stateDir)
+ if err != nil {
+ pt.CmethodError(name, "failed to get ClientFactory")
+ continue
+ }
+
+ ln, err := net.Listen("tcp", socksAddr)
+ if err != nil {
+ pt.CmethodError(name, err.Error())
+ continue
+ }
+
+ go clientAcceptLoop(termMon, f, ln, ptClientProxy)
+ pt.Cmethod(name, socks5.Version(), ln.Addr())
+
+ log.Infof("%s - registered listener: %s", name, ln.Addr())
+
+ listeners = append(listeners, ln)
+ launched = true
+ }
+ pt.CmethodsDone()
+
+ return
+}
+
+func clientAcceptLoop(termMon *termmon.TermMonitor, f base.ClientFactory, ln net.Listener, proxyURI *url.URL) error {
+ defer ln.Close()
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ if e, ok := err.(net.Error); ok && !e.Temporary() {
+ return err
+ }
+ continue
+ }
+ go clientHandler(termMon, f, conn, proxyURI)
+ }
+}
+
+func clientHandler(termMon *termmon.TermMonitor, f base.ClientFactory, conn net.Conn, proxyURI *url.URL) {
+ defer conn.Close()
+ termMon.OnHandlerStart()
+ defer termMon.OnHandlerFinish()
+
+ name := f.Transport().Name()
+
+ // Read the client's SOCKS handshake.
+ socksReq, err := socks5.Handshake(conn)
+ if err != nil {
+ log.Errorf("%s - client failed socks handshake: %s", name, err)
+ return
+ }
+ addrStr := log.ElideAddr(socksReq.Target)
+
+ // Deal with arguments.
+ args, err := f.ParseArgs(&socksReq.Args)
+ if err != nil {
+ log.Errorf("%s(%s) - invalid arguments: %s", name, addrStr, err)
+ socksReq.Reply(socks5.ReplyGeneralFailure)
+ return
+ }
+
+ // Obtain the proxy dialer if any, and create the outgoing TCP connection.
+ dialFn := proxy.Direct.Dial
+ if proxyURI != nil {
+ dialer, err := proxy.FromURL(proxyURI, proxy.Direct)
+ if err != nil {
+ // This should basically never happen, since config protocol
+ // verifies this.
+ log.Errorf("%s(%s) - failed to obtain proxy dialer: %s", name, addrStr, log.ElideError(err))
+ socksReq.Reply(socks5.ReplyGeneralFailure)
+ return
+ }
+ dialFn = dialer.Dial
+ }
+
+ fmt.Println("Got dialer", dialFn, proxyURI, proxy.Direct)
+
+ remote, err := f.Dial("tcp", socksReq.Target, dialFn, args)
+ if err != nil {
+ log.Errorf("%s(%s) - outgoing connection failed: %s", name, addrStr, log.ElideError(err))
+ socksReq.Reply(socks5.ErrorToReplyCode(err))
+ return
+ }
+ defer remote.Close()
+ err = socksReq.Reply(socks5.ReplySucceeded)
+ if err != nil {
+ log.Errorf("%s(%s) - SOCKS reply failed: %s", name, addrStr, log.ElideError(err))
+ return
+ }
+
+ if err = copyLoop(conn, remote); err != nil {
+ log.Warnf("%s(%s) - closed connection: %s", name, addrStr, log.ElideError(err))
+ } else {
+ log.Infof("%s(%s) - closed connection", name, addrStr)
+ }
+
+ return
+}
+
+func ServerSetup(termMon *termmon.TermMonitor) (launched bool, listeners []net.Listener) {
+ ptServerInfo, err := pt.ServerSetup(transports.Transports())
+ if err != nil {
+ golog.Fatal(err)
+ }
+
+ for _, bindaddr := range ptServerInfo.Bindaddrs {
+ name := bindaddr.MethodName
+ t := transports.Get(name)
+ if t == nil {
+ pt.SmethodError(name, "no such transport is supported")
+ continue
+ }
+
+ f, err := t.ServerFactory(stateDir, &bindaddr.Options)
+ if err != nil {
+ pt.SmethodError(name, err.Error())
+ continue
+ }
+
+ ln, err := net.ListenTCP("tcp", bindaddr.Addr)
+ if err != nil {
+ pt.SmethodError(name, err.Error())
+ continue
+ }
+
+ go serverAcceptLoop(termMon, f, ln, &ptServerInfo)
+ if args := f.Args(); args != nil {
+ pt.SmethodArgs(name, ln.Addr(), *args)
+ } else {
+ pt.SmethodArgs(name, ln.Addr(), nil)
+ }
+
+ log.Infof("%s - registered listener: %s", name, log.ElideAddr(ln.Addr().String()))
+
+ listeners = append(listeners, ln)
+ launched = true
+ }
+ pt.SmethodsDone()
+
+ return
+}
+
+func serverAcceptLoop(termMon *termmon.TermMonitor, f base.ServerFactory, ln net.Listener, info *pt.ServerInfo) error {
+ defer ln.Close()
+ for {
+ conn, err := ln.Accept()
+ if err != nil {
+ if e, ok := err.(net.Error); ok && !e.Temporary() {
+ return err
+ }
+ continue
+ }
+ go serverHandler(termMon, f, conn, info)
+ }
+}
+
+func serverHandler(termMon *termmon.TermMonitor, f base.ServerFactory, conn net.Conn, info *pt.ServerInfo) {
+ defer conn.Close()
+ termMon.OnHandlerStart()
+ defer termMon.OnHandlerFinish()
+
+ name := f.Transport().Name()
+ addrStr := log.ElideAddr(conn.RemoteAddr().String())
+ log.Infof("%s(%s) - new connection", name, addrStr)
+
+ // Instantiate the server transport method and handshake.
+ remote, err := f.WrapConn(conn)
+ if err != nil {
+ log.Warnf("%s(%s) - handshake failed: %s", name, addrStr, log.ElideError(err))
+ return
+ }
+
+ // Connect to the orport.
+ orConn, err := pt.DialOr(info, conn.RemoteAddr().String(), name)
+ if err != nil {
+ log.Errorf("%s(%s) - failed to connect to ORPort: %s", name, addrStr, log.ElideError(err))
+ return
+ }
+ defer orConn.Close()
+
+ if err = copyLoop(orConn, remote); err != nil {
+ log.Warnf("%s(%s) - closed connection: %s", name, addrStr, log.ElideError(err))
+ } else {
+ log.Infof("%s(%s) - closed connection", name, addrStr)
+ }
+
+ return
+}
+
+func copyLoop(a net.Conn, b net.Conn) error {
+ // Note: b is always the pt connection. a is the SOCKS/ORPort connection.
+ errChan := make(chan error, 2)
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ go func() {
+ defer wg.Done()
+ defer b.Close()
+ defer a.Close()
+ _, err := io.Copy(b, a)
+ errChan <- err
+ }()
+ go func() {
+ defer wg.Done()
+ defer a.Close()
+ defer b.Close()
+ _, err := io.Copy(a, b)
+ errChan <- err
+ }()
+
+ // Wait for both upstream and downstream to close. Since one side
+ // terminating closes the other, the second error in the channel will be
+ // something like EINVAL (though io.Copy() will swallow EOF), so only the
+ // first error is returned.
+ wg.Wait()
+ if len(errChan) > 0 {
+ return <-errChan
+ }
+
+ return nil
+}
diff --git a/proxies/proxy_transparent/proxy_transparent.go b/proxies/proxy_transparent/proxy_transparent.go
new file mode 100644
index 0000000..4e9e22a
--- /dev/null
+++ b/proxies/proxy_transparent/proxy_transparent.go
@@ -0,0 +1,359 @@
+/*
+ * Copyright (c) 2014-2015, Yawning Angel <yawning at torproject dot org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// Go language Tor Pluggable Transport suite. Works only as a managed
+// client/server.
+package proxy_transparent
+
+import (
+ "io"
+ "fmt"
+ golog "log"
+ "net"
+ "net/url"
+ "sync"
+ "strings"
+ "strconv"
+
+ "golang.org/x/net/proxy"
+
+ "git.torproject.org/pluggable-transports/goptlib.git"
+ "github.com/OperatorFoundation/obfs4/common/log"
+ "github.com/OperatorFoundation/obfs4/common/termmon"
+ "github.com/OperatorFoundation/obfs4/transports"
+ "github.com/OperatorFoundation/obfs4/transports/base"
+)
+
+const (
+ obfs4proxyVersion = "0.0.7-dev"
+ obfs4proxyLogFile = "obfs4proxy.log"
+ socksAddr = "127.0.0.1:1234"
+)
+
+var stateDir string
+
+func ClientSetup(termMon *termmon.TermMonitor, target string) (launched bool, listeners []net.Listener) {
+ methodNames := [...]string{"obfs2"}
+ var ptClientProxy *url.URL = nil
+
+ // Launch each of the client listeners.
+ for _, name := range methodNames {
+ t := transports.Get(name)
+ if t == nil {
+ log.Errorf("no such transport is supported: %s", name)
+ continue
+ }
+
+ f, err := t.ClientFactory(stateDir)
+ if err != nil {
+ log.Errorf("failed to get ClientFactory: %s", name)
+ continue
+ }
+
+ fmt.Println("Listening ", socksAddr)
+ ln, err := net.Listen("tcp", socksAddr)
+ if err != nil {
+ log.Errorf("failed to listen %s %s", name, err.Error())
+ continue
+ }
+
+ go clientAcceptLoop(target, termMon, f, ln, ptClientProxy)
+
+ log.Infof("%s - registered listener: %s", name, ln.Addr())
+
+ listeners = append(listeners, ln)
+ launched = true
+ }
+
+ return
+}
+
+func clientAcceptLoop(target string, termMon *termmon.TermMonitor, f base.ClientFactory, ln net.Listener, proxyURI *url.URL) error {
+ defer ln.Close()
+ for {
+ conn, err := ln.Accept()
+ fmt.Println("Accepted")
+ if err != nil {
+ if e, ok := err.(net.Error); ok && !e.Temporary() {
+ return err
+ }
+ continue
+ }
+ go clientHandler(target, termMon, f, conn, proxyURI)
+ }
+}
+
+func clientHandler(target string, termMon *termmon.TermMonitor, f base.ClientFactory, conn net.Conn, proxyURI *url.URL) {
+ defer conn.Close()
+ termMon.OnHandlerStart()
+ defer termMon.OnHandlerFinish()
+
+ fmt.Println("handling...")
+
+ name := f.Transport().Name()
+
+ fmt.Println("Transport is", name)
+
+ // Deal with arguments.
+ args, err := f.ParseArgs(&pt.Args{})
+ if err != nil {
+ fmt.Println("Invalid arguments")
+ log.Errorf("%s(%s) - invalid arguments: %s", name, target, err)
+ return
+ }
+
+ fmt.Println("Making dialer...")
+
+ // Obtain the proxy dialer if any, and create the outgoing TCP connection.
+ dialFn := proxy.Direct.Dial
+ if proxyURI != nil {
+ dialer, err := proxy.FromURL(proxyURI, proxy.Direct)
+ if err != nil {
+ // This should basically never happen, since config protocol
+ // verifies this.
+ fmt.Println("failed to obtain dialer", proxyURI, proxy.Direct)
+ log.Errorf("%s(%s) - failed to obtain proxy dialer: %s", name, target, log.ElideError(err))
+ return
+ }
+ dialFn = dialer.Dial
+ }
+
+ fmt.Println("Dialing...")
+
+ remote, err := f.Dial("tcp", target, dialFn, args)
+ if err != nil {
+ fmt.Println("outgoing connection failed")
+ log.Errorf("%s(%s) - outgoing connection failed: %s", name, target, log.ElideError(err))
+ return
+ }
+ defer remote.Close()
+
+ fmt.Println("copying...")
+
+ if err = copyLoop(conn, remote); err != nil {
+ log.Warnf("%s(%s) - closed connection: %s", name, target, log.ElideError(err))
+ } else {
+ log.Infof("%s(%s) - closed connection", name, target)
+ }
+
+ fmt.Println("done")
+
+ return
+}
+
+func ServerSetup(termMon *termmon.TermMonitor, bindaddrString string) (launched bool, listeners []net.Listener) {
+ ptServerInfo, err := pt.ServerSetup(transports.Transports())
+ if err != nil {
+ golog.Fatal(err)
+ }
+
+ fmt.Println("ServerSetup")
+
+ bindaddrs, _ := getServerBindaddrs(bindaddrString)
+
+ for _, bindaddr := range bindaddrs {
+ name := bindaddr.MethodName
+ fmt.Println("bindaddr", bindaddr)
+ t := transports.Get(name)
+ if t == nil {
+ fmt.Println(name, "no such transport is supported")
+ continue
+ }
+
+ f, err := t.ServerFactory(stateDir, &bindaddr.Options)
+ if err != nil {
+ fmt.Println(name, err.Error())
+ continue
+ }
+
+ ln, err := net.ListenTCP("tcp", bindaddr.Addr)
+ if err != nil {
+ fmt.Println(name, err.Error())
+ continue
+ }
+
+ go serverAcceptLoop(termMon, f, ln, &ptServerInfo)
+
+ log.Infof("%s - registered listener: %s", name, log.ElideAddr(ln.Addr().String()))
+
+ listeners = append(listeners, ln)
+ launched = true
+ }
+
+ return
+}
+
+func getServerBindaddrs(serverBindaddr string) ([]pt.Bindaddr, error) {
+ var result []pt.Bindaddr;
+
+ for _, spec := range strings.Split(serverBindaddr, ",") {
+ var bindaddr pt.Bindaddr
+
+ parts := strings.SplitN(spec, "-", 2)
+ if len(parts) != 2 {
+ fmt.Println("TOR_PT_SERVER_BINDADDR: doesn't contain \"-\"", spec)
+ return nil, nil
+ }
+ bindaddr.MethodName = parts[0]
+ addr, err := resolveAddr(parts[1])
+ if err != nil {
+ fmt.Println("TOR_PT_SERVER_BINDADDR: ", spec, err.Error())
+ return nil, nil
+ }
+ bindaddr.Addr = addr
+// bindaddr.Options = optionsMap[bindaddr.MethodName]
+ result = append(result, bindaddr)
+ }
+
+ return result, nil
+}
+
+// Resolve an address string into a net.TCPAddr. We are a bit more strict than
+// net.ResolveTCPAddr; we don't allow an empty host or port, and the host part
+// must be a literal IP address.
+func resolveAddr(addrStr string) (*net.TCPAddr, error) {
+ ipStr, portStr, err := net.SplitHostPort(addrStr)
+ if err != nil {
+ // Before the fixing of bug #7011, tor doesn't put brackets around IPv6
+ // addresses. Split after the last colon, assuming it is a port
+ // separator, and try adding the brackets.
+ parts := strings.Split(addrStr, ":")
+ if len(parts) <= 2 {
+ return nil, err
+ }
+ addrStr := "[" + strings.Join(parts[:len(parts)-1], ":") + "]:" + parts[len(parts)-1]
+ ipStr, portStr, err = net.SplitHostPort(addrStr)
+ }
+ if err != nil {
+ return nil, err
+ }
+ if ipStr == "" {
+ return nil, net.InvalidAddrError(fmt.Sprintf("address string %q lacks a host part", addrStr))
+ }
+ if portStr == "" {
+ return nil, net.InvalidAddrError(fmt.Sprintf("address string %q lacks a port part", addrStr))
+ }
+ ip := net.ParseIP(ipStr)
+ if ip == nil {
+ return nil, net.InvalidAddrError(fmt.Sprintf("not an IP string: %q", ipStr))
+ }
+ port, err := parsePort(portStr)
+ if err != nil {
+ return nil, err
+ }
+ return &net.TCPAddr{IP: ip, Port: port}, nil
+}
+
+func parsePort(portStr string) (int, error) {
+ port, err := strconv.ParseUint(portStr, 10, 16)
+ return int(port), err
+}
+
+func serverAcceptLoop(termMon *termmon.TermMonitor, f base.ServerFactory, ln net.Listener, info *pt.ServerInfo) error {
+ defer ln.Close()
+ for {
+ conn, err := ln.Accept()
+ fmt.Println("accepted")
+ if err != nil {
+ if e, ok := err.(net.Error); ok && !e.Temporary() {
+ return err
+ }
+ continue
+ }
+ go serverHandler(termMon, f, conn, info)
+ }
+}
+
+func serverHandler(termMon *termmon.TermMonitor, f base.ServerFactory, conn net.Conn, info *pt.ServerInfo) {
+ defer conn.Close()
+ termMon.OnHandlerStart()
+ defer termMon.OnHandlerFinish()
+
+ name := f.Transport().Name()
+ addrStr := log.ElideAddr(conn.RemoteAddr().String())
+ fmt.Println("handling", name)
+ log.Infof("%s(%s) - new connection", name, addrStr)
+
+ // Instantiate the server transport method and handshake.
+ remote, err := f.WrapConn(conn)
+ if err != nil {
+ fmt.Println("handshake failed")
+ log.Warnf("%s(%s) - handshake failed: %s", name, addrStr, log.ElideError(err))
+ return
+ }
+
+ // Connect to the orport.
+ orConn, err := pt.DialOr(info, conn.RemoteAddr().String(), name)
+ if err != nil {
+ fmt.Println("OR conn failed", info, conn.RemoteAddr(), name)
+ log.Errorf("%s(%s) - failed to connect to ORPort: %s", name, addrStr, log.ElideError(err))
+ return
+ }
+ defer orConn.Close()
+
+ if err = copyLoop(orConn, remote); err != nil {
+ log.Warnf("%s(%s) - closed connection: %s", name, addrStr, log.ElideError(err))
+ } else {
+ log.Infof("%s(%s) - closed connection", name, addrStr)
+ }
+
+ return
+}
+
+func copyLoop(a net.Conn, b net.Conn) error {
+ // Note: b is always the pt connection. a is the SOCKS/ORPort connection.
+ errChan := make(chan error, 2)
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ go func() {
+ defer wg.Done()
+ defer b.Close()
+ defer a.Close()
+ _, err := io.Copy(b, a)
+ errChan <- err
+ }()
+ go func() {
+ defer wg.Done()
+ defer a.Close()
+ defer b.Close()
+ _, err := io.Copy(a, b)
+ errChan <- err
+ }()
+
+ // Wait for both upstream and downstream to close. Since one side
+ // terminating closes the other, the second error in the channel will be
+ // something like EINVAL (though io.Copy() will swallow EOF), so only the
+ // first error is returned.
+ wg.Wait()
+ if len(errChan) > 0 {
+ return <-errChan
+ }
+
+ return nil
+}
diff --git a/proxies/proxy_transparent_udp/proxy_transparent_udp.go b/proxies/proxy_transparent_udp/proxy_transparent_udp.go
new file mode 100644
index 0000000..14f132f
--- /dev/null
+++ b/proxies/proxy_transparent_udp/proxy_transparent_udp.go
@@ -0,0 +1,343 @@
+/*
+ * Copyright (c) 2014-2015, Yawning Angel <yawning at torproject dot org>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+// Go language Tor Pluggable Transport suite. Works only as a managed
+// client/server.
+package proxy_transparent_udp
+
+import (
+ "io"
+ "fmt"
+ golog "log"
+ "net"
+ "net/url"
+ "sync"
+ "strings"
+ "strconv"
+
+ "golang.org/x/net/proxy"
+
+ "git.torproject.org/pluggable-transports/goptlib.git"
+ "github.com/OperatorFoundation/obfs4/common/log"
+ "github.com/OperatorFoundation/obfs4/common/termmon"
+ "github.com/OperatorFoundation/obfs4/transports"
+ "github.com/OperatorFoundation/obfs4/transports/base"
+)
+
+const (
+ obfs4proxyVersion = "0.0.7-dev"
+ obfs4proxyLogFile = "obfs4proxy.log"
+ socksAddr = "127.0.0.1:1234"
+)
+
+var stateDir string
+
+func ClientSetup(termMon *termmon.TermMonitor, target string) bool {
+ methodNames := [...]string{"obfs2"}
+ var ptClientProxy *url.URL = nil
+
+ // Launch each of the client listeners.
+ for _, name := range methodNames {
+ t := transports.Get(name)
+ if t == nil {
+ log.Errorf("no such transport is supported: %s", name)
+ continue
+ }
+
+ f, err := t.ClientFactory(stateDir)
+ if err != nil {
+ log.Errorf("failed to get ClientFactory: %s", name)
+ continue
+ }
+
+ udpAddr, err := net.ResolveUDPAddr("udp", socksAddr)
+ if err != nil {
+ fmt.Println("Error resolving address", socksAddr)
+ }
+
+ fmt.Println("Listening ", socksAddr)
+ ln, err := net.ListenUDP("udp", udpAddr)
+ if err != nil {
+ log.Errorf("failed to listen %s %s", name, err.Error())
+ continue
+ }
+
+ go clientHandler(target, termMon, f, ln, ptClientProxy)
+
+ log.Infof("%s - registered listener: %s", name, ln)
+ }
+
+ return true
+}
+
+func clientHandler(target string, termMon *termmon.TermMonitor, f base.ClientFactory, conn net.Conn, proxyURI *url.URL) {
+ defer conn.Close()
+ termMon.OnHandlerStart()
+ defer termMon.OnHandlerFinish()
+
+ fmt.Println("handling...")
+
+ name := f.Transport().Name()
+
+ fmt.Println("Transport is", name)
+
+ // Deal with arguments.
+ args, err := f.ParseArgs(&pt.Args{})
+ if err != nil {
+ fmt.Println("Invalid arguments")
+ log.Errorf("%s(%s) - invalid arguments: %s", name, target, err)
+ return
+ }
+
+ fmt.Println("Making dialer...")
+
+ // Obtain the proxy dialer if any, and create the outgoing TCP connection.
+ dialFn := proxy.Direct.Dial
+ if proxyURI != nil {
+ dialer, err := proxy.FromURL(proxyURI, proxy.Direct)
+ if err != nil {
+ // This should basically never happen, since config protocol
+ // verifies this.
+ fmt.Println("failed to obtain dialer", proxyURI, proxy.Direct)
+ log.Errorf("%s(%s) - failed to obtain proxy dialer: %s", name, target, log.ElideError(err))
+ return
+ }
+ dialFn = dialer.Dial
+ }
+
+ fmt.Println("Dialing...")
+
+ remote, err := f.Dial("tcp", target, dialFn, args)
+ if err != nil {
+ fmt.Println("outgoing connection failed")
+ log.Errorf("%s(%s) - outgoing connection failed: %s", name, target, log.ElideError(err))
+ return
+ }
+ defer remote.Close()
+
+ fmt.Println("copying...")
+
+ if err = copyLoopUDP(conn, remote); err != nil {
+ log.Warnf("%s(%s) - closed connection: %s", name, target, log.ElideError(err))
+ } else {
+ log.Infof("%s(%s) - closed connection", name, target)
+ }
+
+ fmt.Println("done")
+
+ return
+}
+
+func ServerSetup(termMon *termmon.TermMonitor, bindaddrString string) bool {
+ ptServerInfo, err := pt.ServerSetup(transports.Transports())
+ if err != nil {
+ golog.Fatal(err)
+ }
+
+ fmt.Println("ServerSetup")
+
+ bindaddrs, _ := getServerBindaddrs(bindaddrString)
+
+ for _, bindaddr := range bindaddrs {
+ name := bindaddr.MethodName
+ fmt.Println("bindaddr", bindaddr)
+ t := transports.Get(name)
+ if t == nil {
+ fmt.Println(name, "no such transport is supported")
+ continue
+ }
+
+ f, err := t.ServerFactory(stateDir, &bindaddr.Options)
+ if err != nil {
+ fmt.Println(name, err.Error())
+ continue
+ }
+
+ ln, err := net.ListenTCP("tcp", bindaddr.Addr)
+ if err != nil {
+ fmt.Println(name, err.Error())
+ continue
+ }
+
+ go serverAcceptLoop(termMon, f, ln, &ptServerInfo)
+
+ log.Infof("%s - registered listener: %s", name, log.ElideAddr(ln.Addr().String()))
+ }
+
+ return true
+}
+
+func getServerBindaddrs(serverBindaddr string) ([]pt.Bindaddr, error) {
+ var result []pt.Bindaddr;
+
+ for _, spec := range strings.Split(serverBindaddr, ",") {
+ var bindaddr pt.Bindaddr
+
+ parts := strings.SplitN(spec, "-", 2)
+ if len(parts) != 2 {
+ fmt.Println("TOR_PT_SERVER_BINDADDR: doesn't contain \"-\"", spec)
+ return nil, nil
+ }
+ bindaddr.MethodName = parts[0]
+ addr, err := resolveAddr(parts[1])
+ if err != nil {
+ fmt.Println("TOR_PT_SERVER_BINDADDR: ", spec, err.Error())
+ return nil, nil
+ }
+ bindaddr.Addr = addr
+// bindaddr.Options = optionsMap[bindaddr.MethodName]
+ result = append(result, bindaddr)
+ }
+
+ return result, nil
+}
+
+// Resolve an address string into a net.TCPAddr. We are a bit more strict than
+// net.ResolveTCPAddr; we don't allow an empty host or port, and the host part
+// must be a literal IP address.
+func resolveAddr(addrStr string) (*net.TCPAddr, error) {
+ ipStr, portStr, err := net.SplitHostPort(addrStr)
+ if err != nil {
+ // Before the fixing of bug #7011, tor doesn't put brackets around IPv6
+ // addresses. Split after the last colon, assuming it is a port
+ // separator, and try adding the brackets.
+ parts := strings.Split(addrStr, ":")
+ if len(parts) <= 2 {
+ return nil, err
+ }
+ addrStr := "[" + strings.Join(parts[:len(parts)-1], ":") + "]:" + parts[len(parts)-1]
+ ipStr, portStr, err = net.SplitHostPort(addrStr)
+ }
+ if err != nil {
+ return nil, err
+ }
+ if ipStr == "" {
+ return nil, net.InvalidAddrError(fmt.Sprintf("address string %q lacks a host part", addrStr))
+ }
+ if portStr == "" {
+ return nil, net.InvalidAddrError(fmt.Sprintf("address string %q lacks a port part", addrStr))
+ }
+ ip := net.ParseIP(ipStr)
+ if ip == nil {
+ return nil, net.InvalidAddrError(fmt.Sprintf("not an IP string: %q", ipStr))
+ }
+ port, err := parsePort(portStr)
+ if err != nil {
+ return nil, err
+ }
+ return &net.TCPAddr{IP: ip, Port: port}, nil
+}
+
+func parsePort(portStr string) (int, error) {
+ port, err := strconv.ParseUint(portStr, 10, 16)
+ return int(port), err
+}
+
+func serverAcceptLoop(termMon *termmon.TermMonitor, f base.ServerFactory, ln net.Listener, info *pt.ServerInfo) error {
+ defer ln.Close()
+ for {
+ conn, err := ln.Accept()
+ fmt.Println("accepted")
+ if err != nil {
+ if e, ok := err.(net.Error); ok && !e.Temporary() {
+ return err
+ }
+ continue
+ }
+ go serverHandler(termMon, f, conn, info)
+ }
+}
+
+func serverHandler(termMon *termmon.TermMonitor, f base.ServerFactory, conn net.Conn, info *pt.ServerInfo) {
+ defer conn.Close()
+ termMon.OnHandlerStart()
+ defer termMon.OnHandlerFinish()
+
+ name := f.Transport().Name()
+ addrStr := log.ElideAddr(conn.RemoteAddr().String())
+ fmt.Println("handling", name)
+ log.Infof("%s(%s) - new connection", name, addrStr)
+
+ // Instantiate the server transport method and handshake.
+ remote, err := f.WrapConn(conn)
+ if err != nil {
+ fmt.Println("handshake failed")
+ log.Warnf("%s(%s) - handshake failed: %s", name, addrStr, log.ElideError(err))
+ return
+ }
+
+ // Connect to the orport.
+ orConn, err := pt.DialOr(info, conn.RemoteAddr().String(), name)
+ if err != nil {
+ fmt.Println("OR conn failed", info, conn.RemoteAddr(), name)
+ log.Errorf("%s(%s) - failed to connect to ORPort: %s", name, addrStr, log.ElideError(err))
+ return
+ }
+ defer orConn.Close()
+
+ if err = copyLoopUDP(orConn, remote); err != nil {
+ log.Warnf("%s(%s) - closed connection: %s", name, addrStr, log.ElideError(err))
+ } else {
+ log.Infof("%s(%s) - closed connection", name, addrStr)
+ }
+
+ return
+}
+
+func copyLoopUDP(a net.Conn, b net.Conn) error {
+ // Note: b is always the pt connection. a is the UDP connection.
+ errChan := make(chan error, 2)
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+
+ go func() {
+ defer wg.Done()
+ defer b.Close()
+ defer a.Close()
+ _, err := io.Copy(b, a)
+ errChan <- err
+ }()
+ go func() {
+ defer wg.Done()
+ defer a.Close()
+ defer b.Close()
+ _, err := io.Copy(a, b)
+ errChan <- err
+ }()
+
+ // Wait for both upstream and downstream to close. Since one side
+ // terminating closes the other, the second error in the channel will be
+ // something like EINVAL (though io.Copy() will swallow EOF), so only the
+ // first error is returned.
+ wg.Wait()
+ if len(errChan) > 0 {
+ return <-errChan
+ }
+
+ return nil
+}