summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/transport/packetio/buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/transport/packetio/buffer.go')
-rw-r--r--vendor/github.com/pion/transport/packetio/buffer.go347
1 files changed, 347 insertions, 0 deletions
diff --git a/vendor/github.com/pion/transport/packetio/buffer.go b/vendor/github.com/pion/transport/packetio/buffer.go
new file mode 100644
index 0000000..97d86f8
--- /dev/null
+++ b/vendor/github.com/pion/transport/packetio/buffer.go
@@ -0,0 +1,347 @@
+// Package packetio provides packet buffer
+package packetio
+
+import (
+ "errors"
+ "io"
+ "sync"
+ "time"
+
+ "github.com/pion/transport/deadline"
+)
+
+var errPacketTooBig = errors.New("packet too big")
+
+// BufferPacketType allow the Buffer to know which packet protocol is writing.
+type BufferPacketType int
+
+const (
+ // RTPBufferPacket indicates the Buffer that is handling RTP packets
+ RTPBufferPacket BufferPacketType = 1
+ // RTCPBufferPacket indicates the Buffer that is handling RTCP packets
+ RTCPBufferPacket BufferPacketType = 2
+)
+
+// Buffer allows writing packets to an intermediate buffer, which can then be read form.
+// This is verify similar to bytes.Buffer but avoids combining multiple writes into a single read.
+type Buffer struct {
+ mutex sync.Mutex
+
+ // this is a circular buffer. If head <= tail, then the useful
+ // data is in the interval [head, tail[. If tail < head, then
+ // the useful data is the union of [head, len[ and [0, tail[.
+ // In order to avoid ambiguity when head = tail, we always leave
+ // an unused byte in the buffer.
+ data []byte
+ head, tail int
+
+ notify chan struct{}
+ subs bool
+ closed bool
+
+ count int
+ limitCount, limitSize int
+
+ readDeadline *deadline.Deadline
+}
+
+const (
+ minSize = 2048
+ cutoffSize = 128 * 1024
+ maxSize = 4 * 1024 * 1024
+)
+
+// NewBuffer creates a new Buffer.
+func NewBuffer() *Buffer {
+ return &Buffer{
+ notify: make(chan struct{}),
+ readDeadline: deadline.New(),
+ }
+}
+
+// available returns true if the buffer is large enough to fit a packet
+// of the given size, taking overhead into account.
+func (b *Buffer) available(size int) bool {
+ available := b.head - b.tail
+ if available <= 0 {
+ available += len(b.data)
+ }
+ // we interpret head=tail as empty, so always keep a byte free
+ if size+2+1 > available {
+ return false
+ }
+
+ return true
+}
+
+// grow increases the size of the buffer. If it returns nil, then the
+// buffer has been grown. It returns ErrFull if hits a limit.
+func (b *Buffer) grow() error {
+ var newsize int
+ if len(b.data) < cutoffSize {
+ newsize = 2 * len(b.data)
+ } else {
+ newsize = 5 * len(b.data) / 4
+ }
+ if newsize < minSize {
+ newsize = minSize
+ }
+ if (b.limitSize <= 0 || sizeHardlimit) && newsize > maxSize {
+ newsize = maxSize
+ }
+
+ // one byte slack
+ if b.limitSize > 0 && newsize > b.limitSize+1 {
+ newsize = b.limitSize + 1
+ }
+
+ if newsize <= len(b.data) {
+ return ErrFull
+ }
+
+ newdata := make([]byte, newsize)
+
+ var n int
+ if b.head <= b.tail {
+ // data was contiguous
+ n = copy(newdata, b.data[b.head:b.tail])
+ } else {
+ // data was discontiguous
+ n = copy(newdata, b.data[b.head:])
+ n += copy(newdata[n:], b.data[:b.tail])
+ }
+ b.head = 0
+ b.tail = n
+ b.data = newdata
+
+ return nil
+}
+
+// Write appends a copy of the packet data to the buffer.
+// Returns ErrFull if the packet doesn't fit.
+//
+// Note that the packet size is limited to 65536 bytes since v0.11.0 due to the internal data structure.
+func (b *Buffer) Write(packet []byte) (int, error) {
+ if len(packet) >= 0x10000 {
+ return 0, errPacketTooBig
+ }
+
+ b.mutex.Lock()
+
+ if b.closed {
+ b.mutex.Unlock()
+ return 0, io.ErrClosedPipe
+ }
+
+ if (b.limitCount > 0 && b.count >= b.limitCount) ||
+ (b.limitSize > 0 && b.size()+2+len(packet) > b.limitSize) {
+ b.mutex.Unlock()
+ return 0, ErrFull
+ }
+
+ // grow the buffer until the packet fits
+ for !b.available(len(packet)) {
+ err := b.grow()
+ if err != nil {
+ b.mutex.Unlock()
+ return 0, err
+ }
+ }
+
+ var notify chan struct{}
+
+ if b.subs {
+ // readers are waiting. Prepare to notify, but only
+ // actually do it after we release the lock.
+ notify = b.notify
+ b.notify = make(chan struct{})
+ b.subs = false
+ }
+
+ // store the length of the packet
+ b.data[b.tail] = uint8(len(packet) >> 8)
+ b.tail++
+ if b.tail >= len(b.data) {
+ b.tail = 0
+ }
+ b.data[b.tail] = uint8(len(packet))
+ b.tail++
+ if b.tail >= len(b.data) {
+ b.tail = 0
+ }
+
+ // store the packet
+ n := copy(b.data[b.tail:], packet)
+ b.tail += n
+ if b.tail >= len(b.data) {
+ // we reached the end, wrap around
+ m := copy(b.data, packet[n:])
+ b.tail = m
+ }
+ b.count++
+ b.mutex.Unlock()
+
+ if notify != nil {
+ close(notify)
+ }
+
+ return len(packet), nil
+}
+
+// Read populates the given byte slice, returning the number of bytes read.
+// Blocks until data is available or the buffer is closed.
+// Returns io.ErrShortBuffer is the packet is too small to copy the Write.
+// Returns io.EOF if the buffer is closed.
+func (b *Buffer) Read(packet []byte) (n int, err error) {
+ // Return immediately if the deadline is already exceeded.
+ select {
+ case <-b.readDeadline.Done():
+ return 0, &netError{ErrTimeout, true, true}
+ default:
+ }
+
+ for {
+ b.mutex.Lock()
+
+ if b.head != b.tail {
+ // decode the packet size
+ n1 := b.data[b.head]
+ b.head++
+ if b.head >= len(b.data) {
+ b.head = 0
+ }
+ n2 := b.data[b.head]
+ b.head++
+ if b.head >= len(b.data) {
+ b.head = 0
+ }
+ count := int((uint16(n1) << 8) | uint16(n2))
+
+ // determine the number of bytes we'll actually copy
+ copied := count
+ if copied > len(packet) {
+ copied = len(packet)
+ }
+
+ // copy the data
+ if b.head+copied < len(b.data) {
+ copy(packet, b.data[b.head:b.head+copied])
+ } else {
+ k := copy(packet, b.data[b.head:])
+ copy(packet[k:], b.data[:copied-k])
+ }
+
+ // advance head, discarding any data that wasn't copied
+ b.head += count
+ if b.head >= len(b.data) {
+ b.head -= len(b.data)
+ }
+
+ if b.head == b.tail {
+ // the buffer is empty, reset to beginning
+ // in order to improve cache locality.
+ b.head = 0
+ b.tail = 0
+ }
+
+ b.count--
+
+ b.mutex.Unlock()
+
+ if copied < count {
+ return copied, io.ErrShortBuffer
+ }
+ return copied, nil
+ }
+
+ if b.closed {
+ b.mutex.Unlock()
+ return 0, io.EOF
+ }
+
+ notify := b.notify
+ b.subs = true
+ b.mutex.Unlock()
+
+ select {
+ case <-b.readDeadline.Done():
+ return 0, &netError{ErrTimeout, true, true}
+ case <-notify:
+ }
+ }
+}
+
+// Close the buffer, unblocking any pending reads.
+// Data in the buffer can still be read, Read will return io.EOF only when empty.
+func (b *Buffer) Close() (err error) {
+ b.mutex.Lock()
+
+ if b.closed {
+ b.mutex.Unlock()
+ return nil
+ }
+
+ notify := b.notify
+ b.closed = true
+
+ b.mutex.Unlock()
+
+ close(notify)
+
+ return nil
+}
+
+// Count returns the number of packets in the buffer.
+func (b *Buffer) Count() int {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+ return b.count
+}
+
+// SetLimitCount controls the maximum number of packets that can be buffered.
+// Causes Write to return ErrFull when this limit is reached.
+// A zero value will disable this limit.
+func (b *Buffer) SetLimitCount(limit int) {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+
+ b.limitCount = limit
+}
+
+// Size returns the total byte size of packets in the buffer, including
+// a small amount of administrative overhead.
+func (b *Buffer) Size() int {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+
+ return b.size()
+}
+
+func (b *Buffer) size() int {
+ size := b.tail - b.head
+ if size < 0 {
+ size += len(b.data)
+ }
+ return size
+}
+
+// SetLimitSize controls the maximum number of bytes that can be buffered.
+// Causes Write to return ErrFull when this limit is reached.
+// A zero value means 4MB since v0.11.0.
+//
+// User can set packetioSizeHardlimit build tag to enable 4MB hardlimit.
+// When packetioSizeHardlimit build tag is set, SetLimitSize exceeding
+// the hardlimit will be silently discarded.
+func (b *Buffer) SetLimitSize(limit int) {
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
+
+ b.limitSize = limit
+}
+
+// SetReadDeadline sets the deadline for the Read operation.
+// Setting to zero means no deadline.
+func (b *Buffer) SetReadDeadline(t time.Time) error {
+ b.readDeadline.Set(t)
+ return nil
+}