Feat: [main] baekjoon-bot-v1

This commit is contained in:
sm4640
2026-01-14 14:54:42 +09:00
commit 122b367bed
10 changed files with 918 additions and 0 deletions

6
.dockerignore Normal file
View File

@@ -0,0 +1,6 @@
# .dockerignore
__pycache__/
*.py[cod]
venv/
.env
.git/

216
.gitignore vendored Normal file
View File

@@ -0,0 +1,216 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[codz]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
# poetry.lock
# poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
# pdm.lock
# pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
# pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# Redis
*.rdb
*.aof
*.pid
# RabbitMQ
mnesia/
rabbitmq/
rabbitmq-data/
# ActiveMQ
activemq-data/
# SageMath parsed files
*.sage.py
# Environments
.env
.envrc
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
# .idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# Streamlit
.streamlit/secrets.toml

187
app.py Normal file
View File

@@ -0,0 +1,187 @@
from typing import Optional
from fastapi import FastAPI, HTTPException, Query, Depends
from fastapi.responses import JSONResponse
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from utils import env, resolve_difficulty, resolve_tags, build_query, get_problem
from db import get_db
from workbook_picker import pick_from_workbook
from workbook_importer import import_workbook
from workbook_enricher import enrich_workbook
load_dotenv()
app = FastAPI()
@app.get("/")
def root():
return {"status": "ok"}
@app.post("/admin/workbooks/{workbook_id}/enrich")
async def admin_enrich_workbook(
workbook_id: int,
only_missing: bool = Query(True, description="True면 NULL만 채움 / False면 덮어씀"),
commit_every: int = Query(50, ge=1, le=500, description="몇 개마다 commit 할지"),
sleep_sec: float = Query(0.12, ge=0.0, le=2.0, description="solved.ac 호출 사이 sleep"),
db: AsyncSession = Depends(get_db),
):
result = await enrich_workbook(
db,
workbook_id=workbook_id,
only_missing=only_missing,
commit_every=commit_every,
sleep_sec=sleep_sec,
)
return {"status": "ok", "result": result}
@app.delete("/admin/workbooks/{workbook_id}/reset")
async def reset_workbook_progress(
workbook_id: int,
db: AsyncSession = Depends(get_db),
):
try:
res = await db.execute(
text("DELETE FROM workbook_sends WHERE workbook_id = :wid"),
{"wid": workbook_id},
)
await db.commit()
# res.rowcount: 삭제된 행 수(=초기화된 문제 수)
return {
"status": "ok",
"workbook_id": workbook_id,
"deleted_sends": int(res.rowcount or 0),
"message": "workbook progress reset (problems can be picked again)",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/today")
async def today(
source_mode: str = Query(env("SOURCE_MODE_DEFAULT", "search"), description="search|workbook"),
workbook_id: Optional[int] = Query(None, description="문제집 모드일 때 workbook id"),
workbook_pick: str = Query("level_asc", description="random|level_asc"),
difficulty_mode: str = Query(env("DIFFICULTY_MODE_DEFAULT", "easy"), description="easy|hard|all"),
tag_mode: str = Query(env("TAG_MODE_DEFAULT", "easy"), description="easy|hard|all"),
difficulty: Optional[str] = Query(None, description="예: 6..10 (주면 mode보다 우선)"),
tags: Optional[str] = Query(None, description="예: dp,graphs (주면 mode보다 우선)"),
lang: str = Query(env("LANG_DEFAULT", "all"), description="ko | en | ko,en | all"),
db: AsyncSession = Depends(get_db),
):
sm = (source_mode or "").lower().strip()
if sm == "workbook":
wid = workbook_id or (int(env("WORKBOOK_ID_DEFAULT")) if env("WORKBOOK_ID_DEFAULT") else None)
if not wid:
return JSONResponse(status_code=400, content={"error": "workbook_id is required for workbook mode"})
pid, title, level = await pick_from_workbook(db, wid, pick=workbook_pick)
if not pid:
return JSONResponse(status_code=409, content={"error": "no_more_problems_in_workbook", "workbook_id": wid})
problem_url = f"https://www.acmicpc.net/problem/{pid}"
solved_url = f"https://solved.ac/problems/id/{pid}"
level_text = f"Lv. {level}" if level is not None else "Lv. ?"
discord_payload = {
"embeds": [{
"title": "🔔 오늘의 백준 추천 문제 (문제집)",
"description": (
f"**{pid}번: {title}**\n"
f"난이도: **{level_text}**\n"
f"source_mode: `workbook` / workbook_id: `{wid}`"
),
"fields": [
{"name": "문제 링크", "value": f"[바로가기]({problem_url})", "inline": True},
{"name": "해설/정보", "value": f"[Solved.ac]({solved_url})", "inline": True},
],
"footer": {"text": "매일 오전 10시 정기 알림 (n8n)"}
}]
}
return {
"source_mode": "workbook",
"workbook_id": wid,
"problemId": pid,
"title": title,
"level": level,
"problemUrl": problem_url,
"solvedUrl": solved_url,
"discordPayload": discord_payload,
}
# 2) 기존 search 모드(네가 쓰던 그대로)
dm = (difficulty_mode or "").lower()
tm = (tag_mode or "").lower()
if dm not in ("easy", "hard", "all"):
return JSONResponse(status_code=400, content={"error": "difficulty_mode must be easy|hard|all"})
if tm not in ("easy", "hard", "all"):
return JSONResponse(status_code=400, content={"error": "tag_mode must be easy|hard|all"})
chosen_difficulty = resolve_difficulty(difficulty, dm)
chosen_tags = resolve_tags(tags if tags is not None else None, tm)
query = build_query(chosen_difficulty, chosen_tags, lang)
pid, title, level = get_problem(query=query)
if not pid:
return JSONResponse(status_code=503, content={"error": "failed_to_fetch_problem", "query": query})
problem_url = f"https://www.acmicpc.net/problem/{pid}"
solved_url = f"https://solved.ac/problems/id/{pid}"
level_text = f"Lv. {level}" if level is not None else "Lv. ?"
discord_payload = {
"embeds": [{
"title": "🔔 오늘의 백준 추천 문제",
"description": (
f"**{pid}번: {title}**\n"
f"난이도: **{level_text}**\n"
f"difficulty_mode: `{dm}` / tag_mode: `{tm}` / lang: `{lang}`\n"
f"filter: `{chosen_difficulty}` / tags: `{', '.join(chosen_tags) if chosen_tags else 'none'}`"
),
"fields": [
{"name": "문제 링크", "value": f"[바로가기]({problem_url})", "inline": True},
{"name": "해설/정보", "value": f"[Solved.ac]({solved_url})", "inline": True},
],
"footer": {"text": "매일 오전 10시 정기 알림 (n8n)"}
}]
}
return {
"source_mode": "search",
"difficulty_mode": dm,
"tag_mode": tm,
"lang": lang,
"difficulty": chosen_difficulty,
"tags": chosen_tags,
"query": query,
"problemId": pid,
"title": title,
"level": level,
"problemUrl": problem_url,
"solvedUrl": solved_url,
"discordPayload": discord_payload,
}
# @app.post("/admin/workbooks/{workbook_id}/import")
# async def admin_import_workbook(
# workbook_id: int,
# title: Optional[str] = Query(None, description="문제집 제목(옵션)"),
# db: AsyncSession = Depends(get_db),
# ):
# try:
# result = await import_workbook(db, workbook_id=workbook_id, title=title)
# return {"status": "ok", "result": result}
# except Exception as e:
# return JSONResponse(status_code=500, content={"error": str(e)})

21
db.py Normal file
View File

@@ -0,0 +1,21 @@
import os
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
DATABASE_URL = os.getenv("DATABASE_URL", "").strip()
if not DATABASE_URL:
raise RuntimeError("DATABASE_URL is required (e.g. postgresql+asyncpg://...)")
engine = create_async_engine(DATABASE_URL, pool_pre_ping=True)
SessionLocal = async_sessionmaker(
bind=engine,
class_=AsyncSession,
autoflush=False,
expire_on_commit=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with SessionLocal() as session:
yield session

20
dockerfile Normal file
View File

@@ -0,0 +1,20 @@
FROM python:3.12-slim
WORKDIR /app
# 타임존 설정 (로그에 한국 시간이 찍히도록 함)
ENV TZ=Asia/Seoul
RUN apt-get update && apt-get install -y tzdata && \
ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 현재 디렉토리의 모든 파일(app.py, utils.py 등)을 복사
COPY . .
# 로그가 즉시 출력되도록 설정
ENV PYTHONUNBUFFERED=1
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]

BIN
requirements.txt Normal file

Binary file not shown.

147
utils.py Normal file
View File

@@ -0,0 +1,147 @@
import os
import random
import time
from typing import Optional, Tuple, List
import requests
# ====== HTTP Session ======
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "baekjoon-n8n-bot/1.0"})
# 필요한 언어만 관리(엄격 모드에서 제외할 대상)
KNOWN_LANGS = ["ko", "en", "ja", "ru", "zh", "de", "fr", "es", "pt", "it"]
def fetch_json_with_retry(url: str, params: dict, retries: int = 3, timeout=(3.05, 10)) -> dict:
last_err = None
for i in range(retries):
try:
res = SESSION.get(url, params=params, timeout=timeout)
res.raise_for_status()
return res.json()
except Exception as e:
last_err = e
time.sleep(0.7 * (2 ** i))
raise last_err
def parse_csv(s: str) -> List[str]:
return [x.strip() for x in (s or "").split(",") if x.strip()]
def env(name: str, default: str = "") -> str:
return os.getenv(name, default).strip()
def build_lang_filter(lang: str) -> str:
"""
사용자가 읽을 수 있는 언어가 '하나라도' 포함된 문제를 찾는 것이 목적입니다.
-%en 처럼 제외 필터를 쓰면, 한국어와 영어가 모두 있는 양질의 문제가 제외되므로
긍정 필터(%ko) 위주로 구성합니다.
"""
raw = (lang or "all").strip().lower()
if raw in ("all", ""):
return ""
allow = set(parse_csv(raw)) & set(KNOWN_LANGS)
if not allow:
return ""
# 여러 언어를 선택했을 경우(예: ko,en) -> (%ko | %en)
# 즉, 한국어 '또는' 영어 중 하나라도 지문이 있는 문제
if len(allow) == 1:
return f"%{next(iter(allow))}"
else:
expr = " | ".join(f"%{c}" for c in sorted(allow))
return f"({expr})"
def resolve_difficulty(difficulty: Optional[str], difficulty_mode: str) -> str:
if difficulty and difficulty.strip():
return difficulty.strip()
mode = (difficulty_mode or env("DIFFICULTY_MODE_DEFAULT", "easy")).lower()
if mode == "easy":
return env("DIFFICULTY_EASY", "6..10")
if mode == "hard":
return env("DIFFICULTY_HARD", "11..15")
if mode == "all":
return env("DIFFICULTY_ALL", "1..30")
return env("DIFFICULTY_EASY", "6..10")
def resolve_tags(tags_csv: Optional[str], tag_mode: str) -> List[str]:
"""
tags_csv(쿼리)가 있으면 그것이 최우선.
없으면 tag_mode 프리셋 기반으로 선택.
TAG_PICK_* 정책에 따라 랜덤 1개 / 전체 / 필터 없음으로 결정.
"""
if tags_csv is not None:
return parse_csv(tags_csv)
mode = (tag_mode or env("TAG_MODE_DEFAULT", "easy")).lower()
if mode == "easy":
preset = parse_csv(env("TAGS_EASY", ""))
pick = env("TAG_PICK_EASY", env("TAG_PICK", "random")).lower()
elif mode == "hard":
preset = parse_csv(env("TAGS_HARD", ""))
pick = env("TAG_PICK_HARD", env("TAG_PICK", "random")).lower()
elif mode == "all":
preset = parse_csv(env("TAGS_ALL", ""))
pick = env("TAG_PICK_ALL", env("TAG_PICK", "none")).lower()
else:
preset = parse_csv(env("TAGS_EASY", ""))
pick = env("TAG_PICK_EASY", "random").lower()
if pick == "none":
return []
if pick == "random":
return [random.choice(preset)] if preset else []
return preset
def build_query(difficulty: str, tags: List[str], lang: str) -> str:
# 1. 난이도 기본 조건
query_parts = [f"*{difficulty}"]
# 2. 태그 조건 (괄호로 감싸서 우선순위 확보)
if tags:
join_op = env("TAGS_JOIN", "or").lower()
if join_op == "and":
# 모든 태그가 다 있어야 함: tag:a tag:b
tag_expr = " ".join(f"tag:{t}" for t in tags)
else:
# 태그 중 하나만 있어도 됨: (tag:a | tag:b)
tag_expr = "(" + " | ".join(f"tag:{t}" for t in tags) + ")"
query_parts.append(tag_expr)
# 3. 언어 조건 (괄호로 감싸기)
lang_filter = build_lang_filter(lang)
if lang_filter:
# 언어 필터가 복합적일 수 있으므로 괄호 처리
query_parts.append(f"({lang_filter})")
# 결과 예시: *6..10 (tag:dp | tag:bfs) (%ko)
return " ".join(query_parts)
def get_problem(query: str, size: int = 50) -> Tuple[Optional[int], Optional[str], Optional[int]]:
url = "https://solved.ac/api/v3/search/problem"
params = {
"query": query,
"sort": "random",
"direction": "desc",
"page": 1,
"size": size,
}
try:
data = fetch_json_with_retry(url, params=params)
items = data.get("items", [])
if not items:
return None, None, None
p = random.choice(items)
return p.get("problemId"), (p.get("titleKo") or p.get("titleEn") or "제목 없음"), p.get("level")
except Exception:
return None, None, None

