Adds to #34 persitent DataStore supporting protobuf

This commit is contained in:
MaMe82 2018-09-21 16:25:34 +02:00
parent a208c1ca29
commit e9aa58f72e
2 changed files with 161 additions and 0 deletions

View File

@ -0,0 +1,68 @@
package datastore
import (
"bytes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/encoding/proto"
"io/ioutil"
"errors"
)
type Serializer interface {
Encode(val interface{}) (res []byte, err error)
Decode(source []byte, destination interface{}) (err error)
}
type SerializerProtobuf struct {
Codec encoding.Codec
Compressor encoding.Compressor
}
func (s *SerializerProtobuf) Encode(val interface{}) (res []byte, err error) {
raw,err := s.Codec.Marshal(val)
if err != nil { return res,err }
if s.Compressor == nil {
return raw,nil
} else {
compressTarget := &bytes.Buffer{}
compressSrc,err := s.Compressor.Compress(compressTarget)
if err != nil { return res,err }
n,err := compressSrc.Write(raw)
if err != nil { return res,err }
if n != len(raw) { return res,errors.New("Error while compressing serialized data")}
if err = compressSrc.Close(); err != nil {
return res,err
}
return compressTarget.Bytes(), nil
}
return
}
func (s *SerializerProtobuf) Decode(source []byte, destination interface{}) (err error) {
if s.Compressor == nil {
return s.Codec.Unmarshal(source, destination)
} else {
targetReader,err := s.Compressor.Decompress(bytes.NewReader(source))
if err != nil { return err }
decompressed, err := ioutil.ReadAll(targetReader)
if err != nil { return err }
return s.Codec.Unmarshal(decompressed, destination)
}
}
func NewSerializerProtobuf(compress bool) *SerializerProtobuf {
sz := &SerializerProtobuf{
Codec: encoding.GetCodec(proto.Name),
}
if compress {
sz.Compressor = encoding.GetCompressor(gzip.Name)
}
return sz
}

View File

@ -0,0 +1,93 @@
package datastore
import (
"errors"
"github.com/dgraph-io/badger"
_ "github.com/dgraph-io/badger"
)
/*
https://github.com/dgraph-io/badger --> Apache 2 (compatible)
*/
var (
ErrCreate = errors.New("Error creating store")
ErrOpen = errors.New("Error opening store")
ErrGet = errors.New("Error retrieving value from store")
ErrDelete = errors.New("Error deleting value from store")
ErrPut = errors.New("Error putting value into store")
)
type Store struct {
Path string
Db *badger.DB
serializer Serializer
}
func (s *Store) Open() (err error) {
badgerOpts := badger.DefaultOptions
badgerOpts.Dir = s.Path
badgerOpts.ValueDir = s.Path
badgerOpts.SyncWrites = true
s.Db,err = badger.Open(badgerOpts)
if s.serializer == nil {
s.serializer = NewSerializerProtobuf(false)
}
return err
}
func (s *Store) Close() {
s.Db.Close()
}
func (s *Store) Put(key string, value interface{}, allowOverwrite bool) (err error) {
// serialize value
sv,err := s.serializer.Encode(value)
if err != nil { return }
err = s.Db.Update(func(txn *badger.Txn) error {
err := txn.Set([]byte(key), []byte(sv))
return err
})
if err != nil { return ErrPut }
return
}
func (s *Store) Get(key string, target interface{}) (err error) {
err = s.Db.View(func(txn *badger.Txn) error {
item, err := txn.Get([]byte(key))
if err != nil {
return err
}
val, err := item.Value()
if err != nil {
return err
}
return s.serializer.Decode(val, target)
//fmt.Printf("The answer is: %s\n", val)
//return nil
})
if err != nil { return ErrGet }
return
}
func (s *Store) Delete(key string) (err error) {
err = s.Db.Update(func(txn *badger.Txn) error {
// Your code here…
return nil
})
if err != nil { return ErrDelete }
return
}
func Open(path string) (store *Store, err error) {
store = &Store{
Path: path,
}
if err = store.Open(); err != nil {
return nil,err
}
return
}