Coverage for compiler_admin / services / google.py: 100%
237 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 05:48 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-28 05:48 +0000
1import csv
2import io
3import json
4import subprocess
5import sys
6from tempfile import NamedTemporaryFile
7from typing import IO, Any, Callable, Sequence
9from gam import CallGAMCommand, initializeLogging
11from compiler_admin import Format, Result
# Module-level side effect: runs once when this module is imported, before any service class is defined.
# NOTE(review): presumably this configures logging inside the `gam` package — confirm against the gam library.
13initializeLogging()
16class GoogleService:
17 GAM = "gam"
18 GYB = "gyb"
20 def gam_command(self, args: Sequence[str], stdout: str = None, stderr: str = None) -> int:
21 """Call GAM with the provided arguments, optionally redirecting stdout and/or stderr."""
22 if stdout:
23 args = ("redirect", "stdout", stdout, *args)
25 if stderr:
26 args = ("redirect", "stderr", stderr, *args)
28 if not args[0] == self.GAM:
29 args = (self.GAM, *args)
31 # convert all args to str to ensure proper GAM calling
32 args = tuple(str(a) for a in args)
34 return int(CallGAMCommand(args))
36 def gam_command_output(self, args: Sequence[str], stderr: str = None) -> list[str]:
37 """Call GAM with the provided arguments, optionally redirecting stderr. Returns stdout lines as a list of str."""
38 with NamedTemporaryFile("w+") as stdout:
39 self.gam_command(args, stdout=stdout.name, stderr=stderr)
40 return stdout.readlines()
42 def gyb_command(self, args: Sequence[str], stdout: IO[Any] = sys.stdout, stderr: IO[Any] = sys.stderr) -> int:
43 """Call GYB with the provided arguments."""
44 if not args[0] == self.GYB:
45 args = (self.GYB, *args)
47 # convert all args to str to ensure proper GYB calling
48 args = tuple(str(a) for a in args)
50 return subprocess.call(args, stdout=stdout, stderr=stderr)
class GoogleAccount(GoogleService):
    """Models a user account in the Compiler domain."""

    DOMAIN = "compiler.la"

    def __init__(self, username: str):
        """Create an account from a bare username or a full address.

        Args:
            username (str): Either "user" or "user@compiler.la". A falsy value yields an empty account.
        """
        self.username = username or ""
        if self.username:
            suffix = f"@{self.DOMAIN}"
            self.account = username if username.endswith(suffix) else f"{username}{suffix}"
        else:
            self.account = ""

    def __eq__(self, value):
        # compares the full account address against the other value
        return str(self) == value

    def __hash__(self):
        # defining __eq__ alone makes instances unhashable; hash like the address string
        # so an account and its equal str land in the same set/dict slot
        return hash(self.account)

    def __str__(self):
        return self.account

    def add_email_alias(self, alias: str) -> int:
        """Add a new email alias for this account.

        Args:
            alias (str): The user@compiler.la to add as an email alias for this account.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("create", "alias", alias, "user", self)
        return self.gam_command(command)

    def exists(self) -> bool:
        """Checks if an account exists.

        Returns:
            True if `get_info` returned any data. False otherwise.
        """
        return self.get_info() != {}

    def get_backup_codes(self) -> str:
        """Gets an account's backup codes, refreshing if needed.

        Returns:
            (str): The backup code output, or an empty string if the account does not exist.
        """
        if not self.exists():
            print(f"User does not exist: {self}")
            return ""

        command = ("user", self, "show", "backupcodes")
        output = "".join(self.gam_command_output(command))

        # no codes left: request a fresh set and return that output instead
        if "Show 0 Backup Verification Codes" in output:
            command = ("user", self, "update", "backupcodes")
            output = "".join(self.gam_command_output(command))

        return output

    def get_info(self) -> dict:
        """Get a dict of basic user information.

        Only output lines containing exactly one ":" and a non-empty value are kept,
        so values that themselves contain a colon are not included.

        Returns:
            (dict): The user's information, or an empty dict if the GAM call did not succeed.
        """
        with NamedTemporaryFile("w+") as stdout:
            res = self.gam_command(("info", "user", str(self), "quick"), stdout=stdout.name)
            if res != Result.SUCCESS:
                # user doesn't exist
                return {}

            # user exists, read data
            info = {}
            for line in stdout.readlines():
                parts = line.split(":")
                # skip lines that aren't "Key:Value"
                if len(parts) != 2:
                    continue
                key, value = parts[0].strip(), parts[1].strip()
                # skip empty value lines like "Key:<empty>"
                if value:
                    info[key] = value
            return info

    def is_deactivated(self) -> bool:
        """Checks if the account is deactivated.

        Returns:
            True if the account is in the alumni OU. False otherwise.
        """
        return GoogleOrgs(GoogleOrgs.OU_ALUMNI).contains_user(self)

    def is_partner(self) -> bool:
        """Checks if the account is a Compiler Partner.

        Returns:
            True if the account is in the partners group. False otherwise.
        """
        return GoogleGroups(GoogleGroups.GROUP_PARTNERS).contains_user(self)

    def is_staff(self) -> bool:
        """Checks if an account is a Compiler Staff.

        Returns:
            True if the account is in the staff group. False otherwise.
        """
        return GoogleGroups(GoogleGroups.GROUP_STAFF).contains_user(self)
class GoogleArchive(GoogleService):
    """Archive operations: content transfer to the archive user and email backup/restore."""

    def archive_content(self, account: GoogleAccount) -> int:
        """DESTRUCTIVE! Transfer this account's Calendar and Drive content to the archive user.

        Args:
            account (GoogleAccount): The account with content to archive.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        return self.gam_command(
            ("create", "transfer", account, "calendar,drive", GoogleUsers.USER_ARCHIVE, "all", "releaseresources")
        )

    def await_archive_completion(self, account: GoogleAccount, callback: Callable[[str, str], None] = None) -> int:
        """Poll the account's transfer status until it reports completion.

        The status is queried repeatedly, with no delay between queries in this method.

        Args:
            account (GoogleAccount): The account whose transfer is being awaited.
            callback (Callable): Optional. Called after every poll with (previous status, current status).

        Returns:
            Nothing is returned explicitly, despite the int annotation.
        """
        query = ("show", "transfers", "olduser", account)
        current = ""

        while True:
            previous = current
            # stderr is merged into stdout so the whole report is captured
            current = " ".join(self.gam_command_output(query, stderr="stdout"))
            if callback:
                callback(previous, current)
            if "Overall Transfer Status: completed" in current:
                break

    def create_email_backup(self, account: GoogleAccount) -> int:
        """Back up the account's email with GYB.

        Args:
            account (GoogleAccount): The account to create an email backup.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        return self.gyb_command(("--service-account", "--email", account, "--action", "backup"))

    def restore_email_backup(self, account: GoogleAccount, backup_dir: str) -> int:
        """Restore an email backup into the archive user's mailbox, labelled with the account.

        Args:
            account (GoogleAccount): The account the backup was taken from; used as the restore label.
            backup_dir (str): The path to a directory containing the account's email backup.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        target = ("--service-account", "--email", GoogleUsers.USER_ARCHIVE)
        action = ("--action", "restore")
        source = ("--local-folder", backup_dir)
        label = ("--label-restored", account)
        return self.gyb_command(target + action + source + label)
