feat: added generation process for nginx redirects and listing of 404s from versioned docs data

chore: migrate json to separate repo
Dave O'Connor
2025-06-20 15:51:30 -07:00
parent eeebb708be
commit 7d1b1b8f5b
11 changed files with 1443 additions and 0 deletions

.gitignore vendored

@@ -203,3 +203,7 @@ override.tf.json
# Ignore CLI configuration files
.terraformrc
terraform.rc
# for the nginx redirect generation
tarballs/
nginx_redirects_workspace/

Taskfile.yml Normal file

@@ -0,0 +1,154 @@
version: '3'
vars:
VERSION_FILTER: "1.30.0-1.88.0"
# below shouldn't need to be changed much, if at all
PYTHON: .venv/bin/python
JUST_MANAGE: "just manage"
JUST_MANAGE_FILTERING: " 2>/dev/null | sed -n '/^\\[/,/^\\]/p'"
DJANGO_SETTINGS_MODULE: config.settings
BZ2_PATH: "https://archives.boost.io/release"
LIVE_DOCS_PATH: "https://boost.org"
TARBALLS_DIR: "tarballs"
WEBSITE_V2_PROCESSING_DIR: "../website-v2-processing"
NGINX_REDIRECTS_DATA: "{{.WEBSITE_V2_PROCESSING_DIR}}/nginx_redirects_data"
VERIFIED_PATHS_DIR: "{{.NGINX_REDIRECTS_DATA}}/verified_paths"
NGINX_REDIRECT_WORKSPACE: "nginx_redirects_workspace"
RESULTS_DIR: "{{.NGINX_REDIRECT_WORKSPACE}}/results"
TARBALLS_JSON: "{{.NGINX_REDIRECT_WORKSPACE}}/stage_1_tarballs.json"
DOCS_JSON: "{{.NGINX_REDIRECT_WORKSPACE}}/stage_2_docs_files.json"
STATUS_404_CSV: "{{.RESULTS_DIR}}/404_urls.csv"
KNOWN_REDIRECTS_JSON: "{{.NGINX_REDIRECTS_DATA}}/known_redirects.json"
REDIRECT_CONFIG: "{{.RESULTS_DIR}}/nginx_redirects.conf"
env:
DJANGO_SETTINGS_MODULE: "{{.DJANGO_SETTINGS_MODULE}}"
tasks:
list-tarballs-json:
desc: "List tarball filenames in JSON format with version info"
cmds:
- "{{.JUST_MANAGE}} 'list_tarballs --version-filter=\"{{.VERSION_FILTER}}\"'"
generate-tarballs-json:
desc: "Generate 1_tarballs.json file with tarball filenames and version info"
preconditions:
- "test -d {{.TARBALLS_DIR}} || mkdir -p {{.TARBALLS_DIR}}"
- "test -d {{.NGINX_REDIRECT_WORKSPACE}} || mkdir -p {{.NGINX_REDIRECT_WORKSPACE}}"
cmds:
- "{{.JUST_MANAGE}} 'list_tarballs --version-filter=\"{{.VERSION_FILTER}}\"' {{.JUST_MANAGE_FILTERING}} > {{.TARBALLS_JSON}}"
determine-docs-files:
desc: "Determine documentation URLs for tarballs"
preconditions:
- "test -f {{.TARBALLS_JSON}}"
cmds:
- "{{.JUST_MANAGE}} 'determine_version_docs_files --json-file={{.TARBALLS_JSON}} --base-dir={{.TARBALLS_DIR}}' {{.JUST_MANAGE_FILTERING}} > {{.DOCS_JSON}}"
analyze-docs-urls:
desc: "Analyze documentation URLs and verify directory contents for tarballs"
preconditions:
- "test -f {{.DOCS_JSON}}"
cmds:
- |
if [ ! -d "{{.WEBSITE_V2_PROCESSING_DIR}}" ]; then
echo "Cloning website-v2-processing repository..."
git clone https://github.com/cppalliance/website-v2-processing "{{.WEBSITE_V2_PROCESSING_DIR}}"
else
echo "Updating website-v2-processing repository..."
cd "{{.WEBSITE_V2_PROCESSING_DIR}}" && git pull
fi
- |
current_branch=$(git branch --show-current)
echo "Checking out branch '$current_branch' in website-v2-processing..."
cd "{{.WEBSITE_V2_PROCESSING_DIR}} && git checkout -B "$current_branch"
- "mkdir -p {{.VERIFIED_PATHS_DIR}}"
- "{{.JUST_MANAGE}} 'analyze_docs_urls --json-file={{.DOCS_JSON}} --output-dir={{.VERIFIED_PATHS_DIR}}'"
generate-path-data:
desc: "Generate path data for tarballs for use in ..."
cmds:
- task: generate-tarballs-json
- |
files_to_download=""
for tarball_path in $(jq -r '.[] | .tarball_filename' {{.TARBALLS_JSON}}); do
tarball_file=$(basename "$tarball_path")
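# e.g. boost_1_79_0.tar.bz2 -> 1.79.0, matching the verified paths filename prefix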
version_slug=$(echo "$tarball_file" | sed 's/boost_//' | sed 's/.tar.bz2//' | sed 's/_/\./g')
paths_file="{{.VERIFIED_PATHS_DIR}}/${version_slug}_paths.json"
if [ -f "$paths_file" ]; then
echo "Skipping download of $tarball_path - verified paths file exists"
else
files_to_download="$files_to_download --filename $tarball_path"
fi
done
if [ -n "$files_to_download" ]; then
{{.JUST_MANAGE}} "download_tarballs --base-url=\"{{.BZ2_PATH}}\" $files_to_download"
else
echo "All verified paths files exist, skipping downloads"
fi
- |
jq -r '.[] | .tarball_filename' {{.TARBALLS_JSON}} | while read tarball_path; do \
tarball_file=$(basename "$tarball_path"); \
dir_name=$(basename "$tarball_file" .tar.bz2); \
version_slug=$(echo "$dir_name" | sed 's/boost_//' | sed 's/_/\./g'); \
paths_file="{{.VERIFIED_PATHS_DIR}}/${version_slug}_paths.json"; \
echo "Processing: $tarball_file -> $dir_name"; \
if [ -f "$paths_file" ]; then \
echo "Verified paths file $paths_file exists, skipping"; \
elif [ -d "{{.TARBALLS_DIR}}/$dir_name" ]; then \
echo "Directory $dir_name exists, skipping"; \
elif [ -f "{{.TARBALLS_DIR}}/$tarball_file" ]; then \
echo "Extracting $tarball_file"; \
tar -xjf "{{.TARBALLS_DIR}}/$tarball_file" -C {{.TARBALLS_DIR}} && echo "Success" || echo "Failed"; \
else \
echo "File $tarball_file not found"; \
fi; \
done
- task: determine-docs-files
- task: analyze-docs-urls
generate-404-list:
desc: "Generate a list of 404 URLs from tarballs"
preconditions:
- "test -d {{.VERIFIED_PATHS_DIR}}"
cmds:
- |
mkdir -p {{.RESULTS_DIR}}
- |
echo "Version,Source File,Href,Merged Path" > {{.STATUS_404_CSV}}
for file in {{.VERIFIED_PATHS_DIR}}/*_paths.json; do
jq -r '
.[] as $version_data |
$version_data.paths |
to_entries[] |
select(
(.value.is_directory == false) or
(.value.is_directory == true and .value.has_index == false)
) |
.value.references[] as $ref |
[$version_data.version, $ref.referencing_file, $ref.original_url, .key] |
@csv
' "$file" >> {{.STATUS_404_CSV}}
done
generate-nginx-redirect-list:
desc: "Generate nginx redirect configuration for 404 URLs"
preconditions:
- "test -d {{.VERIFIED_PATHS_DIR}}"
cmds:
- |
mkdir -p {{.RESULTS_DIR}}
- "{{.JUST_MANAGE}} 'generate_redirect_list --input-dir={{.VERIFIED_PATHS_DIR}} --known-redirects={{.KNOWN_REDIRECTS_JSON}} --output-file={{.REDIRECT_CONFIG}}'"
check-redirect-urls:
desc: "Check if destination URLs in known_redirects.json return status 200"
preconditions:
- "test -f {{.KNOWN_REDIRECTS_JSON}}"
cmds:
- "{{.PYTHON}} versions/check_redirects.py $(realpath {{.KNOWN_REDIRECTS_JSON}})"
cleanup-total:
desc: "Clean up temporary files and directories"
cmds:
- echo "Cleaning up temporary files..."
- rm -rf {{.TARBALLS_DIR}}
- rm -rf {{.NGINX_REDIRECT_WORKSPACE}}

docs/nginx_redirects.md Normal file

@@ -0,0 +1,52 @@
# Nginx Redirect Generation
`nginx_redirects_data/`, in the [`cppalliance/website-v2-processing`](https://github.com/cppalliance/website-v2-processing) repository, contains the files used to generate the Nginx redirects configuration:
1. The `verified_paths` directory holds files that cache the results of each pass over the docs. It shouldn't normally be necessary to re-run the analysis of a version's docs, but if you need to, delete the relevant version's file in `verified_paths`.
1. `known_redirects.json` is the canonical record of redirect destinations, keyed by versioned path. More details below.
1. `check_redirects.py`, in this repository under `versions/`, is a script you can use to bulk-confirm that the destinations in `known_redirects.json` are valid.
## known_redirects.json
format:
```json
{
"/doc/libs/1.34.0/boost/archive/impl": {
"destination": "https://github.com/boostorg/serialization/tree/boost-1.34.0/include/boost/archive/impl",
"redirect_format": "BoostRedirectFormat"
}
}
```
The key (`/doc/libs/1.34.0/boost/archive/impl`) is the path as it will appear in the URL.
The `destination` value is the URL to which a visiting user will be 301 redirected.
The `redirect_format` is the format used to generate the redirect in Nginx. For now only `BoostRedirectFormat` is supported; more formats could be added later if the destination format changes. The field is technically optional, since `BoostRedirectFormat` is the default, but including it keeps the intent explicit for future developers.
Note: the generated output merges redirects that differ only by version into a single regex-based `location` rule.
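As a rough illustration, the sketch below (plain Python, mirroring the `BoostRedirectFormat` logic added in this commit) shows how two per-version redirects that differ only by Boost version collapse into one regex `location` rule; the input lines and version numbers are hypothetical examples, not generated output.

```python
import re

# Hypothetical redirects for the same path in two different Boost versions.
redirects = [
    "location = /doc/libs/1.79.0/boost/archive/impl { return 301 "
    "https://github.com/boostorg/serialization/tree/boost-1.79.0/include/boost/archive/impl; }",
    "location = /doc/libs/1.80.0/boost/archive/impl { return 301 "
    "https://github.com/boostorg/serialization/tree/boost-1.80.0/include/boost/archive/impl; }",
]

pattern = re.compile(r"location [=~] \^?(.+?)\$? \{ return 301 (.+?); \}")
sources, destinations = zip(*(pattern.search(r).groups() for r in redirects))

# Merging is only allowed when the destinations differ by version alone.
if len({re.sub(r"boost-[\d\.]+", "boost-VERSION", d) for d in destinations}) == 1:
    regex_source = re.sub(r"/doc/libs/[^/]+/", "/doc/libs/([^/]+)/", sources[0])
    regex_destination = re.sub(r"boost-[\d\.]+", "boost-$1", destinations[0])
    print(f"location ~ ^{regex_source}$ {{ return 301 {regex_destination}; }}")
    # -> location ~ ^/doc/libs/([^/]+)/boost/archive/impl$ { return 301
    #    https://github.com/boostorg/serialization/tree/boost-$1/include/boost/archive/impl; }
```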
## Generating Nginx Redirects and 404 data
In the root of the repository:
1. Update the `VERSION_FILTER` value (e.g. "1.30.0-1.88.0") in `Taskfile.yml` to cover any new versions you want to include.
1. Run `task generate-path-data`.
* For any ACTIVE version that has not been processed before, this will:
1. Create a `website-v2-processing` directory containing a clone of [`cppalliance/website-v2-processing`](https://github.com/cppalliance/website-v2-processing) as a sibling of this project's root (i.e. `../website-v2-processing`)
1. Generate a new file in `website-v2-processing/nginx_redirects_data/verified_paths/` named `a.b.c_paths.json`, where `a.b.c` is the version.
1. Update `website-v2-processing/nginx_redirects_data/known_redirects.json` with any new 404 directory paths found in that version's docs which need a redirect. (LLMs are useful for this if there are many.)
1. Optional: run `task check-redirect-urls` from this project to verify that all the destination URLs in `known_redirects.json` return a 200 status.
1. For nginx redirects:
1. Run `task generate-nginx-redirect-list`, which will create the redirects in `nginx_redirects_workspace/results/nginx_redirects.conf`.
1. Use that content to replace the block of locations in `kube/boost/templates/configmap-nginx.yml`.
1. Commit the changes and create a PR.
1. For 404 data: Run `task generate-404-list` which will create the 404 data in `nginx_redirects_workspace/results/404_urls.csv`.
1. To save the analysis for future use: `task generate-path-data` checks out a branch in `website-v2-processing` with the same name as this project's current branch, so you should:
1. Commit any changes generated in:
1. `website-v2-processing/nginx_redirects_data/verified_paths/`
2. `website-v2-processing/nginx_redirects_data/known_redirects.json`
1. Create a PR in the [`cppalliance/website-v2-processing`](https://github.com/cppalliance/website-v2-processing) repository with the changes and mention it in the PR/ticket.
## Troubleshooting
For any issue you hit, note that the nginx redirect and 404 data is generated in stages, with intermediate files that can be inspected to pinpoint where the problem is.
These are written to `nginx_redirects_workspace/` as `stage_1_tarballs.json` and `stage_2_docs_files.json`. Finally, the files in `nginx_redirects_data/verified_paths/*.json` contain the per-version results and can also be treated as an intermediate stage.
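As a quick way to inspect one of those files, here is a minimal Python sketch (the version in the filename is just an example) that lists the paths the generator would treat as needing a redirect, i.e. directories with no index page, mirroring the `should_create_redirect` check in `generate_redirect_list`:

```python
import json
from pathlib import Path

# Example file; point this at the version you are investigating.
paths_file = Path(
    "../website-v2-processing/nginx_redirects_data/verified_paths/1.79.0_paths.json"
)

for version_data in json.loads(paths_file.read_text()):
    version = version_data.get("version", "unknown")
    for path, info in version_data.get("paths", {}).items():
        # Same condition as the redirect generator: a directory without an index page.
        if info.get("is_directory", True) and not info.get("has_index", False):
            refs = len(info.get("references", []))
            print(f"{version}: {path} (referenced by {refs} file(s))")
```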

versions/check_redirects.py Executable file

@@ -0,0 +1,106 @@
#!/usr/bin/env python3
import time
import requests
from urllib.parse import urlparse
import sys
import argparse
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from versions.utils.common import load_json_dict
def check_url_status(url, timeout=10):
"""Check if URL returns status 200."""
try:
response = requests.head(url, timeout=timeout, allow_redirects=True)
return response.status_code
except requests.exceptions.RequestException as e:
return f"Error: {str(e)}"
def is_valid_url(url):
"""Check if URL is valid and not empty."""
if not url or url.strip() == "":
return False
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except (ValueError, TypeError):
return False
def main():
parser = argparse.ArgumentParser(
description="Check if destination URLs in known_redirects.json return status 200"
)
parser.add_argument("redirects_file", help="Path to known_redirects.json file")
args = parser.parse_args()
redirects_file = Path(args.redirects_file)
if not redirects_file.exists():
print(f"Error: {redirects_file} not found")
sys.exit(1)
print("Loading redirects...")
redirects_data = load_json_dict(str(redirects_file))
if not redirects_data:
print(f"Error: Could not load redirects from {redirects_file}")
sys.exit(1)
valid_redirects = {
source_path: data
for source_path, data in redirects_data.items()
if is_valid_url(data.get("destination", ""))
}
total_redirects = len(valid_redirects)
print(f"Found {total_redirects} valid redirect entries to check")
print("Starting URL status checks with 1 second delay between requests...\n")
success_count = 0
error_count = 0
results = []
for i, (source_path, data) in enumerate(valid_redirects.items(), 1):
destination_url = data["destination"]
print(f"[{i}/{total_redirects}] Checking: {destination_url}")
status = check_url_status(destination_url)
if status == 200:
success_count += 1
status_text = "✓ 200 OK"
else:
error_count += 1
status_text = f"{status}"
print(f"{status_text}")
results.append(
{
"source_path": source_path,
"destination_url": destination_url,
"status": status,
"success": status == 200,
}
)
if i < total_redirects:
time.sleep(1)
print("\n" + "=" * 50)
print(f"Total URLs checked: {total_redirects}")
print(f"Successful (200): {success_count}")
print(f"Failed/Error: {error_count}")
print(f"Success rate: {success_count/total_redirects*100:.1f}%")
failed_results = [r for r in results if not r["success"]]
if failed_results:
print("\nFailed URLs:")
for result in failed_results:
print(f" {result['status']}: {result['destination_url']}")
if __name__ == "__main__":
main()


@@ -0,0 +1,397 @@
import re
import djclick as click
import json
import requests
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Dict, Any
from bs4 import BeautifulSoup
from versions.utils.common import load_json_list, has_index_files
FILE_FILTER_EXTENSIONS = (
".html",
".htm",
".js",
".xml",
".css",
".txt",
".c",
".mso",
".cpp",
".hpp",
".ipp",
".php",
".py",
".md",
".rst",
".pdf",
".qbk",
".docx",
".xlsx",
".csv",
".json",
".yaml",
".yml",
".txt",
".txt.gz",
".txt.bz2",
".txt.xz",
".txt.zst",
".txt.lz4.in",
".v2",
".dat",
".dat.gz",
".dat.bz2",
".dat.xz",
".dat.zst",
".dat.lz4",
".dot",
".ico",
".toyxml",
".svg",
".png",
".jpg",
".jpeg",
)
def href_pass(url: str) -> bool:
"""Check if URL is local (relative or absolute local path)."""
url = url.strip()
if not url:
return False
# stage 1: quick checks, don't require filesystem access
if any(
[
url.startswith(("http://", "https://", "javascript:", "mailto:")),
url.startswith("{{") and url.endswith("}}"), # Jinja2 style
"#" in url,
"://" in url,
Path(url).suffix in FILE_FILTER_EXTENSIONS,
re.match(r"^[./]+$", url), # catch relative paths, "./", "../", "../../"
]
):
return False
# stage 2: filesystem check runs only after all of the quick checks above have
# passed, keeping the slower path lookup out of the common case
return not has_index_files(Path(url))
def extract_href_urls_from_content(content: str) -> List[str]:
"""Extract and filter href URLs from HTML content using BeautifulSoup."""
try:
soup = BeautifulSoup(content, "html.parser")
return [
a_tag.get("href")
for a_tag in soup.find_all("a", href=True)
if a_tag.get("href") and href_pass(a_tag.get("href"))
]
except (AttributeError, TypeError, ValueError):
return []
def process_single_file(file_path: Path, relative_path: str) -> Dict[str, List[str]]:
"""Process a single HTML file and return dict of URLs -> [files that reference them]."""
content = file_path.read_text(encoding="utf-8", errors="ignore")
filtered_urls = extract_href_urls_from_content(content)
return {url: [relative_path] for url in filtered_urls}
def process_version_files(
version_dir: Path, doc_files: List[str]
) -> tuple[Dict[str, List[str]], int]:
"""Process all doc files for a version and return dict of URLs -> referencing files."""
url_references = {}
files_processed = 0
for doc_file in doc_files:
file_path = version_dir / doc_file
# Skip files missing on disk rather than raising in read_text()
if not file_path.exists():
    continue
file_url_dict = process_single_file(file_path, doc_file)
# Merge URLs into main dict, combining referencing file lists
for url, referencing_files in file_url_dict.items():
if url in url_references:
url_references[url].extend(referencing_files)
else:
url_references[url] = referencing_files[:]
files_processed += 1
return url_references, files_processed
def check_path_exists(base_dir: Path, path: str) -> tuple[bool, bool]:
"""Check if path exists and return (is_file, is_directory)."""
try:
full_path = base_dir / path
if not full_path.exists():
return False, False
return full_path.is_file(), full_path.is_dir()
except ValueError:
return False, False
def resolve_target_path(ref_file: str, url: str, version_dir: Path) -> Path:
"""Resolve a URL relative to a referencing file's directory."""
ref_file_path = version_dir / ref_file
ref_file_dir = ref_file_path.parent
target_path = ref_file_dir / url
return target_path.resolve()
def check_directory_contents(target_dir: Path) -> tuple[bool, bool]:
"""Check if directory has index files and other files."""
has_index = False
has_files = False
if target_dir.exists() and target_dir.is_dir():
has_index = has_index_files(target_dir)
files_in_dir = [f for f in target_dir.iterdir() if f.is_file()]
has_files = len(files_in_dir) > 0
return has_index, has_files
@dataclass
class PathData:
"""Standardized path data with consistent structure."""
references: List[Dict[str, str]]
is_file: bool = False
is_directory: bool = False
is_server_url: bool = False
has_index: bool = False
has_files: bool = False
def create_path_data(relative_target: Path, version_dir: Path) -> Dict[str, Any]:
"""Create path data with existence flags and directory metadata."""
is_file, is_directory = check_path_exists(version_dir, str(relative_target))
has_index = has_files = False
if is_directory:
target_dir = version_dir / relative_target
has_index, has_files = check_directory_contents(target_dir)
path_data = PathData(
references=[],
is_server_url=False,
is_file=is_file,
is_directory=is_directory,
has_index=has_index,
has_files=has_files,
)
result = asdict(path_data)
del result["references"] # Will be created from reference_set
result["reference_set"] = set()
return result
def add_reference_to_path(
existing_path_data: Dict[str, Any], ref_file: str, url: str
) -> None:
"""Add a reference to path data in place."""
if "reference_set" not in existing_path_data:
existing_path_data["reference_set"] = set()
existing_path_data["reference_set"].add((ref_file, url))
def check_filesystem(
url: str,
referencing_files: List[str],
version_dir: Path,
existing_paths: Dict[str, Any],
) -> Dict[str, Any]:
"""Check filesystem for URL references and return updated paths."""
updated_paths = existing_paths.copy()
for ref_file in referencing_files:
try:
normalized_target = resolve_target_path(ref_file, url, version_dir)
relative_target = normalized_target.relative_to(version_dir.resolve())
relative_target_str = str(relative_target)
if relative_target_str not in updated_paths:
updated_paths[relative_target_str] = create_path_data(
relative_target, version_dir
)
add_reference_to_path(updated_paths[relative_target_str], ref_file, url)
except ValueError as e:
print(f"Error resolving path: {e}")
continue
return updated_paths
def check_url_status(url: str) -> bool:
"""Check if a URL returns a 404 status (single attempt, no retries)."""
try:
response = requests.head(url, timeout=10, allow_redirects=True)
return response.status_code != 404
except requests.RequestException:
return False
def check_server(
url: str,
referencing_files: List[str],
version_dir: Path,
existing_paths: Dict[str, Any],
version_slug: str = "",
) -> Dict[str, Any]:
"""Check server for URL references by fetching HTML from server and checking URLs."""
updated_paths = existing_paths.copy()
for ref_file in referencing_files:
try:
# Extract version number from slug (boost-1-79-0 -> 1_79_0)
version_number = version_slug.replace("boost-", "").replace("-", "_")
response = requests.get(
f"http://web:8000/doc/libs/{version_number}/{ref_file}", timeout=15
)
if response.status_code != 200:
continue
all_hrefs = extract_href_urls_from_content(response.text)
if url in all_hrefs:
url_exists = check_url_status(url)
if url not in updated_paths:
path_data = PathData(
references=[],
is_server_url=True,
is_file=url_exists,
is_directory=False,
has_index=False,
has_files=False,
)
result = asdict(path_data)
del result["references"] # Will be created from reference_set
result["reference_set"] = set()
updated_paths[url] = result
add_reference_to_path(updated_paths[url], ref_file, url)
except (requests.RequestException, ValueError, KeyError):
continue
return updated_paths
def is_django_template_url(url: str) -> bool:
"""Check if URL looks like a Django template (contains template syntax)."""
return "{%" in url or "{{" in url
def process_url_reference(
url: str,
referencing_files: List[str],
version_dir: Path,
existing_paths: Dict[str, Any],
version_slug: str = "",
) -> Dict[str, Any]:
"""Process a single URL and its referencing files, returning updated paths."""
if is_django_template_url(url):
return check_server(
url, referencing_files, version_dir, existing_paths, version_slug
)
else:
return check_filesystem(url, referencing_files, version_dir, existing_paths)
def analyze_version_urls(version_data: Dict[str, Any], base_dir: str) -> Dict[str, Any]:
"""Analyze all documentation files for a version, extract URLs, and verify paths."""
version_name = version_data.get("version")
slug = version_data.get("slug")
doc_files = version_data.get("doc_files", [])
directory_exists = version_data.get("directory_exists", False)
if not version_name or not slug:
raise ValueError(
f"Missing required fields: version_name={version_name}, slug={slug}"
)
if not directory_exists:
return {"version": version_name, "directory_exists": False, "paths": {}}
version_dir = Path(base_dir) / slug.replace("-", "_")
url_references, files_processed = process_version_files(version_dir, doc_files)
# Process each URL and verify paths
paths_result = {}
for url, referencing_files in url_references.items():
paths_result = process_url_reference(
url, referencing_files, version_dir, paths_result, slug
)
# Convert reference sets to lists for JSON serialization
for path_data in paths_result.values():
path_data["references"] = [
{"referencing_file": ref_file, "original_url": url}
for ref_file, url in path_data.pop("reference_set", set())
]
return {
"version": version_name,
"directory_exists": directory_exists,
"version_directory": slug.replace("-", "_"),
"total_doc_files": len(doc_files),
"files_processed": files_processed,
"paths": paths_result,
}
@click.command()
@click.option(
"--json-file", required=True, help="JSON file containing documentation information"
)
@click.option(
"--base-dir",
default="tarballs",
help="Base directory containing extracted tarballs",
)
@click.option(
"--output-dir",
required=True,
help="Directory to write individual version JSON files",
)
def command(json_file: str, base_dir: str, output_dir: str):
"""Analyze local documentation URLs and verify that referenced paths exist.
Takes a JSON file with documentation file information, scans each HTML file
to extract local href URLs, then verifies that all referenced files and
directories actually exist in the extracted tarballs. Writes individual
JSON files for each version.
Examples:
python manage.py analyze_docs_urls --json-file=tarballs/docs_files.json --output-dir=nginx_redirects_data
"""
docs_data = load_json_list(json_file)
if not docs_data:
return
if not Path(base_dir).exists():
click.echo(f"Warning: Base directory '{base_dir}' does not exist")
Path(output_dir).mkdir(parents=True, exist_ok=True)
for version_data in docs_data:
version_name = version_data.get("version", "unknown")
version_slug = version_name.replace("boost-", "")
output_file = Path(output_dir) / f"{version_slug}_paths.json"
if output_file.exists():
click.echo(f"Skipping {version_name} - {output_file} already exists")
continue
result = analyze_version_urls(version_data, base_dir)
output_file.write_text(json.dumps([result], indent=2))
output_file.chmod(0o666)
click.echo(f"Written {output_file}")


@@ -0,0 +1,70 @@
import djclick as click
import json
from pathlib import Path
from typing import List, Dict, Any
from versions.utils.common import load_json_list, get_version_directory_from_tarball
def find_doc_files(directory: Path) -> List[str]:
"""Find all HTML and HTM files in the directory recursively."""
files = []
# Use glob pattern to match both .html and .htm files
for file_path in directory.rglob("*.htm*"):
relative_path = file_path.relative_to(directory)
files.append(str(relative_path))
return sorted(files)
def process_version_directory(
version_dir: Path, version_info: Dict[str, Any]
) -> Dict[str, Any]:
"""Process a single version directory to find documentation files."""
result = {
"version": version_info["version"],
"slug": version_info["slug"],
"tarball_filename": version_info["tarball_filename"],
"directory_exists": version_dir.exists(),
"doc_files": [],
"total_files": 0,
}
if version_dir.exists():
result["doc_files"] = find_doc_files(version_dir)
result["total_files"] = len(result["doc_files"])
return result
@click.command()
@click.option(
"--json-file", required=True, help="JSON file containing version information"
)
@click.option(
"--base-dir",
default="tarballs",
help="Base directory containing extracted tarballs",
)
def command(json_file: str, base_dir: str):
"""Determine documentation files for versions by scanning extracted tarballs.
Takes a JSON file with version information and scans the corresponding
extracted directories to find HTML/HTM documentation files.
Examples:
python manage.py determine_version_docs_files --json-file=nginx_redirects_workspace/stage_1_tarballs.json
python manage.py determine_version_docs_files --json-file=nginx_redirects_workspace/stage_1_tarballs.json --base-dir=tarballs
"""
versions_data = load_json_list(json_file)
if not versions_data:
return
if not Path(base_dir).exists():
click.echo(f"Warning: Base directory '{base_dir}' does not exist")
results = []
for version_info in versions_data:
version_dir = get_version_directory_from_tarball(version_info, base_dir)
result = process_version_directory(version_dir, version_info)
results.append(result)
click.echo(json.dumps(results, indent=2))


@@ -0,0 +1,193 @@
import djclick as click
import os
import requests
from pathlib import Path
from urllib.parse import urljoin
import time
from typing import List
from enum import Enum
class DownloadResult(Enum):
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
def download_file(url: str, destination_path: Path, chunk_size: int = 8192) -> bool:
"""Download a file from URL to destination path with progress indication."""
try:
click.echo(f"download {url=}")
response = requests.get(url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
downloaded_size = 0
with open(destination_path, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
if total_size > 0:
progress = (downloaded_size / total_size) * 100
click.echo(
f"\rDownloading {os.path.basename(destination_path)}: {progress:.1f}%",
nl=False,
)
click.echo() # New line after progress
return True
except requests.exceptions.RequestException as e:
click.echo(f"Error downloading {url}: {e}", err=True)
return False
def get_filename_from_url(url: str) -> str:
"""Extract filename from URL, fallback to timestamp-based name."""
filename = os.path.basename(url)
return (
filename
if filename and "." in filename
else f"tarball_{int(time.time())}.tar.bz2"
)
def build_download_urls(base_url: str, filenames: List[str]) -> List[str]:
"""Build complete URLs from base URL and filenames."""
return [urljoin(base_url.rstrip("/") + "/", fname) for fname in filenames]
def should_skip_existing_file(destination: Path, overwrite: bool) -> bool:
"""Check if file should be skipped due to existing file."""
return not overwrite and destination.exists()
def download_with_retries(
url: str, destination: Path, max_retries: int, retry_delay: int
) -> bool:
"""Download file with retry logic."""
for attempt in range(1, max_retries + 1):
if attempt > 1:
click.echo(f"Retry attempt {attempt}/{max_retries}")
success = download_file(url, destination)
if success:
return True
elif attempt < max_retries:
time.sleep(retry_delay)
return False
def process_single_download(
download_url: str,
output_path: Path,
overwrite: bool,
max_retries: int,
retry_delay: int,
) -> DownloadResult:
"""Process a single file download."""
filename = get_filename_from_url(download_url)
destination = output_path / filename
if should_skip_existing_file(destination, overwrite):
click.echo(f"File {filename} exists, skipping (use --overwrite to replace)")
return DownloadResult.SKIPPED
if download_with_retries(download_url, destination, max_retries, retry_delay):
    if not destination.exists():
        click.echo(f"Error: File {filename} was not created after download.")
        return DownloadResult.FAILED
    click.echo(f"Downloaded {filename}")
    return DownloadResult.SUCCESS
else:
click.echo(f"Failed to download {filename}")
if destination.exists():
destination.unlink()
return DownloadResult.FAILED
def print_download_summary(
successful: int, failed: int, skipped: int, total: int
) -> None:
"""Print download completion summary."""
click.echo("\nDownload Summary:")
click.echo(f" Successful: {successful}")
click.echo(f" Already downloaded: {skipped}")
click.echo(f" Failed: {failed}")
click.echo(f" Total: {total}")
@click.command()
@click.option("--base-url", required=True, help="Base URL to combine with filenames")
@click.option(
"--filename",
multiple=True,
required=True,
help="Filename to append to base URL (can be used multiple times)",
)
@click.option(
"--output-dir",
default="/code/tarballs",
help="Output directory for downloaded files",
)
@click.option("--overwrite", is_flag=True, help="Overwrite existing files")
@click.option(
"--max-retries", default=3, help="Maximum number of retry attempts per URL"
)
@click.option("--delay", default=1, help="Delay in seconds between downloads")
def command(
base_url: str,
filename: List[str],
output_dir: str,
overwrite: bool,
max_retries: int,
delay: int,
) -> None:
"""Download one or more tarballs using base URL + filenames.
Combines a base URL with multiple filenames to download tarballs.
Examples:
# Download using base URL + filenames
python manage.py download_tarballs --base-url=https://archives.boost.io/release/ --filename=boost_1_81_0.tar.bz2 --filename=boost_1_82_0.tar.bz2
# With custom settings
python manage.py download_tarballs --base-url=https://archives.boost.io/release/ --filename=boost_1_81_0.tar.bz2 --overwrite --max-retries=5 --delay=2
"""
urls_to_download = build_download_urls(base_url, filename)
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
click.echo(
f"Downloading {len(urls_to_download)} tarball(s) to {output_path.absolute()}"
)
results = {
DownloadResult.SUCCESS: 0,
DownloadResult.FAILED: 0,
DownloadResult.SKIPPED: 0,
}
for i, download_url in enumerate(urls_to_download, 1):
click.echo(f"\n[{i}/{len(urls_to_download)}] Processing: {download_url}")
result = process_single_download(
download_url, output_path, overwrite, max_retries, delay
)
results[result] += 1
if i < len(urls_to_download) and delay > 0:
time.sleep(delay)
print_download_summary(
results[DownloadResult.SUCCESS],
results[DownloadResult.FAILED],
results[DownloadResult.SKIPPED],
len(urls_to_download),
)
if results[DownloadResult.FAILED] > 0:
exit(1)


@@ -0,0 +1,301 @@
import djclick as click
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple, Union
from versions.utils.common import load_json_dict
DEFAULT_REDIRECT_FORMAT = "BoostRedirectFormat"
REDIRECT_REGEX = r"location [=~] \^?(.+?)\$? \{ return 301 (.+?); \}"
class RedirectFormat:
"""Base class for handling redirect URL formatting."""
def extract_source_pattern(self, source_url: str) -> str:
"""Extract the path pattern from source URL for grouping."""
raise NotImplementedError
def normalize_destination(self, destination: str) -> str:
"""Normalize destination for grouping purposes."""
raise NotImplementedError
def create_regex_source(self, source_url: str) -> str:
"""Convert source URL to regex pattern with version capture group."""
raise NotImplementedError
def create_regex_destination(self, destination: str) -> str:
"""Convert destination to use regex backreference."""
raise NotImplementedError
def can_merge_destinations(self, destinations: List[str]) -> bool:
"""Check if destinations can be merged."""
raise NotImplementedError
class BoostRedirectFormat(RedirectFormat):
"""Handles Boost-specific redirect URL formatting."""
def extract_source_pattern(self, source_url: str) -> str:
"""Extract path after version: /doc/libs/VERSION/path -> path"""
match = re.search(r"/doc/libs/[^/]+/(.+?)(?:\$|$)", source_url)
return match.group(1) if match else source_url
def normalize_destination(self, destination: str) -> str:
"""Normalize destination by replacing version-specific parts."""
return re.sub(r"boost-[\d\.]+", "boost-VERSION", destination)
def create_regex_source(self, source_url: str) -> str:
"""Convert /doc/libs/VERSION/path to /doc/libs/([^/]+)/path"""
return re.sub(r"/doc/libs/[^/]+/", "/doc/libs/([^/]+)/", source_url)
def create_regex_destination(self, destination: str) -> str:
"""Convert boost-1.79.0 to boost-$1 in destination."""
return re.sub(r"boost-[\d\.]+", "boost-$1", destination)
def can_merge_destinations(self, destinations: List[str]) -> bool:
"""Check if destinations can be merged (only differ by version)."""
if len(destinations) <= 1:
return True
# Normalize destinations by replacing versions
normalized = [self.normalize_destination(dest) for dest in destinations]
# All normalized destinations should be the same
return len(set(normalized)) == 1
@dataclass
class ParsedRedirect:
"""Parsed redirect with extracted components."""
original: str
source_url: str
destination: str
formatter_type: str
formatter: RedirectFormat
path_pattern: str
normalized_dest: str
def parse_redirects(
redirects: List[str], known_redirect_map: Dict[str, Dict[str, str]]
) -> List[ParsedRedirect]:
"""Parse redirects once and extract all needed data."""
parsed = []
for redirect in redirects:
source_match = re.search(REDIRECT_REGEX, redirect)
if not source_match:
continue
source_url, destination = source_match.groups()
# Get formatter type and instance
formatter_type = known_redirect_map.get(source_url, {}).get(
"redirect_format", DEFAULT_REDIRECT_FORMAT
)
formatter = get_formatter(formatter_type)
# Extract pattern data
path_pattern = formatter.extract_source_pattern(source_url)
normalized_dest = formatter.normalize_destination(destination)
parsed.append(
ParsedRedirect(
original=redirect,
source_url=source_url,
destination=destination,
formatter_type=formatter_type,
formatter=formatter,
path_pattern=path_pattern,
normalized_dest=normalized_dest,
)
)
return parsed
def group_parsed_redirects(
parsed_redirects: List[ParsedRedirect],
) -> Dict[str, Dict[str, List[ParsedRedirect]]]:
"""Group parsed redirects by formatter type and then by pattern."""
groups = {}
for parsed in parsed_redirects:
if parsed.formatter_type not in groups:
groups[parsed.formatter_type] = {}
group_key = f"{parsed.path_pattern}::{parsed.normalized_dest}"
if group_key not in groups[parsed.formatter_type]:
groups[parsed.formatter_type][group_key] = []
groups[parsed.formatter_type][group_key].append(parsed)
return groups
def merge_redirect_group(group: List[ParsedRedirect]) -> List[str]:
"""Merge a group of parsed redirects or keep them separate."""
if len(group) == 1:
return [group[0].original]
destinations = [parsed.destination for parsed in group]
formatter = group[0].formatter
if not formatter.can_merge_destinations(destinations):
return [parsed.original for parsed in group]
first = group[0]
regex_source = formatter.create_regex_source(first.source_url)
regex_destination = formatter.create_regex_destination(first.destination)
merged = f"location ~ ^{regex_source}$ {{ return 301 {regex_destination}; }}"
return [merged]
def merge_version_patterns_optimized(
redirects: List[str], known_redirect_map: Dict[str, Dict[str, str]]
) -> List[str]:
"""Optimized merge that parses redirects once and processes by formatter type."""
parsed_redirects = parse_redirects(redirects, known_redirect_map)
groups = group_parsed_redirects(parsed_redirects)
merged = []
for formatter_type, pattern_groups in groups.items():
for group_key, group in pattern_groups.items():
merged.extend(merge_redirect_group(group))
return merged
def create_default_redirect_config() -> Dict[str, str]:
"""Create default redirect configuration object."""
return {"destination": "", "redirect_format": DEFAULT_REDIRECT_FORMAT}
def get_formatter(format_type: str) -> RedirectFormat:
"""Get formatter instance based on format type."""
if format_type == "BoostRedirectFormat":
return BoostRedirectFormat()
else:
# Default to BoostRedirectFormat for unknown types
return BoostRedirectFormat()
def should_create_redirect(path_info: Dict[str, Union[str, bool]]) -> bool:
"""Determine if a path should have a redirect created."""
return path_info.get("is_directory", True) and not path_info.get("has_index", False)
def create_source_url(version: str, path: str) -> str:
"""Create source URL from version and path."""
version_path = version.replace("boost-", "").replace("-", "_")
return f"/doc/libs/{version_path}/{path}"
def create_redirect_line(source_url: str, destination: str) -> str:
"""Create nginx redirect line with exact location match."""
return f"location = {source_url} {{ return 301 {destination}; }}"
def create_redirects_and_update_map(
verified_data: List[Dict], known_redirect_map: Dict[str, Dict[str, str]]
) -> Tuple[List[str], Dict[str, Dict[str, str]]]:
"""Generate redirect lines from verified data and update redirect map."""
redirects = []
updated_redirect_map = known_redirect_map.copy()
for version_data in verified_data:
version = version_data.get("version", "unknown")
paths = version_data.get("paths", {})
for path, path_info in paths.items():
if not should_create_redirect(path_info):
continue
source_url = create_source_url(version, path)
destination = known_redirect_map.get(source_url, {}).get("destination", "")
# Update redirect map data if not already present
if source_url not in updated_redirect_map:
updated_redirect_map[source_url] = create_default_redirect_config()
redirect_line = create_redirect_line(source_url, destination)
redirects.append(redirect_line)
return redirects, updated_redirect_map
def save_updated_redirects(
known_redirects_file: str, updated_redirect_map: Dict[str, Dict[str, str]]
) -> None:
"""Save updated redirect map to file if changes were made."""
try:
with open(known_redirects_file, "w") as f:
json.dump(dict(sorted(updated_redirect_map.items())), f, indent=2)
except Exception as e:
click.echo(
f"Warning: Could not update known redirects file {known_redirects_file}: {e}",
err=True,
)
def output_nginx_configuration(merged_redirects: List[str], output_file: str) -> None:
"""Output the nginx configuration."""
with open(output_file, "w") as f:
for redirect in sorted(merged_redirects):
f.write(redirect + "\n")
click.echo(f"Nginx configuration written to {output_file}")
@click.command()
@click.option(
"--input-dir",
required=True,
help="Directory containing individual version verified paths JSON files",
)
@click.option(
"--known-redirects",
required=True,
help="JSON file containing known redirect destinations",
)
@click.option(
"--output-file", required=True, help="Output file for nginx redirect configuration"
)
def command(input_dir: str, known_redirects: str, output_file: str):
"""Generate nginx redirect configuration from verified paths data.
Extracts paths that need redirects (directories without index files or non-existent files)
and outputs them as nginx rewrite directives.
Examples:
python manage.py generate_redirect_list --input-dir=nginx_redirects_data --known-redirects=known_redirects.json --output-file=nginx_redirects.conf
"""
verified_data = []
input_path = Path(input_dir)
if not input_path.exists():
click.echo(f"Error: Input directory '{input_dir}' does not exist")
return
for json_path in input_path.glob("*_paths.json"):
try:
with open(json_path) as f:
data = json.load(f)
verified_data.extend(data if isinstance(data, list) else [data])
except Exception as e:
click.echo(f"Error loading {json_path}: {e}")
continue
if not verified_data:
click.echo("No verified paths data found")
return
known_redirect_map = load_json_dict(known_redirects)
redirects, updated_redirect_map = create_redirects_and_update_map(
verified_data, known_redirect_map
)
if updated_redirect_map != known_redirect_map:
save_updated_redirects(known_redirects, updated_redirect_map)
merged_redirects = merge_version_patterns_optimized(redirects, known_redirect_map)
output_nginx_configuration(merged_redirects, output_file)


@@ -0,0 +1,106 @@
import djclick as click
import json
from packaging.version import parse as parse_version
from versions.models import Version
def parse_version_range(version_range):
"""Parse version range string into start and end versions.
Formats supported:
- "1.81.0" - single version
- "1.79.0-1.81.0" - range from 1.79.0 to 1.81.0
- "1.79.0+" - from 1.79.0 onwards
"""
if not version_range:
return None, None
if "+" in version_range:
start_version = version_range.replace("+", "").strip()
return start_version, None
elif "-" in version_range and not version_range.startswith("boost-"):
parts = version_range.split("-", 1)
if len(parts) == 2 and "." in parts[0] and "." in parts[1]:
return parts[0].strip(), parts[1].strip()
return version_range.strip(), version_range.strip()
def filter_versions_by_range(queryset, version_filter: str):
"""Filter queryset by version range."""
if not version_filter:
return queryset
start_version, end_version = parse_version_range(version_filter)
if not start_version:
return queryset
matching_versions = []
for v in queryset:
if v.cleaned_version_parts_int:
try:
v_version = ".".join(map(str, v.cleaned_version_parts_int[:3]))
if parse_version(v_version) >= parse_version(start_version):
if end_version is None or parse_version(v_version) <= parse_version(
end_version
):
matching_versions.append(v.id)
except (ValueError, TypeError):
# Skip versions that can't be parsed
continue
return queryset.filter(id__in=matching_versions)
def generate_tarball_filename(version_obj) -> str:
"""Generate tarball filename from version object."""
# boost-1-73-0 -> 1.73.0/source/boost_1_73_0.tar.bz2
short_version = version_obj.slug.replace("boost-", "").replace("-", ".")
tarball_filename = (
short_version + "/source/" + version_obj.slug.replace("-", "_") + ".tar.bz2"
)
return tarball_filename
def extract_tarball_data(versions_list: list) -> list:
"""Extract tarball data from sorted versions list."""
return [
{
"version": version_obj.name,
"slug": version_obj.slug,
"tarball_filename": generate_tarball_filename(version_obj),
"version_parts": version_obj.cleaned_version_parts_int,
}
for version_obj in versions_list
]
@click.command()
@click.option(
"--version-filter",
help="Single version (e.g., '1.81.0') or range (e.g., '1.79.0-1.81.0', '1.79.0+')",
)
def command(version_filter: str):
"""Extract tarball URLs from active versions.
Queries active versions in the database and generates tarball URLs by converting
slug values (replacing - with _ and appending .tar.bz2).
Returns JSON with version information and tarball filenames.
Examples:
python manage.py list_tarballs --version-filter=1.81.0
python manage.py list_tarballs --version-filter=1.79.0-1.81.0
python manage.py list_tarballs --version-filter=1.79.0+
"""
queryset = (
Version.objects.minor_versions()
.filter(slug__isnull=False, active=True)
.exclude(slug="")
.exclude(name__in=["master", "develop"])
.order_by("-version_array", "-name")
)
queryset = filter_versions_by_range(queryset, version_filter)
urls = extract_tarball_data(queryset)
click.echo(json.dumps(urls, indent=2))


versions/utils/common.py Normal file

@@ -0,0 +1,60 @@
import json
import os
from pathlib import Path
from typing import Dict, List, Any
import djclick as click
def load_json_list(json_file: str) -> List[Dict[str, Any]]:
"""Load and validate JSON file expecting a list of objects."""
try:
with open(json_file, "r") as f:
data = json.load(f)
except FileNotFoundError:
click.echo(f"Error: JSON file '{json_file}' not found", err=True)
return []
except json.JSONDecodeError as e:
click.echo(f"Error: Invalid JSON in file '{json_file}': {e}", err=True)
return []
if not isinstance(data, list):
click.echo(
"Error: JSON file should contain an array of version objects", err=True
)
return []
return data
def load_json_dict(json_file: str) -> Dict[str, Any]:
"""Load and validate JSON file expecting a dictionary/object."""
try:
with open(json_file, "r") as f:
data = json.load(f)
except FileNotFoundError:
click.echo(f"Error: JSON file '{json_file}' not found", err=True)
return {}
except json.JSONDecodeError as e:
click.echo(f"Error: Invalid JSON in file '{json_file}': {e}", err=True)
return {}
if not isinstance(data, dict):
click.echo("Error: JSON file should contain a dictionary/object", err=True)
return {}
return data
def has_index_files(directory: Path) -> bool:
"""Check if directory contains index.html or index.htm files."""
index_files = ["index.html", "index.htm"]
return any((directory / index_file).exists() for index_file in index_files)
def get_version_directory_from_tarball(
version_data: Dict[str, Any], base_dir: str
) -> Path:
"""Get the directory path for a version by extracting from tarball filename."""
tarball_file = os.path.basename(version_data.get("tarball_filename", ""))
dir_name = os.path.splitext(os.path.splitext(tarball_file)[0])[0] # Remove .tar.bz2
return Path(base_dir) / dir_name