summaryrefslogtreecommitdiff
path: root/pkg/snowflake/lib/peers.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/snowflake/lib/peers.go')
-rw-r--r--pkg/snowflake/lib/peers.go135
1 files changed, 135 insertions, 0 deletions
diff --git a/pkg/snowflake/lib/peers.go b/pkg/snowflake/lib/peers.go
new file mode 100644
index 0000000..d02eed3
--- /dev/null
+++ b/pkg/snowflake/lib/peers.go
@@ -0,0 +1,135 @@
+package lib
+
+import (
+ "container/list"
+ "errors"
+ "fmt"
+ "log"
+ "sync"
+)
+
+// Container which keeps track of multiple WebRTC remote peers.
+// Implements |SnowflakeCollector|.
+//
+// Maintaining a set of pre-connected Peers with fresh but inactive datachannels
+// allows allows rapid recovery when the current WebRTC Peer disconnects.
+//
+// Note: For now, only one remote can be active at any given moment.
+// This is a property of Tor circuits & its current multiplexing constraints,
+// but could be updated if that changes.
+// (Also, this constraint does not necessarily apply to the more generic PT
+// version of Snowflake)
+type Peers struct {
+ Tongue
+ BytesLogger BytesLogger
+
+ snowflakeChan chan *WebRTCPeer
+ activePeers *list.List
+
+ melt chan struct{}
+ melted bool
+
+ collection sync.WaitGroup
+}
+
+// Construct a fresh container of remote peers.
+func NewPeers(tongue Tongue) (*Peers, error) {
+ p := &Peers{}
+ // Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
+ if tongue == nil {
+ return nil, errors.New("missing Tongue to catch Snowflakes with")
+ }
+ p.snowflakeChan = make(chan *WebRTCPeer, tongue.GetMax())
+ p.activePeers = list.New()
+ p.melt = make(chan struct{})
+ p.Tongue = tongue
+ return p, nil
+}
+
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Collect() (*WebRTCPeer, error) {
+ // Engage the Snowflake Catching interface, which must be available.
+ p.collection.Add(1)
+ defer p.collection.Done()
+ if p.melted {
+ return nil, fmt.Errorf("Snowflakes have melted")
+ }
+ if nil == p.Tongue {
+ return nil, errors.New("missing Tongue to catch Snowflakes with")
+ }
+ cnt := p.Count()
+ capacity := p.Tongue.GetMax()
+ s := fmt.Sprintf("Currently at [%d/%d]", cnt, capacity)
+ if cnt >= capacity {
+ return nil, fmt.Errorf("At capacity [%d/%d]", cnt, capacity)
+ }
+ log.Println("WebRTC: Collecting a new Snowflake.", s)
+ // BUG: some broker conflict here.
+ connection, err := p.Tongue.Catch()
+ if nil != err {
+ return nil, err
+ }
+ // Track new valid Snowflake in internal collection and pass along.
+ p.activePeers.PushBack(connection)
+ p.snowflakeChan <- connection
+ return connection, nil
+}
+
+// Pop blocks until an available, valid snowflake appears. Returns nil after End
+// has been called.
+func (p *Peers) Pop() *WebRTCPeer {
+ for {
+ snowflake, ok := <-p.snowflakeChan
+ if !ok {
+ return nil
+ }
+ if snowflake.closed {
+ continue
+ }
+ // Set to use the same rate-limited traffic logger to keep consistency.
+ snowflake.BytesLogger = p.BytesLogger
+ return snowflake
+ }
+}
+
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Melted() <-chan struct{} {
+ return p.melt
+}
+
+// Returns total available Snowflakes (including the active one)
+// The count only reduces when connections themselves close, rather than when
+// they are popped.
+func (p *Peers) Count() int {
+ p.purgeClosedPeers()
+ return p.activePeers.Len()
+}
+
+func (p *Peers) purgeClosedPeers() {
+ for e := p.activePeers.Front(); e != nil; {
+ next := e.Next()
+ conn := e.Value.(*WebRTCPeer)
+ // Purge those marked for deletion.
+ if conn.closed {
+ p.activePeers.Remove(e)
+ }
+ e = next
+ }
+}
+
+// Close all Peers contained here.
+func (p *Peers) End() {
+ close(p.melt)
+ p.melted = true
+ p.collection.Wait()
+ close(p.snowflakeChan)
+ cnt := p.Count()
+ for e := p.activePeers.Front(); e != nil; {
+ next := e.Next()
+ conn := e.Value.(*WebRTCPeer)
+ conn.Close()
+ p.activePeers.Remove(e)
+ e = next
+ }
+ log.Printf("WebRTC: melted all %d snowflakes.", cnt)
+}