Coverage for compiler_admin / services / google.py: 100%

237 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-28 05:48 +0000

1import csv 

2import io 

3import json 

4import subprocess 

5import sys 

6from tempfile import NamedTemporaryFile 

7from typing import IO, Any, Callable, Sequence 

8 

9from gam import CallGAMCommand, initializeLogging 

10 

11from compiler_admin import Format, Result 

12 

13initializeLogging() 

14 

15 

16class GoogleService: 

17 GAM = "gam" 

18 GYB = "gyb" 

19 

20 def gam_command(self, args: Sequence[str], stdout: str = None, stderr: str = None) -> int: 

21 """Call GAM with the provided arguments, optionally redirecting stdout and/or stderr.""" 

22 if stdout: 

23 args = ("redirect", "stdout", stdout, *args) 

24 

25 if stderr: 

26 args = ("redirect", "stderr", stderr, *args) 

27 

28 if not args[0] == self.GAM: 

29 args = (self.GAM, *args) 

30 

31 # convert all args to str to ensure proper GAM calling 

32 args = tuple(str(a) for a in args) 

33 

34 return int(CallGAMCommand(args)) 

35 

36 def gam_command_output(self, args: Sequence[str], stderr: str = None) -> list[str]: 

37 """Call GAM with the provided arguments, optionally redirecting stderr. Returns stdout lines as a list of str.""" 

38 with NamedTemporaryFile("w+") as stdout: 

39 self.gam_command(args, stdout=stdout.name, stderr=stderr) 

40 return stdout.readlines() 

41 

42 def gyb_command(self, args: Sequence[str], stdout: IO[Any] = sys.stdout, stderr: IO[Any] = sys.stderr) -> int: 

43 """Call GYB with the provided arguments.""" 

44 if not args[0] == self.GYB: 

45 args = (self.GYB, *args) 

46 

47 # convert all args to str to ensure proper GYB calling 

48 args = tuple(str(a) for a in args) 

49 

50 return subprocess.call(args, stdout=stdout, stderr=stderr) 

51 

52 

53class GoogleAccount(GoogleService): 

54 """Models a user account in the Compiler domain.""" 

55 

56 DOMAIN = "compiler.la" 

57 

58 def __init__(self, username: str): 

59 self.username = username or "" 

60 if self.username: 

61 self.account = username if username.endswith(f"@{self.DOMAIN}") else f"{username}@{self.DOMAIN}" 

62 else: 

63 self.account = "" 

64 

65 def __eq__(self, value): 

66 return str(self) == value 

67 

68 def __str__(self): 

69 return self.account 

70 

71 def add_email_alias(self, alias: str) -> int: 

72 """Add a new email alias for this account. 

73 

74 Args: 

75 alias (str): The user@compiler.la to add as an email alias for this account. 

76 

77 Returns: 

78 int: A code indicating if the operation succeeded or failed. 

79 """ 

80 command = ("create", "alias", alias, "user", self) 

81 self.gam_command(command) 

82 

83 def exists(self) -> bool: 

84 """Checks if an account exists. 

85 

86 Returns: 

87 True if the account exists. False otherwise. 

88 """ 

89 return self.get_info() != {} 

90 

91 def get_backup_codes(self) -> str: 

92 """Gets an account's backup codes, refreshing if needed. 

93 

94 Returns: 

95 (str): The backup code output. 

96 """ 

97 if not self.exists(): 

98 print(f"User does not exist: {self}") 

99 return "" 

100 

101 command = ("user", self, "show", "backupcodes") 

102 output = "".join(self.gam_command_output(command)) 

103 

104 if "Show 0 Backup Verification Codes" in output: 

105 command = ("user", self, "update", "backupcodes") 

106 output = "".join(self.gam_command_output(command)) 

107 

108 return output 

109 

110 def get_info(self) -> dict: 

111 """Get a dict of basic user information. 

112 

113 Args: 

114 account (GoogleAccount): The user@compiler.la to check for existence. 

115 

116 Returns: 

117 (dict): The user's information. 

118 """ 

