commit 122b367bed0d9299b06b9179f98e7c4f585ac158 Author: sm4640 Date: Wed Jan 14 14:54:42 2026 +0900 Feat: [main] baekjoon-bot-v1 diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..d31cbf3 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +# .dockerignore +__pycache__/ +*.py[cod] +venv/ +.env +.git/ \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..64d49ae --- /dev/null +++ b/.gitignore @@ -0,0 +1,216 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[codz] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py.cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. 
+# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +# Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +# poetry.lock +# poetry.toml + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. +# https://pdm-project.org/en/latest/usage/project/#working-with-version-control +# pdm.lock +# pdm.toml +.pdm-python +.pdm-build/ + +# pixi +# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. +# pixi.lock +# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one +# in the .venv directory. It is recommended not to include this directory in version control. +.pixi + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# Redis +*.rdb +*.aof +*.pid + +# RabbitMQ +mnesia/ +rabbitmq/ +rabbitmq-data/ + +# ActiveMQ +activemq-data/ + +# SageMath parsed files +*.sage.py + +# Environments +.env +.envrc +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +# .idea/ + +# Abstra +# Abstra is an AI-powered process automation framework. +# Ignore directories containing user credentials, local state, and settings. +# Learn more at https://abstra.io/docs +.abstra/ + +# Visual Studio Code +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore +# and can be added to the global gitignore or merged into this file. 
from typing import Optional

from fastapi import FastAPI, HTTPException, Query, Depends
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from utils import env, resolve_difficulty, resolve_tags, build_query, get_problem
from db import get_db
from workbook_picker import pick_from_workbook
from workbook_importer import import_workbook
from workbook_enricher import enrich_workbook


# Load .env before the env() lookups in the Query(...) defaults below run.
load_dotenv()

app = FastAPI()


def _links(pid: int) -> tuple:
    """Return (BOJ problem URL, solved.ac URL) for a problem id."""
    return (
        f"https://www.acmicpc.net/problem/{pid}",
        f"https://solved.ac/problems/id/{pid}",
    )


def _level_text(level) -> str:
    """Human-readable solved.ac tier label; '?' when the level is unknown."""
    return f"Lv. {level}" if level is not None else "Lv. ?"


def _discord_payload(embed_title: str, description: str, problem_url: str, solved_url: str) -> dict:
    """Build the Discord webhook payload (embed) shared by both /today modes."""
    return {
        "embeds": [{
            "title": embed_title,
            "description": description,
            "fields": [
                {"name": "문제 링크", "value": f"[바로가기]({problem_url})", "inline": True},
                {"name": "해설/정보", "value": f"[Solved.ac]({solved_url})", "inline": True},
            ],
            "footer": {"text": "매일 오전 10시 정기 알림 (n8n)"},
        }]
    }


@app.get("/")
def root():
    """Liveness probe."""
    return {"status": "ok"}


@app.post("/admin/workbooks/{workbook_id}/enrich")
async def admin_enrich_workbook(
    workbook_id: int,
    only_missing: bool = Query(True, description="True면 NULL만 채움 / False면 덮어씀"),
    commit_every: int = Query(50, ge=1, le=500, description="몇 개마다 commit 할지"),
    sleep_sec: float = Query(0.12, ge=0.0, le=2.0, description="solved.ac 호출 사이 sleep"),
    db: AsyncSession = Depends(get_db),
):
    """Fill title/level/tags metadata for a workbook's problems via solved.ac."""
    result = await enrich_workbook(
        db,
        workbook_id=workbook_id,
        only_missing=only_missing,
        commit_every=commit_every,
        sleep_sec=sleep_sec,
    )
    return {"status": "ok", "result": result}


