Each row averages across 8 tasks. The differences are small — adding a CLAUDE.md helps a little, but no single md dominates.
| # | Variant | Score | Signal |
|---|---|---|---|
| #1 | v1 — Karpathy rules only (110k stars) | 2.85 | 41 lines avg · ±0.10 across 8 tasks |
| #2 | v2 — Dory's AGENTS_light (57 lines) | 2.84 | 60 lines avg · ±0.11 across 8 tasks · only md that added a regression test |
| #3 | v8 — shanraisshan claude-code-best-practice (51k stars) | 2.83 | 46 lines avg · ±0.12 across 8 tasks |
| #4 | v7 — OpenAI Codex AGENTS.md (80k stars) | 2.83 | 53 lines avg · ±0.14 across 8 tasks |
| #5 | v0 — empty (control) | 2.82 | 44 lines avg · ±0.12 across 8 tasks |
| #6 | v6 — HumanLayer CLAUDE.md (10.7k stars) | 2.82 | 46 lines avg · ±0.13 across 8 tasks |
| #7 | v4 — Dory's AGENTS_full1027 (1353 lines) | 2.81 | 48 lines avg · ±0.15 across 8 tasks |
| #8 | v3 — Dory's AGENTS_medium_autonomous (147 lines) | 2.77 | 72 lines avg · ±0.27 across 8 tasks |
| #9 | v5 — medium + Karpathy merged (~196 lines) | 2.71 | 54 lines avg · ±0.30 across 8 tasks |
Each cell is the mean of 3 judges (Opus 4.7, Sonnet 4.6, Haiku 4.5) on a 0–3 quality rubric. Click any cell to inspect the diff and the judges' rationales.
| Variant | real · linkedin start-date fix | real · thumbnail batch processor | real · webflow dedup | real · webflow stdout polish | synth · simplicity (word_count) | synth · goal-driven (order_total) | synth · surgical edit | synth · think before code | variant mean |
|---|---|---|---|---|---|---|---|---|---|
| v0 — empty (control) | 2.89 | 2.67 | 2.78 | 2.67 | 2.78 | 2.89 | 2.92 | 3.00 | 2.82 |
| v1 — Karpathy rules only (110k stars) | 2.72 | 2.83 | 2.83 | 2.72 | 2.83 | 2.94 | 2.92 | 3.00 | 2.85 |
| v2 — Dory's AGENTS_light (57 lines) | 2.89 | 2.89 | 2.72 | 2.72 | 2.78 | 2.94 | 3.00 | 2.75 | 2.84 |
| v3 — Dory's AGENTS_medium_autonomous (147 lines) | 2.83 | 2.83 | 2.83 | 2.11 | 2.83 | 2.83 | 2.92 | 3.00 | 2.77 |
| v4 — Dory's AGENTS_full1027 (1353 lines) | 2.72 | 2.83 | 2.78 | 2.83 | 2.50 | 2.89 | 2.92 | 3.00 | 2.81 |
| v5 — medium + Karpathy merged (~196 lines) | 2.78 | 2.83 | 2.72 | 2.72 | 2.58 | 2.06 | 3.00 | 3.00 | 2.71 |
| v6 — HumanLayer CLAUDE.md (10.7k stars) | 2.72 | 2.83 | 2.72 | 2.72 | 2.67 | 2.94 | 3.00 | 2.94 | 2.82 |
| v7 — OpenAI Codex AGENTS.md (80k stars) | 2.89 | 2.89 | 2.83 | 2.83 | 2.50 | 2.89 | 2.92 | 2.89 | 2.83 |
| v8 — shanraisshan claude-code-best-practice (51k stars) | 2.78 | 2.61 | 2.72 | 2.89 | 2.92 | 2.94 | 2.92 | 2.89 | 2.83 |
| task mean | 2.80 | 2.80 | 2.77 | 2.69 | 2.71 | 2.81 | 2.94 | 2.94 | — |
real tasks (from Dory's repo) · synthetic tasks (from eugenexz/agent-prompt-rules)
Cell mean: 2.89 / 3.00 across 3 judges
index fe30298..051dbab 100644 @@ -332,6 +332,20 @@ def _handle_campaigns(client: LinkedInAdsClient, args: argparse.Namespace) -> No sys.exit(1) +def _resolve_start_ms(start_date: str | None) -> int: + """Resolve a YYYY-MM-DD start date to milliseconds, clamped to >= now. + + LinkedIn rejects past start times. A date like today's resolves to midnight + local time, which is already in the past, so bump such values to now. + """ + if start_date is None: + return int(time.time() * 1000) + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + start_ms = int(start_dt.timestamp() * 1000) + now_ms = int(time.time() * 1000) + return max(start_ms, now_ms) + + def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: """Handle the campaigns create subcommand. @@ -339,11 +353,7 @@ def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: client: LinkedInAdsClient instance. args: Parsed CLI arguments including post-urn, budgets, days, etc. """ - if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) - else: - start_ms = int(time.time() * 1000) + start_ms = _resolve_start_ms(args.start_date) duration_ms = args.days * 24 * 60 * 60 * 1000 # Determine targeting @@ -419,8 +429,7 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: updates["totalBudget"] = {"amount": args.total_budget, "currencyCode": "USD"} if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _resolve_start_ms(args.start_date) days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.72 / 3.00 across 3 judges
index fe30298..954abfa 100644 @@ -421,6 +421,11 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: if args.start_date: start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") start_ms = int(start_dt.timestamp() * 1000) + # LinkedIn rejects past start dates. Today's date parses to midnight, + # which is in the past, so bump to now in that case. + now_ms = int(time.time() * 1000) + if start_ms < now_ms: + start_ms = now_ms days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.89 / 3.00 across 3 judges
index fe30298..db8548a 100644 @@ -52,6 +52,20 @@ DEFAULT_TARGETING = TargetingCriteria( ) +def _start_date_to_ms(start_date: str) -> int: + """Convert a YYYY-MM-DD start date to epoch milliseconds. + + LinkedIn rejects past start timestamps. ``strptime`` resolves to local + midnight, which is already in the past when the user passes today's date. + For today, fall back to the current time so the API accepts the request; + for future dates, midnight is fine. + """ + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + if start_dt.date() == datetime.now().date(): + return int(time.time() * 1000) + return int(start_dt.timestamp() * 1000) + + def build_parser() -> argparse.ArgumentParser: """Build the argument parser for the LinkedIn Ads CLI. @@ -340,8 +354,7 @@ def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: args: Parsed CLI arguments including post-urn, budgets, days, etc. """ if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _start_date_to_ms(args.start_date) else: start_ms = int(time.time() * 1000) duration_ms = args.days * 24 * 60 * 60 * 1000 @@ -419,8 +432,7 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: updates["totalBudget"] = {"amount": args.total_budget, "currencyCode": "USD"} if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _start_date_to_ms(args.start_date) days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms} index 660627f..cabbbc5 100644 @@ -382,6 +382,42 @@ class TestCampaignsUpdate: assert call_args[0][0] == 12345 assert "PAUSED" in str(call_args) + @patch("linkedin_ads.LinkedInAdsClient") + @patch("linkedin_ads.ensure_valid_token", return_value="mock-token") + def test_update_with_today_start_date_is_not_in_past( + self, + 
mock_token: MagicMock, + mock_client_cls: MagicMock, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Regression: --start-date set to today must not resolve to local + midnight (already in the past), since LinkedIn rejects past start + timestamps. + """ + from datetime import datetime + + monkeypatch.setenv("LINKEDIN_AD_ACCOUNT_ID", "500000001") + monkeypatch.setenv("LINKEDIN_CLIENT_ID", "test-client") + monkeypatch.setenv("LINKEDIN_CLIENT_SECRET", "test-secret") + + mock_client = MagicMock() + mock_client_cls.return_value = mock_client + + from linkedin_ads import main + + today = datetime.now().strftime("%Y-%m-%d") + before_call_ms = int(time.time() * 1000) + main(["campaigns", "update", "12345", "--start-date", today]) + after_call_ms = int(time.time() * 1000) + + mock_client.update_campaign.assert_called_once() + updates = mock_client.update_campaign.call_args[0][1] + start_ms = updates["runSchedule"]["start"] + + # Must be "now-ish" — bracketed by the wall-clock around the call, + # not local midnight (which would be many hours in the past). + assert before_call_ms <= start_ms <= after_call_ms + # --------------------------------------------------------------------------- # campaigns analytics
Cell mean: 2.83 / 3.00 across 3 judges
index fe30298..4b87943 100644 @@ -31,6 +31,10 @@ from oauth import ensure_valid_token logger = logging.getLogger(__name__) +# Buffer added to a "now" start timestamp so LinkedIn doesn't reject it as past +# by the time the API request is processed. +START_TIME_BUFFER_MS = 60 * 1000 + # Default token file location TOKEN_PATH = Path(__file__).parent / ".linkedin_token.json" @@ -52,6 +56,27 @@ DEFAULT_TARGETING = TargetingCriteria( ) +def _resolve_start_ms(start_date: str) -> int: + """Resolve a YYYY-MM-DD start date to a millisecond timestamp. + + A date string parses as midnight, which is in the past for "today"; LinkedIn + rejects past start times. When the parsed timestamp is already in the past, + fall back to "now + buffer" so today's date is accepted. + + Args: + start_date: Date string in YYYY-MM-DD format. + + Returns: + Millisecond start timestamp safe to send to the LinkedIn API. + """ + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + start_ms = int(start_dt.timestamp() * 1000) + now_ms = int(time.time() * 1000) + if start_ms <= now_ms: + start_ms = now_ms + START_TIME_BUFFER_MS + return start_ms + + def build_parser() -> argparse.ArgumentParser: """Build the argument parser for the LinkedIn Ads CLI. @@ -340,10 +365,9 @@ def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: args: Parsed CLI arguments including post-urn, budgets, days, etc. 
""" if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _resolve_start_ms(args.start_date) else: - start_ms = int(time.time() * 1000) + start_ms = int(time.time() * 1000) + START_TIME_BUFFER_MS duration_ms = args.days * 24 * 60 * 60 * 1000 # Determine targeting @@ -419,8 +443,7 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: updates["totalBudget"] = {"amount": args.total_budget, "currencyCode": "USD"} if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _resolve_start_ms(args.start_date) days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.72 / 3.00 across 3 judges
index fe30298..4ab44d4 100644 @@ -332,6 +332,27 @@ def _handle_campaigns(client: LinkedInAdsClient, args: argparse.Namespace) -> No sys.exit(1) +def _resolve_start_ms(start_date: str) -> int: + """Resolve a YYYY-MM-DD start date string to epoch milliseconds. + + LinkedIn rejects past start timestamps. Since YYYY-MM-DD parses to + midnight (already in the past for "today"), bump any non-future + timestamp to ``now + 60s`` so today's date is accepted. + + Args: + start_date: Date string in YYYY-MM-DD format. + + Returns: + Epoch milliseconds, guaranteed to be in the future. + """ + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + start_ms = int(start_dt.timestamp() * 1000) + now_ms = int(time.time() * 1000) + if start_ms <= now_ms: + start_ms = now_ms + 60_000 + return start_ms + + def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: """Handle the campaigns create subcommand. @@ -340,8 +361,7 @@ def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: args: Parsed CLI arguments including post-urn, budgets, days, etc. """ if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _resolve_start_ms(args.start_date) else: start_ms = int(time.time() * 1000) duration_ms = args.days * 24 * 60 * 60 * 1000 @@ -419,8 +439,7 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: updates["totalBudget"] = {"amount": args.total_budget, "currencyCode": "USD"} if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _resolve_start_ms(args.start_date) days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.78 / 3.00 across 3 judges
index fe30298..b58d3f4 100644 @@ -421,6 +421,13 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: if args.start_date: start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") start_ms = int(start_dt.timestamp() * 1000) + # LinkedIn rejects past start times. If the parsed date resolves to a + # past instant (e.g. today at 00:00 once the day is already underway), + # bump it to "now" with a small buffer so the request isn't past by + # the time it reaches the API. + now_ms = int(time.time() * 1000) + if start_ms <= now_ms: + start_ms = now_ms + 60_000 days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.72 / 3.00 across 3 judges
index fe30298..d3ad63c 100644 @@ -341,7 +341,7 @@ def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: """ if args.start_date: start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = max(int(start_dt.timestamp() * 1000), int(time.time() * 1000)) else: start_ms = int(time.time() * 1000) duration_ms = args.days * 24 * 60 * 60 * 1000 @@ -420,7 +420,7 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: if args.start_date: start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = max(int(start_dt.timestamp() * 1000), int(time.time() * 1000)) days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.89 / 3.00 across 3 judges
index fe30298..9bc597d 100644 @@ -332,6 +332,18 @@ def _handle_campaigns(client: LinkedInAdsClient, args: argparse.Namespace) -> No sys.exit(1) +def _resolve_start_ms(start_date: str) -> int: + """Convert a YYYY-MM-DD start date to a millisecond timestamp. + + LinkedIn's API rejects start dates in the past, so when the parsed date + resolves to a past instant (e.g. today at midnight), bump it to now. + """ + start_dt = datetime.strptime(start_date, "%Y-%m-%d") + start_ms = int(start_dt.timestamp() * 1000) + now_ms = int(time.time() * 1000) + return max(start_ms, now_ms) + + def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: """Handle the campaigns create subcommand. @@ -340,8 +352,7 @@ def _handle_create(client: LinkedInAdsClient, args: argparse.Namespace) -> None: args: Parsed CLI arguments including post-urn, budgets, days, etc. """ if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _resolve_start_ms(args.start_date) else: start_ms = int(time.time() * 1000) duration_ms = args.days * 24 * 60 * 60 * 1000 @@ -419,8 +430,7 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: updates["totalBudget"] = {"amount": args.total_budget, "currencyCode": "USD"} if args.start_date: - start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") - start_ms = int(start_dt.timestamp() * 1000) + start_ms = _resolve_start_ms(args.start_date) days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.78 / 3.00 across 3 judges
index fe30298..379695f 100644 @@ -421,6 +421,11 @@ def _handle_update(client: LinkedInAdsClient, args: argparse.Namespace) -> None: if args.start_date: start_dt = datetime.strptime(args.start_date, "%Y-%m-%d") start_ms = int(start_dt.timestamp() * 1000) + # LinkedIn rejects past start times. A YYYY-MM-DD parses to midnight, + # so "today" is already in the past — bump to now in that case. + now_ms = int(time.time() * 1000) + if start_ms < now_ms: + start_ms = now_ms days = args.days or 3 end_ms = start_ms + (days * 24 * 60 * 60 * 1000) updates["runSchedule"] = {"start": start_ms, "end": end_ms}
Cell mean: 2.67 / 3.00 across 3 judges
index ed56dc2..64aede3 100644 @@ -494,13 +494,18 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # Batch: one thumbnail per keyframe image in a directory + %(prog)s "AI Replaces" --keyframes-dir keyframes/ -o thumbnails/ """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single, directory for --variants/--keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe images. Generates one thumbnail per image into the output directory.") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", @@ -512,11 +517,44 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + if not args.video and not args.photo and not args.keyframes_dir: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") try: - if args.variants: + if args.keyframes_dir: + kf_dir = Path(args.keyframes_dir) + if not kf_dir.is_dir(): + raise NotADirectoryError(f"--keyframes-dir is not a directory: {kf_dir}") + + image_exts = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tiff"} + keyframes = sorted( + p for p in kf_dir.iterdir() + if p.is_file() and p.suffix.lower() in image_exts + ) + if not keyframes: + raise FileNotFoundError(f"No image files found in {kf_dir}") + + out_dir = Path(args.output) + out_dir.mkdir(parents=True, exist_ok=True) + + results = [] + for kf in keyframes: + out_path = str(out_dir / 
f"{kf.stem}.png") + generate_thumbnail( + text=args.text, + output_path=out_path, + photo_path=str(kf), + timestamp=args.timestamp, + ) + results.append(out_path) + + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnails:") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.83 / 3.00 across 3 judges
index ed56dc2..f220f5c 100644 @@ -29,6 +29,7 @@ except ImportError: # Constants THUMBNAIL_WIDTH = 1280 THUMBNAIL_HEIGHT = 720 +KEYFRAME_EXTS = {".png", ".jpg", ".jpeg", ".webp", ".bmp"} # Local assets (decoupled from video_editor) TOOL_DIR = Path(__file__).resolve().parent @@ -494,6 +495,9 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # Batch: one thumbnail per keyframe in a directory + %(prog)s "Casino Code" --keyframes-dir assets/keyframes/ -o assets/thumbnails/ """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") @@ -501,6 +505,8 @@ Examples: help="Output path (file for single, directory for --variants)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe images; generates one thumbnail per image into --output dir") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", @@ -512,11 +518,53 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + sources = [bool(args.video), bool(args.photo), bool(args.keyframes_dir)] + if sum(sources) != 1: + parser.error("Exactly one of --video, --photo, or --keyframes-dir must be provided") try: - if args.variants: + if args.keyframes_dir: + kf_dir = Path(args.keyframes_dir) + if not kf_dir.is_dir(): + parser.error(f"--keyframes-dir is not a directory: {kf_dir}") + keyframes = sorted( + p for p in kf_dir.iterdir() + if p.is_file() and p.suffix.lower() in KEYFRAME_EXTS + ) + if not keyframes: + parser.error(f"No keyframe images found in {kf_dir}") + + out_dir = Path(args.output) + out_dir.mkdir(parents=True, 
exist_ok=True) + + results = [] + for kf in keyframes: + if args.variants: + paths = generate_thumbnail_set( + text=args.text, + output_dir=str(out_dir), + photo_path=str(kf), + timestamp=args.timestamp, + prefix=kf.stem, + ) + results.extend(paths) + else: + out_path = out_dir / f"{kf.stem}.png" + generate_thumbnail( + text=args.text, + output_path=str(out_path), + photo_path=str(kf), + timestamp=args.timestamp, + ) + results.append(str(out_path)) + + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnail(s) from {len(keyframes)} keyframe(s):") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.89 / 3.00 across 3 judges
index ed56dc2..c9b7e75 100644 @@ -477,6 +477,50 @@ def generate_thumbnail_set( return results +KEYFRAME_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tif", ".tiff"} + + +def generate_thumbnails_from_keyframes( + text: str, + keyframes_dir: str, + output_dir: str, + variant: int = 0, +) -> list: + """Generate one thumbnail per keyframe image in a directory. + + Each output file is named ``<keyframe_stem>.png`` in ``output_dir``. + + Returns: + List of output file paths. + """ + src = Path(keyframes_dir) + if not src.is_dir(): + raise NotADirectoryError(f"Keyframes directory not found: {src}") + + keyframes = sorted( + p for p in src.iterdir() + if p.is_file() and p.suffix.lower() in KEYFRAME_EXTENSIONS + ) + if not keyframes: + raise ValueError(f"No keyframe images found in: {src}") + + out_dir = Path(output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + results = [] + for kf in keyframes: + out_path = str(out_dir / f"{kf.stem}.png") + generate_thumbnail( + text=text, + output_path=out_path, + photo_path=str(kf), + variant=variant, + ) + results.append(out_path) + + return results + + def main(): parser = argparse.ArgumentParser( description="Generate YouTube thumbnails for 'AI Will Replace You' channel", @@ -494,17 +538,24 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # Batch: one thumbnail per keyframe image in a directory + %(prog)s "Deploy" --keyframes-dir path/to/keyframes/ -o assets/thumbnails/ """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single; directory for --variants or --keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") 
+ parser.add_argument("--keyframes-dir", + help="Directory of keyframe images; generate one thumbnail per image into -o") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", help="Generate 3 thumbnail variants instead of 1") + parser.add_argument("--variant", type=int, default=0, choices=[0, 1, 2], + help="Variant style for --keyframes-dir (default: 0)") parser.add_argument("--prefix", default="thumbnail", help="Filename prefix for variants (default: thumbnail)") parser.add_argument("--json", action="store_true", @@ -512,11 +563,29 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + sources = sum(bool(x) for x in (args.video, args.photo, args.keyframes_dir)) + if sources == 0: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") + if sources > 1: + parser.error("--video, --photo, and --keyframes-dir are mutually exclusive") + if args.keyframes_dir and args.variants: + parser.error("--keyframes-dir cannot be combined with --variants") try: - if args.variants: + if args.keyframes_dir: + results = generate_thumbnails_from_keyframes( + text=args.text, + keyframes_dir=args.keyframes_dir, + output_dir=args.output, + variant=args.variant, + ) + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnails:") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.83 / 3.00 across 3 judges
index ed56dc2..319f086 100644 @@ -477,6 +477,59 @@ def generate_thumbnail_set( return results +KEYFRAME_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tiff"} + + +def generate_thumbnails_from_keyframes( + text: str, + keyframes_dir: str, + output_dir: str, + variant: int = 0, +) -> list: + """Generate one thumbnail per keyframe image in a directory. + + Iterates every supported image file in ``keyframes_dir`` and writes a + thumbnail with the same stem (and a ``.png`` extension) to ``output_dir``. + + Args: + text: Bold text to display on each thumbnail. + keyframes_dir: Directory containing keyframe images. + output_dir: Directory where thumbnails are written. + variant: Thumbnail variant (0=standard, 1=red accent, 2=extra-large). + + Returns: + List of output file paths (one per processed keyframe). + """ + in_dir = Path(keyframes_dir) + if not in_dir.is_dir(): + raise NotADirectoryError(f"Keyframes directory not found: {keyframes_dir}") + + keyframes = sorted( + p for p in in_dir.iterdir() + if p.is_file() and p.suffix.lower() in KEYFRAME_EXTENSIONS + ) + if not keyframes: + raise FileNotFoundError( + f"No keyframe images (extensions: {sorted(KEYFRAME_EXTENSIONS)}) found in {keyframes_dir}" + ) + + out_dir = Path(output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + results = [] + for keyframe in keyframes: + out_path = str(out_dir / f"{keyframe.stem}.png") + generate_thumbnail( + text=text, + output_path=out_path, + photo_path=str(keyframe), + variant=variant, + ) + results.append(out_path) + + return results + + def main(): parser = argparse.ArgumentParser( description="Generate YouTube thumbnails for 'AI Will Replace You' channel", @@ -494,17 +547,24 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # One thumbnail per keyframe in a directory + %(prog)s "Deploy" --keyframes-dir path/to/keyframes/ -o assets/thumbnails/ """ ) parser.add_argument("text", 
help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single, directory for --variants or --keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe images; one thumbnail is written per image to --output") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", help="Generate 3 thumbnail variants instead of 1") + parser.add_argument("--variant", type=int, default=0, choices=[0, 1, 2], + help="Variant to use for batch keyframes mode (default: 0)") parser.add_argument("--prefix", default="thumbnail", help="Filename prefix for variants (default: thumbnail)") parser.add_argument("--json", action="store_true", @@ -512,11 +572,29 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + sources = [bool(args.video), bool(args.photo), bool(args.keyframes_dir)] + if sum(sources) == 0: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") + if sum(sources) > 1: + parser.error("--video, --photo, and --keyframes-dir are mutually exclusive") + if args.keyframes_dir and args.variants: + parser.error("--keyframes-dir cannot be combined with --variants") try: - if args.variants: + if args.keyframes_dir: + results = generate_thumbnails_from_keyframes( + text=args.text, + keyframes_dir=args.keyframes_dir, + output_dir=args.output, + variant=args.variant, + ) + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnails from keyframes:") + for r 
in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.83 / 3.00 across 3 judges
index ed56dc2..b316b47 100644 @@ -494,13 +494,18 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # One thumbnail per keyframe image in a directory + %(prog)s "Casino Code" --keyframes-dir keyframes/ -o assets/thumbnails/ """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single, directory for --variants or --keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe images; one thumbnail is generated per image") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", @@ -512,11 +517,46 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + sources = [bool(args.video), bool(args.photo), bool(args.keyframes_dir)] + if sum(sources) == 0: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") + if sum(sources) > 1: + parser.error("--video, --photo, and --keyframes-dir are mutually exclusive") try: - if args.variants: + if args.keyframes_dir: + kf_dir = Path(args.keyframes_dir) + if not kf_dir.is_dir(): + raise NotADirectoryError(f"Keyframes directory not found: {kf_dir}") + + image_exts = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tiff"} + keyframes = sorted( + p for p in kf_dir.iterdir() + if p.is_file() and p.suffix.lower() in image_exts + ) + if not keyframes: + raise ValueError(f"No keyframe images found in {kf_dir}") + + out_dir = Path(args.output) + 
out_dir.mkdir(parents=True, exist_ok=True) + + results = [] + for kf in keyframes: + out_path = str(out_dir / f"{kf.stem}.png") + generate_thumbnail( + text=args.text, + output_path=out_path, + photo_path=str(kf), + ) + results.append(out_path) + + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnails from keyframes:") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.83 / 3.00 across 3 judges
index ed56dc2..20170c9 100644 @@ -477,6 +477,9 @@ def generate_thumbnail_set( return results +IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tiff"} + + def main(): parser = argparse.ArgumentParser( description="Generate YouTube thumbnails for 'AI Will Replace You' channel", @@ -492,15 +495,20 @@ Examples: # From existing photo %(prog)s "Mastering AI" --photo path/to/headshot.png -o assets/thumbnail.png + # One thumbnail per keyframe in a directory + %(prog)s "Deploy" --keyframes-dir path/to/keyframes/ -o assets/thumbnails/ + # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single, directory for --variants/--keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe images; one thumbnail is generated per image") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", @@ -512,11 +520,45 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + sources_provided = sum(bool(x) for x in (args.video, args.photo, args.keyframes_dir)) + if sources_provided == 0: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") + if sources_provided > 1: + parser.error("--video, --photo, and --keyframes-dir are mutually exclusive") try: - if args.variants: + if args.keyframes_dir: + kf_dir = Path(args.keyframes_dir) + if not kf_dir.is_dir(): + 
raise NotADirectoryError(f"Keyframes directory not found: {kf_dir}") + + keyframes = sorted( + p for p in kf_dir.iterdir() + if p.is_file() and p.suffix.lower() in IMAGE_EXTENSIONS + ) + if not keyframes: + raise FileNotFoundError(f"No image files found in: {kf_dir}") + + out_dir = Path(args.output) + out_dir.mkdir(parents=True, exist_ok=True) + + results = [] + for kf in keyframes: + out_path = out_dir / f"{kf.stem}.png" + generate_thumbnail( + text=args.text, + output_path=str(out_path), + photo_path=str(kf), + ) + results.append(str(out_path)) + + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnail(s) from keyframes:") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.83 / 3.00 across 3 judges
index ed56dc2..c4d9a83 100644 @@ -477,6 +477,48 @@ def generate_thumbnail_set( return results +KEYFRAME_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp"} + + +def generate_thumbnails_from_keyframes( + text: str, + keyframes_dir: str, + output_dir: str, + variant: int = 0, +) -> list: + """Generate one thumbnail per keyframe image in a directory. + + Returns: + List of output file paths. + """ + src_dir = Path(keyframes_dir) + if not src_dir.is_dir(): + raise NotADirectoryError(f"Keyframes directory not found: {keyframes_dir}") + + keyframes = sorted( + p for p in src_dir.iterdir() + if p.is_file() and p.suffix.lower() in KEYFRAME_EXTENSIONS + ) + if not keyframes: + raise ValueError(f"No keyframe images found in {keyframes_dir}") + + out_dir = Path(output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + results = [] + for kf in keyframes: + out_path = str(out_dir / f"{kf.stem}.png") + generate_thumbnail( + text=text, + output_path=out_path, + photo_path=str(kf), + variant=variant, + ) + results.append(out_path) + + return results + + def main(): parser = argparse.ArgumentParser( description="Generate YouTube thumbnails for 'AI Will Replace You' channel", @@ -494,17 +536,24 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # One thumbnail per keyframe in a directory + %(prog)s "Deploy" --keyframes-dir path/to/keyframes/ -o assets/thumbnails/ """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single, directory for --variants or --keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe images; 
generates one thumbnail per keyframe into --output dir") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", help="Generate 3 thumbnail variants instead of 1") + parser.add_argument("--variant", type=int, default=0, choices=[0, 1, 2], + help="Variant style for --keyframes-dir mode (0=standard, 1=red accent, 2=extra-large)") parser.add_argument("--prefix", default="thumbnail", help="Filename prefix for variants (default: thumbnail)") parser.add_argument("--json", action="store_true", @@ -512,11 +561,29 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + sources = [bool(args.video), bool(args.photo), bool(args.keyframes_dir)] + if sum(sources) == 0: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") + if sum(sources) > 1: + parser.error("--video, --photo, and --keyframes-dir are mutually exclusive") + if args.keyframes_dir and args.variants: + parser.error("--keyframes-dir cannot be combined with --variants") try: - if args.variants: + if args.keyframes_dir: + results = generate_thumbnails_from_keyframes( + text=args.text, + keyframes_dir=args.keyframes_dir, + output_dir=args.output, + variant=args.variant, + ) + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnails from keyframes:") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.89 / 3.00 across 3 judges
index ed56dc2..79a632e 100644 @@ -477,6 +477,48 @@ def generate_thumbnail_set( return results +KEYFRAME_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp", ".bmp"} + + +def generate_thumbnails_from_keyframes( + text: str, + keyframes_dir: str, + output_dir: str, + variant: int = 0, +) -> list: + """Generate one thumbnail per image in a keyframes directory. + + Returns: + List of output file paths. + """ + src_dir = Path(keyframes_dir) + if not src_dir.is_dir(): + raise NotADirectoryError(f"Keyframes directory not found: {src_dir}") + + keyframes = sorted( + p for p in src_dir.iterdir() + if p.is_file() and p.suffix.lower() in KEYFRAME_EXTENSIONS + ) + if not keyframes: + raise ValueError(f"No keyframe images found in {src_dir}") + + out_dir = Path(output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + results = [] + for kf in keyframes: + out_path = str(out_dir / f"{kf.stem}.png") + generate_thumbnail( + text=text, + output_path=out_path, + photo_path=str(kf), + variant=variant, + ) + results.append(out_path) + + return results + + def main(): parser = argparse.ArgumentParser( description="Generate YouTube thumbnails for 'AI Will Replace You' channel", @@ -494,17 +536,24 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # Batch: one thumbnail per keyframe image in a directory + %(prog)s "Deploy" --keyframes-dir path/to/keyframes/ -o assets/thumbnails/ """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single, directory for --variants or --keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe 
images; generates one thumbnail per image into -o directory") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", help="Generate 3 thumbnail variants instead of 1") + parser.add_argument("--variant", type=int, default=0, choices=[0, 1, 2], + help="Variant style for --keyframes-dir batch (0=standard, 1=red accent, 2=extra-large)") parser.add_argument("--prefix", default="thumbnail", help="Filename prefix for variants (default: thumbnail)") parser.add_argument("--json", action="store_true", @@ -512,11 +561,29 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + sources = [s for s in (args.video, args.photo, args.keyframes_dir) if s] + if not sources: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") + if len(sources) > 1: + parser.error("--video, --photo, and --keyframes-dir are mutually exclusive") + if args.keyframes_dir and args.variants: + parser.error("--keyframes-dir and --variants cannot be combined") try: - if args.variants: + if args.keyframes_dir: + results = generate_thumbnails_from_keyframes( + text=args.text, + keyframes_dir=args.keyframes_dir, + output_dir=args.output, + variant=args.variant, + ) + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnails from keyframes:") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.61 / 3.00 across 3 judges
index ed56dc2..96671e5 100644 @@ -494,13 +494,18 @@ Examples: # Custom timestamp for frame extraction %(prog)s "Deploy" --video video.mkv --timestamp 00:01:30 -o assets/thumbnail.png + + # Batch process every keyframe in a directory (one thumbnail per keyframe) + %(prog)s "Deploy" --keyframes-dir keyframes/ -o assets/thumbnails/ """ ) parser.add_argument("text", help="Bold text for the thumbnail (1-4 words work best)") parser.add_argument("-o", "--output", required=True, - help="Output path (file for single, directory for --variants)") + help="Output path (file for single, directory for --variants or --keyframes-dir)") parser.add_argument("--video", help="Video file to extract speaker frame from") parser.add_argument("--photo", help="Existing photo to use instead of video extraction") + parser.add_argument("--keyframes-dir", + help="Directory of keyframe images; one thumbnail is generated per image into --output dir") parser.add_argument("--timestamp", default="00:00:05", help="Video timestamp for frame extraction (default: 00:00:05)") parser.add_argument("--variants", action="store_true", @@ -512,11 +517,53 @@ Examples: args = parser.parse_args() - if not args.video and not args.photo: - parser.error("Either --video or --photo must be provided") + if not args.video and not args.photo and not args.keyframes_dir: + parser.error("One of --video, --photo, or --keyframes-dir must be provided") try: - if args.variants: + if args.keyframes_dir: + kf_dir = Path(args.keyframes_dir) + if not kf_dir.is_dir(): + raise NotADirectoryError(f"--keyframes-dir is not a directory: {kf_dir}") + + out_dir = Path(args.output) + out_dir.mkdir(parents=True, exist_ok=True) + + exts = {".png", ".jpg", ".jpeg", ".webp", ".bmp", ".tiff"} + keyframes = sorted( + p for p in kf_dir.iterdir() + if p.is_file() and p.suffix.lower() in exts + ) + if not keyframes: + raise FileNotFoundError(f"No keyframe images found in: {kf_dir}") + + results = [] + for kf in keyframes: + if args.variants: + 
sub_dir = out_dir / kf.stem + paths = generate_thumbnail_set( + text=args.text, + output_dir=str(sub_dir), + photo_path=str(kf), + prefix=args.prefix, + ) + results.extend(paths) + else: + out_path = out_dir / f"{kf.stem}.png" + generate_thumbnail( + text=args.text, + output_path=str(out_path), + photo_path=str(kf), + ) + results.append(str(out_path)) + + if args.json: + print(json.dumps({"success": True, "files": results})) + else: + print(f"\nGenerated {len(results)} thumbnails from {len(keyframes)} keyframes:") + for r in results: + print(f" {r}") + elif args.variants: results = generate_thumbnail_set( text=args.text, output_dir=args.output,
Cell mean: 2.78 / 3.00 across 3 judges
index 990931d..8b74c47 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,172 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for slug-based dedup: existing items get updated, not 
duplicated.""" + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_updates_instead_of_creating( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """When an item with the same slug exists, update it; do not create.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-99", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-99"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + result = upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + # Args: (collection_id, item_id, field_data) + assert update_args[0][0] == "col-1" + assert update_args[0][1] == "existing-item-99" + mock_client.create_item.assert_not_called() + assert result == {"id": "existing-item-99"} + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_no_existing_slug_creates_new( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """When no item with the slug exists, create as normal.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + 
mock_client.create_item.return_value = {"id": "new-item-1"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_publishes_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Updated existing items should still be published when not a draft.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": "existing-7"} + mock_client.update_item.return_value = {"id": "existing-7"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with("col-1", ["existing-7"]) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Updated existing items should not be published when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": "existing-8"} + 
mock_client.update_item.return_value = {"id": "existing-8"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """When --update is used, do not perform a slug lookup.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "item-42"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="item-42", + ) + + mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once() index 7bdf374..20088dd 100644 @@ -237,6 +237,16 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + result = client.update_item(collection_id, existing_id, field_data) + logger.info("Existing item with slug '%s' updated: %s", metadata["slug"], existing_id) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.83 / 3.00 across 3 judges
index 990931d..cd2818c 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,179 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDeduplication: + """Tests for slug-based duplicate detection in upload_video.""" + + 
@patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_triggers_update_not_create( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should update existing item instead of creating duplicate.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + assert update_args[0][0] == "col-1" + assert update_args[0][1] == "existing-id" + mock_client.create_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_no_existing_slug_creates_new_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should create a new item when no item with the slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + mock_client.create_item.return_value = {"id": "new-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + 
collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_publishes_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should publish the updated item when draft=False.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with( + "col-1", ["existing-id"] + ) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not publish the updated item when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-id"} + + yaml_path = tmp_path / 
"video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not perform slug lookup when --update is used explicitly.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "explicit-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="explicit-id", + ) + + mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once() + mock_client.create_item.assert_not_called() index 7bdf374..5da1bdc 100644 @@ -237,6 +237,18 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + # Auto-detect duplicates: if an item already exists with this slug, + # update it instead of creating a duplicate. + existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + result = client.update_item(collection_id, existing_id, field_data) + logger.info("Existing item updated by slug: %s", existing_id) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.72 / 3.00 across 3 judges
index 990931d..1f2b750 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,186 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for slug-based deduplication on upload_video.""" + + 
@patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_triggers_update_not_create( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should update the existing item when slug already exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + result = upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + # Positional args: (collection_id, item_id, field_data) + assert update_args[0][0] == "col-1" + assert update_args[0][1] == "existing-item-id" + mock_client.create_item.assert_not_called() + assert result == {"id": "existing-item-id"} + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_missing_slug_falls_through_to_create( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should create a new item when no item with that slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + 
mock_client.create_item.return_value = {"id": "new-item-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_dedup_publishes_existing_item_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should publish the existing item id after dedup-update when draft=False.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with( + "col-1", ["existing-item-id"] + ) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_dedup_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not publish after dedup-update when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + 
mock_client.find_item_by_slug.return_value = { + "id": "existing-item-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Explicit --update with item_id must not perform a slug lookup.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "item-42"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="item-42", + ) + + mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once() + mock_client.create_item.assert_not_called() index 7bdf374..af48a81 100644 @@ -237,6 +237,20 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + # Dedup by slug: if an item with the same slug already exists, update it + # in place rather than creating a duplicate. 
+ existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + result = client.update_item(collection_id, existing_id, field_data) + logger.info("Existing item updated by slug: %s", existing_id) + + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.83 / 3.00 across 3 judges
index 990931d..eeb52c0 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,211 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for slug-based duplicate detection in upload_video.""" + + 
@patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_triggers_update_not_create( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should call update_item when an item with the same slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Test", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-99", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-99"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.create_item.assert_not_called() + mock_client.update_item.assert_called_once() + # Verify the existing item id was used for the update + update_args = mock_client.update_item.call_args + assert "existing-item-99" in update_args[0] or update_args[1].get( + "item_id" + ) == "existing-item-99" + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_no_existing_slug_creates_new_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should call create_item when no item with the same slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Test", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + mock_client.create_item.return_value = {"id": "fresh-item-1"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + 
metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_slug_lookup_uses_metadata_slug( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should query find_item_by_slug with the metadata slug.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Test", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + mock_client.create_item.return_value = {"id": "x"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_dedup_update_publishes_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should publish the existing item id after dedup update when not draft.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Test", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-77", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-77"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + 
draft=False, + ) + + mock_client.publish_items.assert_called_once_with( + "col-1", ["existing-item-77"] + ) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_dedup_update_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not publish after dedup update when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Test", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-55", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-55"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not call find_item_by_slug when update=True and item_id given.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Updated", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "explicit-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="explicit-id", + ) + + mock_client.find_item_by_slug.assert_not_called() + 
mock_client.update_item.assert_called_once() index 7bdf374..973bbe6 100644 @@ -237,6 +237,18 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + # Detect existing item with the same slug to avoid duplicate creation. + existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + logger.info("Existing item found for slug=%s id=%s — updating", + metadata["slug"], existing_id) + result = client.update_item(collection_id, existing_id, field_data) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.78 / 3.00 across 3 judges
index 990931d..2068061 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,177 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for slug-based deduplication on the create path.""" + + 
@patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_triggers_update_not_create( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should call update_item when an item with the same slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-99", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-99"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + # update_item(collection_id, item_id, field_data) + assert update_args[0][0] == "col-1" + assert update_args[0][1] == "existing-item-99" + mock_client.create_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_no_existing_slug_creates_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should call create_item when no item with the same slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + mock_client.create_item.return_value = {"id": 
"brand-new-item"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_publishes_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should publish the existing item after updating when not draft.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": "existing-7"} + mock_client.update_item.return_value = {"id": "existing-7"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with("col-1", ["existing-7"]) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not publish the existing item after updating when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": "existing-7"} + mock_client.update_item.return_value = {"id": 
"existing-7"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not call find_item_by_slug when --update is explicit.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "explicit-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="explicit-id", + ) + + mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once() + mock_client.create_item.assert_not_called() index 7bdf374..a91f2a4 100644 @@ -237,6 +237,17 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + # Auto-detect existing item by slug to avoid duplicates. + existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + result = client.update_item(collection_id, existing_id, field_data) + logger.info("Existing item found by slug; updated: %s", result.get("id")) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.72 / 3.00 across 3 judges
index 990931d..23e95e6 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,181 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for automatic slug-based dedup on the create path.""" + + 
@patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_updates_instead_of_creating( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should update the existing item when one with the same slug exists.""" + mock_parse.return_value = valid_metadata + field_data = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_build.return_value = field_data + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-99", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-99"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + result = upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once_with( + "col-1", "existing-item-99", field_data + ) + mock_client.create_item.assert_not_called() + assert result == {"id": "existing-item-99"} + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_publishes_updated_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should publish the updated item by id when not draft.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": "existing-7"} + mock_client.update_item.return_value = {"id": "existing-7"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() 
+ + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with("col-1", ["existing-7"]) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_with_draft_skips_publish( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not publish when draft=True even if item exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": "existing-1"} + mock_client.update_item.return_value = {"id": "existing-1"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.update_item.assert_called_once() + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_no_existing_slug_creates_new_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should create when no existing item shares the slug.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + mock_client.create_item.return_value = {"id": "fresh-item"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + result = upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + 
mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + assert result == {"id": "fresh-item"} + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not call find_item_by_slug when --update is set explicitly.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "s"} + + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "explicit-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="explicit-id", + draft=True, + ) + + mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once_with( + "col-1", "explicit-id", {"name": "X", "slug": "s"} + ) index 7bdf374..ff1049f 100644 @@ -235,6 +235,22 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + if not draft: + client.publish_items(collection_id, [item_id]) # type: ignore[list-item] + logger.info("Item published") + return result + + # Auto-detect duplicates by slug to avoid creating a second item with + # the same slug. If found, update the existing item in-place. 
+ existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + logger.info("Existing item with slug %r found (id=%s); updating", + metadata["slug"], existing_id) + result = client.update_item(collection_id, existing_id, field_data) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") return result result = client.create_item(collection_id, field_data, is_draft=draft)
Cell mean: 2.72 / 3.00 across 3 judges
index 990931d..ceda58b 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,185 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for slug-based dedup — update existing items instead of 
duplicating.""" + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_updates_instead_of_creating( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should update an existing item when its slug already exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = { + "name": "Stop Using MCP Tools", + "slug": "stop-using-mcp-tools", + } + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-99", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-99"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + result = upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + # Positional args: (collection_id, item_id, field_data) + assert update_args[0][0] == "col-1" + assert update_args[0][1] == "existing-item-99" + mock_client.create_item.assert_not_called() + assert result == {"id": "existing-item-99"} + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_no_existing_slug_creates_new_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should create a new item when no item with this slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = { + "name": "Stop Using MCP Tools", + "slug": "stop-using-mcp-tools", + } + + mock_client = mock_client_class.return_value + 
mock_client.find_item_by_slug.return_value = None + mock_client.create_item.return_value = {"id": "fresh-item"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_publishes_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should publish updated item when draft=False (default).""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": "existing-7"} + mock_client.update_item.return_value = {"id": "existing-7"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with("col-1", ["existing-7"]) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not publish updated item when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = {"id": 
"existing-7"} + mock_client.update_item.return_value = {"id": "existing-7"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Explicit --update with --item-id should bypass slug lookup.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "explicit-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="explicit-id", + ) + + mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + assert update_args[0][1] == "explicit-id" index 7bdf374..3ac9a9c 100644 @@ -237,6 +237,18 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + # Auto-dedup: if an item with this slug already exists, update it + # rather than creating a duplicate. 
+ existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + result = client.update_item(collection_id, existing_id, field_data) + logger.info("Existing item updated by slug: %s", existing_id) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.83 / 3.00 across 3 judges
index 990931d..e894f0a 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,181 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for slug-based deduplication in upload_video.""" + + 
@patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_triggers_update_not_create( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should call update_item when an item with the same slug already exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-99", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-99"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + result = upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once() + # Verify update_item received the existing item id, not a new one + update_args = mock_client.update_item.call_args + assert update_args[0][1] == "existing-item-99" + mock_client.create_item.assert_not_called() + assert result == {"id": "existing-item-99"} + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_no_existing_slug_creates_new_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should call create_item when no item with the slug exists.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + 
mock_client.create_item.return_value = {"id": "new-item-1"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_publishes_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should publish the updated existing item when draft=False.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-77", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-77"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with( + "col-1", ["existing-item-77"] + ) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not publish the updated existing item when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_client = 
mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-44", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-44"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Should not call find_item_by_slug when update=True is explicit.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "Stop Using MCP Tools", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "item-explicit"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="item-explicit", + ) + + mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + assert update_args[0][1] == "item-explicit" index 7bdf374..0de6239 100644 @@ -183,13 +183,16 @@ def upload_video( ) -> dict | None: """Upload (or update) a video CMS item from a YAML metadata file. + When *update* is *False*, the function looks up an existing item by + slug and updates it in place if found, avoiding duplicates. + Args: metadata_path: Path to the video YAML file. collection_id: Target collection ID (prompted if *None*). site_id: Webflow site ID. draft: Save as draft instead of publishing. dry_run: Parse only; skip all API calls. 
- update: Update an existing item instead of creating. + update: Force update of a specific item by *item_id*. item_id: Required when *update* is *True*. Returns: @@ -237,6 +240,18 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + # Deduplicate by slug: if an item with this slug already exists, + # update it instead of creating a duplicate. + existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + logger.info("Existing item found for slug %r, updating: %s", metadata["slug"], existing_id) + result = client.update_item(collection_id, existing_id, field_data) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.72 / 3.00 across 3 judges
index 990931d..3dae336 100644 @@ -331,6 +331,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "new-item"} yaml_path = tmp_path / "video.yaml" @@ -365,6 +366,7 @@ class TestUploadVideoDraftFlag: mock_parse.return_value = valid_metadata mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "draft-item"} yaml_path = tmp_path / "video.yaml" @@ -469,6 +471,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/t.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -505,6 +508,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.create_item.return_value = {"id": "item-1"} yaml_path = tmp_path / "video.yaml" @@ -545,6 +549,7 @@ class TestUploadVideoThumbnail: mock_build.return_value = {"name": "Test", "slug": "test"} mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None mock_client.upload_asset.return_value = "https://cdn.example.com/uploaded.png" mock_client.create_item.return_value = {"id": "item-1"} @@ -561,3 +566,181 @@ class TestUploadVideoThumbnail: mock_build.assert_called_once() call_kwargs = mock_build.call_args assert "https://cdn.example.com/uploaded.png" in str(call_kwargs) + + +class TestUploadVideoSlugDedup: + """Tests for slug-based deduplication on the create path.""" + + 
@patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_updates_instead_of_creating( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """When an item with the same slug exists, update it; do not create.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.update_item.assert_called_once() + update_args = mock_client.update_item.call_args + # Item id from existing item must be passed. 
+ assert "existing-item-id" in update_args[0] or update_args[1].get( + "item_id" + ) == "existing-item-id" + mock_client.create_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_new_slug_creates_item( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """When no existing item shares the slug, fall back to create_item.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = None + mock_client.create_item.return_value = {"id": "new-item"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + ) + + mock_client.find_item_by_slug.assert_called_once_with( + "col-1", "stop-using-mcp-tools" + ) + mock_client.create_item.assert_called_once() + mock_client.update_item.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_publishes_when_not_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Updated existing items should also be published when draft=False.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + 
metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=False, + ) + + mock_client.publish_items.assert_called_once_with( + "col-1", ["existing-item-id"] + ) + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_existing_slug_skips_publish_when_draft( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Updated existing items should not be published when draft=True.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.find_item_by_slug.return_value = { + "id": "existing-item-id", + "fieldData": {"slug": "stop-using-mcp-tools"}, + } + mock_client.update_item.return_value = {"id": "existing-item-id"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + draft=True, + ) + + mock_client.publish_items.assert_not_called() + + @patch("upload_video.WebflowClient") + @patch("upload_video.parse_video_metadata") + @patch("upload_video.build_field_data") + def test_explicit_update_skips_slug_lookup( + self, + mock_build: MagicMock, + mock_parse: MagicMock, + mock_client_class: MagicMock, + valid_metadata: dict, + tmp_path: Path, + ) -> None: + """Explicit update=True must not trigger the slug-dedup lookup.""" + mock_parse.return_value = valid_metadata + mock_build.return_value = {"name": "X", "slug": "stop-using-mcp-tools"} + mock_client = mock_client_class.return_value + mock_client.update_item.return_value = {"id": "item-42"} + + yaml_path = tmp_path / "video.yaml" + yaml_path.touch() + + upload_video( + metadata_path=yaml_path, + collection_id="col-1", + site_id="site-1", + update=True, + item_id="item-42", + ) + + 
mock_client.find_item_by_slug.assert_not_called() + mock_client.update_item.assert_called_once() + mock_client.create_item.assert_not_called() index 7bdf374..fe0e3f3 100644 @@ -237,6 +237,17 @@ def upload_video( logger.info("Item updated: %s", result.get("id")) return result + # Detect existing item by slug to avoid duplicates. + existing = client.find_item_by_slug(collection_id, metadata["slug"]) + if existing: + existing_id = existing.get("id") + result = client.update_item(collection_id, existing_id, field_data) # type: ignore[arg-type] + logger.info("Existing item found by slug; updated: %s", result.get("id")) + if not draft and existing_id: + client.publish_items(collection_id, [existing_id]) + logger.info("Item published") + return result + result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id)
Cell mean: 2.67 / 3.00 across 3 judges
index 5a516cb..3f39cbe 100644 @@ -204,7 +204,16 @@ def upload_video( raise ValueError("--item-id is required when using --update") if dry_run: - logger.info("Dry run — no API calls will be made") + mode = "update" if update else "create" + print("Dry run — no API calls will be made") + print(f" mode: {mode}") + print(f" draft: {draft}") + print(f" title: {metadata['title']}") + print(f" slug: {metadata['slug']}") + print(f" video_url: {metadata['video_url']}") + if update: + print(f" item_id: {item_id}") + sys.stdout.flush() return None client = WebflowClient() @@ -234,7 +243,9 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] - logger.info("Item updated: %s", result.get("id")) + result_id = result.get("id") or item_id + logger.info("Item updated: %s", result_id) + _print_upload_success("update", collection_id, result_id, site_id, draft) return result # Dedup: check if an item with this slug already exists @@ -251,6 +262,7 @@ def upload_video( if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + _print_upload_success("update", collection_id, existing_id, site_id, draft) return result result = client.create_item(collection_id, field_data, is_draft=draft) @@ -261,9 +273,31 @@ def upload_video( client.publish_items(collection_id, [created_id]) logger.info("Item published") + _print_upload_success("create", collection_id, created_id, site_id, draft) return result +def _print_upload_success( + mode: str, + collection_id: str, + item_id: str | None, + site_id: str | None, + draft: bool, +) -> None: + """Print a clear success summary to stdout (visible in non-TTY contexts).""" + print(f"Webflow upload succeeded ({mode})") + print(f" mode: {mode}") + print(f" draft: {draft}") + print(f" item_id: {item_id}") + if site_id and item_id: + dashboard = ( + f"https://webflow.com/dashboard/sites/{site_id}" + f"/cms/collections/{collection_id}/items/{item_id}" + ) + 
print(f" link: {dashboard}") + sys.stdout.flush() + + # ------------------------------------------------------------------ # CLI # ------------------------------------------------------------------
Cell mean: 2.72 / 3.00 across 3 judges
index 5a516cb..0199089 100644 @@ -204,7 +204,15 @@ def upload_video( raise ValueError("--item-id is required when using --update") if dry_run: - logger.info("Dry run — no API calls will be made") + mode = "update" if update else "create" + print("Webflow upload (dry run — no API calls will be made)", flush=True) + print(f" mode: {mode}", flush=True) + print(f" title: {metadata['title']}", flush=True) + print(f" slug: {metadata['slug']}", flush=True) + print(f" video_url: {metadata['video_url']}", flush=True) + print(f" draft: {draft}", flush=True) + if update: + print(f" item_id: {item_id}", flush=True) return None client = WebflowClient() @@ -235,6 +243,7 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + _print_upload_result("update", result.get("id"), collection_id, draft) return result # Dedup: check if an item with this slug already exists @@ -248,22 +257,48 @@ def upload_video( ) result = client.update_item(collection_id, existing_id, field_data) logger.info("Item updated: %s", existing_id) + published = False if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + published = True + _print_upload_result("update (dedup)", existing_id, collection_id, draft, published=published) return result result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id) + published = False if not draft and created_id: client.publish_items(collection_id, [created_id]) logger.info("Item published") + published = True + _print_upload_result("create", created_id, collection_id, draft, published=published) return result +def _print_upload_result( + mode: str, + item_id: str | None, + collection_id: str, + draft: bool, + published: bool = False, +) -> None: + """Print a clear, stdout-visible summary of a Webflow upload.""" + print("Webflow 
upload succeeded", flush=True) + print(f" mode: {mode}", flush=True) + print(f" item_id: {item_id}", flush=True) + print(f" draft: {draft}", flush=True) + print(f" published: {published}", flush=True) + if item_id: + print( + f" dashboard: https://webflow.com/dashboard/sites/collections/{collection_id}/items/{item_id}", + flush=True, + ) + + # ------------------------------------------------------------------ # CLI # ------------------------------------------------------------------
Cell mean: 2.72 / 3.00 across 3 judges
index 5a516cb..2960a42 100644 @@ -168,6 +168,35 @@ def select_collection( return selected["id"], selected["slug"] +# ------------------------------------------------------------------ +# Stdout reporting (works in non-TTY contexts where logger is silent) +# ------------------------------------------------------------------ + +def _print_upload_result( + mode: str, + item_id: str | None, + collection_id: str | None, + site_id: str | None, + draft: bool, + published: bool, +) -> None: + """Print a clear, user-visible summary of a successful upload.""" + publish_state = "published" if published else ("draft" if draft else "saved (unpublished)") + print("=== Webflow upload OK ===", flush=True) + print(f" mode: {mode}", flush=True) + print(f" item_id: {item_id}", flush=True) + print(f" state: {publish_state}", flush=True) + if site_id: + # Webflow CMS dashboard URL for the collection's items list. + dashboard = ( + f"https://webflow.com/dashboard/sites/{site_id}/cms/collections/" + f"{collection_id}/items/{item_id}" + ) + print(f" dashboard: {dashboard}", flush=True) + elif collection_id: + print(f" collection: {collection_id}", flush=True) + + # ------------------------------------------------------------------ # Main upload logic # ------------------------------------------------------------------ @@ -203,7 +232,21 @@ def upload_video( if update and not item_id: raise ValueError("--item-id is required when using --update") + mode = "update" if update else "create" + publish_flag = "draft" if draft else "published" + if dry_run: + # Print to stdout so output is visible in non-TTY contexts where + # logger handlers may be absent. 
+ print("=== Webflow upload (dry run) ===", flush=True) + print(f" mode: {mode}", flush=True) + print(f" draft: {draft} ({publish_flag})", flush=True) + print(f" title: {metadata['title']}", flush=True) + print(f" slug: {metadata['slug']}", flush=True) + print(f" video_url: {metadata['video_url']}", flush=True) + if item_id: + print(f" item_id: {item_id}", flush=True) + print(" (no API calls made)", flush=True) logger.info("Dry run — no API calls will be made") return None @@ -235,6 +278,14 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + _print_upload_result( + mode="update", + item_id=result.get("id") or item_id, + collection_id=collection_id, + site_id=site_id, + draft=draft, + published=False, + ) return result # Dedup: check if an item with this slug already exists @@ -248,18 +299,39 @@ def upload_video( ) result = client.update_item(collection_id, existing_id, field_data) logger.info("Item updated: %s", existing_id) + published = False if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + published = True + _print_upload_result( + mode="update (dedup by slug)", + item_id=existing_id, + collection_id=collection_id, + site_id=site_id, + draft=draft, + published=published, + ) return result result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id) + published = False if not draft and created_id: client.publish_items(collection_id, [created_id]) logger.info("Item published") + published = True + + _print_upload_result( + mode="create", + item_id=created_id, + collection_id=collection_id, + site_id=site_id, + draft=draft, + published=published, + ) return result
Cell mean: 2.11 / 3.00 across 3 judges
index 5a516cb..9b2beba 100644 @@ -204,6 +204,26 @@ def upload_video( raise ValueError("--item-id is required when using --update") if dry_run: + # Print human-readable preview to stdout so callers in non-TTY + # contexts (where logger output is swallowed) can confirm what + # *would* be uploaded. + print("=" * 60, flush=True) + print("DRY RUN — no API calls will be made", flush=True) + print("=" * 60, flush=True) + print(f" Mode: {'update' if update else 'create'}", flush=True) + print(f" Draft: {draft}", flush=True) + print(f" Title: {metadata['title']}", flush=True) + print(f" Slug: {metadata['slug']}", flush=True) + print(f" Video URL: {metadata['video_url']}", flush=True) + if metadata.get("date"): + print(f" Date: {metadata['date']}", flush=True) + if metadata.get("thumbnail"): + print(f" Thumbnail: {metadata['thumbnail']}", flush=True) + if collection_id: + print(f" Collection: {collection_id}", flush=True) + if item_id: + print(f" Item ID: {item_id}", flush=True) + print("=" * 60, flush=True) logger.info("Dry run — no API calls will be made") return None @@ -227,14 +247,24 @@ def upload_video( raise ValueError("--site-id is required for thumbnail upload") thumbnail_url = client.upload_asset(Path(thumb_raw), site_id) logger.info("Thumbnail uploaded: %s", thumbnail_url) + print(f"Thumbnail uploaded: {thumbnail_url}", flush=True) elif thumb_raw: logger.warning("Thumbnail file not found, skipping: %s", thumb_raw) + print(f"Warning: thumbnail file not found, skipping: {thumb_raw}", flush=True) field_data = build_field_data(metadata, collection_slug, thumbnail_url=thumbnail_url) if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + _print_upload_summary( + mode="update", + item_id=result.get("id") or item_id, + collection_id=collection_id, + site_id=site_id, + draft=draft, + published=False, + ) return result # Dedup: check if an item with this slug already exists @@ 
-248,22 +278,83 @@ def upload_video( ) result = client.update_item(collection_id, existing_id, field_data) logger.info("Item updated: %s", existing_id) + published = False if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + published = True + _print_upload_summary( + mode="update (dedup)", + item_id=existing_id, + collection_id=collection_id, + site_id=site_id, + draft=draft, + published=published, + ) return result result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id) + published = False if not draft and created_id: client.publish_items(collection_id, [created_id]) logger.info("Item published") - + published = True + + _print_upload_summary( + mode="create", + item_id=created_id, + collection_id=collection_id, + site_id=site_id, + draft=draft, + published=published, + ) return result +def _print_upload_summary( + mode: str, + item_id: str | None, + collection_id: str | None, + site_id: str | None, + draft: bool, + published: bool, +) -> None: + """Print a human-readable upload summary to stdout. + + Used so callers in non-TTY contexts can see what happened, since + logger output is typically swallowed there. + + Args: + mode: Operation mode (``create``, ``update``, ``update (dedup)``). + item_id: The Webflow CMS item ID, if available. + collection_id: Target collection ID. + site_id: Webflow site ID, used to build a dashboard link. + draft: Whether the item was saved as a draft. + published: Whether the item was published live. 
+ """ + print("=" * 60, flush=True) + print(f"Webflow upload OK ({mode})", flush=True) + print("=" * 60, flush=True) + print(f" Mode: {mode}", flush=True) + print(f" Item ID: {item_id or '<unknown>'}", flush=True) + if collection_id: + print(f" Collection: {collection_id}", flush=True) + print(f" Draft: {draft}", flush=True) + print(f" Published: {published}", flush=True) + if site_id and item_id: + dashboard = ( + f"https://webflow.com/dashboard/sites/{site_id}" + f"/cms/collections/{collection_id}/items/{item_id}" + if collection_id + else f"https://webflow.com/dashboard/sites/{site_id}" + ) + print(f" Dashboard: {dashboard}", flush=True) + print("=" * 60, flush=True) + + # ------------------------------------------------------------------ # CLI # ------------------------------------------------------------------
Cell mean: 2.83 / 3.00 across 3 judges
index 5a516cb..da973fb 100644 @@ -204,6 +204,16 @@ def upload_video( raise ValueError("--item-id is required when using --update") if dry_run: + mode = "update" if update else "create" + print("Webflow upload — DRY RUN (no API calls will be made)") + print(f" mode: {mode}") + print(f" draft: {draft}") + print(f" title: {metadata['title']}") + print(f" slug: {metadata['slug']}") + print(f" video_url: {metadata['video_url']}") + if update: + print(f" item_id: {item_id}") + sys.stdout.flush() logger.info("Dry run — no API calls will be made") return None @@ -235,6 +245,7 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + _print_upload_summary("update", result.get("id"), collection_id, site_id, draft) return result # Dedup: check if an item with this slug already exists @@ -251,6 +262,7 @@ def upload_video( if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + _print_upload_summary("update (dedup)", existing_id, collection_id, site_id, draft) return result result = client.create_item(collection_id, field_data, is_draft=draft) @@ -261,9 +273,31 @@ def upload_video( client.publish_items(collection_id, [created_id]) logger.info("Item published") + _print_upload_summary("create", created_id, collection_id, site_id, draft) return result +def _print_upload_summary( + mode: str, + item_id: str | None, + collection_id: str | None, + site_id: str | None, + draft: bool, +) -> None: + """Print a stdout-visible summary of a successful Webflow upload.""" + print("Webflow upload — SUCCESS") + print(f" mode: {mode}") + print(f" item_id: {item_id}") + print(f" draft: {draft}") + if site_id and collection_id and item_id: + dashboard = ( + f"https://webflow.com/dashboard/sites/{site_id}" + f"/cms/collections/{collection_id}/items/{item_id}" + ) + print(f" dashboard: {dashboard}") + sys.stdout.flush() + + # 
------------------------------------------------------------------ # CLI # ------------------------------------------------------------------
Cell mean: 2.72 / 3.00 across 3 judges
index 5a516cb..b1f3bb5 100644 @@ -203,7 +203,17 @@ def upload_video( if update and not item_id: raise ValueError("--item-id is required when using --update") + mode = "update" if update else "create" + if dry_run: + print("Webflow upload (dry run) — no API calls will be made", flush=True) + print(f" mode: {mode}", flush=True) + print(f" draft: {draft}", flush=True) + print(f" title: {metadata['title']}", flush=True) + print(f" slug: {metadata['slug']}", flush=True) + print(f" video_url: {metadata['video_url']}", flush=True) + if update and item_id: + print(f" item_id: {item_id}", flush=True) logger.info("Dry run — no API calls will be made") return None @@ -235,6 +245,7 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + _print_upload_result("update", result.get("id") or item_id, collection_id, site_id, draft) return result # Dedup: check if an item with this slug already exists @@ -248,22 +259,53 @@ def upload_video( ) result = client.update_item(collection_id, existing_id, field_data) logger.info("Item updated: %s", existing_id) + published = False if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + published = True + _print_upload_result( + "update (dedup)", existing_id, collection_id, site_id, draft, published=published + ) return result result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id) + published = False if not draft and created_id: client.publish_items(collection_id, [created_id]) logger.info("Item published") + published = True + + _print_upload_result("create", created_id, collection_id, site_id, draft, published=published) return result +def _print_upload_result( + mode: str, + item_id: str | None, + collection_id: str | None, + site_id: str | None, + draft: bool, + published: bool = False, +) -> 
None: + """Print a stdout summary of a successful upload for non-TTY callers.""" + print("Webflow upload succeeded", flush=True) + print(f" mode: {mode}", flush=True) + print(f" item_id: {item_id}", flush=True) + print(f" draft: {draft}", flush=True) + print(f" published: {published}", flush=True) + if site_id and collection_id and item_id: + dashboard = ( + f"https://webflow.com/dashboard/sites/{site_id}" + f"/cms/collections/{collection_id}/items/{item_id}" + ) + print(f" dashboard: {dashboard}", flush=True) + + # ------------------------------------------------------------------ # CLI # ------------------------------------------------------------------
Cell mean: 2.72 / 3.00 across 3 judges
index 5a516cb..e27417a 100644 @@ -204,7 +204,16 @@ def upload_video( raise ValueError("--item-id is required when using --update") if dry_run: - logger.info("Dry run — no API calls will be made") + mode = "update" if update else "create" + print("=== Webflow upload — DRY RUN ===") + print(f" mode: {mode}") + print(f" draft: {draft}") + print(f" title: {metadata['title']}") + print(f" slug: {metadata['slug']}") + print(f" video_url: {metadata['video_url']}") + if update: + print(f" item_id: {item_id}") + print("Dry run — no API calls will be made") return None client = WebflowClient() @@ -234,7 +243,8 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] - logger.info("Item updated: %s", result.get("id")) + updated_id = result.get("id") or item_id + _print_upload_success("update", updated_id, collection_id, draft=draft, published=False) return result # Dedup: check if an item with this slug already exists @@ -247,23 +257,48 @@ def upload_video( existing_id, ) result = client.update_item(collection_id, existing_id, field_data) - logger.info("Item updated: %s", existing_id) + published = False if not draft: client.publish_items(collection_id, [existing_id]) - logger.info("Item published") + published = True + _print_upload_success( + "update (dedup)", existing_id, collection_id, draft=draft, published=published + ) return result result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") - logger.info("Item created: %s", created_id) + published = False if not draft and created_id: client.publish_items(collection_id, [created_id]) - logger.info("Item published") + published = True + _print_upload_success("create", created_id, collection_id, draft=draft, published=published) return result +def _print_upload_success( + mode: str, + item_id: str | None, + collection_id: str, + *, + draft: bool, + published: bool, +) -> None: + """Print human-readable success status 
for a Webflow CMS upload.""" + print("=== Webflow upload — SUCCESS ===") + print(f" mode: {mode}") + print(f" item_id: {item_id}") + print(f" draft: {draft}") + print(f" published: {published}") + if item_id: + print( + f" dashboard: https://webflow.com/dashboard/sites/cms/" + f"{collection_id}/items/{item_id}" + ) + + # ------------------------------------------------------------------ # CLI # ------------------------------------------------------------------
Cell mean: 2.83 / 3.00 across 3 judges
index 5a516cb..aad2829 100644 @@ -204,6 +204,15 @@ def upload_video( raise ValueError("--item-id is required when using --update") if dry_run: + mode = "update" if update else "create" + print("Webflow upload — DRY RUN (no API calls)", flush=True) + print(f" mode: {mode}", flush=True) + print(f" draft: {draft}", flush=True) + print(f" title: {metadata['title']}", flush=True) + print(f" slug: {metadata['slug']}", flush=True) + print(f" video_url: {metadata['video_url']}", flush=True) + if update: + print(f" item_id: {item_id}", flush=True) logger.info("Dry run — no API calls will be made") return None @@ -235,6 +244,7 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + _print_upload_result("update", result.get("id"), collection_id, site_id, draft=draft) return result # Dedup: check if an item with this slug already exists @@ -248,22 +258,71 @@ def upload_video( ) result = client.update_item(collection_id, existing_id, field_data) logger.info("Item updated: %s", existing_id) + published = False if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + published = True + _print_upload_result( + "update (deduped by slug)", + existing_id, + collection_id, + site_id, + draft=draft, + published=published, + ) return result result = client.create_item(collection_id, field_data, is_draft=draft) created_id = result.get("id") logger.info("Item created: %s", created_id) + published = False if not draft and created_id: client.publish_items(collection_id, [created_id]) logger.info("Item published") + published = True + + _print_upload_result("create", created_id, collection_id, site_id, draft=draft, published=published) return result +def _dashboard_link( + item_id: str | None, + collection_id: str | None, + site_id: str | None, +) -> str | None: + """Build a Webflow dashboard URL for *item_id* if enough IDs are known.""" 
+ if not (item_id and collection_id and site_id): + return None + return ( + f"https://webflow.com/dashboard/sites/{site_id}" + f"/cms/collections/{collection_id}/items/{item_id}" + ) + + +def _print_upload_result( + mode: str, + item_id: str | None, + collection_id: str | None, + site_id: str | None, + draft: bool, + published: bool = False, +) -> None: + """Emit a stdout summary so non-TTY callers see the outcome.""" + print("Webflow upload — SUCCESS", flush=True) + print(f" mode: {mode}", flush=True) + print(f" item_id: {item_id or '(unknown)'}", flush=True) + print(f" draft: {draft}", flush=True) + print(f" published: {published}", flush=True) + link = _dashboard_link(item_id, collection_id, site_id) + if link: + print(f" dashboard: {link}", flush=True) + else: + print(" dashboard: (need site_id + collection_id + item_id to build link)", flush=True) + + # ------------------------------------------------------------------ # CLI # ------------------------------------------------------------------
Cell mean: 2.89 / 3.00 across 3 judges
index 5a516cb..d574659 100644 @@ -168,6 +168,14 @@ def select_collection( return selected["id"], selected["slug"] +def _print_upload_result(mode: str, item_id: str | None, collection_id: str) -> None: + """Print upload outcome to stdout so it survives non-TTY contexts.""" + print(f"Webflow upload succeeded ({mode})") + print(f" item_id: {item_id}") + print(f" dashboard: https://webflow.com/dashboard/sites/cms/collections/{collection_id}/items/{item_id}") + sys.stdout.flush() + + # ------------------------------------------------------------------ # Main upload logic # ------------------------------------------------------------------ @@ -204,6 +212,16 @@ def upload_video( raise ValueError("--item-id is required when using --update") if dry_run: + mode = "update" if update else "create" + print("Webflow upload dry run — no API calls will be made") + print(f" mode: {mode}") + print(f" draft: {draft}") + print(f" title: {metadata['title']}") + print(f" slug: {metadata['slug']}") + print(f" video_url: {metadata['video_url']}") + if update: + print(f" item_id: {item_id}") + sys.stdout.flush() logger.info("Dry run — no API calls will be made") return None @@ -235,6 +253,7 @@ def upload_video( if update: result = client.update_item(collection_id, item_id, field_data) # type: ignore[arg-type] logger.info("Item updated: %s", result.get("id")) + _print_upload_result("update", result.get("id") or item_id, collection_id) return result # Dedup: check if an item with this slug already exists @@ -251,6 +270,7 @@ def upload_video( if not draft: client.publish_items(collection_id, [existing_id]) logger.info("Item published") + _print_upload_result("update (slug dedup)", existing_id, collection_id) return result result = client.create_item(collection_id, field_data, is_draft=draft) @@ -261,6 +281,7 @@ def upload_video( client.publish_items(collection_id, [created_id]) logger.info("Item published") + _print_upload_result("create", created_id, collection_id) return result
Cell mean: 2.78 / 3.00 across 3 judges
index 84b3128..02e1e85 100644 @@ -4,19 +4,59 @@ from __future__ import annotations import sys from pathlib import Path +USAGE = "usage: word_count.py <file>" + +EXIT_OK = 0 +EXIT_USAGE = 2 +EXIT_NOT_FOUND = 3 +EXIT_NOT_A_FILE = 4 +EXIT_PERMISSION = 5 +EXIT_DECODE = 6 +EXIT_IO = 7 + def count_words(path: Path) -> int: """Return the whitespace-separated word count of a UTF-8 text file.""" return len(path.read_text(encoding="utf-8").split()) -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) - return 2 - path = Path(sys.argv[1]) - print(count_words(path)) - return 0 +def _err(msg: str) -> None: + print(f"word_count.py: {msg}", file=sys.stderr) + + +def main(argv: list[str] | None = None) -> int: + args = sys.argv[1:] if argv is None else argv + + if len(args) == 1 and args[0] in ("-h", "--help"): + print(USAGE) + return EXIT_OK + + if len(args) != 1: + print(USAGE, file=sys.stderr) + return EXIT_USAGE + + path = Path(args[0]) + + try: + if not path.exists(): + _err(f"{path}: no such file") + return EXIT_NOT_FOUND + if not path.is_file(): + _err(f"{path}: not a regular file") + return EXIT_NOT_A_FILE + count = count_words(path) + except PermissionError: + _err(f"{path}: permission denied") + return EXIT_PERMISSION + except UnicodeDecodeError as exc: + _err(f"{path}: not valid UTF-8 ({exc.reason})") + return EXIT_DECODE + except OSError as exc: + _err(f"{path}: {exc.strerror or exc}") + return EXIT_IO + + print(count) + return EXIT_OK if __name__ == "__main__":
Cell mean: 2.83 / 3.00 across 3 judges
index 84b3128..a0d34e9 100644 @@ -10,12 +10,32 @@ def count_words(path: Path) -> int: return len(path.read_text(encoding="utf-8").split()) -def main() -> int: - if len(sys.argv) != 2: +def main(argv: list[str] | None = None) -> int: + args = sys.argv[1:] if argv is None else argv + if len(args) != 1: print("usage: word_count.py <file>", file=sys.stderr) return 2 - path = Path(sys.argv[1]) - print(count_words(path)) + + path = Path(args[0]) + try: + count = count_words(path) + except FileNotFoundError: + print(f"word_count.py: {path}: no such file", file=sys.stderr) + return 1 + except IsADirectoryError: + print(f"word_count.py: {path}: is a directory", file=sys.stderr) + return 1 + except PermissionError: + print(f"word_count.py: {path}: permission denied", file=sys.stderr) + return 1 + except UnicodeDecodeError: + print(f"word_count.py: {path}: not valid UTF-8 text", file=sys.stderr) + return 1 + except OSError as exc: + print(f"word_count.py: {path}: {exc.strerror or exc}", file=sys.stderr) + return 1 + + print(count) return 0
Cell mean: 2.78 / 3.00 across 3 judges
index 84b3128..afe9b6c 100644 @@ -1,21 +1,66 @@ """Count words in a text file. Single-purpose CLI.""" from __future__ import annotations +import argparse import sys from pathlib import Path +def count_words_text(text: str) -> int: + """Return the whitespace-separated word count of a string.""" + return len(text.split()) + + def count_words(path: Path) -> int: """Return the whitespace-separated word count of a UTF-8 text file.""" - return len(path.read_text(encoding="utf-8").split()) + return count_words_text(path.read_text(encoding="utf-8")) + + +def _read_source(arg: str) -> str: + # "-" means read from stdin; matches common Unix CLI convention. + if arg == "-": + return sys.stdin.read() + path = Path(arg) + if not path.exists(): + raise FileNotFoundError(f"no such file: {arg}") + if path.is_dir(): + raise IsADirectoryError(f"is a directory, not a file: {arg}") + try: + return path.read_text(encoding="utf-8") + except UnicodeDecodeError as e: + # Re-raise with a friendlier message; the original exception chain is preserved. + raise UnicodeDecodeError( + e.encoding, + e.object, + e.start, + e.end, + f"file is not valid UTF-8: {arg}", + ) from None + except PermissionError as e: + raise PermissionError(f"permission denied: {arg}") from e + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="word_count.py", + description="Count whitespace-separated words in a text file.", + ) + parser.add_argument( + "file", + help="path to a UTF-8 text file, or '-' for stdin", + ) + return parser -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) - return 2 - path = Path(sys.argv[1]) - print(count_words(path)) +def main(argv: list[str] | None = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + try: + text = _read_source(args.file) + except (FileNotFoundError, IsADirectoryError, PermissionError, UnicodeDecodeError, OSError) as e: + print(f"word_count.py: error: {e}", file=sys.stderr) + return 1 + print(count_words_text(text)) return 0
Cell mean: 2.83 / 3.00 across 3 judges
index 84b3128..749bbca 100644 @@ -1,22 +1,117 @@ """Count words in a text file. Single-purpose CLI.""" from __future__ import annotations +import argparse import sys from pathlib import Path +from typing import Sequence + +EXIT_OK = 0 +EXIT_USAGE = 2 +EXIT_IO_ERROR = 1 def count_words(path: Path) -> int: - """Return the whitespace-separated word count of a UTF-8 text file.""" + """Return the whitespace-separated word count of a UTF-8 text file. + + Args: + path: Path to a readable UTF-8 encoded text file. + + Returns: + The number of whitespace-separated tokens in the file. + + Raises: + FileNotFoundError: If ``path`` does not exist. + IsADirectoryError: If ``path`` refers to a directory. + PermissionError: If the file cannot be read. + UnicodeDecodeError: If the file is not valid UTF-8. + """ return len(path.read_text(encoding="utf-8").split()) -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) - return 2 - path = Path(sys.argv[1]) - print(count_words(path)) - return 0 +def count_words_stream(stream: "object") -> int: + """Return the whitespace-separated word count from a text stream. + + Reading line-by-line keeps memory bounded for very large inputs. + + Args: + stream: A text-mode iterable yielding strings (e.g. ``sys.stdin``). + + Returns: + The total number of whitespace-separated tokens across all lines. + """ + total = 0 + # Iterate line-by-line to avoid loading the full input into memory. + for line in stream: # type: ignore[union-attr] + total += len(line.split()) + return total + + +def _build_parser() -> argparse.ArgumentParser: + """Construct the argument parser for the CLI.""" + parser = argparse.ArgumentParser( + prog="word_count.py", + description="Count whitespace-separated words in a UTF-8 text file.", + ) + parser.add_argument( + "file", + nargs="?", + help="Path to a UTF-8 text file. Use '-' or omit to read from stdin.", + ) + return parser + + +def main(argv: Sequence[str] | None = None) -> int: + """CLI entry point. + + Args: + argv: Optional argument vector (excluding the program name). When + ``None``, ``sys.argv[1:]`` is used. + + Returns: + Process exit code: 0 on success, 1 on I/O/decoding errors, + 2 on usage errors. + """ + parser = _build_parser() + try: + args = parser.parse_args(argv) + except SystemExit as exc: + # argparse exits with code 2 on usage errors; propagate that contract. + return int(exc.code) if isinstance(exc.code, int) else EXIT_USAGE + + target = args.file + + # Read from stdin when no file is given or '-' is supplied. + if target is None or target == "-": + try: + count = count_words_stream(sys.stdin) + except UnicodeDecodeError as exc: + print(f"error: stdin is not valid UTF-8: {exc}", file=sys.stderr) + return EXIT_IO_ERROR + print(count) + return EXIT_OK + + path = Path(target) + try: + count = count_words(path) + except FileNotFoundError: + print(f"error: file not found: {path}", file=sys.stderr) + return EXIT_IO_ERROR + except IsADirectoryError: + print(f"error: path is a directory, not a file: {path}", file=sys.stderr) + return EXIT_IO_ERROR + except PermissionError: + print(f"error: permission denied: {path}", file=sys.stderr) + return EXIT_IO_ERROR + except UnicodeDecodeError as exc: + print(f"error: file is not valid UTF-8: {path}: {exc}", file=sys.stderr) + return EXIT_IO_ERROR + except OSError as exc: + print(f"error: could not read {path}: {exc}", file=sys.stderr) + return EXIT_IO_ERROR + + print(count) + return EXIT_OK if __name__ == "__main__":
Cell mean: 2.50 / 3.00 across 3 judges
index 84b3128..d55b731 100644 @@ -1,22 +1,74 @@ """Count words in a text file. Single-purpose CLI.""" from __future__ import annotations +import argparse import sys from pathlib import Path +EXIT_OK = 0 +EXIT_USAGE = 2 +EXIT_IO_ERROR = 3 +EXIT_DECODE_ERROR = 4 -def count_words(path: Path) -> int: - """Return the whitespace-separated word count of a UTF-8 text file.""" - return len(path.read_text(encoding="utf-8").split()) +def count_words(path: Path, encoding: str = "utf-8") -> int: + """Return the whitespace-separated word count of a text file.""" + return len(path.read_text(encoding=encoding).split()) -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) - return 2 - path = Path(sys.argv[1]) - print(count_words(path)) - return 0 + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="word_count.py", + description="Count whitespace-separated words in a text file.", + ) + parser.add_argument("file", help="Path to the input text file (use '-' for stdin).") + parser.add_argument( + "--encoding", + default="utf-8", + help="Text encoding to use when reading the file (default: utf-8).", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + + if args.file == "-": + try: + text = sys.stdin.read() + except (OSError, UnicodeDecodeError) as e: + print(f"error: failed to read stdin: {e}", file=sys.stderr) + return EXIT_IO_ERROR + print(len(text.split())) + return EXIT_OK + + path = Path(args.file) + + if not path.exists(): + print(f"error: no such file: {path}", file=sys.stderr) + return EXIT_IO_ERROR + if path.is_dir(): + print(f"error: path is a directory, not a file: {path}", file=sys.stderr) + return EXIT_IO_ERROR + + try: + count = count_words(path, encoding=args.encoding) + except PermissionError as e: + print(f"error: permission denied: {e}", file=sys.stderr) + return EXIT_IO_ERROR + except UnicodeDecodeError as e: + print( + f"error: could not decode {path} as {args.encoding}: {e.reason}", + file=sys.stderr, + ) + return EXIT_DECODE_ERROR + except OSError as e: + print(f"error: failed to read {path}: {e}", file=sys.stderr) + return EXIT_IO_ERROR + + print(count) + return EXIT_OK if __name__ == "__main__":
Cell mean: 2.58 / 3.00 across 3 judges
index 84b3128..ba04854 100644 @@ -1,22 +1,90 @@ """Count words in a text file. Single-purpose CLI.""" from __future__ import annotations +import argparse import sys from pathlib import Path +EXIT_OK = 0 +EXIT_USAGE = 2 +EXIT_NOT_FOUND = 3 +EXIT_NOT_A_FILE = 4 +EXIT_PERMISSION = 5 +EXIT_DECODE = 6 +EXIT_IO = 7 -def count_words(path: Path) -> int: - """Return the whitespace-separated word count of a UTF-8 text file.""" - return len(path.read_text(encoding="utf-8").split()) +def count_words(path: Path, encoding: str = "utf-8") -> int: + """Return the whitespace-separated word count of a text file. -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) - return 2 - path = Path(sys.argv[1]) - print(count_words(path)) - return 0 + Args: + path: Path to the text file. + encoding: Text encoding used to decode the file (default: utf-8). + + Returns: + Number of whitespace-separated tokens in the file. + """ + return len(path.read_text(encoding=encoding).split()) + + +def _build_parser() -> argparse.ArgumentParser: + """Build the CLI argument parser.""" + parser = argparse.ArgumentParser( + prog="word_count.py", + description="Count whitespace-separated words in a text file.", + ) + parser.add_argument( + "file", + type=Path, + help="Path to the text file to count.", + ) + parser.add_argument( + "--encoding", + default="utf-8", + help="Text encoding to use when reading the file (default: utf-8).", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + """CLI entry point. Returns a process exit code.""" + parser = _build_parser() + try: + args = parser.parse_args(argv) + except SystemExit as exc: + # argparse exits with code 2 on usage errors; preserve that. + return int(exc.code) if isinstance(exc.code, int) else EXIT_USAGE + + path: Path = args.file + + if not path.exists(): + print(f"error: file not found: {path}", file=sys.stderr) + return EXIT_NOT_FOUND + if not path.is_file(): + print(f"error: not a regular file: {path}", file=sys.stderr) + return EXIT_NOT_A_FILE + + try: + count = count_words(path, encoding=args.encoding) + except PermissionError as exc: + print(f"error: permission denied: {exc}", file=sys.stderr) + return EXIT_PERMISSION + except UnicodeDecodeError as exc: + print( + f"error: could not decode {path} as {args.encoding}: {exc.reason}", + file=sys.stderr, + ) + return EXIT_DECODE + except LookupError as exc: + # Unknown encoding name passed via --encoding. + print(f"error: unknown encoding {args.encoding!r}: {exc}", file=sys.stderr) + return EXIT_USAGE + except OSError as exc: + print(f"error: could not read {path}: {exc}", file=sys.stderr) + return EXIT_IO + + print(count) + return EXIT_OK if __name__ == "__main__":
Cell mean: 2.67 / 3.00 across 3 judges
index 84b3128..550cee3 100644 @@ -10,12 +10,36 @@ def count_words(path: Path) -> int: return len(path.read_text(encoding="utf-8").split()) -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) +def main(argv: list[str] | None = None) -> int: + args = sys.argv[1:] if argv is None else argv + if len(args) != 1 or args[0] in {"-h", "--help"}: + usage = "usage: word_count.py <file>" + if args and args[0] in {"-h", "--help"}: + print(usage) + return 0 + print(usage, file=sys.stderr) return 2 - path = Path(sys.argv[1]) - print(count_words(path)) + + path = Path(args[0]) + try: + if not path.exists(): + print(f"error: file not found: {path}", file=sys.stderr) + return 1 + if path.is_dir(): + print(f"error: path is a directory, not a file: {path}", file=sys.stderr) + return 1 + count = count_words(path) + except PermissionError: + print(f"error: permission denied: {path}", file=sys.stderr) + return 1 + except UnicodeDecodeError as exc: + print(f"error: file is not valid UTF-8: {path} ({exc.reason})", file=sys.stderr) + return 1 + except OSError as exc: + print(f"error: could not read {path}: {exc.strerror or exc}", file=sys.stderr) + return 1 + + print(count) return 0
Cell mean: 2.50 / 3.00 across 3 judges
index 84b3128..d82a8c6 100644 @@ -1,21 +1,77 @@ """Count words in a text file. Single-purpose CLI.""" from __future__ import annotations +import argparse import sys from pathlib import Path -def count_words(path: Path) -> int: - """Return the whitespace-separated word count of a UTF-8 text file.""" - return len(path.read_text(encoding="utf-8").split()) +def count_words_text(text: str) -> int: + """Return the whitespace-separated word count of a string.""" + return len(text.split()) -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) +def count_words(path: Path, encoding: str = "utf-8") -> int: + """Return the whitespace-separated word count of a text file.""" + return count_words_text(path.read_text(encoding=encoding)) + + +def _read_path(path: Path, encoding: str) -> str: + if not path.exists(): + raise FileNotFoundError(f"no such file: {path}") + if path.is_dir(): + raise IsADirectoryError(f"is a directory, not a file: {path}") + if not path.is_file(): + raise OSError(f"not a regular file: {path}") + return path.read_text(encoding=encoding) + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="word_count.py", + description="Count whitespace-separated words in a text file.", + ) + parser.add_argument( + "file", + help="path to the text file, or '-' to read from stdin", + ) + parser.add_argument( + "--encoding", + default="utf-8", + help="text encoding to use when reading the file (default: utf-8)", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + + try: + if args.file == "-": + text = sys.stdin.read() + else: + text = _read_path(Path(args.file), args.encoding) + except FileNotFoundError as exc: + print(f"word_count.py: error: {exc}", file=sys.stderr) + return 2 + except IsADirectoryError as exc: + print(f"word_count.py: error: {exc}", file=sys.stderr) + return 2 + except PermissionError as exc: + print(f"word_count.py: error: permission denied: {exc.filename or args.file}", file=sys.stderr) return 2 - path = Path(sys.argv[1]) - print(count_words(path)) + except UnicodeDecodeError as exc: + print( + f"word_count.py: error: could not decode {args.file} as {args.encoding}: {exc.reason}", + file=sys.stderr, + ) + return 2 + except OSError as exc: + print(f"word_count.py: error: {exc}", file=sys.stderr) + return 2 + + print(count_words_text(text)) return 0
Cell mean: 2.92 / 3.00 across 3 judges
index 84b3128..441ebce 100644 @@ -1,23 +1,85 @@ """Count words in a text file. Single-purpose CLI.""" from __future__ import annotations +import argparse import sys from pathlib import Path +from typing import TextIO -def count_words(path: Path) -> int: - """Return the whitespace-separated word count of a UTF-8 text file.""" - return len(path.read_text(encoding="utf-8").split()) +def count_words_in_stream(stream: TextIO) -> int: + """Return the whitespace-separated word count of a text stream.""" + total = 0 + for line in stream: + total += len(line.split()) + return total -def main() -> int: - if len(sys.argv) != 2: - print("usage: word_count.py <file>", file=sys.stderr) +def count_words(path: Path, encoding: str = "utf-8") -> int: + """Return the whitespace-separated word count of a text file.""" + with path.open("r", encoding=encoding, errors="strict") as f: + return count_words_in_stream(f) + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="word_count.py", + description="Count whitespace-separated words in a text file.", + ) + parser.add_argument( + "file", + nargs="?", + help="Path to the text file. Use '-' or omit to read from stdin.", + ) + parser.add_argument( + "--encoding", + default="utf-8", + help="Text encoding (default: utf-8).", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = _build_parser() + args = parser.parse_args(argv) + + if args.file in (None, "-"): + return _emit(count_words_in_stream(sys.stdin)) + + path = Path(args.file) + try: + count = count_words(path, encoding=args.encoding) + except FileNotFoundError: + print(f"error: file not found: {path}", file=sys.stderr) + return 2 + except IsADirectoryError: + print(f"error: is a directory, not a file: {path}", file=sys.stderr) return 2 - path = Path(sys.argv[1]) - print(count_words(path)) + except PermissionError: + print(f"error: permission denied: {path}", file=sys.stderr) + return 2 + except UnicodeDecodeError as exc: + print( + f"error: cannot decode {path} as {args.encoding}: {exc.reason}", + file=sys.stderr, + ) + return 2 + except OSError as exc: + print(f"error: cannot read {path}: {exc}", file=sys.stderr) + return 2 + + return _emit(count) + + +def _emit(count: int) -> int: + print(count) return 0 if __name__ == "__main__": - raise SystemExit(main()) + try: + raise SystemExit(main()) + except KeyboardInterrupt: + raise SystemExit(130) + except BrokenPipeError: + raise SystemExit(0)
Cell mean: 2.89 / 3.00 across 3 judges
index bedcbf6..f65d4d6 100644 @@ -1,20 +1,32 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +FREE_SHIPPING_THRESHOLD = 50 -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) + +def calculate_order_total(items, coupon_pct, tax_pct, shipping_flat): + """Compute the final order total. + + items: list of (name, price, quantity) tuples + coupon_pct: discount percentage applied to the subtotal + tax_pct: tax percentage applied after discount and shipping + shipping_flat: flat shipping fee added when subtotal is below the free-shipping threshold + """ + subtotal = sum(price * quantity for _, price, quantity in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / 100.0) + + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += shipping_flat + + total = subtotal + subtotal * (tax_pct / 100.0) + return round(total, 2) + + +calc = calculate_order_total if __name__ == "__main__": cart = [("widget", 9.99, 2), ("gadget", 19.99, 1)] - print(calc(cart, 10, 8.25, 4.99)) + print(calculate_order_total(cart, 10, 8.25, 4.99))
Cell mean: 2.94 / 3.00 across 3 judges
index bedcbf6..1250265 100644 @@ -1,18 +1,21 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +FREE_SHIPPING_THRESHOLD = 50 -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) + +def calc(items, coupon_pct, tax_pct, ship_flat): + """items=[(name, price, qty)], coupon_pct, tax_pct, ship_flat.""" + subtotal = sum(price * qty for _, price, qty in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / 100.0) + + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += ship_flat + + total = subtotal + subtotal * (tax_pct / 100.0) + return round(total, 2) if __name__ == "__main__":
Cell mean: 2.94 / 3.00 across 3 judges
index bedcbf6..a42748c 100644 @@ -1,20 +1,33 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +FREE_SHIPPING_THRESHOLD = 50 -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) + +def calculate_order_total(items, coupon_pct, tax_pct, shipping_flat): + """Calculate the final order total. + + Args: + items: list of (name, price, quantity) tuples. + coupon_pct: discount percentage applied to the subtotal. + tax_pct: tax percentage applied after discount and shipping. + shipping_flat: flat shipping fee added when below the free-shipping threshold. + + Returns: + Final total rounded to 2 decimal places. + """ + subtotal = sum(price * quantity for _, price, quantity in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / 100.0) + + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += shipping_flat + + total = subtotal + subtotal * (tax_pct / 100.0) + return round(total, 2) if __name__ == "__main__": cart = [("widget", 9.99, 2), ("gadget", 19.99, 1)] - print(calc(cart, 10, 8.25, 4.99)) + print(calculate_order_total(cart, 10, 8.25, 4.99))
Cell mean: 2.83 / 3.00 across 3 judges
index bedcbf6..4ab9712 100644 @@ -1,20 +1,47 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +from typing import List, Tuple -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) +LineItem = Tuple[str, float, int] + +FREE_SHIPPING_THRESHOLD: float = 50.0 +PERCENT_DIVISOR: float = 100.0 + + +def calc( + items: List[LineItem], + coupon_pct: float, + tax_pct: float, + ship_flat: float, +) -> float: + """Compute the final order total. + + Args: + items: Line items as ``(name, unit_price, quantity)`` tuples. + coupon_pct: Percentage discount applied to the subtotal (0-100). + tax_pct: Tax percentage applied after discount and shipping (0-100). + ship_flat: Flat shipping fee added when subtotal is below the + free-shipping threshold. + + Returns: + The final total, rounded to two decimal places. + """ + subtotal: float = sum(unit_price * qty for _, unit_price, qty in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / PERCENT_DIVISOR) + + # Shipping is waived once the (post-discount) subtotal hits the threshold. + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += ship_flat + + # Tax applies to the discounted subtotal plus any shipping charge. + total: float = subtotal + subtotal * (tax_pct / PERCENT_DIVISOR) + + return round(total, 2) if __name__ == "__main__": - cart = [("widget", 9.99, 2), ("gadget", 19.99, 1)] + cart: List[LineItem] = [("widget", 9.99, 2), ("gadget", 19.99, 1)] print(calc(cart, 10, 8.25, 4.99))
Cell mean: 2.89 / 3.00 across 3 judges
index bedcbf6..a4ef3da 100644 @@ -1,20 +1,34 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +FREE_SHIPPING_THRESHOLD = 50 -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) + +def calculate_order_total(items, coupon_pct, tax_pct, shipping_flat): + """Calculate the final order total. + + Args: + items: List of (name, price, quantity) tuples. + coupon_pct: Discount percentage applied to the subtotal. + tax_pct: Tax percentage applied after discount and shipping. + shipping_flat: Flat shipping fee added when subtotal is below + FREE_SHIPPING_THRESHOLD. + + Returns: + Final total rounded to 2 decimal places. + """ + subtotal = sum(price * quantity for _, price, quantity in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / 100.0) + + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += shipping_flat + + total = subtotal + subtotal * (tax_pct / 100.0) + return round(total, 2) if __name__ == "__main__": cart = [("widget", 9.99, 2), ("gadget", 19.99, 1)] - print(calc(cart, 10, 8.25, 4.99)) + print(calculate_order_total(cart, 10, 8.25, 4.99))
Cell mean: 2.06 / 3.00 across 3 judges
index bedcbf6..ada280f 100644 @@ -1,20 +1,59 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +import structlog +from typing import List, Tuple -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) +logger = structlog.get_logger(__name__) + +FREE_SHIPPING_THRESHOLD: float = 50.0 +PERCENT_DIVISOR: float = 100.0 +CURRENCY_PRECISION: int = 2 + +LineItem = Tuple[str, float, int] + + +def _subtotal(items: List[LineItem]) -> float: + """Sum price * quantity across all line items.""" + return sum(price * qty for _, price, qty in items) + + +def _apply_percentage(amount: float, percent: float) -> float: + """Return ``amount`` reduced by ``percent`` percent.""" + return amount - amount * (percent / PERCENT_DIVISOR) + + +def calc( + items: List[LineItem], + coupon_pct: float, + tax_pct: float, + ship_flat: float, +) -> float: + """Compute the final order total. + + Args: + items: List of ``(name, price, qty)`` tuples. + coupon_pct: Discount percentage applied to the subtotal. + tax_pct: Tax percentage applied after shipping. + ship_flat: Flat shipping fee added when subtotal is below the + free-shipping threshold. + + Returns: + The final total rounded to ``CURRENCY_PRECISION`` decimal places. + """ + total = _subtotal(items) + + if coupon_pct > 0: + total = _apply_percentage(total, coupon_pct) + + if total < FREE_SHIPPING_THRESHOLD: + total += ship_flat + + total += total * (tax_pct / PERCENT_DIVISOR) + + return round(total, CURRENCY_PRECISION) if __name__ == "__main__": - cart = [("widget", 9.99, 2), ("gadget", 19.99, 1)] - print(calc(cart, 10, 8.25, 4.99)) + cart: List[LineItem] = [("widget", 9.99, 2), ("gadget", 19.99, 1)] + logger.info("order_total_demo", total=calc(cart, 10, 8.25, 4.99))
Cell mean: 2.94 / 3.00 across 3 judges
index bedcbf6..5502f66 100644 @@ -1,20 +1,30 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +FREE_SHIPPING_THRESHOLD = 50 -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) + +def calculate_order_total(items, coupon_pct, tax_pct, shipping_flat): + """Calculate the final order total. + + Args: + items: list of (name, price, quantity) tuples + coupon_pct: discount percentage applied to the subtotal + tax_pct: tax percentage applied after discount and shipping + shipping_flat: flat shipping fee added when subtotal is below the free-shipping threshold + """ + subtotal = sum(price * quantity for _, price, quantity in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / 100.0) + + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += shipping_flat + + total = subtotal + subtotal * (tax_pct / 100.0) + return round(total, 2) if __name__ == "__main__": cart = [("widget", 9.99, 2), ("gadget", 19.99, 1)] - print(calc(cart, 10, 8.25, 4.99)) + print(calculate_order_total(cart, 10, 8.25, 4.99))
Cell mean: 2.89 / 3.00 across 3 judges
index bedcbf6..1250265 100644 @@ -1,18 +1,21 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +FREE_SHIPPING_THRESHOLD = 50 -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) + +def calc(items, coupon_pct, tax_pct, ship_flat): + """items=[(name, price, qty)], coupon_pct, tax_pct, ship_flat.""" + subtotal = sum(price * qty for _, price, qty in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / 100.0) + + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += ship_flat + + total = subtotal + subtotal * (tax_pct / 100.0) + return round(total, 2) if __name__ == "__main__":
Cell mean: 2.94 / 3.00 across 3 judges
index bedcbf6..2125d09 100644 @@ -1,20 +1,29 @@ """Compute order totals with discounts and tax.""" from __future__ import annotations +FREE_SHIPPING_THRESHOLD = 50 -def calc(items, c, t, s): - """items=[(name, price, qty)], c=coupon_pct, t=tax_pct, s=ship_flat.""" - z = 0 - for x in items: - z += x[1] * x[2] - if c > 0: - z = z - z * (c / 100.0) - if z < 50: - z = z + s - z = z + z * (t / 100.0) - return round(z, 2) + +def calculate_order_total(items, coupon_pct, tax_pct, shipping_flat): + """Return the final order total. + + items: list of (name, price, quantity) + coupon_pct: discount percentage applied to the subtotal + tax_pct: tax percentage applied after discount and shipping + shipping_flat: flat shipping fee added when subtotal is below threshold + """ + subtotal = sum(price * quantity for _, price, quantity in items) + + if coupon_pct > 0: + subtotal -= subtotal * (coupon_pct / 100.0) + + if subtotal < FREE_SHIPPING_THRESHOLD: + subtotal += shipping_flat + + total = subtotal + subtotal * (tax_pct / 100.0) + return round(total, 2) if __name__ == "__main__": cart = [("widget", 9.99, 2), ("gadget", 19.99, 1)] - print(calc(cart, 10, 8.25, 4.99)) + print(calculate_order_total(cart, 10, 8.25, 4.99))
Cell mean: 2.92 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 2.92 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 3.00 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 2.92 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 2.92 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 3.00 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 3.00 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 2.92 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 2.92 / 3.00 across 3 judges
index 36e4f28..51308c5 100644 @@ -39,7 +39,7 @@ def fact(n): def average(numbers): - return sum(numbers) / (len(numbers) - 1) # off-by-one: divides by count-1 not count + return sum(numbers) / len(numbers) # misc trailing stuff below — left alone please
Cell mean: 3.00 / 3.00 across 3 judges
index 2a835f8..1fca9c4 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,6 +12,7 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=None) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call.
Cell mean: 3.00 / 3.00 across 3 judges
index 2a835f8..1fca9c4 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,6 +12,7 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=None) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call.
Cell mean: 2.75 / 3.00 across 3 judges
index 2a835f8..4f2b3f9 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,6 +12,7 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=1024) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call.
Cell mean: 3.00 / 3.00 across 3 judges
index 2a835f8..aa8ce38 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,16 +12,20 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=1024) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call. + Results are cached in-process via ``functools.lru_cache`` so repeated + lookups for the same ``user_id`` skip the simulated remote call. + Args: user_id: Numeric user id. Returns: The user record, or None if not found. """ - time.sleep(0.05) # simulated latency + time.sleep(0.05) # simulated latency on cache miss return _FAKE_DB.get(user_id)
Cell mean: 3.00 / 3.00 across 3 judges
index 2a835f8..ecf4a10 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,6 +12,7 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=128) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call.
Cell mean: 3.00 / 3.00 across 3 judges
index 2a835f8..d171e14 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,9 +12,13 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=1024) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call. + Results are cached via ``functools.lru_cache`` so repeated lookups + for the same ``user_id`` skip the simulated latency. + Args: user_id: Numeric user id.
Cell mean: 2.94 / 3.00 across 3 judges
index 2a835f8..4f2b3f9 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,6 +12,7 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=1024) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call.
Cell mean: 2.89 / 3.00 across 3 judges
index 2a835f8..4f2b3f9 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,6 +12,7 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=1024) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call.
Cell mean: 2.89 / 3.00 across 3 judges
index 2a835f8..4f2b3f9 100644 @@ -2,6 +2,7 @@ from __future__ import annotations import time +from functools import lru_cache # Pretend this is a slow remote call (DB, HTTP, etc.). _FAKE_DB: dict[int, dict[str, str]] = { @@ -11,6 +12,7 @@ _FAKE_DB: dict[int, dict[str, str]] = { } +@lru_cache(maxsize=1024) def lookup_user(user_id: int) -> dict[str, str] | None: """Look up a user by id. Simulates a slow remote call.