156
workbook_enricher.py Normal file
View File

@@ -0,0 +1,156 @@
import asyncio
from typing import Dict, List, Optional, Tuple
import httpx
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
SOLVED_SHOW_URL = "https://solved.ac/api/v3/problem/show"
async def solved_problem_show(client: httpx.AsyncClient, problem_id: int) -> Tuple[Optional[str], Optional[str], Optional[int], Optional[List[str]]]:
"""
solved.ac problem/show 호출해서 메타 가져오기
반환: (title_ko, title_en, level, tags_keys)
"""
r = await client.get(SOLVED_SHOW_URL, params={"problemId": problem_id})
if r.status_code != 200:
return None, None, None, None
data = r.json()
title_ko = data.get("titleKo")
title_en = data.get("titleEn")
level = data.get("level")
# tags: [{"key": "...", "isMeta": ..., "bojTagId": ..., "problemCount": ...}, ...]
tags = data.get("tags") or []
tag_keys = [t.get("key") for t in tags if t.get("key")]
return title_ko, title_en, level, tag_keys
async def enrich_workbook(
db: AsyncSession,
workbook_id: int,
only_missing: bool = True, # True면 NULL인 것만 채움, False면 덮어씀
commit_every: int = 50, # 몇 개마다 커밋할지
sleep_sec: float = 0.12, # solved.ac 부하 줄이려고 약간 쉬기
timeout: float = 10.0,
) -> Dict:
"""
전제: workbook_problems에 (workbook_id, problem_id)는 이미 채워져 있음
목표: solved.ac problem/show로 title_ko/title_en/level/tags를 채움
"""
# 1) 대상 problem_id 목록 뽑기 (missing만 or 전체)
if only_missing:
rows = (await db.execute(
text("""
SELECT problem_id
FROM workbook_problems
WHERE workbook_id = :wid
AND (
title_ko IS NULL
OR title_en IS NULL
OR level IS NULL
OR tags IS NULL
)
ORDER BY problem_id
"""),
{"wid": workbook_id},
)).all()
else:
rows = (await db.execute(
text("""
SELECT problem_id
FROM workbook_problems
WHERE workbook_id = :wid
ORDER BY problem_id
"""),
{"wid": workbook_id},
)).all()
problem_ids = [int(r[0]) for r in rows]
if not problem_ids:
return {
"workbook_id": workbook_id,
"target_count": 0,
"updated": 0,
"skipped": 0,
"failed": 0,
"message": "nothing to enrich (already filled)",
}
updated = 0
skipped = 0
failed = 0
# 2) solved.ac 호출 + 업데이트
async with httpx.AsyncClient(timeout=timeout, headers={"User-Agent": "baekjoon-n8n-bot/1.0"}) as client:
for i, pid in enumerate(problem_ids, start=1):
try:
title_ko, title_en, level, tag_keys = await solved_problem_show(client, pid)
# 응답이 다 비었으면 스킵
if title_ko is None and title_en is None and level is None and (not tag_keys):
skipped += 1
continue
# tags 저장: TEXT[] (Postgres) -> 파라미터에 list 넘기면 asyncpg가 배열로 처리해줌
# only_missing=True면 COALESCE로 NULL만 채우고, False면 그냥 덮어씀
if only_missing:
await db.execute(
text("""
UPDATE workbook_problems
SET title_ko = COALESCE(:tko, title_ko),
title_en = COALESCE(:ten, title_en),
level = COALESCE(:lvl, level),
tags = COALESCE(:tags, tags)
WHERE workbook_id = :wid
AND problem_id = :pid
"""),
{"tko": title_ko, "ten": title_en, "lvl": level, "tags": tag_keys if tag_keys else None, "wid": workbook_id, "pid": pid},
)
else:
await db.execute(
text("""
UPDATE workbook_problems
SET title_ko = :tko,
title_en = :ten,
level = :lvl,
tags = :tags
WHERE workbook_id = :wid
AND problem_id = :pid
"""),
{"tko": title_ko, "ten": title_en, "lvl": level, "tags": tag_keys if tag_keys else None, "wid": workbook_id, "pid": pid},
)
updated += 1
# 커밋 배치
if i % commit_every == 0:
await db.commit()
# rate limit
if sleep_sec > 0:
await asyncio.sleep(sleep_sec)
except Exception:
failed += 1
# 실패해도 다음 문제로 계속 진행
await db.commit()
return {
"workbook_id": workbook_id,
"target_count": len(problem_ids),
"updated": updated,
"skipped": skipped,
"failed": failed,
"only_missing": only_missing,
"commit_every": commit_every,
"sleep_sec": sleep_sec,
"message": "enrich done",
}

