Skip to content

Caching layer

The caching layer wraps a backing redis service and provides a simple, focused interface to its usage.

pems_data.cache

The primary caching interface for pems_data.

Cache

Basic wrapper for a cache backend.

Source code in pems_data/cache.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class Cache:
    """Basic wrapper for a cache backend."""

    @classmethod
    def build_key(cls, *args: Any) -> str:
        """Build a standard cache key from the given parts.

        Args:
            *args (tuple[Any]): The individual parts that make up the key

        Returns:
            value (str): A standard representation of the parts for use in a cache key.
        """
        return ":".join([str(a).lower() for a in args])

    def __init__(self, host: str = None, port: int = None):
        """Create a new instance of the Cache interface.

        Args:
            host (str): (Optional) The hostname of the cache backend
            port (int): (Optional) The port to connect on the cache backend
        """

        self.host = host
        self.port = port
        self.c = None

    def _connect(self) -> None:
        """Establish a connection to the cache backend if necessary."""
        if not isinstance(self.c, redis.Redis):
            self.c = redis_connection(self.host, self.port)

    def is_available(self) -> bool:
        """Return a bool indicating if the cache backend is available or not.

        Returns:
            value (bool): True if the connection and backend is available; False otherwise
        """
        self._connect()
        available = self.c and self.c.ping() is True
        logger.debug(f"cache is available: {available}")
        return available

    def get(self, key: str, mutate_func: Callable[[Any], Any] = None) -> Any | None:
        """Get a raw value from the cache, or None if the key doesn't exist.

        Args:
            key (str): The item's cache key
            mutate_func (callable): (Optional) If provided, call this on the cached value and return its result

        Returns:
            value (Any | None): The value from the cache, optionally mutated by mutate_func, or None.
        """
        if self.is_available():
            logger.debug(f"read from cache: {key}")
            value = self.c.get(key)
            if value and mutate_func:
                logger.debug(f"mutating cached value: {key}")
                return mutate_func(value)
            return value
        logger.warning(f"cache unavailable to get: {key}")
        return None

    def get_df(self, key: str) -> pd.DataFrame:
        """Get a DataFrame from the cache, or an empty DataFrame if the key wasn't found.

        Args:
            key (str): The item's cache key

        Returns:
            value (pandas.DataFrame): The DataFrame materialized from the cache, or an empty DataFrame if the key wasn't found.
        """
        return self.get(key, mutate_func=arrow_bytes_to_df)

    def set(self, key: str, value: Any, ttl: int = None, mutate_func: Callable[[Any], Any] = None) -> None:
        """Set a value in the cache, with an optional TTL (seconds until expiration).

        Args:
            key (str): The item's cache key
            value (Any): The item's value to store in the cache
            ttl (int): (Optional) Seconds until expiration
            mutate_func (callable): (Optional) If provided, call this on the value and insert the result in the cache
        """
        if self.is_available():
            if mutate_func:
                logger.debug(f"mutating value for cache: {key}")
                value = mutate_func(value)
            logger.debug(f"store in cache: {key}")
            self.c.set(key, value, ex=ttl)
        else:
            logger.warning(f"cache unavailable to set: {key}")

    def set_df(self, key: str, value: pd.DataFrame, ttl: int = None) -> None:
        """Set a DataFrame in the cache, with an optional TTL (seconds until expiration).

        Args:
            key (str): The item's cache key
            value (Any): The DataFrame to store in the cache
            ttl (int): (Optional) Seconds until expiration
        """
        self.set(key, value, ttl=ttl, mutate_func=df_to_arrow_bytes)

__init__(host=None, port=None)

Create a new instance of the Cache interface.

Parameters:

Name Type Description Default
host str

(Optional) The hostname of the cache backend

None
port int

(Optional) The port to connect on the cache backend

None
Source code in pems_data/cache.py
58
59
60
61
62
63
64
65
66
67
68
def __init__(self, host: str = None, port: int = None):
    """Initialize the Cache interface without connecting to the backend.

    Args:
        host (str): (Optional) The hostname of the cache backend
        port (int): (Optional) The port to connect on the cache backend
    """
    self.host, self.port = host, port
    # the backend client is created lazily on first use
    self.c = None

build_key(*args) classmethod

Build a standard cache key from the given parts.

Parameters:

Name Type Description Default
*args tuple[Any]

The individual parts that make up the key

()

Returns:

Name Type Description
value str

A standard representation of the parts for use in a cache key.

