Files
colio/users/views.py

521 lines
22 KiB
Python

import math
from django.conf import settings
from django.shortcuts import get_object_or_404
from rest_framework.views import APIView
from rest_framework.viewsets import ReadOnlyModelViewSet
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer, TokenRefreshSerializer, TokenObtainSerializer
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError, InvalidToken
from rest_framework import viewsets, status
from rest_framework.response import Response
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.decorators import action
from django.contrib.auth import authenticate
from django.db.models import Case, When, Value, IntegerField, Q
from django.utils.functional import cached_property
from django.core.cache import cache
from .models import *
from .serializers import *
from .services import *
from .permissions import *
from common.utils.fileManager import file_delete
from projects.serializers import *
from portfolios.serializers import *
from django.db import transaction
from google.oauth2 import id_token
from google.auth.transport import requests
from github import Github, GithubException
CACHE_TIMEOUT = 60 * 60
PAGE_SIZE = 20
class RefreshAPIView(APIView):
permission_classes = [AllowAny]
# access token 재발급
@transaction.atomic
def post(self, request):
refresh = request.COOKIES.get("refresh")
if not refresh:
return Response({"message": "No refresh token"}, status=status.HTTP_400_BAD_REQUEST)
if request.data.get("user_id", None) != RefreshToken(refresh).payload.get("user_id", None):
return Response({"message": "Wrong userid"}, status=status.HTTP_400_BAD_REQUEST)
try:
serializer = TokenRefreshSerializer(data={'refresh': refresh})
if serializer.is_valid():
res = Response({"access": serializer.validated_data['access']}, status=status.HTTP_200_OK)
res.set_cookie("refresh", serializer.validated_data['refresh'], httponly=True, samesite=None, secure=True)
return res
except TokenError as e:
return Response({"message": f"Invalid token: {e}"}, status=status.HTTP_401_UNAUTHORIZED)
class GoogleLoginAPIView(APIView):
permission_classes = [AllowAny]
@transaction.atomic
def post(self, request):
token_str = request.data.get("id_token")
if not token_str:
return Response({"message": "no id_token"}, status=status.HTTP_400_BAD_REQUEST)
try:
idinfo = id_token.verify_oauth2_token(
token_str,
requests.Request(),
settings.GOOGLE_CLIENT_ID
)
except ValueError:
return Response({"message": "wrong id_token"}, status=status.HTTP_400_BAD_REQUEST)
email = idinfo["email"]
# sub = idinfo["sub"] # Google 고유 ID
# 이미 소셜 회원가입을 한 경우, 로그인 기능.
if user := User.objects.filter(email=email).first():
# 로그인 전 쿠키에 있는 리프레시 토큰 무효화
try:
refresh = request.COOKIES.get('refresh')
if refresh:
refresh_token = RefreshToken(refresh)
refresh_token.blacklist()
except TokenError as e:
pass # 어차피 만료된거면 그냥 넘어가기
refresh = RefreshToken.for_user(user)
access = str(refresh.access_token)
res = Response(
{
"message": "login success",
"user_id": user.id,
"nickname": user.nickname,
"access": access,
"is_member": True,
"is_custom_url": user.is_custom_url
},
status=status.HTTP_200_OK,
)
res.set_cookie("refresh", str(refresh), httponly=True, samesite=None, secure=True)
return res
else:
return Response(
{
"message": "Not member",
"email": email,
"is_member": False
},
status=status.HTTP_200_OK
)
class GithubAPIViewSet(viewsets.ViewSet):
permission_classes = [IsAuthenticated, IsGithubLinked]
# 깃허브 연동 및 api 요청
def _get_github(self):
uid = self.request.user.id
cache_key = f"gh-token:{uid}"
token = cache.get(cache_key)
if token is None:
token = self.request.user.github_token.access_token
cache.set(cache_key, token, timeout=CACHE_TIMEOUT)
return Github(token, per_page=PAGE_SIZE)
@cached_property
def github(self):
return self._get_github()
@transaction.atomic
@action(detail=False, methods=["post"], url_path="connect", permission_classes=[IsAuthenticated])
def connect(self, request):
serializer = GithubCodeSerializer(data=request.data)
if serializer.is_valid():
try:
data = GithubTokenService.exchange_code_to_access_token(serializer.validated_data['code'])
except Exception as e:
return Response({"connected": False, "detail": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
GithubTokenService.github_token_save(data, request.user)
return Response({"connected": True}, status=status.HTTP_200_OK)
@action(detail=False, methods=["get"], url_path="personal-repos")
def personal_repos(self, request):
try:
page = int(request.query_params.get("page", 1))
except ValueError:
return Response({"is_page_int": False,"detail": "page and page_size must be integers"}, status=status.HTTP_400_BAD_REQUEST)
if page < 1:
return Response({"is_page_gte_1": False, "detail": "page and page_size must be >= 1"}, status=status.HTTP_400_BAD_REQUEST)
user = self.github.get_user()
repo_list = user.get_repos()
total_page = GithubTokenService.cal_github_total_page(repo_list.totalCount, PAGE_SIZE)
repos = repo_list.get_page(page - 1)
data = [{'full_name': r.full_name} for r in repos]
return Response({
"page": page,
"page_size": PAGE_SIZE,
"total_page": total_page,
"results": data
}, status=status.HTTP_200_OK)
@action(detail=False, methods=["get"], url_path="commits")
def commits(self, request):
repo_full_name = request.query_params.get("repo")
if not repo_full_name:
return Response({"is_query_param": False}, status=status.HTTP_400_BAD_REQUEST)
try:
page = int(request.query_params.get("page", 1))
except ValueError:
return Response({"is_page_int": False,"detail": "page and page_size must be integers"}, status=status.HTTP_400_BAD_REQUEST)
if page < 1:
return Response({"is_page_gte_1": False, "detail": "page and page_size must be >= 1"}, status=status.HTTP_400_BAD_REQUEST)
try:
repo = self.github.get_repo(repo_full_name)
commit_list = repo.get_commits()
total_page = GithubTokenService.cal_github_total_page(commit_list.totalCount, PAGE_SIZE)
commits = commit_list.get_page(page - 1)
except Exception as e:
return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
data = [
{"sha": c.sha, "commit_message": c.commit.message}
for c in commits
]
return Response({
"page": page,
"page_size": PAGE_SIZE,
"total_page": total_page,
"results": data
}, status=status.HTTP_200_OK)
@action(detail=False, methods=["get"], url_path="commit-files")
def commit_files(self, request):
repo_full, sha = request.query_params.get("repo"), request.query_params.get("sha")
if not (repo_full and sha):
return Response({"is_query_param": False}, status=status.HTTP_400_BAD_REQUEST)
commit = self.github.get_repo(repo_full).get_commit(sha)
return Response([{"filename": f.filename, "status": f.status} for f in commit.files])
@action(detail=False, methods=["get"], url_path="file-content")
def file_content(self, request):
repo_full = request.query_params.get("repo")
filename = request.query_params.get("filename")
sha = request.query_params.get("sha")
if not (repo_full and filename and sha):
return Response({"is_query_param": False}, status=status.HTTP_400_BAD_REQUEST)
try:
file = self.github.get_repo(repo_full).get_contents(path=filename, ref=sha)
if file.encoding == "base64":
content = file.decoded_content.decode("utf-8", errors="replace")
else:
content = file.content
return Response({"path": file.path, "ref": sha, "content": content})
except GithubException as e:
return Response({"detail": str(e)}, status=e.status)
@action(detail=False, methods=["get"], url_path="organizations")
def organizations(self, request):
try:
page = int(request.query_params.get("page", 1))
except ValueError:
return Response({"is_page_int": False,"detail": "page and page_size must be integers"}, status=status.HTTP_400_BAD_REQUEST)
if page < 1:
return Response({"is_page_gte_1": False, "detail": "page and page_size must be >= 1"}, status=status.HTTP_400_BAD_REQUEST)
user = self.github.get_user()
org_list = user.get_orgs()
total_page = GithubTokenService.cal_github_total_page(org_list.totalCount, PAGE_SIZE)
orgs = org_list.get_page(page - 1)
data = [{'org_login': org.login} for org in orgs]
return Response({
"page": page,
"page_size": PAGE_SIZE,
"total_page": total_page,
"results": data
}, status=status.HTTP_200_OK)
@action(detail=False, methods=["get"], url_path="organization-repos")
def organization_repos(self, request):
try:
page = int(request.query_params.get("page", 1))
except ValueError:
return Response({"is_page_int": False,"detail": "page and page_size must be integers"}, status=status.HTTP_400_BAD_REQUEST)
if page < 1:
return Response({"is_page_gte_1": False, "detail": "page and page_size must be >= 1"}, status=status.HTTP_400_BAD_REQUEST)
org_login = request.query_params.get("org")
if not org_login:
return Response({"is_query_param": False, "detail": "org parameter is required"}, status=status.HTTP_400_BAD_REQUEST)
repo_list = self.github.get_organization(org_login).get_repos()
total_page = GithubTokenService.cal_github_total_page(repo_list.totalCount, PAGE_SIZE)
repos = repo_list.get_page(page - 1)
data = [{'full_name': r.full_name} for r in repos]
return Response({
"page": page,
"page_size": PAGE_SIZE,
"total_page": total_page,
"results": data
}, status=status.HTTP_200_OK)
class JoinAPIView(APIView):
permission_classes = [AllowAny]
# 회원가입
@transaction.atomic
def post(self, request):
serializer = JoinSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
access = str(RefreshToken.for_user(user).access_token)
res = Response({
"message": "회원가입이 완료되었습니다.",
"access": access
}, status=status.HTTP_200_OK)
return res
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class LoginAPIView(APIView):
permission_classes = [AllowAny]
# 로그인
@transaction.atomic
def post(self, request):
email=request.data.get("email", None)
password=request.data.get("password", None)
# 로그인 전 쿠키에 있는 리프레시 토큰 무효화
try:
refresh = request.COOKIES.get('refresh')
if refresh:
refresh_token = RefreshToken(refresh)
refresh_token.blacklist()
except TokenError as e:
pass
if user := authenticate(email=email, password=password):
serializer = TokenObtainPairSerializer(data={'email': email, 'password': password})
# id, 비번 맞나 틀리나 검사
if serializer.is_valid():
res = Response(
{
"message": "login success",
"user_id": user.id,
"nickname": user.nickname,
"access": serializer.validated_data['access'],
"is_custom_url": user.is_custom_url
},
status=status.HTTP_200_OK,
)
res.set_cookie("refresh", serializer.validated_data['refresh'], httponly=True, samesite=None, secure=True)
return res
else:
return Response(serializer.errors)
else: # id, 비번 둘 중 하나가 틀렸을 때
return Response({"message": "아이디 혹은 비밀번호가 맞지 않습니다."}, status=status.HTTP_400_BAD_REQUEST)
class CheckUserFieldDuplicateAPIView(APIView):
permission_classes = [AllowAny]
# 유저 필드 중복 확인
def get(self, request):
field = request.query_params.get('field')
value = request.query_params.get('value')
if not field or not value:
return Response({"message": "올바르지 않은 요청입니다."}, status=status.HTTP_400_BAD_REQUEST)
try:
if CheckUserFieldDuplicateService.check_duplicate(field, value):
return Response({"message": f"존재하는 {field} 입니다."}, status=status.HTTP_400_BAD_REQUEST)
else:
return Response({"message": f"사용해도 되는 {field} 입니다."}, status=status.HTTP_200_OK)
except ValueError as e:
return Response({"message": str(e)}, status=status.HTTP_400_BAD_REQUEST)
class TagUserAPIView(APIView):
def get(self, request):
nickname = request.query_params.get('nickname')
users = User.objects.filter(nickname__icontains=nickname).annotate(
priority=Case(
When(nickname__iexact=nickname, then=Value(0)),
default=Value(1),
output_field=IntegerField()
)
).order_by('priority').values('profile_image', 'nickname')[:5]
serializer = TagUserSerializer(users, many=True)
return Response({'users': serializer.data})
class SetPortofolioRequiredInfoAPIView(APIView):
def get(self, request):
custom_url = request.GET.get('custom_url', None)
if not custom_url:
return Response({"message": "no url"}, status=status.HTTP_400_BAD_REQUEST)
normalized = custom_url.strip().lower()
if "www" in normalized or "vvvvvv" in normalized:
return Response({"message": "custom_url cannot contain 'www'"}, status=status.HTTP_400_BAD_REQUEST)
if User.objects.filter(custom_url=custom_url).exists():
return Response({"message": "already used url"}, status=status.HTTP_400_BAD_REQUEST)
else:
return Response({"message": "can use this url"}, status=status.HTTP_200_OK)
@transaction.atomic
def patch(self, request):
user = request.user
serializer = SetPortofolioRequiredInfoSerializer(user, data=request.data)
if serializer.is_valid(raise_exception=True):
serializer.save()
user.is_custom_url = True
user.save()
return Response({"message": "updated successfully"}, status=status.HTTP_202_ACCEPTED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
# 마이페이지 관련
class MyPageProfileAPIView(APIView):
# 프로필 조회(안 읽은 알림 수)
def get(self, request, nickname):
target_user = get_object_or_404(User, nickname=nickname)
user = request.user
serializer = UserProfileSerializer(target_user)
data = serializer.data
data['represent_portfolio_id'] = UserToPortfolioService.get_represent_portfolio(target_user)
return Response(data, status=status.HTTP_200_OK)
# 프로필 수정
@transaction.atomic
def patch(self, request, nickname):
target_user = get_object_or_404(User, nickname=nickname)
user = request.user
if user == target_user:
serializer = UserProfileSerializer(user, request.data, partial=True)
if serializer.is_valid():
if serializer.validated_data.get('profile_image') and CheckUserFieldValueExistService.check_exist(user, 'profile_image'):
file_delete(user, 'profile_image')
if serializer.validated_data.get('banner_image') and CheckUserFieldValueExistService.check_exist(user, 'banner_image'):
file_delete(user, 'banner_image')
serializer.save()
data = serializer.data
data['represent_portfolio_id'] = UserToPortfolioService.get_represent_portfolio(target_user)
return Response(data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
else:
return Response({"message": "Not account owner"}, status=status.HTTP_400_BAD_REQUEST)
class MyPageWorkListAPIView(APIView):
# 포폴, 플젝, 작업중, 스크랩 조회(안 읽은 알림 수)
def get(self, request, nickname):
target_user = get_object_or_404(User, nickname=nickname)
user = request.user
retreive_type = request.query_params.get('type')
data = {}
if retreive_type == 'portfolio':
portfolios = UserToPortfolioService.get_published_portfolio(target_user)
serializer = PortfolioListViewSerializer(portfolios, many=True)
data = {
"portfolios": serializer.data
}
elif retreive_type == 'project':
solo_projects = UserToProjectService.get_published_solo_project(target_user)
team_projects = UserToProjectService.get_published_team_project(target_user)
sp_serializer = ProjectListViewSerializer(solo_projects, many=True)
tp_serializer = ProjectListViewSerializer(team_projects, many=True)
data = {
"solo_projects": sp_serializer.data,
"team_projects": tp_serializer.data,
}
elif retreive_type == 'working':
if user != target_user:
return Response({"message": "Not account owner"}, status=status.HTTP_400_BAD_REQUEST)
portfolios = UserToPortfolioService.get_unpublished_portfolio(target_user)
solo_projects = UserToProjectService.get_unpublished_solo_project(target_user)
team_projects = UserToProjectService.get_unpublished_team_project(target_user)
po_serializer = PortfolioListViewSerializer(portfolios, many=True)
sp_serializer = ProjectListViewSerializer(solo_projects, many=True)
tp_serializer = ProjectListViewSerializer(team_projects, many=True)
data = {
"portfolios": po_serializer.data,
"solo_projects": sp_serializer.data,
"team_projects": tp_serializer.data,
}
elif retreive_type == 'scrap':
if user != target_user:
return Response({"message": "Not account owner"}, status=status.HTTP_400_BAD_REQUEST)
portfolios = UserToPortfolioService.get_scrap_portfolio(target_user)
po_serializer = PortfolioListViewSerializer(portfolios, many=True)
projects = UserToProjectService.get_scrap_project(target_user)
pr_serializer = ProjectListViewSerializer(projects, many=True)
data = {
"portfolios": po_serializer.data,
"projects": pr_serializer.data,
}
else:
return Response({"message": "not allowed retreive_type"}, status=status.HTTP_400_BAD_REQUEST)
return Response(data, status=status.HTTP_200_OK)
class MyPageMemberInfoAPIView(APIView):
# 내 정보 조회(안 읽은 알림 수)
def get(self, request):
user = request.user
serializer = UserMemberInfoSerializer(user)
data = serializer.data
return Response(data, status=status.HTTP_200_OK)
# 내 정보 수정
@transaction.atomic
def patch(self, request):
user = request.user
serializer = UserMemberInfoSerializer(user, request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class UserInfoAPIView(APIView):
permission_classes = [AllowAny]
# 유저 대표 포트폴리오 id 제공
def get(self, request):
sub_domain = request.query_params.get('sub_domain', '')
if not sub_domain:
return Response({"message": "no sub domain"}, status=status.HTTP_400_BAD_REQUEST)
target_user = get_object_or_404(User, custom_url=sub_domain)
represent_portfolio = target_user.owned_portfolios.filter(is_represent=True, is_published=True).first()
if not represent_portfolio:
return Response({"message": "no represent published portfolio"}, status=status.HTTP_404_NOT_FOUND)
return Response({"portfolio_id": represent_portfolio.id}, status=status.HTTP_200_OK)