"""Service-layer helpers for user-related lookups and GitHub OAuth tokens."""

import math
from datetime import timedelta

import requests
from django.conf import settings
from django.utils import timezone

from .models import *
from projects.models import *
from portfolios.models import *

# Maps the externally supplied field name to the ``User`` lookup used for
# duplicate checks. Only keys listed here are accepted.
DUPLICATE_CHECK = {
    'email': 'email',
    'nickname': 'nickname',
    'phone': 'phone',
}

GITHUB_TOKEN_URL = "https://github.com/login/oauth/access_token"

# Notifications older than this many days are excluded from the
# notification queries below.
_NOTIFICATION_WINDOW_DAYS = 30


class CheckUserFieldDuplicateService:
    """Check whether a value is already taken for a given ``User`` field."""

    @staticmethod
    def check_duplicate(field: str, value: str) -> bool:
        """Return True if any ``User`` row already has ``value`` in ``field``.

        Args:
            field: One of the keys of ``DUPLICATE_CHECK``.
            value: The value to look for.

        Raises:
            ValueError: If ``field`` is not a key of ``DUPLICATE_CHECK``.
        """
        if field not in DUPLICATE_CHECK:
            raise ValueError(f"{field}는 지원하지 않는 필드입니다.")

        lookup = {DUPLICATE_CHECK[field]: value}
        return User.objects.filter(**lookup).exists()


class CheckUserFieldValueExistService:
    """Check whether a user has a value set for a given attribute."""

    @staticmethod
    def check_exist(user: User, field) -> bool:
        """Return True if ``user.<field>`` is set to something other than None.

        A falsy ``field`` name yields False. A ``field`` that is not an
        attribute of ``user`` raises ``AttributeError`` from ``getattr``.
        """
        if not field:
            return False
        # Identity check: None is a singleton, and ``==`` could be redirected
        # by a custom ``__eq__`` on the attribute value.
        return getattr(user, field) is not None


class UserToNotificationService:
    """Queries over a user's notifications created within the recent window."""

    @staticmethod
    def _window_start():
        """Return the datetime marking the start of the notification window."""
        return timezone.now() - timedelta(days=_NOTIFICATION_WINDOW_DAYS)

    @staticmethod
    def get_new_notification_count(user: User) -> int:
        """Count the user's unread notifications created within the window."""
        return user.notifications.filter(
            created_at__gt=UserToNotificationService._window_start(),
            is_read=False,
        ).count()

    @staticmethod
    def get_all_notification(user: User):
        """Return the user's notifications created within the window."""
        return user.notifications.filter(
            created_at__gt=UserToNotificationService._window_start(),
        )


# User -> portfolio related service logic
class UserToPortfolioService:
    """Portfolio queries scoped to a single user."""

    @staticmethod
    def get_represent_portfolio(user: User):
        """Return the id of the first owned portfolio flagged ``is_represent``.

        Returns None when the user owns no such portfolio.
        """
        represent_portfolio = user.owned_portfolios.filter(is_represent=True).first()
        if represent_portfolio:
            return represent_portfolio.id
        return None

    @staticmethod
    def get_published_portfolio(user: User):
        """Return the user's owned portfolios with ``is_published=True``."""
        return user.owned_portfolios.filter(is_published=True)

    @staticmethod
    def get_unpublished_portfolio(user: User):
        """Return the user's owned portfolios with ``is_published=False``."""
        return user.owned_portfolios.filter(is_published=False)

    @staticmethod
    def get_scrap_portfolio(user: User):
        """Return the user's scrapped portfolios with ``is_published=True``."""
        return user.scrapped_portfolios.filter(is_published=True)


# User -> project related service logic
class UserToProjectService:
    """Project queries scoped to a single user."""

    @staticmethod
    def get_published_solo_project(user: User):
        """Return owned projects with ``is_team=False`` and ``is_published=True``."""
        return user.owned_projects.filter(is_team=False, is_published=True)

    @staticmethod
    def get_unpublished_solo_project(user: User):
        """Return owned projects with ``is_team=False`` and ``is_published=False``."""
        return user.owned_projects.filter(is_team=False, is_published=False)

    @staticmethod
    def get_published_team_project(user: User):
        """Return published projects that list ``user`` as a team member.

        ``distinct()`` removes duplicate rows produced by the join through
        ``team_project_member_list``.
        """
        return Project.objects.filter(
            team_project_member_list__user=user,
            is_published=True,
        ).distinct()

    @staticmethod
    def get_unpublished_team_project(user: User):
        """Return unpublished projects that list ``user`` as a team member.

        ``distinct()`` removes duplicate rows produced by the join through
        ``team_project_member_list``.
        """
        return Project.objects.filter(
            team_project_member_list__user=user,
            is_published=False,
        ).distinct()

    @staticmethod
    def get_scrap_project(user: User):
        """Return the user's scrapped projects with ``is_published=True``."""
        return user.scrapped_projects.filter(is_published=True)


class GithubTokenService:
    """GitHub OAuth token exchange and persistence helpers."""

    @staticmethod
    def exchange_code_to_access_token(code: str) -> dict:
        """Exchange an OAuth ``code`` for a token payload at ``GITHUB_TOKEN_URL``.

        Posts the client id/secret from Django settings together with
        ``code`` and returns the decoded JSON response body.

        Raises:
            Exception: If the HTTP status is an error (chained from the
                underlying ``requests.HTTPError``), or if the JSON body
                contains an ``"error"`` key.
        """
        try:
            resp = requests.post(
                GITHUB_TOKEN_URL,
                headers={"Accept": "application/json"},
                data={
                    "client_id": settings.GITHUB_CLIENT_ID,
                    "client_secret": settings.GITHUB_CLIENT_SECRET,
                    "code": code,
                },
                timeout=10,
            )
            resp.raise_for_status()
        except requests.HTTPError as e:
            # Chain the original error so the traceback keeps the HTTP cause.
            raise Exception(
                f"Github token exchange failed: {e.response.status_code} - {e.response.text}"
            ) from e

        data = resp.json()
        # The response body can carry an "error" key even when no HTTPError
        # was raised, so it is checked separately.
        if "error" in data:
            msg = data.get("error_description", data["error"])
            raise Exception(f"GitHub token exchange failed: {msg}")
        return data

    @staticmethod
    def github_token_save(data: dict, user: User):
        """Create or update the ``GithubToken`` row for ``user`` from ``data``.

        ``data["access_token"]`` is required (``KeyError`` if missing);
        ``scope`` defaults to ``""`` and ``token_type`` to ``"bearer"``.

        Returns:
            The result of ``GithubToken.objects.update_or_create``
            (for a Django manager, an ``(object, created)`` tuple).
        """
        return GithubToken.objects.update_or_create(
            user=user,
            defaults={
                "access_token": data["access_token"],
                "scope": data.get("scope", ""),
                "token_type": data.get("token_type", "bearer"),
            },
        )

    @staticmethod
    def cal_github_total_page(list_count: int, page_size: int) -> int:
        """Return the number of pages needed for ``list_count`` items.

        Raises:
            ZeroDivisionError: If ``page_size`` is 0.
        """
        return math.ceil(list_count / page_size)


# NOTE(review): the original indentation of the two helpers above was not
# recoverable, so it is unclear whether callers reach them through the class
# or as module-level functions. Both spellings are kept working; confirm the
# intended one and drop the other.
github_token_save = GithubTokenService.github_token_save
cal_github_total_page = GithubTokenService.cal_github_total_page