Source code in pems_data/cache.py
46
47
48
49
50
51
52
53
54
55
56
@classmethod
def build_key(cls, *args: Any) -> str:
    """Build a standard cache key from the given parts.

    Args:
        *args (tuple[Any]): The individual parts that make up the key

    Returns:
        value (str): A standard representation of the parts for use in a cache key.
    """
    # lowercase each part and join with the standard ":" separator
    normalized = (str(part).lower() for part in args)
    return ":".join(normalized)

get(key, mutate_func=None)

Get a raw value from the cache, or None if the key doesn’t exist.

Parameters:

Name Type Description Default
key str

The item’s cache key

required
mutate_func callable

(Optional) If provided, call this on the cached value and return its result

None

Returns:

Name Type Description
value Any | None

The value from the cache, optionally mutated by mutate_func, or None.

Source code in pems_data/cache.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def get(self, key: str, mutate_func: Callable[[Any], Any] = None) -> Any | None:
    """Get a raw value from the cache, or None if the key doesn't exist.

    Args:
        key (str): The item's cache key
        mutate_func (callable): (Optional) If provided, call this on the cached value and return its result

    Returns:
        value (Any | None): The value from the cache, optionally mutated by mutate_func, or None.
    """
    # guard clause: nothing to read when the backend can't be reached
    if not self.is_available():
        logger.warning(f"cache unavailable to get: {key}")
        return None
    logger.debug(f"read from cache: {key}")
    value = self.c.get(key)
    if mutate_func and value:
        logger.debug(f"mutating cached value: {key}")
        value = mutate_func(value)
    return value

get_df(key)

Get a DataFrame from the cache, or an empty DataFrame if the key wasn’t found.

Parameters:

Name Type Description Default
key str

The item’s cache key

required

Returns:

Name Type Description
value DataFrame

The DataFrame materialized from the cache, or an empty DataFrame if the key wasn’t found.

Source code in pems_data/cache.py
106
107
108
109
110
111
112
113
114
115
def get_df(self, key: str) -> pd.DataFrame:
    """Get a DataFrame from the cache, or an empty DataFrame if the key wasn't found.

    Args:
        key (str): The item's cache key

    Returns:
        value (pandas.DataFrame): The DataFrame materialized from the cache, or an empty DataFrame if the key wasn't found.
    """
    df = self.get(key, mutate_func=arrow_bytes_to_df)
    # get() returns None on a miss or unavailable cache, and returns the raw
    # bytes unmutated for a falsy (empty) buffer; coerce both cases to the
    # empty DataFrame this method documents
    return df if isinstance(df, pd.DataFrame) else pd.DataFrame()

is_available()

Return a bool indicating if the cache backend is available or not.

Returns:

Name Type Description
value bool

True if the connection and backend are available; False otherwise

Source code in pems_data/cache.py
75
76
77
78
79
80
81
82
83
84
def is_available(self) -> bool:
    """Return a bool indicating if the cache backend is available or not.

    Returns:
        value (bool): True if the connection and backend are available; False otherwise
    """
    self._connect()
    try:
        # bool() ensures we return False (not None) when the connection
        # failed; ping() raises (e.g. redis.ConnectionError) when the backend
        # is unreachable, so guard it rather than propagate from a probe
        available = bool(self.c) and self.c.ping() is True
    except redis.RedisError:
        available = False
    logger.debug(f"cache is available: {available}")
    return available

set(key, value, ttl=None, mutate_func=None)

Set a value in the cache, with an optional TTL (seconds until expiration).

Parameters:

Name Type Description Default
key str

The item’s cache key

required
value Any

The item’s value to store in the cache

required
ttl int

(Optional) Seconds until expiration

None
mutate_func callable

(Optional) If provided, call this on the value and insert the result in the cache

None
Source code in pems_data/cache.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def set(self, key: str, value: Any, ttl: int = None, mutate_func: Callable[[Any], Any] = None) -> None:
    """Set a value in the cache, with an optional TTL (seconds until expiration).

    Args:
        key (str): The item's cache key
        value (Any): The item's value to store in the cache
        ttl (int): (Optional) Seconds until expiration
        mutate_func (callable): (Optional) If provided, call this on the value and insert the result in the cache
    """
    # guard clause: nothing to store when the backend can't be reached
    if not self.is_available():
        logger.warning(f"cache unavailable to set: {key}")
        return
    if mutate_func:
        logger.debug(f"mutating value for cache: {key}")
        value = mutate_func(value)
    logger.debug(f"store in cache: {key}")
    self.c.set(key, value, ex=ttl)

set_df(key, value, ttl=None)

Set a DataFrame in the cache, with an optional TTL (seconds until expiration).

