diff --git a/service/datastore/serializer.go b/service/datastore/serializer.go new file mode 100644 index 0000000..d97c5c7 --- /dev/null +++ b/service/datastore/serializer.go @@ -0,0 +1,68 @@ +package datastore + +import ( + "bytes" + "google.golang.org/grpc/encoding" + "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/encoding/proto" + "io/ioutil" + "errors" +) + + +type Serializer interface { + Encode(val interface{}) (res []byte, err error) + Decode(source []byte, destination interface{}) (err error) +} + +type SerializerProtobuf struct { + Codec encoding.Codec + Compressor encoding.Compressor +} + +func (s *SerializerProtobuf) Encode(val interface{}) (res []byte, err error) { + raw,err := s.Codec.Marshal(val) + if err != nil { return res,err } + + if s.Compressor == nil { + return raw,nil + } else { + compressTarget := &bytes.Buffer{} + compressSrc,err := s.Compressor.Compress(compressTarget) + if err != nil { return res,err } + n,err := compressSrc.Write(raw) + if err != nil { return res,err } + if n != len(raw) { return res,errors.New("Error while compressing serialized data")} + + if err = compressSrc.Close(); err != nil { + return res,err + } + return compressTarget.Bytes(), nil + } + + return +} + +func (s *SerializerProtobuf) Decode(source []byte, destination interface{}) (err error) { + if s.Compressor == nil { + return s.Codec.Unmarshal(source, destination) + } else { + targetReader,err := s.Compressor.Decompress(bytes.NewReader(source)) + if err != nil { return err } + decompressed, err := ioutil.ReadAll(targetReader) + if err != nil { return err } + return s.Codec.Unmarshal(decompressed, destination) + } + +} + +func NewSerializerProtobuf(compress bool) *SerializerProtobuf { + sz := &SerializerProtobuf{ + Codec: encoding.GetCodec(proto.Name), + } + if compress { + sz.Compressor = encoding.GetCompressor(gzip.Name) + } + return sz +} + diff --git a/service/datastore/store.go b/service/datastore/store.go new file mode 100644 index 0000000..df7981a --- /dev/null +++ b/service/datastore/store.go @@ -0,0 +1,93 @@ +package datastore + +import ( + "errors" + "github.com/dgraph-io/badger" + _ "github.com/dgraph-io/badger" +) + +/* +https://github.com/dgraph-io/badger --> Apache 2 (compatible) + */ + +var ( + ErrCreate = errors.New("Error creating store") + ErrOpen = errors.New("Error opening store") + ErrGet = errors.New("Error retrieving value from store") + ErrDelete = errors.New("Error deleting value from store") + ErrPut = errors.New("Error putting value into store") +) + + +type Store struct { + Path string + Db *badger.DB + serializer Serializer +} + +func (s *Store) Open() (err error) { + badgerOpts := badger.DefaultOptions + badgerOpts.Dir = s.Path + badgerOpts.ValueDir = s.Path + badgerOpts.SyncWrites = true + s.Db,err = badger.Open(badgerOpts) + if s.serializer == nil { + s.serializer = NewSerializerProtobuf(false) + } + return err +} + +func (s *Store) Close() { + s.Db.Close() +} + +func (s *Store) Put(key string, value interface{}, allowOverwrite bool) (err error) { + // serialize value + sv,err := s.serializer.Encode(value) + if err != nil { return } + + err = s.Db.Update(func(txn *badger.Txn) error { + err := txn.Set([]byte(key), []byte(sv)) + return err + }) + if err != nil { return ErrPut } + return +} + +func (s *Store) Get(key string, target interface{}) (err error) { + err = s.Db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(key)) + if err != nil { + return err + } + val, err := item.Value() + if err != nil { + return err + } + return s.serializer.Decode(val, target) + //fmt.Printf("The answer is: %s\n", val) + //return nil + }) + if err != nil { return ErrGet } + return +} + +func (s *Store) Delete(key string) (err error) { + err = s.Db.Update(func(txn *badger.Txn) error { + // Your code hereā€¦ + return nil + }) + if err != nil { return ErrDelete } + return +} + + +func Open(path string) (store *Store, err error) { + store = &Store{ + Path: path, + } + if err = store.Open(); err != nil { + return nil,err + } + return +}