mirror of
https://github.com/mozilla/gecko-dev.git
synced 2025-11-07 11:48:19 +02:00
While nothing explicitly prompted updating this, letting `pip` and `pip-tools` diverage too greatly in version release dates seems like a bad idea, especially with the various deprecations in `pip` and `python3.12`. This also vendors the implicit dependencies `build`, `tomli`, and `pyproject_hooks`. Differential Revision: https://phabricator.services.mozilla.com/D210526
322 lines
11 KiB
Python
322 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
import io
|
|
import os
|
|
import re
|
|
import sys
|
|
from itertools import chain
|
|
from typing import BinaryIO, Iterable, Iterator, cast
|
|
|
|
from click import unstyle
|
|
from click.core import Context
|
|
from pip._internal.models.format_control import FormatControl
|
|
from pip._internal.req.req_install import InstallRequirement
|
|
from pip._vendor.packaging.markers import Marker
|
|
from pip._vendor.packaging.utils import canonicalize_name
|
|
|
|
from .logging import log
|
|
from .utils import (
|
|
comment,
|
|
dedup,
|
|
format_requirement,
|
|
get_compile_command,
|
|
key_from_ireq,
|
|
strip_extras,
|
|
)
|
|
|
|
MESSAGE_UNHASHED_PACKAGE = comment(
|
|
"# WARNING: pip install will require the following package to be hashed."
|
|
"\n# Consider using a hashable URL like "
|
|
"https://github.com/jazzband/pip-tools/archive/SOMECOMMIT.zip"
|
|
)
|
|
|
|
MESSAGE_UNSAFE_PACKAGES_UNPINNED = comment(
|
|
"# WARNING: The following packages were not pinned, but pip requires them to be"
|
|
"\n# pinned when the requirements file includes hashes and the requirement is not"
|
|
"\n# satisfied by a package already installed. "
|
|
"Consider using the --allow-unsafe flag."
|
|
)
|
|
|
|
MESSAGE_UNSAFE_PACKAGES = comment(
|
|
"# The following packages are considered to be unsafe in a requirements file:"
|
|
)
|
|
|
|
MESSAGE_UNINSTALLABLE = (
|
|
"The generated requirements file may be rejected by pip install. "
|
|
"See # WARNING lines for details."
|
|
)
|
|
|
|
|
|
strip_comes_from_line_re = re.compile(r" \(line \d+\)$")
|
|
|
|
|
|
def _comes_from_as_string(comes_from: str | InstallRequirement) -> str:
|
|
if isinstance(comes_from, str):
|
|
return strip_comes_from_line_re.sub("", comes_from)
|
|
return cast(str, canonicalize_name(key_from_ireq(comes_from)))
|
|
|
|
|
|
def annotation_style_split(required_by: set[str]) -> str:
|
|
sorted_required_by = sorted(required_by)
|
|
if len(sorted_required_by) == 1:
|
|
source = sorted_required_by[0]
|
|
annotation = "# via " + source
|
|
else:
|
|
annotation_lines = ["# via"]
|
|
for source in sorted_required_by:
|
|
annotation_lines.append(" # " + source)
|
|
annotation = "\n".join(annotation_lines)
|
|
return annotation
|
|
|
|
|
|
def annotation_style_line(required_by: set[str]) -> str:
|
|
return f"# via {', '.join(sorted(required_by))}"
|
|
|
|
|
|
class OutputWriter:
|
|
def __init__(
|
|
self,
|
|
dst_file: BinaryIO,
|
|
click_ctx: Context,
|
|
dry_run: bool,
|
|
emit_header: bool,
|
|
emit_index_url: bool,
|
|
emit_trusted_host: bool,
|
|
annotate: bool,
|
|
annotation_style: str,
|
|
strip_extras: bool,
|
|
generate_hashes: bool,
|
|
default_index_url: str,
|
|
index_urls: Iterable[str],
|
|
trusted_hosts: Iterable[str],
|
|
format_control: FormatControl,
|
|
linesep: str,
|
|
allow_unsafe: bool,
|
|
find_links: list[str],
|
|
emit_find_links: bool,
|
|
emit_options: bool,
|
|
) -> None:
|
|
self.dst_file = dst_file
|
|
self.click_ctx = click_ctx
|
|
self.dry_run = dry_run
|
|
self.emit_header = emit_header
|
|
self.emit_index_url = emit_index_url
|
|
self.emit_trusted_host = emit_trusted_host
|
|
self.annotate = annotate
|
|
self.annotation_style = annotation_style
|
|
self.strip_extras = strip_extras
|
|
self.generate_hashes = generate_hashes
|
|
self.default_index_url = default_index_url
|
|
self.index_urls = index_urls
|
|
self.trusted_hosts = trusted_hosts
|
|
self.format_control = format_control
|
|
self.linesep = linesep
|
|
self.allow_unsafe = allow_unsafe
|
|
self.find_links = find_links
|
|
self.emit_find_links = emit_find_links
|
|
self.emit_options = emit_options
|
|
|
|
def _sort_key(self, ireq: InstallRequirement) -> tuple[bool, str]:
|
|
return (not ireq.editable, key_from_ireq(ireq))
|
|
|
|
def write_header(self) -> Iterator[str]:
|
|
if self.emit_header:
|
|
yield comment("#")
|
|
yield comment(
|
|
"# This file is autogenerated by pip-compile with Python "
|
|
f"{sys.version_info.major}.{sys.version_info.minor}"
|
|
)
|
|
yield comment("# by the following command:")
|
|
yield comment("#")
|
|
compile_command = os.environ.get(
|
|
"CUSTOM_COMPILE_COMMAND"
|
|
) or get_compile_command(self.click_ctx)
|
|
yield comment(f"# {compile_command}")
|
|
yield comment("#")
|
|
|
|
def write_index_options(self) -> Iterator[str]:
|
|
if self.emit_index_url:
|
|
for index, index_url in enumerate(dedup(self.index_urls)):
|
|
if index == 0 and index_url.rstrip("/") == self.default_index_url:
|
|
continue
|
|
flag = "--index-url" if index == 0 else "--extra-index-url"
|
|
yield f"{flag} {index_url}"
|
|
|
|
def write_trusted_hosts(self) -> Iterator[str]:
|
|
if self.emit_trusted_host:
|
|
for trusted_host in dedup(self.trusted_hosts):
|
|
yield f"--trusted-host {trusted_host}"
|
|
|
|
def write_format_controls(self) -> Iterator[str]:
|
|
for nb in dedup(sorted(self.format_control.no_binary)):
|
|
yield f"--no-binary {nb}"
|
|
for ob in dedup(sorted(self.format_control.only_binary)):
|
|
yield f"--only-binary {ob}"
|
|
|
|
def write_find_links(self) -> Iterator[str]:
|
|
if self.emit_find_links:
|
|
for find_link in dedup(self.find_links):
|
|
yield f"--find-links {find_link}"
|
|
|
|
def write_flags(self) -> Iterator[str]:
|
|
if not self.emit_options:
|
|
return
|
|
emitted = False
|
|
for line in chain(
|
|
self.write_index_options(),
|
|
self.write_find_links(),
|
|
self.write_trusted_hosts(),
|
|
self.write_format_controls(),
|
|
):
|
|
emitted = True
|
|
yield line
|
|
if emitted:
|
|
yield ""
|
|
|
|
def _iter_lines(
|
|
self,
|
|
results: set[InstallRequirement],
|
|
unsafe_requirements: set[InstallRequirement],
|
|
unsafe_packages: set[str],
|
|
markers: dict[str, Marker],
|
|
hashes: dict[InstallRequirement, set[str]] | None = None,
|
|
) -> Iterator[str]:
|
|
# default values
|
|
unsafe_packages = unsafe_packages if self.allow_unsafe else set()
|
|
hashes = hashes or {}
|
|
|
|
# Check for unhashed or unpinned packages if at least one package does have
|
|
# hashes, which will trigger pip install's --require-hashes mode.
|
|
warn_uninstallable = False
|
|
has_hashes = hashes and any(hash for hash in hashes.values())
|
|
|
|
yielded = False
|
|
|
|
for line in self.write_header():
|
|
yield line
|
|
yielded = True
|
|
for line in self.write_flags():
|
|
yield line
|
|
yielded = True
|
|
|
|
unsafe_requirements = unsafe_requirements or {
|
|
r for r in results if r.name in unsafe_packages
|
|
}
|
|
packages = {r for r in results if r.name not in unsafe_packages}
|
|
|
|
if packages:
|
|
for ireq in sorted(packages, key=self._sort_key):
|
|
if has_hashes and not hashes.get(ireq):
|
|
yield MESSAGE_UNHASHED_PACKAGE
|
|
warn_uninstallable = True
|
|
line = self._format_requirement(
|
|
ireq, markers.get(key_from_ireq(ireq)), hashes=hashes
|
|
)
|
|
yield line
|
|
yielded = True
|
|
|
|
if unsafe_requirements:
|
|
yield ""
|
|
yielded = True
|
|
if has_hashes and not self.allow_unsafe:
|
|
yield MESSAGE_UNSAFE_PACKAGES_UNPINNED
|
|
warn_uninstallable = True
|
|
else:
|
|
yield MESSAGE_UNSAFE_PACKAGES
|
|
|
|
for ireq in sorted(unsafe_requirements, key=self._sort_key):
|
|
ireq_key = key_from_ireq(ireq)
|
|
if not self.allow_unsafe:
|
|
yield comment(f"# {ireq_key}")
|
|
else:
|
|
line = self._format_requirement(
|
|
ireq, marker=markers.get(ireq_key), hashes=hashes
|
|
)
|
|
yield line
|
|
|
|
# Yield even when there's no real content, so that blank files are written
|
|
if not yielded:
|
|
yield ""
|
|
|
|
if warn_uninstallable:
|
|
log.warning(MESSAGE_UNINSTALLABLE)
|
|
|
|
def write(
|
|
self,
|
|
results: set[InstallRequirement],
|
|
unsafe_requirements: set[InstallRequirement],
|
|
unsafe_packages: set[str],
|
|
markers: dict[str, Marker],
|
|
hashes: dict[InstallRequirement, set[str]] | None,
|
|
) -> None:
|
|
if not self.dry_run:
|
|
dst_file = io.TextIOWrapper(
|
|
self.dst_file,
|
|
encoding="utf8",
|
|
newline=self.linesep,
|
|
line_buffering=True,
|
|
)
|
|
try:
|
|
for line in self._iter_lines(
|
|
results, unsafe_requirements, unsafe_packages, markers, hashes
|
|
):
|
|
if self.dry_run:
|
|
# Bypass the log level to always print this during a dry run
|
|
log.log(line)
|
|
else:
|
|
log.info(line)
|
|
dst_file.write(unstyle(line))
|
|
dst_file.write("\n")
|
|
finally:
|
|
if not self.dry_run:
|
|
dst_file.detach()
|
|
|
|
def _format_requirement(
|
|
self,
|
|
ireq: InstallRequirement,
|
|
marker: Marker | None = None,
|
|
hashes: dict[InstallRequirement, set[str]] | None = None,
|
|
) -> str:
|
|
ireq_hashes = (hashes if hashes is not None else {}).get(ireq)
|
|
|
|
line = format_requirement(ireq, marker=marker, hashes=ireq_hashes)
|
|
if self.strip_extras:
|
|
line = strip_extras(line)
|
|
|
|
if not self.annotate:
|
|
return line
|
|
|
|
# Annotate what packages or reqs-ins this package is required by
|
|
required_by = set()
|
|
if hasattr(ireq, "_source_ireqs"):
|
|
required_by |= {
|
|
_comes_from_as_string(src_ireq.comes_from)
|
|
for src_ireq in ireq._source_ireqs
|
|
if src_ireq.comes_from
|
|
}
|
|
|
|
# Filter out the origin install requirements for extras.
|
|
# See https://github.com/jazzband/pip-tools/issues/2003
|
|
if ireq.comes_from and (
|
|
isinstance(ireq.comes_from, str) or ireq.comes_from.name != ireq.name
|
|
):
|
|
required_by.add(_comes_from_as_string(ireq.comes_from))
|
|
|
|
required_by |= set(getattr(ireq, "_required_by", set()))
|
|
|
|
if required_by:
|
|
if self.annotation_style == "split":
|
|
annotation = annotation_style_split(required_by)
|
|
sep = "\n "
|
|
elif self.annotation_style == "line":
|
|
annotation = annotation_style_line(required_by)
|
|
sep = "\n " if ireq_hashes else " "
|
|
else: # pragma: no cover
|
|
raise ValueError("Invalid value for annotation style")
|
|
if self.strip_extras:
|
|
annotation = strip_extras(annotation)
|
|
# 24 is one reasonable column size to use here, that we've used in the past
|
|
lines = f"{line:24}{sep}{comment(annotation)}".splitlines()
|
|
line = "\n".join(ln.rstrip() for ln in lines)
|
|
|
|
return line
|