Coverage for compiler_admin / services / harvest.py: 100%
49 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 05:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 05:48 +0000
1import os
2import sys
3import time
4from datetime import timedelta
5from typing import TextIO
7import pandas as pd
9import compiler_admin.services.files as files
10from compiler_admin.services.time import TimeSummary
13class HarvestTime:
15 # input CSV columns needed for conversion
16 HARVEST_COLUMNS = ["Date", "Client", "Project", "Notes", "Hours", "First name", "Last name"]
18 # default output CSV columns
19 TOGGL_COLUMNS = ["Email", "Start date", "Start time", "Duration", "Project", "Task", "Client", "Billable", "Description"]
21 def __init__(self):
22 self.converters = {"toggl": self.convert_to_toggl}
24 def _calc_start_time(self, group: pd.DataFrame):
25 """Start time is offset by the previous record's duration, with a default of 0 offset for the first record."""
26 group["Start time"] = group["Start time"] + group["Duration"].shift(fill_value=pd.to_timedelta("00:00:00")).cumsum()
27 return group
29 def _duration_str(self, duration: timedelta) -> str:
30 """Use total seconds to convert to a datetime and format as a string e.g. 01:30."""
31 return time.strftime("%H:%M", time.gmtime(duration.total_seconds()))
33 def _toggl_client_name(self):
34 """Gets the value of the TOGGL_CLIENT_NAME env var."""
35 return os.environ.get("TOGGL_CLIENT_NAME")
37 def convert_to_toggl(
38 self,
39 source_path: str | TextIO = sys.stdin,
40 output_path: str | TextIO = sys.stdout,
41 output_cols: list[str] = TOGGL_COLUMNS,
42 client_name: str = None,
43 **kwargs,
44 ):
45 """Convert Harvest formatted entries in source_path to equivalent Toggl formatted entries.
47 Args:
48 source_path: The path to a readable CSV file of Harvest time entries; or a readable buffer of the same.
50 output_cols (list[str]): A list of column names for the output
52 output_path: The path to a CSV file where Toggl time entries will be written; or a writeable buffer for the same.
54 Returns:
55 None. Either prints the resulting CSV data or writes to output_path.
56 """
57 if client_name is None:
58 client_name = self._toggl_client_name()
60 # read CSV file, parsing dates
61 source = files.read_csv(source_path, usecols=self.HARVEST_COLUMNS, parse_dates=["Date"], cache_dates=True)
63 # rename columns that can be imported as-is
64 source.rename(columns={"Project": "Task", "Notes": "Description", "Date": "Start date"}, inplace=True)
66 # update static calculated columns
67 source["Client"] = client_name
68 source["Project"] = client_name
69 source["Billable"] = "Yes"
71 # add the Email column
72 source["Email"] = source["First name"].apply(lambda x: f"{x.lower()}@compiler.la")
74 # Convert numeric Hours to timedelta Duration
75 source["Duration"] = source["Hours"].apply(pd.to_timedelta, unit="hours")
77 # Default start time to 09:00
78 source["Start time"] = pd.to_timedelta("09:00:00")
80 user_days = (
81 source
82 # sort and group by email and date
83 .sort_values(["Email", "Start date"]).groupby(["Email", "Start date"], observed=False)
84 # calculate a start time within each group (excluding the groupby columns)
85 .apply(self._calc_start_time, include_groups=False)
86 )
88 # convert timedeltas to duration strings
89 user_days["Duration"] = user_days["Duration"].apply(self._duration_str)
90 user_days["Start time"] = user_days["Start time"].apply(self._duration_str)
92 # re-sort by start date/time and user
93 # reset the index to get rid of the group multi index and fold the group columns back down
94 output_data = pd.DataFrame(data=user_days).reset_index()
95 output_data.sort_values(["Start date", "Start time", "Email"], inplace=True)
97 files.write_csv(output_path, output_data, output_cols)
99 def summarize(self, path: str | TextIO) -> "TimeSummary":
100 """Summarize a Harvest CSV file.
102 Args:
103 path (str | TextIO): The path to a readable CSV file of Harvest time entries; or a readable buffer of the same.
105 Returns:
106 TimeSummary: A summary of the time entries.
107 """
109 # read CSV file, parsing dates
110 source = files.read_csv(path, usecols=self.HARVEST_COLUMNS, parse_dates=["Date"], cache_dates=True)
112 summary = TimeSummary(
113 earliest_date=source["Date"].min().date(),
114 latest_date=source["Date"].max().date(),
115 total_rows=len(source),
116 total_hours=source["Hours"].sum(),
117 )
119 # Group by Project to get hours per project
120 project_hours = source.groupby(["Project"])["Hours"].sum().to_dict()
121 summary.hours_per_project = project_hours
123 # Group by User and Project to get hours per user/project
124 user_project_hours = source.groupby(["First name", "Last name", "Project"])["Hours"].sum().to_dict()
125 # create a nested dict of the form {user: {project: hours}}
126 for (first, last, project), hours in user_project_hours.items():
127 user = f"{first} {last}"
128 if user not in summary.hours_per_user_project:
129 summary.hours_per_user_project[user] = {}
130 summary.hours_per_user_project[user][project] = hours
132 return summary