Client & Connections

To connect to a Riak cluster, you must create a RiakClient object. The default configuration connects to a single Riak node on localhost with the default ports. The following instantiation statements are all equivalent:

from riak import RiakClient, RiakNode

RiakClient()
RiakClient(protocol='http', host='127.0.0.1', http_port=8098)
RiakClient(nodes=[{'host':'127.0.0.1','http_port':8098}])
RiakClient(protocol='http', nodes=[RiakNode()])

Note

Connections are not established until you attempt to perform an operation. If the host or port is incorrect, no error is raised until that first operation runs.

The client maintains a connection pool behind the scenes, one for each protocol. Connections are opened as-needed; a random node is selected when a new connection is requested.
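
Because connections are opened lazily, a misconfigured host or port only surfaces on the first request. The following is a minimal sketch of a startup check that forces one round trip with ping(). The helper name verify_connection is hypothetical, and catching a broad Exception is an assumption, since the exact exception type raised on a failed connection is not specified in this section.

```python
def verify_connection(client):
    """Return True if one ping() round trip succeeds, False otherwise.

    `client` is expected to be a RiakClient, or any object with a
    compatible ping() method. This helper is illustrative only.
    """
    try:
        # ping() is the first real operation, so this is the point at
        # which a lazily created connection is actually opened.
        return bool(client.ping())
    except Exception:
        # Assumption: connection failures surface as exceptions whose
        # exact type is not documented here, so catch broadly.
        return False
```

Calling verify_connection(client) right after constructing the client turns a late failure into an early, explicit one.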

Client objects

class riak.client.RiakClient(protocol='pbc', transport_options={}, nodes=None, credentials=None, multiget_pool_size=None, multiput_pool_size=None, **kwargs)

The RiakClient object holds information necessary to connect to Riak. Requests can be made to Riak directly through the client or by using the methods on related objects.

Construct a new RiakClient object.

Parameters:
  • protocol (string) – the preferred protocol, defaults to ‘pbc’
  • nodes (list) – a list of node configurations, where each configuration is a dict containing the keys ‘host’, ‘http_port’, and ‘pb_port’
  • transport_options (dict) – Optional key-value args to pass to the transport constructor
  • credentials (SecurityCreds or dict) – optional object of security info
  • multiget_pool_size (int) – the number of threads to use in multiget() operations. Defaults to a factor of the number of CPUs in the system
  • multiput_pool_size (int) – the number of threads to use in multiput() operations. Defaults to a factor of the number of CPUs in the system

PROTOCOLS = ['http', 'pbc']

The supported protocols

Prior to Riak 2.0 the 'https' protocol was also an option, but now secure connections are handled by the Security feature.

protocol

Which protocol to prefer, one of PROTOCOLS. Please note that when one protocol is selected, the other protocols MAY NOT attempt to connect. Changing to another protocol will cause a connection on the next request.

Some requests are only valid over 'http' and will always be sent via that transport, regardless of which protocol is preferred.

client_id

The client ID for this client instance

resolver

The sibling-resolution function for this client. Defaults to riak.resolver.default_resolver().

nodes

The list of nodes that this client will connect to. It is best not to modify this property directly, as it is not thread-safe.

Nodes

The nodes attribute of RiakClient objects is a list of RiakNode objects. If you include multiple host specifications in the RiakClient constructor, they will be turned into this type.

class riak.node.RiakNode(host='127.0.0.1', http_port=8098, pb_port=8087, **unused_args)

The internal representation of a Riak node to which the client can connect. Encapsulates both the configuration for the node and error tracking used for node-selection.

Creates a node.

Parameters:
  • host (string) – an IP address or hostname
  • http_port (integer) – the HTTP port of the node
  • pb_port (integer) – the Protocol Buffers port of the node
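
As a rough illustration of the node configuration format, the sketch below builds one dict per host using the documented keys 'host', 'http_port', and 'pb_port'. The helper name node_configs and the example addresses are hypothetical; the port defaults are copied from the RiakNode signature above.

```python
def node_configs(hosts, http_port=8098, pb_port=8087):
    """Build one node configuration dict per host name or IP address."""
    return [
        {"host": host, "http_port": http_port, "pb_port": pb_port}
        for host in hosts
    ]


# Example addresses are placeholders, not real cluster members.
nodes = node_configs(["10.0.0.1", "10.0.0.2", "10.0.0.3"])

# With the riak package installed, the list could then be passed to the
# constructor, for example: RiakClient(nodes=nodes)
```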

Retry logic

Some operations that fail because of network errors or Riak node failure may be safely retried on another node, and the client will do so automatically. The items below can be used to configure this behavior.

RiakClient.retries

The number of times retryable operations will be attempted before raising an exception to the caller. Defaults to 3.

Note: This is a thread-local value, both for safety and to allow operation-specific modification. To change the default globally, modify riak.client.transport.DEFAULT_RETRY_COUNT.

RiakClient.retry_count(retries)

Modifies the number of retries for the scope of the with statement (in the current thread).

Example:

with client.retry_count(10):
    client.ping()

riak.client.transport.DEFAULT_RETRY_COUNT = 3

The default (global) number of times to retry requests that are retryable. This can be modified locally, per-thread, via the RiakClient.retries property, or using the RiakClient.retry_count method in a with statement.
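
The scoped form can be wrapped in a small helper when only one call site needs a different retry budget. This is a sketch, not part of the library: the name ping_with_retries is hypothetical, and it relies only on the retry_count() context manager and ping() described above.

```python
def ping_with_retries(client, attempts):
    """Ping with a temporary retry count.

    The override applies only inside the with block (and only in the
    current thread), so other callers keep the client's usual setting.
    """
    with client.retry_count(attempts):
        return client.ping()
```

Outside the with block the retry count reverts, so the helper does not need to remember or restore the previous value itself.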

Client-level Operations

Some operations are not scoped by buckets or bucket types and can be performed on the client directly:

RiakClient.ping()

Check if the Riak server for this RiakClient instance is alive.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Return type: boolean

RiakClient.get_buckets(bucket_type=None, timeout=None)

Get the list of buckets as RiakBucket instances.

Warning

Do not use this in production, as it requires traversing through all keys stored in a cluster.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • bucket_type (BucketType) – the optional containing bucket type
  • timeout (int) – a timeout value in milliseconds
Return type:

list of RiakBucket instances

RiakClient.stream_buckets(bucket_type=None, timeout=None)

Streams the list of buckets. This is a generator method that should be iterated over.

Warning

Do not use this in production, as it requires traversing through all keys stored in a cluster.

The caller should explicitly close the returned iterator, either using contextlib.closing() or calling close() explicitly. Consuming the entire iterator will also close the stream. If the stream is not closed, the associated connection might not be returned to the pool. Example:

from contextlib import closing

# Using contextlib.closing
with closing(client.stream_buckets()) as buckets:
    for bucket_list in buckets:
        do_something(bucket_list)

# Explicit close()
stream = client.stream_buckets()
for bucket_list in stream:
    do_something(bucket_list)
stream.close()

Parameters:
  • bucket_type (BucketType) – the optional containing bucket type
  • timeout (int) – a timeout value in milliseconds
Return type:

iterator that yields lists of RiakBucket instances
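
Closing matters most when the caller stops iterating early, because the stream is then never fully consumed. The sketch below searches the streamed bucket lists and returns at the first match; contextlib.closing() ensures the stream is closed on that early return. The helper name first_bucket_where and the predicate argument are hypothetical.

```python
from contextlib import closing


def first_bucket_where(client, predicate, bucket_type=None):
    """Return the first streamed bucket for which predicate(bucket) is true.

    Returns None if no bucket matches. The stream is closed in every
    case, including the early return inside the loop.
    """
    with closing(client.stream_buckets(bucket_type=bucket_type)) as stream:
        for bucket_list in stream:       # each item is a list of buckets
            for bucket in bucket_list:
                if predicate(bucket):
                    return bucket        # closing() still runs close()
    return None
```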

Accessing Bucket Types and Buckets

Most client operations are on bucket type objects, the bucket objects they contain or keys within those buckets. Use the bucket_type or bucket methods for creating bucket types and buckets that will proxy operations to the called client.

RiakClient.bucket_type(name)

Gets the bucket-type by the specified name. Bucket-types do not always exist (unlike buckets), but this will always return a BucketType object.

Parameters: name (str) – the bucket-type name
Return type: BucketType

RiakClient.bucket(name, bucket_type='default')

Get the bucket by the specified name. Since buckets always exist, this will always return a RiakBucket.

If you are using a bucket that is contained in a bucket type, it is preferable to access it from the bucket type object:

# Preferred:
client.bucket_type("foo").bucket("bar")

# Equivalent, but not preferred:
client.bucket("bar", bucket_type="foo")

Parameters:
  • name (str) – the bucket name
  • bucket_type (BucketType or str) – the parent bucket-type
Return type:

RiakBucket

Bucket Type Operations

RiakClient.get_bucket_type_props(bucket_type)

Fetches properties for the given bucket-type.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters: bucket_type (BucketType) – the bucket-type whose properties will be fetched
Return type: dict

RiakClient.set_bucket_type_props(bucket_type, props)

Sets properties for the given bucket-type.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • bucket_type (BucketType) – the bucket-type whose properties will be set
  • props (dict) – the properties to set

Bucket Operations

RiakClient.get_bucket_props(bucket)

Fetches bucket properties for the given bucket.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters: bucket (RiakBucket) – the bucket whose properties will be fetched
Return type: dict

RiakClient.set_bucket_props(bucket, props)

Sets bucket properties for the given bucket.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • bucket (RiakBucket) – the bucket whose properties will be set
  • props (dict) – the properties to set
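
The two calls above can be combined so that properties are written only when they differ from what the bucket already has. This is a sketch under stated assumptions: the helper name apply_bucket_props is hypothetical, and it assumes that set_bucket_props() accepts a partial dict and leaves unlisted properties unchanged.

```python
def apply_bucket_props(client, bucket, desired):
    """Set only those properties whose current value differs from `desired`.

    Returns the dict of properties that were actually sent, which is
    empty when the bucket already matched.
    """
    current = client.get_bucket_props(bucket)
    changed = {
        name: value
        for name, value in desired.items()
        if current.get(name) != value
    }
    if changed:
        # Assumption: a partial props dict updates just these keys.
        client.set_bucket_props(bucket, changed)
    return changed
```

A call such as apply_bucket_props(client, bucket, {'allow_mult': True}) then becomes a no-op when the property is already set.
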
RiakClient.clear_bucket_props(bucket)

Resets bucket properties for the given bucket.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters: bucket (RiakBucket) – the bucket whose properties will be reset

RiakClient.get_keys(bucket, timeout=None)

Lists all keys in a bucket.

Warning

Do not use this in production, as it requires traversing through all keys stored in a cluster.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • bucket (RiakBucket) – the bucket whose keys are fetched
  • timeout (int) – a timeout value in milliseconds
Return type:

list

RiakClient.stream_keys(bucket, timeout=None)

Lists all keys in a bucket via a stream. This is a generator method which should be iterated over.

Warning

Do not use this in production, as it requires traversing through all keys stored in a cluster.

The caller should explicitly close the returned iterator, either using contextlib.closing() or calling close() explicitly. Consuming the entire iterator will also close the stream. If the stream is not closed, the associated connection might not be returned to the pool. Example:

from contextlib import closing

# Using contextlib.closing
with closing(client.stream_keys(mybucket)) as keys:
    for key_list in keys:
        do_something(key_list)

# Explicit close()
stream = client.stream_keys(mybucket)
for key_list in stream:
    do_something(key_list)
stream.close()

Parameters:
  • bucket (RiakBucket) – the bucket whose keys will be streamed
  • timeout (int) – a timeout value in milliseconds
Return type:

iterator

Key-level Operations

RiakClient.get(robj, r=None, pr=None, timeout=None)

Fetches the contents of a Riak object.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • robj (RiakObject) – the object to fetch
  • r (integer, string, None) – the read quorum
  • pr (integer, string, None) – the primary read quorum
  • timeout (int) – a timeout value in milliseconds
  • basic_quorum (bool) – whether to use the “basic quorum” policy for not-founds
  • notfound_ok (bool) – whether to treat not-found responses as successful
  • head_only (bool) – whether to fetch only the metadata, without the value (only available on the PB transport)

RiakClient.put(robj, w=None, dw=None, pw=None, return_body=None, if_none_match=None, timeout=None)

Stores an object in the Riak cluster.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • robj (RiakObject) – the object to store
  • w (integer, string, None) – the write quorum
  • dw (integer, string, None) – the durable write quorum
  • pw (integer, string, None) – the primary write quorum
  • return_body (boolean) – whether to return the resulting object after the write
  • if_none_match (boolean) – whether to fail the write if the object exists
  • timeout (int) – a timeout value in milliseconds

RiakClient.delete(robj, rw=None, r=None, w=None, dw=None, pr=None, pw=None, timeout=None)

Deletes an object from Riak.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • robj (RiakObject) – the object to delete
  • rw (integer, string, None) – the read/write (delete) quorum
  • r (integer, string, None) – the read quorum
  • pr (integer, string, None) – the primary read quorum
  • w (integer, string, None) – the write quorum
  • dw (integer, string, None) – the durable write quorum
  • pw (integer, string, None) – the primary write quorum
  • timeout (int) – a timeout value in milliseconds

RiakClient.multiget(pairs, **params)

Fetches many keys in parallel via threads.

Parameters:
  • pairs (list) – list of bucket_type/bucket/key tuple triples
  • params (dict) – additional request flags, e.g. r, pr
Return type:

list of RiakObjects, Datatypes, or tuples of bucket_type, bucket, key, and the exception raised on fetch
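
Since the returned list mixes fetched values with failure tuples, callers usually need to separate the two. The sketch below does that by treating any 4-tuple whose last element is an exception as a failure, which follows the return type described above. The helper name split_multiget_results is hypothetical.

```python
def split_multiget_results(results):
    """Split multiget() output into (fetched, failed).

    `failed` holds the (bucket_type, bucket, key, exception) tuples;
    `fetched` holds everything else, in the original order.
    """
    fetched, failed = [], []
    for item in results:
        is_failure = (
            isinstance(item, tuple)
            and len(item) == 4
            and isinstance(item[3], Exception)
        )
        (failed if is_failure else fetched).append(item)
    return fetched, failed
```

The failed tuples carry enough information (bucket type, bucket, key) to retry just those fetches or to log them.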

RiakClient.fetch_datatype(bucket, key, r=None, pr=None, basic_quorum=None, notfound_ok=None, timeout=None, include_context=None)

Fetches the value of a Riak Datatype.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • bucket (RiakBucket) – the bucket of the datatype, which must belong to a BucketType
  • key (string) – the key of the datatype
  • r (integer, string, None) – the read quorum
  • pr (integer, string, None) – the primary read quorum
  • basic_quorum (bool, None) – whether to use the “basic quorum” policy for not-founds
  • notfound_ok (bool, None) – whether to treat not-found responses as successful
  • timeout (int, None) – a timeout value in milliseconds
  • include_context (bool, None) – whether to return the opaque context as well as the value, which is useful for removal operations on sets and maps
Return type:

Datatype

RiakClient.update_datatype(datatype, w=None, dw=None, pw=None, return_body=None, timeout=None, include_context=None)

Sends an update to a Riak Datatype to the server. This operation is not idempotent and so will not be retried automatically.

Parameters:
  • datatype (Datatype) – the datatype with pending updates
  • w (integer, string, None) – the write quorum
  • dw (integer, string, None) – the durable write quorum
  • pw (integer, string, None) – the primary write quorum
  • timeout (int) – a timeout value in milliseconds
  • include_context (bool) – whether to return the opaque context as well as the value, which is useful for removal operations on sets and maps
Return type:

tuple of datatype, opaque value and opaque context

Timeseries Operations

RiakClient.ts_describe(table)

Retrieve a time series table description from the Riak cluster.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters: table (string or Table) – The timeseries table.
Return type: TsObject

RiakClient.ts_get(table, key)

Retrieve a timeseries value by key.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • table (string or Table) – The timeseries table.
  • key (list) – The timeseries value’s key.
Return type:

TsObject

RiakClient.ts_put(tsobj)

Stores time series data in the Riak cluster.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters: tsobj (RiakTsObject) – the time series object to store
Return type: boolean

RiakClient.ts_delete(table, key)

Delete a timeseries value by key.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • table (string or Table) – The timeseries table.
  • key (list or dict) – The timeseries value’s key.
Return type:

boolean

RiakClient.ts_query(table, query, interpolations=None)

Queries time series data in the Riak cluster.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • table (string or Table) – The timeseries table.
  • query (string) – The timeseries query.
Return type:

TsObject

RiakClient.ts_stream_keys(table, timeout=None)

Lists all keys in a time series table via a stream. This is a generator method which should be iterated over.

The caller should explicitly close the returned iterator, either using contextlib.closing() or calling close() explicitly. Consuming the entire iterator will also close the stream. If the stream is not closed, the associated connection might not be returned to the pool. Example:

from contextlib import closing

# Using contextlib.closing
with closing(client.ts_stream_keys(mytable)) as keys:
    for key_list in keys:
        do_something(key_list)

# Explicit close()
stream = client.ts_stream_keys(mytable)
for key_list in stream:
    do_something(key_list)
stream.close()

Parameters:
  • table (string or Table) – the table from which to stream keys
  • timeout (int) – a timeout value in milliseconds
Return type:

iterator

Query Operations

RiakClient.mapred(inputs, query, timeout)

Executes a MapReduce query.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • inputs (list, dict) – the input list/structure
  • query (list) – the list of query phases
  • timeout (integer, None) – the query timeout
Return type:

mixed

RiakClient.stream_mapred(inputs, query, timeout)

Streams a MapReduce query as (phase, data) pairs. This is a generator method which should be iterated over.

The caller should explicitly close the returned iterator, either using contextlib.closing() or calling close() explicitly. Consuming the entire iterator will also close the stream. If the stream is not closed, the associated connection might not be returned to the pool. Example:

from contextlib import closing

# Using contextlib.closing
with closing(mymapred.stream()) as results:
    for phase, result in results:
        do_something(phase, result)

# Explicit close()
stream = mymapred.stream()
for phase, result in stream:
    do_something(phase, result)
stream.close()

Parameters:
  • inputs (list, dict) – the input list/structure
  • query (list) – the list of query phases
  • timeout (integer, None) – the query timeout
Return type:

iterator

RiakClient.get_index(bucket, index, startkey, endkey=None, return_terms=None, max_results=None, continuation=None, timeout=None, term_regex=None)

Queries a secondary index, returning matching keys.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • bucket (RiakBucket) – the bucket whose index will be queried
  • index (string) – the index to query
  • startkey (string, integer) – the sole key to query, or beginning of the query range
  • endkey (string, integer) – the end of the query range (optional if equality)
  • return_terms (boolean) – whether to include the secondary index value
  • max_results (integer) – the maximum number of results to return (page size)
  • continuation (string) – the opaque continuation returned from a previous paginated request
  • timeout (int) – a timeout value in milliseconds, or ‘infinity’
  • term_regex (string) – a regular expression used to filter index terms
Return type:

IndexPage

RiakClient.stream_index(bucket, index, startkey, endkey=None, return_terms=None, max_results=None, continuation=None, timeout=None, term_regex=None)

Queries a secondary index, streaming matching keys through an iterator.

The caller should explicitly close the returned iterator, either using contextlib.closing() or calling close() explicitly. Consuming the entire iterator will also close the stream. If the stream is not closed, the associated connection might not be returned to the pool. Example:

from contextlib import closing

# Using contextlib.closing
with closing(client.stream_index(mybucket, 'name_bin',
                                 'Smith')) as index:
    for key in index:
        do_something(key)

# Explicit close()
stream = client.stream_index(mybucket, 'name_bin', 'Smith')
for key in stream:
    do_something(key)
stream.close()

Parameters:
  • bucket (RiakBucket) – the bucket whose index will be queried
  • index (string) – the index to query
  • startkey (string, integer) – the sole key to query, or beginning of the query range
  • endkey (string, integer) – the end of the query range (optional if equality)
  • return_terms (boolean) – whether to include the secondary index value
  • max_results (integer) – the maximum number of results to return (page size)
  • continuation (string) – the opaque continuation returned from a previous paginated request
  • timeout (int) – a timeout value in milliseconds, or ‘infinity’
  • term_regex (string) – a regular expression used to filter index terms
Return type:

IndexPage

RiakClient.fulltext_search(index, query, **params)

Performs a full-text search query.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • index (string) – the bucket/index to search over
  • query (string) – the search query
  • params (dict) – additional query flags
Return type:

dict

RiakClient.paginate_index(bucket, index, startkey, endkey=None, max_results=1000, return_terms=None, continuation=None, timeout=None, term_regex=None)

Iterates over a paginated index query. This is equivalent to calling get_index() and then successively calling next_page() until all results are exhausted.

Because limiting the result set is necessary to invoke pagination, the max_results option has a default of 1000.

Parameters:
  • bucket (RiakBucket) – the bucket whose index will be queried
  • index (string) – the index to query
  • startkey (string, integer) – the sole key to query, or beginning of the query range
  • endkey (string, integer) – the end of the query range (optional if equality)
  • return_terms (boolean) – whether to include the secondary index value
  • max_results (integer) – the maximum number of results to return (page size), defaults to 1000
  • continuation (string) – the opaque continuation returned from a previous paginated request
  • timeout (int) – a timeout value in milliseconds, or ‘infinity’
  • term_regex (string) – a regular expression used to filter index terms
Return type:

generator over instances of IndexPage
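
The equivalence described above can be written out as a short generator. This is a sketch rather than the library's implementation: the name iter_index_pages is hypothetical, and it assumes that IndexPage offers a has_next_page() method alongside next_page(), which this section does not document.

```python
def iter_index_pages(client, bucket, index, startkey, endkey=None,
                     max_results=1000):
    """Yield index pages one at a time until no further page remains."""
    # The first page comes from an ordinary get_index() call; a page
    # size (max_results) is required for pagination to take effect.
    page = client.get_index(bucket, index, startkey, endkey,
                            max_results=max_results)
    yield page
    # Assumption: has_next_page() reports whether a continuation exists.
    while page.has_next_page():
        page = page.next_page()
        yield page
```

Iterating with `for page in iter_index_pages(...)` and then `for key in page` visits every matching key while holding only one page of results at a time.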

RiakClient.paginate_stream_index(bucket, index, startkey, endkey=None, max_results=1000, return_terms=None, continuation=None, timeout=None, term_regex=None)

Iterates over a streaming paginated index query. This is equivalent to calling stream_index() and then successively calling next_page() until all results are exhausted.

Because limiting the result set is necessary to invoke pagination, the max_results option has a default of 1000.

The caller should explicitly close each yielded page, either using contextlib.closing() or calling close() explicitly. Consuming the entire page will also close the stream. If a page is not closed, the associated connection might not be returned to the pool. Example:

from contextlib import closing

# Using contextlib.closing
for page in client.paginate_stream_index(mybucket, 'name_bin',
                                         'Smith'):
    with closing(page):
        for key in page:
            do_something(key)

# Explicit close()
for page in client.paginate_stream_index(mybucket, 'name_bin',
                                         'Smith'):
    for key in page:
        do_something(key)
    page.close()

Parameters:
  • bucket (RiakBucket) – the bucket whose index will be queried
  • index (string) – the index to query
  • startkey (string, integer) – the sole key to query, or beginning of the query range
  • endkey (string, integer) – the end of the query range (optional if equality)
  • return_terms (boolean) – whether to include the secondary index value
  • max_results (integer) – the maximum number of results to return (page size), defaults to 1000
  • continuation (string) – the opaque continuation returned from a previous paginated request
  • timeout (int) – a timeout value in milliseconds, or ‘infinity’
  • term_regex (string) – a regular expression used to filter index terms
Return type:

generator over instances of IndexPage

Search Maintenance Operations

RiakClient.create_search_schema(schema, content)

Creates a Solr schema of the given name and content. Content must be valid Solr schema XML.

Parameters:
  • schema (string) – the name of the schema to create
  • content (string) – the Solr schema XML content

RiakClient.get_search_schema(schema)

Gets a search schema of the given name if it exists. Raises a RiakError if no such schema exists. The schema is returned as a dict with keys 'name' and 'content'.

Parameters: schema (string) – the name of the schema to get
Returns: dict

RiakClient.create_search_index(index, schema=None, n_val=None)

Create a search index of the given name, and optionally set a schema. If no schema is set, the default will be used.

Parameters:
  • index (string) – the name of the index to create
  • schema (string, None) – the schema that this index will follow
  • n_val (integer, None) – this index's N value
  • timeout (integer, None) – optional timeout (in ms)

RiakClient.get_search_index(index)

Gets a search index of the given name if it exists, which will also return the schema. Raises a RiakError if no such index exists. The returned dict contains keys 'name', 'schema' and 'n_val'.

Parameters: index (string) – the name of the index to get
Return type: dict

RiakClient.delete_search_index(index)

Delete the search index that matches the given name.

Parameters: index (string) – the name of the index to delete

RiakClient.list_search_indexes()

Gets all search indexes and their schemas. The returned list contains dicts with keys 'name', 'schema' and 'n_val'.

Returns: list of dicts
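
A common maintenance pattern is to create an index only if it is missing. The sketch below does this using list_search_indexes() and create_search_index() as documented above; the helper name ensure_search_index is hypothetical, and it does not check whether an existing index uses the requested schema.

```python
def ensure_search_index(client, index, schema=None, n_val=None):
    """Create the search index unless one with that name already exists.

    Returns True if the index was created, False if it was already there.
    """
    # Each entry is a dict with the keys 'name', 'schema' and 'n_val'.
    existing = {entry["name"] for entry in client.list_search_indexes()}
    if index in existing:
        return False
    client.create_search_index(index, schema=schema, n_val=n_val)
    return True
```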

Serialization

The client supports automatic transformation of Riak responses into Python types if encoders and decoders are registered for the media-types. Supported by default are application/json and text/plain.

riak.client.default_encoder(obj)

Default encoder for JSON datatypes, which returns UTF-8 encoded JSON instead of the default, bulkier ASCII strings with \uXXXX escapes.
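
To make the difference concrete, the sketch below compares the standard library's default JSON output with a UTF-8 variant. It illustrates the idea only and is not the library's source; the function name utf8_json_encoder is hypothetical.

```python
import json


def utf8_json_encoder(obj):
    """Encode obj as JSON, writing non-ASCII characters as raw UTF-8."""
    return json.dumps(obj, ensure_ascii=False).encode("utf-8")


value = {"city": "Zürich"}

# Default json.dumps() escapes non-ASCII characters as \uXXXX sequences.
escaped = json.dumps(value).encode("utf-8")

# The UTF-8 variant keeps the character itself, which takes fewer bytes.
direct = utf8_json_encoder(value)
```

Both byte strings decode to the same Python value; only the encoded size and readability differ.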

RiakClient.get_encoder(content_type)

Get the encoding function for the provided content type.

Parameters: content_type (str) – the requested media type
Return type: function

RiakClient.set_encoder(content_type, encoder)

Set the encoding function for the provided content type.

Parameters:
  • content_type (str) – the requested media type
  • encoder (function) – an encoding function, takes a single object argument and returns encoded data

RiakClient.get_decoder(content_type)

Get the decoding function for the provided content type.

Parameters: content_type (str) – the requested media type
Return type: function

RiakClient.set_decoder(content_type, decoder)

Set the decoding function for the provided content type.

Parameters:
  • content_type (str) – the requested media type
  • decoder (function) – a decoding function, takes encoded data and returns a Python type
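
As an example of registering an additional media type, the sketch below defines an encoder/decoder pair for text/csv and a helper that registers both on a client. The function names and the choice of CSV are hypothetical; only set_encoder() and set_decoder() come from the API above. Note that decoded cells come back as strings.

```python
import csv
import io


def encode_csv(rows):
    """Encode a list of rows (lists of values) as UTF-8 CSV bytes."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerows(rows)
    return buf.getvalue().encode("utf-8")


def decode_csv(data):
    """Decode CSV bytes (or text) into a list of rows of strings."""
    text = data.decode("utf-8") if isinstance(data, bytes) else data
    return list(csv.reader(io.StringIO(text)))


def register_csv(client, content_type="text/csv"):
    """Register the pair so that content type is converted automatically."""
    client.set_encoder(content_type, encode_csv)
    client.set_decoder(content_type, decode_csv)
```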

Deprecated Features

Legacy Counters

Counters, introduced in Riak 1.4, were the first Data Type. They pre-date Bucket Types and the current implementation. Rather than returning objects, the counter operations act directly on the value of the counter. Legacy counters are deprecated as of Riak 2.0. Please use Counter instead.

Warning

Legacy counters are incompatible with Bucket Types.

RiakClient.get_counter(bucket, key, r=None, pr=None, basic_quorum=None, notfound_ok=None)

Gets the value of a counter.

Deprecated since version 2.1.0: (Riak 2.0) Riak 1.4-style counters are deprecated in favor of the Counter datatype.

Note

This request is automatically retried up to RiakClient.retries times if it fails due to a network error.

Parameters:
  • bucket (RiakBucket) – the bucket of the counter
  • key (string) – the key of the counter
  • r (integer, string, None) – the read quorum
  • pr (integer, string, None) – the primary read quorum
  • basic_quorum (bool) – whether to use the “basic quorum” policy for not-founds
  • notfound_ok (bool) – whether to treat not-found responses as successful
Return type:

integer

RiakClient.update_counter(bucket, key, value, w=None, dw=None, pw=None, returnvalue=False)

Deprecated since version 2.1.0: (Riak 2.0) Riak 1.4-style counters are deprecated in favor of the Counter datatype.

Updates a counter by the given value. This operation is not idempotent and so is not retried automatically.

Parameters:
  • bucket (RiakBucket) – the bucket of the counter
  • key (string) – the key of the counter
  • value (integer) – the amount to increment or decrement
  • w (integer, string, None) – the write quorum
  • dw (integer, string, None) – the durable write quorum
  • pw (integer, string, None) – the primary write quorum
  • returnvalue (bool) – whether to return the updated value of the counter