summaryrefslogtreecommitdiff
path: root/pkg/snowflake/lib/webrtc.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/snowflake/lib/webrtc.go')
-rw-r--r--pkg/snowflake/lib/webrtc.go222
1 files changed, 222 insertions, 0 deletions
diff --git a/pkg/snowflake/lib/webrtc.go b/pkg/snowflake/lib/webrtc.go
new file mode 100644
index 0000000..af7ba6d
--- /dev/null
+++ b/pkg/snowflake/lib/webrtc.go
@@ -0,0 +1,222 @@
+package lib
+
+import (
+ "crypto/rand"
+ "encoding/hex"
+ "errors"
+ "io"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/pion/webrtc/v3"
+)
+
+// Remote WebRTC peer.
+//
+// Handles preparation of go-webrtc PeerConnection. Only ever has
+// one DataChannel.
+type WebRTCPeer struct {
+ id string
+ pc *webrtc.PeerConnection
+ transport *webrtc.DataChannel
+
+ recvPipe *io.PipeReader
+ writePipe *io.PipeWriter
+ lastReceive time.Time
+
+ open chan struct{} // Channel to notify when datachannel opens
+ closed bool
+
+ once sync.Once // Synchronization for PeerConnection destruction
+
+ BytesLogger BytesLogger
+}
+
+// Construct a WebRTC PeerConnection.
+func NewWebRTCPeer(config *webrtc.Configuration,
+ broker *BrokerChannel) (*WebRTCPeer, error) {
+ connection := new(WebRTCPeer)
+ {
+ var buf [8]byte
+ if _, err := rand.Read(buf[:]); err != nil {
+ panic(err)
+ }
+ connection.id = "snowflake-" + hex.EncodeToString(buf[:])
+ }
+
+ // Override with something that's not NullLogger to have real logging.
+ connection.BytesLogger = &BytesNullLogger{}
+
+ // Pipes remain the same even when DataChannel gets switched.
+ connection.recvPipe, connection.writePipe = io.Pipe()
+
+ err := connection.connect(config, broker)
+ if err != nil {
+ connection.Close()
+ return nil, err
+ }
+ return connection, nil
+}
+
+// Read bytes from local SOCKS.
+// As part of |io.ReadWriter|
+func (c *WebRTCPeer) Read(b []byte) (int, error) {
+ return c.recvPipe.Read(b)
+}
+
+// Writes bytes out to remote WebRTC.
+// As part of |io.ReadWriter|
+func (c *WebRTCPeer) Write(b []byte) (int, error) {
+ err := c.transport.Send(b)
+ if err != nil {
+ return 0, err
+ }
+ c.BytesLogger.AddOutbound(len(b))
+ return len(b), nil
+}
+
+func (c *WebRTCPeer) Close() error {
+ c.once.Do(func() {
+ c.closed = true
+ c.cleanup()
+ log.Printf("WebRTC: Closing")
+ })
+ return nil
+}
+
+// Prevent long-lived broken remotes.
+// Should also update the DataChannel in underlying go-webrtc's to make Closes
+// more immediate / responsive.
+func (c *WebRTCPeer) checkForStaleness() {
+ c.lastReceive = time.Now()
+ for {
+ if c.closed {
+ return
+ }
+ if time.Since(c.lastReceive) > SnowflakeTimeout {
+ log.Printf("WebRTC: No messages received for %v -- closing stale connection.",
+ SnowflakeTimeout)
+ c.Close()
+ return
+ }
+ <-time.After(time.Second)
+ }
+}
+
+func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel) error {
+ log.Println(c.id, " connecting...")
+ // TODO: When go-webrtc is more stable, it's possible that a new
+ // PeerConnection won't need to be re-prepared each time.
+ c.preparePeerConnection(config)
+ answer, err := broker.Negotiate(c.pc.LocalDescription())
+ if err != nil {
+ return err
+ }
+ log.Printf("Received Answer.\n")
+ err = c.pc.SetRemoteDescription(*answer)
+ if nil != err {
+ log.Println("WebRTC: Unable to SetRemoteDescription:", err)
+ return err
+ }
+
+ // Wait for the datachannel to open or time out
+ select {
+ case <-c.open:
+ case <-time.After(DataChannelTimeout):
+ c.transport.Close()
+ return errors.New("timeout waiting for DataChannel.OnOpen")
+ }
+
+ go c.checkForStaleness()
+ return nil
+}
+
+// preparePeerConnection creates a new WebRTC PeerConnection and returns it
+// after ICE candidate gathering is complete..
+func (c *WebRTCPeer) preparePeerConnection(config *webrtc.Configuration) error {
+ var err error
+ c.pc, err = webrtc.NewPeerConnection(*config)
+ if err != nil {
+ log.Printf("NewPeerConnection ERROR: %s", err)
+ return err
+ }
+ ordered := true
+ dataChannelOptions := &webrtc.DataChannelInit{
+ Ordered: &ordered,
+ }
+ // We must create the data channel before creating an offer
+ // https://github.com/pion/webrtc/wiki/Release-WebRTC@v3.0.0
+ dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
+ if err != nil {
+ log.Printf("CreateDataChannel ERROR: %s", err)
+ return err
+ }
+ dc.OnOpen(func() {
+ log.Println("WebRTC: DataChannel.OnOpen")
+ close(c.open)
+ })
+ dc.OnClose(func() {
+ log.Println("WebRTC: DataChannel.OnClose")
+ c.Close()
+ })
+ dc.OnMessage(func(msg webrtc.DataChannelMessage) {
+ if len(msg.Data) <= 0 {
+ log.Println("0 length message---")
+ }
+ n, err := c.writePipe.Write(msg.Data)
+ c.BytesLogger.AddInbound(n)
+ if err != nil {
+ // TODO: Maybe shouldn't actually close.
+ log.Println("Error writing to SOCKS pipe")
+ if inerr := c.writePipe.CloseWithError(err); inerr != nil {
+ log.Printf("c.writePipe.CloseWithError returned error: %v", inerr)
+ }
+ }
+ c.lastReceive = time.Now()
+ })
+ c.transport = dc
+ c.open = make(chan struct{})
+ log.Println("WebRTC: DataChannel created.")
+
+ // Allow candidates to accumulate until ICEGatheringStateComplete.
+ done := webrtc.GatheringCompletePromise(c.pc)
+ offer, err := c.pc.CreateOffer(nil)
+ // TODO: Potentially timeout and retry if ICE isn't working.
+ if err != nil {
+ log.Println("Failed to prepare offer", err)
+ c.pc.Close()
+ return err
+ }
+ log.Println("WebRTC: Created offer")
+ err = c.pc.SetLocalDescription(offer)
+ if err != nil {
+ log.Println("Failed to prepare offer", err)
+ c.pc.Close()
+ return err
+ }
+ log.Println("WebRTC: Set local description")
+
+ <-done // Wait for ICE candidate gathering to complete.
+ log.Println("WebRTC: PeerConnection created.")
+ return nil
+}
+
+// Close all channels and transports
+func (c *WebRTCPeer) cleanup() {
+ // Close this side of the SOCKS pipe.
+ if c.writePipe != nil { // c.writePipe can be nil in tests.
+ c.writePipe.Close()
+ }
+ if nil != c.transport {
+ log.Printf("WebRTC: closing DataChannel")
+ c.transport.Close()
+ }
+ if nil != c.pc {
+ log.Printf("WebRTC: closing PeerConnection")
+ err := c.pc.Close()
+ if nil != err {
+ log.Printf("Error closing peerconnection...")
+ }
+ }
+}