103
workbook_importer.py Normal file
View File

@@ -0,0 +1,103 @@
import asyncio
import re
from typing import List, Tuple, Optional
import requests
from bs4 import BeautifulSoup
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "baekjoon-n8n-bot/1.0"})
PROBLEM_LINK_RE = re.compile(r"/problem/(\d+)")
def fetch_workbook_problem_ids(workbook_id: int) -> List[int]:
url = f"https://www.acmicpc.net/workbook/view/{workbook_id}"
r = SESSION.get(url, timeout=(3.05, 10))
r.raise_for_status()
soup = BeautifulSoup(r.text, "lxml")
ids = set()
# workbook 페이지 내 /problem/{id} 링크들에서 id 수집
for a in soup.select('a[href^="/problem/"]'):
href = a.get("href", "")
m = PROBLEM_LINK_RE.search(href)
if m:
ids.add(int(m.group(1)))
return sorted(ids)
def solved_problem_show(problem_id: int) -> Tuple[Optional[str], Optional[str], Optional[int]]:
"""
solved.ac problem/show로 메타 보강 (titleKo/titleEn/level)
"""
url = "https://solved.ac/api/v3/problem/show"
r = SESSION.get(url, params={"problemId": problem_id}, timeout=(3.05, 10))
if r.status_code != 200:
return None, None, None
data = r.json()
return data.get("titleKo"), data.get("titleEn"), data.get("level")
async def import_workbook(db: AsyncSession, workbook_id: int, title: str = None) -> dict:
problem_ids = fetch_workbook_problem_ids(workbook_id)
# upsert workbook
await db.execute(
text("""
INSERT INTO workbooks(id, title, source)
VALUES (:id, :title, 'boj')
ON CONFLICT (id) DO UPDATE
SET title = COALESCE(EXCLUDED.title, workbooks.title),
updated_at = now()
"""),
{"id": workbook_id, "title": title},
)
inserted = 0
updated_meta = 0
# 문제 목록 upsert
for pid in problem_ids:
# 먼저 매핑 넣고
await db.execute(
text("""
INSERT INTO workbook_problems(workbook_id, problem_id)
VALUES (:wid, :pid)
ON CONFLICT (workbook_id, problem_id) DO NOTHING
"""),
{"wid": workbook_id, "pid": pid},
)
await db.commit()
# 메타 보강(너무 빠르게 치면 부담될 수 있으니 간단한 rate limit)
for i, pid in enumerate(problem_ids):
title_ko, title_en, level = solved_problem_show(pid)
if title_ko is None and title_en is None and level is None:
continue
await db.execute(
text("""
UPDATE workbook_problems
SET title_ko = COALESCE(:tko, title_ko),
title_en = COALESCE(:ten, title_en),
level = COALESCE(:lvl, level)
WHERE workbook_id = :wid AND problem_id = :pid
"""),
{"tko": title_ko, "ten": title_en, "lvl": level, "wid": workbook_id, "pid": pid},
)
updated_meta += 1
if i % 10 == 0:
await db.commit()
await asyncio.sleep(0.2)
await db.commit()
return {
"workbook_id": workbook_id,
"count": len(problem_ids),
"meta_updated": updated_meta,
}

