这篇博客的起源是一位内部用户询问,Azure Cache for Redis 有没有 .pem 或者 .cem 的证书,然后可以在客户端指定它:

import redis

try:
    conn = redis.StrictRedis(
        host='<cache-name>.redis.cache.windows.net',
        port=6380,
        password='your password here',
        ssl=True,
        ssl_ca_certs='path to cert')
    print (conn)
    conn.ping()
    print ('Connected!')
except Exception as ex:
    print ('Error:', ex)
    exit('Failed to connect, terminating.')

在 Azure 的官方文档上,有这么一句:

Redis server does not natively support SSL, but Azure Cache for Redis does. If you are connecting to Azure Cache for Redis and your client supports SSL, like StackExchange.Redis, then you should use SSL.

Redis 并不支持 SSL,但 Azure Cache for Redis 可以。

关于 SSL 在 Redis 上的始先,争论由来已久。这个 issue Redis SSL support,最后无疾而终。官网的 Redis Encryption 也是建议引入一个中间层来解决 SSL 的问题。我相信 Azure Cache 也是这么干的。

SSL 的单向认证和双向认证

所以,才有这篇文档 Quickstart: Use Azure Cache for Redis with Python 中的 ssl = True.

import redis
r = redis.StrictRedis(host='<Your Host Name>.redis.cache.windows.net',
        port=6380, db=0, password='<Your Access Key>', ssl=True)
r.set('foo', 'bar') # True
r.get('foo') #b'bar'

既然 ssl = True 已经开启了 SSL,那 ssl_ca_certs 这个参数是做什么的呢?

这就要说到 SSL 协议的加密过程了。SSL协议即用到了对称加密也用到了非对称加密(公钥加密),在建立传输链路时,SSL首先对对称加密的密钥使用公钥进行非对称加密,链路建立好之后,SSL对传输内容使用对称加密。

下图来自 SSL单向认证和双向认证交互流程

没有设置 ssl_ca_certs 参数的时候,采用的是 SSL 单向认证,即客户端会验证服务器的身份,服务器不会验证客户端。这种情况下,客户端也就不用提交自己的证书,任何用户都可以访问服务器。

设置 ssl_ca_certs 的情况下,采用 SSL 双向认证。服务端和客户都拿都需要身份证明,服务端只能由允许的客户访问,安全性也会高一些。

Take is cheap. Show me the code.

redis-py 源码

如果设置了 ssl 参数,客户端就会新建一个 SSLConnection 连接,否则就是一个 UNIXDomainSocketConnection

class Redis(object):
    def __init__(self, host='localhost', port=6379,
                 db=0, password=None, socket_timeout=None,
                 socket_connect_timeout=None,
                 socket_keepalive=None, socket_keepalive_options=None,
                 connection_pool=None, unix_socket_path=None,
                 encoding='utf-8', encoding_errors='strict',
                 charset=None, errors=None,
                 decode_responses=False, retry_on_timeout=False,
                 ssl=False, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs='required', ssl_ca_certs=None,
                 max_connections=None):

        if not connection_pool:
            kwargs = {
                'db': db,
                'password': password,
                'socket_timeout': socket_timeout,
                'encoding': encoding,
                'encoding_errors': encoding_errors,
                'decode_responses': decode_responses,
                'retry_on_timeout': retry_on_timeout,
                'max_connections': max_connections
            }

            if unix_socket_path is not None:
                kwargs.update({
                    'path': unix_socket_path,
                    'connection_class': UnixDomainSocketConnection
                })
            else:
                # TCP specific options
                kwargs.update({
                    'host': host,
                    'port': port,
                    'socket_connect_timeout': socket_connect_timeout,
                    'socket_keepalive': socket_keepalive,
                    'socket_keepalive_options': socket_keepalive_options,
                })

                if ssl:
                    kwargs.update({
                        'connection_class': SSLConnection,
                        'ssl_keyfile': ssl_keyfile,
                        'ssl_certfile': ssl_certfile,
                        'ssl_cert_reqs': ssl_cert_reqs,
                        'ssl_ca_certs': ssl_ca_certs,
                    })

             connection_pool = ConnectionPool(**kwargs)
        self.connection_pool = connection_pool