119 with NamedTemporaryFile("w+") as stdout: 

120 res = self.gam_command(("info", "user", str(self), "quick"), stdout=stdout.name) 

121 if res != Result.SUCCESS: 

122 # user doesn't exist 

123 return {} 

124 # user exists, read data 

125 lines = stdout.readlines() 

126 # split on newline and filter out lines that aren't line "Key:Value" and empty value lines like "Key:<empty>" 

127 lines = [L.strip() for L in lines if len(L.split(":")) == 2 and L.split(":")[1].strip()] 

128 # make a map by splitting the lines, trimming key and value 

129 info = {} 

130 for line in lines: 

131 k, v = line.split(":") 

132 info[k.strip()] = v.strip() 

133 return info 

134 

135 def is_deactivated(self) -> bool: 

136 """Checks if the account is deactivated. 

137 

138 Returns: 

139 True if the user is deactivated. False otherwise. 

140 """ 

141 return GoogleOrgs(GoogleOrgs.OU_ALUMNI).contains_user(self) 

142 

143 def is_partner(self) -> bool: 

144 """Checks if the account is a Compiler Partner. 

145 

146 Returns: 

147 True if the user is a Partner. False otherwise. 

148 """ 

149 return GoogleGroups(GoogleGroups.GROUP_PARTNERS).contains_user(self) 

150 

151 def is_staff(self) -> bool: 

152 """Checks if an account is a Compiler Staff. 

153 

154 Returns: 

155 True if the user is a Staff. False otherwise. 

156 """ 

157 return GoogleGroups(GoogleGroups.GROUP_STAFF).contains_user(self) 

158 

159 

160class GoogleArchive(GoogleService): 

161 def archive_content(self, account: GoogleAccount) -> int: 

162 """DESTRUCTIVE! Archive this account's Calendar and Drive content. 

163 

164 Args: 

165 account (GoogleAccount): The account with content to archive. 

166 

167 Returns: 

168 int: A code indicating if the operation succeeded or failed. 

169 """ 

170 command = ("create", "transfer", account, "calendar,drive", GoogleUsers.USER_ARCHIVE, "all", "releaseresources") 

171 return self.gam_command(command) 

172 

173 def await_archive_completion(self, account: GoogleAccount, callback: Callable[[str, str], None] = None) -> int: 

174 status = "" 

175 command = ("show", "transfers", "olduser", account) 

176 

177 while "Overall Transfer Status: completed" not in status: 

178 pre_status = status 

179 status = " ".join(self.gam_command_output(command, stderr="stdout")) 

180 if callback: 

181 callback(pre_status, status) 

182 

183 def create_email_backup(self, account: GoogleAccount) -> int: 

184 """Create a backup of the account's email. 

185 

186 Args: 

187 account (GoogleAccount): The account to create an email backup. 

188 

189 Returns: 

190 int: A code indicating if the operation succeeded or failed. 

191 """ 

192 command = ("--service-account", "--email", account, "--action", "backup") 

193 return self.gyb_command(command) 

194 

195 def restore_email_backup(self, account: GoogleAccount, backup_dir: str) -> int: 

196 """Restore a backup of the account's email. 

197 

198 Args: 

199 account (GoogleAccount): The account to create an email backup. 

200 backup_dir (str): The path to a directory containing the account's email backup. 

201 

202 Returns: 

203 int: A code indicating if the operation succeeded or failed. 

204 """ 

205 command = ( 

206 "--service-account", 

207 "--email", 

208 GoogleUsers.USER_ARCHIVE, 

209 "--action", 

210 "restore", 

211 "--local-folder", 

212 backup_dir, 

213 "--label-restored", 

214 account, 

215 ) 

216 return self.gyb_command(command) 

217 

218 

219class GoogleGroups(GoogleService): 

220 

221 GROUP_PARTNERS = GoogleAccount("partners") 

222 GROUP_STAFF = GoogleAccount("staff") 

223 GROUP_TEAM = GoogleAccount("team") 

224 

