rkive

Version: v0.0.0-...-e5dd884 (this package is not in the latest version of its module)
Published: Sep 22, 2014 License: MIT Imports: 16 Imported by: 1

README

Riak for Go


A Riak client for Go (1.3+).


Complete documentation is available at godoc.

Status

Core functionality (fetch, store, secondary indexes, links) is complete, but many advanced features (MapReduce, Yokozuna search) are still on the way. There is no short-term guarantee that the API will remain stable. (We are shooting for a beta release in Nov. '14, followed by a "stable" 1.0 in December.) That being said, this code is already being actively tested in some production applications.

Features

  • Efficient connection pooling and re-dialing.
  • Asynchronous batch fetches (see FetchAsync and MultiFetchAsync).
  • Easy RAM-backed caching (see MakeCache).
  • Transparent sibling conflict resolution.
  • Compare-and-swap (see PushChangeset).
  • Low per-operation heap allocation overhead.

Usage

Satisfy the Object interface and you're off to the races. The included 'Blob' object is the simplest possible Object implementation.

package main

import (
	"fmt"
	"log"

	"github.com/philhofer/rkive"
)

func main() {
	// Open up one connection
	riak, err := rkive.DialOne("127.0.0.1:8087", "test-Client-ID")
	if err != nil {
		log.Fatal(err)
	}
	defer riak.Close()

	blobs := riak.Bucket("blob_bucket")

	// let's make an object
	myBlob := &rkive.Blob{Content: []byte("Hello World!")}

	// now let's put it in the database
	// (a nil key asks Riak to assign one)
	err = blobs.New(myBlob, nil)
	if err != nil {
		log.Fatal(err)
	}

	// since we didn't specify a key, Riak assigned
	// an available key
	fmt.Printf("Our blob key is %s\n", myBlob.Info().Key())

	// Let's make a change to the object...
	myBlob.Content = []byte("MOAR DATA")

	// ... and store it!
	err = blobs.Push(myBlob)
	if err == rkive.ErrModified {
		// Push returns rkive.ErrModified if the object has been
		// modified since the last time you called New(), Push(),
		// Store(), Fetch(), etc. You can retrieve the latest
		// copy of the object with Update():
		updated, err := blobs.Update(myBlob)
		if err != nil {
			log.Fatal(err)
		}
		if updated {
			fmt.Println("the object has been changed!")
		}
	} else if err != nil {
		log.Fatal(err)
	}

	// you can also fetch a new copy
	// of the object like so:
	newBlob := &rkive.Blob{}
	err = blobs.Fetch(newBlob, myBlob.Info().Key())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("Fetched: %s\n", newBlob.Content)
}

For more worked examples, check out the /_examples folder.

For automatic code generation that implements Marshal and Unmarshal, check this out.

If you want to run Riak with allow_mult=true (which you should strongly consider), take a look at the ObjectM interface, which allows you to specify a Merge() operation to be used for your object when multiple values are encountered on a read or write operation. If you have allow_mult=true and your object does not satisfy the ObjectM interface, then read and write operations on a key/bucket pair with siblings will return a *ErrMultipleResponses. (In the degenerate case where 10 consecutive merge conflict resolution attempts fail, *ErrMultipleResponses will be returned for ObjectM operations. This is to avoid "sibling explosion.")

As an example, here's what the Blob type would have to define (internally) if it were to satisfy the ObjectM interface:

// NewEmpty should always return a properly-initialized
// zero value for the type in question. The client
// will marshal data into this object and pass it to
// Merge().
func (b *Blob) NewEmpty() Object {
     return &Blob{}
}

// Merge should make a best-effort attempt to merge
// data from its argument into the method receiver.
// It should be prepared to handle nil/zero values
// for either object.
func (b *Blob) Merge(o Object) {
     // you can always type-assert the argument
     // to Merge() to be the same type returned
     // by NewEmpty(), which should also be the
     // same type as the method receiver
     nb := o.(*Blob)

     // we don't really have a good way of handling
     // this conflict, so we'll set the content
     // to be the combination of both
     b.Content = append(b.Content, nb.Content...)
}
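The following is a minimal sketch of how an ObjectM's NewEmpty() and Merge() could be combined to collapse several siblings into one value. It is not rkive's internal resolution code. Object and ObjectM are trimmed local copies, with Info() dropped so the example compiles without the library. resolveSiblings is a hypothetical helper, and decoding the first sibling straight into the receiver is an assumption.

```go
package main

import "fmt"

// Object and ObjectM are trimmed local copies of the rkive interfaces
// (Info() is left out so the sketch compiles without the library).
type Object interface {
	Marshal() ([]byte, error)
	Unmarshal([]byte) error
}

type ObjectM interface {
	Object
	NewEmpty() Object
	Merge(o Object)
}

// Blob mirrors the README's Blob, with the concatenating Merge shown above.
type Blob struct{ Content []byte }

func (b *Blob) Marshal() ([]byte, error) { return b.Content, nil }

func (b *Blob) Unmarshal(p []byte) error {
	b.Content = append([]byte(nil), p...)
	return nil
}

func (b *Blob) NewEmpty() Object { return &Blob{} }

func (b *Blob) Merge(o Object) {
	b.Content = append(b.Content, o.(*Blob).Content...)
}

// resolveSiblings (hypothetical) decodes the first sibling into dst,
// then decodes each remaining sibling into a fresh NewEmpty() value
// and merges it into dst.
func resolveSiblings(dst ObjectM, siblings [][]byte) error {
	if len(siblings) == 0 {
		return nil
	}
	if err := dst.Unmarshal(siblings[0]); err != nil {
		return err
	}
	for _, raw := range siblings[1:] {
		tmp := dst.NewEmpty()
		if err := tmp.Unmarshal(raw); err != nil {
			return err
		}
		dst.Merge(tmp)
	}
	return nil
}

func main() {
	var b Blob
	sibs := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	if err := resolveSiblings(&b, sibs); err != nil {
		panic(err)
	}
	fmt.Println(string(b.Content)) // abc
}
```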

Performance

This client library was built with performance in mind.

To run benchmarks, start up Riak locally and run: go test -v -tags 'riak' -check.vv -bench .

You will need to be running Riak 2.0+ using the configuration file found at $GOPATH/src/github.com/philhofer/rkive/_conf/riak.conf.

Here's what I get on my MacBook Pro, keeping in mind that time/op and iowait/op vary by +/- 10% on every benchmark run. (Client time per operation is more consistent between benchmark runs.) Memory allocations are perfectly consistent between benchmark runs.

Operation   time/op     iowait/op   client time/op   allocs   heap/op
Fetch       418598 ns   413398 ns   5200 ns          6        550 B
Store       782187 ns   775353 ns   6834 ns          5        750 B

Design & TODOs

This package is focused on using Riak the way it was intended: with allow_mult set to true. This library will always use vclocks when getting and setting values. Additionally, this library adheres strictly to Riak's read-before-write policy.

Internally, Return-Head is always set to true, so every Push() or Store() operation updates the local object's metadata. Consequently, you can carry out a series of transactions on an object in parallel and still avoid conflicts. (PushChangeset() is particularly useful in this regard.) You can retrieve the latest version of an object by calling Update().

The core "verbs" of this library (New, Fetch, Store, Push, Update, Delete) are meant to have intuitive and sane default behavior. For instance, New always asks Riak to abort the transaction if an object already exists at the given key, and Update doesn't return the whole body of the object back from the database if it hasn't been modified.

Since garbage collection time bottlenecks many Go applications, a lot of effort was put into reducing memory allocations on database reads and writes. The implementation is unlikely to become much more memory-efficient until Go's escape analysis becomes less pessimistic about escaping pointers.

There is an open issue for cache buckets, which has the potential to dramatically improve performance in query-heavy (2i, map-reduce, Yokozuna) use cases. There are also some open issues related to implementing Riak 2.0 features.

License

This code is MIT licensed. You may use it however you see fit. However, I would very much appreciate it if you create PRs in this repo for patches and improvements!

Documentation

Index

Constants

const (
	DefaultReqTimeout = 500
)

Variables

var (
	// ErrClosed is returned when
	// an attempt is made to make a request
	// with a closed client
	ErrClosed = errors.New("client closed")

	// ErrUnavail is returned when the client
	// is unable to successfully dial any
	// Riak node.
	ErrUnavail = errors.New("no connection to could be established")
)
var (
	// ErrUnexpectedResponse is returned when riak returns the wrong
	// message type
	ErrUnexpectedResponse = errors.New("unexpected response")

	// ErrNotFound is returned when
	// no objects are returned for
	// a read operation
	ErrNotFound = errors.New("not found")

	// ErrDeleted is returned
	// when the object has been marked
	// as deleted, but has not yet been reaped
	ErrDeleted = errors.New("object deleted")
)
var (
	ErrNoPath   = errors.New("bucket and/or key not defined")
	ErrModified = errors.New("object has been modified since last read")
	ErrExists   = errors.New("object already exists")
)
var (
	ErrDone = errors.New("done")
)

Functions

This section is empty.

Types

type AsyncFetch

type AsyncFetch struct {
	Value Object
	Error error
}

AsyncFetch represents the output of an asynchronous fetch operation. 'Value' is never nil, but 'Error' may or may not be nil. If 'Error' is non-nil, then 'Value' is usually the zero value of the underlying object.

type Blob

type Blob struct {
	RiakInfo Info
	Content  []byte
}

Blob is a generic riak key/value container that implements the Object interface.

func (*Blob) Info

func (r *Blob) Info() *Info

Info implements part of the Object interface.

func (*Blob) Marshal

func (r *Blob) Marshal() ([]byte, error)

Marshal implements part of the Object interface

func (*Blob) Unmarshal

func (r *Blob) Unmarshal(b []byte) error

Unmarshal implements part of the Object interface

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

Bucket represents a Riak bucket

func (*Bucket) Fetch

func (b *Bucket) Fetch(o Object, key string) error

Fetch performs a fetch with the bucket's default properties

func (*Bucket) GetCounter

func (b *Bucket) GetCounter(name string) (*Counter, error)

GetCounter gets a counter.

func (*Bucket) GetProperties

func (b *Bucket) GetProperties() (*rpbc.RpbBucketProps, error)

GetProperties retrieves the properties of the bucket

func (*Bucket) IndexLookup

func (b *Bucket) IndexLookup(idx string, val string) (*IndexQueryRes, error)

IndexLookup performs a secondary index query on the bucket

func (*Bucket) IndexRange

func (b *Bucket) IndexRange(idx string, min int64, max int64) (*IndexQueryRes, error)

IndexRange performs a secondary index range query on the bucket

func (*Bucket) MakeCache

func (b *Bucket) MakeCache() error

MakeCache makes a memory-backed cache bucket. You will most likely need the following options enabled in your riak.conf:

# this enables multiple backends
storage_backend = multi

# this creates a backend called 'cache' backed by RAM
multi_backend.cache.storage_backend = memory

# this makes a backend called 'std' and sets its storage backend
# (you can name this one whatever you would like)
multi_backend.std.storage_backend = <leveldb OR bitcask>
multi_backend.default = std

MakeCache will error if your configuration is incorrect.

NB: keep in mind that this bucket will only be backed by RAM and uses no replication. This bucket should only be used to store ephemeral objects.

func (*Bucket) MultiFetchAsync

func (b *Bucket) MultiFetchAsync(o Duplicator, procs int, keys ...string) <-chan *AsyncFetch

MultiFetchAsync returns fetch results as a future. Results may return in any order. Every result on the channel will have its "Value" field type-assertable to the underlying type of 'o'. 'procs' goroutines will be used for fetching.
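The fan-out described above can be modeled with an ordinary worker pool. This sketch does not use rkive. fetch stands in for a Riak read, AsyncFetch.Value is simplified to a string, and multiFetchAsync is a hypothetical name. Closing the channel after the last result mirrors what the FetchAsync documentation describes, and is assumed here for the multi-key case.

```go
package main

import (
	"fmt"
	"sync"
)

// AsyncFetch mirrors the rkive struct, with Value simplified to a string.
type AsyncFetch struct {
	Value string
	Error error
}

// multiFetchAsync (hypothetical) hands keys to procs goroutines, each
// calling fetch, and closes the result channel once every key is done.
// Results may arrive in any order.
func multiFetchAsync(fetch func(key string) (string, error), procs int, keys ...string) <-chan *AsyncFetch {
	if procs < 1 {
		procs = 1
	}
	work := make(chan string)
	out := make(chan *AsyncFetch, len(keys))
	var wg sync.WaitGroup
	for i := 0; i < procs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for k := range work {
				v, err := fetch(k)
				out <- &AsyncFetch{Value: v, Error: err}
			}
		}()
	}
	go func() {
		for _, k := range keys {
			work <- k
		}
		close(work)
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	fetch := func(k string) (string, error) { return "value-of-" + k, nil }
	for res := range multiFetchAsync(fetch, 2, "a", "b", "c") {
		if res.Error != nil {
			fmt.Println("error:", res.Error)
			continue
		}
		fmt.Println(res.Value)
	}
}
```

Results can arrive in any order, so a consumer should range over the channel until it closes and check Error on every result.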

func (*Bucket) New

func (b *Bucket) New(o Object, key *string) error

New performs a new store with the bucket's default properties

func (*Bucket) NewCounter

func (b *Bucket) NewCounter(name string, start int64) (*Counter, error)

NewCounter creates a new counter with an optional starting value. If the counter already exists, the value returned will be the existing value plus "start".

func (*Bucket) Overwrite

func (b *Bucket) Overwrite(o Object, key string) error

Overwrite performs an overwrite on the specified key

func (*Bucket) Push

func (b *Bucket) Push(o Object) error

Push pushes an object with a bucket's default properties

func (*Bucket) Reset

func (b *Bucket) Reset() error

Reset resets the bucket's properties

func (*Bucket) SetProperties

func (b *Bucket) SetProperties(props *rpbc.RpbBucketProps) error

SetProperties sets the properties of the bucket

func (*Bucket) Store

func (b *Bucket) Store(o Object) error

Store stores an object with a bucket's default properties

func (*Bucket) Update

func (b *Bucket) Update(o Object) (bool, error)

Update updates an object in a bucket

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a pool of connections to a Riak cluster.

func Dial

func Dial(addrs []string, clientID string) (*Client, error)

Dial creates a client connected to one or many Riak nodes. The client will attempt to avoid using downed nodes. Dial returns an error if it is unable to reach a good node.

func DialOne

func DialOne(addr string, clientID string) (*Client, error)

DialOne returns a client that always dials the same node. (See: Dial)

func (*Client) Bucket

func (c *Client) Bucket(name string) *Bucket

Bucket returns a Riak bucket with the provided name

func (*Client) Close

func (c *Client) Close()

Close() idempotently closes the client.

func (*Client) Delete

func (c *Client) Delete(o Object, opts *DelOpts) error

func (*Client) Fetch

func (c *Client) Fetch(o Object, bucket string, key string, opts *ReadOpts) error

Fetch puts whatever exists at the provided bucket+key into the provided Object. It has undefined behavior if the object supplied does not know how to unmarshal the bytes returned from riak.

func (*Client) FetchHead

func (c *Client) FetchHead(bucket string, key string) (*Info, error)

FetchHead returns the head (*Info) of an object stored in Riak. This is the least expensive way to check for the existence of an object.

func (*Client) GetBucketTypeProperties

func (c *Client) GetBucketTypeProperties(typeName string) (*rpbc.RpbBucketProps, error)

GetBucketTypeProperties gets the bucket properties associated with a given bucket type. *NOTE* bucket types are a Riak 2.0 feature.

func (*Client) IndexLookup

func (c *Client) IndexLookup(bucket string, index string, value string, max *int) (*IndexQueryRes, error)

IndexLookup returns the keys that match the index-value pair specified. You can specify the maximum number of returned keys ('max'). Index queries are performed in "streaming" mode.

func (*Client) IndexRange

func (c *Client) IndexRange(bucket string, index string, min int64, max int64, maxret *int) (*IndexQueryRes, error)

IndexRange returns the keys that match the index range query. You can specify the maximum number of returned results ('maxret'). Index queries are performed in "streaming" mode.

func (*Client) New

func (c *Client) New(o Object, bucket string, key *string, opts *WriteOpts) error

New writes a new object into the database. If 'key' is non-nil, New will attempt to use that key, and return ErrExists if an object already exists at that key-bucket pair. Riak will assign this object a key if 'key' is nil.
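The New() contract can be modeled with a map: Riak picks a key when 'key' is nil, and an occupied key yields ErrExists. bucket, newObject and the "auto-N" key scheme are hypothetical. Riak generates its own keys and performs the existence check server-side.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// ErrExists mirrors the rkive error value of the same name.
var ErrExists = errors.New("object already exists")

// bucket is a toy in-memory stand-in for a Riak bucket.
type bucket struct {
	data map[string][]byte
	next int // hypothetical counter used to invent keys
}

// newObject models the documented New semantics: a nil key lets the
// store choose a free key; a non-nil key must not already be taken.
func (b *bucket) newObject(val []byte, key *string) (string, error) {
	if key == nil {
		for {
			b.next++
			k := "auto-" + strconv.Itoa(b.next)
			if _, taken := b.data[k]; !taken {
				b.data[k] = val
				return k, nil
			}
		}
	}
	if _, taken := b.data[*key]; taken {
		return "", ErrExists
	}
	b.data[*key] = val
	return *key, nil
}

func main() {
	b := &bucket{data: map[string][]byte{}}
	k, err := b.newObject([]byte("hello"), nil)
	if err != nil {
		panic(err)
	}
	fmt.Println("assigned key:", k)

	name := "greeting"
	if _, err := b.newObject([]byte("hi"), &name); err != nil {
		panic(err)
	}
	_, err = b.newObject([]byte("again"), &name)
	fmt.Println(err == ErrExists) // true
}
```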

func (*Client) Overwrite

func (c *Client) Overwrite(o Object, bucket string, key string, opts *WriteOpts) error

Overwrite performs a store operation on an arbitrary location. It does not send a vclock, and the object itself is not modified. Overwrite ignores NotFound errors. This function is only safe to use with buckets in which "last_write_wins" is turned on. Ideally, this function is only used for caches.

func (*Client) Ping

func (c *Client) Ping() error

Ping pings a random node.

func (*Client) PullHead

func (c *Client) PullHead(o Object) error

PullHead pulls the latest object metadata into the object. The Info() pointed to by the object will be changed if the object has been changed in Riak since the last read. If you want to read the entire object, use Update() instead.

func (*Client) Push

func (c *Client) Push(o Object, opts *WriteOpts) error

Push makes a conditional (if-not-modified) write to the database. This is the recommended way of making writes to the database, as it minimizes the chances of producing sibling objects.

func (*Client) PushChangeset

func (c *Client) PushChangeset(o Object, chng func(Object) error, opts *WriteOpts) error

PushChangeset pushes a changeset to an object, handling the case in which the object has been updated in the database since the last local fetch. The 'chng' function should check if the change that it wanted already happened, and return ErrDone in that case. The 'chng' function is allowed to type-assert its argument to the underlying type of 'o'.
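The retry behavior described for PushChangeset can be sketched against an in-memory store. Everything here is a hypothetical model. record, store, push, pushChangeset and addTag are illustrative names, an integer version stands in for the vclock, and the error values only mirror rkive's ErrModified and ErrDone. The point is the shape of the loop: apply the change, attempt a conditional write, and on ErrModified refresh and re-apply.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	ErrModified = errors.New("object has been modified since last read")
	ErrDone     = errors.New("done")
)

// record is a stored value plus a version number standing in for a vclock.
type record struct {
	version int
	tags    []string
}

// store is a toy in-memory "database".
type store map[string]record

// push is a conditional write: it fails with ErrModified unless the
// caller's version matches the stored one.
func (s store) push(key string, r *record) error {
	if s[key].version != r.version {
		return ErrModified
	}
	r.version++
	s[key] = record{version: r.version, tags: append([]string(nil), r.tags...)}
	return nil
}

// pushChangeset (hypothetical) applies chng and pushes, refreshing and
// re-applying chng whenever the write loses a race. If chng returns
// ErrDone, the change is already present and nothing is written.
func pushChangeset(s store, key string, local *record, chng func(*record) error) error {
	for {
		if err := chng(local); err == ErrDone {
			return nil
		} else if err != nil {
			return err
		}
		err := s.push(key, local)
		if err != ErrModified {
			return err
		}
		latest := s[key] // refresh the local copy, then retry
		local.version = latest.version
		local.tags = append([]string(nil), latest.tags...)
	}
}

// addTag returns a changeset that appends t unless it is already there.
func addTag(t string) func(*record) error {
	return func(r *record) error {
		for _, have := range r.tags {
			if have == t {
				return ErrDone
			}
		}
		r.tags = append(r.tags, t)
		return nil
	}
}

func main() {
	s := store{"k": {version: 2, tags: []string{"a", "b"}}}
	// our local copy is stale: it was read at version 1
	local := &record{version: 1, tags: []string{"a"}}
	if err := pushChangeset(s, "k", local, addTag("c")); err != nil {
		panic(err)
	}
	fmt.Println(s["k"].version, s["k"].tags) // 3 [a b c]
}
```

Because chng may run more than once, it should check whether its change is already present, as the description above asks. A production loop would probably also cap the number of retries.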

func (*Client) SetBucketTypeProperties

func (c *Client) SetBucketTypeProperties(typeName string, props *rpbc.RpbBucketProps) error

SetBucketTypeProperties sets the bucket properties associated with a given bucket type. *NOTE* bucket types are a Riak 2.0 feature.

func (*Client) Store

func (c *Client) Store(o Object, opts *WriteOpts) error

Store makes a basic write to the database. Store will return ErrNoPath if the object does not already have a key and bucket defined. (Use New() if this object isn't already in the database.)

func (*Client) Update

func (c *Client) Update(o Object, opts *ReadOpts) (bool, error)

Update conditionally fetches the object in question based on whether or not it has been modified in the database. If the object has been changed, the object will be modified and Update() will return true. (The object must have a well-defined key, bucket, and vclock.)

type Counter

type Counter struct {
	// contains filtered or unexported fields
}

Counter is a Riak CRDT that acts as a distributed counter. Counters only work in buckets with 'allow_mult' turned on.
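Counters need allow_mult because replica states must be merged rather than overwritten. To show why, here is a generic PN-counter, a CRDT design commonly used for distributed counters. It illustrates the technique only. It is not rkive's Counter type or Riak's internal format, and PNCounter and its methods are hypothetical.

```go
package main

import "fmt"

// PNCounter keeps per-replica increment (P) and decrement (N) totals.
// Each replica only ever grows its own entries, and merging takes the
// per-replica maximum, so replicas converge regardless of merge order.
type PNCounter struct {
	P map[string]int64
	N map[string]int64
}

func NewPNCounter() *PNCounter {
	return &PNCounter{P: map[string]int64{}, N: map[string]int64{}}
}

// Add applies v on behalf of replica id (a negative v decrements).
func (c *PNCounter) Add(id string, v int64) {
	if v >= 0 {
		c.P[id] += v
	} else {
		c.N[id] += -v
	}
}

// Val is the sum of all increments minus the sum of all decrements.
func (c *PNCounter) Val() int64 {
	var total int64
	for _, v := range c.P {
		total += v
	}
	for _, v := range c.N {
		total -= v
	}
	return total
}

// Merge folds another replica's state into c by per-replica maximum.
func (c *PNCounter) Merge(o *PNCounter) {
	for id, v := range o.P {
		if v > c.P[id] {
			c.P[id] = v
		}
	}
	for id, v := range o.N {
		if v > c.N[id] {
			c.N[id] = v
		}
	}
}

func main() {
	a, b := NewPNCounter(), NewPNCounter()
	a.Add("a", 5)
	b.Add("b", 3)
	b.Add("b", -1)
	a.Merge(b)
	b.Merge(a)
	fmt.Println(a.Val(), b.Val()) // 7 7
}
```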

func (*Counter) Add

func (c *Counter) Add(v int64) error

Add adds the value 'v' to the counter.

func (*Counter) Bucket

func (c *Counter) Bucket() string

Bucket is the bucket of the counter

func (*Counter) Destroy

func (c *Counter) Destroy() error

Destroy deletes the counter.

func (*Counter) Key

func (c *Counter) Key() string

Key is the key of the counter

func (*Counter) Refresh

func (c *Counter) Refresh() error

Refresh gets the latest value of the counter from the database.

func (*Counter) Val

func (c *Counter) Val() int64

Val is the value of the counter

type DelOpts

type DelOpts struct {
	R  *uint32 // required reads
	W  *uint32 // required writes
	PR *uint32 // required primary node reads
	PW *uint32 // required primary node writes
	RW *uint32 // required replica deletions
	DW *uint32 // required durable (to disk) writes
}

DelOpts are options available on delete operations. All values are optional.

type Duplicator

type Duplicator interface {
	Object
	// NewEmpty should return an initialized
	// (zero-value) object of the same underlying
	// type as the parent.
	NewEmpty() Object
}

Duplicator types know how to return an empty copy of themselves, on top of fulfilling the Object interface.

type ErrMultipleResponses

type ErrMultipleResponses struct {
	Bucket      string
	Key         string
	NumSiblings int
}

ErrMultipleResponses is the type of error returned when multiple siblings are retrieved for an object.

func (*ErrMultipleResponses) Error

func (m *ErrMultipleResponses) Error() string

type IndexQueryRes

type IndexQueryRes struct {
	// contains filtered or unexported fields
}

IndexQueryRes is the response to a secondary index query.

func (*IndexQueryRes) Contains

func (i *IndexQueryRes) Contains(key string) bool

Contains returns whether or not the query response contains this particular key

func (*IndexQueryRes) FetchAsync

func (i *IndexQueryRes) FetchAsync(o Duplicator, procs int) <-chan *AsyncFetch

FetchAsync returns a channel on which all of the objects in the query are returned. 'procs' determines the number of goroutines actively fetching. The channel will be closed once all the objects have been returned. Objects are fetched asynchronously. The (underlying) type of every object returned in each AsyncFetch is the same as returned from o.NewEmpty().

func (*IndexQueryRes) FetchNext

func (i *IndexQueryRes) FetchNext(o Object) (done bool, err error)

FetchNext fetches the next object in the query. FetchNext returns whether or not there are objects remaining in the query result, and any error encountered in fetching that object.

func (*IndexQueryRes) Keys

func (i *IndexQueryRes) Keys() []string

Keys returns the complete list of response keys

func (*IndexQueryRes) Len

func (i *IndexQueryRes) Len() int

Len returns the number of keys in the response

func (*IndexQueryRes) Which

func (i *IndexQueryRes) Which(o Object, conds ...func(Object) bool) ([]string, error)

Which searches within the query result for objects that satisfy the given condition functions.

type Info

type Info struct {
	// contains filtered or unexported fields
}

Info contains information about a specific Riak object. You can use it to satisfy the Object interface. Info's zero value (Info{}) is valid. You can use the Info object to add links, secondary indexes, and user metadata to the object referencing this Info object. Calls to Fetch(), Push(), Store(), New(), etc. will change the contents of this struct.

func (*Info) AddIndex

func (in *Info) AddIndex(key string, value string) bool

AddIndex adds a key-value pair to the object's secondary indexes, but returns false if a key already exists under that name and has a different value. It returns true if the index already has this exact key-value pair, or if the pair is written with no conflicts. (All XxxIndex operations append "_bin" to key values internally in order to comply with the Riak secondary index specification, so the user does not have to include it.)

func (*Info) AddIndexInt

func (in *Info) AddIndexInt(key string, value int64) bool

AddIndexInt sets an integer secondary index value using the same conditional rules as AddIndex
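The conditional rules for AddIndex and AddIndexInt, together with the "_bin"/"_int" suffixing, can be modeled with a plain map. indexes is a hypothetical stand-in for Info's unexported storage. Storing integers as decimal strings is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strconv"
)

// indexes is keyed by the suffixed Riak index name.
type indexes map[string]string

// AddIndex stores key+"_bin" -> value unless a different value is
// already present. It returns true if the pair is now stored.
func (ix indexes) AddIndex(key, value string) bool {
	k := key + "_bin"
	if old, ok := ix[k]; ok {
		return old == value
	}
	ix[k] = value
	return true
}

// AddIndexInt follows the same rule under the "_int" suffix.
func (ix indexes) AddIndexInt(key string, value int64) bool {
	k := key + "_int"
	v := strconv.FormatInt(value, 10)
	if old, ok := ix[k]; ok {
		return old == v
	}
	ix[k] = v
	return true
}

func main() {
	ix := indexes{}
	// a new pair is accepted
	fmt.Println(ix.AddIndex("email", "a@example.com")) // true
	// re-adding the identical pair also reports true
	fmt.Println(ix.AddIndex("email", "a@example.com")) // true
	// a conflicting value is rejected and the old one is kept
	fmt.Println(ix.AddIndex("email", "b@example.com")) // false
	fmt.Println(ix.AddIndexInt("age", 42))             // true
	fmt.Println(ix["email_bin"], ix["age_int"])        // a@example.com 42
}
```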

func (*Info) AddLink

func (in *Info) AddLink(name string, bucket string, key string) bool

AddLink adds a link conditionally. It returns true if the value was already set to this bucket-key pair, or if no value existed at 'name'. It returns false otherwise.

func (*Info) AddMeta

func (in *Info) AddMeta(key string, value string) bool

AddMeta conditionally adds a key-value pair if it didn't exist already

func (*Info) Bucket

func (in *Info) Bucket() string

Bucket is the canonical riak bucket

func (*Info) ContentType

func (in *Info) ContentType() string

ContentType is the content-type

func (*Info) GetIndex

func (in *Info) GetIndex(key string) (val string)

GetIndex gets the value stored at 'key' in the object's string secondary indexes

func (*Info) GetIndexInt

func (in *Info) GetIndexInt(key string) *int64

GetIndexInt gets an integer index value

func (*Info) GetLink

func (in *Info) GetLink(name string) (bucket string, key string)

GetLink gets a link from the object

func (*Info) GetMeta

func (in *Info) GetMeta(key string) (val string)

GetMeta gets a meta value

func (*Info) Indexes

func (in *Info) Indexes() [][2]string

Indexes returns a list of all of the key-value pairs in this object. (Key first, then value.) Note that string-string indexes will have keys postfixed with "_bin", and string-int indexes will have keys postfixed with "_int", per the Riak secondary index specification.

func (*Info) Key

func (in *Info) Key() string

Key is the canonical riak key

func (*Info) Metas

func (in *Info) Metas() [][2]string

Metas returns all of the metadata key-value pairs. (Key first, then value.)

func (*Info) RemoveIndex

func (in *Info) RemoveIndex(key string)

RemoveIndex removes a key from the object

func (*Info) RemoveIndexInt

func (in *Info) RemoveIndexInt(key string)

RemoveIndexInt removes an integer index key from an object

func (*Info) RemoveLink

func (in *Info) RemoveLink(name string)

RemoveLink removes a link (if it exists)

func (*Info) RemoveMeta

func (in *Info) RemoveMeta(key string)

RemoveMeta deletes the meta value at a key

func (*Info) SetContentType

func (in *Info) SetContentType(s string)

SetContentType sets the content-type to 's'.

func (*Info) SetIndex

func (in *Info) SetIndex(key string, value string)

SetIndex sets a key-value pair in the object's string secondary indexes

func (*Info) SetIndexInt

func (in *Info) SetIndexInt(key string, value int64)

SetIndexInt sets an integer secondary index value

func (*Info) SetLink

func (in *Info) SetLink(name string, bucket string, key string)

SetLink sets a link for an object

func (*Info) SetMeta

func (in *Info) SetMeta(key string, value string)

SetMeta sets a key-value pair

func (*Info) Vclock

func (in *Info) Vclock() string

Vclock is the vector clock value as a string

type Object

type Object interface {
	// Objects must maintain
	// a reference to an Info
	// struct, which contains
	// this object's riak
	// metadata. Info() must
	// never return nil, or it
	// will cause a panic.
	Info() *Info

	// Marshal should return the encoded
	// value of the object, and any
	// relevant errors.
	Marshal() ([]byte, error)

	// Unmarshal should unmarshal the object
	// from a []byte. It can safely use
	// zero-copy methods, as the byte slice
	// passed to it will "belong" to the
	// object.
	Unmarshal([]byte) error
}

Object is the interface that must be satisfied in order to fetch or store an object in Riak.
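Here is a sketch of a JSON-backed Object implementation. Info and Object are local stand-ins so the code compiles without rkive. In a real program, User would hold an rkive.Info and return its address from Info(). User and its fields are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Info and Object stand in for rkive.Info and rkive.Object.
type Info struct{}

type Object interface {
	Info() *Info
	Marshal() ([]byte, error)
	Unmarshal([]byte) error
}

// User is a hypothetical application type stored as JSON.
// The unexported info field is ignored by encoding/json.
type User struct {
	info  Info
	Name  string `json:"name"`
	Email string `json:"email"`
}

// Info returns a pointer into the struct, so it is never nil.
func (u *User) Info() *Info { return &u.info }

func (u *User) Marshal() ([]byte, error) { return json.Marshal(u) }

func (u *User) Unmarshal(b []byte) error { return json.Unmarshal(b, u) }

// compile-time check that *User satisfies Object
var _ Object = (*User)(nil)

func main() {
	in := &User{Name: "Ada", Email: "ada@example.com"}
	data, err := in.Marshal()
	if err != nil {
		panic(err)
	}
	var out User
	if err := out.Unmarshal(data); err != nil {
		panic(err)
	}
	fmt.Println(string(data), out.Name)
}
```

encoding/json copies out of the input slice. An implementation that wants the zero-copy behavior permitted by the Unmarshal comment above would need a different decoder.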

type ObjectM

type ObjectM interface {
	Duplicator

	// Merge should merge the argument object into the method receiver. It
	// is safe to type-assert the argument of Merge to the same type
	// as the type of the object satisfying the interface. (Under the hood,
	// the argument passed to Merge is simply the value of NewEmpty() after
	// data has been read into it.) Merge is used to iteratively merge many sibling objects.
	Merge(o Object)
}

ObjectM is an object that also knows how to merge itself with siblings. If an object has this interface defined, this package will use the Merge method to transparently handle siblings returned from Riak.

type ReadOpts

type ReadOpts struct {
	R            *uint32 // number of reads
	Pr           *uint32 // number of primary replica reads
	BasicQuorum  *bool   // basic quorum required
	SloppyQuorum *bool   // sloppy quorum required
	NotfoundOk   *bool   // treat not-found as a read for 'R'
	NVal         *uint32 // 'n_val'
}

ReadOpts are read options that can be specified when doing a read operation. All of these default to the default bucket properties.

type RiakError

type RiakError struct {
	// contains filtered or unexported fields
}

RiakError is an error returned from the Riak server itself.

func (RiakError) Error

func (r RiakError) Error() string

type WriteOpts

type WriteOpts struct {
	W  *uint32 // Required write acknowledgements
	DW *uint32 // 'Durable' (to disk) write
	PW *uint32 // Primary replica writes
}

WriteOpts are options available for all write operations.
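The option fields are pointers, so a small helper that takes the address of a literal keeps call sites short. WriteOpts is copied locally so the sketch compiles on its own, and u32 and describe are hypothetical helpers. The ReadOpts and DelOpts descriptions treat a nil field as "use the bucket default", and this sketch assumes the same holds for WriteOpts.

```go
package main

import "fmt"

// WriteOpts copies the rkive struct; a nil field is assumed to mean
// "use the bucket default".
type WriteOpts struct {
	W  *uint32
	DW *uint32
	PW *uint32
}

// u32 returns a pointer to a fresh copy of v.
func u32(v uint32) *uint32 { return &v }

// describe renders each field, or "default" when it is unset.
func describe(o *WriteOpts) string {
	f := func(p *uint32) string {
		if p == nil {
			return "default"
		}
		return fmt.Sprint(*p)
	}
	return fmt.Sprintf("W=%s DW=%s PW=%s", f(o.W), f(o.DW), f(o.PW))
}

func main() {
	// two write acknowledgements, one durable write, PW left unset
	opts := &WriteOpts{W: u32(2), DW: u32(1)}
	fmt.Println(describe(opts)) // W=2 DW=1 PW=default
}
```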

Directories

Path Synopsis
Package rpbc is a generated protocol buffer package.
