Source code for super_pocket.project.req_to_date

import httpx, re, asyncio, uvicorn
import tomllib
from super_pocket.settings import click, CONTEXT_SETTINGS, add_help_argument
from super_pocket.utils import print_error
from pathlib import Path
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Sequence, Callable
from rich.console import Console

console = Console()


app = FastAPI(title="Requirements Checker API")

# CORS settings to allow requests from all origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify allowed origins
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

[docs] class PackageInput(BaseModel): package: str version: str
[docs] class PackageResult(BaseModel): package: str current_version: str latest_patch: Optional[str] = None latest_overall: Optional[str] = None status: str message: Optional[str] = None
[docs] class CheckRequest(BaseModel): packages: List[PackageInput]
def _read_requirements_file(path: Path) -> List[str]: """Returns the dependencies extracted from a requirements file.""" try: with open(path, "r", encoding="utf-8") as file_obj: lines = [line.strip() for line in file_obj.readlines()] except FileNotFoundError as exc: print_error(exc, custom=True, message=f"Requirements file not found: {path}") raise except OSError as exc: print_error(exc, custom=True, message=f"Unable to read {path}") raise specs: List[str] = [] for line in lines: if not line or line.startswith("#") or line.startswith("--"): continue spec = _normalize_dependency_spec(line) if spec: specs.append(spec) if not specs: error = ValueError(f"No valid package found in {path}") print_error(error, custom=True, message=f"No valid package found in {path}") raise error return specs def _read_pyproject_file(path: Path) -> List[str]: """Returns the dependencies extracted from a pyproject.toml file.""" try: content = path.read_bytes() except FileNotFoundError as e: print_error(e, custom=True, message=f"pyproject.toml file not found: {path}") raise except OSError as exc: print_error(exc, custom=True, message=f"Unable to read {path}") raise try: data = tomllib.loads(content.decode("utf-8")) except tomllib.TOMLDecodeError as exc: print_error(exc, custom=True, message=f"TOML parsing error in {path}") raise specs: List[str] = [] # Extract the main dependencies dependencies = data.get("project", {}).get("dependencies", []) for dep in dependencies: # Convert the version specifications to ==version format # Supported: package>=1.0.0, package~=1.0, package==1.0.0, etc. spec = _normalize_dependency_spec(dep) if spec: specs.append(spec) # Optional: extract also the optional dependencies optional_deps = data.get("project", {}).get("optional-dependencies", {}) for group_name, group_deps in optional_deps.items(): for dep in group_deps: spec = _normalize_dependency_spec(dep) if spec: specs.append(spec) if not specs: error = ValueError(f"No dependencies found in {path}") print_error(error, custom=True, message=f"No dependencies found in {path}") raise error return specs def _normalize_dependency_spec(dep: str) -> Optional[str]: """Normalize a dependency specification to package==version format.""" # If already in package==version format, return it as is if "==" in dep: return dep # Extract the package name and version for the other formats # Supported: >=, ~=, >, <, <=, != match = re.match(r'^([a-zA-Z0-9_-]+)\s*([><=!~]+)\s*(.+)$', dep) if match: package = match.group(1) operator = match.group(2) version = match.group(3) # For >=, ~=, use the minimum version specified if operator in (">=", "~=", ">"): return f"{package}=={version}" # For ==, return it as is elif operator == "==": return dep # If no version specified, ignore return None def _expand_spec_inputs(inputs: Sequence[str]) -> List[str]: """Decompose CLI arguments: commas, requirements files, pyproject.toml, etc.""" expanded: List[str] = [] for entry in inputs: if not entry: continue entry = entry.strip() # Handle comma-separated lists in a single argument if ',' in entry: parts = [part.strip() for part in entry.split(',')] expanded.extend(parts) continue potential_path = Path(entry).expanduser() if potential_path.is_file(): # Detect the file type and use the appropriate parser if potential_path.name == "pyproject.toml": expanded.extend(_read_pyproject_file(potential_path)) else: # By default, treat as a requirements.txt file expanded.extend(_read_requirements_file(potential_path)) continue expanded.append(entry) if not expanded: error = ValueError("The list of packages cannot be empty") print_error(error, custom=True, message=str(error)) raise error return expanded
[docs] def parse_package_specs(specs: Sequence[str]) -> List[PackageInput]: """Convert the CLI arguments list to PackageInput objects.""" expanded_specs = _expand_spec_inputs(specs) parsed: List[PackageInput] = [] for spec in expanded_specs: if "==" not in spec: error = ValueError("Each package must be provided in the form name==version") print_error(error, custom=True, message=str(error)) raise error package, version = spec.split("==", 1) package = package.strip() version = version.strip() if not package or not version: error = ValueError(f"Invalid format for '{spec}': name or version missing") print_error(error, custom=True, message=str(error)) raise error parsed.append(PackageInput(package=package, version=version)) return parsed
[docs] def parse_version(version_str: str) -> Optional[dict]: """Parse a version in semver format""" match = re.match(r'^(\d+)\.(\d+)\.(\d+)', version_str) if not match: return None return { 'major': int(match.group(1)), 'minor': int(match.group(2)), 'patch': int(match.group(3)), 'full': version_str }
[docs] def find_latest_patch(current_version: str, all_versions: List[str]) -> Optional[str]: """Find the latest patch compatible version""" current = parse_version(current_version) if not current: return None compatible_versions = [] for v in all_versions: parsed = parse_version(v) if (parsed and parsed['major'] == current['major'] and parsed['minor'] == current['minor'] and parsed['patch'] > current['patch']): compatible_versions.append(parsed) if not compatible_versions: return None # Sort by patch descending and return the first one compatible_versions.sort(key=lambda x: x['patch'], reverse=True) return compatible_versions[0]['full']
[docs] async def check_package(pkg: str, version: str) -> PackageResult: """Check a package on PyPI""" async with httpx.AsyncClient() as client: try: response = await client.get( f"https://pypi.org/pypi/{pkg}/json", timeout=10.0 ) if response.status_code != 200: return PackageResult( package=pkg, current_version=version, status="error", message=f"Package not found (code {response.status_code})" ) data = response.json() all_versions = list(data.get('releases', {}).keys()) latest_patch = find_latest_patch(version, all_versions) return PackageResult( package=pkg, current_version=version, latest_patch=latest_patch, latest_overall=data['info']['version'], status='outdated' if latest_patch else 'up-to-date' ) except httpx.TimeoutException: return PackageResult( package=pkg, current_version=version, status="error", message="Timeout when querying PyPI" ) except Exception as e: return PackageResult( package=pkg, current_version=version, status="error", message=str(e) )
[docs] @app.get("/") async def root(): return { "message": "Requirements Checker API", "endpoints": { "/check": "POST - Check dependencies", "/docs": "Documentation interactive" } }
async def _check_packages(request_packages: List[PackageInput]) -> List[PackageResult]: """Launch the checks on PyPI for the provided list.""" tasks = [check_package(pkg.package.lower(), pkg.version) for pkg in request_packages] return await asyncio.gather(*tasks)
[docs] @app.post("/check", response_model=List[PackageResult]) async def check_packages(request: CheckRequest): """ Check a list of packages and return the available updates """ if not request.packages: raise HTTPException(status_code=400, detail="Empty package list") return await _check_packages(request.packages)
[docs] async def check_packages_from_specs(specs: Sequence[str]) -> List[PackageResult]: """Utility interface for the command line.""" packages = parse_package_specs(specs) return await _check_packages(packages)
[docs] def run_req_to_date(packages: Sequence[str]) -> List[PackageResult]: """Synchronous entry point for CLI (standalone or via pocket).""" return asyncio.run(check_packages_from_specs(packages))
@click.command(name="req-to-date", context_settings=CONTEXT_SETTINGS) @click.argument("packages", nargs=-1) def req_to_date_cli(packages: tuple[str, ...]): """Dependencies Scanner: scan dependencies and print outdated dependencies. Parameters: - packages: package names in the form name==version, comma-separated lists of name==version, path to a pyproject.toml or a requirements.txt file. """ expanded = _expand_spec_inputs(packages) count = 0 try: results = run_req_to_date(expanded) except ValueError as exc: print_error(exc, custom=True, message="ValueError") raise for result in results: if result.current_version != result.latest_overall: console.print( f"{result.package} [red]{result.current_version}[/red] ---> " f"[green]{result.latest_overall}[/green]", style="bold", justify="center", ) count += 1 if count == 0: console.print("\n\n\nEverything's up to date !\n\n\n", style="bold", justify="center") add_help_argument(req_to_date_cli)