62
workbook_picker.py Normal file
View File

@@ -0,0 +1,62 @@
from typing import Optional, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def pick_from_workbook(
db: AsyncSession,
workbook_id: int,
pick: str = "random", # random | level_asc
) -> Tuple[Optional[int], Optional[str], Optional[int]]:
"""
workbook_id에서 아직 보내지 않은 문제 1개 선택 + workbook_sends 기록까지 원샷.
pick:
- random: 기존처럼 랜덤
- level_asc: level 낮은 것부터(쉬운 것부터). level NULL은 맨 뒤.
같은 level이면 랜덤으로 섞어서 뽑음.
"""
mode = (pick or "random").lower().strip()
if mode not in ("random", "level_asc"):
mode = "random"
# 정렬 기준만 분기
if mode == "level_asc":
order_sql = "ORDER BY (wp.level IS NULL) ASC, wp.level ASC, random()"
else:
order_sql = "ORDER BY random()"
sql = f"""
WITH candidate AS (
SELECT
wp.problem_id,
COALESCE(wp.title_ko, wp.title_en, '제목 없음') AS title,
wp.level
FROM workbook_problems wp
LEFT JOIN workbook_sends ws
ON ws.workbook_id = wp.workbook_id
AND ws.problem_id = wp.problem_id
WHERE wp.workbook_id = :wid
AND ws.problem_id IS NULL
{order_sql}
LIMIT 1
),
ins AS (
INSERT INTO workbook_sends(workbook_id, problem_id)
SELECT :wid, problem_id
FROM candidate
ON CONFLICT DO NOTHING
RETURNING problem_id
)
SELECT problem_id, title, level
FROM candidate;
"""
row = (await db.execute(text(sql), {"wid": workbook_id})).first()
if not row:
return None, None, None
await db.commit()
pid = int(row[0])
title = str(row[1])
level = int(row[2]) if row[2] is not None else None
return pid, title, level