diff options
author | kali kaneko (leap communications) <kali@leap.se> | 2021-11-29 18:12:40 +0100 |
---|---|---|
committer | kali kaneko (leap communications) <kali@leap.se> | 2021-11-29 18:14:21 +0100 |
commit | b25ec7c923924e53ddb65f9a34e9a669dcf066c7 (patch) | |
tree | 5ddabd18ad1cb880ee07ae8c950b99b99f150012 /pkg | |
parent | c7148d9559dab0e1cdbc6dd5306a3c852615560e (diff) |
[feat] snowflake client support
Diffstat (limited to 'pkg')
-rw-r--r-- | pkg/snowflake/bootstrap.go | 101 | ||||
-rw-r--r-- | pkg/snowflake/lib/interfaces.go | 34 | ||||
-rw-r--r-- | pkg/snowflake/lib/lib_test.go | 269 | ||||
-rw-r--r-- | pkg/snowflake/lib/peers.go | 135 | ||||
-rw-r--r-- | pkg/snowflake/lib/rendezvous.go | 184 | ||||
-rw-r--r-- | pkg/snowflake/lib/snowflake.go | 175 | ||||
-rw-r--r-- | pkg/snowflake/lib/turbotunnel.go | 68 | ||||
-rw-r--r-- | pkg/snowflake/lib/util.go | 70 | ||||
-rw-r--r-- | pkg/snowflake/lib/webrtc.go | 222 |
9 files changed, 1258 insertions, 0 deletions
diff --git a/pkg/snowflake/bootstrap.go b/pkg/snowflake/bootstrap.go new file mode 100644 index 0000000..0f370fa --- /dev/null +++ b/pkg/snowflake/bootstrap.go @@ -0,0 +1,101 @@ +package snowflake + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "log" + "net/http" + "os" + "time" + + "0xacab.org/leap/bitmask-vpn/pkg/config" + "github.com/cretz/bine/tor" +) + +const torrc = `UseBridges 1 +DataDirectory datadir + +ClientTransportPlugin snowflake exec /usr/local/bin/snowflake-client \ +-url https://snowflake-broker.torproject.net.global.prod.fastly.net/ -front cdn.sstatic.net \ +-ice stun:stun.voip.blackberry.com:3478,stun:stun.altar.com.pl:3478,stun:stun.antisip.com:3478,stun:stun.bluesip.net:3478,stun:stun.dus.net:3478,stun:stun.epygi.com:3478,stun:stun.sonetel.com:3478,stun:stun.sonetel.net:3478,stun:stun.stunprotocol.org:3478,stun:stun.uls.co.za:3478,stun:stun.voipgate.com:3478,stun:stun.voys.nl:3478 \ +-max 3 + +Bridge snowflake 0.0.3.0:1` + +func writeTorrc() string { + f, err := ioutil.TempFile("", "torrc-snowflake-") + if err != nil { + log.Println(err) + } + f.Write([]byte(torrc)) + return f.Name() +} + +func BootstrapWithSnowflakeProxies() error { + rcfile := writeTorrc() + conf := &tor.StartConf{DebugWriter: os.Stdout, TorrcFile: rcfile} + + fmt.Println("Starting Tor and fetching files to bootstrap VPN tunnel...") + fmt.Println("") + + t, err := tor.Start(nil, conf) + if err != nil { + return err + } + defer t.Close() + + // Wait at most 5 minutes + dialCtx, dialCancel := context.WithTimeout(context.Background(), time.Minute*10) + defer dialCancel() + dialer, err := t.Dialer(dialCtx, nil) + if err != nil { + return err + } + + /* + regClient := &http.Client{ + Transport: &http.Transport{ + DialContext: dialer.DialContext, + }, + Timeout: time.Minute * 5, + } + */ + //fetchFile(regClient, "https://wtfismyip.com/json") + + certs := x509.NewCertPool() + certs.AppendCertsFromPEM(config.CaCert) + + apiClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: certs, + }, + DialContext: dialer.DialContext, + }, + Timeout: time.Minute * 5, + } + + // XXX parametrize these urls + fetchFile(apiClient, "https://api.black.riseup.net/3/config/eip-service.json") + fetchFile(apiClient, "https://api.black.riseup.net/3/cert") + + return nil +} + +func fetchFile(client *http.Client, uri string) error { + resp, err := client.Get(uri) + if err != nil { + return err + } + defer resp.Body.Close() + + c, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Println(err) + } + fmt.Println(string(c)) + return nil +} diff --git a/pkg/snowflake/lib/interfaces.go b/pkg/snowflake/lib/interfaces.go new file mode 100644 index 0000000..5378f4a --- /dev/null +++ b/pkg/snowflake/lib/interfaces.go @@ -0,0 +1,34 @@ +package lib + +import ( + "net" +) + +// Interface for catching Snowflakes. (aka the remote dialer) +type Tongue interface { + Catch() (*WebRTCPeer, error) + + // Get the maximum number of snowflakes + GetMax() int +} + +// Interface for collecting some number of Snowflakes, for passing along +// ultimately to the SOCKS handler. +type SnowflakeCollector interface { + // Add a Snowflake to the collection. + // Implementation should decide how to connect and maintain the webRTCConn. + Collect() (*WebRTCPeer, error) + + // Remove and return the most available Snowflake from the collection. + Pop() *WebRTCPeer + + // Signal when the collector has stopped collecting. + Melted() <-chan struct{} +} + +// Interface to adapt to goptlib's SocksConn struct. +type SocksConnector interface { + Grant(*net.TCPAddr) error + Reject() error + net.Conn +} diff --git a/pkg/snowflake/lib/lib_test.go b/pkg/snowflake/lib/lib_test.go new file mode 100644 index 0000000..5537a52 --- /dev/null +++ b/pkg/snowflake/lib/lib_test.go @@ -0,0 +1,269 @@ +package lib + +import ( + "bytes" + "fmt" + "io/ioutil" + "net" + "net/http" + "testing" + + "git.torproject.org/pluggable-transports/snowflake.git/common/util" + . "github.com/smartystreets/goconvey/convey" +) + +type MockTransport struct { + statusOverride int + body []byte +} + +// Just returns a response with fake SDP answer. +func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + s := ioutil.NopCloser(bytes.NewReader(m.body)) + r := &http.Response{ + StatusCode: m.statusOverride, + Body: s, + } + return r, nil +} + +type FakeDialer struct { + max int +} + +func (w FakeDialer) Catch() (*WebRTCPeer, error) { + fmt.Println("Caught a dummy snowflake.") + return &WebRTCPeer{}, nil +} + +func (w FakeDialer) GetMax() int { + return w.max +} + +type FakeSocksConn struct { + net.Conn + rejected bool +} + +func (f FakeSocksConn) Reject() error { + f.rejected = true + return nil +} +func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil } + +type FakePeers struct{ toRelease *WebRTCPeer } + +func (f FakePeers) Collect() (*WebRTCPeer, error) { return &WebRTCPeer{}, nil } +func (f FakePeers) Pop() *WebRTCPeer { return nil } +func (f FakePeers) Melted() <-chan struct{} { return nil } + +func TestSnowflakeClient(t *testing.T) { + + Convey("Peers", t, func() { + Convey("Can construct", func() { + d := &FakeDialer{max: 1} + p, _ := NewPeers(d) + So(p.Tongue.GetMax(), ShouldEqual, 1) + So(p.snowflakeChan, ShouldNotBeNil) + So(cap(p.snowflakeChan), ShouldEqual, 1) + }) + + Convey("Collecting a Snowflake requires a Tongue.", func() { + p, err := NewPeers(nil) + So(err, ShouldNotBeNil) + // Set the dialer so that collection is possible. + d := &FakeDialer{max: 1} + p, err = NewPeers(d) + _, err = p.Collect() + So(err, ShouldBeNil) + So(p.Count(), ShouldEqual, 1) + // S + _, err = p.Collect() + }) + + Convey("Collection continues until capacity.", func() { + c := 5 + p, _ := NewPeers(FakeDialer{max: c}) + // Fill up to capacity. + for i := 0; i < c; i++ { + fmt.Println("Adding snowflake ", i) + _, err := p.Collect() + So(err, ShouldBeNil) + So(p.Count(), ShouldEqual, i+1) + } + // But adding another gives an error. + So(p.Count(), ShouldEqual, c) + _, err := p.Collect() + So(err, ShouldNotBeNil) + So(p.Count(), ShouldEqual, c) + + // But popping and closing allows it to continue. + s := p.Pop() + s.Close() + So(s, ShouldNotBeNil) + So(p.Count(), ShouldEqual, c-1) + + _, err = p.Collect() + So(err, ShouldBeNil) + So(p.Count(), ShouldEqual, c) + }) + + Convey("Count correctly purges peers marked for deletion.", func() { + p, _ := NewPeers(FakeDialer{max: 5}) + p.Collect() + p.Collect() + p.Collect() + p.Collect() + So(p.Count(), ShouldEqual, 4) + s := p.Pop() + s.Close() + So(p.Count(), ShouldEqual, 3) + s = p.Pop() + s.Close() + So(p.Count(), ShouldEqual, 2) + }) + + Convey("End Closes all peers.", func() { + cnt := 5 + p, _ := NewPeers(FakeDialer{max: cnt}) + for i := 0; i < cnt; i++ { + p.activePeers.PushBack(&WebRTCPeer{}) + } + So(p.Count(), ShouldEqual, cnt) + p.End() + <-p.Melted() + So(p.Count(), ShouldEqual, 0) + }) + + Convey("Pop skips over closed peers.", func() { + p, _ := NewPeers(FakeDialer{max: 4}) + wc1, _ := p.Collect() + wc2, _ := p.Collect() + wc3, _ := p.Collect() + So(wc1, ShouldNotBeNil) + So(wc2, ShouldNotBeNil) + So(wc3, ShouldNotBeNil) + wc1.Close() + r := p.Pop() + So(p.Count(), ShouldEqual, 2) + So(r, ShouldEqual, wc2) + wc4, _ := p.Collect() + wc2.Close() + wc3.Close() + r = p.Pop() + So(r, ShouldEqual, wc4) + }) + + }) + + Convey("Snowflake", t, func() { + + SkipConvey("Handler Grants correctly", func() { + socks := &FakeSocksConn{} + broker := &BrokerChannel{Host: "test"} + d := NewWebRTCDialer(broker, nil, 1) + + So(socks.rejected, ShouldEqual, false) + Handler(socks, d) + So(socks.rejected, ShouldEqual, true) + }) + }) + + Convey("Dialers", t, func() { + Convey("Can construct WebRTCDialer.", func() { + broker := &BrokerChannel{Host: "test"} + d := NewWebRTCDialer(broker, nil, 1) + So(d, ShouldNotBeNil) + So(d.BrokerChannel, ShouldNotBeNil) + So(d.BrokerChannel.Host, ShouldEqual, "test") + }) + SkipConvey("WebRTCDialer can Catch a snowflake.", func() { + broker := &BrokerChannel{Host: "test"} + d := NewWebRTCDialer(broker, nil, 1) + conn, err := d.Catch() + So(conn, ShouldBeNil) + So(err, ShouldNotBeNil) + }) + }) + + Convey("Rendezvous", t, func() { + transport := &MockTransport{ + http.StatusOK, + []byte(`{"type":"answer","sdp":"fake"}`), + } + fakeOffer, err := util.DeserializeSessionDescription(`{"type":"offer","sdp":"test"}`) + if err != nil { + panic(err) + } + + Convey("Construct BrokerChannel with no front domain", func() { + b, err := NewBrokerChannel("test.broker", "", transport, false) + So(b.url, ShouldNotBeNil) + So(err, ShouldBeNil) + So(b.url.Path, ShouldResemble, "test.broker") + So(b.transport, ShouldNotBeNil) + }) + + Convey("Construct BrokerChannel *with* front domain", func() { + b, err := NewBrokerChannel("test.broker", "front", transport, false) + So(b.url, ShouldNotBeNil) + So(err, ShouldBeNil) + So(b.url.Path, ShouldResemble, "test.broker") + So(b.url.Host, ShouldResemble, "front") + So(b.transport, ShouldNotBeNil) + }) + + Convey("BrokerChannel.Negotiate responds with answer", func() { + b, err := NewBrokerChannel("test.broker", "", transport, false) + So(err, ShouldBeNil) + answer, err := b.Negotiate(fakeOffer) + So(err, ShouldBeNil) + So(answer, ShouldNotBeNil) + So(answer.SDP, ShouldResemble, "fake") + }) + + Convey("BrokerChannel.Negotiate fails with 503", func() { + b, err := NewBrokerChannel("test.broker", "", + &MockTransport{http.StatusServiceUnavailable, []byte("\n")}, + false) + So(err, ShouldBeNil) + answer, err := b.Negotiate(fakeOffer) + So(err, ShouldNotBeNil) + So(answer, ShouldBeNil) + So(err.Error(), ShouldResemble, BrokerError503) + }) + + Convey("BrokerChannel.Negotiate fails with 400", func() { + b, err := NewBrokerChannel("test.broker", "", + &MockTransport{http.StatusBadRequest, []byte("\n")}, + false) + So(err, ShouldBeNil) + answer, err := b.Negotiate(fakeOffer) + So(err, ShouldNotBeNil) + So(answer, ShouldBeNil) + So(err.Error(), ShouldResemble, BrokerError400) + }) + + Convey("BrokerChannel.Negotiate fails with large read", func() { + b, err := NewBrokerChannel("test.broker", "", + &MockTransport{http.StatusOK, make([]byte, 100001, 100001)}, + false) + So(err, ShouldBeNil) + answer, err := b.Negotiate(fakeOffer) + So(err, ShouldNotBeNil) + So(answer, ShouldBeNil) + So(err.Error(), ShouldResemble, "unexpected EOF") + }) + + Convey("BrokerChannel.Negotiate fails with unexpected error", func() { + b, err := NewBrokerChannel("test.broker", "", + &MockTransport{123, []byte("")}, false) + So(err, ShouldBeNil) + answer, err := b.Negotiate(fakeOffer) + So(err, ShouldNotBeNil) + So(answer, ShouldBeNil) + So(err.Error(), ShouldResemble, BrokerErrorUnexpected) + }) + }) + +} diff --git a/pkg/snowflake/lib/peers.go b/pkg/snowflake/lib/peers.go new file mode 100644 index 0000000..d02eed3 --- /dev/null +++ b/pkg/snowflake/lib/peers.go @@ -0,0 +1,135 @@ +package lib + +import ( + "container/list" + "errors" + "fmt" + "log" + "sync" +) + +// Container which keeps track of multiple WebRTC remote peers. +// Implements |SnowflakeCollector|. +// +// Maintaining a set of pre-connected Peers with fresh but inactive datachannels +// allows allows rapid recovery when the current WebRTC Peer disconnects. +// +// Note: For now, only one remote can be active at any given moment. +// This is a property of Tor circuits & its current multiplexing constraints, +// but could be updated if that changes. +// (Also, this constraint does not necessarily apply to the more generic PT +// version of Snowflake) +type Peers struct { + Tongue + BytesLogger BytesLogger + + snowflakeChan chan *WebRTCPeer + activePeers *list.List + + melt chan struct{} + melted bool + + collection sync.WaitGroup +} + +// Construct a fresh container of remote peers. +func NewPeers(tongue Tongue) (*Peers, error) { + p := &Peers{} + // Use buffered go channel to pass snowflakes onwards to the SOCKS handler. + if tongue == nil { + return nil, errors.New("missing Tongue to catch Snowflakes with") + } + p.snowflakeChan = make(chan *WebRTCPeer, tongue.GetMax()) + p.activePeers = list.New() + p.melt = make(chan struct{}) + p.Tongue = tongue + return p, nil +} + +// As part of |SnowflakeCollector| interface. +func (p *Peers) Collect() (*WebRTCPeer, error) { + // Engage the Snowflake Catching interface, which must be available. + p.collection.Add(1) + defer p.collection.Done() + if p.melted { + return nil, fmt.Errorf("Snowflakes have melted") + } + if nil == p.Tongue { + return nil, errors.New("missing Tongue to catch Snowflakes with") + } + cnt := p.Count() + capacity := p.Tongue.GetMax() + s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity) + if cnt >= capacity { + return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity) + } + log.Println("WebRTC: Collecting a new Snowflake.", s) + // BUG: some broker conflict here. + connection, err := p.Tongue.Catch() + if nil != err { + return nil, err + } + // Track new valid Snowflake in internal collection and pass along. + p.activePeers.PushBack(connection) + p.snowflakeChan <- connection + return connection, nil +} + +// Pop blocks until an available, valid snowflake appears. Returns nil after End +// has been called. +func (p *Peers) Pop() *WebRTCPeer { + for { + snowflake, ok := <-p.snowflakeChan + if !ok { + return nil + } + if snowflake.closed { + continue + } + // Set to use the same rate-limited traffic logger to keep consistency. + snowflake.BytesLogger = p.BytesLogger + return snowflake + } +} + +// As part of |SnowflakeCollector| interface. +func (p *Peers) Melted() <-chan struct{} { + return p.melt +} + +// Returns total available Snowflakes (including the active one) +// The count only reduces when connections themselves close, rather than when +// they are popped. +func (p *Peers) Count() int { + p.purgeClosedPeers() + return p.activePeers.Len() +} + +func (p *Peers) purgeClosedPeers() { + for e := p.activePeers.Front(); e != nil; { + next := e.Next() + conn := e.Value.(*WebRTCPeer) + // Purge those marked for deletion. + if conn.closed { + p.activePeers.Remove(e) + } + e = next + } +} + +// Close all Peers contained here. +func (p *Peers) End() { + close(p.melt) + p.melted = true + p.collection.Wait() + close(p.snowflakeChan) + cnt := p.Count() + for e := p.activePeers.Front(); e != nil; { + next := e.Next() + conn := e.Value.(*WebRTCPeer) + conn.Close() + p.activePeers.Remove(e) + e = next + } + log.Printf("WebRTC: melted all %d snowflakes.", cnt) +} diff --git a/pkg/snowflake/lib/rendezvous.go b/pkg/snowflake/lib/rendezvous.go new file mode 100644 index 0000000..32da081 --- /dev/null +++ b/pkg/snowflake/lib/rendezvous.go @@ -0,0 +1,184 @@ +// WebRTC rendezvous requires the exchange of SessionDescriptions between +// peers in order to establish a PeerConnection. +// +// This file contains the one method currently available to Snowflake: +// +// - Domain-fronted HTTP signaling. The Broker automatically exchange offers +// and answers between this client and some remote WebRTC proxy. + +package lib + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" + "sync" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/common/nat" + "git.torproject.org/pluggable-transports/snowflake.git/common/util" + "github.com/pion/webrtc/v3" +) + +const ( + BrokerError503 string = "No snowflake proxies currently available." + BrokerError400 string = "You sent an invalid offer in the request." + BrokerErrorUnexpected string = "Unexpected error, no answer." + readLimit = 100000 //Maximum number of bytes to be read from an HTTP response +) + +// Signalling Channel to the Broker. +type BrokerChannel struct { + // The Host header to put in the HTTP request (optional and may be + // different from the host name in URL). + Host string + url *url.URL + transport http.RoundTripper // Used to make all requests. + keepLocalAddresses bool + NATType string + lock sync.Mutex +} + +// We make a copy of DefaultTransport because we want the default Dial +// and TLSHandshakeTimeout settings. But we want to disable the default +// ProxyFromEnvironment setting. +func CreateBrokerTransport() http.RoundTripper { + transport := http.DefaultTransport.(*http.Transport) + transport.Proxy = nil + transport.ResponseHeaderTimeout = 15 * time.Second + return transport +} + +// Construct a new BrokerChannel, where: +// |broker| is the full URL of the facilitating program which assigns proxies +// to clients, and |front| is the option fronting domain. +func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool) (*BrokerChannel, error) { + targetURL, err := url.Parse(broker) + if err != nil { + return nil, err + } + log.Println("Rendezvous using Broker at:", broker) + bc := new(BrokerChannel) + bc.url = targetURL + if front != "" { // Optional front domain. + log.Println("Domain fronting using:", front) + bc.Host = bc.url.Host + bc.url.Host = front + } + + bc.transport = transport + bc.keepLocalAddresses = keepLocalAddresses + bc.NATType = nat.NATUnknown + return bc, nil +} + +func limitedRead(r io.Reader, limit int64) ([]byte, error) { + p, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: limit + 1}) + if err != nil { + return p, err + } else if int64(len(p)) == limit+1 { + return p[0:limit], io.ErrUnexpectedEOF + } + return p, err +} + +// Roundtrip HTTP POST using WebRTC SessionDescriptions. +// +// Send an SDP offer to the broker, which assigns a proxy and responds +// with an SDP answer from a designated remote WebRTC peer. +func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( + *webrtc.SessionDescription, error) { + log.Println("Negotiating via BrokerChannel...\nTarget URL: ", + bc.Host, "\nFront URL: ", bc.url.Host) + // Ideally, we could specify an `RTCIceTransportPolicy` that would handle + // this for us. However, "public" was removed from the draft spec. + // See https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration#RTCIceTransportPolicy_enum + if !bc.keepLocalAddresses { + offer = &webrtc.SessionDescription{ + Type: offer.Type, + SDP: util.StripLocalAddresses(offer.SDP), + } + } + offerSDP, err := util.SerializeSessionDescription(offer) + if err != nil { + return nil, err + } + data := bytes.NewReader([]byte(offerSDP)) + // Suffix with broker's client registration handler. + clientURL := bc.url.ResolveReference(&url.URL{Path: "client"}) + request, err := http.NewRequest("POST", clientURL.String(), data) + if nil != err { + return nil, err + } + if "" != bc.Host { // Set true host if necessary. + request.Host = bc.Host + } + // include NAT-TYPE + bc.lock.Lock() + request.Header.Set("Snowflake-NAT-TYPE", bc.NATType) + bc.lock.Unlock() + resp, err := bc.transport.RoundTrip(request) + if nil != err { + return nil, err + } + defer resp.Body.Close() + log.Printf("BrokerChannel Response:\n%s\n\n", resp.Status) + + switch resp.StatusCode { + case http.StatusOK: + body, err := limitedRead(resp.Body, readLimit) + if nil != err { + return nil, err + } + log.Printf("Received answer: %s", string(body)) + return util.DeserializeSessionDescription(string(body)) + case http.StatusServiceUnavailable: + return nil, errors.New(BrokerError503) + case http.StatusBadRequest: + return nil, errors.New(BrokerError400) + default: + return nil, errors.New(BrokerErrorUnexpected) + } +} + +func (bc *BrokerChannel) SetNATType(NATType string) { + bc.lock.Lock() + bc.NATType = NATType + bc.lock.Unlock() + log.Printf("NAT Type: %s", NATType) +} + +// Implements the |Tongue| interface to catch snowflakes, using BrokerChannel. +type WebRTCDialer struct { + *BrokerChannel + webrtcConfig *webrtc.Configuration + max int +} + +func NewWebRTCDialer(broker *BrokerChannel, iceServers []webrtc.ICEServer, max int) *WebRTCDialer { + config := webrtc.Configuration{ + ICEServers: iceServers, + } + + return &WebRTCDialer{ + BrokerChannel: broker, + webrtcConfig: &config, + max: max, + } +} + +// Initialize a WebRTC Connection by signaling through the broker. +func (w WebRTCDialer) Catch() (*WebRTCPeer, error) { + // TODO: [#25591] Fetch ICE server information from Broker. + // TODO: [#25596] Consider TURN servers here too. + return NewWebRTCPeer(w.webrtcConfig, w.BrokerChannel) +} + +// Returns the maximum number of snowflakes to collect +func (w WebRTCDialer) GetMax() int { + return w.max +} diff --git a/pkg/snowflake/lib/snowflake.go b/pkg/snowflake/lib/snowflake.go new file mode 100644 index 0000000..2ed51a1 --- /dev/null +++ b/pkg/snowflake/lib/snowflake.go @@ -0,0 +1,175 @@ +package lib + +import ( + "context" + "errors" + "io" + "log" + "net" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel" + "github.com/xtaci/kcp-go/v5" + "github.com/xtaci/smux" +) + +const ( + ReconnectTimeout = 10 * time.Second + SnowflakeTimeout = 20 * time.Second + // How long to wait for the OnOpen callback on a DataChannel. + DataChannelTimeout = 10 * time.Second +) + +type dummyAddr struct{} + +func (addr dummyAddr) Network() string { return "dummy" } +func (addr dummyAddr) String() string { return "dummy" } + +// newSession returns a new smux.Session and the net.PacketConn it is running +// over. The net.PacketConn successively connects through Snowflake proxies +// pulled from snowflakes. +func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) { + clientID := turbotunnel.NewClientID() + + // We build a persistent KCP session on a sequence of ephemeral WebRTC + // connections. This dialContext tells RedialPacketConn how to get a new + // WebRTC connection when the previous one dies. Inside each WebRTC + // connection, we use EncapsulationPacketConn to encode packets into a + // stream. + dialContext := func(ctx context.Context) (net.PacketConn, error) { + log.Printf("redialing on same connection") + // Obtain an available WebRTC remote. May block. + conn := snowflakes.Pop() + if conn == nil { + return nil, errors.New("handler: Received invalid Snowflake") + } + log.Println("---- Handler: snowflake assigned ----") + // Send the magic Turbo Tunnel token. + _, err := conn.Write(turbotunnel.Token[:]) + if err != nil { + return nil, err + } + // Send ClientID prefix. + _, err = conn.Write(clientID[:]) + if err != nil { + return nil, err + } + return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil + } + pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext) + + // conn is built on the underlying RedialPacketConn—when one WebRTC + // connection dies, another one will be found to take its place. The + // sequence of packets across multiple WebRTC connections drives the KCP + // engine. + conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn) + if err != nil { + pconn.Close() + return nil, nil, err + } + // Permit coalescing the payloads of consecutive sends. + conn.SetStreamMode(true) + // Set the maximum send and receive window sizes to a high number + // Removes KCP bottlenecks: https://gitlab.torproject.org/tpo/anti-censorship/pluggable-transports/snowflake/-/issues/40026 + conn.SetWindowSize(65535, 65535) + // Disable the dynamic congestion window (limit only by the + // maximum of local and remote static windows). + conn.SetNoDelay( + 0, // default nodelay + 0, // default interval + 0, // default resend + 1, // nc=1 => congestion window off + ) + // On the KCP connection we overlay an smux session and stream. + smuxConfig := smux.DefaultConfig() + smuxConfig.Version = 2 + smuxConfig.KeepAliveTimeout = 10 * time.Minute + sess, err := smux.Client(conn, smuxConfig) + if err != nil { + conn.Close() + pconn.Close() + return nil, nil, err + } + + return pconn, sess, err +} + +// Given an accepted SOCKS connection, establish a WebRTC connection to the +// remote peer and exchange traffic. +func Handler(socks net.Conn, tongue Tongue) error { + // Prepare to collect remote WebRTC peers. + snowflakes, err := NewPeers(tongue) + if err != nil { + return err + } + + // Use a real logger to periodically output how much traffic is happening. + snowflakes.BytesLogger = NewBytesSyncLogger() + + log.Printf("---- Handler: begin collecting snowflakes ---") + go connectLoop(snowflakes) + + // Create a new smux session + log.Printf("---- Handler: starting a new session ---") + pconn, sess, err := newSession(snowflakes) + if err != nil { + return err + } + + // On the smux session we overlay a stream. + stream, err := sess.OpenStream() + if err != nil { + return err + } + defer stream.Close() + + // Begin exchanging data. + log.Printf("---- Handler: begin stream %v ---", stream.ID()) + copyLoop(socks, stream) + log.Printf("---- Handler: closed stream %v ---", stream.ID()) + snowflakes.End() + log.Printf("---- Handler: end collecting snowflakes ---") + pconn.Close() + sess.Close() + log.Printf("---- Handler: discarding finished session ---") + return nil +} + +// Maintain |SnowflakeCapacity| number of available WebRTC connections, to +// transfer to the Tor SOCKS handler when needed. +func connectLoop(snowflakes SnowflakeCollector) { + for { + timer := time.After(ReconnectTimeout) + _, err := snowflakes.Collect() + if err != nil { + log.Printf("WebRTC: %v Retrying...", err) + } + select { + case <-timer: + continue + case <-snowflakes.Melted(): + log.Println("ConnectLoop: stopped.") + return + } + } +} + +// Exchanges bytes between two ReadWriters. +// (In this case, between a SOCKS connection and smux stream.) +func copyLoop(socks, stream io.ReadWriter) { + done := make(chan struct{}, 2) + go func() { + if _, err := io.Copy(socks, stream); err != nil { + log.Printf("copying WebRTC to SOCKS resulted in error: %v", err) + } + done <- struct{}{} + }() + go func() { + if _, err := io.Copy(stream, socks); err != nil { + log.Printf("copying SOCKS to stream resulted in error: %v", err) + } + done <- struct{}{} + }() + <-done + log.Println("copy loop ended") +} diff --git a/pkg/snowflake/lib/turbotunnel.go b/pkg/snowflake/lib/turbotunnel.go new file mode 100644 index 0000000..aad2e6a --- /dev/null +++ b/pkg/snowflake/lib/turbotunnel.go @@ -0,0 +1,68 @@ +package lib + +import ( + "bufio" + "errors" + "io" + "net" + "time" + + "git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation" +) + +var errNotImplemented = errors.New("not implemented") + +// EncapsulationPacketConn implements the net.PacketConn interface over an +// io.ReadWriteCloser stream, using the encapsulation package to represent +// packets in a stream. +type EncapsulationPacketConn struct { + io.ReadWriteCloser + localAddr net.Addr + remoteAddr net.Addr + bw *bufio.Writer +} + +// NewEncapsulationPacketConn makes +func NewEncapsulationPacketConn( + localAddr, remoteAddr net.Addr, + conn io.ReadWriteCloser, +) *EncapsulationPacketConn { + return &EncapsulationPacketConn{ + ReadWriteCloser: conn, + localAddr: localAddr, + remoteAddr: remoteAddr, + bw: bufio.NewWriter(conn), + } +} + +// ReadFrom reads an encapsulated packet from the stream. +func (c *EncapsulationPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { + data, err := encapsulation.ReadData(c.ReadWriteCloser) + if err != nil { + return 0, c.remoteAddr, err + } + return copy(p, data), c.remoteAddr, nil +} + +// WriteTo writes an encapsulated packet to the stream. +func (c *EncapsulationPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { + // addr is ignored. + _, err := encapsulation.WriteData(c.bw, p) + if err == nil { + err = c.bw.Flush() + } + if err != nil { + return 0, err + } + return len(p), nil +} + +// LocalAddr returns the localAddr value that was passed to +// NewEncapsulationPacketConn. +func (c *EncapsulationPacketConn) LocalAddr() net.Addr { + return c.localAddr +} + +func (c *EncapsulationPacketConn) SetDeadline(t time.Time) error { return errNotImplemented } +func (c *EncapsulationPacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented } +func (c *EncapsulationPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented } diff --git a/pkg/snowflake/lib/util.go b/pkg/snowflake/lib/util.go new file mode 100644 index 0000000..0eb8ddd --- /dev/null +++ b/pkg/snowflake/lib/util.go @@ -0,0 +1,70 @@ +package lib + +import ( + "log" + "time" +) + +const ( + LogTimeInterval = 5 * time.Second +) + +type BytesLogger interface { + AddOutbound(int) + AddInbound(int) +} + +// Default BytesLogger does nothing. +type BytesNullLogger struct{} + +func (b BytesNullLogger) AddOutbound(amount int) {} +func (b BytesNullLogger) AddInbound(amount int) {} + +// BytesSyncLogger uses channels to safely log from multiple sources with output +// occuring at reasonable intervals. +type BytesSyncLogger struct { + outboundChan chan int + inboundChan chan int +} + +// NewBytesSyncLogger returns a new BytesSyncLogger and starts it loggin. +func NewBytesSyncLogger() *BytesSyncLogger { + b := &BytesSyncLogger{ + outboundChan: make(chan int, 5), + inboundChan: make(chan int, 5), + } + go b.log() + return b +} + +func (b *BytesSyncLogger) log() { + var outbound, inbound, outEvents, inEvents int + ticker := time.NewTicker(LogTimeInterval) + for { + select { + case <-ticker.C: + if outEvents > 0 || inEvents > 0 { + log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)", + inbound, outbound, inEvents, outEvents) + } + outbound = 0 + outEvents = 0 + inbound = 0 + inEvents = 0 + case amount := <-b.outboundChan: + outbound += amount + outEvents++ + case amount := <-b.inboundChan: + inbound += amount + inEvents++ + } + } +} + +func (b *BytesSyncLogger) AddOutbound(amount int) { + b.outboundChan <- amount +} + +func (b *BytesSyncLogger) AddInbound(amount int) { + b.inboundChan <- amount +} diff --git a/pkg/snowflake/lib/webrtc.go b/pkg/snowflake/lib/webrtc.go new file mode 100644 index 0000000..af7ba6d --- /dev/null +++ b/pkg/snowflake/lib/webrtc.go @@ -0,0 +1,222 @@ +package lib + +import ( + "crypto/rand" + "encoding/hex" + "errors" + "io" + "log" + "sync" + "time" + + "github.com/pion/webrtc/v3" +) + +// Remote WebRTC peer. +// +// Handles preparation of go-webrtc PeerConnection. Only ever has +// one DataChannel. +type WebRTCPeer struct { + id string + pc *webrtc.PeerConnection + transport *webrtc.DataChannel + + recvPipe *io.PipeReader + writePipe *io.PipeWriter + lastReceive time.Time + + open chan struct{} // Channel to notify when datachannel opens + closed bool + + once sync.Once // Synchronization for PeerConnection destruction + + BytesLogger BytesLogger +} + +// Construct a WebRTC PeerConnection. +func NewWebRTCPeer(config *webrtc.Configuration, + broker *BrokerChannel) (*WebRTCPeer, error) { + connection := new(WebRTCPeer) + { + var buf [8]byte + if _, err := rand.Read(buf[:]); err != nil { + panic(err) + } + connection.id = "snowflake-" + hex.EncodeToString(buf[:]) + } + + // Override with something that's not NullLogger to have real logging. + connection.BytesLogger = &BytesNullLogger{} + + // Pipes remain the same even when DataChannel gets switched. + connection.recvPipe, connection.writePipe = io.Pipe() + + err := connection.connect(config, broker) + if err != nil { + connection.Close() + return nil, err + } + return connection, nil +} + +// Read bytes from local SOCKS. +// As part of |io.ReadWriter| +func (c *WebRTCPeer) Read(b []byte) (int, error) { + return c.recvPipe.Read(b) +} + +// Writes bytes out to remote WebRTC. +// As part of |io.ReadWriter| +func (c *WebRTCPeer) Write(b []byte) (int, error) { + err := c.transport.Send(b) + if err != nil { + return 0, err + } + c.BytesLogger.AddOutbound(len(b)) + return len(b), nil +} + +func (c *WebRTCPeer) Close() error { + c.once.Do(func() { + c.closed = true + c.cleanup() + log.Printf("WebRTC: Closing") + }) + return nil +} + +// Prevent long-lived broken remotes. +// Should also update the DataChannel in underlying go-webrtc's to make Closes +// more immediate / responsive. +func (c *WebRTCPeer) checkForStaleness() { + c.lastReceive = time.Now() + for { + if c.closed { + return + } + if time.Since(c.lastReceive) > SnowflakeTimeout { + log.Printf("WebRTC: No messages received for %v -- closing stale connection.", + SnowflakeTimeout) + c.Close() + return + } + <-time.After(time.Second) + } +} + +func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel) error { + log.Println(c.id, " connecting...") + // TODO: When go-webrtc is more stable, it's possible that a new + // PeerConnection won't need to be re-prepared each time. + c.preparePeerConnection(config) + answer, err := broker.Negotiate(c.pc.LocalDescription()) + if err != nil { + return err + } + log.Printf("Received Answer.\n") + err = c.pc.SetRemoteDescription(*answer) + if nil != err { + log.Println("WebRTC: Unable to SetRemoteDescription:", err) + return err + } + + // Wait for the datachannel to open or time out + select { + case <-c.open: + case <-time.After(DataChannelTimeout): + c.transport.Close() + return errors.New("timeout waiting for DataChannel.OnOpen") + } + + go c.checkForStaleness() + return nil +} + +// preparePeerConnection creates a new WebRTC PeerConnection and returns it +// after ICE candidate gathering is complete.. +func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) error { + var err error + c.pc, err = webrtc.NewPeerConnection(*config) + if err != nil { + log.Printf("NewPeerConnection ERROR: %s", err) + return err + } + ordered := true + dataChannelOptions := &webrtc.DataChannelInit{ + Ordered: &ordered, + } + // We must create the data channel before creating an offer + // https://github.com/pion/webrtc/wiki/Release-WebRTC@v3.0.0 + dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions) + if err != nil { + log.Printf("CreateDataChannel ERROR: %s", err) + return err + } + dc.OnOpen(func() { + log.Println("WebRTC: DataChannel.OnOpen") + close(c.open) + }) + dc.OnClose(func() { + log.Println("WebRTC: DataChannel.OnClose") + c.Close() + }) + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + if len(msg.Data) <= 0 { + log.Println("0 length message---") + } + n, err := c.writePipe.Write(msg.Data) + c.BytesLogger.AddInbound(n) + if err != nil { + // TODO: Maybe shouldn't actually close. + log.Println("Error writing to SOCKS pipe") + if inerr := c.writePipe.CloseWithError(err); inerr != nil { + log.Printf("c.writePipe.CloseWithError returned error: %v", inerr) + } + } + c.lastReceive = time.Now() + }) + c.transport = dc + c.open = make(chan struct{}) + log.Println("WebRTC: DataChannel created.") + + // Allow candidates to accumulate until ICEGatheringStateComplete. + done := webrtc.GatheringCompletePromise(c.pc) + offer, err := c.pc.CreateOffer(nil) + // TODO: Potentially timeout and retry if ICE isn't working. + if err != nil { + log.Println("Failed to prepare offer", err) + c.pc.Close() + return err + } + log.Println("WebRTC: Created offer") + err = c.pc.SetLocalDescription(offer) + if err != nil { + log.Println("Failed to prepare offer", err) + c.pc.Close() + return err + } + log.Println("WebRTC: Set local description") + + <-done // Wait for ICE candidate gathering to complete. + log.Println("WebRTC: PeerConnection created.") + return nil +} + +// Close all channels and transports +func (c *WebRTCPeer) cleanup() { + // Close this side of the SOCKS pipe. + if c.writePipe != nil { // c.writePipe can be nil in tests. + c.writePipe.Close() + } + if nil != c.transport { + log.Printf("WebRTC: closing DataChannel") + c.transport.Close() + } + if nil != c.pc { + log.Printf("WebRTC: closing PeerConnection") + err := c.pc.Close() + if nil != err { + log.Printf("Error closing peerconnection...") + } + } +} |