Data sources

The data source components are responsible for the actual reading of data (the “how”). The design uses an abstract interface, IDataSource, to define a standard contract for any data source, making it easy to swap and compose implementations.
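
As a short usage sketch (the helper function and identifier below are hypothetical, not part of the package), code can be written against the IDataSource contract so that any implementation documented below can be swapped in:

import pandas as pd

from pems_data.sources import IDataSource


def row_count(source: IDataSource, identifier: str) -> int:
    # Works with any IDataSource implementation, since read() is the shared contract.
    df: pd.DataFrame = source.read(identifier)
    return len(df)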

pems_data.sources.IDataSource

Bases: ABC

An abstract interface for a generic data source.

Source code in pems_data/sources/__init__.py, lines 7-22
class IDataSource(ABC):
    """An abstract interface for a generic data source."""

    @abstractmethod
    def read(self, identifier: str, **kwargs: dict[str, Any]) -> pd.DataFrame:
        """
        Reads data identified by a generic identifier from the source.

        Args:
            identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc.
            **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'.

        Returns:
            value (pandas.DataFrame): A DataFrame of data from the source for the given identifier.
        """
        raise NotImplementedError  # pragma: no cover

read(identifier, **kwargs) abstractmethod

Reads data identified by a generic identifier from the source.

Parameters:

    identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc. Required.
    **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'. Default: {}.

Returns:

    value (pandas.DataFrame): A DataFrame of data from the source for the given identifier.

Source code in pems_data/sources/__init__.py, lines 10-22
@abstractmethod
def read(self, identifier: str, **kwargs: dict[str, Any]) -> pd.DataFrame:
    """
    Reads data identified by a generic identifier from the source.

    Args:
        identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc.
        **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'.

    Returns:
        value (pandas.DataFrame): A DataFrame of data from the source for the given identifier.
    """
    raise NotImplementedError  # pragma: no cover
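
For illustration, a minimal sketch of a custom implementation (hypothetical, not part of the package) that satisfies the contract by serving pre-loaded DataFrames, e.g. in tests:

from typing import Any

import pandas as pd

from pems_data.sources import IDataSource


class InMemoryDataSource(IDataSource):
    """Serves pre-loaded DataFrames keyed by identifier."""

    def __init__(self, frames: dict[str, pd.DataFrame]):
        self._frames = frames

    def read(self, identifier: str, **kwargs: dict[str, Any]) -> pd.DataFrame:
        # Return a copy so callers cannot mutate the stored frame.
        return self._frames[identifier].copy()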

pems_data.sources.s3.S3DataSource

Bases: IDataSource

A data source for fetching data from an S3 bucket.

Source code in pems_data/sources/s3.py, lines 11-103
class S3DataSource(IDataSource):
    """A data source for fetching data from an S3 bucket."""

    @property
    def default_bucket(self) -> str:
        """
        Returns:
            value (str): The value from the `S3_BUCKET_NAME` environment variable, or the Caltrans PeMS prod mart bucket name.
        """
        return os.environ.get("S3_BUCKET_NAME", "caltrans-pems-prd-us-west-2-marts")

    @property
    def name(self) -> str:
        """
        Returns:
            value (str): The name of this bucket instance.
        """
        return self._name

    def __init__(self, name: str = None):
        """Initialize a new S3DataSource.

        Args:
            name (str): (Optional) The name of the S3 bucket to source from.
        """
        self._client = boto3.client("s3")
        self._name = name or self.default_bucket

    def get_prefixes(
        self,
        filter_pattern: re.Pattern = re.compile(".+"),
        initial_prefix: str = "",
        match_func: Callable[[re.Match], str] = None,
    ) -> list:
        """
        Lists available object prefixes, optionally filtered by an initial prefix.

        When a match is found, if match_func exists, add its result to the output list. Otherwise add the entire match.

        Args:
            filter_pattern (re.Pattern): A regular expression used to match object prefixes
            initial_prefix (str): The initial prefix to start the search from
            match_func (Callable[[re.Match], str]): A callable used to extract data from prefix matches

        Returns:
            value (list): A sorted list of unique prefixes that matched the pattern.
        """

        s3_keys = self._client.list_objects(Bucket=self.name, Prefix=initial_prefix)

        result = set()

        for item in s3_keys["Contents"]:
            s3_path = item["Key"]
            match = re.search(filter_pattern, s3_path)
            if match:
                if match_func:
                    result.add(match_func(match))
                else:
                    result.add(match.group(0))

        return sorted(result)

    def read(
        self, *args: str, path: str = None, columns: list = None, filters: list = None, **kwargs: dict[str, Any]
    ) -> pd.DataFrame:
        """Reads data from the S3 path into a pandas DataFrame. Extra kwargs are passed along to `pandas.read_parquet()`.

        Args:
            *args (tuple[str]): One or more relative path components for the data file
            path (str): The absolute S3 URL path to a data file; using `path` overrides any relative path components provided
            columns (list[str]): If not None, only these columns will be read from the file
            filters (list[tuple] | list[list[tuple]]): To filter out data. Filter syntax: `[[(column, op, val), ...],...]`
            **kwargs (dict[str, Any]): Extra kwargs to pass to `pandas.read_parquet()`

        Returns:
            value (pandas.DataFrame): A DataFrame of data read from the source path.
        """
        path = path or self.url(*args)
        return pd.read_parquet(path, columns=columns, filters=filters, **kwargs)

    def url(self, *args: str) -> str:
        """Build an absolute S3 URL to this bucket, with optional path segments.

        Args:
            *args (tuple[str]): The components of the S3 path.

        Returns:
            value (str): An absolute `s3://` URL for this bucket and the path.
        """
        parts = [f"s3://{self.name}"]
        parts.extend(args)
        return "/".join(parts)

default_bucket property

Returns:

    value (str): The value from the S3_BUCKET_NAME environment variable, or the Caltrans PeMS prod mart bucket name.

name property

Returns:

    value (str): The name of this bucket instance.

__init__(name=None)

Initialize a new S3DataSource.

Parameters:

    name (str): (Optional) The name of the S3 bucket to source from. Default: None.

Source code in pems_data/sources/s3.py, lines 30-37
def __init__(self, name: str = None):
    """Initialize a new S3DataSource.

    Args:
        name (str): (Optional) The name of the S3 bucket to source from.
    """
    self._client = boto3.client("s3")
    self._name = name or self.default_bucket
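
A construction sketch (the explicit bucket name below is hypothetical): omitting name falls back to default_bucket, i.e. the S3_BUCKET_NAME environment variable or the Caltrans PeMS prod mart bucket.

from pems_data.sources.s3 import S3DataSource

default_source = S3DataSource()                         # bucket name from default_bucket
custom_source = S3DataSource(name="my-example-bucket")  # hypothetical bucket name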

get_prefixes(filter_pattern=re.compile('.+'), initial_prefix='', match_func=None)

Lists available object prefixes, optionally filtered by an initial prefix.

When a match is found, if match_func exists, add its result to the output list. Otherwise add the entire match.

Parameters:

    filter_pattern (re.Pattern): A regular expression used to match object prefixes. Default: re.compile('.+').
    initial_prefix (str): The initial prefix to start the search from. Default: ''.
    match_func (Callable[[re.Match], str]): A callable used to extract data from prefix matches. Default: None.

Returns:

    value (list): A sorted list of unique prefixes that matched the pattern.

Source code in pems_data/sources/s3.py, lines 39-72
def get_prefixes(
    self,
    filter_pattern: re.Pattern = re.compile(".+"),
    initial_prefix: str = "",
    match_func: Callable[[re.Match], str] = None,
) -> list:
    """
    Lists available object prefixes, optionally filtered by an initial prefix.

    When a match is found, if match_func exists, add its result to the output list. Otherwise add the entire match.

    Args:
        filter_pattern (re.Pattern): A regular expression used to match object prefixes
        initial_prefix (str): The initial prefix to start the search from
        match_func (Callable[[re.Match], str]): A callable used to extract data from prefix matches

    Returns:
        value (list): A sorted list of unique prefixes that matched the pattern.
    """

    s3_keys = self._client.list_objects(Bucket=self.name, Prefix=initial_prefix)

    result = set()

    for item in s3_keys["Contents"]:
        s3_path = item["Key"]
        match = re.search(filter_pattern, s3_path)
        if match:
            if match_func:
                result.add(match_func(match))
            else:
                result.add(match.group(0))

    return sorted(result)
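
A usage sketch for get_prefixes; the starting prefix, pattern, and captured group below are hypothetical and do not reflect the actual bucket layout:

import re

from pems_data.sources.s3 import S3DataSource

source = S3DataSource()
districts = source.get_prefixes(
    filter_pattern=re.compile(r"district=(\d+)/"),  # hypothetical key pattern
    initial_prefix="detector/agg_five_minutes/",    # hypothetical starting prefix
    match_func=lambda m: m.group(1),                # keep only the captured district number
)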

read(*args, path=None, columns=None, filters=None, **kwargs)

Reads data from the S3 path into a pandas DataFrame. Extra kwargs are passed along to pandas.read_parquet().

Parameters:

    *args (tuple[str]): One or more relative path components for the data file. Default: ().
    path (str): The absolute S3 URL path to a data file; using path overrides any relative path components provided. Default: None.
    columns (list[str]): If not None, only these columns will be read from the file. Default: None.
    filters (list[tuple] | list[list[tuple]]): To filter out data. Filter syntax: [[(column, op, val), ...],...]. Default: None.
    **kwargs (dict[str, Any]): Extra kwargs to pass to pandas.read_parquet(). Default: {}.

Returns:

    value (pandas.DataFrame): A DataFrame of data read from the source path.

Source code in pems_data/sources/s3.py, lines 74-90
def read(
    self, *args: str, path: str = None, columns: list = None, filters: list = None, **kwargs: dict[str, Any]
) -> pd.DataFrame:
    """Reads data from the S3 path into a pandas DataFrame. Extra kwargs are passed along to `pandas.read_parquet()`.

    Args:
        *args (tuple[str]): One or more relative path components for the data file
        path (str): The absolute S3 URL path to a data file; using `path` overrides any relative path components provided
        columns (list[str]): If not None, only these columns will be read from the file
        filters (list[tuple] | list[list[tuple]]): To filter out data. Filter syntax: `[[(column, op, val), ...],...]`
        **kwargs (dict[str, Any]): Extra kwargs to pass to `pandas.read_parquet()`

    Returns:
        value (pandas.DataFrame): A DataFrame of data read from the source path.
    """
    path = path or self.url(*args)
    return pd.read_parquet(path, columns=columns, filters=filters, **kwargs)
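
A read sketch; the path components, column names, and filter values are hypothetical. Relative components are joined into an s3:// URL via url(), and columns/filters are forwarded to pandas.read_parquet():

from pems_data.sources.s3 import S3DataSource

source = S3DataSource()
df = source.read(
    "some/prefix", "data.parquet",               # -> s3://<bucket>/some/prefix/data.parquet
    columns=["station_id", "volume"],            # read only these columns
    filters=[[("station_id", "==", "401001")]],  # pyarrow-style row filter
)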

url(*args)

Build an absolute S3 URL to this bucket, with optional path segments.

Parameters:

    *args (tuple[str]): The components of the S3 path. Default: ().

Returns:

    value (str): An absolute s3:// URL for this bucket and the path.

Source code in pems_data/sources/s3.py, lines 92-103
def url(self, *args: str) -> str:
    """Build an absolute S3 URL to this bucket, with optional path segments.

    Args:
        *args (tuple[str]): The components of the S3 path.

    Returns:
        value (str): An absolute `s3://` URL for this bucket and the path.
    """
    parts = [f"s3://{self.name}"]
    parts.extend(args)
    return "/".join(parts)

pems_data.sources.cache.CachingDataSource

Bases: IDataSource

A data source decorator that adds a caching layer to another data source.

Source code in pems_data/sources/cache.py, lines 8-63
class CachingDataSource(IDataSource):
    """A data source decorator that adds a caching layer to another data source."""

    @property
    def cache(self) -> Cache:
        """
        Returns:
            value (pems_data.cache.Cache): This data source's underlying Cache instance.
        """
        return self._cache

    @property
    def data_source(self) -> IDataSource:
        """
        Returns:
            value (pems_data.sources.IDataSource): This data source's underlying data source instance.
        """
        return self._data_source

    def __init__(self, data_source: IDataSource, cache: Cache):
        """Initialize a new CachingDataSource.

        Args:
            data_source (pems_data.sources.IDataSource): The underlying data source to use for cache misses
            cache (pems_data.cache.Cache): The underlying cache to use for get/set operations
        """
        self._cache = cache
        self._data_source = data_source

    def read(self, identifier: str, cache_opts: dict[str, Any] = {}, **kwargs: dict[str, Any]) -> pd.DataFrame:
        """
        Reads data identified by a generic identifier from the source. Tries the cache first, setting on a miss.

        Args:
            identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc.
            cache_opts (dict[str, Any]): A dictionary of options for configuring caching of the data
            **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'

        Returns:
            value (pandas.DataFrame): A DataFrame of data read from the cache (or the source), for the given identifier.
        """
        # use cache key from options, fallback to identifier
        cache_key = cache_opts.get("key", identifier)
        ttl = cache_opts.get("ttl")

        # try to get df from cache
        cached_df = self._cache.get_df(cache_key)
        if cached_df is not None:
            return cached_df

        # on miss, call the wrapped source
        df = self._data_source.read(identifier, **kwargs)
        # store the result in the cache
        self._cache.set_df(cache_key, df, ttl=ttl)

        return df

cache property

Returns:

    value (pems_data.cache.Cache): This data source's underlying Cache instance.

data_source property

Returns:

    value (pems_data.sources.IDataSource): This data source's underlying data source instance.

__init__(data_source, cache)

Initialize a new CachingDataSource.

Parameters:

    data_source (pems_data.sources.IDataSource): The underlying data source to use for cache misses. Required.
    cache (pems_data.cache.Cache): The underlying cache to use for get/set operations. Required.

Source code in pems_data/sources/cache.py, lines 27-35
def __init__(self, data_source: IDataSource, cache: Cache):
    """Initialize a new CachingDataSource.

    Args:
        data_source (pems_data.sources.IDataSource): The underlying data source to use for cache misses
        cache (pems_data.cache.Cache): The underlying cache to use for get/set operations
    """
    self._cache = cache
    self._data_source = data_source

read(identifier, cache_opts={}, **kwargs)

Reads data identified by a generic identifier from the source. Tries the cache first, setting on a miss.

Parameters:

    identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc. Required.
    cache_opts (dict[str, Any]): A dictionary of options for configuring caching of the data. Default: {}.
    **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'. Default: {}.

Returns:

    value (pandas.DataFrame): A DataFrame of data read from the cache (or the source), for the given identifier.

Source code in pems_data/sources/cache.py, lines 37-63
def read(self, identifier: str, cache_opts: dict[str, Any] = {}, **kwargs: dict[str, Any]) -> pd.DataFrame:
    """
    Reads data identified by a generic identifier from the source. Tries the cache first, setting on a miss.

    Args:
        identifier (str): The unique identifier for the data, e.g., an S3 key, a database table name, etc.
        cache_opts (dict[str, Any]): A dictionary of options for configuring caching of the data
        **kwargs (dict[str, Any]): Additional arguments for the underlying read operation, such as 'columns' or 'filters'

    Returns:
        value (pandas.DataFrame): A DataFrame of data read from the cache (or the source), for the given identifier.
    """
    # use cache key from options, fallback to identifier
    cache_key = cache_opts.get("key", identifier)
    ttl = cache_opts.get("ttl")

    # try to get df from cache
    cached_df = self._cache.get_df(cache_key)
    if cached_df is not None:
        return cached_df

    # on miss, call the wrapped source
    df = self._data_source.read(identifier, **kwargs)
    # store the result in the cache
    self._cache.set_df(cache_key, df, ttl=ttl)

    return df
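
A composition sketch wrapping an S3DataSource in a CachingDataSource. The no-argument Cache() construction and the key, TTL, and column values are assumptions for illustration only:

from pems_data.cache import Cache
from pems_data.sources.cache import CachingDataSource
from pems_data.sources.s3 import S3DataSource

source = CachingDataSource(data_source=S3DataSource(), cache=Cache())  # Cache() arguments assumed

df = source.read(
    "geo/current_stations.parquet",                      # hypothetical S3 key
    cache_opts={"key": "stations:current", "ttl": 300},  # cache under a custom key for 5 minutes
    columns=["station_id", "name"],                      # forwarded to the wrapped S3DataSource.read()
)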