diff options
author | Kali Kaneko (leap communications) <kali@leap.se> | 2019-01-12 18:39:45 +0100 |
---|---|---|
committer | Ruben Pollan <meskio@sindominio.net> | 2019-01-17 12:30:32 +0100 |
commit | b1247d2d0d51108c910a73891ff3116e5f032ab1 (patch) | |
tree | e9948964f0bfb1ad2df3bc7bad02aa1f41ccfbd8 /vendor/github.com/apparentlymart/go-openvpn-mgmt/demux | |
parent | efcb8312e31b5c2261b1a1e95ace55b322cfcc27 (diff) |
[pkg] all your deps are vendored to us
Diffstat (limited to 'vendor/github.com/apparentlymart/go-openvpn-mgmt/demux')
3 files changed, 293 insertions, 0 deletions
diff --git a/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer.go b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer.go new file mode 100644 index 0000000..c57964e --- /dev/null +++ b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer.go @@ -0,0 +1,63 @@ +package demux + +import ( + "bufio" + "io" +) + +var readErrSynthEvent = []byte("FATAL:Error reading from OpenVPN") + +// Demultiplex reads from the given io.Reader, assumed to be the client +// end of an OpenVPN Management Protocol connection, and splits it into +// distinct messages from OpenVPN. +// +// It then writes the raw message buffers into either replyCh or eventCh +// depending on whether each message is a reply to a client command or +// an asynchronous event notification. +// +// The buffers written to replyCh are entire raw message lines (without the +// trailing newlines), while the buffers written to eventCh are the raw +// event strings with the prototcol's leading '>' indicator omitted. +// +// The caller should usually provide buffered channels of sufficient buffer +// depth so that the reply channel will not be starved by slow event +// processing. +// +// Once the io.Reader signals EOF, eventCh will be closed, then replyCh +// will be closed, and then this function will return. +// +// As a special case, if a non-EOF error occurs while reading from the +// io.Reader then a synthetic "FATAL" event will be written to eventCh +// before the two buffers are closed and the function returns. This +// synthetic message will have the error message "Error reading from OpenVPN". +func Demultiplex(r io.Reader, replyCh, eventCh chan<- []byte) { + scanner := bufio.NewScanner(r) + for scanner.Scan() { + buf := scanner.Bytes() + + if len(buf) < 1 { + // Should never happen but we'll be robust and ignore this, + // rather than crashing below. + continue + } + + // Asynchronous messages always start with > to differentiate + // them from replies. + if buf[0] == '>' { + // Trim off the > when we post the message, since it's + // redundant after we've demuxed. + eventCh <- buf[1:] + } else { + replyCh <- buf + } + } + + if err := scanner.Err(); err != nil { + // Generate a synthetic FATAL event so that the caller can + // see that the connection was not gracefully closed. + eventCh <- readErrSynthEvent + } + + close(eventCh) + close(replyCh) +} diff --git a/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer_test.go b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer_test.go new file mode 100644 index 0000000..45edac6 --- /dev/null +++ b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer_test.go @@ -0,0 +1,218 @@ +package demux + +import ( + "bytes" + "fmt" + "io" + "reflect" + "testing" +) + +func TestDemultiplex(t *testing.T) { + type TestCase struct { + Input []string + ExpectedReplies []string + ExpectedEvents []string + } + + testCases := []TestCase{ + { + Input: []string{}, + ExpectedReplies: []string{}, + ExpectedEvents: []string{}, + }, + { + Input: []string{ + "SUCCESS: foo bar baz", + }, + ExpectedReplies: []string{ + "SUCCESS: foo bar baz", + }, + ExpectedEvents: []string{}, + }, + { + Input: []string{ + ">STATE:1234,ASSIGN_IP,,10.0.0.1,", + }, + ExpectedReplies: []string{}, + ExpectedEvents: []string{ + "STATE:1234,ASSIGN_IP,,10.0.0.1,", + }, + }, + { + Input: []string{ + ">STATE:1234,ASSIGN_IP,,10.0.0.1,", + ">STATE:5678,ASSIGN_IP,,10.0.0.1,", + ">STATE:9012,ASSIGN_IP,,10.0.0.1,", + }, + ExpectedReplies: []string{}, + ExpectedEvents: []string{ + "STATE:1234,ASSIGN_IP,,10.0.0.1,", + "STATE:5678,ASSIGN_IP,,10.0.0.1,", + "STATE:9012,ASSIGN_IP,,10.0.0.1,", + }, + }, + { + Input: []string{ + ">STATE:1234,ASSIGN_IP,,10.0.0.1,", + "SUCCESS: foo bar baz", + ">STATE:5678,ASSIGN_IP,,10.0.0.1,", + }, + ExpectedReplies: []string{ + "SUCCESS: foo bar baz", + }, + ExpectedEvents: []string{ + "STATE:1234,ASSIGN_IP,,10.0.0.1,", + "STATE:5678,ASSIGN_IP,,10.0.0.1,", + }, + }, + { + Input: []string{ + "SUCCESS: foo bar baz", + ">STATE:1234,ASSIGN_IP,,10.0.0.1,", + "SUCCESS: baz bar foo", + }, + ExpectedReplies: []string{ + "SUCCESS: foo bar baz", + "SUCCESS: baz bar foo", + }, + ExpectedEvents: []string{ + "STATE:1234,ASSIGN_IP,,10.0.0.1,", + }, + }, + } + + for i, testCase := range testCases { + r := mockReader(testCase.Input) + gotReplies, gotEvents := captureMsgs(r) + + if !reflect.DeepEqual(gotReplies, testCase.ExpectedReplies) { + t.Errorf( + "test %d returned incorrect replies\ngot %#v\nwant %#v", + i, gotReplies, testCase.ExpectedReplies, + ) + } + + if !reflect.DeepEqual(gotEvents, testCase.ExpectedEvents) { + t.Errorf( + "test %d returned incorrect events\ngot %#v\nwant %#v", + i, gotEvents, testCase.ExpectedEvents, + ) + } + } +} + +func TestDemultiplex_error(t *testing.T) { + r := &alwaysErroringReader{} + + gotReplies, gotEvents := captureMsgs(r) + + expectedReplies := []string{} + expectedEvents := []string{ + "FATAL:Error reading from OpenVPN", + } + + if !reflect.DeepEqual(gotReplies, expectedReplies) { + t.Errorf( + "incorrect replies\ngot %#v\nwant %#v", + gotReplies, expectedReplies, + ) + } + + if !reflect.DeepEqual(gotEvents, expectedEvents) { + t.Errorf( + "incorrect events\ngot %#v\nwant %#v", + gotEvents, expectedEvents, + ) + } +} + +func mockReader(msgs []string) io.Reader { + var buf []byte + for _, msg := range msgs { + buf = append(buf, []byte(msg)...) + buf = append(buf, '\n') + } + return bytes.NewReader(buf) +} + +func captureMsgs(r io.Reader) (replies, events []string) { + replyCh := make(chan []byte) + eventCh := make(chan []byte) + + replies = make([]string, 0) + events = make([]string, 0) + + go Demultiplex(r, replyCh, eventCh) + + for replyCh != nil || eventCh != nil { + select { + + case msg, ok := <-replyCh: + if ok { + replies = append(replies, string(msg)) + } else { + replyCh = nil + } + + case msg, ok := <-eventCh: + if ok { + events = append(events, string(msg)) + } else { + eventCh = nil + } + + } + + } + + return replies, events +} + +type alwaysErroringReader struct{} + +func (r *alwaysErroringReader) Read(buf []byte) (int, error) { + return 0, fmt.Errorf("mock error") +} + +// Somewhat-contrived example of blocking for a reply while concurrently +// processing asynchronous events. +func ExampleDemultiplex() { + // In a real caller we would have a net.IPConn as our reader, + // but we'll use a bytes reader here as a test. + r := bytes.NewReader([]byte( + // A reply to a hypothetical command interspersed between + // two asynchronous events. + ">HOLD:Waiting for hold release\nSUCCESS: foo\n>FATAL:baz\n", + )) + + // No strong need for buffering on this channel because usually + // a message sender will immediately block waiting for the + // associated response message. + replyCh := make(chan []byte) + + // Make sure the event channel buffer is deep enough that slow event + // processing won't significantly delay synchronous replies. If you + // process events quickly, or if you aren't sending any commands + // concurrently with acting on events, then this is not so important. + eventCh := make(chan []byte, 10) + + // Start demultiplexing the message stream in the background. + // This goroutine will exit once the reader signals EOF. + go Demultiplex(r, replyCh, eventCh) + + // Some coroutine has sent a hypothetical message to OpenVPN, + // and it can directly block until the associated reply arrives. + // The events will be concurrently handled by our event loop + // below while we wait for the reply to show up. + go func() { + replyMsgBuf := <-replyCh + fmt.Printf("Command reply: %s\n", string(replyMsgBuf)) + }() + + // Main event loop deals with the async events as they arrive, + // independently of any commands that are pending. + for msgBuf := range eventCh { + fmt.Printf("Event: %s\n", string(msgBuf)) + } +} diff --git a/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/doc.go b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/doc.go new file mode 100644 index 0000000..263a267 --- /dev/null +++ b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/doc.go @@ -0,0 +1,12 @@ +// Package demux implements low-level demultiplexing of the stream of +// messages sent from OpenVPN on the management channel. +// +// OpenVPN's protocol includes two different kinds of message from the OpenVPN +// process: replies to commands sent by the management client, and asynchronous +// event notifications. +// +// This package's purpose is to split these messages into two separate streams, +// so that functions executing command/response sequences can just block +// on the reply channel while an event loop elsewhere deals with any async +// events that might show up. +package demux |