import asyncio import os from dotenv import load_dotenv from tqdm.asyncio import tqdm as atqdm import httpx load_dotenv() gitea_url = os.getenv("GITEA_URL") api_token = os.getenv("GITEA_API_TOKEN") headers = {"Authorization": f"token {api_token}"} # 强制保留用户 PROTECTED_USER_NAMES = ['prof_chen', 'Ding-JJ', 'GAOJJJ', 'LAN206', 'Old-Watch', 'Yuayifa', 'bakaEC', 'cheny', 'gaoyuki', 'rs', 'shanshan', 'test2333', 'tianma_admin', 'zhangzd', 'zoujielin'] # 最大并发数 MAX_CONCURRENCY = 20 # 删除用户及其仓库的异步函数 async def delete_user(username, client: httpx.AsyncClient, semaphore: asyncio.Semaphore): try: async with semaphore: # 获取用户的所有仓库 repos_url = f"{gitea_url}/api/v1/users/{username}/repos" repos_resp = await client.get(repos_url) if repos_resp.status_code == 200: repos = repos_resp.json() # 并行删除所有仓库 delete_tasks = [] for repo in repos: delete_repo_url = f"{gitea_url}/api/v1/repos/{username}/{repo['name']}" delete_tasks.append(client.delete(delete_repo_url)) if delete_tasks: await asyncio.gather(*delete_tasks, return_exceptions=True) # 删除用户 user_url = f"{gitea_url}/api/v1/admin/users/{username}" await client.delete(user_url) except (httpx.HTTPError, httpx.RequestError) as e: print(f"删除用户 {username} 时出错: {e}") async def get_user_names(client: httpx.AsyncClient): users_url = f"{gitea_url}/api/v1/admin/users" resp = await client.get(users_url) data = resp.json() user_names = [user["username"] for user in data] return user_names async def get_org_user_names(org_name="TianMa431", client: httpx.AsyncClient = None): user_names = [] org_url = f"{gitea_url}/api/v1/orgs/{org_name}/members" resp = await client.get(org_url) data = resp.json() for user in data: user_names.append(user["username"]) return user_names async def delete_users_parallel(user_names): async with httpx.AsyncClient(headers=headers, timeout=30.0) as client: semaphore = asyncio.Semaphore(MAX_CONCURRENCY) print(f"准备删除 {len(user_names)} 个用户...") # 创建所有删除任务 tasks = [delete_user(username, client, semaphore) for username in user_names if username not in PROTECTED_USER_NAMES] # 并行执行所有删除任务,并显示进度条 await atqdm.gather(*tasks, desc="删除用户中") async def main(): async with httpx.AsyncClient(headers=headers, timeout=30.0) as client: # 并行获取用户列表和组织成员列表 user_names, org_user_names = await asyncio.gather( get_user_names(client), get_org_user_names(client=client) ) # 并行删除用户 users_to_delete = [name for name in user_names if name not in org_user_names] await delete_users_parallel(users_to_delete) if __name__ == "__main__": asyncio.run(main())