Diffstat (limited to 'vendor/github.com/klauspost/reedsolomon/streaming.go')
-rw-r--r--    vendor/github.com/klauspost/reedsolomon/streaming.go    603
1 file changed, 603 insertions(+), 0 deletions(-)
diff --git a/vendor/github.com/klauspost/reedsolomon/streaming.go b/vendor/github.com/klauspost/reedsolomon/streaming.go
new file mode 100644
index 0000000..d048ba0
--- /dev/null
+++ b/vendor/github.com/klauspost/reedsolomon/streaming.go
@@ -0,0 +1,603 @@
+/**
+ * Reed-Solomon Coding over 8-bit values.
+ *
+ * Copyright 2015, Klaus Post
+ * Copyright 2015, Backblaze, Inc.
+ */
+
+package reedsolomon
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "sync"
+)
+
+// StreamEncoder is an interface to encode Reed-Solomon parity sets for your data.
+// It provides a fully streaming interface, and processes data in blocks of up to 4MB.
+//
+// For small shard sizes, 10MB and below, it is recommended to use the in-memory interface,
+// since the streaming interface has a start-up overhead.
+//
+// For all operations, readers and writers should not assume any order/size of
+// individual reads/writes.
+//
+// For usage examples, see "stream-encoder.go" and "stream-decoder.go" in the examples
+// folder.
+type StreamEncoder interface {
+ // Encode parity shards for a set of data shards.
+ //
+ // Input is 'data' containing readers for the data shards, and 'parity'
+ // containing writers that will receive the computed parity shards.
+ //
+ // The number of shards must match the number given to NewStream().
+ //
+ // Each reader must supply the same number of bytes.
+ //
+ // The parity shards will be written to the writers.
+ // The number of bytes written will match the input size.
+ //
+ // If a data stream returns an error, a StreamReadError type error
+ // will be returned. If a parity writer returns an error, a
+ // StreamWriteError will be returned.
+ Encode(data []io.Reader, parity []io.Writer) error
+
+ // Verify returns true if the parity shards contain correct data.
+ //
+ // The number of shards must match the total number of data+parity shards
+ // given to NewStream().
+ //
+ // Each reader must supply the same number of bytes.
+ // If a shard stream returns an error, a StreamReadError type error
+ // will be returned.
+ Verify(shards []io.Reader) (bool, error)
+
+ // Reconstruct will recreate the missing shards if possible.
+ //
+ // Given a list of valid shards (to read) and invalid shards (to write):
+ //
+ // You indicate that a shard is missing by setting it to nil in the 'valid'
+ // slice and at the same time setting a non-nil writer in 'fill'.
+ // An index cannot contain both a non-nil 'valid' entry and a non-nil 'fill' entry;
+ // if both are provided, ErrReconstructMismatch is returned.
+ //
+ // If there are too few shards to reconstruct the missing
+ // ones, ErrTooFewShards will be returned.
+ //
+ // The reconstructed shard set is complete, but integrity is not verified.
+ // Use the Verify function to check if the data set is ok.
+ Reconstruct(valid []io.Reader, fill []io.Writer) error
+
+ // Split an input stream into the number of shards given to the encoder.
+ //
+ // The data will be split into equally sized shards.
+ // If the data size isn't divisible by the number of shards,
+ // the last shard will contain extra zeros.
+ //
+ // You must supply the total size of your input.
+ // 'ErrShortData' will be returned if it is unable to retrieve the
+ // number of bytes indicated.
+ Split(data io.Reader, dst []io.Writer, size int64) (err error)
+
+ // Join the shards and write the data segment to dst.
+ //
+ // Only the data shards are considered.
+ //
+ // You must supply the exact output size you want.
+ // If there are too few shards given, ErrTooFewShards will be returned.
+ // If the total data size is less than outSize, ErrShortData will be returned.
+ Join(dst io.Writer, shards []io.Reader, outSize int64) error
+}
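+
+// A minimal usage sketch, not part of the library: encode parity for data
+// already split into per-shard files. The shard counts and the "disk.N"
+// file names are illustrative assumptions, and error handling is shortened.
+//
+//    enc, err := NewStream(4, 2)
+//    if err != nil {
+//        log.Fatal(err)
+//    }
+//    data := make([]io.Reader, 4)
+//    for i := range data {
+//        f, _ := os.Open(fmt.Sprintf("disk.%d", i))
+//        data[i] = f
+//    }
+//    parity := make([]io.Writer, 2)
+//    for i := range parity {
+//        f, _ := os.Create(fmt.Sprintf("disk.%d", 4+i))
+//        parity[i] = f
+//    }
+//    if err := enc.Encode(data, parity); err != nil {
+//        log.Fatal(err)
+//    }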
+
+// StreamReadError is returned when a read error is encountered
+// that relates to a supplied stream.
+// This will allow you to find out which reader has failed.
+type StreamReadError struct {
+ Err error // The error
+ Stream int // The stream number on which the error occurred
+}
+
+// Error returns the error as a string
+func (s StreamReadError) Error() string {
+ return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
+}
+
+// String returns the error as a string
+func (s StreamReadError) String() string {
+ return s.Error()
+}
+
+// StreamWriteError is returned when a write error is encountered
+// that relates to a supplied stream. This will allow you to
+// find out which writer has failed.
+type StreamWriteError struct {
+ Err error // The error
+ Stream int // The stream number on which the error occurred
+}
+
+// Error returns the error as a string
+func (s StreamWriteError) Error() string {
+ return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
+}
+
+// String returns the error as a string
+func (s StreamWriteError) String() string {
+ return s.Error()
+}
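+
+// Callers can distinguish the two error types with a type switch and recover
+// the failing stream index. A brief sketch (the enc, data and parity values
+// are assumed from the Encode sketch above):
+//
+//    if err := enc.Encode(data, parity); err != nil {
+//        switch e := err.(type) {
+//        case StreamReadError:
+//            log.Printf("data shard %d failed: %v", e.Stream, e.Err)
+//        case StreamWriteError:
+//            log.Printf("parity shard %d failed: %v", e.Stream, e.Err)
+//        }
+//    }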
+
+// rsStream contains a matrix for a specific
+// distribution of data shards and parity shards.
+// Construct it using NewStream().
+type rsStream struct {
+ r *reedSolomon
+ o options
+
+ // Shard reader
+ readShards func(dst [][]byte, in []io.Reader) error
+ // Shard writer
+ writeShards func(out []io.Writer, in [][]byte) error
+
+ blockPool sync.Pool
+}
+
+// NewStream creates a new encoder and initializes it to
+// the number of data shards and parity shards that
+// you want to use. You can reuse this encoder.
+// Note that the maximum number of total shards is 256.
+func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
+ r := rsStream{o: defaultOptions}
+ for _, opt := range o {
+ opt(&r.o)
+ }
+ // Override block size if shard size is set.
+ if r.o.streamBS == 0 && r.o.shardSize > 0 {
+ r.o.streamBS = r.o.shardSize
+ }
+ if r.o.streamBS <= 0 {
+ r.o.streamBS = 4 << 20
+ }
+ if r.o.shardSize == 0 && r.o.maxGoroutines == defaultOptions.maxGoroutines {
+ o = append(o, WithAutoGoroutines(r.o.streamBS))
+ }
+
+ enc, err := New(dataShards, parityShards, o...)
+ if err != nil {
+ return nil, err
+ }
+ r.r = enc.(*reedSolomon)
+
+ r.blockPool.New = func() interface{} {
+ out := make([][]byte, dataShards+parityShards)
+ for i := range out {
+ out[i] = make([]byte, r.o.streamBS)
+ }
+ return out
+ }
+ r.readShards = readShards
+ r.writeShards = writeShards
+ if r.o.concReads {
+ r.readShards = cReadShards
+ }
+ if r.o.concWrites {
+ r.writeShards = cWriteShards
+ }
+
+ return &r, err
+}
+
+// NewStreamC creates a new encoder and initializes it to
+// the number of data shards and parity shards given.
+//
+// This functions as 'NewStream', but allows you to enable concurrent reads and writes.
+func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
+ return NewStream(dataShards, parityShards, append(o, WithConcurrentStreamReads(conReads), WithConcurrentStreamWrites(conWrites))...)
+}
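+
+// For example, a sketch equivalent to passing the WithConcurrentStreamReads
+// and WithConcurrentStreamWrites options to NewStream:
+//
+//    enc, err := NewStreamC(10, 3, true, true)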
+
+func (r *rsStream) createSlice() [][]byte {
+ out := r.blockPool.Get().([][]byte)
+ for i := range out {
+ out[i] = out[i][:r.o.streamBS]
+ }
+ return out
+}
+
+// Encode parity shards for a set of data shards.
+//
+// Input is 'data' containing readers for the data shards, and 'parity'
+// containing writers that will receive the computed parity shards.
+//
+// The number of shards must match the number given to NewStream().
+//
+// Each reader must supply the same number of bytes.
+//
+// The parity shards will be written to the writers.
+// The number of bytes written will match the input size.
+//
+// If a data stream returns an error, a StreamReadError type error
+// will be returned. If a parity writer returns an error, a
+// StreamWriteError will be returned.
+func (r *rsStream) Encode(data []io.Reader, parity []io.Writer) error {
+ if len(data) != r.r.DataShards {
+ return ErrTooFewShards
+ }
+
+ if len(parity) != r.r.ParityShards {
+ return ErrTooFewShards
+ }
+
+ all := r.createSlice()
+ defer r.blockPool.Put(all)
+ in := all[:r.r.DataShards]
+ out := all[r.r.DataShards:]
+ read := 0
+
+ for {
+ err := r.readShards(in, data)
+ switch err {
+ case nil:
+ case io.EOF:
+ if read == 0 {
+ return ErrShardNoData
+ }
+ return nil
+ default:
+ return err
+ }
+ out = trimShards(out, shardSize(in))
+ read += shardSize(in)
+ err = r.r.Encode(all)
+ if err != nil {
+ return err
+ }
+ err = r.writeShards(parity, out)
+ if err != nil {
+ return err
+ }
+ }
+}
+
+// Trim the shards so they are all 'size' bytes long; zero-length (missing) shards are left empty.
+func trimShards(in [][]byte, size int) [][]byte {
+ for i := range in {
+ if len(in[i]) != 0 {
+ in[i] = in[i][0:size]
+ }
+ if len(in[i]) < size {
+ in[i] = in[i][:0]
+ }
+ }
+ return in
+}
+
+func readShards(dst [][]byte, in []io.Reader) error {
+ if len(in) != len(dst) {
+ panic("internal error: in and dst size do not match")
+ }
+ size := -1
+ for i := range in {
+ if in[i] == nil {
+ dst[i] = dst[i][:0]
+ continue
+ }
+ n, err := io.ReadFull(in[i], dst[i])
+ // The error is EOF only if no bytes were read.
+ // If an EOF happens after reading some but not all the bytes,
+ // ReadFull returns ErrUnexpectedEOF.
+ switch err {
+ case io.ErrUnexpectedEOF, io.EOF:
+ if size < 0 {
+ size = n
+ } else if n != size {
+ // Shard sizes must match.
+ return ErrShardSize
+ }
+ dst[i] = dst[i][0:n]
+ case nil:
+ continue
+ default:
+ return StreamReadError{Err: err, Stream: i}
+ }
+ }
+ if size == 0 {
+ return io.EOF
+ }
+ return nil
+}
+
+func writeShards(out []io.Writer, in [][]byte) error {
+ if len(out) != len(in) {
+ panic("internal error: in and out size do not match")
+ }
+ for i := range in {
+ if out[i] == nil {
+ continue
+ }
+ n, err := out[i].Write(in[i])
+ if err != nil {
+ return StreamWriteError{Err: err, Stream: i}
+ }
+ // Ensure the whole shard was written.
+ if n != len(in[i]) {
+ return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
+ }
+ }
+ return nil
+}
+
+type readResult struct {
+ n int // stream index
+ size int // number of bytes read
+ err error
+}
+
+// cReadShards reads shards concurrently
+func cReadShards(dst [][]byte, in []io.Reader) error {
+ if len(in) != len(dst) {
+ panic("internal error: in and dst size do not match")
+ }
+ var wg sync.WaitGroup
+ wg.Add(len(in))
+ res := make(chan readResult, len(in))
+ for i := range in {
+ if in[i] == nil {
+ dst[i] = dst[i][:0]
+ wg.Done()
+ continue
+ }
+ go func(i int) {
+ defer wg.Done()
+ n, err := io.ReadFull(in[i], dst[i])
+ // The error is EOF only if no bytes were read.
+ // If an EOF happens after reading some but not all the bytes,
+ // ReadFull returns ErrUnexpectedEOF.
+ res <- readResult{size: n, err: err, n: i}
+ }(i)
+ }
+ wg.Wait()
+ close(res)
+ size := -1
+ for r := range res {
+ switch r.err {
+ case io.ErrUnexpectedEOF, io.EOF:
+ if size < 0 {
+ size = r.size
+ } else if r.size != size {
+ // Shard sizes must match.
+ return ErrShardSize
+ }
+ dst[r.n] = dst[r.n][0:r.size]
+ case nil:
+ default:
+ return StreamReadError{Err: r.err, Stream: r.n}
+ }
+ }
+ if size == 0 {
+ return io.EOF
+ }
+ return nil
+}
+
+// cWriteShards writes shards concurrently
+func cWriteShards(out []io.Writer, in [][]byte) error {
+ if len(out) != len(in) {
+ panic("internal error: in and out size do not match")
+ }
+ var errs = make(chan error, len(out))
+ var wg sync.WaitGroup
+ wg.Add(len(out))
+ for i := range in {
+ go func(i int) {
+ defer wg.Done()
+ if out[i] == nil {
+ errs <- nil
+ return
+ }
+ n, err := out[i].Write(in[i])
+ if err != nil {
+ errs <- StreamWriteError{Err: err, Stream: i}
+ return
+ }
+ if n != len(in[i]) {
+ errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
+ }
+ }(i)
+ }
+ wg.Wait()
+ close(errs)
+ for err := range errs {
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+// Verify returns true if the parity shards contain correct data.
+//
+// The number of shards must match the total number of data+parity shards
+// given to NewStream().
+//
+// Each reader must supply the same number of bytes.
+// If a shard stream returns an error, a StreamReadError type error
+// will be returned.
+func (r *rsStream) Verify(shards []io.Reader) (bool, error) {
+ if len(shards) != r.r.Shards {
+ return false, ErrTooFewShards
+ }
+
+ read := 0
+ all := r.createSlice()
+ defer r.blockPool.Put(all)
+ for {
+ err := r.readShards(all, shards)
+ if err == io.EOF {
+ if read == 0 {
+ return false, ErrShardNoData
+ }
+ return true, nil
+ }
+ if err != nil {
+ return false, err
+ }
+ read += shardSize(all)
+ ok, err := r.r.Verify(all)
+ if !ok || err != nil {
+ return ok, err
+ }
+ }
+}
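+
+// A verification sketch, reusing the illustrative "disk.N" layout from the
+// Encode example; all data+parity streams are reopened for reading:
+//
+//    shards := make([]io.Reader, 6)
+//    for i := range shards {
+//        f, _ := os.Open(fmt.Sprintf("disk.%d", i))
+//        shards[i] = f
+//    }
+//    ok, err := enc.Verify(shards)
+//    if err != nil || !ok {
+//        log.Println("verification failed:", err)
+//    }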
+
+// ErrReconstructMismatch is returned by the StreamEncoder, if you supply
+// "valid" and "fill" streams on the same index, since it is then
+// impossible to tell whether you consider the shard valid or would
+// like to have it reconstructed.
+var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")
+
+// Reconstruct will recreate the missing shards if possible.
+//
+// Given a list of valid shards (to read) and invalid shards (to write):
+//
+// You indicate that a shard is missing by setting it to nil in the 'valid'
+// slice and at the same time setting a non-nil writer in 'fill'.
+// An index cannot contain both a non-nil 'valid' entry and a non-nil 'fill' entry.
+//
+// If there are too few shards to reconstruct the missing
+// ones, ErrTooFewShards will be returned.
+//
+// The reconstructed shard set is only complete if writers were supplied
+// for all missing shards. Its integrity is not automatically verified;
+// use the Verify function to check whether the data set is ok.
+func (r *rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
+ if len(valid) != r.r.Shards {
+ return ErrTooFewShards
+ }
+ if len(fill) != r.r.Shards {
+ return ErrTooFewShards
+ }
+
+ all := r.createSlice()
+ defer r.blockPool.Put(all)
+ reconDataOnly := true
+ for i := range valid {
+ if valid[i] != nil && fill[i] != nil {
+ return ErrReconstructMismatch
+ }
+ if i >= r.r.DataShards && fill[i] != nil {
+ reconDataOnly = false
+ }
+ }
+
+ read := 0
+ for {
+ err := r.readShards(all, valid)
+ if err == io.EOF {
+ if read == 0 {
+ return ErrShardNoData
+ }
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ read += shardSize(all)
+ all = trimShards(all, shardSize(all))
+
+ if reconDataOnly {
+ err = r.r.ReconstructData(all) // just reconstruct missing data shards
+ } else {
+ err = r.r.Reconstruct(all) // reconstruct all missing shards
+ }
+ if err != nil {
+ return err
+ }
+ err = r.writeShards(fill, all)
+ if err != nil {
+ return err
+ }
+ }
+}
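+
+// A reconstruction sketch, assuming shard 2 of the illustrative "disk.N"
+// layout was lost: its reader stays nil and a writer is supplied instead.
+//
+//    valid := make([]io.Reader, 6)
+//    for i := range valid {
+//        if i == 2 {
+//            continue // missing shard: leave the reader nil
+//        }
+//        f, _ := os.Open(fmt.Sprintf("disk.%d", i))
+//        valid[i] = f
+//    }
+//    fill := make([]io.Writer, 6) // all nil except the missing index
+//    fill[2], _ = os.Create("disk.2")
+//    if err := enc.Reconstruct(valid, fill); err != nil {
+//        log.Fatal(err)
+//    }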
+
+// Join the shards and write the data segment to dst.
+//
+// Only the data shards are considered.
+//
+// You must supply the exact output size you want.
+// If there are too few shards given, ErrTooFewShards will be returned.
+// If the total data size is less than outSize, ErrShortData will be returned.
+func (r *rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
+ // Do we have enough shards?
+ if len(shards) < r.r.DataShards {
+ return ErrTooFewShards
+ }
+
+ // Trim off parity shards if any
+ shards = shards[:r.r.DataShards]
+ for i := range shards {
+ if shards[i] == nil {
+ return StreamReadError{Err: ErrShardNoData, Stream: i}
+ }
+ }
+ // Join all shards
+ src := io.MultiReader(shards...)
+
+ // Copy data to dst
+ n, err := io.CopyN(dst, src, outSize)
+ if err == io.EOF {
+ return ErrShortData
+ }
+ if err != nil {
+ return err
+ }
+ if n != outSize {
+ return ErrShortData
+ }
+ return nil
+}
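+
+// A join sketch. The original size must come from metadata kept by the
+// caller (assumed here as origSize), since the padding added by Split is
+// otherwise indistinguishable from data:
+//
+//    out, _ := os.Create("output.bin")
+//    if err := enc.Join(out, shards, origSize); err != nil {
+//        log.Fatal(err)
+//    }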
+
+// Split an input stream into the number of shards given to the encoder.
+//
+// The data will be split into equally sized shards.
+// If the data size isn't divisible by the number of shards,
+// the last shard will contain extra zeros.
+//
+// You must supply the total size of your input.
+// 'ErrShortData' will be returned if it is unable to retrieve the
+// number of bytes indicated.
+func (r *rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
+ if size == 0 {
+ return ErrShortData
+ }
+ if len(dst) != r.r.DataShards {
+ return ErrInvShardNum
+ }
+
+ for i := range dst {
+ if dst[i] == nil {
+ return StreamWriteError{Err: ErrShardNoData, Stream: i}
+ }
+ }
+
+ // Calculate number of bytes per shard.
+ perShard := (size + int64(r.r.DataShards) - 1) / int64(r.r.DataShards)
+
+ // Pad data to r.Shards*perShard.
+ padding := make([]byte, (int64(r.r.Shards)*perShard)-size)
+ data = io.MultiReader(data, bytes.NewBuffer(padding))
+
+ // Split into equal-length shards and copy.
+ for i := range dst {
+ n, err := io.CopyN(dst[i], data, perShard)
+ if err != io.EOF && err != nil {
+ return err
+ }
+ if n != perShard {
+ return ErrShortData
+ }
+ }
+
+ return nil
+}
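+
+// A split sketch, assuming a hypothetical "input.bin" whose size is known
+// from Stat; the destination writers could be files or network streams:
+//
+//    in, _ := os.Open("input.bin")
+//    st, _ := in.Stat()
+//    dst := make([]io.Writer, 4)
+//    for i := range dst {
+//        f, _ := os.Create(fmt.Sprintf("disk.%d", i))
+//        dst[i] = f
+//    }
+//    if err := enc.Split(in, dst, st.Size()); err != nil {
+//        log.Fatal(err)
+//    }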