这里暂且跳过连接池的部分,直接关注 SSLConnection. 如果 import ssl 失败,直接抛出异常。否则,就会创建一个安全的 Socket。

try:
    import ssl
    ssl_available = True
except ImportError:
    ssl_available = False

class SSLConnection(Connection):
    def __init__(self, ssl_keyfile=None, ssl_certfile=None,
                 ssl_cert_reqs='required', ssl_ca_certs=None, **kwargs):
        if not ssl_available:
            raise RedisError("Python wasn't built with SSL support")

        super(SSLConnection, self).__init__(**kwargs)

        self.keyfile = ssl_keyfile
        self.certfile = ssl_certfile
        # ...
        self.cert_reqs = ssl_cert_reqs
        self.ca_certs = ssl_ca_certs

    def _connect(self):
        "Wrap the socket with SSL support"
        sock = super(SSLConnection, self)._connect()
        sock = ssl.wrap_socket(sock,
                               cert_reqs=self.cert_reqs,
                               keyfile=self.keyfile,
                               certfile=self.certfile,
                               ca_certs=self.ca_certs)
        return sock

_connect() 这个函数创建了一个 TCP 连接。

class Connection(object):
    def _connect(self):
        err = None
        for res in socket.getaddrinfo(self.host, self.port, self.socket_type,
                                      socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            sock = None
            try:
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                # TCP_KEEPALIVE
                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in iteritems(self.socket_keepalive_options):
                        sock.setsockopt(socket.SOL_TCP, k, v)

                # set the socket_connect_timeout before we connect
                sock.settimeout(self.socket_connect_timeout)

                # connect
                sock.connect(socket_address)

                # set the socket_timeout now that we're connected
                sock.settimeout(self.socket_timeout)
                return sock

            except socket.error as _:
                err = _
                if sock is not None:
                    sock.close()

        if err is not None:
            raise err
        raise socket.error("socket.getaddrinfo returned an empty list")

wrap_socket 给这个 Socket 包了一层 SSL 协议。这个函数已经到达 Python 内置库,咱们继续。

Python SSL

ssl.py 这个文件中,有两个重要的类:SSLContextSSLSocketwrap_socket 做的事情就是填充 SSLContext,然后返回一个 SSLSocket

def wrap_socket(sock, keyfile=None, certfile=None,
                server_side=False, cert_reqs=CERT_NONE,
                ssl_version=PROTOCOL_TLS, ca_certs=None,
                do_handshake_on_connect=True,
                suppress_ragged_eofs=True,
                ciphers=None):

    if server_side and not certfile:
        raise ValueError("certfile must be specified for server-side "
                         "operations")
    if keyfile and not certfile:
        raise ValueError("certfile must be specified")
    context = SSLContext(ssl_version)
    context.verify_mode = cert_reqs
    if ca_certs:
        context.load_verify_locations(ca_certs)
    if certfile:
        context.load_cert_chain(certfile, keyfile)
    if ciphers:
        context.set_ciphers(ciphers)
    return context.wrap_socket(
        sock=sock, server_side=server_side,
        do_handshake_on_connect=do_handshake_on_connect,
        suppress_ragged_eofs=suppress_ragged_eofs
    )

SSLContext 也是调用 SSLSocket_create 工厂函数来创建对象。

class SSLContext(_SSLContext):
    def wrap_socket(self, sock, server_side=False,
                    do_handshake_on_connect=True,
                    suppress_ragged_eofs=True,
                    server_hostname=None, session=None):
        # SSLSocket class handles server_hostname encoding before it calls
        # ctx._wrap_socket()
        return self.sslsocket_class._create(
            sock=sock,
            server_side=server_side,
            do_handshake_on_connect=do_handshake_on_connect,
            suppress_ragged_eofs=suppress_ragged_eofs,
            server_hostname=server_hostname,
            context=self,
            session=session
        )

到了 SSLSocket 里, 最重要的就是握手过程,这是由 do_handshake() 函数实现的。握手的过程,图解SSL/TLS协议 有图文并茂的展示。