@app.delete("/admin/workbooks/{workbook_id}/reset")
async def reset_workbook_progress(
    workbook_id: int,
    db: AsyncSession = Depends(get_db),
):
    """Delete all send records for a workbook so its problems can be picked again."""
    try:
        res = await db.execute(
            text("DELETE FROM workbook_sends WHERE workbook_id = :wid"),
            {"wid": workbook_id},
        )
        await db.commit()

        # res.rowcount: number of deleted rows (= number of problems reset).
        return {
            "status": "ok",
            "workbook_id": workbook_id,
            "deleted_sends": int(res.rowcount or 0),
            "message": "workbook progress reset (problems can be picked again)",
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/today")
async def today(
    source_mode: str = Query(env("SOURCE_MODE_DEFAULT", "search"), description="search|workbook"),
    workbook_id: Optional[int] = Query(None, description="문제집 모드일 때 workbook id"),
    workbook_pick: str = Query("level_asc", description="random|level_asc"),

    difficulty_mode: str = Query(env("DIFFICULTY_MODE_DEFAULT", "easy"), description="easy|hard|all"),
    tag_mode: str = Query(env("TAG_MODE_DEFAULT", "easy"), description="easy|hard|all"),
    difficulty: Optional[str] = Query(None, description="예: 6..10 (주면 mode보다 우선)"),
    tags: Optional[str] = Query(None, description="예: dp,graphs (주면 mode보다 우선)"),
    lang: str = Query(env("LANG_DEFAULT", "all"), description="ko | en | ko,en | all"),

    db: AsyncSession = Depends(get_db),
):
    """Recommend one Baekjoon problem.

    Two source modes:
      * ``workbook`` -- pick the next unsent problem from a stored workbook.
      * ``search``   -- query solved.ac with difficulty/tag/language filters.
    The response always includes a ready-to-send Discord webhook payload.
    """
    sm = (source_mode or "").lower().strip()
    if sm == "workbook":
        # WORKBOOK_ID_DEFAULT may supply the workbook when the caller omits it.
        wid = workbook_id or (int(env("WORKBOOK_ID_DEFAULT")) if env("WORKBOOK_ID_DEFAULT") else None)
        if not wid:
            return JSONResponse(status_code=400, content={"error": "workbook_id is required for workbook mode"})

        pid, title, level = await pick_from_workbook(db, wid, pick=workbook_pick)
        if not pid:
            # Every problem in the workbook has already been sent.
            return JSONResponse(status_code=409, content={"error": "no_more_problems_in_workbook", "workbook_id": wid})

        problem_url, solved_url = _links(pid)
        discord_payload = _discord_payload(
            "🔔 오늘의 백준 추천 문제 (문제집)",
            (
                f"**{pid}번: {title}**\n"
                f"난이도: **{_level_text(level)}**\n"
                f"source_mode: `workbook` / workbook_id: `{wid}`"
            ),
            problem_url,
            solved_url,
        )

        return {
            "source_mode": "workbook",
            "workbook_id": wid,
            "problemId": pid,
            "title": title,
            "level": level,
            "problemUrl": problem_url,
            "solvedUrl": solved_url,
            "discordPayload": discord_payload,
        }

    # search mode: build a solved.ac query from the difficulty/tag/lang presets.
    dm = (difficulty_mode or "").lower()
    tm = (tag_mode or "").lower()

    if dm not in ("easy", "hard", "all"):
        return JSONResponse(status_code=400, content={"error": "difficulty_mode must be easy|hard|all"})
    if tm not in ("easy", "hard", "all"):
        return JSONResponse(status_code=400, content={"error": "tag_mode must be easy|hard|all"})

    chosen_difficulty = resolve_difficulty(difficulty, dm)
    chosen_tags = resolve_tags(tags, tm)
    query = build_query(chosen_difficulty, chosen_tags, lang)

    pid, title, level = get_problem(query=query)
    if not pid:
        return JSONResponse(status_code=503, content={"error": "failed_to_fetch_problem", "query": query})

    problem_url, solved_url = _links(pid)
    discord_payload = _discord_payload(
        "🔔 오늘의 백준 추천 문제",
        (
            f"**{pid}번: {title}**\n"
            f"난이도: **{_level_text(level)}**\n"
            f"difficulty_mode: `{dm}` / tag_mode: `{tm}` / lang: `{lang}`\n"
            f"filter: `{chosen_difficulty}` / tags: `{', '.join(chosen_tags) if chosen_tags else 'none'}`"
        ),
        problem_url,
        solved_url,
    )

    return {
        "source_mode": "search",
        "difficulty_mode": dm,
        "tag_mode": tm,
        "lang": lang,
        "difficulty": chosen_difficulty,
        "tags": chosen_tags,
        "query": query,
        "problemId": pid,
        "title": title,
        "level": level,
        "problemUrl": problem_url,
        "solvedUrl": solved_url,
        "discordPayload": discord_payload,
    }


# @app.post("/admin/workbooks/{workbook_id}/import")
# async def admin_import_workbook(
#     workbook_id: int,
#     title: Optional[str] = Query(None, description="문제집 제목(옵션)"),
#     db: AsyncSession = Depends(get_db),
# ):
#     try:
#         result = await import_workbook(db, workbook_id=workbook_id, title=title)
#         return {"status": "ok", "result": result}
#     except Exception as e:
#         return JSONResponse(status_code=500, content={"error": str(e)})
import os
import random
import time
from typing import Optional, Tuple, List

import requests

# ====== HTTP Session ====== (reused across calls; sets a bot User-Agent)
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "baekjoon-n8n-bot/1.0"})

