summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/webrtc/v3/operations.go
blob: 82bc832244a2eee5aba642ed886ecfe2b45c7620 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package webrtc

import (
	"sync"
)

// operation is a single unit of work accepted by the operations executor:
// a function that takes no arguments and returns nothing.
type operation func()

// operations is a task executor that runs enqueued operations one at a time,
// in the order they were enqueued.
//
// mu guards busy and ops. busy is set by Enqueue and cleared by start once it
// finds the queue empty; it records whether a draining goroutine has been
// started and not yet retired. ops holds the operations that are still
// waiting to run, oldest first.
type operations struct {
	mu   sync.Mutex
	busy bool
	ops  []operation
}

// newOperations returns a pointer to a zero-valued operations executor:
// nothing queued and not marked busy.
func newOperations() *operations {
	return new(operations)
}

// Enqueue appends op to the queue of pending operations. A nil op is ignored.
// If the executor was not already marked busy, a new goroutine is started to
// drain the queue; otherwise the op is left for the goroutine already running.
func (o *operations) Enqueue(op operation) {
	if op == nil {
		return
	}

	// Record the previous busy state and claim it in the same critical
	// section, so that at most one caller decides to spawn the worker.
	o.mu.Lock()
	alreadyBusy := o.busy
	o.busy = true
	o.ops = append(o.ops, op)
	o.mu.Unlock()

	if alreadyBusy {
		return
	}
	go o.start()
}

// IsEmpty reports whether the queue holds no pending operations. An operation
// that has already been popped for execution is no longer counted.
func (o *operations) IsEmpty() bool {
	o.mu.Lock()
	pending := len(o.ops)
	o.mu.Unlock()

	return pending == 0
}

// Done enqueues a marker operation and blocks until that marker has run,
// i.e. until every operation enqueued before the call has finished executing.
// For more complex synchronization, use Enqueue directly.
func (o *operations) Done() {
	var barrier sync.WaitGroup
	barrier.Add(1)
	// The marker's only job is to release the waiter below.
	o.Enqueue(barrier.Done)
	barrier.Wait()
}

// pop removes and returns the operation at the head of the queue, or nil when
// the queue is empty. It acquires o.mu itself, so callers must not hold it.
func (o *operations) pop() func() {
	o.mu.Lock()
	defer o.mu.Unlock()
	if len(o.ops) == 0 {
		return nil
	}

	fn := o.ops[0]
	// Clear the vacated slot before re-slicing: otherwise the backing array
	// keeps referencing the closure (and everything it captured) until a
	// later append happens to reallocate.
	o.ops[0] = nil
	o.ops = o.ops[1:]
	return fn
}

// start runs queued operations one after another until pop reports that the
// queue is empty. On the way out (normal return or a panicking operation; no
// recover is performed here) the deferred function either clears the busy
// flag or, if operations are still queued, hands off to a fresh goroutine.
func (o *operations) start() {
	defer func() {
		o.mu.Lock()
		defer o.mu.Unlock()
		if len(o.ops) > 0 {
			// Work remains: either it was enqueued after the loop below
			// saw an empty queue, or an operation panicked mid-drain.
			go o.start()
			return
		}
		o.busy = false
	}()

	for next := o.pop(); next != nil; next = o.pop() {
		next()
	}
}