class GoogleGroups(GoogleService):
    """Interact with groups in the Compiler domain."""

    GROUP_PARTNERS = GoogleAccount("partners")
    GROUP_STAFF = GoogleAccount("staff")
    GROUP_TEAM = GoogleAccount("team")

    def __init__(self, group: GoogleAccount = None):
        """Optionally bind a default group used when a method is called without one."""
        self._group = group

    def add_user(self, account: GoogleAccount, group: GoogleAccount = None):
        """Add a user to a group.

        Args:
            account (GoogleAccount): The user@compiler.la to add.
            group (GoogleAccount): The group@compiler.la to add the account to. Defaults to the bound group.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        group = group or self._group
        return self.gam_command(("user", account, "add", "groups", "member", group))

    def contains_user(self, account: GoogleAccount, group: GoogleAccount = None) -> bool:
        """Checks if the account is in a group.

        Args:
            account (GoogleAccount): The user@compiler.la to check for group membership.
            group (GoogleAccount): The group@compiler.la to check for account's membership. Defaults to the bound group.

        Returns:
            True if the group's address appears in the account's group listing. False otherwise.
        """
        group = group or self._group
        command = ("print", "groups", "member", account)
        output = "".join(self.gam_command_output(command))
        # NOTE(review): this is a substring test on the raw output, so a group address that is a
        # suffix of another group's address would also match — confirm this is acceptable
        return str(group) in output

    @staticmethod
    def _groups_from_csv(lines: Sequence[str]) -> list:
        """Turn GAM's "email,JSON,JSON-members,JSON-settings" CSV output into a list of group dicts."""
        json_columns = ("JSON", "JSON-members", "JSON-settings")

        # ensure we start from the CSV header; fall back to the first line if it is not found
        start_index = 0
        for i, line in enumerate(lines):
            if line.startswith("email,JSON,JSON-members,JSON-settings"):
                start_index = i
                break

        # rebuild the clean CSV string and parse it
        clean_csv = "\n".join(lines[start_index:])
        reader = csv.DictReader(io.StringIO(clean_csv))

        groups_data = []
        for row in reader:
            # DictReader fills the missing columns of a short row with None,
            # so treat None like an empty value instead of calling .strip() on it
            if not all((row.get(col) or "").strip() for col in json_columns):
                continue
            try:
                # unpack the JSON strings back into native Python dicts
                group_obj: dict = json.loads(row["JSON"])
                group_obj["members"] = json.loads(row["JSON-members"])
                group_obj.update(json.loads(row["JSON-settings"]))
                groups_data.append(group_obj)
            except json.JSONDecodeError as e:
                print(f"Skipping row for {row.get('email')} due to JSON error: {e}")

        return groups_data

    def get(self, format: int = Format.BASIC, **kwargs) -> str:
        """Get information about the groups.

        Args:
            format (int): The format for the output. CSV and JSON request all fields; JSON also requests
                members, managers and owners.
            **kwargs: Extra GAM arguments, appended to the command as key/value pairs.

        Returns:
            str: A JSON array of group objects for the JSON format, otherwise GAM's output unchanged.
        """
        command = ("print", "groups")

        for k, v in kwargs.items():
            command += (k, v)

        if format in [Format.CSV, Format.JSON]:
            command += ("allfields",)
        if format == Format.JSON:
            command += (
                "members",
                "managers",
                "owners",
                "formatjson",
            )

        lines = self.gam_command_output(command)
        if format == Format.JSON:
            # GAM returns a CSV structure with JSON columns,
            # extract them to an array of dicts for convenience
            return json.dumps(self._groups_from_csv(lines))

        return "".join(lines)

    def remove_user(self, account: GoogleAccount, group: GoogleAccount = None):
        """Remove a user from a group.

        Args:
            account (GoogleAccount): The user@compiler.la to remove.
            group (GoogleAccount): The group@compiler.la to remove the account from. Defaults to the bound group.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        group = group or self._group
        return self.gam_command(("update", "group", group, "delete", account))
class GoogleOrgs(GoogleService):
    """Interact with organizational units (OUs) in the Compiler domain."""

    OU_ALUMNI = "/alumni"
    OU_CONTRACTORS = "/contractors"
    OU_SERVICE_ACCOUNTS = "/service-accounts"
    OU_STAFF = "/staff"
    OU_PARTNERS = f"{OU_STAFF}/partners"

    # short name -> OU path
    ORG_UNITS = {
        "alumni": OU_ALUMNI,
        "contractors": OU_CONTRACTORS,
        "service_accounts": OU_SERVICE_ACCOUNTS,
        "staff": OU_STAFF,
        "partners": OU_PARTNERS,
    }

    def __init__(self, ou: str = None):
        """Optionally bind a default OU used when a method is called without one."""
        self._ou = ou

    def __getitem__(self, key):
        # unknown names yield None rather than raising KeyError
        return self.ORG_UNITS.get(key)

    def _checked_ou(self, ou):
        """Pick the given OU or the bound default, rejecting paths not listed in ORG_UNITS."""
        target = ou or self._ou
        if target not in self.ORG_UNITS.values():
            raise ValueError(f"Unexpected OU: {target}")
        return target

    def contains_user(self, account: GoogleAccount, ou: str = None) -> bool:
        """Checks if the account is in an OU.

        Args:
            account (GoogleAccount): The user@compiler.la to check for membership.
            ou (str): The path of an OU to check for account's membership. Defaults to the bound OU.

        Returns:
            True if the account's address appears in the OU's info output. False otherwise.

        Raises:
            ValueError: If the OU is not one of the known org units.
        """
        target = self._checked_ou(ou)
        listing = self.gam_command_output(("info", "ou", target))
        return str(account) in "".join(listing)

    def get(self, **kwargs) -> str:
        """Print information about the org units.

        Keyword arguments are appended to the GAM command as key/value pairs.
        """
        command = ("print", "orgs")
        for option in kwargs.items():
            command += option
        return "".join(self.gam_command_output(command))

    def move_user(self, account: GoogleAccount, ou: str = None) -> int:
        """Move an account into a new OU.

        Raises:
            ValueError: If the OU is not one of the known org units.
        """
        target = self._checked_ou(ou)
        return self.gam_command(("update", "ou", target, "move", account))
class GoogleUsers(GoogleService):
    """Interact with users in the Compiler domain."""

    # Archive account
    USER_ARCHIVE = GoogleAccount("archive")
    # Hello account
    USER_HELLO = GoogleAccount("hello")

    def clear_profile(self, account: GoogleAccount) -> int:
        """DESTRUCTIVE! Clears user account profile info.

        Args:
            account (GoogleAccount): The user@compiler.la whose profile to clear.

        Returns:
            int: The sum of the codes from each update; non-zero if any update returned a non-zero code.
        """
        res = 0
        for prop in ("address", "location", "otheremail", "phone"):
            res += self.gam_command(("update", "user", account, prop, "clear"))
        return res

    def create(self, account: GoogleAccount, notify: str = None, *args):
        """Create a new account with a random password that must be changed.

        Args:
            account (GoogleAccount): The user@compiler.la to create.
            notify (str): Optional email address to send a notification to, sent from the hello account.
            *args: Extra GAM arguments appended to the command.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("create", "user", account, "password", "random", "changepassword")
        if notify:
            command += ("notify", notify, "from", self.USER_HELLO)
        command += args

        return self.gam_command(command)

    def delete(self, account: GoogleAccount) -> int:
        """DESTRUCTIVE! Deletes an account.

        Args:
            account (GoogleAccount): The user@compiler.la to delete.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("delete", "user", account, "noactionifalias")
        return self.gam_command(command)

    def deprovision_popimap(self, account: GoogleAccount) -> int:
        """DESTRUCTIVE! Deprovisions POP/IMAP (email services) for the account.

        Args:
            account (GoogleAccount): The user@compiler.la to deprovision POP/IMAP for.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("user", account, "deprovision", "popimap")
        return self.gam_command(command)

    def disable_2fa(self, account: GoogleAccount) -> int:
        """DESTRUCTIVE! Disables 2FA on the account.

        Args:
            account (GoogleAccount): The user@compiler.la to disable 2FA for.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("user", account, "turnoff2sv")
        return self.gam_command(command)

    def get(
        self, format: int = Format.BASIC, inactive: bool = False, org_units: list[str] | None = None, **kwargs
    ) -> str:
        """Get a list of accounts in the workspace.

        Args:
            format (int): The format for the output. Use the `compiler_admin.Format` helper class.
            inactive (bool): True to display inactive users. False to display active users.
            org_units (list[str]): A list of org units that, if provided, filters users to only those in any of the org units.
            **kwargs: Extra GAM arguments, appended as key/value pairs. Not used for the JSON format.

        Returns:
            str: A formatted list of user accounts.

        Raises:
            ValueError: If any entry of org_units is not a known org unit path.
        """
        # avoid a shared mutable default; None and an empty list both mean "no filter"
        org_units = org_units or []
        flag = str(inactive).lower()
        command = ("print", "users", "issuspended", flag, "isarchived", flag)

        for k, v in kwargs.items():
            command += (k, v)

        if org_units:
            workspace_org_units = GoogleOrgs.ORG_UNITS.values()
            if not all(ou in workspace_org_units for ou in org_units):
                raise ValueError(f"Unexpected org_unit(s): {', '.join(org_units)}")
            queries = ",".join([f"'orgUnitPath={ou}'" for ou in org_units])
            command += ("queries", queries)

        if format == Format.CSV:
            command += ("full",)
        elif format == Format.JSON:
            # the JSON format uses a different GAM command entirely, replacing the one built above
            queries = ",".join(org_units)
            if queries:
                user_entity = ("ous_arch" if inactive else "ou_na_ns", queries)
            else:
                user_entity = ("all", "users_arch_or_susp" if inactive else "users_na_ns")

            command = (
                "info",
                "users",
                *user_entity,
                "nobuildingnames",
                "noschemas",
                "formatjson",
            )

        lines = self.gam_command_output(command)
        if format == Format.JSON:
            # GAM returns JSON record lines, write a JSON array for convenience
            # (plain concatenation: nesting the same quote type in an f-string needs Python 3.12+)
            return "[" + ",".join(lines) + "]"

        return "".join(lines)

    def reset_recovery_info(self, account: GoogleAccount, recovery_email: str, recovery_phone: str) -> int:
        """DESTRUCTIVE! Resets the account's recovery information.

        Args:
            account (GoogleAccount): The user@compiler.la to reset recovery info for.
            recovery_email (str): The value to set as the recovery email.
            recovery_phone (str): The value to set as the recovery phone.

        Returns:
            int: The sum of the codes from both updates; non-zero if either returned a non-zero code.
        """
        command = ("update", "user", account, "recoveryemail", recovery_email)
        res = self.gam_command(command)

        command = ("update", "user", account, "recoveryphone", recovery_phone)
        res += self.gam_command(command)

        return res

    def reset_password(self, account: GoogleAccount, notify: str = None) -> int:
        """DESTRUCTIVE! Resets the account's password to a new, random password.

        Args:
            account (GoogleAccount): The user@compiler.la to reset the password for.
            notify (str): Optional email address to send a notification with the new password.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("update", "user", account, "password", "random", "changepassword")
        if notify:
            command += ("notify", notify, "from", GoogleUsers.USER_HELLO)

        return self.gam_command(command)

    def remove_from_groups(self, account: GoogleAccount) -> int:
        """DESTRUCTIVE! Removes the account from all groups.

        Args:
            account (GoogleAccount): The user@compiler.la to remove from all its groups.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("user", account, "delete", "groups")
        return self.gam_command(command)

    def signout(self, account: GoogleAccount) -> int:
        """Sign a user out from all active sessions.

        Args:
            account (GoogleAccount): The user@compiler.la to sign out.

        Returns:
            int: A code indicating if the operation succeeded or failed.
        """
        command = ("user", account, "signout")
        return self.gam_command(command)