# Languages recognised by the `lang` filter; anything else is silently ignored.
KNOWN_LANGS = ["ko", "en", "ja", "ru", "zh", "de", "fr", "es", "pt", "it"]


def fetch_json_with_retry(url: str, params: dict, retries: int = 3,
                          timeout: Tuple[float, float] = (3.05, 10)) -> dict:
    """GET `url` and return the decoded JSON body.

    Retries on any error with exponential backoff (0.7s, 1.4s, 2.8s, ...).
    `retries` is clamped to at least 1 so the final `raise` always re-raises
    a real exception (with retries <= 0 the original would `raise None`).

    Raises the last network/HTTP/decoding error when all attempts fail.
    """
    last_err: Optional[Exception] = None
    for attempt in range(max(1, retries)):
        try:
            res = SESSION.get(url, params=params, timeout=timeout)
            res.raise_for_status()
            return res.json()
        except Exception as e:
            last_err = e
            time.sleep(0.7 * (2 ** attempt))
    raise last_err


def parse_csv(s: str) -> List[str]:
    """Split a comma-separated string into trimmed, non-empty tokens."""
    return [x.strip() for x in (s or "").split(",") if x.strip()]


def env(name: str, default: str = "") -> str:
    """Read an environment variable, stripped of surrounding whitespace."""
    return os.getenv(name, default).strip()


def build_lang_filter(lang: str) -> str:
    """Build the solved.ac language clause for a query.

    The goal is to find problems that contain *at least one* language the
    user can read. A negative filter such as `-%en` would exclude good
    problems that have both Korean and English statements, so only positive
    filters (`%ko`) are used here.

    Returns "" (no filter) for "all", empty input, or unknown languages.
    """
    raw = (lang or "all").strip().lower()
    if raw in ("all", ""):
        return ""

    allow = set(parse_csv(raw)) & set(KNOWN_LANGS)
    if not allow:
        return ""

    # Multiple languages (e.g. ko,en) -> (%ko | %en): a problem qualifies if
    # it has a statement in Korean OR English.
    if len(allow) == 1:
        return f"%{next(iter(allow))}"
    else:
        expr = " | ".join(f"%{c}" for c in sorted(allow))
        return f"({expr})"


def resolve_difficulty(difficulty: Optional[str], difficulty_mode: str) -> str:
    """Resolve the difficulty range: an explicit value wins over the mode preset."""
    if difficulty and difficulty.strip():
        return difficulty.strip()

    mode = (difficulty_mode or env("DIFFICULTY_MODE_DEFAULT", "easy")).lower()
    if mode == "easy":
        return env("DIFFICULTY_EASY", "6..10")
    if mode == "hard":
        return env("DIFFICULTY_HARD", "11..15")
    if mode == "all":
        return env("DIFFICULTY_ALL", "1..30")
    # Unknown mode: fall back to the easy preset.
    return env("DIFFICULTY_EASY", "6..10")


