summaryrefslogtreecommitdiff
path: root/vendor/git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel/clientmap.go
blob: 53d03024216953312e150bce6c82ede572838182 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package turbotunnel

import (
	"container/heap"
	"net"
	"sync"
	"time"
)

// clientRecord is a record of a recently seen client, with the time it was last
// seen and a send queue.
//
// Addr is the key under which the record is indexed in clientMapInner.byAddr.
// LastSeen is refreshed by clientMapInner.SendQueue on every lookup and is the
// ordering key of the clientMapInner.byAge heap. SendQueue is a buffered
// channel (capacity queueSize) that is closed by clientMapInner.Pop when the
// record is removed.
type clientRecord struct {
	Addr      net.Addr
	LastSeen  time.Time
	SendQueue chan []byte
}

// ClientMap manages a mapping of live clients (keyed by address, which will be
// a ClientID) to their respective send queues. ClientMap's functions are safe
// to call from multiple goroutines.
//
// A ClientMap must be created with NewClientMap: the zero value has a nil
// byAddr map, so inserting the first client would panic.
type ClientMap struct {
	// We use an inner structure to avoid exposing public heap.Interface
	// functions to users of ClientMap.
	inner clientMapInner
	// Synchronizes access to inner.
	lock sync.Mutex
}

// NewClientMap returns an empty ClientMap whose records expire once they have
// gone unseen for at least timeout.
//
// Expiry is performed by a background goroutine that wakes every timeout/2,
// takes the map's lock, and drops every record that is too old. That goroutine
// loops forever; there is no way to stop it. Note that for a timeout below two
// nanoseconds timeout/2 is zero, so the sweeper does not pause between passes.
//
// The timeout need not match QUIC's internal idle timeout. If a client is
// removed while its QUIC session is still live, the worst outcome is losing
// the packets that were sitting in its send queue. Should QUIC later send to
// the same client, a fresh send queue is created, and the packets are
// delivered if the client reconnects with the proper client ID.
func NewClientMap(timeout time.Duration) *ClientMap {
	clients := new(ClientMap)
	clients.inner.byAge = make([]*clientRecord, 0)
	clients.inner.byAddr = make(map[net.Addr]int)

	// sweep removes, under the lock, every record that is expired as of now.
	sweep := func(now time.Time) {
		clients.lock.Lock()
		defer clients.lock.Unlock()
		clients.inner.removeExpired(now, timeout)
	}

	go func() {
		for {
			time.Sleep(timeout / 2)
			// Sample the clock before contending for the lock.
			sweep(time.Now())
		}
	}()

	return clients
}

// SendQueue looks up the send queue for addr, creating a new one when addr is
// not yet known. The lookup also marks the client as seen at the current time.
// The map's lock is held for the duration of the call.
func (m *ClientMap) SendQueue(addr net.Addr) chan []byte {
	m.lock.Lock()
	defer m.lock.Unlock()

	now := time.Now()
	queue := m.inner.SendQueue(addr, now)
	return queue
}

// clientMapInner is the inner type of ClientMap, implementing heap.Interface.
// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs)
// to heap indices, to allow looking up by address. The two are expected to
// have the same number of entries at all times; Len panics if they do not.
// Unlike ClientMap, clientMapInner requires external synchronization.
type clientMapInner struct {
	byAge  []*clientRecord
	byAddr map[net.Addr]int
}

// removeExpired pops records off the heap for as long as the oldest one was
// last seen at least timeout before now.
func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) {
	for len(inner.byAge) > 0 {
		// The heap root is the record with the earliest LastSeen.
		oldest := inner.byAge[0]
		if now.Sub(oldest.LastSeen) < timeout {
			// The oldest record is still fresh, so every other one is too.
			return
		}
		heap.Pop(inner)
	}
}

// SendQueue returns the send queue of the client record for addr. An existing
// record has its LastSeen set to now and is repositioned in the heap; when no
// record exists, a new one stamped with now is created and pushed.
func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
	if idx, known := inner.byAddr[addr]; known {
		// Known client: refresh the timestamp, then restore heap order.
		existing := inner.byAge[idx]
		existing.LastSeen = now
		heap.Fix(inner, idx)
		return existing.SendQueue
	}

	// Unknown client: start a fresh record with an empty buffered queue.
	fresh := &clientRecord{
		Addr:      addr,
		LastSeen:  now,
		SendQueue: make(chan []byte, queueSize),
	}
	heap.Push(inner, fresh)
	return fresh.SendQueue
}

// heap.Interface for clientMapInner.

// Len reports the number of client records. It panics if the byAge slice and
// the byAddr map disagree on that number.
func (inner *clientMapInner) Len() int {
	n := len(inner.byAge)
	if n != len(inner.byAddr) {
		panic("inconsistent clientMap")
	}
	return n
}

// Less reports whether the record at heap index i was last seen strictly
// earlier than the record at heap index j.
func (inner *clientMapInner) Less(i, j int) bool {
	left, right := inner.byAge[i], inner.byAge[j]
	return left.LastSeen.Before(right.LastSeen)
}

// Swap exchanges the records at heap indices i and j and rewrites their
// byAddr entries so each address maps to its record's new index.
func (inner *clientMapInner) Swap(i, j int) {
	atI, atJ := inner.byAge[i], inner.byAge[j]
	inner.byAge[i], inner.byAge[j] = atJ, atI
	// atJ now sits at index i, and atI at index j.
	inner.byAddr[atJ.Addr] = i
	inner.byAddr[atI.Addr] = j
}

// Push appends x, which must be a *clientRecord, to the end of byAge and
// records its index in byAddr. It panics if x has another type or if a record
// with the same address is already present.
func (inner *clientMapInner) Push(x interface{}) {
	rec := x.(*clientRecord)
	_, exists := inner.byAddr[rec.Addr]
	if exists {
		panic("duplicate address in clientMap")
	}
	// The new record lands at the current end of the slice.
	slot := len(inner.byAge)
	inner.byAddr[rec.Addr] = slot
	inner.byAge = append(inner.byAge, rec)
}

// Pop removes the last element of byAge, deletes its address from byAddr,
// closes its SendQueue, and returns the removed *clientRecord. It panics if
// byAge is empty.
//
// The index of the last element is taken from byAge itself rather than from
// the size of byAddr, so the element removed is always the final slot of the
// slice that is being truncated.
func (inner *clientMapInner) Pop() interface{} {
	last := len(inner.byAge) - 1
	// Remove from byAge slice. The vacated slot is cleared so the backing
	// array no longer references the record.
	record := inner.byAge[last]
	inner.byAge[last] = nil
	inner.byAge = inner.byAge[:last]
	// Remove from byAddr map.
	delete(inner.byAddr, record.Addr)
	// The record is gone from the map; close its queue. Receivers can still
	// drain packets that were already buffered.
	close(record.SendQueue)
	return record
}