225 def __init__(self, group: GoogleAccount = None): 

226 self._group = group 

227 

228 def add_user(self, account: GoogleAccount, group: GoogleAccount = None): 

229 """Add a user to a group. 

230 

231 Args: 

232 account (GoogleAccount): The user@compiler.la to add. 

233 group (GoogleAccount): The group@compiler.la to add the account to. 

234 """ 

235 group = group or self._group 

236 return self.gam_command(("user", account, "add", "groups", "member", group)) 

237 

238 def contains_user(self, account: GoogleAccount, group: GoogleAccount = None) -> bool: 

239 """Checks if the account is in a group. 

240 

241 Args: 

242 account (GoogleAccount): The user@compiler.la to check for group membership. 

243 group (GoogleAccount): The group@compiler.la to check for account's membership. 

244 

245 Returns: 

246 True if the user is a member of the group. False otherwise. 

247 """ 

248 group = group or self._group 

249 command = ("print", "groups", "member", account) 

250 output = "".join(self.gam_command_output(command)) 

251 return str(group) in output 

252 

253 def get(self, format: int = Format.BASIC, **kwargs) -> int: 

254 """Get information about the groups.""" 

255 output = "" 

256 command = ("print", "groups") 

257 

258 if len(kwargs) > 0: 

259 for k, v in kwargs.items(): 

260 command += (k, v) 

261 

262 if format in [Format.CSV, Format.JSON]: 

263 command += ("allfields",) 

264 if format == Format.JSON: 

265 command += ( 

266 "members", 

267 "managers", 

268 "owners", 

269 "formatjson", 

270 ) 

271 

272 lines = self.gam_command_output(command) 

273 if format == Format.JSON: 

274 # GAM returns a CSV structure like "email,JSON,JSON-settings" 

275 # extract JSON cols to array of dicts for convenience 

276 

277 # ensure we start from the CSV header 

278 start_index = 0 

279 groups_data = [] 

280 for i, line in enumerate(lines): 

281 if line.startswith("email,JSON,JSON-members,JSON-settings"): 

282 start_index = i 

283 break 

284 

285 # rebuild the clean CSV string and parse it 

286 clean_csv = "\n".join(lines[start_index:]) 

287 reader = csv.DictReader(io.StringIO(clean_csv)) 

288 

289 for row in reader: 

290 # check if the JSON columns exist and have data 

291 if all((col in row and row[col].strip() for col in ["JSON", "JSON-members", "JSON-settings"])): 

292 try: 

293 # unpack the JSON strings back into native Python dicts 

294 group_obj: dict = json.loads(row["JSON"]) 

295 group_obj["members"] = json.loads(row["JSON-members"]) 

296 group_obj.update(json.loads(row["JSON-settings"])) 

297 groups_data.append(group_obj) 

298 except json.JSONDecodeError as e: 

299 print(f"Skipping row for {row.get('email')} due to JSON error: {e}") 

300 

301 output = json.dumps(groups_data) 

302 else: 

303 output = "".join(lines) 

304 

305 return output 

306 

307 def remove_user(self, account: GoogleAccount, group: GoogleAccount = None): 

308 """Remove a user from a group.""" 

309 group = group or self._group 

310 return self.gam_command(("update", "group", group, "delete", account)) 

311 

312 

313class GoogleOrgs(GoogleService): 

314 OU_ALUMNI = "/alumni" 

315 OU_CONTRACTORS = "/contractors" 

316 OU_SERVICE_ACCOUNTS = "/service-accounts" 

317 OU_STAFF = "/staff" 

318 OU_PARTNERS = f"{OU_STAFF}/partners" 

319 

320 ORG_UNITS = dict( 

321 alumni=OU_ALUMNI, 

322 contractors=OU_CONTRACTORS, 

323 service_accounts=OU_SERVICE_ACCOUNTS, 

324 staff=OU_STAFF, 

325 partners=OU_PARTNERS, 

326 ) 

327 

328 def __init__(self, ou: str = None): 

329 self._ou = ou 

330 