def resolve_tags(tags_csv: Optional[str], tag_mode: str) -> List[str]:
    """Resolve the tag list used in the query.

    Precedence:
      1. An explicit `tags_csv` (even an empty string) wins outright.
      2. Otherwise the `tag_mode` preset (TAGS_EASY/TAGS_HARD/TAGS_ALL) is
         used, and the matching TAG_PICK_* policy decides whether to pick
         one random tag, all preset tags, or no tag filter at all.
    """
    if tags_csv is not None:
        return parse_csv(tags_csv)

    mode = (tag_mode or env("TAG_MODE_DEFAULT", "easy")).lower()
    if mode == "easy":
        preset = parse_csv(env("TAGS_EASY", ""))
        pick = env("TAG_PICK_EASY", env("TAG_PICK", "random")).lower()
    elif mode == "hard":
        preset = parse_csv(env("TAGS_HARD", ""))
        pick = env("TAG_PICK_HARD", env("TAG_PICK", "random")).lower()
    elif mode == "all":
        preset = parse_csv(env("TAGS_ALL", ""))
        pick = env("TAG_PICK_ALL", env("TAG_PICK", "none")).lower()
    else:
        preset = parse_csv(env("TAGS_EASY", ""))
        pick = env("TAG_PICK_EASY", "random").lower()

    if pick == "none":
        return []
    if pick == "random":
        return [random.choice(preset)] if preset else []
    return preset


def build_query(difficulty: str, tags: List[str], lang: str) -> str:
    """Compose a solved.ac search query string.

    Example result: ``*6..10 (tag:dp | tag:bfs) (%ko)``
    """
    # 1. Base difficulty condition.
    query_parts = [f"*{difficulty}"]

    # 2. Tag condition (parenthesised so operator precedence is explicit).
    if tags:
        join_op = env("TAGS_JOIN", "or").lower()
        if join_op == "and":
            # All tags required: tag:a tag:b
            tag_expr = " ".join(f"tag:{t}" for t in tags)
        else:
            # Any one tag suffices: (tag:a | tag:b)
            tag_expr = "(" + " | ".join(f"tag:{t}" for t in tags) + ")"
        query_parts.append(tag_expr)

    # 3. Language condition (parenthesised; may itself be a compound filter).
    lang_filter = build_lang_filter(lang)
    if lang_filter:
        query_parts.append(f"({lang_filter})")

    return " ".join(query_parts)


def get_problem(query: str, size: int = 50) -> Tuple[Optional[int], Optional[str], Optional[int]]:
    """Fetch one random problem matching `query` from solved.ac.

    Returns (problemId, title, level), or (None, None, None) when the search
    yields nothing or any network/decoding error occurs -- the caller treats
    that as a retryable 503, so the broad except is a deliberate best-effort.
    """
    url = "https://solved.ac/api/v3/search/problem"
    params = {
        "query": query,
        "sort": "random",
        "direction": "desc",
        "page": 1,
        "size": size,
    }
    try:
        data = fetch_json_with_retry(url, params=params)
        items = data.get("items", [])
        if not items:
            return None, None, None
        p = random.choice(items)
        return p.get("problemId"), (p.get("titleKo") or p.get("titleEn") or "제목 없음"), p.get("level")
    except Exception:
        return None, None, None
