RedisBackend

Redis task result store.

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| ResultConsumer | | The class used to consume task results from Redis. |
| redis | module | The redis client module (PyPI package redis). |
| connection_class_ssl | = redis.SSLConnection | The Redis connection class used when SSL is enabled; defaults to redis.SSLConnection. |
| max_connections | int = null | Maximum number of connections in the pool. |
| supports_autoexpire | boolean = true | Indicates that the backend supports automatic key expiration via the Redis EXPIRE command. |
| supports_native_join | boolean = true | Indicates that the backend supports native join operations for chords. |

Constructor

Signature

def RedisBackend(
    host: string = null,
    port: int = null,
    db: int = null,
    password: string = null,
    max_connections: int = null,
    url: string = null,
    connection_pool: redis.ConnectionPool = null,
    **kwargs: dict
) -> null

Parameters

| Name | Type | Description |
| --- | --- | --- |
| host | string = null | The Redis server host. Can also be a connection URL. |
| port | int = null | The port number for the Redis server. |
| db | int = null | The Redis database index. |
| password | string = null | The password for Redis authentication. |
| max_connections | int = null | Maximum number of connections in the pool. |
| url | string = null | A full Redis connection URL. |
| connection_pool | redis.ConnectionPool = null | An existing Redis connection pool instance. |
| **kwargs | dict | Additional keyword arguments passed to the base class. |
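
The url parameter packs the host, port, database index, and password into one string. As a rough illustration only, the sketch below splits such a URL with the standard library. The helper name parse_redis_url and the example URL are hypothetical, and this is not claimed to be the parsing logic the backend itself uses.

```python
from urllib.parse import urlparse


def parse_redis_url(url):
    """Hypothetical helper: split a redis:// URL into connection fields."""
    parts = urlparse(url)
    # The database index, when present, is the path component ("/1").
    db = int(parts.path.lstrip("/")) if parts.path.strip("/") else None
    return {
        "host": parts.hostname,
        "port": parts.port,
        "db": db,
        "password": parts.password,
    }


params = parse_redis_url("redis://:s3cret@localhost:6379/1")
print(params)
```

The resulting fields line up with the host, port, db, and password parameters listed above.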

Methods


exception_safe_to_retry()

@classmethod
def exception_safe_to_retry(
    exc: Exception
) -> boolean

Determines whether a raised exception is a connection-related error that can be safely retried.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| exc | Exception | The exception instance to evaluate |

Returns

| Type | Description |
| --- | --- |
| boolean | True if the exception is an instance of one of the known connection error classes, False otherwise |
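
The check described above amounts to an isinstance test against a set of known connection error classes. The following is a minimal sketch of that idea; the exception classes and the CONNECTION_ERRORS tuple are hypothetical stand-ins, not the classes the backend actually registers.

```python
class FakeConnectionError(Exception):
    """Hypothetical stand-in for a Redis connection error."""


class FakeTimeoutError(Exception):
    """Hypothetical stand-in for a Redis timeout error."""


# Assumption: the backend keeps a tuple of known connection error classes.
CONNECTION_ERRORS = (FakeConnectionError, FakeTimeoutError)


def exception_safe_to_retry(exc):
    """Return True only for instances of the known connection error classes."""
    return isinstance(exc, CONNECTION_ERRORS)


retry_connection_loss = exception_safe_to_retry(FakeConnectionError("lost"))
retry_value_error = exception_safe_to_retry(ValueError("bad input"))
print(retry_connection_loss, retry_value_error)
```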

retry_policy()

@classmethod
def retry_policy() -> dict

Retrieves the retry configuration for Redis operations, merging base policies with transport-specific options.

Returns

| Type | Description |
| --- | --- |
| dict | A dictionary containing retry settings like max_retries and intervals |
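
Merging a base policy with transport-specific options can be pictured as a dictionary overlay. The sketch below assumes the overrides live under a "retry_policy" key in the transport options; the function name and all numeric values are illustrative, not defaults taken from the library.

```python
def merged_retry_policy(base_policy, transport_options):
    """Overlay transport-specific retry options on a copy of the base policy."""
    policy = dict(base_policy)  # copy so the base policy is not mutated
    policy.update(transport_options.get("retry_policy", {}))
    return policy


# Hypothetical values chosen for illustration only.
base = {"max_retries": 20, "interval_start": 0, "interval_step": 1, "interval_max": 1}
options = {"retry_policy": {"max_retries": 5}}

policy = merged_retry_policy(base, options)
print(policy)
```

Keys present only in the base policy survive the merge, while keys supplied by the transport options take precedence.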

on_task_call()

@classmethod
def on_task_call(
    producer: object,
    task_id: string
)

Initiates result consumption for a specific task ID if the current context allows non-blocking joins.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| producer | object | The producer instance initiating the task call |
| task_id | string | The unique identifier of the task to start consuming |

get()

@classmethod
def get(
    key: string
) -> bytes

Fetches the raw value associated with a specific key from the Redis store.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| key | string | The Redis key to retrieve |

Returns

| Type | Description |
| --- | --- |
| bytes | The raw data stored at the key, or None if the key does not exist |

mget()

@classmethod
def mget(
    keys: list
) -> list

Fetches multiple values from Redis in a single batch operation.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| keys | list | A list of Redis keys to retrieve |

Returns

| Type | Description |
| --- | --- |
| list | A list of raw values corresponding to the provided keys |
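
To make the relationship between get() and mget() concrete, here is a small in-memory model. InMemoryStore is a hypothetical stand-in for a Redis client, used so the example runs without a server; it mirrors the behaviour described above, where a missing key yields None and a batch fetch returns one value per requested key in order.

```python
class InMemoryStore:
    """Hypothetical in-memory stand-in for a Redis client."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key):
        # Missing keys yield None, as described for get().
        return self._data.get(key)

    def mget(self, keys):
        # One value per requested key, in the same order.
        return [self._data.get(key) for key in keys]


store = InMemoryStore()
store.set("a", b"1")
store.set("b", b"2")

values = store.mget(["a", "missing", "b"])
print(values)
```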

ensure()

@classmethod
def ensure(
    fun: callable,
    args: tuple
) -> any

Executes a function with a retry policy to handle transient connection errors.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| fun | callable | The function to execute with retry logic |
| args | tuple | Positional arguments to pass to the function |

Returns

| Type | Description |
| --- | --- |
| any | The return value of the executed function |

on_connection_error()

@classmethod
def on_connection_error(
    max_retries: int,
    exc: Exception,
    intervals: iterator,
    retries: int
) -> float

Logs a connection loss and calculates the next retry interval during a retry loop.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| max_retries | int | The maximum number of allowed retry attempts |
| exc | Exception | The connection error that triggered this handler |
| intervals | iterator | An iterator providing the sequence of wait times between retries |
| retries | int | The current number of retry attempts already made |

Returns

| Type | Description |
| --- | --- |
| float | The number of seconds to wait before the next retry attempt |
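
The interplay between ensure() and on_connection_error() can be sketched as a retry loop that asks an error handler how long to wait. This is a simplified model under stated assumptions: FakeConnectionError, the linear interval sequence, the injectable sleep argument, and flaky_get are all hypothetical and exist only to keep the example self-contained and fast.

```python
import itertools


class FakeConnectionError(Exception):
    """Hypothetical stand-in for a Redis connection error."""


def on_connection_error(max_retries, exc, intervals, retries):
    """Report the failure and return how long to wait before retrying."""
    wait = next(intervals)
    print(f"Connection lost ({exc}); retry {retries}/{max_retries or 'Inf'} in {wait}s")
    return wait


def ensure(fun, args, max_retries=3, sleep=lambda seconds: None):
    """Call fun(*args), retrying on FakeConnectionError up to max_retries times."""
    intervals = itertools.count(start=1)  # wait 1s, 2s, 3s, ...
    for retries in itertools.count():
        try:
            return fun(*args)
        except FakeConnectionError as exc:
            if max_retries is not None and retries >= max_retries:
                raise
            sleep(on_connection_error(max_retries, exc, intervals, retries))


attempts = []


def flaky_get(key):
    """Fail twice, then succeed, to exercise the retry path."""
    attempts.append(key)
    if len(attempts) < 3:
        raise FakeConnectionError("connection reset")
    return b"value"


result = ensure(flaky_get, ("my-key",))
print(result, len(attempts))
```

Passing a no-op sleep keeps the example instant; a real caller would pass time.sleep or an equivalent.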

set()

@classmethod
def set(
    key: string,
    value: string
) -> any

Stores a value in Redis with retry logic, ensuring the value does not exceed the maximum allowed size.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| key | string | The Redis key where the value will be stored |
| value | string | The data to store; must be within Redis string size limits |

Returns

| Type | Description |
| --- | --- |
| any | The result of the underlying _set operation |
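
The size guard mentioned above can be illustrated with a plain dictionary. Redis limits string values to 512 MB; the error class, the checked_set helper, and the tiny limit used in the demonstration are hypothetical and only show the shape of the check.

```python
MAX_STR_VALUE_SIZE = 512 * 1024 * 1024  # Redis caps string values at 512 MB


class ValueTooLargeError(Exception):
    """Hypothetical error raised when a value exceeds the size limit."""


def checked_set(store, key, value, limit=MAX_STR_VALUE_SIZE):
    """Store value under key after rejecting oversized strings."""
    if isinstance(value, str) and len(value) > limit:
        raise ValueTooLargeError(f"value for {key!r} exceeds {limit} characters")
    store[key] = value


store = {}
checked_set(store, "task-1", "ok")

try:
    # A deliberately small limit so the rejection path is easy to trigger.
    checked_set(store, "task-2", "x" * 11, limit=10)
    rejected = False
except ValueTooLargeError:
    rejected = True

print(store, rejected)
```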

forget()

@classmethod
def forget(
    task_id: string
)

Removes a task's result from the store and cancels any active result consumption for that task.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| task_id | string | The unique identifier of the task to forget |
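
Forgetting a task involves two steps: dropping the stored result and stopping any active consumption for it. The sketch below models both with plain containers; ResultStore and its attributes are hypothetical names, not part of the backend.

```python
class ResultStore:
    """Hypothetical store pairing saved results with active subscriptions."""

    def __init__(self):
        self.results = {}
        self.subscriptions = set()

    def forget(self, task_id):
        # Drop the stored result, then stop listening for updates to it.
        self.results.pop(task_id, None)
        self.subscriptions.discard(task_id)


store = ResultStore()
store.results["task-1"] = "done"
store.subscriptions.add("task-1")

store.forget("task-1")
store.forget("task-1")  # forgetting twice is harmless in this sketch
print(store.results, store.subscriptions)
```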

delete()

@classmethod
def delete(
    key: string
)

Deletes a specific key from the Redis database.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| key | string | The Redis key to remove |

incr()

@classmethod
def incr(
    key: string
) -> int

Increments the integer value of a key by one.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| key | string | The Redis key to increment |

Returns

| Type | Description |
| --- | --- |
| int | The new value of the key after incrementing |

expire()

@classmethod
def expire(
    key: string,
    value: int
)

Sets a time-to-live (TTL) expiration on a specific Redis key.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| key | string | The Redis key to expire |
| value | int | The number of seconds until the key expires |
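
The effect of a TTL can be modelled without a Redis server by tracking a deadline per key. ExpiringStore and its injectable clock are hypothetical constructs for illustration; in Redis itself the server enforces expiry.

```python
class ExpiringStore:
    """Hypothetical in-memory store with per-key time-to-live."""

    def __init__(self, clock):
        self._clock = clock  # callable returning the current time in seconds
        self._data = {}
        self._deadlines = {}

    def set(self, key, value):
        self._data[key] = value

    def expire(self, key, seconds):
        # Record the moment after which the key counts as gone.
        self._deadlines[key] = self._clock() + seconds

    def get(self, key):
        deadline = self._deadlines.get(key)
        if deadline is not None and self._clock() >= deadline:
            self._data.pop(key, None)
            self._deadlines.pop(key, None)
        return self._data.get(key)


now = [0]  # fake clock so the example runs without sleeping
store = ExpiringStore(clock=lambda: now[0])
store.set("result", b"42")
store.expire("result", 60)

before = store.get("result")
now[0] = 61
after = store.get("result")
print(before, after)
```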

add_to_chord()

@classmethod
def add_to_chord(
    group_id: string,
    result: any
)

Increments the counter tracking the number of completed parts in a chord group.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| group_id | string | The unique identifier for the chord group |
| result | any | The result of the individual task part (unused in this implementation) |

set_chord_size()

@classmethod
def set_chord_size(
    group_id: string,
    chord_size: int
)

Stores the total number of expected tasks in a chord group to track completion.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| group_id | string | The unique identifier for the chord group |
| chord_size | int | The total number of tasks in the chord header |

apply_chord()

@classmethod
def apply_chord(
    header_result_args: tuple,
    body: any
)

Saves the chord header metadata when the header contains nested group results, so that the result structure is preserved.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| header_result_args | tuple | Arguments used to reconstruct the GroupResult for the chord header |
| body | any | The callback task to be executed when the chord is complete |

on_chord_part_return()

@classmethod
def on_chord_part_return(
    request: object,
    state: string,
    result: any
)

Handles the completion of a single chord task, updating the join state and triggering the callback if all parts are ready.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| request | object | The task request object containing group and chord metadata |
| state | string | The execution state of the completed task part |
| result | any | The return value of the completed task part |
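
Chord completion tracking, as described for set_chord_size() and on_chord_part_return(), boils down to comparing a counter of finished parts with the expected size. The sketch below is a simplified in-memory model; ChordTracker, its dictionaries, and the callback argument are hypothetical, and the real backend keeps this state in Redis keys.

```python
class ChordTracker:
    """Hypothetical in-memory model of chord completion tracking."""

    def __init__(self):
        self.sizes = {}     # group_id -> expected number of header tasks
        self.counters = {}  # group_id -> number of parts completed so far
        self.results = {}   # group_id -> results collected so far

    def set_chord_size(self, group_id, chord_size):
        self.sizes[group_id] = chord_size

    def on_chord_part_return(self, group_id, result, callback):
        """Record one finished part; fire callback once every part is in."""
        self.results.setdefault(group_id, []).append(result)
        self.counters[group_id] = self.counters.get(group_id, 0) + 1
        if self.counters[group_id] == self.sizes[group_id]:
            callback(self.results.pop(group_id))


completed = []
tracker = ChordTracker()
tracker.set_chord_size("group-1", 3)
for value in (10, 20, 30):
    tracker.on_chord_part_return("group-1", value, completed.append)

print(completed)
```

The callback runs exactly once, after the third part reports back, and receives the collected results.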

ConnectionPool()

@property
def ConnectionPool() -> type

Property that returns the connection pool class, defaulting to the standard redis.ConnectionPool if not specified.

Returns

| Type | Description |
| --- | --- |
| type | The class used to instantiate connection pools |

client()

@property
def client() -> redis.StrictRedis

Property that provides a cached Redis client instance for performing operations.

Returns

| Type | Description |
| --- | --- |
| redis.StrictRedis | The active Redis client for this backend |
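
A cached property builds its value on first access and reuses it afterwards. The sketch below shows that behaviour with functools.cached_property from the standard library; the Backend class and the placeholder object are hypothetical, and whether the backend uses this exact decorator is an assumption.

```python
from functools import cached_property


class Backend:
    """Hypothetical backend showing how a cached client property behaves."""

    def __init__(self):
        self.clients_created = 0

    @cached_property
    def client(self):
        # Runs once; the result is stored on the instance afterwards.
        self.clients_created += 1
        return object()  # placeholder for a real Redis client


backend = Backend()
first = backend.client
second = backend.client
print(first is second, backend.clients_created)
```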