331 def __getitem__(self, key): 

332 return self.ORG_UNITS.get(key) 

333 

334 def contains_user(self, account: GoogleAccount, ou: str = None) -> bool: 

335 """Checks if the account is in an OU. 

336 

337 Args: 

338 account (GoogleAccount): The user@compiler.la to check for membership. 

339 ou (str): The name of an OU to check for account's membership. 

340 

341 Returns: 

342 True if the account is a member of the OU. False otherwise. 

343 """ 

344 ou = ou or self._ou 

345 

346 if ou not in self.ORG_UNITS.values(): 

347 raise ValueError(f"Unexpected OU: {ou}") 

348 

349 command = ("info", "ou", ou) 

350 output = "".join(self.gam_command_output(command)) 

351 return str(account) in output 

352 

353 def get(self, **kwargs) -> str: 

354 """Print information about the org units.""" 

355 command = ("print", "orgs") 

356 

357 if len(kwargs) > 0: 

358 for k, v in kwargs.items(): 

359 command += (k, v) 

360 

361 return "".join(self.gam_command_output(command)) 

362 

363 def move_user(self, account: GoogleAccount, ou: str = None) -> int: 

364 """Move an account into a new OU.""" 

365 ou = ou or self._ou 

366 

367 if ou not in self.ORG_UNITS.values(): 

368 raise ValueError(f"Unexpected OU: {ou}") 

369 

370 return self.gam_command(("update", "ou", ou, "move", account)) 

371 

372 

373class GoogleUsers(GoogleService): 

374 """Interact with users in the Compiler domain.""" 

375 

376 # Archive account 

377 USER_ARCHIVE = GoogleAccount("archive") 

378 # Hello account 

379 USER_HELLO = GoogleAccount("hello") 

380 

381 def clear_profile(self, account: GoogleAccount) -> int: 

382 """DESTRUCTIVE! Clears user account profile info. 

383 

384 Args: 

385 account (GoogleAccount): The user@compiler.la whose profile to clear. 

386 

387 Returns: 

388 int: A code indicating if the operation succeeded or failed. 

389 """ 

390 for prop in ["address", "location", "otheremail", "phone"]: 

391 command = ("update", "user", account, prop, "clear") 

392 self.gam_command(command) 

393 

394 def create(self, account: GoogleAccount, notify: str = None, *args): 

395 command = ("create", "user", account, "password", "random", "changepassword") 

396 if notify: 

397 command += ("notify", notify, "from", self.USER_HELLO) 

398 command += (*args,) 

399 

400 return self.gam_command(command) 

401 

402 def delete(self, account: GoogleAccount) -> int: 

403 """DESTRUCTIVE! Deletes an account. 

404 

405 Args: 

406 account (GoogleAccount): The user@compiler.la to delete. 

407 

408 Returns: 

409 int: A code indicating if the operation succeeded or failed. 

410 """ 

411 command = ("delete", "user", account, "noactionifalias") 

412 return self.gam_command(command) 

413 

414 def deprovision_popimap(self, account: GoogleAccount) -> int: 

415 """DESTRUCTIVE! Deprovisions POP/IMAP (email services) for the account. 

416 

417 Args: 

418 account (GoogleAccount): The user@compiler.la to deprovision POP/IMAP for. 

419 

420 Returns: 

421 int: A code indicating if the operation succeeded or failed. 

422 """ 

423 command = ("user", account, "deprovision", "popimap") 

424 return self.gam_command(command) 

425 

426 def disable_2fa(self, account: GoogleAccount) -> int: 

427 """DESTRUCTIVE! Disables 2FA on the account. 

428 

429 Args: 

430 account (GoogleAccount): The user@compiler.la to disable 2FA for. 

431 

432 Returns: 

433 int: A code indicating if the operation succeeded or failed. 

434 """ 

435 command = ("user", account, "turnoff2sv") 

436 return self.gam_command(command) 

437 

438 def get(self, format: int = Format.BASIC, inactive: bool = False, org_units: list[str] = [], **kwargs) -> str: 