async def enrich_workbook(
    db: AsyncSession,
    workbook_id: int,
    only_missing: bool = True,   # True: fill only NULL columns; False: overwrite
    commit_every: int = 50,      # commit after this many successful updates
    sleep_sec: float = 0.12,     # pause between solved.ac calls (rate limiting)
    timeout: float = 10.0,       # per-request timeout for the HTTP client
) -> Dict:
    """Fill title_ko/title_en/level/tags for a workbook via solved.ac.

    Precondition: (workbook_id, problem_id) rows already exist in
    workbook_problems; this only enriches their metadata columns.

    Returns a summary dict with target/updated/skipped/failed counts.
    """

    # 1) Collect the target problem ids (only rows with missing metadata,
    #    or every row when only_missing is False).
    if only_missing:
        rows = (await db.execute(
            text("""
                SELECT problem_id
                FROM workbook_problems
                WHERE workbook_id = :wid
                  AND (
                        title_ko IS NULL
                     OR title_en IS NULL
                     OR level IS NULL
                     OR tags IS NULL
                  )
                ORDER BY problem_id
            """),
            {"wid": workbook_id},
        )).all()
    else:
        rows = (await db.execute(
            text("""
                SELECT problem_id
                FROM workbook_problems
                WHERE workbook_id = :wid
                ORDER BY problem_id
            """),
            {"wid": workbook_id},
        )).all()

    problem_ids = [int(r[0]) for r in rows]

    if not problem_ids:
        return {
            "workbook_id": workbook_id,
            "target_count": 0,
            "updated": 0,
            "skipped": 0,
            "failed": 0,
            "message": "nothing to enrich (already filled)",
        }

    updated = 0
    skipped = 0
    failed = 0

    # 2) Call solved.ac and update each row.
    async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "baekjoon-n8n-bot/1.0"}) as client:
        for i, pid in enumerate(problem_ids, start=1):
            try:
                title_ko, title_en, level, tag_keys = await solved_problem_show(client, pid)

                # Skip when the API returned nothing usable.
                if title_ko is None and title_en is None and level is None and (not tag_keys):
                    skipped += 1
                    continue

                # tags column is TEXT[] (Postgres); passing a Python list lets
                # asyncpg bind it as an array. With only_missing=True, COALESCE
                # keeps existing non-NULL values; otherwise overwrite outright.
                if only_missing:
                    await db.execute(
                        text("""
                            UPDATE workbook_problems
                               SET title_ko = COALESCE(:tko, title_ko),
                                   title_en = COALESCE(:ten, title_en),
                                   level    = COALESCE(:lvl, level),
                                   tags     = COALESCE(:tags, tags)
                             WHERE workbook_id = :wid
                               AND problem_id  = :pid
                        """),
                        {"tko": title_ko, "ten": title_en, "lvl": level, "tags": tag_keys if tag_keys else None, "wid": workbook_id, "pid": pid},
                    )
                else:
                    await db.execute(
                        text("""
                            UPDATE workbook_problems
                               SET title_ko = :tko,
                                   title_en = :ten,
                                   level    = :lvl,
                                   tags     = :tags
                             WHERE workbook_id = :wid
                               AND problem_id  = :pid
                        """),
                        {"tko": title_ko, "ten": title_en, "lvl": level, "tags": tag_keys if tag_keys else None, "wid": workbook_id, "pid": pid},
                    )

                updated += 1

                # Commit in batches so a crash loses at most commit_every rows.
                if i % commit_every == 0:
                    await db.commit()

                # Be polite to solved.ac.
                if sleep_sec > 0:
                    await asyncio.sleep(sleep_sec)

            except Exception:
                failed += 1
                # A failed statement poisons the transaction on PostgreSQL,
                # which would make every subsequent execute() in this batch
                # fail too. Roll back so remaining problems can still be
                # processed (uncommitted updates of the current batch are
                # lost; commit_every bounds that loss).
                try:
                    await db.rollback()
                except Exception:
                    pass

    await db.commit()

    return {
        "workbook_id": workbook_id,
        "target_count": len(problem_ids),
        "updated": updated,
        "skipped": skipped,
        "failed": failed,
        "only_missing": only_missing,
        "commit_every": commit_every,
        "sleep_sec": sleep_sec,
        "message": "enrich done",
    }
def solved_problem_show(problem_id: int) -> Tuple[Optional[str], Optional[str], Optional[int]]:
    """Fetch (titleKo, titleEn, level) for one problem from solved.ac.

    Returns (None, None, None) on any non-200 response; this is a
    best-effort metadata lookup, not a hard requirement.
    """
    url = "https://solved.ac/api/v3/problem/show"
    r = SESSION.get(url, params={"problemId": problem_id}, timeout=(3.05, 10))
    if r.status_code != 200:
        return None, None, None
    data = r.json()
    return data.get("titleKo"), data.get("titleEn"), data.get("level")


