from datetime import UTC, datetime

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.crawler import BOARD_CONFIG, HufsCrawler, PostStub
from app.models import CrawlRun, ScrapedPost
from app.schemas import AttachmentOut, CrawlResponse, LatestBoardPostOut, PostOut


class CrawlService:
    """Crawls every configured HUFS board and persists posts not seen before.

    Each call to :meth:`crawl_new_posts` is recorded as a ``CrawlRun`` row so
    that failures and counts are auditable after the fact.
    """

    def __init__(self, db: Session) -> None:
        self.db = db
        self.crawler = HufsCrawler()

    def crawl_new_posts(self) -> CrawlResponse:
        """Crawl all boards, insert unseen posts, and build the API response.

        Returns:
            CrawlResponse describing this run. In bootstrap mode (empty
            database) the newly inserted posts are counted but not listed;
            otherwise they are returned as ``new_posts``. The per-board
            "latest post" stubs are included only when there is nothing new
            to show (always in bootstrap mode, otherwise only when zero new
            posts were found).

        Raises:
            Exception: whatever the crawler/DB raised; the run row is marked
            ``failed`` (with the error message) before re-raising.
        """
        bootstrap_mode = self._is_bootstrap_mode()

        # Record the run up front so a crash still leaves an auditable row.
        run = CrawlRun(status="running", discovered_count=0, inserted_count=0)
        self.db.add(run)
        self.db.commit()
        self.db.refresh(run)

        inserted_posts: list[ScrapedPost] = []
        latest_posts_by_board_map: dict[str, PostStub] = {}
        try:
            for board_key in BOARD_CONFIG:
                board_inserted_posts, latest_stub = self._crawl_board(board_key)
                inserted_posts.extend(board_inserted_posts)
                if latest_stub is not None:
                    latest_posts_by_board_map[board_key] = latest_stub
            run.status = "success"
            # NOTE(review): discovered_count mirrors inserted_count here —
            # every candidate found by _crawl_board is inserted, so the two
            # are equal as written; confirm if "discovered" was meant to
            # count something broader.
            run.discovered_count = len(inserted_posts)
            run.inserted_count = len(inserted_posts)
            # finished_at is stored naive-UTC — presumably the column is a
            # timezone-naive DATETIME; verify against the model definition.
            run.finished_at = datetime.now(UTC).replace(tzinfo=None)
            self.db.add(run)
            self.db.commit()
        except Exception as exc:
            run.status = "failed"
            run.error_message = str(exc)
            run.finished_at = datetime.now(UTC).replace(tzinfo=None)
            self.db.add(run)
            self.db.commit()
            raise

        # Bootstrap runs report counts only; normal runs list the new posts.
        response_posts = [] if bootstrap_mode else inserted_posts
        # Latest-per-board stubs act as a fallback: shown exactly when the
        # new-posts list is empty (bootstrap, or nothing new this run).
        include_latest = bootstrap_mode or not inserted_posts
        response_latest_posts = (
            list(latest_posts_by_board_map.values()) if include_latest else []
        )

        return CrawlResponse(
            checked_at=datetime.now(UTC),
            bootstrap_mode=bootstrap_mode,
            bootstrap_inserted_count=len(inserted_posts) if bootstrap_mode else 0,
            new_posts_count=0 if bootstrap_mode else len(inserted_posts),
            new_posts=[self._post_to_out(post) for post in response_posts],
            latest_posts_by_board=[
                self._stub_to_latest_out(stub) for stub in response_latest_posts
            ],
        )

    @staticmethod
    def _post_to_out(post: ScrapedPost) -> PostOut:
        """Map a persisted ScrapedPost row to its outbound API schema."""
        return PostOut(
            board_key=post.board_key,
            board_name=post.board_name,
            board_id=post.board_id,
            article_id=post.article_id,
            title=post.title,
            post_url=post.post_url,
            author=post.author,
            published_at=post.published_at,
            summary=post.summary,
            content_text=post.content_text,
            attachments=[
                AttachmentOut(name=item["name"], url=item["url"])
                for item in (post.attachments or [])
            ],
        )

    @staticmethod
    def _stub_to_latest_out(stub: PostStub) -> LatestBoardPostOut:
        """Map a list-page stub to the per-board latest-post schema."""
        return LatestBoardPostOut(
            board_key=stub.board_key,
            board_name=stub.board_name,
            board_id=stub.board_id,
            article_id=stub.article_id,
            title=stub.title,
            post_url=stub.post_url,
            published_at=stub.published_at,
        )

    def _is_bootstrap_mode(self) -> bool:
        """Return True when no post has ever been stored (first-ever crawl)."""
        return self.db.scalar(select(ScrapedPost.id).limit(1)) is None

    def _crawl_board(self, board_key: str) -> tuple[list[ScrapedPost], PostStub | None]:
        """Crawl one board's list pages and insert posts not yet stored.

        Args:
            board_key: key into ``BOARD_CONFIG`` identifying the board.

        Returns:
            A pair ``(inserted_records, latest_stub)`` where ``latest_stub``
            is the first entry of page 1 (the board's newest post) or None
            when the board returned no posts at all.
        """
        # Article ids already persisted for this board, fetched once up front.
        known_article_ids = set(
            self.db.scalars(
                select(ScrapedPost.article_id).where(
                    ScrapedPost.board_key == board_key
                )
            )
        )

        candidates: list[PostStub] = []
        latest_stub: PostStub | None = None
        seen_article_ids: set[int] = set()
        for page in range(1, self.crawler.max_pages_per_board + 1):
            page_posts = self.crawler.crawl_board_list(board_key=board_key, page=page)
            if not page_posts:
                break  # no more pages; stop paging early
            if page == 1:
                # First entry on page 1 is the board's newest post.
                latest_stub = page_posts[0]
            for stub in page_posts:
                # Posts can shift between pages mid-crawl; dedupe within
                # this crawl before checking against the database.
                if stub.article_id in seen_article_ids:
                    continue
                seen_article_ids.add(stub.article_id)
                if stub.article_id in known_article_ids:
                    continue  # already stored by a previous run
                candidates.append(stub)

        # Insert oldest-first (candidates are newest-first from the list
        # pages). Committing per record means earlier inserts survive a
        # mid-board failure — deliberate best-effort behavior.
        inserted_posts: list[ScrapedPost] = []
        for stub in reversed(candidates):
            detail = self.crawler.crawl_post_detail(stub)
            record = ScrapedPost(
                board_key=detail.board_key,
                board_name=detail.board_name,
                board_id=detail.board_id,
                article_id=detail.article_id,
                title=detail.title,
                post_url=detail.post_url,
                author=detail.author,
                published_at=detail.published_at,
                summary=detail.summary,
                content_text=detail.content_text,
                attachments=detail.attachments,
            )
            self.db.add(record)
            self.db.commit()
            self.db.refresh(record)
            inserted_posts.append(record)
        return inserted_posts, latest_stub