Parameters:

Name Type Description Default
key str

The item’s cache key

required
value DataFrame

The DataFrame to store in the cache

required
ttl int

(Optional) Seconds until expiration

None
Source code in pems_data/cache.py
135
136
137
138
139
140
141
142
143
def set_df(self, key: str, value: pd.DataFrame, ttl: int = None) -> None:
    """Store a DataFrame in the cache, serialized to Arrow IPC bytes, with an optional TTL.

    Args:
        key (str): The item's cache key
        value (pandas.DataFrame): The DataFrame to store in the cache
        ttl (int): (Optional) Seconds until expiration
    """
    # delegate to set(), serializing the DataFrame on the way in
    self.set(key, value, mutate_func=df_to_arrow_bytes, ttl=ttl)

redis_connection(host=None, port=None, **kwargs)

Try to create a new connection to a redis backend. Return None if the connection fails.

Uses the REDIS_HOSTNAME and REDIS_PORT environment variables as fallback.

Parameters:

Name Type Description Default
host str

(Optional) The redis hostname

None
port int

(Optional) The port to connect on

None

Returns:

Name Type Description
value Redis

A Redis instance connected to host:port, or None if the connection failed.

Source code in pems_data/cache.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def redis_connection(host: str = None, port: int = None, **kwargs: dict[str, Any]) -> redis.Redis | None:
    """Try to create a new connection to a redis backend. Return None if the connection fails.

    Uses the `REDIS_HOSTNAME` and `REDIS_PORT` environment variables as fallback.

    Args:
        host (str): (Optional) The redis hostname
        port (int): (Optional) The port to connect on

    Returns:
        value (redis.Redis): A Redis instance connected to `host:port`, or None if the connection failed.
    """

    host = host or os.environ.get("REDIS_HOSTNAME", "redis")
    port = int(port or os.environ.get("REDIS_PORT", "6379"))

    logger.debug(f"connecting to redis @ {host}:{port}")

    kwargs["host"] = host
    kwargs["port"] = port

    try:
        client = redis.Redis(**kwargs)
        # redis.Redis() connects lazily and never raises here, which made the
        # except branch unreachable; issue a ping so an unreachable backend is
        # actually detected and None is returned as documented
        client.ping()
        return client
    except redis.ConnectionError as ce:
        logger.error(f"connection failed for redis @ {host}:{port}", exc_info=ce)
        return None

pems_data.serialization

Helpers for serializing specific data types e.g. for storing/retrieving from a cache.

arrow_bytes_to_df(arrow_buffer)

Deserializes Arrow IPC format bytes back to a DataFrame.

Parameters:

Name Type Description Default
arrow_buffer bytes

A buffer of Arrow IPC bytes representing a DataFrame

required

Returns:

Name Type Description
value DataFrame

The DataFrame deserialized from the buffer, or an empty DataFrame if the buffer was empty.

Source code in pems_data/serialization.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def arrow_bytes_to_df(arrow_buffer: bytes) -> pd.DataFrame:
    """Deserializes Arrow IPC format bytes back to a DataFrame.

    Args:
        arrow_buffer (bytes): A buffer of Arrow IPC bytes representing a DataFrame

    Returns:
        value (pandas.DataFrame): The DataFrame deserialized from the buffer, or an empty DataFrame if the buffer was empty.
    """
    # an empty/missing buffer maps to an empty DataFrame
    if not arrow_buffer:
        return pd.DataFrame()
    # rebuild the Arrow Table from the IPC stream, then convert to pandas
    with pa.BufferReader(arrow_buffer) as source:
        table = ipc.open_stream(source).read_all()
    return table.to_pandas()

df_to_arrow_bytes(df)

Serializes a DataFrame to Arrow IPC format bytes.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to serialize

required

Returns:

Name Type Description
value bytes

The Arrow IPC format bytes representation of the DataFrame.

Source code in pems_data/serialization.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def df_to_arrow_bytes(df: pd.DataFrame) -> bytes:
    """Serializes a DataFrame to Arrow IPC format bytes.

    Args:
        df (pandas.DataFrame): The DataFrame to serialize

    Returns:
        value (bytes): The Arrow IPC format bytes representation of the DataFrame.
    """
    # an empty DataFrame maps to an empty byte string
    if df.empty:
        return b""
    # convert to an Arrow Table (dropping the pandas index), then write it
    # through an in-memory IPC stream and collect the raw bytes
    table = pa.Table.from_pandas(df, preserve_index=False)
    sink = pa.BufferOutputStream()
    with ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()