Advanced Usage & Internals

This page documents aspects of the library's internals that you will rarely need to interact with directly, but that are important for understanding how the library works and for development purposes.

Connection pool

class riak.transports.pool.Resource(obj, pool)

A member of the Pool: a container for the actual resource being pooled, and a marker for whether the resource is currently claimed.

Creates a new Resource, wrapping the passed object as the pooled resource.

Parameters:
  • obj (object) – the resource to wrap
  • pool (Pool) – the pool that this resource belongs to

release()

Releases this resource back to the pool it came from.

claimed

Whether the resource is currently in use.

errored

True if this Resource errored.

object

The wrapped pooled resource.

pool

The pool that this resource belongs to.

class riak.transports.pool.Pool

A thread-safe, reentrant resource pool, ported from the “Innertube” Ruby library. Pool should be subclassed to implement the create_resource() and destroy_resource() methods, which are responsible for creating and cleaning up the resources in the pool, respectively. To claim a resource from the pool for a block of code, use a with statement on the transaction() method. transaction() also allows filtering the pool and supplying a default value to be used as the resource if no resources are free.

Example:

from riak.transports.pool import Pool
class ListPool(Pool):
    def create_resource(self):
        return []

    def destroy_resource(self, resource):
        # Lists need no cleanup beyond normal garbage collection
        pass

pool = ListPool()
with pool.transaction() as resource:
    resource.append(1)
with pool.transaction() as resource2:
    print(repr(resource2)) # should be [1]

Creates a new Pool. If you override the __init__() method in a subclass, call Pool.__init__() explicitly from it.

acquire(_filter=None, default=None)

Claims a resource from the pool for manual use. Resources are created as needed when all members of the pool are claimed or the pool is empty. Most of the time you will want to use transaction().

Parameters:
  • _filter (callable) – a filter that can be used to select a member of the pool
  • default – a value that will be used instead of calling create_resource() if a new resource needs to be created
Return type:

Resource

clear()

Removes all resources from the pool, calling delete_resource() with each one so that the resources are cleaned up.

create_resource()

Implemented by subclasses to allocate a new resource for use in the pool.

delete_resource(resource)

Deletes the resource from the pool and destroys the associated resource. Not usually needed by users of the pool, but called internally when BadResource is raised.

Parameters: resource (Resource) – the resource to remove

destroy_resource(obj)

Called when removing a resource from the pool so that it can be cleanly deallocated. Subclasses should implement this method if additional cleanup is needed beyond normal garbage collection. The default implementation is a no-op.

Parameters: obj – the resource being removed

release(resource)

Returns a resource to the pool. Most of the time you will want to use transaction(), but if you use acquire(), you must release the acquired resource back to the pool when finished. Failure to do so could result in deadlock.

Parameters: resource (Resource) – the resource to release

transaction(_filter=None, default=None, yield_resource=False)

Claims a resource from the pool for use in a thread-safe, reentrant manner (as part of a with statement). Resources are created as needed when all members of the pool are claimed or the pool is empty.

Parameters:
  • _filter (callable) – a filter that can be used to select a member of the pool
  • default – a value that will be used instead of calling create_resource() if a new resource needs to be created
  • yield_resource (boolean) – set to True to yield the Resource object itself
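The claim/release protocol described above can be illustrated with a deliberately simplified pool. This is a sketch, not riak's implementation: SketchPool is a hypothetical class that omits filtering, defaults, yield_resource and reentrancy, and keeps free resources in a plain list guarded by a lock. It shows why acquire() must always be paired with release(), and how transaction() packages that pairing as a context manager.

```python
import threading
from contextlib import contextmanager


class SketchPool:
    """Hypothetical, simplified pool illustrating acquire/release/transaction."""

    def __init__(self):
        self._lock = threading.Lock()
        self._free = []    # resources that are not currently claimed
        self.created = 0   # how many resources have been allocated

    def create_resource(self):
        # A real subclass would open a connection here.
        self.created += 1
        return []

    def acquire(self):
        # Reuse a free resource if one exists; otherwise create a new one.
        with self._lock:
            if self._free:
                return self._free.pop()
            return self.create_resource()

    def release(self, resource):
        # Make the resource available to later callers.
        with self._lock:
            self._free.append(resource)

    @contextmanager
    def transaction(self):
        # Same as acquire() followed by release() in a finally block.
        resource = self.acquire()
        try:
            yield resource
        finally:
            self.release(resource)


pool = SketchPool()

# Manual use: release in a finally block so an error cannot leak the resource.
res = pool.acquire()
try:
    res.append(1)
finally:
    pool.release(res)

# Preferred use: the with statement releases the resource automatically.
with pool.transaction() as res2:
    print(res2)  # the released list is reused, so this prints [1]
```

Forgetting the finally clause in the manual form is exactly the leak that can exhaust a real pool, which is why transaction() is the recommended entry point.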

class riak.transports.pool.PoolIterator(pool)

Iterates over a snapshot of the pool in a thread-safe manner, eventually touching all resources that were known when the iteration started.

Note that if claimed resources are not released for long periods, the iterator may hang, waiting for those last resources to be released. The iteration and pool functionality is only meant to be used internally within the client, and resources will be claimed per client operation, making this an unlikely event (although still possible).

exception riak.transports.pool.BadResource(ex, mid_stream=False)

Users of a Pool should raise this error when the pool resource currently in use is bad and should be removed from the pool.

Parameters: mid_stream (boolean) – whether this exception happened during a streaming operation
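The removal behaviour can be sketched as follows. This is a hypothetical, simplified model rather than riak's code: the BadResource class below is a local stand-in with the same constructor signature, and SketchPool.transaction() destroys a resource when BadResource escapes the with block instead of returning it to the free list.

```python
from contextlib import contextmanager


class BadResource(Exception):
    """Local stand-in mirroring the documented signature (not riak's class)."""

    def __init__(self, ex, mid_stream=False):
        super().__init__(ex)
        self.mid_stream = mid_stream


class SketchPool:
    """Hypothetical pool that discards resources reported as bad."""

    def __init__(self):
        self.free = []        # healthy, unclaimed resources
        self.destroyed = []   # resources removed because they went bad

    def create_resource(self):
        return object()  # placeholder for a real connection

    def destroy_resource(self, obj):
        self.destroyed.append(obj)

    @contextmanager
    def transaction(self):
        resource = self.free.pop() if self.free else self.create_resource()
        bad = False
        try:
            yield resource
        except BadResource:
            bad = True
            raise  # the caller still sees the error
        finally:
            if bad:
                self.destroy_resource(resource)  # never handed out again
            else:
                self.free.append(resource)       # back into the pool


pool = SketchPool()
try:
    with pool.transaction():
        # Simulate detecting a dead socket while using the resource.
        raise BadResource(ConnectionResetError("connection reset"))
except BadResource as err:
    print("removed bad resource; mid_stream =", err.mid_stream)
```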

exception riak.transports.pool.ConnectionClosed(ex, mid_stream=False)

Users of a Pool should raise this error when the pool resource currently in use has been closed and should be removed from the pool.

Parameters: mid_stream (boolean) – whether this exception happened during a streaming operation

Retry logic

class riak.client.transport.RiakClientTransport

Methods for RiakClient related to transport selection and retries.

_acquire()

Acquires a connection from the default pool.

_choose_pool(protocol=None)

Selects a connection pool according to the passed protocol, falling back to the client's default protocol when none is given.

Parameters: protocol (string) – the protocol to use
Return type: Pool

_transport()

Yields a single transport from the default pool to the caller, without retries. No retries are needed because this method is only used by CRDT operations, which should never be retried.

_with_retries(pool, fn)

Performs the passed function with retries against the given pool.

Parameters:
  • pool (Pool) – the connection pool to use
  • fn (function) – the function to call; it is passed a transport
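A retry loop of this kind might look like the sketch below. It is a simplification built on assumptions: with_retries() is a hypothetical free function that takes a list of transports instead of a Pool, and its is_retryable() treats only ConnectionError and socket timeouts as retryable. Non-retryable errors propagate immediately; retryable ones move on to the next transport until the attempts are used up.

```python
import socket


def is_retryable(error):
    # Assumption: only connection-level failures are worth retrying.
    return isinstance(error, (ConnectionError, socket.timeout))


def with_retries(fn, transports, retries=3):
    """Call fn(transport), trying the next transport on retryable errors."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error = None
    for attempt in range(retries):
        transport = transports[attempt % len(transports)]
        try:
            return fn(transport)
        except Exception as error:
            if not is_retryable(error):
                raise  # e.g. a bad request: retrying would not help
            last_error = error
    raise last_error  # every attempt failed with a retryable error


calls = []


def ping(transport):
    calls.append(transport)
    if transport == "node1":
        raise ConnectionResetError("node1 went away")
    return "pong from " + transport


result = with_retries(ping, ["node1", "node2"])
print(result)  # pong from node2
```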

retry_count(retries)

Modifies the number of retries for the scope of the with statement (in the current thread).

Example:

from riak import RiakClient
client = RiakClient()
with client.retry_count(10):
    client.ping()

retries

The number of times retryable operations will be attempted before raising an exception to the caller. Defaults to 3.

Note: This value is thread-local, so it can be changed safely for specific operations. To change the default globally, modify riak.client.transport.DEFAULT_RETRY_COUNT.
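The thread-local behaviour of retries and retry_count() can be modelled with threading.local. RetrySettings below is a hypothetical stand-in for the relevant part of RiakClient, not its actual code; the default of 3 mirrors the documented default.

```python
import threading
from contextlib import contextmanager

DEFAULT_RETRY_COUNT = 3  # mirrors the documented default


class RetrySettings:
    """Hypothetical sketch of a per-thread retry count."""

    def __init__(self):
        self._local = threading.local()

    @property
    def retries(self):
        # Threads that never entered retry_count() see the default.
        return getattr(self._local, "retries", DEFAULT_RETRY_COUNT)

    @contextmanager
    def retry_count(self, retries):
        previous = self.retries
        self._local.retries = retries
        try:
            yield
        finally:
            self._local.retries = previous  # restore on exit, even on error


settings = RetrySettings()
seen = {}


def worker():
    seen["other_thread"] = settings.retries


with settings.retry_count(10):
    t = threading.Thread(target=worker)
    t.start()
    t.join()
    seen["this_thread"] = settings.retries

print(seen)  # {'other_thread': 3, 'this_thread': 10}
```

Because the override lives in thread-local storage, other threads sharing the same client keep the default while one thread raises its retry count.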

riak.client.transport._is_retryable(error)

Determines whether a given error is retryable according to the exceptions allowed to be retried by each transport.

Parameters: error (Exception) – the error to check
Return type: boolean

riak.client.transport.retryable(fn, protocol=None)

Wraps a client operation so that it is retried according to the current value of RiakClient.retries. Used internally.
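The decorator pattern can be sketched as below, assuming that the wrapped operation receives a transport, which is what _with_retries() passes to its fn argument. This retryable() omits the protocol argument, and FakeClient and FakeTransport are hypothetical classes; their _with_retries() makes a single attempt without a pool.

```python
import functools


def retryable(fn):
    """Hypothetical sketch: run fn through self._with_retries()."""
    @functools.wraps(fn)
    def wrapper(self, *args, **kwargs):
        def thunk(transport):
            # The transport is injected here, so callers never pass it.
            return fn(self, transport, *args, **kwargs)
        return self._with_retries(thunk)
    return wrapper


class FakeTransport:
    def ping(self):
        return True


class FakeClient:
    def _with_retries(self, fn):
        # A real client would choose a pool and retry; this calls once.
        return fn(FakeTransport())

    @retryable
    def ping(self, transport):
        """Check that the server is reachable."""
        return transport.ping()


client = FakeClient()
print(client.ping())  # True: the caller does not supply a transport
```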

riak.client.transport.retryableHttpOnly(fn)

Wraps a retryable client operation that is only valid over HTTP. Used internally.

Multiget / Multiput

riak.client.multi.POOL_SIZE = 4

The default size of the worker pool: the number of CPUs, or 6 if the CPU count cannot be determined. The value 4 shown above reflects the machine on which this documentation was generated.

class riak.client.multi.Task(client, outq, bucket_type, bucket, key, object, options)

A namedtuple for tasks that are fed to workers in the multi-get pool.

class riak.client.multi.PutTask(client, outq, object, options)

A namedtuple for tasks that are fed to workers in the multi-put pool.

class riak.client.multi.MultiGetPool(size=4)

_worker_method()

The body of the multi-get worker. Loops until _should_quit() returns True, taking tasks off the input queue, fetching each object, and putting the results on the output queue.

riak.client.multi.multiget(client, keys, **options)

Executes a parallel-fetch across multiple threads. Returns a list containing RiakObject or Datatype instances, or 4-tuples of bucket-type, bucket, key, and the exception raised.

If a pool option is included, the request will use the given worker pool and not a transient MultiGetPool. This option will be passed by the client if the multiget_pool_size option was set on client initialization.

Parameters:
  • client (RiakClient) – the client to use
  • keys (list of (bucket_type, bucket, key) three-tuples) – the keys to fetch in parallel
  • options (dict) – request options to RiakBucket.get
Return type:

list
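The parallel fetch can be pictured as a fixed set of worker threads draining an input queue and reporting to an output queue. The sketch below is hypothetical and far simpler than MultiGetPool: Task is trimmed to two fields, fetch is any callable standing in for RiakBucket.get, and a sentinel object replaces _should_quit(). Failures are reported as 4-tuples ending in the exception, matching the return format described above; results arrive in completion order in this sketch.

```python
import queue
import threading
from collections import namedtuple

# Hypothetical, trimmed-down task: riak's Task carries more fields.
Task = namedtuple("Task", ["outq", "key"])
_STOP = object()  # sentinel that tells a worker to exit


def worker(inq, fetch):
    while True:
        task = inq.get()
        if task is _STOP:
            break
        try:
            result = fetch(task.key)
        except Exception as err:
            # Report failures as (bucket_type, bucket, key, exception).
            result = task.key + (err,)
        task.outq.put(result)


def multiget(keys, fetch, size=4):
    inq, outq = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(inq, fetch))
               for _ in range(size)]
    for t in threads:
        t.start()
    for key in keys:
        inq.put(Task(outq, key))
    results = [outq.get() for _ in keys]  # completion order, not input order
    for _ in threads:
        inq.put(_STOP)
    for t in threads:
        t.join()
    return results


store = {("default", "users", "alice"): "Alice",
         ("default", "users", "bob"): "Bob"}
keys = list(store) + [("default", "users", "carol")]
results = multiget(keys, store.__getitem__, size=2)
print(len(results))  # 3: two values and one failure tuple
```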

class riak.client.multi.MultiPutPool(size=4)

_worker_method()

The body of the multi-put worker. Loops until _should_quit() returns True, taking tasks off the input queue, storing the object, and putting the result on the output queue.

riak.client.multi.multiput(client, objs, **options)

Executes a parallel-store across multiple threads. Returns a list containing booleans or RiakObject instances.

If a pool option is included, the request will use the given worker pool and not a transient MultiPutPool. This option will be passed by the client if the multiput_pool_size option was set on client initialization.

Parameters:
  • client (RiakClient) – the client to use
  • objs (list of RiakObject or TsObject) – the objects to store in parallel
  • options (dict) – request options to RiakClient.put
Return type:

list

Datatypes

Datatype internals

Datatype.to_op()

Extracts the mutation operation from this datatype, if any. Each type must implement this method, returning the appropriate operation, or None if there is no queued mutation.

Datatype._check_type(new_value)

Checks that initial values of the type are appropriate. Each type must implement this method.

Return type: bool

Datatype._coerce_value(new_value)

Coerces the input value into the internal representation for the type. Datatypes may override this method.

Datatype._default_value()

Returns what the initial value of an empty datatype should be.

Datatype._post_init()

Called at the end of __init__() so that subclasses can tweak their own setup without overriding the constructor.

Datatype._require_context()

Raises an exception if the context is not present.

Datatype.type_name = None

The string “name” of this datatype. Each datatype should set this.

Datatype._type_error_msg = 'Invalid value type'

The message included in the exception raised when the value is of incorrect type. See also _check_type().
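To show how these hooks can fit together, here is a hypothetical minimal datatype. SketchDatatype and SketchCounter are illustrative names, not riak classes; the order in which the constructor calls the hooks is an assumption, and the ("increment", n) operation format is invented for the example.

```python
class SketchDatatype:
    """Hypothetical base class mirroring the hooks documented above."""

    type_name = None
    _type_error_msg = "Invalid value type"

    def __init__(self, value=None):
        if value is None:
            value = self._default_value()
        elif not self._check_type(value):
            raise TypeError(self._type_error_msg)
        self._value = self._coerce_value(value)
        self._post_init()  # subclass setup without overriding __init__

    def _default_value(self):
        return None

    def _check_type(self, new_value):
        raise NotImplementedError

    def _coerce_value(self, new_value):
        return new_value

    def _post_init(self):
        pass

    def to_op(self):
        raise NotImplementedError


class SketchCounter(SketchDatatype):
    type_name = "counter"
    _type_error_msg = "Counters can only be integers"

    def _default_value(self):
        return 0

    def _check_type(self, new_value):
        # bool is a subclass of int, so exclude it explicitly.
        return isinstance(new_value, int) and not isinstance(new_value, bool)

    def _post_init(self):
        self._increment = 0  # queued mutation, not yet sent

    def increment(self, amount=1):
        self._increment += amount

    @property
    def value(self):
        return self._value

    def to_op(self):
        # None means there is nothing to send to the server.
        if not self._increment:
            return None
        return ("increment", self._increment)


c = SketchCounter(5)
print(c.value, c.to_op())  # 5 None
c.increment(2)
print(c.to_op())  # ('increment', 2)
```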

TypedMapView

class riak.datatypes.map.TypedMapView(parent, datatype)

Implements a view over a Map, filtered by the embedded datatype.

__contains__(key)

Determines whether the given key with this view’s datatype is in the parent Map.

__delitem__(key)

Removes the key with this view’s datatype from the parent Map.

__getitem__(key)

Fetches an item from the parent Map scoped by this view’s datatype.

Parameters: key (str) – the key of the item
Return type: Datatype

__iter__()

Iterates over all keys in the Map scoped by this view’s datatype.

__len__()

Returns the number of keys in this map scoped by this view’s datatype.
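A view like this can be sketched over a plain dict, assuming the parent map stores each entry under a (name, datatype) key so that the same name can exist with different types. SketchTypedMapView is hypothetical; the real class wraps a Map and returns Datatype instances.

```python
class SketchTypedMapView:
    """Hypothetical view over a dict keyed by (name, datatype) tuples."""

    def __init__(self, parent, datatype):
        self.parent = parent
        self.datatype = datatype

    def __getitem__(self, key):
        return self.parent[(key, self.datatype)]

    def __contains__(self, key):
        return (key, self.datatype) in self.parent

    def __delitem__(self, key):
        del self.parent[(key, self.datatype)]

    def __iter__(self):
        # Yield only the names whose entries have this view's datatype.
        return (name for (name, dtype) in self.parent if dtype == self.datatype)

    def __len__(self):
        return sum(1 for _ in self)


entries = {
    ("likes", "counter"): 3,
    ("tags", "set"): {"python"},
    ("visits", "counter"): 7,
    ("likes", "set"): {"alice"},  # same name, different datatype
}
counters = SketchTypedMapView(entries, "counter")
print(sorted(counters), len(counters))  # ['likes', 'visits'] 2
print("tags" in counters, counters["likes"])  # False 3
del counters["likes"]
print(("likes", "set") in entries)  # True: only the counter entry was removed
```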

TYPES constant

riak.datatypes.TYPES = {'map': <class 'riak.datatypes.map.Map'>, 'set': <class 'riak.datatypes.set.Set'>, 'counter': <class 'riak.datatypes.counter.Counter'>, 'flag': <class 'riak.datatypes.flag.Flag'>, 'hll': <class 'riak.datatypes.hll.Hll'>, 'register': <class 'riak.datatypes.register.Register'>}

A mapping from datatype names to the classes that implement them.

Transports

class riak.transports.transport.Transport

Class to encapsulate transport details and methods. All protocol transports are subclasses of this class.

_get_index_mapred_emu(bucket, index, startkey, endkey=None)

Emulates a secondary index request via MapReduce. Used in the case where the transport supports MapReduce but has no native secondary index query capability.

_search_mapred_emu(index, query)

Emulates a search request via MapReduce. Used in the case where the transport supports MapReduce but has no native search capability.

clear_bucket_props(bucket)

Resets bucket properties to their defaults.

create_search_index(index, schema=None, n_val=None, timeout=None)

Creates a Yokozuna search index.

create_search_schema(schema, content)

Creates a Yokozuna search schema.

delete(robj, rw=None, r=None, w=None, dw=None, pr=None, pw=None, timeout=None)

Deletes an object.

delete_search_index(index)

Deletes a Yokozuna search index.

fetch_datatype(bucket, key, r=None, pr=None, basic_quorum=None, notfound_ok=None, timeout=None, include_context=None)

Fetches a Riak Datatype.

fulltext_add(index, *docs)

Adds documents to the full-text index.

fulltext_delete(index, docs=None, queries=None)

Removes documents from the full-text index.

get(robj, r=None, pr=None, timeout=None, basic_quorum=None, notfound_ok=None, head_only=False)

Fetches an object.

get_bucket_props(bucket)

Fetches properties for the given bucket.

get_bucket_type_props(bucket_type)

Fetches properties for the given bucket-type.

get_buckets(bucket_type=None, timeout=None)

Gets the list of buckets as strings.

get_client_id()

Fetches the client ID for the transport.

get_counter(bucket, key, r=None, pr=None, basic_quorum=None, notfound_ok=None)

Gets the value of a counter.

get_index(bucket, index, startkey, endkey=None, return_terms=None, max_results=None, continuation=None, timeout=None, term_regex=None)

Performs a secondary index query.

get_keys(bucket, timeout=None)

Lists all keys within the given bucket.

get_preflist(bucket, key)

Fetches the preflist for a bucket/key.

get_search_index(index)

Returns a Yokozuna search index or None.

get_search_schema(schema)

Returns a Yokozuna search schema.

list_search_indexes()

Lists all Yokozuna search indexes.

classmethod make_fixed_client_id()

Returns a unique identifier for the current machine/process/thread.

classmethod make_random_client_id()

Returns a random client identifier.

mapred(inputs, query, timeout=None)

Sends a MapReduce request synchronously.

ping()

Pings the remote server.

put(robj, w=None, dw=None, pw=None, return_body=None, if_none_match=None, timeout=None)

Stores an object.

search(index, query, **params)

Performs a search query.

set_bucket_props(bucket, props)

Sets properties on the given bucket.

set_bucket_type_props(bucket_type, props)

Sets properties on the given bucket-type.

set_client_id(client_id)

Sets the client ID. This overrides the default random client ID, which is generated automatically when none is specified while creating the transport object.

stream_buckets(bucket_type=None, timeout=None)

Streams the list of buckets through an iterator.

stream_index(bucket, index, startkey, endkey=None, return_terms=None, max_results=None, continuation=None, timeout=None)

Streams a secondary index query.

stream_keys(bucket, timeout=None)

Streams the list of keys for the bucket through an iterator.

stream_mapred(inputs, query, timeout=None)

Streams the results of a MapReduce request through an iterator.

ts_delete(table, key)

Deletes a timeseries object.

ts_describe(table)

Retrieves a timeseries table description.

ts_get(table, key)

Retrieves a timeseries object.

ts_put(tsobj)

Stores a timeseries object.

ts_query(table, query, interpolations=None)

Query timeseries data.

ts_stream_keys(table, timeout=None)

Streams the list of keys for the table through an iterator.

update_counter(bucket, key, value, w=None, dw=None, pw=None, returnvalue=False)

Updates a counter by the given value.

update_datatype(datatype, w=None, dw=None, pw=None, return_body=None, timeout=None, include_context=None)

Updates a Riak Datatype by sending local operations to the server.

client_id

The client ID for this connection.

class riak.transports.feature_detect.FeatureDetection

Implements boolean methods that can be checked for the presence of specific server-side features. To use this functionality, subclasses must implement the _server_version() method, which should return the server’s version as a string.

FeatureDetection is a parent class of Transport.
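A minimal sketch of the pattern, assuming features are gated by comparing the server version against a minimum. SketchFeatureDetection, parse_version() and _at_least() are hypothetical names, and the 2.0.0 threshold for bucket_types() is only an illustration (bucket types first shipped with Riak 2.0).

```python
import re


def parse_version(text):
    """Turn a version string such as '2.1.4' into (2, 1, 4)."""
    parts = [int(p) for p in re.findall(r"\d+", text)[:3]]
    return tuple(parts + [0] * (3 - len(parts)))


class SketchFeatureDetection:
    """Hypothetical mixin; the version threshold is illustrative only."""

    def _server_version(self):
        raise NotImplementedError  # supplied by each transport

    def _at_least(self, version):
        return parse_version(self._server_version()) >= version

    def bucket_types(self):
        return self._at_least((2, 0, 0))


class OldTransport(SketchFeatureDetection):
    def _server_version(self):
        return "1.4.12"


class NewTransport(SketchFeatureDetection):
    def _server_version(self):
        return "2.1.4"


print(OldTransport().bucket_types(), NewTransport().bucket_types())  # False True
```

Comparing integer tuples rather than raw strings avoids the classic bug where "1.10" sorts before "1.9".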

_server_version()

Gets the server version from the server. To be implemented by the individual transport class.

Return type: string

bucket_stream()

Whether streaming bucket lists are supported.

Return type: bool

bucket_types()

Whether bucket-types are supported.

Return type: bool

client_timeouts()

Whether client-supplied timeouts are supported.

Return type: bool

counters()

Whether CRDT counters are supported.

Return type: bool

datatypes()

Whether datatypes are supported.

Return type: bool

index_term_regex()

Whether secondary indexes support a regexp term filter.

Return type: bool

pb_all_bucket_props()

Whether all normal bucket properties are supported over Protocol Buffers.

Return type: bool

pb_clear_bucket_props()

Whether bucket properties can be cleared over Protocol Buffers.

Return type: bool

pb_conditionals()

Whether conditional fetch/store semantics are supported over Protocol Buffers.

Return type: bool

pb_head()

Whether partial fetches (vclock and metadata only) are supported over Protocol Buffers.

Return type: bool

pb_indexes()

Whether secondary index queries are supported over Protocol Buffers.

Return type: bool

pb_search()

Whether search queries are supported over Protocol Buffers.

Return type: bool

pb_search_admin()

Whether search administration is supported over Protocol Buffers.

Return type: bool

phaseless_mapred()

Whether MapReduce requests can be submitted without phases.

Return type: bool

preflists()

Whether bucket/key preflists are supported.

Return type: bool

quorum_controls()

Whether additional quorums and FSM controls are available, e.g. primary quorums, basic_quorum, and notfound_ok.

Return type: bool

stream_indexes()

Whether secondary indexes support streaming responses.

Return type: bool

tombstone_vclocks()

Whether ‘not found’ responses might include vclocks.

Return type: bool

write_once()

Whether write-once operations are supported.

Return type: bool

Security helpers

riak.transports.security.verify_cb(conn, cert, errnum, depth, ok)

The default OpenSSL certificate verification callback.

HTTP Transport

class riak.transports.http.HttpPool(client, **options)

A pool of HTTP(S) transport connections.

riak.transports.http.is_retryable(err)

Determines whether the given exception is network- or socket-related, in which case the HTTP connection should be closed and the operation retried on another node.

Return type: boolean
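One way to implement such a check is to inspect the errno of socket-level errors. The sketch below is an assumption, not the transport's actual logic: CONN_CLOSED_ERRNOS is a hypothetical, illustrative set of error codes indicating that the peer closed or reset the connection.

```python
import errno

# Hypothetical set of errno values meaning "the connection is gone".
CONN_CLOSED_ERRNOS = {
    errno.EPIPE,
    errno.ECONNRESET,
    errno.ECONNABORTED,
    errno.ENOTCONN,
}


def is_retryable(err):
    """Return True if err looks like a dropped connection."""
    # socket.error is an alias of OSError in Python 3.
    return isinstance(err, OSError) and err.errno in CONN_CLOSED_ERRNOS


print(is_retryable(ConnectionResetError(errno.ECONNRESET, "reset by peer")))  # True
print(is_retryable(ValueError("bad bucket name")))  # False
```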

class riak.transports.http.HttpTransport(node=None, client=None, connection_class=<class httplib.HTTPConnection>, client_id=None, **options)

The HttpTransport object holds information necessary to connect to Riak via HTTP.

Construct a new HTTP connection to Riak.

clear_bucket_props(bucket)

Reset the properties on the given bucket to their defaults.

create_search_index(index, schema=None, n_val=None, timeout=None)

Create a Solr search index for Yokozuna.

Parameters:
  • index (string) – a name of a yz index
  • schema (string) – XML of Solr schema
  • n_val (int) – N value of the write
  • timeout (integer, None) – optional timeout (in ms)

Return type: boolean

create_search_schema(schema, content)

Create a new Solr schema for Yokozuna.

Parameters:
  • schema (string) – name of Solr schema
  • content (string) – actual definition of schema (XML)

Return type: boolean

delete(robj, rw=None, r=None, w=None, dw=None, pr=None, pw=None, timeout=None)

Delete an object.

delete_search_index(index)

Delete the specified Solr search index for Yokozuna.

Parameters: index (string) – a name of a yz index

Return type: boolean

fulltext_add(index, docs)

Adds documents to the search index.

fulltext_delete(index, docs=None, queries=None)

Removes documents from the full-text index.

get(robj, r=None, pr=None, timeout=None, basic_quorum=None, notfound_ok=None, head_only=False)

Get a bucket/key from the server

get_bucket_props(bucket)

Get properties for a bucket

get_bucket_type_props(bucket_type)

Get properties for a bucket-type

get_buckets(bucket_type=None, timeout=None)

Fetch a list of all buckets

get_index(bucket, index, startkey, endkey=None, return_terms=None, max_results=None, continuation=None, timeout=None, term_regex=None)

Performs a secondary index query.

get_keys(bucket, timeout=None)

Fetch a list of keys for the bucket

get_preflist(bucket, key)

Get the preflist for a bucket/key

Parameters:
  • bucket (RiakBucket) – Riak Bucket
  • key (string) – Riak Key
Return type:

list of dicts

get_resources()

Gets a JSON mapping of server-side resource names to paths.

Return type: dict

get_search_index(index)

Fetch the specified Solr search index for Yokozuna.

Parameters: index (string) – a name of a yz index

Return type: string

get_search_schema(schema)

Fetch a Solr schema from Yokozuna.

Parameters: schema (string) – name of Solr schema

Return type: dict

list_search_indexes()

Return a list of Solr search indexes from Yokozuna.

Return type: list of dicts

mapred(inputs, query, timeout=None)

Run a MapReduce query.

ping()

Check that the server is alive over HTTP.

put(robj, w=None, dw=None, pw=None, return_body=True, if_none_match=False, timeout=None)

Puts a (possibly new) object.

search(index, query, **params)

Performs a search query.

set_bucket_props(bucket, props)

Set the properties on the given bucket.

set_bucket_type_props(bucket_type, props)

Set the properties on the bucket-type

stats()

Gets performance statistics and server information

stream_buckets(bucket_type=None, timeout=None)

Stream list of buckets through an iterator

stream_index(bucket, index, startkey, endkey=None, return_terms=None, max_results=None, continuation=None, timeout=None, term_regex=None)

Streams a secondary index query.

TCP Transport

class riak.transports.tcp.TcpPool(client, **options)

A resource pool of TCP transports.

riak.transports.tcp.is_retryable(err)

Determines whether the given exception is network- or socket-related, in which case the TCP connection should be closed and the operation retried on another node.

Return type: boolean

class riak.transports.tcp.TcpTransport(node=None, client=None, timeout=None, **kwargs)

The TcpTransport object holds a connection to the TCP socket on the Riak server.

clear_bucket_props(bucket)

Clear bucket properties, resetting them to their defaults

get(robj, r=None, pr=None, timeout=None, basic_quorum=None, notfound_ok=None, head_only=False)

Serialize get request and deserialize response

get_bucket_props(bucket)

Serialize bucket property request and deserialize response

get_bucket_type_props(bucket_type)

Fetch bucket-type properties

get_buckets(bucket_type=None, timeout=None)

Serialize bucket listing request and deserialize response

get_keys(bucket, timeout=None)

Lists all keys within a bucket.

get_preflist(bucket, key)

Get the preflist for a bucket/key

Parameters:
  • bucket (RiakBucket) – Riak Bucket
  • key (string) – Riak Key
Return type:

list of dicts

get_server_info()

Get information about the server

ping()

Ping the remote server

set_bucket_props(bucket, props)

Serialize set bucket property request and deserialize response

set_bucket_type_props(bucket_type, props)

Set bucket-type properties

stream_buckets(bucket_type=None, timeout=None)

Stream list of buckets through an iterator

stream_keys(bucket, timeout=None)

Streams keys from a bucket, returning an iterator that yields lists of keys.

ts_stream_keys(table, timeout=None)

Streams keys from a timeseries table, returning an iterator that yields lists of keys.

client_id

The client ID for this connection.

Utilities

Multi-valued Dict

class riak.multidict.MultiDict(*args, **kw)

An ordered dictionary that can have multiple values for each key. Adds the methods getall, getone, mixed, and add to the normal dictionary interface.

add(key, value)

Add the key and value, not overwriting any previous value.

getall(key)

Return a list of all values matching the key (may be an empty list)

getone(key)

Get one value matching the key, raising a KeyError if multiple values were found.

mixed()

Returns a dictionary where the values are either single values, or a list of values when a key/value appears more than once in this dictionary. This is similar to the kind of dictionary often used to represent the variables in a web request.

dict_of_lists()

Returns a dictionary where each key is associated with a list of values.
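The semantics of these methods can be modelled with a list of (key, value) pairs. SketchMultiDict is a hypothetical illustration rather than riak.multidict.MultiDict, and raising KeyError from getone() when the key is absent is an assumption; the documentation above only specifies the multiple-values case.

```python
class SketchMultiDict:
    """Hypothetical, list-backed model of the MultiDict methods above."""

    def __init__(self):
        self._items = []  # (key, value) pairs in insertion order

    def add(self, key, value):
        self._items.append((key, value))  # never overwrites

    def getall(self, key):
        return [v for k, v in self._items if k == key]

    def getone(self, key):
        values = self.getall(key)
        if len(values) > 1:
            raise KeyError("multiple values for %r" % (key,))
        if not values:
            raise KeyError(key)
        return values[0]

    def dict_of_lists(self):
        result = {}
        for k, v in self._items:
            result.setdefault(k, []).append(v)
        return result

    def mixed(self):
        # Collapse single-element lists back to plain values.
        return {k: vs[0] if len(vs) == 1 else vs
                for k, vs in self.dict_of_lists().items()}


md = SketchMultiDict()
md.add("tag", "a")
md.add("tag", "b")
md.add("id", "1")
print(md.mixed())          # {'tag': ['a', 'b'], 'id': '1'}
print(md.dict_of_lists())  # {'tag': ['a', 'b'], 'id': ['1']}
```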

Micro-benchmarking

riak.benchmark.measure()

Runs a benchmark once when used as a context manager. Example:

import math
import riak.benchmark

with riak.benchmark.measure() as b:
    with b.report("pow"):
        for _ in range(10000):
            pow(2, 10000)  # integer power; math.pow would overflow a float
    with b.report("factorial"):
        for i in range(100):
            math.factorial(i)

riak.benchmark.measure_with_rehearsal()

Runs a benchmark when used as an iterator, injecting a garbage collection between iterations. Example:

import math
import riak.benchmark

for b in riak.benchmark.measure_with_rehearsal():
    with b.report("pow"):
        for _ in range(10000):
            pow(2, 10000)
    with b.report("factorial"):
        for i in range(100):
            math.factorial(i)

class riak.benchmark.Benchmark(rehearse=False)

A benchmarking run, which may consist of multiple steps. See measure_with_rehearsal() and measure() for examples.

Creates a new benchmark reporter.

Parameters: rehearse (boolean) – whether to run twice to counter the effects of garbage collection

next()

Runs the next iteration of the benchmark.

report(name)

Returns a report for the current step of the benchmark.

Miscellaneous

riak.util.quacks_like_dict(object)

Checks whether object is dict-like.

riak.util.deep_merge(a, b)

Merges two deep dicts non-destructively.

Uses a stack to avoid maximum recursion depth exceptions.

>>> from riak.util import deep_merge
>>> a = {'a': 1, 'b': {1: 1, 2: 2}, 'd': 6}
>>> b = {'c': 3, 'b': {2: 7}, 'd': {'z': [1, 2, 3]}}
>>> c = deep_merge(a, b)
>>> from pprint import pprint; pprint(c)
{'a': 1, 'b': {1: 1, 2: 7}, 'c': 3, 'd': {'z': [1, 2, 3]}}
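The stack-based approach can be sketched like this. It is an illustrative re-implementation, not riak.util.deep_merge itself: deep_merge_sketch is a hypothetical name, it checks isinstance(..., dict) where riak uses quacks_like_dict(), and it deep-copies values so neither argument is modified. Nested dicts are pushed onto an explicit stack instead of being handled by recursive calls, so merge depth is not limited by the interpreter's recursion limit.

```python
import copy


def deep_merge_sketch(a, b):
    """Merge b into a copy of a, iteratively rather than recursively."""
    result = copy.deepcopy(a)  # never modify the caller's dicts
    stack = [(result, b)]
    while stack:
        dst, src = stack.pop()
        for key, value in src.items():
            if isinstance(dst.get(key), dict) and isinstance(value, dict):
                # Both sides are dicts: merge them later instead of recursing.
                stack.append((dst[key], value))
            else:
                # Otherwise the value from b wins.
                dst[key] = copy.deepcopy(value)
    return result


a = {'a': 1, 'b': {1: 1, 2: 2}, 'd': 6}
b = {'c': 3, 'b': {2: 7}, 'd': {'z': [1, 2, 3]}}
merged = deep_merge_sketch(a, b)
print(merged == {'a': 1, 'b': {1: 1, 2: 7}, 'c': 3, 'd': {'z': [1, 2, 3]}})  # True
```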

riak.util.deprecated(message, stacklevel=3)

Prints a deprecation warning to the console.

class riak.util.lazy_property(fget)

A method decorator meant to be used for lazy evaluation and memoization of an object attribute. The property should represent immutable data, as it replaces itself on first access.
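A minimal sketch of such a decorator, written as a non-data descriptor that caches the computed value on the instance under the same name. This mirrors the described behaviour but is not necessarily riak's exact code; Node is a hypothetical example class.

```python
class lazy_property:
    """Hypothetical re-implementation of the lazy_property idea."""

    def __init__(self, fget):
        self.fget = fget
        self.name = fget.__name__
        self.__doc__ = fget.__doc__

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self  # accessed on the class, not an instance
        value = self.fget(obj)
        # Cache on the instance. Because this descriptor defines no
        # __set__, the instance attribute shadows it from now on.
        setattr(obj, self.name, value)
        return value


class Node:
    calls = 0

    @lazy_property
    def expensive(self):
        Node.calls += 1
        return sum(range(1000))


n = Node()
print(n.expensive, n.expensive, Node.calls)  # 499500 499500 1
```

Because the cached value replaces the property on first access, it must represent immutable data, as the description above notes.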

distutils commands

class commands.build_messages(dist)

Generates message code mappings. Add to the build process using:

setup(cmdclass={'build_messages': build_messages})

Create and initialize a new Command object. Most importantly, invokes the ‘initialize_options()’ method, which is the real initializer and depends on the actual command being instantiated.

finalize_options()
initialize_options()
run()
description = 'generate protocol message code mappings'
user_options = [('source=', None, 'source CSV file containing message code mappings'), ('destination=', None, 'destination Python source file')]

class commands.setup_timeseries(dist)

Creates bucket-types appropriate for timeseries.

Create and initialize a new Command object. Most importantly, invokes the ‘initialize_options()’ method, which is the real initializer and depends on the actual command being instantiated.

description = 'create bucket-types used in timeseries tests'
user_options = [('riak-admin=', None, 'path to the riak-admin script')]

Version extraction (version module)

Gets the current version number. If in a git repository, it is the current git tag. Otherwise it is the one contained in the PKG-INFO file.

To use this script, simply import it in your setup.py file and use the results of get_version() as your package version:

from setuptools import setup
from version import *

setup(
    version=get_version()
)