summaryrefslogtreecommitdiff
path: root/vendor/github.com/pion/ice/v2/gather.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/pion/ice/v2/gather.go')
-rw-r--r--vendor/github.com/pion/ice/v2/gather.go497
1 files changed, 497 insertions, 0 deletions
diff --git a/vendor/github.com/pion/ice/v2/gather.go b/vendor/github.com/pion/ice/v2/gather.go
new file mode 100644
index 0000000..3bc3d77
--- /dev/null
+++ b/vendor/github.com/pion/ice/v2/gather.go
@@ -0,0 +1,497 @@
+package ice
+
+import (
+ "context"
+ "crypto/tls"
+ "errors"
+ "fmt"
+ "net"
+ "reflect"
+ "sync"
+ "time"
+
+ "github.com/pion/dtls/v2"
+ "github.com/pion/logging"
+ "github.com/pion/turn/v2"
+)
+
+const (
+ stunGatherTimeout = time.Second * 5
+)
+
+type closeable interface {
+ Close() error
+}
+
+// Close a net.Conn and log if we have a failure
+func closeConnAndLog(c closeable, log logging.LeveledLogger, msg string) {
+ if c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
+ log.Warnf("Conn is not allocated (%s)", msg)
+ return
+ }
+
+ log.Warnf(msg)
+ if err := c.Close(); err != nil {
+ log.Warnf("Failed to close conn: %v", err)
+ }
+}
+
+// fakePacketConn wraps a net.Conn and emulates net.PacketConn
+type fakePacketConn struct {
+ nextConn net.Conn
+}
+
+func (f *fakePacketConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
+ n, err = f.nextConn.Read(p)
+ addr = f.nextConn.RemoteAddr()
+ return
+}
+func (f *fakePacketConn) Close() error { return f.nextConn.Close() }
+func (f *fakePacketConn) LocalAddr() net.Addr { return f.nextConn.LocalAddr() }
+func (f *fakePacketConn) SetDeadline(t time.Time) error { return f.nextConn.SetDeadline(t) }
+func (f *fakePacketConn) SetReadDeadline(t time.Time) error { return f.nextConn.SetReadDeadline(t) }
+func (f *fakePacketConn) SetWriteDeadline(t time.Time) error { return f.nextConn.SetWriteDeadline(t) }
+func (f *fakePacketConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
+ return f.nextConn.Write(p)
+}
+
+// GatherCandidates initiates the trickle based gathering process.
+func (a *Agent) GatherCandidates() error {
+ var gatherErr error
+
+ if runErr := a.run(a.context(), func(ctx context.Context, agent *Agent) {
+ if a.gatheringState != GatheringStateNew {
+ gatherErr = ErrMultipleGatherAttempted
+ return
+ } else if a.onCandidateHdlr.Load() == nil {
+ gatherErr = ErrNoOnCandidateHandler
+ return
+ }
+
+ a.gatherCandidateCancel() // Cancel previous gathering routine
+ ctx, cancel := context.WithCancel(ctx)
+ a.gatherCandidateCancel = cancel
+
+ go a.gatherCandidates(ctx)
+ }); runErr != nil {
+ return runErr
+ }
+ return gatherErr
+}
+
+func (a *Agent) gatherCandidates(ctx context.Context) {
+ if err := a.setGatheringState(GatheringStateGathering); err != nil {
+ a.log.Warnf("failed to set gatheringState to GatheringStateGathering: %v", err)
+ return
+ }
+
+ var wg sync.WaitGroup
+ for _, t := range a.candidateTypes {
+ switch t {
+ case CandidateTypeHost:
+ wg.Add(1)
+ go func() {
+ a.gatherCandidatesLocal(ctx, a.networkTypes)
+ wg.Done()
+ }()
+ case CandidateTypeServerReflexive:
+ wg.Add(1)
+ go func() {
+ a.gatherCandidatesSrflx(ctx, a.urls, a.networkTypes)
+ wg.Done()
+ }()
+ if a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeServerReflexive {
+ wg.Add(1)
+ go func() {
+ a.gatherCandidatesSrflxMapped(ctx, a.networkTypes)
+ wg.Done()
+ }()
+ }
+ case CandidateTypeRelay:
+ wg.Add(1)
+ go func() {
+ a.gatherCandidatesRelay(ctx, a.urls)
+ wg.Done()
+ }()
+ case CandidateTypePeerReflexive, CandidateTypeUnspecified:
+ }
+ }
+ // Block until all STUN and TURN URLs have been gathered (or timed out)
+ wg.Wait()
+
+ if err := a.setGatheringState(GatheringStateComplete); err != nil {
+ a.log.Warnf("failed to set gatheringState to GatheringStateComplete: %v", err)
+ }
+}
+
+func (a *Agent) gatherCandidatesLocal(ctx context.Context, networkTypes []NetworkType) { //nolint:gocognit
+ networks := map[string]struct{}{}
+ for _, networkType := range networkTypes {
+ if networkType.IsTCP() {
+ networks[tcp] = struct{}{}
+ } else {
+ networks[udp] = struct{}{}
+ }
+ }
+
+ localIPs, err := localInterfaces(a.net, a.interfaceFilter, networkTypes)
+ if err != nil {
+ a.log.Warnf("failed to iterate local interfaces, host candidates will not be gathered %s", err)
+ return
+ }
+
+ for _, ip := range localIPs {
+ mappedIP := ip
+ if a.mDNSMode != MulticastDNSModeQueryAndGather && a.extIPMapper != nil && a.extIPMapper.candidateType == CandidateTypeHost {
+ if _mappedIP, err := a.extIPMapper.findExternalIP(ip.String()); err == nil {
+ mappedIP = _mappedIP
+ } else {
+ a.log.Warnf("1:1 NAT mapping is enabled but no external IP is found for %s\n", ip.String())
+ }
+ }
+
+ address := mappedIP.String()
+ if a.mDNSMode == MulticastDNSModeQueryAndGather {
+ address = a.mDNSName
+ }
+
+ for network := range networks {
+ var port int
+ var conn net.PacketConn
+ var err error
+
+ var tcpType TCPType
+ switch network {
+ case tcp:
+ // Handle ICE TCP passive mode
+
+ a.log.Debugf("GetConn by ufrag: %s\n", a.localUfrag)
+ conn, err = a.tcpMux.GetConnByUfrag(a.localUfrag)
+ if err != nil {
+ if !errors.Is(err, ErrTCPMuxNotInitialized) {
+ a.log.Warnf("error getting tcp conn by ufrag: %s %s %s\n", network, ip, a.localUfrag)
+ }
+ continue
+ }
+ port = conn.LocalAddr().(*net.TCPAddr).Port
+ tcpType = TCPTypePassive
+ // is there a way to verify that the listen address is even
+ // accessible from the current interface.
+ case udp:
+ conn, err = listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: ip, Port: 0})
+ if err != nil {
+ a.log.Warnf("could not listen %s %s\n", network, ip)
+ continue
+ }
+
+ port = conn.LocalAddr().(*net.UDPAddr).Port
+ }
+ hostConfig := CandidateHostConfig{
+ Network: network,
+ Address: address,
+ Port: port,
+ Component: ComponentRTP,
+ TCPType: tcpType,
+ }
+
+ c, err := NewCandidateHost(&hostConfig)
+ if err != nil {
+ closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
+ continue
+ }
+
+ if a.mDNSMode == MulticastDNSModeQueryAndGather {
+ if err = c.setIP(ip); err != nil {
+ closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create host candidate: %s %s %d: %v\n", network, mappedIP, port, err))
+ continue
+ }
+ }
+
+ if err := a.addCandidate(ctx, c, conn); err != nil {
+ if closeErr := c.close(); closeErr != nil {
+ a.log.Warnf("Failed to close candidate: %v", closeErr)
+ }
+ a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
+ }
+ }
+ }
+}
+
+func (a *Agent) gatherCandidatesSrflxMapped(ctx context.Context, networkTypes []NetworkType) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ for _, networkType := range networkTypes {
+ if networkType.IsTCP() {
+ continue
+ }
+
+ network := networkType.String()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ conn, err := listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
+ if err != nil {
+ a.log.Warnf("Failed to listen %s: %v\n", network, err)
+ return
+ }
+
+ laddr := conn.LocalAddr().(*net.UDPAddr)
+ mappedIP, err := a.extIPMapper.findExternalIP(laddr.IP.String())
+ if err != nil {
+ closeConnAndLog(conn, a.log, fmt.Sprintf("1:1 NAT mapping is enabled but no external IP is found for %s\n", laddr.IP.String()))
+ return
+ }
+
+ srflxConfig := CandidateServerReflexiveConfig{
+ Network: network,
+ Address: mappedIP.String(),
+ Port: laddr.Port,
+ Component: ComponentRTP,
+ RelAddr: laddr.IP.String(),
+ RelPort: laddr.Port,
+ }
+ c, err := NewCandidateServerReflexive(&srflxConfig)
+ if err != nil {
+ closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n",
+ network,
+ mappedIP.String(),
+ laddr.Port,
+ err))
+ return
+ }
+
+ if err := a.addCandidate(ctx, c, conn); err != nil {
+ if closeErr := c.close(); closeErr != nil {
+ a.log.Warnf("Failed to close candidate: %v", closeErr)
+ }
+ a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
+ }
+ }()
+ }
+}
+
+func (a *Agent) gatherCandidatesSrflx(ctx context.Context, urls []*URL, networkTypes []NetworkType) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ for _, networkType := range networkTypes {
+ if networkType.IsTCP() {
+ continue
+ }
+
+ for i := range urls {
+ wg.Add(1)
+ go func(url URL, network string) {
+ defer wg.Done()
+ hostPort := fmt.Sprintf("%s:%d", url.Host, url.Port)
+ serverAddr, err := a.net.ResolveUDPAddr(network, hostPort)
+ if err != nil {
+ a.log.Warnf("failed to resolve stun host: %s: %v", hostPort, err)
+ return
+ }
+
+ conn, err := listenUDPInPortRange(a.net, a.log, int(a.portmax), int(a.portmin), network, &net.UDPAddr{IP: nil, Port: 0})
+ if err != nil {
+ closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to listen for %s: %v\n", serverAddr.String(), err))
+ return
+ }
+
+ xoraddr, err := getXORMappedAddr(conn, serverAddr, stunGatherTimeout)
+ if err != nil {
+ closeConnAndLog(conn, a.log, fmt.Sprintf("could not get server reflexive address %s %s: %v\n", network, url, err))
+ return
+ }
+
+ ip := xoraddr.IP
+ port := xoraddr.Port
+
+ laddr := conn.LocalAddr().(*net.UDPAddr)
+ srflxConfig := CandidateServerReflexiveConfig{
+ Network: network,
+ Address: ip.String(),
+ Port: port,
+ Component: ComponentRTP,
+ RelAddr: laddr.IP.String(),
+ RelPort: laddr.Port,
+ }
+ c, err := NewCandidateServerReflexive(&srflxConfig)
+ if err != nil {
+ closeConnAndLog(conn, a.log, fmt.Sprintf("Failed to create server reflexive candidate: %s %s %d: %v\n", network, ip, port, err))
+ return
+ }
+
+ if err := a.addCandidate(ctx, c, conn); err != nil {
+ if closeErr := c.close(); closeErr != nil {
+ a.log.Warnf("Failed to close candidate: %v", closeErr)
+ }
+ a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
+ }
+ }(*urls[i], networkType.String())
+ }
+ }
+}
+
+func (a *Agent) gatherCandidatesRelay(ctx context.Context, urls []*URL) { //nolint:gocognit
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ network := NetworkTypeUDP4.String()
+ for i := range urls {
+ switch {
+ case urls[i].Scheme != SchemeTypeTURN && urls[i].Scheme != SchemeTypeTURNS:
+ continue
+ case urls[i].Username == "":
+ a.log.Errorf("Failed to gather relay candidates: %v", ErrUsernameEmpty)
+ return
+ case urls[i].Password == "":
+ a.log.Errorf("Failed to gather relay candidates: %v", ErrPasswordEmpty)
+ return
+ }
+
+ wg.Add(1)
+ go func(url URL) {
+ defer wg.Done()
+ TURNServerAddr := fmt.Sprintf("%s:%d", url.Host, url.Port)
+ var (
+ locConn net.PacketConn
+ err error
+ RelAddr string
+ RelPort int
+ )
+
+ switch {
+ case url.Proto == ProtoTypeUDP && url.Scheme == SchemeTypeTURN:
+ if locConn, err = a.net.ListenPacket(network, "0.0.0.0:0"); err != nil {
+ a.log.Warnf("Failed to listen %s: %v\n", network, err)
+ return
+ }
+
+ RelAddr = locConn.LocalAddr().(*net.UDPAddr).IP.String()
+ RelPort = locConn.LocalAddr().(*net.UDPAddr).Port
+ case a.proxyDialer != nil && url.Proto == ProtoTypeTCP &&
+ (url.Scheme == SchemeTypeTURN || url.Scheme == SchemeTypeTURNS):
+ conn, connectErr := a.proxyDialer.Dial(NetworkTypeTCP4.String(), TURNServerAddr)
+ if connectErr != nil {
+ a.log.Warnf("Failed to Dial TCP Addr %s via proxy dialer: %v\n", TURNServerAddr, connectErr)
+ return
+ }
+
+ RelAddr = conn.LocalAddr().(*net.TCPAddr).IP.String()
+ RelPort = conn.LocalAddr().(*net.TCPAddr).Port
+ locConn = turn.NewSTUNConn(conn)
+
+ case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURN:
+ tcpAddr, connectErr := net.ResolveTCPAddr(NetworkTypeTCP4.String(), TURNServerAddr)
+ if connectErr != nil {
+ a.log.Warnf("Failed to resolve TCP Addr %s: %v\n", TURNServerAddr, connectErr)
+ return
+ }
+
+ conn, connectErr := net.DialTCP(NetworkTypeTCP4.String(), nil, tcpAddr)
+ if connectErr != nil {
+ a.log.Warnf("Failed to Dial TCP Addr %s: %v\n", TURNServerAddr, connectErr)
+ return
+ }
+
+ RelAddr = conn.LocalAddr().(*net.TCPAddr).IP.String()
+ RelPort = conn.LocalAddr().(*net.TCPAddr).Port
+ locConn = turn.NewSTUNConn(conn)
+ case url.Proto == ProtoTypeUDP && url.Scheme == SchemeTypeTURNS:
+ udpAddr, connectErr := net.ResolveUDPAddr(network, TURNServerAddr)
+ if connectErr != nil {
+ a.log.Warnf("Failed to resolve UDP Addr %s: %v\n", TURNServerAddr, connectErr)
+ return
+ }
+
+ conn, connectErr := dtls.Dial(network, udpAddr, &dtls.Config{
+ InsecureSkipVerify: a.insecureSkipVerify, //nolint:gosec
+ })
+ if connectErr != nil {
+ a.log.Warnf("Failed to Dial DTLS Addr %s: %v\n", TURNServerAddr, connectErr)
+ return
+ }
+
+ RelAddr = conn.LocalAddr().(*net.UDPAddr).IP.String()
+ RelPort = conn.LocalAddr().(*net.UDPAddr).Port
+ locConn = &fakePacketConn{conn}
+ case url.Proto == ProtoTypeTCP && url.Scheme == SchemeTypeTURNS:
+ conn, connectErr := tls.Dial(NetworkTypeTCP4.String(), TURNServerAddr, &tls.Config{
+ InsecureSkipVerify: a.insecureSkipVerify, //nolint:gosec
+ })
+ if connectErr != nil {
+ a.log.Warnf("Failed to Dial TLS Addr %s: %v\n", TURNServerAddr, connectErr)
+ return
+ }
+ RelAddr = conn.LocalAddr().(*net.TCPAddr).IP.String()
+ RelPort = conn.LocalAddr().(*net.TCPAddr).Port
+ locConn = turn.NewSTUNConn(conn)
+ default:
+ a.log.Warnf("Unable to handle URL in gatherCandidatesRelay %v\n", url)
+ return
+ }
+
+ client, err := turn.NewClient(&turn.ClientConfig{
+ TURNServerAddr: TURNServerAddr,
+ Conn: locConn,
+ Username: url.Username,
+ Password: url.Password,
+ LoggerFactory: a.loggerFactory,
+ Net: a.net,
+ })
+ if err != nil {
+ closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to build new turn.Client %s %s\n", TURNServerAddr, err))
+ return
+ }
+
+ if err = client.Listen(); err != nil {
+ client.Close()
+ closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to listen on turn.Client %s %s\n", TURNServerAddr, err))
+ return
+ }
+
+ relayConn, err := client.Allocate()
+ if err != nil {
+ client.Close()
+ closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to allocate on turn.Client %s %s\n", TURNServerAddr, err))
+ return
+ }
+
+ raddr := relayConn.LocalAddr().(*net.UDPAddr)
+ relayConfig := CandidateRelayConfig{
+ Network: network,
+ Component: ComponentRTP,
+ Address: raddr.IP.String(),
+ Port: raddr.Port,
+ RelAddr: RelAddr,
+ RelPort: RelPort,
+ OnClose: func() error {
+ client.Close()
+ return locConn.Close()
+ },
+ }
+ relayConnClose := func() {
+ if relayConErr := relayConn.Close(); relayConErr != nil {
+ a.log.Warnf("Failed to close relay %v", relayConErr)
+ }
+ }
+ candidate, err := NewCandidateRelay(&relayConfig)
+ if err != nil {
+ relayConnClose()
+
+ client.Close()
+ closeConnAndLog(locConn, a.log, fmt.Sprintf("Failed to create relay candidate: %s %s: %v\n", network, raddr.String(), err))
+ return
+ }
+
+ if err := a.addCandidate(ctx, candidate, relayConn); err != nil {
+ relayConnClose()
+
+ if closeErr := candidate.close(); closeErr != nil {
+ a.log.Warnf("Failed to close candidate: %v", closeErr)
+ }
+ a.log.Warnf("Failed to append to localCandidates and run onCandidateHdlr: %v\n", err)
+ }
+ }(*urls[i])
+ }
+}