A typical example of using Connection Pools in redis-py is:
# Build one pool up front; host, port and db are passed to ConnectionPool.
pool = redis.ConnectionPool(host='localhost', port=6379, db=0)
# Hand the already-created pool to the client via the connection_pool argument.
r = redis.Redis(connection_pool=pool)
Create a pool
By default, each Redis instance you create will in turn create its own connection pool. You can override this behavior and use an existing connection pool by passing an already created connection pool instance to the connection_pool argument of the Redis class, just like the above example does.
class Redis(object):
    """Excerpt of the client class: how the connection pool is selected."""

    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs='required', ssl_ca_certs=None,
                 max_connections=None):
        """Store the connection pool this client will use.

        A truthy ``connection_pool`` argument is kept as-is; otherwise a new
        ``ConnectionPool`` is created and stored on ``self.connection_pool``.
        """
        if not connection_pool:
            # ...
            # (elided in this excerpt: the code that builds ``kwargs`` --
            # presumably from the arguments above; see the full source)
            connection_pool = ConnectionPool(**kwargs)
        # Either the caller's pool or the freshly created one.
        self.connection_pool = connection_pool
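Each command execution calls the execute_command method; here is what it looks like.
Attention: if the execution throws a ConnectionError or TimeoutError, the connection is disconnected and the command is retried once (a TimeoutError is re-raised instead unless retry_on_timeout is set on the connection). In every case the connection is returned to the pool afterwards.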
class Redis(object):
    """Excerpt of the client class: how a single command is executed."""

    def execute_command(self, *args, **options):
        """Execute a command and return a parsed response.

        ``args[0]`` is the command name. A connection is taken from
        ``self.connection_pool``, the command is sent, and the parsed reply
        is returned. On ``ConnectionError`` or ``TimeoutError`` the
        connection is disconnected and the command is sent once more, except
        that a ``TimeoutError`` is re-raised when the connection's
        ``retry_on_timeout`` is falsy. The connection is always released back
        to the pool, whether the call succeeds or raises.
        """
        pool = self.connection_pool
        command_name = args[0]
        connection = pool.get_connection(command_name, **options)
        try:
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        except (ConnectionError, TimeoutError) as e:
            # Drop the broken socket before deciding whether to retry.
            connection.disconnect()
            if not connection.retry_on_timeout and isinstance(e, TimeoutError):
                raise
            # Single retry; an error here propagates to the caller.
            connection.send_command(*args)
            return self.parse_response(connection, command_name, **options)
        finally:
            # Runs on success, retry and failure alike.
            pool.release(connection)
Uh hum, pool.get_connection. Let’s see what it does.
class ConnectionPool(object):
    """Excerpt of the pool class: handing out a connection."""

    def get_connection(self, command_name, *keys, **options):
        """Get a connection from the pool.

        Reuses the most recently returned idle connection when one exists,
        otherwise asks ``make_connection`` for a new one. The result is
        recorded in ``_in_use_connections`` before being returned.
        ``command_name``, ``keys`` and ``options`` are accepted but not used
        here.
        """
        self._checkpid()
        try:
            # pop() on an empty list raises IndexError -> nothing idle.
            connection = self._available_connections.pop()
        except IndexError:
            connection = self.make_connection()
        self._in_use_connections.add(connection)
        return connection
The logic is very obvious:
- Get a connection from the pool if available.
- Make a new connection if not.
- Add the connection to _in_use_connections.
_available_connections is a list (container) for connections, and max_connections specifies the limit of this pool.
class ConnectionPool(object):
    """Excerpt of the pool class: construction and state reset."""

    def __init__(self, connection_class=Connection, max_connections=None,
                 **connection_kwargs):
        """Validate the size limit, store the settings and reset the pool.

        :param connection_class: stored on the instance as-is.
        :param max_connections: size limit; any falsy value becomes 2 ** 31.
        :param connection_kwargs: stored on the instance as-is.
        :raises ValueError: if the limit is not an integer or is negative.
        """
        # None (or 0) falls back to a very large limit.
        max_connections = max_connections or 2 ** 31
        # NOTE(review): ``long`` is not defined in this excerpt -- presumably
        # a Python 2/3 compatibility alias imported elsewhere; confirm.
        if not isinstance(max_connections, (int, long)) or max_connections < 0:
            raise ValueError('"max_connections" must be a positive integer')

        self.connection_class = connection_class
        self.connection_kwargs = connection_kwargs
        self.max_connections = max_connections

        self.reset()

    def reset(self):
        """Record the current process id and start with empty pool state."""
        self.pid = os.getpid()
        self._created_connections = 0
        # Idle connections (a list) and checked-out connections (a set).
        self._available_connections = []
        self._in_use_connections = set()
        self._check_lock = threading.Lock()
Create a connection
By default, TCP connections are created unless connection_class is specified. Use redis.UnixDomainSocketConnection for unix sockets.
def make_connection(self):
    """Create a new connection.

    Raises ``ConnectionError("Too many connections")`` once
    ``_created_connections`` has reached ``max_connections``; otherwise
    increments the counter and returns
    ``connection_class(**connection_kwargs)``.
    """
    if self._created_connections >= self.max_connections:
        raise ConnectionError("Too many connections")
    # Counted before construction, so a failing constructor still counts.
    self._created_connections += 1
    return self.connection_class(**self.connection_kwargs)
Release a connection
A connection will be given back to the pool (_available_connections) when it is released; it will also be removed from the _in_use_connections set.
def release(self, connection):
    """Release the connection back to the pool.

    A connection whose ``pid`` differs from the pool's ``pid`` is ignored.
    Otherwise it is removed from ``_in_use_connections`` and appended to
    ``_available_connections``.
    """
    self._checkpid()
    if connection.pid != self.pid:
        # Not owned by this pool's process id: leave the pool untouched.
        return
    self._in_use_connections.remove(connection)
    self._available_connections.append(connection)
Someone may wonder why _in_use_connections is needed at all: if we only want to keep track of how many connections exist, the _created_connections counter can already do that! See this:
class ConnectionPool(object):
    """Excerpt of the pool class: closing every connection."""

    def disconnect(self):
        """Disconnect all connections in the pool.

        Calls ``disconnect()`` on each idle connection and then on each
        connection currently in use.
        """
        # chain() walks both containers without building a combined copy.
        all_conns = chain(self._available_connections,
                          self._in_use_connections)
        for connection in all_conns:
            connection.disconnect()
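Aha, bingo! disconnect has to reach every connection, including the ones that are currently checked out, so the pool must keep references to them rather than just a count.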