summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkali kaneko (leap communications) <kali@leap.se>2021-11-29 18:12:40 +0100
committerkali kaneko (leap communications) <kali@leap.se>2021-11-29 18:14:21 +0100
commitb25ec7c923924e53ddb65f9a34e9a669dcf066c7 (patch)
tree5ddabd18ad1cb880ee07ae8c950b99b99f150012
parentc7148d9559dab0e1cdbc6dd5306a3c852615560e (diff)
[feat] snowflake client support
-rw-r--r--branding/motd-cli/motd.json15
-rw-r--r--cmd/snowflake-client/.gitignore1
-rw-r--r--cmd/snowflake-client/main.go249
-rw-r--r--pkg/snowflake/bootstrap.go101
-rw-r--r--pkg/snowflake/lib/interfaces.go34
-rw-r--r--pkg/snowflake/lib/lib_test.go269
-rw-r--r--pkg/snowflake/lib/peers.go135
-rw-r--r--pkg/snowflake/lib/rendezvous.go184
-rw-r--r--pkg/snowflake/lib/snowflake.go175
-rw-r--r--pkg/snowflake/lib/turbotunnel.go68
-rw-r--r--pkg/snowflake/lib/util.go70
-rw-r--r--pkg/snowflake/lib/webrtc.go222
12 files changed, 1523 insertions, 0 deletions
diff --git a/branding/motd-cli/motd.json b/branding/motd-cli/motd.json
new file mode 100644
index 0000000..7b91098
--- /dev/null
+++ b/branding/motd-cli/motd.json
@@ -0,0 +1,15 @@
+{
+ "motd": [{
+ "begin": "01 Nov 21 00:00 -0700",
+ "end": "31 Jan 22 00:00 -0700",
+ "type": "daily",
+ "platform": "all",
+ "urgency": "normal",
+ "text": [
+ { "lang": "en",
+ "str": "%20___________%0A%3C%20RiseupVPN%20%3E%0A%20-----------%0A%20%20%20%20%20%20%20%20%5C%20%20%20%5E__%5E%0A%20%20%20%20%20%20%20%20%20%5C%20%20%28oo%29%5C_______%0A%20%20%20%20%20%20%20%20%20%20%20%20%28__%29%5C%20%20%20%20%20%20%20%29%5C%2F%5C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%7C%7C----w%20%7C%0A%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%20%7C%7C%20%20%20%20%20%7C%7C"},
+ { "lang": "es",
+ "str": "¡Gracias por usar RiseupVPN! Por favor reportanos <a href='https://0xacab.org/leap/bitmask-vpn'>cualquier bug o petición</a>."}
+ ]}
+ ]
+}
diff --git a/cmd/snowflake-client/.gitignore b/cmd/snowflake-client/.gitignore
new file mode 100644
index 0000000..8ec6558
--- /dev/null
+++ b/cmd/snowflake-client/.gitignore
@@ -0,0 +1 @@
+snowflake-client
diff --git a/cmd/snowflake-client/main.go b/cmd/snowflake-client/main.go
new file mode 100644
index 0000000..72fba3e
--- /dev/null
+++ b/cmd/snowflake-client/main.go
@@ -0,0 +1,249 @@
+// Client transport plugin for the Snowflake pluggable transport.
+package main
+
+import (
+ "flag"
+ "io"
+ "io/ioutil"
+ "log"
+ "math/rand"
+ "net"
+ "os"
+ "os/signal"
+ "path/filepath"
+ "strings"
+ "sync"
+ "syscall"
+ "time"
+
+ pt "git.torproject.org/pluggable-transports/goptlib.git"
+ //sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
+ sf "0xacab.org/leap/bitmask-vpn/pkg/snowflake/lib"
+ "git.torproject.org/pluggable-transports/snowflake.git/common/nat"
+ "git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
+ "github.com/pion/webrtc/v3"
+)
+
+const (
+ DefaultSnowflakeCapacity = 1
+)
+
+// Accept local SOCKS connections and pass them to the handler.
+func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struct{}, wg *sync.WaitGroup) {
+ defer ln.Close()
+ for {
+ conn, err := ln.AcceptSocks()
+ if err != nil {
+ if err, ok := err.(net.Error); ok && err.Temporary() {
+ continue
+ }
+ log.Printf("SOCKS accept error: %s", err)
+ break
+ }
+ log.Printf("SOCKS accepted: %v", conn.Req)
+ go func() {
+ wg.Add(1)
+ defer wg.Done()
+ defer conn.Close()
+
+ err := conn.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
+ if err != nil {
+ log.Printf("conn.Grant error: %s", err)
+ return
+ }
+
+ handler := make(chan struct{})
+ go func() {
+ err = sf.Handler(conn, tongue)
+ if err != nil {
+ log.Printf("handler error: %s", err)
+ }
+ close(handler)
+ return
+
+ }()
+ select {
+ case <-shutdown:
+ log.Println("Received shutdown signal")
+ case <-handler:
+ log.Println("Handler ended")
+ }
+ return
+ }()
+ }
+}
+
+// s is a comma-separated list of ICE server URLs.
+func parseIceServers(s string) []webrtc.ICEServer {
+ var servers []webrtc.ICEServer
+ s = strings.TrimSpace(s)
+ if len(s) == 0 {
+ return nil
+ }
+ urls := strings.Split(s, ",")
+ for _, url := range urls {
+ url = strings.TrimSpace(url)
+ servers = append(servers, webrtc.ICEServer{
+ URLs: []string{url},
+ })
+ }
+ return servers
+}
+
+func main() {
+ iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
+ brokerURL := flag.String("url", "", "URL of signaling broker")
+ frontDomain := flag.String("front", "", "front domain")
+ logFilename := flag.String("log", "", "name of log file")
+ logToStateDir := flag.Bool("log-to-state-dir", false, "resolve the log file relative to tor's pt state dir")
+ keepLocalAddresses := flag.Bool("keep-local-addresses", false, "keep local LAN address ICE candidates")
+ unsafeLogging := flag.Bool("unsafe-logging", false, "prevent logs from being scrubbed")
+ max := flag.Int("max", DefaultSnowflakeCapacity,
+ "capacity for number of multiplexed WebRTC peers")
+
+ // Deprecated
+ oldLogToStateDir := flag.Bool("logToStateDir", false, "use -log-to-state-dir instead")
+ oldKeepLocalAddresses := flag.Bool("keepLocalAddresses", false, "use -keep-local-addresses instead")
+
+ flag.Parse()
+
+ log.SetFlags(log.LstdFlags | log.LUTC)
+
+ // Don't write to stderr; versions of tor earlier than about 0.3.5.6 do
+ // not read from the pipe, and eventually we will deadlock because the
+ // buffer is full.
+ // https://bugs.torproject.org/26360
+ // https://bugs.torproject.org/25600#comment:14
+ var logOutput = ioutil.Discard
+ if *logFilename != "" {
+ if *logToStateDir || *oldLogToStateDir {
+ stateDir, err := pt.MakeStateDir()
+ if err != nil {
+ log.Fatal(err)
+ }
+ *logFilename = filepath.Join(stateDir, *logFilename)
+ }
+ logFile, err := os.OpenFile(*logFilename,
+ os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer logFile.Close()
+ logOutput = logFile
+ }
+ if *unsafeLogging {
+ log.SetOutput(logOutput)
+ } else {
+ // We want to send the log output through our scrubber first
+ log.SetOutput(&safelog.LogScrubber{Output: logOutput})
+ }
+
+ log.Println("\n\n\n --- Starting Snowflake Client ---")
+
+ iceServers := parseIceServers(*iceServersCommas)
+ // chooses a random subset of servers from inputs
+ rand.Seed(time.Now().UnixNano())
+ rand.Shuffle(len(iceServers), func(i, j int) {
+ iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
+ })
+ if len(iceServers) > 2 {
+ iceServers = iceServers[:(len(iceServers)+1)/2]
+ }
+ log.Printf("Using ICE servers:")
+ for _, server := range iceServers {
+ log.Printf("url: %v", strings.Join(server.URLs, " "))
+ }
+
+ // Use potentially domain-fronting broker to rendezvous.
+ broker, err := sf.NewBrokerChannel(
+ *brokerURL, *frontDomain, sf.CreateBrokerTransport(),
+ *keepLocalAddresses || *oldKeepLocalAddresses)
+ if err != nil {
+ log.Fatalf("parsing broker URL: %v", err)
+ }
+ go updateNATType(iceServers, broker)
+
+ // Create a new WebRTCDialer to use as the |Tongue| to catch snowflakes
+ dialer := sf.NewWebRTCDialer(broker, iceServers, *max)
+
+ // Begin goptlib client process.
+ ptInfo, err := pt.ClientSetup(nil)
+ if err != nil {
+ log.Fatal(err)
+ }
+ if ptInfo.ProxyURL != nil {
+ pt.ProxyError("proxy is not supported")
+ os.Exit(1)
+ }
+ listeners := make([]net.Listener, 0)
+ shutdown := make(chan struct{})
+ var wg sync.WaitGroup
+ for _, methodName := range ptInfo.MethodNames {
+ switch methodName {
+ case "snowflake":
+ // TODO: Be able to recover when SOCKS dies.
+ ln, err := pt.ListenSocks("tcp", "127.0.0.1:0")
+ if err != nil {
+ pt.CmethodError(methodName, err.Error())
+ break
+ }
+ log.Printf("Started SOCKS listener at %v.", ln.Addr())
+ go socksAcceptLoop(ln, dialer, shutdown, &wg)
+ pt.Cmethod(methodName, ln.Version(), ln.Addr())
+ listeners = append(listeners, ln)
+ default:
+ pt.CmethodError(methodName, "no such method")
+ }
+ }
+ pt.CmethodsDone()
+
+ sigChan := make(chan os.Signal, 1)
+ signal.Notify(sigChan, syscall.SIGTERM)
+
+ if os.Getenv("TOR_PT_EXIT_ON_STDIN_CLOSE") == "1" {
+ // This environment variable means we should treat EOF on stdin
+ // just like SIGTERM: https://bugs.torproject.org/15435.
+ go func() {
+ if _, err := io.Copy(ioutil.Discard, os.Stdin); err != nil {
+ log.Printf("calling io.Copy(ioutil.Discard, os.Stdin) returned error: %v", err)
+ }
+ log.Printf("synthesizing SIGTERM because of stdin close")
+ sigChan <- syscall.SIGTERM
+ }()
+ }
+
+ // Wait for a signal.
+ <-sigChan
+ log.Println("stopping snowflake")
+
+ // Signal received, shut down.
+ for _, ln := range listeners {
+ ln.Close()
+ }
+ close(shutdown)
+ wg.Wait()
+ log.Println("snowflake is done.")
+}
+
+// loop through all provided STUN servers until we exhaust the list or find
+// one that is compatable with RFC 5780
+func updateNATType(servers []webrtc.ICEServer, broker *sf.BrokerChannel) {
+
+ var restrictedNAT bool
+ var err error
+ for _, server := range servers {
+ addr := strings.TrimPrefix(server.URLs[0], "stun:")
+ restrictedNAT, err = nat.CheckIfRestrictedNAT(addr)
+ if err == nil {
+ if restrictedNAT {
+ broker.SetNATType(nat.NATRestricted)
+ } else {
+ broker.SetNATType(nat.NATUnrestricted)
+ }
+ break
+ }
+ }
+ if err != nil {
+ broker.SetNATType(nat.NATUnknown)
+ }
+}
diff --git a/pkg/snowflake/bootstrap.go b/pkg/snowflake/bootstrap.go
new file mode 100644
index 0000000..0f370fa
--- /dev/null
+++ b/pkg/snowflake/bootstrap.go
@@ -0,0 +1,101 @@
+package snowflake
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "os"
+ "time"
+
+ "0xacab.org/leap/bitmask-vpn/pkg/config"
+ "github.com/cretz/bine/tor"
+)
+
+const torrc = `UseBridges 1
+DataDirectory datadir
+
+ClientTransportPlugin snowflake exec /usr/local/bin/snowflake-client \
+-url https://snowflake-broker.torproject.net.global.prod.fastly.net/ -front cdn.sstatic.net \
+-ice stun:stun.voip.blackberry.com:3478,stun:stun.altar.com.pl:3478,stun:stun.antisip.com:3478,stun:stun.bluesip.net:3478,stun:stun.dus.net:3478,stun:stun.epygi.com:3478,stun:stun.sonetel.com:3478,stun:stun.sonetel.net:3478,stun:stun.stunprotocol.org:3478,stun:stun.uls.co.za:3478,stun:stun.voipgate.com:3478,stun:stun.voys.nl:3478 \
+-max 3
+
+Bridge snowflake 0.0.3.0:1`
+
+func writeTorrc() string {
+ f, err := ioutil.TempFile("", "torrc-snowflake-")
+ if err != nil {
+ log.Println(err)
+ }
+ f.Write([]byte(torrc))
+ return f.Name()
+}
+
+func BootstrapWithSnowflakeProxies() error {
+ rcfile := writeTorrc()
+ conf := &tor.StartConf{DebugWriter: os.Stdout, TorrcFile: rcfile}
+
+ fmt.Println("Starting Tor and fetching files to bootstrap VPN tunnel...")
+ fmt.Println("")
+
+ t, err := tor.Start(nil, conf)
+ if err != nil {
+ return err
+ }
+ defer t.Close()
+
+ // Wait at most 5 minutes
+ dialCtx, dialCancel := context.WithTimeout(context.Background(), time.Minute*10)
+ defer dialCancel()
+ dialer, err := t.Dialer(dialCtx, nil)
+ if err != nil {
+ return err
+ }
+
+ /*
+ regClient := &http.Client{
+ Transport: &http.Transport{
+ DialContext: dialer.DialContext,
+ },
+ Timeout: time.Minute * 5,
+ }
+ */
+ //fetchFile(regClient, "https://wtfismyip.com/json")
+
+ certs := x509.NewCertPool()
+ certs.AppendCertsFromPEM(config.CaCert)
+
+ apiClient := &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{
+ RootCAs: certs,
+ },
+ DialContext: dialer.DialContext,
+ },
+ Timeout: time.Minute * 5,
+ }
+
+ // XXX parametrize these urls
+ fetchFile(apiClient, "https://api.black.riseup.net/3/config/eip-service.json")
+ fetchFile(apiClient, "https://api.black.riseup.net/3/cert")
+
+ return nil
+}
+
+func fetchFile(client *http.Client, uri string) error {
+ resp, err := client.Get(uri)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ c, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ log.Println(err)
+ }
+ fmt.Println(string(c))
+ return nil
+}
diff --git a/pkg/snowflake/lib/interfaces.go b/pkg/snowflake/lib/interfaces.go
new file mode 100644
index 0000000..5378f4a
--- /dev/null
+++ b/pkg/snowflake/lib/interfaces.go
@@ -0,0 +1,34 @@
+package lib
+
+import (
+ "net"
+)
+
+// Interface for catching Snowflakes. (aka the remote dialer)
+type Tongue interface {
+ Catch() (*WebRTCPeer, error)
+
+ // Get the maximum number of snowflakes
+ GetMax() int
+}
+
+// Interface for collecting some number of Snowflakes, for passing along
+// ultimately to the SOCKS handler.
+type SnowflakeCollector interface {
+ // Add a Snowflake to the collection.
+ // Implementation should decide how to connect and maintain the webRTCConn.
+ Collect() (*WebRTCPeer, error)
+
+ // Remove and return the most available Snowflake from the collection.
+ Pop() *WebRTCPeer
+
+ // Signal when the collector has stopped collecting.
+ Melted() <-chan struct{}
+}
+
+// Interface to adapt to goptlib's SocksConn struct.
+type SocksConnector interface {
+ Grant(*net.TCPAddr) error
+ Reject() error
+ net.Conn
+}
diff --git a/pkg/snowflake/lib/lib_test.go b/pkg/snowflake/lib/lib_test.go
new file mode 100644
index 0000000..5537a52
--- /dev/null
+++ b/pkg/snowflake/lib/lib_test.go
@@ -0,0 +1,269 @@
+package lib
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "testing"
+
+ "git.torproject.org/pluggable-transports/snowflake.git/common/util"
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+type MockTransport struct {
+ statusOverride int
+ body []byte
+}
+
+// Just returns a response with fake SDP answer.
+func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
+ s := ioutil.NopCloser(bytes.NewReader(m.body))
+ r := &http.Response{
+ StatusCode: m.statusOverride,
+ Body: s,
+ }
+ return r, nil
+}
+
+type FakeDialer struct {
+ max int
+}
+
+func (w FakeDialer) Catch() (*WebRTCPeer, error) {
+ fmt.Println("Caught a dummy snowflake.")
+ return &WebRTCPeer{}, nil
+}
+
+func (w FakeDialer) GetMax() int {
+ return w.max
+}
+
+type FakeSocksConn struct {
+ net.Conn
+ rejected bool
+}
+
+func (f FakeSocksConn) Reject() error {
+ f.rejected = true
+ return nil
+}
+func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
+
+type FakePeers struct{ toRelease *WebRTCPeer }
+
+func (f FakePeers) Collect() (*WebRTCPeer, error) { return &WebRTCPeer{}, nil }
+func (f FakePeers) Pop() *WebRTCPeer { return nil }
+func (f FakePeers) Melted() <-chan struct{} { return nil }
+
+func TestSnowflakeClient(t *testing.T) {
+
+ Convey("Peers", t, func() {
+ Convey("Can construct", func() {
+ d := &FakeDialer{max: 1}
+ p, _ := NewPeers(d)
+ So(p.Tongue.GetMax(), ShouldEqual, 1)
+ So(p.snowflakeChan, ShouldNotBeNil)
+ So(cap(p.snowflakeChan), ShouldEqual, 1)
+ })
+
+ Convey("Collecting a Snowflake requires a Tongue.", func() {
+ p, err := NewPeers(nil)
+ So(err, ShouldNotBeNil)
+ // Set the dialer so that collection is possible.
+ d := &FakeDialer{max: 1}
+ p, err = NewPeers(d)
+ _, err = p.Collect()
+ So(err, ShouldBeNil)
+ So(p.Count(), ShouldEqual, 1)
+ // S
+ _, err = p.Collect()
+ })
+
+ Convey("Collection continues until capacity.", func() {
+ c := 5
+ p, _ := NewPeers(FakeDialer{max: c})
+ // Fill up to capacity.
+ for i := 0; i < c; i++ {
+ fmt.Println("Adding snowflake ", i)
+ _, err := p.Collect()
+ So(err, ShouldBeNil)
+ So(p.Count(), ShouldEqual, i+1)
+ }
+ // But adding another gives an error.
+ So(p.Count(), ShouldEqual, c)
+ _, err := p.Collect()
+ So(err, ShouldNotBeNil)
+ So(p.Count(), ShouldEqual, c)
+
+ // But popping and closing allows it to continue.
+ s := p.Pop()
+ s.Close()
+ So(s, ShouldNotBeNil)
+ So(p.Count(), ShouldEqual, c-1)
+
+ _, err = p.Collect()
+ So(err, ShouldBeNil)
+ So(p.Count(), ShouldEqual, c)
+ })
+
+ Convey("Count correctly purges peers marked for deletion.", func() {
+ p, _ := NewPeers(FakeDialer{max: 5})
+ p.Collect()
+ p.Collect()
+ p.Collect()
+ p.Collect()
+ So(p.Count(), ShouldEqual, 4)
+ s := p.Pop()
+ s.Close()
+ So(p.Count(), ShouldEqual, 3)
+ s = p.Pop()
+ s.Close()
+ So(p.Count(), ShouldEqual, 2)
+ })
+
+ Convey("End Closes all peers.", func() {
+ cnt := 5
+ p, _ := NewPeers(FakeDialer{max: cnt})
+ for i := 0; i < cnt; i++ {
+ p.activePeers.PushBack(&WebRTCPeer{})
+ }
+ So(p.Count(), ShouldEqual, cnt)
+ p.End()
+ <-p.Melted()
+ So(p.Count(), ShouldEqual, 0)
+ })
+
+ Convey("Pop skips over closed peers.", func() {
+ p, _ := NewPeers(FakeDialer{max: 4})
+ wc1, _ := p.Collect()
+ wc2, _ := p.Collect()
+ wc3, _ := p.Collect()
+ So(wc1, ShouldNotBeNil)
+ So(wc2, ShouldNotBeNil)
+ So(wc3, ShouldNotBeNil)
+ wc1.Close()
+ r := p.Pop()
+ So(p.Count(), ShouldEqual, 2)
+ So(r, ShouldEqual, wc2)
+ wc4, _ := p.Collect()
+ wc2.Close()
+ wc3.Close()
+ r = p.Pop()
+ So(r, ShouldEqual, wc4)
+ })
+
+ })
+
+ Convey("Snowflake", t, func() {
+
+ SkipConvey("Handler Grants correctly", func() {
+ socks := &FakeSocksConn{}
+ broker := &BrokerChannel{Host: "test"}
+ d := NewWebRTCDialer(broker, nil, 1)
+
+ So(socks.rejected, ShouldEqual, false)
+ Handler(socks, d)
+ So(socks.rejected, ShouldEqual, true)
+ })
+ })
+
+ Convey("Dialers", t, func() {
+ Convey("Can construct WebRTCDialer.", func() {
+ broker := &BrokerChannel{Host: "test"}
+ d := NewWebRTCDialer(broker, nil, 1)
+ So(d, ShouldNotBeNil)
+ So(d.BrokerChannel, ShouldNotBeNil)
+ So(d.BrokerChannel.Host, ShouldEqual, "test")
+ })
+ SkipConvey("WebRTCDialer can Catch a snowflake.", func() {
+ broker := &BrokerChannel{Host: "test"}
+ d := NewWebRTCDialer(broker, nil, 1)
+ conn, err := d.Catch()
+ So(conn, ShouldBeNil)
+ So(err, ShouldNotBeNil)
+ })
+ })
+
+ Convey("Rendezvous", t, func() {
+ transport := &MockTransport{
+ http.StatusOK,
+ []byte(`{"type":"answer","sdp":"fake"}`),
+ }
+ fakeOffer, err := util.DeserializeSessionDescription(`{"type":"offer","sdp":"test"}`)
+ if err != nil {
+ panic(err)
+ }
+
+ Convey("Construct BrokerChannel with no front domain", func() {
+ b, err := NewBrokerChannel("test.broker", "", transport, false)
+ So(b.url, ShouldNotBeNil)
+ So(err, ShouldBeNil)
+ So(b.url.Path, ShouldResemble, "test.broker")
+ So(b.transport, ShouldNotBeNil)
+ })
+
+ Convey("Construct BrokerChannel *with* front domain", func() {
+ b, err := NewBrokerChannel("test.broker", "front", transport, false)
+ So(b.url, ShouldNotBeNil)
+ So(err, ShouldBeNil)
+ So(b.url.Path, ShouldResemble, "test.broker")
+ So(b.url.Host, ShouldResemble, "front")
+ So(b.transport, ShouldNotBeNil)
+ })
+
+ Convey("BrokerChannel.Negotiate responds with answer", func() {
+ b, err := NewBrokerChannel("test.broker", "", transport, false)
+ So(err, ShouldBeNil)
+ answer, err := b.Negotiate(fakeOffer)
+ So(err, ShouldBeNil)
+ So(answer, ShouldNotBeNil)
+ So(answer.SDP, ShouldResemble, "fake")
+ })
+
+ Convey("BrokerChannel.Negotiate fails with 503", func() {
+ b, err := NewBrokerChannel("test.broker", "",
+ &MockTransport{http.StatusServiceUnavailable, []byte("\n")},
+ false)
+ So(err, ShouldBeNil)
+ answer, err := b.Negotiate(fakeOffer)
+ So(err, ShouldNotBeNil)
+ So(answer, ShouldBeNil)
+ So(err.Error(), ShouldResemble, BrokerError503)
+ })
+
+ Convey("BrokerChannel.Negotiate fails with 400", func() {
+ b, err := NewBrokerChannel("test.broker", "",
+ &MockTransport{http.StatusBadRequest, []byte("\n")},
+ false)
+ So(err, ShouldBeNil)
+ answer, err := b.Negotiate(fakeOffer)
+ So(err, ShouldNotBeNil)
+ So(answer, ShouldBeNil)
+ So(err.Error(), ShouldResemble, BrokerError400)
+ })
+
+ Convey("BrokerChannel.Negotiate fails with large read", func() {
+ b, err := NewBrokerChannel("test.broker", "",
+ &MockTransport{http.StatusOK, make([]byte, 100001, 100001)},
+ false)
+ So(err, ShouldBeNil)
+ answer, err := b.Negotiate(fakeOffer)
+ So(err, ShouldNotBeNil)
+ So(answer, ShouldBeNil)
+ So(err.Error(), ShouldResemble, "unexpected EOF")
+ })
+
+ Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
+ b, err := NewBrokerChannel("test.broker", "",
+ &MockTransport{123, []byte("")}, false)
+ So(err, ShouldBeNil)
+ answer, err := b.Negotiate(fakeOffer)
+ So(err, ShouldNotBeNil)
+ So(answer, ShouldBeNil)
+ So(err.Error(), ShouldResemble, BrokerErrorUnexpected)
+ })
+ })
+
+}
diff --git a/pkg/snowflake/lib/peers.go b/pkg/snowflake/lib/peers.go
new file mode 100644
index 0000000..d02eed3
--- /dev/null
+++ b/pkg/snowflake/lib/peers.go
@@ -0,0 +1,135 @@
+package lib
+
+import (
+ "container/list"
+ "errors"
+ "fmt"
+ "log"
+ "sync"
+)
+
+// Container which keeps track of multiple WebRTC remote peers.
+// Implements |SnowflakeCollector|.
+//
+// Maintaining a set of pre-connected Peers with fresh but inactive datachannels
+// allows allows rapid recovery when the current WebRTC Peer disconnects.
+//
+// Note: For now, only one remote can be active at any given moment.
+// This is a property of Tor circuits & its current multiplexing constraints,
+// but could be updated if that changes.
+// (Also, this constraint does not necessarily apply to the more generic PT
+// version of Snowflake)
+type Peers struct {
+ Tongue
+ BytesLogger BytesLogger
+
+ snowflakeChan chan *WebRTCPeer
+ activePeers *list.List
+
+ melt chan struct{}
+ melted bool
+
+ collection sync.WaitGroup
+}
+
+// Construct a fresh container of remote peers.
+func NewPeers(tongue Tongue) (*Peers, error) {
+ p := &Peers{}
+ // Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
+ if tongue == nil {
+ return nil, errors.New("missing Tongue to catch Snowflakes with")
+ }
+ p.snowflakeChan = make(chan *WebRTCPeer, tongue.GetMax())
+ p.activePeers = list.New()
+ p.melt = make(chan struct{})
+ p.Tongue = tongue
+ return p, nil
+}
+
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Collect() (*WebRTCPeer, error) {
+ // Engage the Snowflake Catching interface, which must be available.
+ p.collection.Add(1)
+ defer p.collection.Done()
+ if p.melted {
+ return nil, fmt.Errorf("Snowflakes have melted")
+ }
+ if nil == p.Tongue {
+ return nil, errors.New("missing Tongue to catch Snowflakes with")
+ }
+ cnt := p.Count()
+ capacity := p.Tongue.GetMax()
+ s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity)
+ if cnt >= capacity {
+ return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity)
+ }
+ log.Println("WebRTC: Collecting a new Snowflake.", s)
+ // BUG: some broker conflict here.
+ connection, err := p.Tongue.Catch()
+ if nil != err {
+ return nil, err
+ }
+ // Track new valid Snowflake in internal collection and pass along.
+ p.activePeers.PushBack(connection)
+ p.snowflakeChan <- connection
+ return connection, nil
+}
+
+// Pop blocks until an available, valid snowflake appears. Returns nil after End
+// has been called.
+func (p *Peers) Pop() *WebRTCPeer {
+ for {
+ snowflake, ok := <-p.snowflakeChan
+ if !ok {
+ return nil
+ }
+ if snowflake.closed {
+ continue
+ }
+ // Set to use the same rate-limited traffic logger to keep consistency.
+ snowflake.BytesLogger = p.BytesLogger
+ return snowflake
+ }
+}
+
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Melted() <-chan struct{} {
+ return p.melt
+}
+
+// Returns total available Snowflakes (including the active one)
+// The count only reduces when connections themselves close, rather than when
+// they are popped.
+func (p *Peers) Count() int {
+ p.purgeClosedPeers()
+ return p.activePeers.Len()
+}
+
+func (p *Peers) purgeClosedPeers() {
+ for e := p.activePeers.Front(); e != nil; {
+ next := e.Next()
+ conn := e.Value.(*WebRTCPeer)
+ // Purge those marked for deletion.
+ if conn.closed {
+ p.activePeers.Remove(e)
+ }
+ e = next
+ }
+}
+
+// Close all Peers contained here.
+func (p *Peers) End() {
+ close(p.melt)
+ p.melted = true
+ p.collection.Wait()
+ close(p.snowflakeChan)
+ cnt := p.Count()
+ for e := p.activePeers.Front(); e != nil; {
+ next := e.Next()
+ conn := e.Value.(*WebRTCPeer)
+ conn.Close()
+ p.activePeers.Remove(e)
+ e = next
+ }
+ log.Printf("WebRTC: melted all %d snowflakes.", cnt)
+}
diff --git a/pkg/snowflake/lib/rendezvous.go b/pkg/snowflake/lib/rendezvous.go
new file mode 100644
index 0000000..32da081
--- /dev/null
+++ b/pkg/snowflake/lib/rendezvous.go
@@ -0,0 +1,184 @@
+// WebRTC rendezvous requires the exchange of SessionDescriptions between
+// peers in order to establish a PeerConnection.
+//
+// This file contains the one method currently available to Snowflake:
+//
+// - Domain-fronted HTTP signaling. The Broker automatically exchange offers
+// and answers between this client and some remote WebRTC proxy.
+
+package lib
+
+import (
+ "bytes"
+ "errors"
+ "io"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/url"
+ "sync"
+ "time"
+
+ "git.torproject.org/pluggable-transports/snowflake.git/common/nat"
+ "git.torproject.org/pluggable-transports/snowflake.git/common/util"
+ "github.com/pion/webrtc/v3"
+)
+
+const (
+ BrokerError503 string = "No snowflake proxies currently available."
+ BrokerError400 string = "You sent an invalid offer in the request."
+ BrokerErrorUnexpected string = "Unexpected error, no answer."
+ readLimit = 100000 //Maximum number of bytes to be read from an HTTP response
+)
+
+// Signalling Channel to the Broker.
+type BrokerChannel struct {
+ // The Host header to put in the HTTP request (optional and may be
+ // different from the host name in URL).
+ Host string
+ url *url.URL
+ transport http.RoundTripper // Used to make all requests.
+ keepLocalAddresses bool
+ NATType string
+ lock sync.Mutex
+}
+
+// We make a copy of DefaultTransport because we want the default Dial
+// and TLSHandshakeTimeout settings. But we want to disable the default
+// ProxyFromEnvironment setting.
+func CreateBrokerTransport() http.RoundTripper {
+ transport := http.DefaultTransport.(*http.Transport)
+ transport.Proxy = nil
+ transport.ResponseHeaderTimeout = 15 * time.Second
+ return transport
+}
+
+// Construct a new BrokerChannel, where:
+// |broker| is the full URL of the facilitating program which assigns proxies
+// to clients, and |front| is the option fronting domain.
+func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool) (*BrokerChannel, error) {
+ targetURL, err := url.Parse(broker)
+ if err != nil {
+ return nil, err
+ }
+ log.Println("Rendezvous using Broker at:", broker)
+ bc := new(BrokerChannel)
+ bc.url = targetURL
+ if front != "" { // Optional front domain.
+ log.Println("Domain fronting using:", front)
+ bc.Host = bc.url.Host
+ bc.url.Host = front
+ }
+
+ bc.transport = transport
+ bc.keepLocalAddresses = keepLocalAddresses
+ bc.NATType = nat.NATUnknown
+ return bc, nil
+}
+
+func limitedRead(r io.Reader, limit int64) ([]byte, error) {
+ p, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: limit + 1})
+ if err != nil {
+ return p, err
+ } else if int64(len(p)) == limit+1 {
+ return p[0:limit], io.ErrUnexpectedEOF
+ }
+ return p, err
+}
+
+// Roundtrip HTTP POST using WebRTC SessionDescriptions.
+//
+// Send an SDP offer to the broker, which assigns a proxy and responds
+// with an SDP answer from a designated remote WebRTC peer.
+func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
+ *webrtc.SessionDescription, error) {
+ log.Println("Negotiating via BrokerChannel...\nTarget URL: ",
+ bc.Host, "\nFront URL: ", bc.url.Host)
+ // Ideally, we could specify an `RTCIceTransportPolicy` that would handle
+ // this for us. However, "public" was removed from the draft spec.
+ // See https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration#RTCIceTransportPolicy_enum
+ if !bc.keepLocalAddresses {
+ offer = &webrtc.SessionDescription{
+ Type: offer.Type,
+ SDP: util.StripLocalAddresses(offer.SDP),
+ }
+ }
+ offerSDP, err := util.SerializeSessionDescription(offer)
+ if err != nil {
+ return nil, err
+ }
+ data := bytes.NewReader([]byte(offerSDP))
+ // Suffix with broker's client registration handler.
+ clientURL := bc.url.ResolveReference(&url.URL{Path: "client"})
+ request, err := http.NewRequest("POST", clientURL.String(), data)
+ if nil != err {
+ return nil, err
+ }
+ if "" != bc.Host { // Set true host if necessary.
+ request.Host = bc.Host
+ }
+ // include NAT-TYPE
+ bc.lock.Lock()
+ request.Header.Set("Snowflake-NAT-TYPE", bc.NATType)
+ bc.lock.Unlock()
+ resp, err := bc.transport.RoundTrip(request)
+ if nil != err {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ log.Printf("BrokerChannel Response:\n%s\n\n", resp.Status)
+
+ switch resp.StatusCode {
+ case http.StatusOK:
+ body, err := limitedRead(resp.Body, readLimit)
+ if nil != err {
+ return nil, err
+ }
+ log.Printf("Received answer: %s", string(body))
+ return util.DeserializeSessionDescription(string(body))
+ case http.StatusServiceUnavailable:
+ return nil, errors.New(BrokerError503)
+ case http.StatusBadRequest:
+ return nil, errors.New(BrokerError400)
+ default:
+ return nil, errors.New(BrokerErrorUnexpected)
+ }
+}
+
+func (bc *BrokerChannel) SetNATType(NATType string) {
+ bc.lock.Lock()
+ bc.NATType = NATType
+ bc.lock.Unlock()
+ log.Printf("NAT Type: %s", NATType)
+}
+
+// Implements the |Tongue| interface to catch snowflakes, using BrokerChannel.
+type WebRTCDialer struct {
+ *BrokerChannel
+ webrtcConfig *webrtc.Configuration
+ max int
+}
+
+func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int) *WebRTCDialer {
+ config := webrtc.Configuration{
+ ICEServers: iceServers,
+ }
+
+ return &WebRTCDialer{
+ BrokerChannel: broker,
+ webrtcConfig: &config,
+ max: max,
+ }
+}
+
+// Initialize a WebRTC Connection by signaling through the broker.
+func (w WebRTCDialer) Catch() (*WebRTCPeer, error) {
+ // TODO: [#25591] Fetch ICE server information from Broker.
+ // TODO: [#25596] Consider TURN servers here too.
+ return NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel)
+}
+
+// Returns the maximum number of snowflakes to collect
+func (w WebRTCDialer) GetMax() int {
+ return w.max
+}
diff --git a/pkg/snowflake/lib/snowflake.go b/pkg/snowflake/lib/snowflake.go
new file mode 100644
index 0000000..2ed51a1
--- /dev/null
+++ b/pkg/snowflake/lib/snowflake.go
@@ -0,0 +1,175 @@
+package lib
+
+import (
+ "context"
+ "errors"
+ "io"
+ "log"
+ "net"
+ "time"
+
+ "git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
+ "github.com/xtaci/kcp-go/v5"
+ "github.com/xtaci/smux"
+)
+
+const (
+ ReconnectTimeout = 10 * time.Second
+ SnowflakeTimeout = 20 * time.Second
+ // How long to wait for the OnOpen callback on a DataChannel.
+ DataChannelTimeout = 10 * time.Second
+)
+
+type dummyAddr struct{}
+
+func (addr dummyAddr) Network() string { return "dummy" }
+func (addr dummyAddr) String() string { return "dummy" }
+
+// newSession returns a new smux.Session and the net.PacketConn it is running
+// over. The net.PacketConn successively connects through Snowflake proxies
+// pulled from snowflakes.
+func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
+ clientID := turbotunnel.NewClientID()
+
+ // We build a persistent KCP session on a sequence of ephemeral WebRTC
+ // connections. This dialContext tells RedialPacketConn how to get a new
+ // WebRTC connection when the previous one dies. Inside each WebRTC
+ // connection, we use EncapsulationPacketConn to encode packets into a
+ // stream.
+ dialContext := func(ctx context.Context) (net.PacketConn, error) {
+ log.Printf("redialing on same connection")
+ // Obtain an available WebRTC remote. May block.
+ conn := snowflakes.Pop()
+ if conn == nil {
+ return nil, errors.New("handler: Received invalid Snowflake")
+ }
+ log.Println("---- Handler: snowflake assigned ----")
+ // Send the magic Turbo Tunnel token.
+ _, err := conn.Write(turbotunnel.Token[:])
+ if err != nil {
+ return nil, err
+ }
+ // Send ClientID prefix.
+ _, err = conn.Write(clientID[:])
+ if err != nil {
+ return nil, err
+ }
+ return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
+ }
+ pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)
+
+ // conn is built on the underlying RedialPacketConn—when one WebRTC
+ // connection dies, another one will be found to take its place. The
+ // sequence of packets across multiple WebRTC connections drives the KCP
+ // engine.
+ conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
+ if err != nil {
+ pconn.Close()
+ return nil, nil, err
+ }
+ // Permit coalescing the payloads of consecutive sends.
+ conn.SetStreamMode(true)
+ // Set the maximum send and receive window sizes to a high number
+ // Removes KCP bottlenecks: https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40026
+ conn.SetWindowSize(65535, 65535)
+ // Disable the dynamic congestion window (limit only by the
+ // maximum of local and remote static windows).
+ conn.SetNoDelay(
+ 0, // default nodelay
+ 0, // default interval
+ 0, // default resend
+ 1, // nc=1 => congestion window off
+ )
+ // On the KCP connection we overlay an smux session and stream.
+ smuxConfig := smux.DefaultConfig()
+ smuxConfig.Version = 2
+ smuxConfig.KeepAliveTimeout = 10 * time.Minute
+ sess, err := smux.Client(conn, smuxConfig)
+ if err != nil {
+ conn.Close()
+ pconn.Close()
+ return nil, nil, err
+ }
+
+ return pconn, sess, err
+}
+
+// Given an accepted SOCKS connection, establish a WebRTC connection to the
+// remote peer and exchange traffic.
+func Handler(socks net.Conn, tongue Tongue) error {
+ // Prepare to collect remote WebRTC peers.
+ snowflakes, err := NewPeers(tongue)
+ if err != nil {
+ return err
+ }
+
+ // Use a real logger to periodically output how much traffic is happening.
+ snowflakes.BytesLogger = NewBytesSyncLogger()
+
+ log.Printf("---- Handler: begin collecting snowflakes ---")
+ go connectLoop(snowflakes)
+
+ // Create a new smux session
+ log.Printf("---- Handler: starting a new session ---")
+ pconn, sess, err := newSession(snowflakes)
+ if err != nil {
+ return err
+ }
+
+ // On the smux session we overlay a stream.
+ stream, err := sess.OpenStream()
+ if err != nil {
+ return err
+ }
+ defer stream.Close()
+
+ // Begin exchanging data.
+ log.Printf("---- Handler: begin stream %v ---", stream.ID())
+ copyLoop(socks, stream)
+ log.Printf("---- Handler: closed stream %v ---", stream.ID())
+ snowflakes.End()
+ log.Printf("---- Handler: end collecting snowflakes ---")
+ pconn.Close()
+ sess.Close()
+ log.Printf("---- Handler: discarding finished session ---")
+ return nil
+}
+
+// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
+// transfer to the Tor SOCKS handler when needed.
+func connectLoop(snowflakes SnowflakeCollector) {
+ for {
+ timer := time.After(ReconnectTimeout)
+ _, err := snowflakes.Collect()
+ if err != nil {
+ log.Printf("WebRTC: %v Retrying...", err)
+ }
+ select {
+ case <-timer:
+ continue
+ case <-snowflakes.Melted():
+ log.Println("ConnectLoop: stopped.")
+ return
+ }
+ }
+}
+
+// Exchanges bytes between two ReadWriters.
+// (In this case, between a SOCKS connection and smux stream.)
+func copyLoop(socks, stream io.ReadWriter) {
+ done := make(chan struct{}, 2)
+ go func() {
+ if _, err := io.Copy(socks, stream); err != nil {
+ log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
+ }
+ done <- struct{}{}
+ }()
+ go func() {
+ if _, err := io.Copy(stream, socks); err != nil {
+ log.Printf("copying SOCKS to stream resulted in error: %v", err)
+ }
+ done <- struct{}{}
+ }()
+ <-done
+ log.Println("copy loop ended")
+}
diff --git a/pkg/snowflake/lib/turbotunnel.go b/pkg/snowflake/lib/turbotunnel.go
new file mode 100644
index 0000000..aad2e6a
--- /dev/null
+++ b/pkg/snowflake/lib/turbotunnel.go
@@ -0,0 +1,68 @@
+package lib
+
+import (
+ "bufio"
+ "errors"
+ "io"
+ "net"
+ "time"
+
+ "git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation"
+)
+
+var errNotImplemented = errors.New("not implemented")
+
+// EncapsulationPacketConn implements the net.PacketConn interface over an
+// io.ReadWriteCloser stream, using the encapsulation package to represent
+// packets in a stream.
+type EncapsulationPacketConn struct {
+ io.ReadWriteCloser
+ localAddr net.Addr
+ remoteAddr net.Addr
+ bw *bufio.Writer
+}
+
+// NewEncapsulationPacketConn makes
+func NewEncapsulationPacketConn(
+ localAddr, remoteAddr net.Addr,
+ conn io.ReadWriteCloser,
+) *EncapsulationPacketConn {
+ return &EncapsulationPacketConn{
+ ReadWriteCloser: conn,
+ localAddr: localAddr,
+ remoteAddr: remoteAddr,
+ bw: bufio.NewWriter(conn),
+ }
+}
+
+// ReadFrom reads an encapsulated packet from the stream.
+func (c *EncapsulationPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
+ data, err := encapsulation.ReadData(c.ReadWriteCloser)
+ if err != nil {
+ return 0, c.remoteAddr, err
+ }
+ return copy(p, data), c.remoteAddr, nil
+}
+
+// WriteTo writes an encapsulated packet to the stream.
+func (c *EncapsulationPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
+ // addr is ignored.
+ _, err := encapsulation.WriteData(c.bw, p)
+ if err == nil {
+ err = c.bw.Flush()
+ }
+ if err != nil {
+ return 0, err
+ }
+ return len(p), nil
+}
+
+// LocalAddr returns the localAddr value that was passed to
+// NewEncapsulationPacketConn.
+func (c *EncapsulationPacketConn) LocalAddr() net.Addr {
+ return c.localAddr
+}
+
+func (c *EncapsulationPacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
+func (c *EncapsulationPacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
+func (c *EncapsulationPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }
diff --git a/pkg/snowflake/lib/util.go b/pkg/snowflake/lib/util.go
new file mode 100644
index 0000000..0eb8ddd
--- /dev/null
+++ b/pkg/snowflake/lib/util.go
@@ -0,0 +1,70 @@
+package lib
+
+import (
+ "log"
+ "time"
+)
+
+const (
+ LogTimeInterval = 5 * time.Second
+)
+
+type BytesLogger interface {
+ AddOutbound(int)
+ AddInbound(int)
+}
+
+// Default BytesLogger does nothing.
+type BytesNullLogger struct{}
+
+func (b BytesNullLogger) AddOutbound(amount int) {}
+func (b BytesNullLogger) AddInbound(amount int) {}
+
+// BytesSyncLogger uses channels to safely log from multiple sources with output
+// occuring at reasonable intervals.
+type BytesSyncLogger struct {
+ outboundChan chan int
+ inboundChan chan int
+}
+
+// NewBytesSyncLogger returns a new BytesSyncLogger and starts it loggin.
+func NewBytesSyncLogger() *BytesSyncLogger {
+ b := &BytesSyncLogger{
+ outboundChan: make(chan int, 5),
+ inboundChan: make(chan int, 5),
+ }
+ go b.log()
+ return b
+}
+
+func (b *BytesSyncLogger) log() {
+ var outbound, inbound, outEvents, inEvents int
+ ticker := time.NewTicker(LogTimeInterval)
+ for {
+ select {
+ case <-ticker.C:
+ if outEvents > 0 || inEvents > 0 {
+ log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
+ inbound, outbound, inEvents, outEvents)
+ }
+ outbound = 0
+ outEvents = 0
+ inbound = 0
+ inEvents = 0
+ case amount := <-b.outboundChan:
+ outbound += amount
+ outEvents++
+ case amount := <-b.inboundChan:
+ inbound += amount
+ inEvents++
+ }
+ }
+}
+
+func (b *BytesSyncLogger) AddOutbound(amount int) {
+ b.outboundChan <- amount
+}
+
+func (b *BytesSyncLogger) AddInbound(amount int) {
+ b.inboundChan <- amount
+}
diff --git a/pkg/snowflake/lib/webrtc.go b/pkg/snowflake/lib/webrtc.go
new file mode 100644
index 0000000..af7ba6d
--- /dev/null
+++ b/pkg/snowflake/lib/webrtc.go
@@ -0,0 +1,222 @@
+package lib
+
+import (
+ "crypto/rand"
+ "encoding/hex"
+ "errors"
+ "io"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/pion/webrtc/v3"
+)
+
+// Remote WebRTC peer.
+//
+// Handles preparation of go-webrtc PeerConnection. Only ever has
+// one DataChannel.
+type WebRTCPeer struct {
+ id string
+ pc *webrtc.PeerConnection
+ transport *webrtc.DataChannel
+
+ recvPipe *io.PipeReader
+ writePipe *io.PipeWriter
+ lastReceive time.Time
+
+ open chan struct{} // Channel to notify when datachannel opens
+ closed bool
+
+ once sync.Once // Synchronization for PeerConnection destruction
+
+ BytesLogger BytesLogger
+}
+
+// Construct a WebRTC PeerConnection.
+func NewWebRTCPeer(config *webrtc.Configuration,
+ broker *BrokerChannel) (*WebRTCPeer, error) {
+ connection := new(WebRTCPeer)
+ {
+ var buf [8]byte
+ if _, err := rand.Read(buf[:]); err != nil {
+ panic(err)
+ }
+ connection.id = "snowflake-" + hex.EncodeToString(buf[:])
+ }
+
+ // Override with something that's not NullLogger to have real logging.
+ connection.BytesLogger = &BytesNullLogger{}
+
+ // Pipes remain the same even when DataChannel gets switched.
+ connection.recvPipe, connection.writePipe = io.Pipe()
+
+ err := connection.connect(config, broker)
+ if err != nil {
+ connection.Close()
+ return nil, err
+ }
+ return connection, nil
+}
+
+// Read bytes from local SOCKS.
+// As part of |io.ReadWriter|
+func (c *WebRTCPeer) Read(b []byte) (int, error) {
+ return c.recvPipe.Read(b)
+}
+
+// Writes bytes out to remote WebRTC.
+// As part of |io.ReadWriter|
+func (c *WebRTCPeer) Write(b []byte) (int, error) {
+ err := c.transport.Send(b)
+ if err != nil {
+ return 0, err
+ }
+ c.BytesLogger.AddOutbound(len(b))
+ return len(b), nil
+}
+
+func (c *WebRTCPeer) Close() error {
+ c.once.Do(func() {
+ c.closed = true
+ c.cleanup()
+ log.Printf("WebRTC: Closing")
+ })
+ return nil
+}
+
+// Prevent long-lived broken remotes.
+// Should also update the DataChannel in underlying go-webrtc's to make Closes
+// more immediate / responsive.
+func (c *WebRTCPeer) checkForStaleness() {
+ c.lastReceive = time.Now()
+ for {
+ if c.closed {
+ return
+ }
+ if time.Since(c.lastReceive) > SnowflakeTimeout {
+ log.Printf("WebRTC: No messages received for %v -- closing stale connection.",
+ SnowflakeTimeout)
+ c.Close()
+ return
+ }
+ <-time.After(time.Second)
+ }
+}
+
+func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel) error {
+ log.Println(c.id, " connecting...")
+ // TODO: When go-webrtc is more stable, it's possible that a new
+ // PeerConnection won't need to be re-prepared each time.
+ c.preparePeerConnection(config)
+ answer, err := broker.Negotiate(c.pc.LocalDescription())
+ if err != nil {
+ return err
+ }
+ log.Printf("Received Answer.\n")
+ err = c.pc.SetRemoteDescription(*answer)
+ if nil != err {
+ log.Println("WebRTC: Unable to SetRemoteDescription:", err)
+ return err
+ }
+
+ // Wait for the datachannel to open or time out
+ select {
+ case <-c.open:
+ case <-time.After(DataChannelTimeout):
+ c.transport.Close()
+ return errors.New("timeout waiting for DataChannel.OnOpen")
+ }
+
+ go c.checkForStaleness()
+ return nil
+}
+
+// preparePeerConnection creates a new WebRTC PeerConnection and returns it
+// after ICE candidate gathering is complete..
+func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) error {
+ var err error
+ c.pc, err = webrtc.NewPeerConnection(*config)
+ if err != nil {
+ log.Printf("NewPeerConnection ERROR: %s", err)
+ return err
+ }
+ ordered := true
+ dataChannelOptions := &webrtc.DataChannelInit{
+ Ordered: &ordered,
+ }
+ // We must create the data channel before creating an offer
+ // https://github.com/pion/webrtc/wiki/Release-WebRTC@v3.0.0
+ dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
+ if err != nil {
+ log.Printf("CreateDataChannel ERROR: %s", err)
+ return err
+ }
+ dc.OnOpen(func() {
+ log.Println("WebRTC: DataChannel.OnOpen")
+ close(c.open)
+ })
+ dc.OnClose(func() {
+ log.Println("WebRTC: DataChannel.OnClose")
+ c.Close()
+ })
+ dc.OnMessage(func(msg webrtc.DataChannelMessage) {
+ if len(msg.Data) <= 0 {
+ log.Println("0 length message---")
+ }
+ n, err := c.writePipe.Write(msg.Data)
+ c.BytesLogger.AddInbound(n)
+ if err != nil {
+ // TODO: Maybe shouldn't actually close.
+ log.Println("Error writing to SOCKS pipe")
+ if inerr := c.writePipe.CloseWithError(err); inerr != nil {
+ log.Printf("c.writePipe.CloseWithError returned error: %v", inerr)
+ }
+ }
+ c.lastReceive = time.Now()
+ })
+ c.transport = dc
+ c.open = make(chan struct{})
+ log.Println("WebRTC: DataChannel created.")
+
+ // Allow candidates to accumulate until ICEGatheringStateComplete.
+ done := webrtc.GatheringCompletePromise(c.pc)
+ offer, err := c.pc.CreateOffer(nil)
+ // TODO: Potentially timeout and retry if ICE isn't working.
+ if err != nil {
+ log.Println("Failed to prepare offer", err)
+ c.pc.Close()
+ return err
+ }
+ log.Println("WebRTC: Created offer")
+ err = c.pc.SetLocalDescription(offer)
+ if err != nil {
+ log.Println("Failed to prepare offer", err)
+ c.pc.Close()
+ return err
+ }
+ log.Println("WebRTC: Set local description")
+
+ <-done // Wait for ICE candidate gathering to complete.
+ log.Println("WebRTC: PeerConnection created.")
+ return nil
+}
+
+// Close all channels and transports
+func (c *WebRTCPeer) cleanup() {
+ // Close this side of the SOCKS pipe.
+ if c.writePipe != nil { // c.writePipe can be nil in tests.
+ c.writePipe.Close()
+ }
+ if nil != c.transport {
+ log.Printf("WebRTC: closing DataChannel")
+ c.transport.Close()
+ }
+ if nil != c.pc {
+ log.Printf("WebRTC: closing PeerConnection")
+ err := c.pc.Close()
+ if nil != err {
+ log.Printf("Error closing peerconnection...")
+ }
+ }
+}