summaryrefslogtreecommitdiff
path: root/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/apparentlymart/go-openvpn-mgmt/demux')
-rw-r--r--vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer.go63
-rw-r--r--vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer_test.go218
-rw-r--r--vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/doc.go12
3 files changed, 293 insertions, 0 deletions
diff --git a/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer.go b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer.go
new file mode 100644
index 0000000..c57964e
--- /dev/null
+++ b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer.go
@@ -0,0 +1,63 @@
+package demux
+
+import (
+ "bufio"
+ "io"
+)
+
+var readErrSynthEvent = []byte("FATAL:Error reading from OpenVPN")
+
+// Demultiplex reads from the given io.Reader, assumed to be the client
+// end of an OpenVPN Management Protocol connection, and splits it into
+// distinct messages from OpenVPN.
+//
+// It then writes the raw message buffers into either replyCh or eventCh
+// depending on whether each message is a reply to a client command or
+// an asynchronous event notification.
+//
+// The buffers written to replyCh are entire raw message lines (without the
+// trailing newlines), while the buffers written to eventCh are the raw
+// event strings with the prototcol's leading '>' indicator omitted.
+//
+// The caller should usually provide buffered channels of sufficient buffer
+// depth so that the reply channel will not be starved by slow event
+// processing.
+//
+// Once the io.Reader signals EOF, eventCh will be closed, then replyCh
+// will be closed, and then this function will return.
+//
+// As a special case, if a non-EOF error occurs while reading from the
+// io.Reader then a synthetic "FATAL" event will be written to eventCh
+// before the two buffers are closed and the function returns. This
+// synthetic message will have the error message "Error reading from OpenVPN".
+func Demultiplex(r io.Reader, replyCh, eventCh chan<- []byte) {
+ scanner := bufio.NewScanner(r)
+ for scanner.Scan() {
+ buf := scanner.Bytes()
+
+ if len(buf) < 1 {
+ // Should never happen but we'll be robust and ignore this,
+ // rather than crashing below.
+ continue
+ }
+
+ // Asynchronous messages always start with > to differentiate
+ // them from replies.
+ if buf[0] == '>' {
+ // Trim off the > when we post the message, since it's
+ // redundant after we've demuxed.
+ eventCh <- buf[1:]
+ } else {
+ replyCh <- buf
+ }
+ }
+
+ if err := scanner.Err(); err != nil {
+ // Generate a synthetic FATAL event so that the caller can
+ // see that the connection was not gracefully closed.
+ eventCh <- readErrSynthEvent
+ }
+
+ close(eventCh)
+ close(replyCh)
+}
diff --git a/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer_test.go b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer_test.go
new file mode 100644
index 0000000..45edac6
--- /dev/null
+++ b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/demuxer_test.go
@@ -0,0 +1,218 @@
+package demux
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "reflect"
+ "testing"
+)
+
+func TestDemultiplex(t *testing.T) {
+ type TestCase struct {
+ Input []string
+ ExpectedReplies []string
+ ExpectedEvents []string
+ }
+
+ testCases := []TestCase{
+ {
+ Input: []string{},
+ ExpectedReplies: []string{},
+ ExpectedEvents: []string{},
+ },
+ {
+ Input: []string{
+ "SUCCESS: foo bar baz",
+ },
+ ExpectedReplies: []string{
+ "SUCCESS: foo bar baz",
+ },
+ ExpectedEvents: []string{},
+ },
+ {
+ Input: []string{
+ ">STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ },
+ ExpectedReplies: []string{},
+ ExpectedEvents: []string{
+ "STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ },
+ },
+ {
+ Input: []string{
+ ">STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ ">STATE:5678,ASSIGN_IP,,10.0.0.1,",
+ ">STATE:9012,ASSIGN_IP,,10.0.0.1,",
+ },
+ ExpectedReplies: []string{},
+ ExpectedEvents: []string{
+ "STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ "STATE:5678,ASSIGN_IP,,10.0.0.1,",
+ "STATE:9012,ASSIGN_IP,,10.0.0.1,",
+ },
+ },
+ {
+ Input: []string{
+ ">STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ "SUCCESS: foo bar baz",
+ ">STATE:5678,ASSIGN_IP,,10.0.0.1,",
+ },
+ ExpectedReplies: []string{
+ "SUCCESS: foo bar baz",
+ },
+ ExpectedEvents: []string{
+ "STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ "STATE:5678,ASSIGN_IP,,10.0.0.1,",
+ },
+ },
+ {
+ Input: []string{
+ "SUCCESS: foo bar baz",
+ ">STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ "SUCCESS: baz bar foo",
+ },
+ ExpectedReplies: []string{
+ "SUCCESS: foo bar baz",
+ "SUCCESS: baz bar foo",
+ },
+ ExpectedEvents: []string{
+ "STATE:1234,ASSIGN_IP,,10.0.0.1,",
+ },
+ },
+ }
+
+ for i, testCase := range testCases {
+ r := mockReader(testCase.Input)
+ gotReplies, gotEvents := captureMsgs(r)
+
+ if !reflect.DeepEqual(gotReplies, testCase.ExpectedReplies) {
+ t.Errorf(
+ "test %d returned incorrect replies\ngot %#v\nwant %#v",
+ i, gotReplies, testCase.ExpectedReplies,
+ )
+ }
+
+ if !reflect.DeepEqual(gotEvents, testCase.ExpectedEvents) {
+ t.Errorf(
+ "test %d returned incorrect events\ngot %#v\nwant %#v",
+ i, gotEvents, testCase.ExpectedEvents,
+ )
+ }
+ }
+}
+
+func TestDemultiplex_error(t *testing.T) {
+ r := &alwaysErroringReader{}
+
+ gotReplies, gotEvents := captureMsgs(r)
+
+ expectedReplies := []string{}
+ expectedEvents := []string{
+ "FATAL:Error reading from OpenVPN",
+ }
+
+ if !reflect.DeepEqual(gotReplies, expectedReplies) {
+ t.Errorf(
+ "incorrect replies\ngot %#v\nwant %#v",
+ gotReplies, expectedReplies,
+ )
+ }
+
+ if !reflect.DeepEqual(gotEvents, expectedEvents) {
+ t.Errorf(
+ "incorrect events\ngot %#v\nwant %#v",
+ gotEvents, expectedEvents,
+ )
+ }
+}
+
+func mockReader(msgs []string) io.Reader {
+ var buf []byte
+ for _, msg := range msgs {
+ buf = append(buf, []byte(msg)...)
+ buf = append(buf, '\n')
+ }
+ return bytes.NewReader(buf)
+}
+
+func captureMsgs(r io.Reader) (replies, events []string) {
+ replyCh := make(chan []byte)
+ eventCh := make(chan []byte)
+
+ replies = make([]string, 0)
+ events = make([]string, 0)
+
+ go Demultiplex(r, replyCh, eventCh)
+
+ for replyCh != nil || eventCh != nil {
+ select {
+
+ case msg, ok := <-replyCh:
+ if ok {
+ replies = append(replies, string(msg))
+ } else {
+ replyCh = nil
+ }
+
+ case msg, ok := <-eventCh:
+ if ok {
+ events = append(events, string(msg))
+ } else {
+ eventCh = nil
+ }
+
+ }
+
+ }
+
+ return replies, events
+}
+
+type alwaysErroringReader struct{}
+
+func (r *alwaysErroringReader) Read(buf []byte) (int, error) {
+ return 0, fmt.Errorf("mock error")
+}
+
+// Somewhat-contrived example of blocking for a reply while concurrently
+// processing asynchronous events.
+func ExampleDemultiplex() {
+ // In a real caller we would have a net.IPConn as our reader,
+ // but we'll use a bytes reader here as a test.
+ r := bytes.NewReader([]byte(
+ // A reply to a hypothetical command interspersed between
+ // two asynchronous events.
+ ">HOLD:Waiting for hold release\nSUCCESS: foo\n>FATAL:baz\n",
+ ))
+
+ // No strong need for buffering on this channel because usually
+ // a message sender will immediately block waiting for the
+ // associated response message.
+ replyCh := make(chan []byte)
+
+ // Make sure the event channel buffer is deep enough that slow event
+ // processing won't significantly delay synchronous replies. If you
+ // process events quickly, or if you aren't sending any commands
+ // concurrently with acting on events, then this is not so important.
+ eventCh := make(chan []byte, 10)
+
+ // Start demultiplexing the message stream in the background.
+ // This goroutine will exit once the reader signals EOF.
+ go Demultiplex(r, replyCh, eventCh)
+
+ // Some coroutine has sent a hypothetical message to OpenVPN,
+ // and it can directly block until the associated reply arrives.
+ // The events will be concurrently handled by our event loop
+ // below while we wait for the reply to show up.
+ go func() {
+ replyMsgBuf := <-replyCh
+ fmt.Printf("Command reply: %s\n", string(replyMsgBuf))
+ }()
+
+ // Main event loop deals with the async events as they arrive,
+ // independently of any commands that are pending.
+ for msgBuf := range eventCh {
+ fmt.Printf("Event: %s\n", string(msgBuf))
+ }
+}
diff --git a/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/doc.go b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/doc.go
new file mode 100644
index 0000000..263a267
--- /dev/null
+++ b/vendor/github.com/apparentlymart/go-openvpn-mgmt/demux/doc.go
@@ -0,0 +1,12 @@
+// Package demux implements low-level demultiplexing of the stream of
+// messages sent from OpenVPN on the management channel.
+//
+// OpenVPN's protocol includes two different kinds of message from the OpenVPN
+// process: replies to commands sent by the management client, and asynchronous
+// event notifications.
+//
+// This package's purpose is to split these messages into two separate streams,
+// so that functions executing command/response sequences can just block
+// on the reply channel while an event loop elsewhere deals with any async
+// events that might show up.
+package demux