439 """Get a list of accounts in the workspace. 

440 

441 Args: 

442 format (int): The format for the output. Use the `compiler_admin.Format` helper class. 

443 inactive (bool): True to display inactive users. False to display active users. 

444 org_units (list[str]): A list of org units that, if provided, filters users to only those in any of the org units. 

445 

446 Returns: 

447 str: A formatted list of user accounts. 

448 """ 

449 flag = str(inactive).lower() 

450 output = "" 

451 queries = "" 

452 command = ("print", "users", "issuspended", flag, "isarchived", flag) 

453 

454 if len(kwargs) > 0: 

455 for k, v in kwargs.items(): 

456 command += (k, v) 

457 

458 if len(org_units) > 0: 

459 workspace_org_units = GoogleOrgs.ORG_UNITS.values() 

460 if not all((ou in workspace_org_units for ou in org_units)): 

461 raise ValueError(f"Unexpected org_unit(s): {', '.join(org_units)}") 

462 queries = ",".join([f"'orgUnitPath={ou}'" for ou in org_units]) 

463 command += ("queries", queries) 

464 

465 if format == Format.CSV: 

466 command += ("full",) 

467 elif format == Format.JSON: 

468 queries = ",".join(org_units) 

469 if queries: 

470 user_entity = ("ous_arch" if inactive else "ou_na_ns", queries) 

471 else: 

472 user_entity = ("all", "users_arch_or_susp" if inactive else "users_na_ns") 

473 

474 command = ( 

475 "info", 

476 "users", 

477 *user_entity, 

478 "nobuildingnames", 

479 "noschemas", 

480 "formatjson", 

481 ) 

482 

483 lines = self.gam_command_output(command) 

484 if format == Format.JSON: 

485 # GAM returns JSON record lines, write a JSON array for convenience 

486 output = f"[{",".join(lines)}]" 

487 else: 

488 output = "".join(lines) 

489 

490 return output 

491 

492 def reset_recovery_info(self, account: GoogleAccount, recovery_email: str, recovery_phone: str) -> int: 

493 """DESTRUCTIVE! Resets the account's recovery information. 

494 

495 Args: 

496 account (GoogleAccount): The user@compiler.la to reset recovery info for. 

497 

498 Returns: 

499 int: A code indicating if the operation succeeded or failed. 

500 """ 

501 command = ("update", "user", account, "recoveryemail", recovery_email) 

502 res = self.gam_command(command) 

503 

504 command = ("update", "user", account, "recoveryphone", recovery_phone) 

505 res += self.gam_command(command) 

506 

507 return res 

508 

509 def reset_password(self, account: GoogleAccount, notify: str = None) -> int: 

510 """DESTRUCTIVE! Resets the account's password to a new, random password. 

511 

512 Args: 

513 account (GoogleAccount): The user@compiler.la to reset the password for. 

514 notify (str): Optional email address to send a notification with the new password. 

515 

516 Returns: 

517 int: A code indicating if the operation succeeded or failed. 

518 """ 

519 command = ("update", "user", account, "password", "random", "changepassword") 

520 if notify: 

521 command += ("notify", notify, "from", GoogleUsers.USER_HELLO) 

522 

523 self.gam_command(command) 

524 

525 def remove_from_groups(self, account: GoogleAccount) -> int: 

526 """DESTRUCTIVE! Removes the account from all groups. 

527 

528 Args: 

529 account (GoogleAccount): The user@compiler.la to remove from all its groups. 

530 

531 Returns: 

532 int: A code indicating if the operation succeeded or failed. 

533 """ 

534 command = ("user", account, "delete", "groups") 

535 return self.gam_command(command) 

536 

537 def signout(self, account: GoogleAccount) -> int: 

538 """Sign a user out from all active sessions. 

539 

540 Args: 

541 account (GoogleAccount): The user@compiler.la to sign out. 

542 

543 Returns: 

544 int: A code indicating if the operation succeeded or failed. 

545 """ 

546 command = ("user", account, "signout") 

547 return self.gam_command(command)