Skip to content

Services

The services represent the business logic of “what” data to fetch for specific use-cases. Services require an underlying data source to perform the actual reading of data.

pems_data.services.stations.StationsService

Manages fetching of station-related data.

Source code in pems_data/services/stations.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class StationsService:
    """Service for retrieving station-related data through an IDataSource."""

    def __init__(self, data_source: IDataSource):
        """Create a StationsService backed by the given data source.

        Args:
            data_source (pems_data.sources.IDataSource): The data source that performs the reads for this service.
        """
        self._ds = data_source

    @property
    def data_source(self) -> IDataSource:
        """The IDataSource instance this service was constructed with."""
        return self._ds

    @property
    def imputation_detector_agg_5min(self) -> str:
        """
        Returns:
            value (str): The identifier passed to the data source when reading the imputed detector 5 minute aggregation.
        """
        return "imputation/detector_imputed_agg_five_minutes"

    @property
    def metadata_file(self) -> str:
        """
        Returns:
            value (str): The identifier passed to the data source when reading stations metadata.
        """
        return "geo/current_stations.parquet"

    def _build_cache_key(self, *args):
        """Delegates to `Cache.build_key`, passing `"stations"` first and then the given parts."""
        return Cache.build_key("stations", *args)

    def get_district_metadata(self, district_number: str) -> pd.DataFrame:
        """Reads metadata for the stations of one district via the data source.

        The read targets `metadata_file`, requests a fixed set of columns, and is filtered to rows whose
        `DISTRICT` equals `district_number`. Cache options with a district-specific key are passed along.

        Args:
            district_number (str): The Caltrans district number to filter on, e.g. `"7"`.

        Returns:
            value (pandas.DataFrame): The result of the data source read for the district's stations.
        """
        cache_key = self._build_cache_key("metadata", "district", district_number)
        district_filter = ("DISTRICT", "=", district_number)

        return self._ds.read(
            self.metadata_file,
            cache_opts={"key": cache_key, "ttl": 3600},  # 1 hour
            columns=[
                "STATION_ID",
                "NAME",
                "PHYSICAL_LANES",
                "STATE_POSTMILE",
                "ABSOLUTE_POSTMILE",
                "LATITUDE",
                "LONGITUDE",
                "LENGTH",
                "STATION_TYPE",
                "DISTRICT",
                "FREEWAY",
                "DIRECTION",
                "COUNTY_NAME",
                "CITY_NAME",
            ],
            filters=[district_filter],
        )

    def get_imputed_agg_5min(self, station_id: str) -> pd.DataFrame:
        """Reads imputed 5 minute aggregate data for one station via the data source.

        The read targets `imputation_detector_agg_5min`, requests a fixed set of columns, and is filtered to
        rows whose `STATION_ID` equals `station_id`. Cache options with a station-specific key are passed along.

        Args:
            station_id (str): The identifier of the station/detector to filter on, e.g. `"715898"`

        Returns:
            value (pandas.DataFrame): The result of the data source read for the station.
        """
        cache_key = self._build_cache_key("imputed", "agg", "5m", "station", station_id)
        station_filter = ("STATION_ID", "=", station_id)

        return self._ds.read(
            self.imputation_detector_agg_5min,
            cache_opts={"key": cache_key, "ttl": 300},  # 5 minutes
            columns=[
                "STATION_ID",
                "LANE",
                "SAMPLE_TIMESTAMP",
                "VOLUME_SUM",
                "SPEED_FIVE_MINS",
                "OCCUPANCY_AVG",
            ],
            filters=[station_filter],
        )

data_source property

This service’s IDataSource instance.

imputation_detector_agg_5min property

Returns:

Name Type Description
value str

The identifier for the imputation detector 5min aggregation

metadata_file property

Returns:

Name Type Description
value str

The identifier for the stations metadata file.

__init__(data_source)

Initialize a new StationsService.

Parameters:

Name Type Description Default
data_source IDataSource

The data source responsible for fetching data for this service.

required
Source code in pems_data/services/stations.py
31
32
33
34
35
36
37
def __init__(self, data_source: IDataSource):
    """Create a StationsService backed by the given data source.

    Args:
        data_source (pems_data.sources.IDataSource): The data source that performs the reads for this service.
    """
    self._ds = data_source

get_district_metadata(district_number)

Loads metadata for all stations in the selected district from the data source.

Parameters:

Name Type Description Default
district_number str

The number of the Caltrans district to load metadata for, e.g. "7".

required

Returns:

Name Type Description
value DataFrame

The metadata for the stations in the selected district, as a DataFrame.

Source code in pems_data/services/stations.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def get_district_metadata(self, district_number: str) -> pd.DataFrame:
    """Reads metadata for the stations of one district via the data source.

    The read targets `metadata_file`, requests a fixed set of columns, and is filtered to rows whose
    `DISTRICT` equals `district_number`. Cache options with a district-specific key are passed along.

    Args:
        district_number (str): The Caltrans district number to filter on, e.g. `"7"`.

    Returns:
        value (pandas.DataFrame): The result of the data source read for the district's stations.
    """
    cache_key = self._build_cache_key("metadata", "district", district_number)
    district_filter = ("DISTRICT", "=", district_number)

    return self._ds.read(
        self.metadata_file,
        cache_opts={"key": cache_key, "ttl": 3600},  # 1 hour
        columns=[
            "STATION_ID",
            "NAME",
            "PHYSICAL_LANES",
            "STATE_POSTMILE",
            "ABSOLUTE_POSTMILE",
            "LATITUDE",
            "LONGITUDE",
            "LENGTH",
            "STATION_TYPE",
            "DISTRICT",
            "FREEWAY",
            "DIRECTION",
            "COUNTY_NAME",
            "CITY_NAME",
        ],
        filters=[district_filter],
    )

get_imputed_agg_5min(station_id)

Loads imputed aggregate 5 minute data for a specific station from the data source.

Parameters:

Name Type Description Default
station_id str

The identifier for the station/detector to load data, e.g. "715898"

required

Returns:

Name Type Description
value DataFrame

The station’s data as a DataFrame.

Source code in pems_data/services/stations.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def get_imputed_agg_5min(self, station_id: str) -> pd.DataFrame:
    """Reads imputed 5 minute aggregate data for one station via the data source.

    The read targets `imputation_detector_agg_5min`, requests a fixed set of columns, and is filtered to
    rows whose `STATION_ID` equals `station_id`. Cache options with a station-specific key are passed along.

    Args:
        station_id (str): The identifier of the station/detector to filter on, e.g. `"715898"`

    Returns:
        value (pandas.DataFrame): The result of the data source read for the station.
    """
    cache_key = self._build_cache_key("imputed", "agg", "5m", "station", station_id)
    station_filter = ("STATION_ID", "=", station_id)

    return self._ds.read(
        self.imputation_detector_agg_5min,
        cache_opts={"key": cache_key, "ttl": 300},  # 5 minutes
        columns=[
            "STATION_ID",
            "LANE",
            "SAMPLE_TIMESTAMP",
            "VOLUME_SUM",
            "SPEED_FIVE_MINS",
            "OCCUPANCY_AVG",
        ],
        filters=[station_filter],
    )