Diffstat (limited to 'vendor/github.com/klauspost/reedsolomon/streaming.go')
-rw-r--r--  vendor/github.com/klauspost/reedsolomon/streaming.go  603
1 file changed, 0 insertions, 603 deletions
diff --git a/vendor/github.com/klauspost/reedsolomon/streaming.go b/vendor/github.com/klauspost/reedsolomon/streaming.go
deleted file mode 100644
index d048ba0..0000000
--- a/vendor/github.com/klauspost/reedsolomon/streaming.go
+++ /dev/null
@@ -1,603 +0,0 @@
-/**
- * Reed-Solomon Coding over 8-bit values.
- *
- * Copyright 2015, Klaus Post
- * Copyright 2015, Backblaze, Inc.
- */
-
-package reedsolomon
-
-import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "sync"
-)
-
-// StreamEncoder is an interface to encode Reed-Solomon parity sets for your data.
-// It provides a fully streaming interface, and processes data in blocks of up to 4MB.
-//
-// For small shard sizes, 10MB and below, it is recommended to use the in-memory interface,
-// since the streaming interface has a startup overhead.
-//
-// For all operations, readers and writers should not assume any order/size of
-// individual reads/writes.
-//
-// For usage examples, see "stream-encoder.go" and "stream-decoder.go" in the
-// examples folder.
-type StreamEncoder interface {
- // Encode parity shards for a set of data shards.
- //
- // Input is 'data' containing readers for the data shards; the parity
- // shards are written to the 'parity' io.Writers.
- //
- // The number of shards must match the number given to NewStream().
- //
- // Each reader must supply the same number of bytes.
- //
- // The parity shards will be written to the writers.
- // The number of bytes written will match the input size.
- //
- // If a data stream returns an error, a StreamReadError type error
- // will be returned. If a parity writer returns an error, a
- // StreamWriteError will be returned.
- Encode(data []io.Reader, parity []io.Writer) error
-
- // Verify returns true if the parity shards contain correct data.
- //
- // The number of shards must match the total number of data+parity
- // shards given to NewStream().
- //
- // Each reader must supply the same number of bytes.
- // If a shard stream returns an error, a StreamReadError type error
- // will be returned.
- Verify(shards []io.Reader) (bool, error)
-
- // Reconstruct will recreate the missing shards if possible.
- //
- // Given a list of valid shards (to read) and invalid shards (to write).
- //
- // You indicate that a shard is missing by setting it to nil in the
- // 'valid' slice and at the same time setting a non-nil writer in 'fill'.
- // An index cannot contain both a non-nil 'valid' entry and a non-nil
- // 'fill' entry; if both are provided, 'ErrReconstructMismatch' is returned.
- //
- // If there are too few shards to reconstruct the missing
- // ones, ErrTooFewShards will be returned.
- //
- // The reconstructed shard set is complete, but integrity is not verified.
- // Use the Verify function to check if the data set is ok.
- Reconstruct(valid []io.Reader, fill []io.Writer) error
-
- // Split an input stream into the number of shards given to the encoder.
- //
- // The data will be split into equally sized shards.
- // If the data size isn't divisible by the number of shards,
- // the last shard will contain extra zeros.
- //
- // You must supply the total size of your input.
- // 'ErrShortData' will be returned if the reader cannot supply the
- // number of bytes indicated.
- Split(data io.Reader, dst []io.Writer, size int64) (err error)
-
- // Join the shards and write the data segment to dst.
- //
- // Only the data shards are considered.
- //
- // You must supply the exact output size you want.
- // If there are to few shards given, ErrTooFewShards will be returned.
- // If the total data size is less than outSize, ErrShortData will be returned.
- Join(dst io.Writer, shards []io.Reader, outSize int64) error
-}
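-
-// A minimal encoding sketch for the interface above. The shard counts
-// and file names are hypothetical and error handling is elided; the
-// inputs must all be the same length (e.g. produced by Split):
-//
-//	enc, _ := NewStream(4, 2)
-//	data := make([]io.Reader, 4)
-//	for i := range data {
-//		f, _ := os.Open(fmt.Sprintf("input.shard.%d", i))
-//		defer f.Close()
-//		data[i] = f
-//	}
-//	parity := make([]io.Writer, 2)
-//	for i := range parity {
-//		f, _ := os.Create(fmt.Sprintf("parity.shard.%d", i))
-//		defer f.Close()
-//		parity[i] = f
-//	}
-//	if err := enc.Encode(data, parity); err != nil {
-//		// err may be a StreamReadError or StreamWriteError
-//	}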
-
-// StreamReadError is returned when a read error is encountered
-// that relates to a supplied stream.
-// This will allow you to find out which reader has failed.
-type StreamReadError struct {
- Err error // The error
- Stream int // The stream number on which the error occurred
-}
-
-// Error returns the error as a string
-func (s StreamReadError) Error() string {
- return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
-}
-
-// String returns the error as a string
-func (s StreamReadError) String() string {
- return s.Error()
-}
-
-// StreamWriteError is returned when a write error is encountered
-// that relates to a supplied stream. This will allow you to
-// find out which writer has failed.
-type StreamWriteError struct {
- Err error // The error
- Stream int // The stream number on which the error occurred
-}
-
-// Error returns the error as a string
-func (s StreamWriteError) Error() string {
- return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
-}
-
-// String returns the error as a string
-func (s StreamWriteError) String() string {
- return s.Error()
-}
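-
-// Both error types carry the index of the failing stream. A small
-// sketch of telling them apart (the enc, data and parity values are
-// assumed to exist):
-//
-//	if err := enc.Encode(data, parity); err != nil {
-//		switch e := err.(type) {
-//		case StreamReadError:
-//			log.Printf("data reader %d failed: %v", e.Stream, e.Err)
-//		case StreamWriteError:
-//			log.Printf("parity writer %d failed: %v", e.Stream, e.Err)
-//		}
-//	}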
-
-// rsStream contains a matrix for a specific
-// distribution of data shards and parity shards.
-// Construct it using NewStream().
-type rsStream struct {
- r *reedSolomon
- o options
-
- // Shard reader
- readShards func(dst [][]byte, in []io.Reader) error
- // Shard writer
- writeShards func(out []io.Writer, in [][]byte) error
-
- blockPool sync.Pool
-}
-
-// NewStream creates a new encoder and initializes it to
-// the number of data shards and parity shards that
-// you want to use. You can reuse this encoder.
-// Note that the maximum total number of data and parity shards is 256.
-func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
- r := rsStream{o: defaultOptions}
- for _, opt := range o {
- opt(&r.o)
- }
- // Override block size if shard size is set.
- if r.o.streamBS == 0 && r.o.shardSize > 0 {
- r.o.streamBS = r.o.shardSize
- }
- if r.o.streamBS <= 0 {
- r.o.streamBS = 4 << 20
- }
- if r.o.shardSize == 0 && r.o.maxGoroutines == defaultOptions.maxGoroutines {
- o = append(o, WithAutoGoroutines(r.o.streamBS))
- }
-
- enc, err := New(dataShards, parityShards, o...)
- if err != nil {
- return nil, err
- }
- r.r = enc.(*reedSolomon)
-
- r.blockPool.New = func() interface{} {
- out := make([][]byte, dataShards+parityShards)
- for i := range out {
- out[i] = make([]byte, r.o.streamBS)
- }
- return out
- }
- r.readShards = readShards
- r.writeShards = writeShards
- if r.o.concReads {
- r.readShards = cReadShards
- }
- if r.o.concWrites {
- r.writeShards = cWriteShards
- }
-
- return &r, err
-}
-
-// NewStreamC creates a new encoder and initializes it to
-// the number of data shards and parity shards given.
-//
-// This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
-func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
- return NewStream(dataShards, parityShards, append(o, WithConcurrentStreamReads(conReads), WithConcurrentStreamWrites(conWrites))...)
-}
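-
-// A sketch of the equivalent option-based call (shard counts are
-// arbitrary):
-//
-//	enc, err := NewStream(10, 3,
-//		WithConcurrentStreamReads(true),
-//		WithConcurrentStreamWrites(true))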
-
-func (r *rsStream) createSlice() [][]byte {
- out := r.blockPool.Get().([][]byte)
- for i := range out {
- out[i] = out[i][:r.o.streamBS]
- }
- return out
-}
-
-// Encode parity shards for a set of data shards.
-//
-// Input is 'data' containing readers for the data shards; the parity
-// shards are written to the 'parity' io.Writers.
-//
-// The number of shards must match the number given to NewStream().
-//
-// Each reader must supply the same number of bytes.
-//
-// The parity shards will be written to the writers.
-// The number of bytes written will match the input size.
-//
-// If a data stream returns an error, a StreamReadError type error
-// will be returned. If a parity writer returns an error, a
-// StreamWriteError will be returned.
-func (r *rsStream) Encode(data []io.Reader, parity []io.Writer) error {
- if len(data) != r.r.DataShards {
- return ErrTooFewShards
- }
-
- if len(parity) != r.r.ParityShards {
- return ErrTooFewShards
- }
-
- all := r.createSlice()
- defer r.blockPool.Put(all)
- in := all[:r.r.DataShards]
- out := all[r.r.DataShards:]
- read := 0
-
- for {
- err := r.readShards(in, data)
- switch err {
- case nil:
- case io.EOF:
- if read == 0 {
- return ErrShardNoData
- }
- return nil
- default:
- return err
- }
- out = trimShards(out, shardSize(in))
- read += shardSize(in)
- err = r.r.Encode(all)
- if err != nil {
- return err
- }
- err = r.writeShards(parity, out)
- if err != nil {
- return err
- }
- }
-}
-
-// Trim the shards so they are all the same size
-func trimShards(in [][]byte, size int) [][]byte {
- for i := range in {
- if len(in[i]) != 0 {
- in[i] = in[i][0:size]
- }
- if len(in[i]) < size {
- in[i] = in[i][:0]
- }
- }
- return in
-}
-
-func readShards(dst [][]byte, in []io.Reader) error {
- if len(in) != len(dst) {
- panic("internal error: in and dst size do not match")
- }
- size := -1
- for i := range in {
- if in[i] == nil {
- dst[i] = dst[i][:0]
- continue
- }
- n, err := io.ReadFull(in[i], dst[i])
- // The error is EOF only if no bytes were read.
- // If an EOF happens after reading some but not all the bytes,
- // ReadFull returns ErrUnexpectedEOF.
- switch err {
- case io.ErrUnexpectedEOF, io.EOF:
- if size < 0 {
- size = n
- } else if n != size {
- // Shard sizes must match.
- return ErrShardSize
- }
- dst[i] = dst[i][0:n]
- case nil:
- continue
- default:
- return StreamReadError{Err: err, Stream: i}
- }
- }
- if size == 0 {
- return io.EOF
- }
- return nil
-}
-
-func writeShards(out []io.Writer, in [][]byte) error {
- if len(out) != len(in) {
- panic("internal error: in and out size do not match")
- }
- for i := range in {
- if out[i] == nil {
- continue
- }
- n, err := out[i].Write(in[i])
- if err != nil {
- return StreamWriteError{Err: err, Stream: i}
- }
- // Treat a short write as an error.
- if n != len(in[i]) {
- return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
- }
- }
- return nil
-}
-
-type readResult struct {
- n    int // index of the stream that was read
- size int // number of bytes read
- err  error
-}
-
-// cReadShards reads shards concurrently
-func cReadShards(dst [][]byte, in []io.Reader) error {
- if len(in) != len(dst) {
- panic("internal error: in and dst size do not match")
- }
- var wg sync.WaitGroup
- wg.Add(len(in))
- res := make(chan readResult, len(in))
- for i := range in {
- if in[i] == nil {
- dst[i] = dst[i][:0]
- wg.Done()
- continue
- }
- go func(i int) {
- defer wg.Done()
- n, err := io.ReadFull(in[i], dst[i])
- // The error is EOF only if no bytes were read.
- // If an EOF happens after reading some but not all the bytes,
- // ReadFull returns ErrUnexpectedEOF.
- res <- readResult{size: n, err: err, n: i}
- }(i)
- }
- wg.Wait()
- close(res)
- size := -1
- for r := range res {
- switch r.err {
- case io.ErrUnexpectedEOF, io.EOF:
- if size < 0 {
- size = r.size
- } else if r.size != size {
- // Shard sizes must match.
- return ErrShardSize
- }
- dst[r.n] = dst[r.n][0:r.size]
- case nil:
- default:
- return StreamReadError{Err: r.err, Stream: r.n}
- }
- }
- if size == 0 {
- return io.EOF
- }
- return nil
-}
-
-// cWriteShards writes shards concurrently
-func cWriteShards(out []io.Writer, in [][]byte) error {
- if len(out) != len(in) {
- panic("internal error: in and out size do not match")
- }
- var errs = make(chan error, len(out))
- var wg sync.WaitGroup
- wg.Add(len(out))
- for i := range in {
- go func(i int) {
- defer wg.Done()
- if out[i] == nil {
- errs <- nil
- return
- }
- n, err := out[i].Write(in[i])
- if err != nil {
- errs <- StreamWriteError{Err: err, Stream: i}
- return
- }
- if n != len(in[i]) {
- errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
- }
- }(i)
- }
- wg.Wait()
- close(errs)
- for err := range errs {
- if err != nil {
- return err
- }
- }
-
- return nil
-}
-
-// Verify returns true if the parity shards contain correct data.
-//
-// The number of shards must match the total number of data+parity
-// shards given to NewStream().
-//
-// Each reader must supply the same number of bytes.
-// If a shard stream returns an error, a StreamReadError type error
-// will be returned.
-func (r *rsStream) Verify(shards []io.Reader) (bool, error) {
- if len(shards) != r.r.Shards {
- return false, ErrTooFewShards
- }
-
- read := 0
- all := r.createSlice()
- defer r.blockPool.Put(all)
- for {
- err := r.readShards(all, shards)
- if err == io.EOF {
- if read == 0 {
- return false, ErrShardNoData
- }
- return true, nil
- }
- if err != nil {
- return false, err
- }
- read += shardSize(all)
- ok, err := r.r.Verify(all)
- if !ok || err != nil {
- return ok, err
- }
- }
-}
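-
-// A verification sketch for a hypothetical 4+2 set; readers are
-// ordered data shards first, then parity, and errors are elided:
-//
-//	shards := make([]io.Reader, 6)
-//	for i := range shards {
-//		f, _ := os.Open(fmt.Sprintf("shard.%d", i))
-//		defer f.Close()
-//		shards[i] = f
-//	}
-//	ok, err := enc.Verify(shards)
-//	if err == nil && !ok {
-//		// parity mismatch: at least one shard is corrupt
-//	}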
-
-// ErrReconstructMismatch is returned by the StreamEncoder, if you supply
-// "valid" and "fill" streams on the same index, since it is then impossible
-// to tell whether you consider the shard valid or want it reconstructed.
-var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")
-
-// Reconstruct will recreate the missing shards if possible.
-//
-// Given a list of valid shards (to read) and invalid shards (to write).
-//
-// You indicate that a shard is missing by setting it to nil in the
-// 'valid' slice and at the same time setting a non-nil writer in 'fill'.
-// An index cannot contain both a non-nil 'valid' entry and a non-nil
-// 'fill' entry; if both are provided, ErrReconstructMismatch is returned.
-//
-// If there are too few shards to reconstruct the missing
-// ones, ErrTooFewShards will be returned.
-//
-// The reconstructed shard set is only complete if all missing shards are
-// explicitly requested; its integrity is not automatically verified.
-// Use the Verify function to check if the data set is ok.
-func (r *rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
- if len(valid) != r.r.Shards {
- return ErrTooFewShards
- }
- if len(fill) != r.r.Shards {
- return ErrTooFewShards
- }
-
- all := r.createSlice()
- defer r.blockPool.Put(all)
- reconDataOnly := true
- for i := range valid {
- if valid[i] != nil && fill[i] != nil {
- return ErrReconstructMismatch
- }
- if i >= r.r.DataShards && fill[i] != nil {
- reconDataOnly = false
- }
- }
-
- read := 0
- for {
- err := r.readShards(all, valid)
- if err == io.EOF {
- if read == 0 {
- return ErrShardNoData
- }
- return nil
- }
- if err != nil {
- return err
- }
- read += shardSize(all)
- all = trimShards(all, shardSize(all))
-
- if reconDataOnly {
- err = r.r.ReconstructData(all) // just reconstruct missing data shards
- } else {
- err = r.r.Reconstruct(all) // reconstruct all missing shards
- }
- if err != nil {
- return err
- }
- err = r.writeShards(fill, all)
- if err != nil {
- return err
- }
- }
-}
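-
-// A reconstruction sketch, assuming shard 2 of a hypothetical 4+2 set
-// was lost (error handling elided):
-//
-//	valid := make([]io.Reader, 6)
-//	fill := make([]io.Writer, 6)
-//	for i := range valid {
-//		if i == 2 {
-//			f, _ := os.Create("shard.2") // rebuild the lost shard
-//			defer f.Close()
-//			fill[i] = f
-//			continue
-//		}
-//		f, _ := os.Open(fmt.Sprintf("shard.%d", i))
-//		defer f.Close()
-//		valid[i] = f
-//	}
-//	err := enc.Reconstruct(valid, fill)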
-
-// Join the shards and write the data segment to dst.
-//
-// Only the data shards are considered.
-//
-// You must supply the exact output size you want.
-// If there are too few shards given, ErrTooFewShards will be returned.
-// If the total data size is less than outSize, ErrShortData will be returned.
-func (r *rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
- // Do we have enough shards?
- if len(shards) < r.r.DataShards {
- return ErrTooFewShards
- }
-
- // Trim off parity shards if any
- shards = shards[:r.r.DataShards]
- for i := range shards {
- if shards[i] == nil {
- return StreamReadError{Err: ErrShardNoData, Stream: i}
- }
- }
- // Join all shards
- src := io.MultiReader(shards...)
-
- // Copy data to dst
- n, err := io.CopyN(dst, src, outSize)
- if err == io.EOF {
- return ErrShortData
- }
- if err != nil {
- return err
- }
- if n != outSize {
- return ErrShortData
- }
- return nil
-}
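-
-// A join sketch: origSize is the byte length of the original data,
-// recorded when the data was split (names are hypothetical):
-//
-//	out, _ := os.Create("restored.bin")
-//	defer out.Close()
-//	if err := enc.Join(out, shards, origSize); err != nil {
-//		// e.g. ErrTooFewShards or ErrShortData
-//	}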
-
-// Split an input stream into the number of shards given to the encoder.
-//
-// The data will be split into equally sized shards.
-// If the data size isn't divisible by the number of shards,
-// the last shard will contain extra zeros.
-//
-// You must supply the total size of your input.
-// 'ErrShortData' will be returned if the reader cannot supply the
-// number of bytes indicated.
-func (r *rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
- if size == 0 {
- return ErrShortData
- }
- if len(dst) != r.r.DataShards {
- return ErrInvShardNum
- }
-
- for i := range dst {
- if dst[i] == nil {
- return StreamWriteError{Err: ErrShardNoData, Stream: i}
- }
- }
-
- // Calculate number of bytes per shard.
- perShard := (size + int64(r.r.DataShards) - 1) / int64(r.r.DataShards)
-
- // Pad data to r.Shards*perShard.
- padding := make([]byte, (int64(r.r.Shards)*perShard)-size)
- data = io.MultiReader(data, bytes.NewBuffer(padding))
-
- // Split into equal-length shards and copy.
- for i := range dst {
- n, err := io.CopyN(dst[i], data, perShard)
- if err != io.EOF && err != nil {
- return err
- }
- if n != perShard {
- return ErrShortData
- }
- }
-
- return nil
-}
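-
-// A split sketch: the input size must be known up front so the stream
-// can be padded to equally sized shards (names are hypothetical):
-//
-//	in, _ := os.Open("input.bin")
-//	defer in.Close()
-//	stat, _ := in.Stat()
-//	dst := make([]io.Writer, 4)
-//	for i := range dst {
-//		f, _ := os.Create(fmt.Sprintf("input.shard.%d", i))
-//		defer f.Close()
-//		dst[i] = f
-//	}
-//	err := enc.Split(in, dst, stat.Size())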