summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/sctp/rtx_timer.go
blob: 14bc39d771376a25e538ced1ab6bf8baa105b183 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package sctp

import (
	"math"
	"sync"
	"time"
)

const (
	rtoInitial     float64 = 3.0 * 1000  // msec
	rtoMin         float64 = 1.0 * 1000  // msec
	rtoMax         float64 = 60.0 * 1000 // msec
	rtoAlpha       float64 = 0.125
	rtoBeta        float64 = 0.25
	maxInitRetrans uint    = 8
	pathMaxRetrans uint    = 5
	noMaxRetrans   uint    = 0
)

// rtoManager manages Rtx timeout values.
// This is an implementation of RFC 4960 sec 6.3.1.
type rtoManager struct {
	srtt     float64
	rttvar   float64
	rto      float64
	noUpdate bool
	mutex    sync.RWMutex
}

// newRTOManager creates a new rtoManager.
func newRTOManager() *rtoManager {
	return &rtoManager{
		rto: rtoInitial,
	}
}

// setNewRTT takes a newly measured RTT then adjust the RTO in msec.
func (m *rtoManager) setNewRTT(rtt float64) float64 {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if m.noUpdate {
		return m.srtt
	}

	if m.srtt == 0 {
		// First measurement
		m.srtt = rtt
		m.rttvar = rtt / 2
	} else {
		// Subsequent rtt measurement
		m.rttvar = (1-rtoBeta)*m.rttvar + rtoBeta*(math.Abs(m.srtt-rtt))
		m.srtt = (1-rtoAlpha)*m.srtt + rtoAlpha*rtt
	}
	m.rto = math.Min(math.Max(m.srtt+4*m.rttvar, rtoMin), rtoMax)
	return m.srtt
}

// getRTO simply returns the current RTO in msec.
func (m *rtoManager) getRTO() float64 {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	return m.rto
}

// reset resets the RTO variables to the initial values.
func (m *rtoManager) reset() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if m.noUpdate {
		return
	}

	m.srtt = 0
	m.rttvar = 0
	m.rto = rtoInitial
}

// set RTO value for testing
func (m *rtoManager) setRTO(rto float64, noUpdate bool) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.rto = rto
	m.noUpdate = noUpdate
}

// rtxTimerObserver is the inteface to a timer observer.
// NOTE: Observers MUST NOT call start() or stop() method on rtxTimer
// from within these callbacks.
type rtxTimerObserver interface {
	onRetransmissionTimeout(timerID int, n uint)
	onRetransmissionFailure(timerID int)
}

// rtxTimer provides the retnransmission timer conforms with RFC 4960 Sec 6.3.1
type rtxTimer struct {
	id         int
	observer   rtxTimerObserver
	maxRetrans uint
	stopFunc   stopTimerLoop
	closed     bool
	mutex      sync.RWMutex
}

type stopTimerLoop func()

// newRTXTimer creates a new retransmission timer.
// if maxRetrans is set to 0, it will keep retransmitting until stop() is called.
// (it will never make onRetransmissionFailure() callback.
func newRTXTimer(id int, observer rtxTimerObserver, maxRetrans uint) *rtxTimer {
	return &rtxTimer{
		id:         id,
		observer:   observer,
		maxRetrans: maxRetrans,
	}
}

// start starts the timer.
func (t *rtxTimer) start(rto float64) bool {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	// this timer is already closed
	if t.closed {
		return false
	}

	// this is a noop if the timer is always running
	if t.stopFunc != nil {
		return false
	}

	// Note: rto value is intentionally not capped by RTO.Min to allow
	// fast timeout for the tests. Non-test code should pass in the
	// rto generated by rtoManager getRTO() method which caps the
	// value at RTO.Min or at RTO.Max.
	var nRtos uint

	cancelCh := make(chan struct{})

	go func() {
		canceling := false

		for !canceling {
			timeout := calculateNextTimeout(rto, nRtos)
			timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)

			select {
			case <-timer.C:
				nRtos++
				if t.maxRetrans == 0 || nRtos <= t.maxRetrans {
					t.observer.onRetransmissionTimeout(t.id, nRtos)
				} else {
					t.stop()
					t.observer.onRetransmissionFailure(t.id)
				}
			case <-cancelCh:
				canceling = true
				timer.Stop()
			}
		}
	}()

	t.stopFunc = func() {
		close(cancelCh)
	}

	return true
}

// stop stops the timer.
func (t *rtxTimer) stop() {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.stopFunc != nil {
		t.stopFunc()
		t.stopFunc = nil
	}
}

// closes the timer. this is similar to stop() but subsequent start() call
// will fail (the timer is no longer usable)
func (t *rtxTimer) close() {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.stopFunc != nil {
		t.stopFunc()
		t.stopFunc = nil
	}

	t.closed = true
}

// isRunning tests if the timer is running.
// Debug purpose only
func (t *rtxTimer) isRunning() bool {
	t.mutex.RLock()
	defer t.mutex.RUnlock()

	return (t.stopFunc != nil)
}

func calculateNextTimeout(rto float64, nRtos uint) float64 {
	// RFC 4096 sec 6.3.3.  Handle T3-rtx Expiration
	//   E2)  For the destination address for which the timer expires, set RTO
	//        <- RTO * 2 ("back off the timer").  The maximum value discussed
	//        in rule C7 above (RTO.max) may be used to provide an upper bound
	//        to this doubling operation.
	if nRtos < 31 {
		m := 1 << nRtos
		return math.Min(rto*float64(m), rtoMax)
	}
	return rtoMax
}