async def import_workbook(db: AsyncSession, workbook_id: int, title: Optional[str] = None) -> dict:
    """Scrape a BOJ workbook page and upsert its problems into the DB.

    Steps:
      1. Upsert the workbook row (title only overwritten when provided).
      2. Insert all (workbook_id, problem_id) mappings (ON CONFLICT DO NOTHING).
      3. Best-effort metadata enrichment via solved.ac, committed in batches.

    The scraping and solved.ac calls use blocking `requests`; they are run in
    a worker thread via asyncio.to_thread so the event loop is not stalled.
    """
    problem_ids = await asyncio.to_thread(fetch_workbook_problem_ids, workbook_id)

    # Upsert the workbook row; COALESCE keeps the old title when none is given.
    await db.execute(
        text("""
            INSERT INTO workbooks(id, title, source)
            VALUES (:id, :title, 'boj')
            ON CONFLICT (id) DO UPDATE
              SET title = COALESCE(EXCLUDED.title, workbooks.title),
                  updated_at = now()
        """),
        {"id": workbook_id, "title": title},
    )

    inserted = 0
    updated_meta = 0

    # Insert the problem mappings first so progress tracking works even if
    # metadata enrichment below fails partway.
    for pid in problem_ids:
        await db.execute(
            text("""
                INSERT INTO workbook_problems(workbook_id, problem_id)
                VALUES (:wid, :pid)
                ON CONFLICT (workbook_id, problem_id) DO NOTHING
            """),
            {"wid": workbook_id, "pid": pid},
        )

    await db.commit()

    # Metadata enrichment with a simple rate limit toward solved.ac.
    for i, pid in enumerate(problem_ids, start=1):
        title_ko, title_en, level = await asyncio.to_thread(solved_problem_show, pid)
        if title_ko is None and title_en is None and level is None:
            continue

        await db.execute(
            text("""
                UPDATE workbook_problems
                   SET title_ko = COALESCE(:tko, title_ko),
                       title_en = COALESCE(:ten, title_en),
                       level    = COALESCE(:lvl, level)
                 WHERE workbook_id = :wid AND problem_id = :pid
            """),
            {"tko": title_ko, "ten": title_en, "lvl": level, "wid": workbook_id, "pid": pid},
        )
        updated_meta += 1

        # Commit every 10 problems and pause briefly between API calls.
        if i % 10 == 0:
            await db.commit()
        await asyncio.sleep(0.2)

    await db.commit()

    return {
        "workbook_id": workbook_id,
        "count": len(problem_ids),
        "meta_updated": updated_meta,
    }
async def pick_from_workbook(
    db: AsyncSession,
    workbook_id: int,
    pick: str = "random",  # random | level_asc
) -> Tuple[Optional[int], Optional[str], Optional[int]]:
    """Select one not-yet-sent problem from a workbook and mark it sent.

    A single round trip: the CTE picks a candidate from workbook_problems
    that has no row in workbook_sends, inserts the send record, and returns
    the candidate.

    pick:
      - "random":    uniform random choice among unsent problems.
      - "level_asc": easiest first (NULL levels last); ties broken randomly.
    Any other value falls back to "random".

    Returns (problem_id, title, level) or (None, None, None) when every
    problem in the workbook has already been sent.
    """
    # Whitelisted ORDER BY clauses; the f-string below only ever interpolates
    # one of these two constants, never caller input.
    order_clauses = {
        "random": "ORDER BY random()",
        "level_asc": "ORDER BY (wp.level IS NULL) ASC, wp.level ASC, random()",
    }
    chosen = (pick or "random").lower().strip()
    order_sql = order_clauses.get(chosen, order_clauses["random"])

    sql = f"""
        WITH candidate AS (
            SELECT
                wp.problem_id,
                COALESCE(wp.title_ko, wp.title_en, '제목 없음') AS title,
                wp.level
            FROM workbook_problems wp
            LEFT JOIN workbook_sends ws
                   ON ws.workbook_id = wp.workbook_id
                  AND ws.problem_id  = wp.problem_id
            WHERE wp.workbook_id = :wid
              AND ws.problem_id IS NULL
            {order_sql}
            LIMIT 1
        ),
        ins AS (
            INSERT INTO workbook_sends(workbook_id, problem_id)
            SELECT :wid, problem_id
            FROM candidate
            ON CONFLICT DO NOTHING
            RETURNING problem_id
        )
        SELECT problem_id, title, level
        FROM candidate;
    """

    picked = (await db.execute(text(sql), {"wid": workbook_id})).first()
    if picked is None:
        # Nothing left to send; nothing was inserted either.
        return None, None, None

    # Persist the send record inserted by the CTE.
    await db.commit()

    raw_pid, raw_title, raw_level = picked[0], picked[1], picked[2]
    return (
        int(raw_pid),
        str(raw_title),
        int(raw_level) if raw_level is not None else None,
    )