From 8a292a163ea2e643a8ac5d4cfada8a27076de630 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Mon, 15 May 2023 17:16:18 +1000 Subject: Add custom mirror support (#1816) Co-authored-by: Daniel Girtler --- archinstall/lib/global_menu.py | 34 ++- archinstall/lib/installer.py | 15 +- archinstall/lib/interactions/__init__.py | 2 +- archinstall/lib/interactions/general_conf.py | 37 +-- archinstall/lib/mirrors.py | 442 ++++++++++++++++++--------- 5 files changed, 329 insertions(+), 201 deletions(-) (limited to 'archinstall/lib') diff --git a/archinstall/lib/global_menu.py b/archinstall/lib/global_menu.py index 13595132..fc58a653 100644 --- a/archinstall/lib/global_menu.py +++ b/archinstall/lib/global_menu.py @@ -5,6 +5,7 @@ from typing import Any, List, Optional, Union, Dict, TYPE_CHECKING from . import disk from .general import secret from .menu import Selector, AbstractMenu +from .mirrors import MirrorConfiguration, MirrorMenu from .models import NetworkConfiguration from .models.bootloader import Bootloader from .models.users import User @@ -26,7 +27,6 @@ from .interactions import select_kernel from .interactions import select_language from .interactions import select_locale_enc from .interactions import select_locale_lang -from .interactions import select_mirror_regions from .interactions import ask_ntp from .interactions.disk_conf import select_disk_config @@ -51,12 +51,13 @@ class GlobalMenu(AbstractMenu): _('Keyboard layout'), lambda preset: select_language(preset), default='us') - self._menu_options['mirror-region'] = \ + self._menu_options['mirror_config'] = \ Selector( - _('Mirror region'), - lambda preset: select_mirror_regions(preset), - display_func=lambda x: list(x.keys()) if x else '[]', - default={}) + _('Mirrors'), + lambda preset: self._mirror_configuration(preset), + display_func=lambda x: str(_('Defined')) if x else '', + preview_func=self._prev_mirror_config + ) self._menu_options['sys-language'] = \ Selector( _('Locale language'), @@ -354,3 +355,24 @@ class GlobalMenu(AbstractMenu): def _create_user_account(self, defined_users: List[User]) -> List[User]: users = ask_for_additional_users(defined_users=defined_users) return users + + def _mirror_configuration(self, preset: Optional[MirrorConfiguration] = None) -> Optional[MirrorConfiguration]: + data_store: Dict[str, Any] = {} + mirror_configuration = MirrorMenu(data_store, preset=preset).run() + return mirror_configuration + + def _prev_mirror_config(self) -> Optional[str]: + selector = self._menu_options['mirror_config'] + + if selector.has_selection(): + mirror_config: MirrorConfiguration = selector.current_selection # type: ignore + output = '' + if mirror_config.regions: + output += '{}: {}\n\n'.format(str(_('Mirror regions')), mirror_config.regions) + if mirror_config.custom_mirrors: + table = FormattedOutput.as_table(mirror_config.custom_mirrors) + output += '{}\n{}'.format(str(_('Custom mirrors')), table) + + return output.strip() + + return None diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 3c427ab2..30442774 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -6,7 +6,7 @@ import shutil import subprocess import time from pathlib import Path -from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, Iterable +from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable from . import disk from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError @@ -14,7 +14,7 @@ from .general import SysCommand from .hardware import SysInfo from .locale import verify_keyboard_layout, verify_x11_keyboard_layout from .luks import Luks2 -from .mirrors import use_mirrors +from .mirrors import use_mirrors, MirrorConfiguration, add_custom_mirrors from .models.bootloader import Bootloader from .models.network_configuration import NetworkConfiguration from .models.users import User @@ -383,14 +383,17 @@ class Installer: raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") - def set_mirrors(self, mirrors: Dict[str, Iterable[str]]): + def set_mirrors(self, mirror_config: MirrorConfiguration): for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): - if result := plugin.on_mirrors(mirrors): - mirrors = result + if result := plugin.on_mirrors(mirror_config): + mirror_config = result destination = f'{self.target}/etc/pacman.d/mirrorlist' - use_mirrors(mirrors, destination=destination) + if mirror_config.mirror_regions: + use_mirrors(mirror_config.mirror_regions, destination) + if mirror_config.custom_mirrors: + add_custom_mirrors(mirror_config.custom_mirrors) def genfstab(self, flags :str = '-pU'): info(f"Updating {self.target}/etc/fstab") diff --git a/archinstall/lib/interactions/__init__.py b/archinstall/lib/interactions/__init__.py index b5691a10..158750cc 100644 --- a/archinstall/lib/interactions/__init__.py +++ b/archinstall/lib/interactions/__init__.py @@ -11,7 +11,7 @@ from .disk_conf import ( from .general_conf import ( ask_ntp, ask_hostname, ask_for_a_timezone, ask_for_audio_selection, select_language, - select_mirror_regions, select_archinstall_language, ask_additional_packages_to_install, + select_archinstall_language, ask_additional_packages_to_install, add_number_of_parrallel_downloads, select_additional_repositories ) diff --git a/archinstall/lib/interactions/general_conf.py b/archinstall/lib/interactions/general_conf.py index 5fcfa633..0338c61e 100644 --- a/archinstall/lib/interactions/general_conf.py +++ b/archinstall/lib/interactions/general_conf.py @@ -1,11 +1,10 @@ from __future__ import annotations import pathlib -from typing import List, Any, Optional, Dict, TYPE_CHECKING +from typing import List, Any, Optional, TYPE_CHECKING from ..locale import list_keyboard_languages, list_timezones from ..menu import MenuSelectionType, Menu, TextInput -from ..mirrors import list_mirrors from ..output import warn from ..packages.packages import validate_package_list from ..storage import storage @@ -96,40 +95,6 @@ def select_language(preset: Optional[str] = None) -> Optional[str]: return None -def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: - """ - Asks the user to select a mirror or region - Usually this is combined with :ref:`archinstall.list_mirrors`. - - :return: The dictionary information about a mirror/region. - :rtype: dict - """ - if preset_values is None: - preselected = None - else: - preselected = list(preset_values.keys()) - - mirrors = list_mirrors() - - choice = Menu( - _('Select one of the regions to download packages from'), - list(mirrors.keys()), - preset_values=preselected, - multi=True, - allow_reset=True - ).run() - - match choice.type_: - case MenuSelectionType.Reset: - return {} - case MenuSelectionType.Skip: - return preset_values - case MenuSelectionType.Selection: - return {selected: mirrors[selected] for selected in choice.multi_value} - - return {} - - def select_archinstall_language(languages: List[Language], preset: Language) -> Language: # these are the displayed language names which can either be # the english name of a language or, if present, the diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index 62a0b081..521a8e5b 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,99 +1,279 @@ import pathlib -import urllib.error -import urllib.request -from typing import Union, Iterable, Dict, Any, List -from dataclasses import dataclass - -from .general import SysCommand -from .output import info, warn -from .exceptions import SysCallError +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, Any, List, Optional, TYPE_CHECKING + +from .menu import AbstractSubMenu, Selector, MenuSelectionType, Menu, ListManager, TextInput +from .networking import fetch_data_from_url +from .output import info, warn, FormattedOutput from .storage import storage +if TYPE_CHECKING: + _: Any -@dataclass -class CustomMirror: - url: str - signcheck: str - signoptions: str - name: str +class SignCheck(Enum): + Never = 'Never' + Optional = 'Optional' + Required = 'Required' -def sort_mirrorlist(raw_data :bytes, sort_order: List[str] = ['https', 'http']) -> bytes: - """ - This function can sort /etc/pacman.d/mirrorlist according to the - mirror's URL prefix. By default places HTTPS before HTTP but it also - preserves the country/rank-order. - This assumes /etc/pacman.d/mirrorlist looks like the following: +class SignOption(Enum): + TrustedOnly = 'TrustedOnly' + TrustAll = 'TrustAll' - ## Comment - Server = url - or +@dataclass +class CustomMirror: + name: str + url: str + sign_check: SignCheck + sign_option: SignOption + + def as_json(self) -> Dict[str, str]: + return { + 'Name': self.name, + 'Url': self.url, + 'Sign check': self.sign_check.value, + 'Sign options': self.sign_option.value + } + + def json(self) -> Dict[str, str]: + return { + 'name': self.name, + 'url': self.url, + 'sign_check': self.sign_check.value, + 'sign_option': self.sign_option.value + } + + @classmethod + def parse_args(cls, args: List[Dict[str, str]]) -> List['CustomMirror']: + configs = [] + for arg in args: + configs.append( + CustomMirror( + arg['name'], + arg['url'], + SignCheck(arg['sign_check']), + SignOption(arg['sign_option']) + ) + ) + + return configs - ## Comment - #Server = url - But the Comments need to start with double-hashmarks to be distringuished - from server url definitions (commented or uncommented). - """ - comments_and_whitespaces = b"" - sort_order += ['Unknown'] - categories: Dict[str, List] = {key: [] for key in sort_order} - - for line in raw_data.split(b"\n"): - if line[0:2] in (b'##', b''): - comments_and_whitespaces += line + b'\n' - elif line[:6].lower() == b'server' or line[:7].lower() == b'#server': - opening, url = line.split(b'=', 1) - opening, url = opening.strip(), url.strip() - if (category := url.split(b'://',1)[0].decode('UTF-8')) in categories: - categories[category].append(comments_and_whitespaces) - categories[category].append(opening + b' = ' + url + b'\n') - else: - categories["Unknown"].append(comments_and_whitespaces) - categories["Unknown"].append(opening + b' = ' + url + b'\n') - - comments_and_whitespaces = b"" - - new_raw_data = b'' - for category in sort_order + ["Unknown"]: - for line in categories[category]: - new_raw_data += line - - return new_raw_data - - -def filter_mirrors_by_region(regions :str, - destination :str = '/etc/pacman.d/mirrorlist', - sort_order :List[str] = ["https", "http"], - *args :str, - **kwargs :str -) -> Union[bool, bytes]: +@dataclass +class MirrorConfiguration: + mirror_regions: Dict[str, List[str]] = field(default_factory=dict) + custom_mirrors: List[CustomMirror] = field(default_factory=list) + + @property + def regions(self) -> str: + return ', '.join(self.mirror_regions.keys()) + + def json(self) -> Dict[str, Any]: + return { + 'mirror_regions': self.mirror_regions, + 'custom_mirrors': [c.json() for c in self.custom_mirrors] + } + + @classmethod + def parse_args(cls, args: Dict[str, Any]) -> 'MirrorConfiguration': + config = MirrorConfiguration() + + if 'mirror_regions' in args: + config.mirror_regions = args['mirror_regions'] + + if 'custom_mirrors' in args: + config.custom_mirrors = CustomMirror.parse_args(args['custom_mirrors']) + + return config + + +class CustomMirrorList(ListManager): + def __init__(self, prompt: str, custom_mirrors: List[CustomMirror]): + self._actions = [ + str(_('Add a custom mirror')), + str(_('Change custom mirror')), + str(_('Delete custom mirror')) + ] + super().__init__(prompt, custom_mirrors, [self._actions[0]], self._actions[1:]) + + def reformat(self, data: List[CustomMirror]) -> Dict[str, Any]: + table = FormattedOutput.as_table(data) + rows = table.split('\n') + + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[CustomMirror]] = {f' {rows[0]}': None, f' {rows[1]}': None} + + for row, user in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = user + + return display_data + + def selected_action_display(self, mirror: CustomMirror) -> str: + return mirror.name + + def handle_action( + self, + action: str, + entry: Optional[CustomMirror], + data: List[CustomMirror] + ) -> List[CustomMirror]: + if action == self._actions[0]: # add + new_mirror = self._add_custom_mirror() + if new_mirror is not None: + data = [d for d in data if d.name != new_mirror.name] + data += [new_mirror] + elif action == self._actions[1] and entry: # modify mirror + new_mirror = self._add_custom_mirror(entry) + if new_mirror is not None: + data = [d for d in data if d.name != entry.name] + data += [new_mirror] + elif action == self._actions[2] and entry: # delete + data = [d for d in data if d != entry] + + return data + + def _add_custom_mirror(self, mirror: Optional[CustomMirror] = None) -> Optional[CustomMirror]: + prompt = '\n\n' + str(_('Enter name (leave blank to skip): ')) + existing_name = mirror.name if mirror else '' + + while True: + name = TextInput(prompt, existing_name).run() + if not name: + return mirror + break + + prompt = '\n' + str(_('Enter url (leave blank to skip): ')) + existing_url = mirror.url if mirror else '' + + while True: + url = TextInput(prompt, existing_url).run() + if not url: + return mirror + break + + sign_check_choice = Menu( + str(_('Select signature check option')), + [s.value for s in SignCheck], + skip=False, + clear_screen=False, + preset_values=mirror.sign_check.value if mirror else None + ).run() + + sign_option_choice = Menu( + str(_('Select signature option')), + [s.value for s in SignOption], + skip=False, + clear_screen=False, + preset_values=mirror.sign_option.value if mirror else None + ).run() + + return CustomMirror( + name, + url, + SignCheck(sign_check_choice.single_value), + SignOption(sign_option_choice.single_value) + ) + + +class MirrorMenu(AbstractSubMenu): + def __init__( + self, + data_store: Dict[str, Any], + preset: Optional[MirrorConfiguration] = None + ): + if preset: + self._preset = preset + else: + self._preset = MirrorConfiguration() + + super().__init__(data_store=data_store) + + def setup_selection_menu_options(self): + self._menu_options['mirror_regions'] = \ + Selector( + _('Mirror region'), + lambda preset: select_mirror_regions(preset), + display_func=lambda x: ', '.join(x.keys()) if x else '', + default=self._preset.mirror_regions, + enabled=True) + self._menu_options['custom_mirrors'] = \ + Selector( + _('Custom mirrors'), + lambda preset: select_custom_mirror(preset=preset), + display_func=lambda x: str(_('Defined')) if x else '', + preview_func=self._prev_custom_mirror, + default=self._preset.custom_mirrors, + enabled=True + ) + + def _prev_custom_mirror(self) -> Optional[str]: + selector = self._menu_options['custom_mirrors'] + + if selector.has_selection(): + custom_mirrors: List[CustomMirror] = selector.current_selection # type: ignore + output = FormattedOutput.as_table(custom_mirrors) + return output.strip() + + return None + + def run(self, allow_reset: bool = True) -> Optional[MirrorConfiguration]: + super().run(allow_reset=allow_reset) + + if self._data_store.get('mirror_regions', None) or self._data_store.get('custom_mirrors', None): + return MirrorConfiguration( + mirror_regions=self._data_store['mirror_regions'], + custom_mirrors=self._data_store['custom_mirrors'], + ) + + return None + + +def select_mirror_regions(preset_values: Dict[str, List[str]] = {}) -> Dict[str, List[str]]: """ - This function will change the active mirrors on the live medium by - filtering which regions are active based on `regions`. + Asks the user to select a mirror or region + Usually this is combined with :ref:`archinstall.list_mirrors`. - :param regions: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States. - :type regions: str + :return: The dictionary information about a mirror/region. + :rtype: dict """ - region_list = [f'country={region}' for region in regions.split(',')] - response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'})) - new_list = response.read().replace(b"#Server", b"Server") + if preset_values is None: + preselected = None + else: + preselected = list(preset_values.keys()) - if sort_order: - new_list = sort_mirrorlist(new_list, sort_order=sort_order) + mirrors = list_mirrors() - if destination: - with open(destination, "wb") as mirrorlist: - mirrorlist.write(new_list) + choice = Menu( + _('Select one of the regions to download packages from'), + list(mirrors.keys()), + preset_values=preselected, + multi=True, + allow_reset=True + ).run() - return True - else: - return new_list.decode('UTF-8') + match choice.type_: + case MenuSelectionType.Reset: + return {} + case MenuSelectionType.Skip: + return preset_values + case MenuSelectionType.Selection: + return {selected: mirrors[selected] for selected in choice.multi_value} + + return {} -def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool: +def select_custom_mirror(prompt: str = '', preset: List[CustomMirror] = []): + custom_mirrors = CustomMirrorList(prompt, preset).run() + return custom_mirrors + + +def add_custom_mirrors(mirrors: List[CustomMirror]): """ This will append custom mirror definitions in pacman.conf @@ -102,99 +282,57 @@ def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool: """ with open('/etc/pacman.conf', 'a') as pacman: for mirror in mirrors: - pacman.write(f"[{mirror.name}]\n") - pacman.write(f"SigLevel = {mirror.signcheck} {mirror.signoptions}\n") + pacman.write(f"\n\n[{mirror.name}]\n") + pacman.write(f"SigLevel = {mirror.sign_check.value} {mirror.sign_option.value}\n") pacman.write(f"Server = {mirror.url}\n") - return True - - -def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool: - """ - This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`. - It will not flush any other mirrors, just insert new ones. - - :param mirrors: A dictionary of `{'url' : 'country', 'url2' : 'country'}` - :type mirrors: dict - """ - original_mirrorlist = '' - with open('/etc/pacman.d/mirrorlist', 'r') as original: - original_mirrorlist = original.read() - - with open('/etc/pacman.d/mirrorlist', 'w') as new_mirrorlist: - for mirror, country in mirrors.items(): - new_mirrorlist.write(f'## {country}\n') - new_mirrorlist.write(f'Server = {mirror}\n') - new_mirrorlist.write('\n') - new_mirrorlist.write(original_mirrorlist) - - return True - def use_mirrors( - regions: Dict[str, Iterable[str]], + regions: Dict[str, List[str]], destination: str = '/etc/pacman.d/mirrorlist' ): - info(f'A new package mirror-list has been created: {destination}') - with open(destination, 'w') as mirrorlist: + with open(destination, 'w') as fp: for region, mirrors in regions.items(): for mirror in mirrors: - mirrorlist.write(f'## {region}\n') - mirrorlist.write(f'Server = {mirror}\n') + fp.write(f'## {region}\n') + fp.write(f'Server = {mirror}\n') + + info(f'A new package mirror-list has been created: {destination}') + +def _parse_mirror_list(mirrorlist: str) -> Dict[str, List[str]]: + file_content = mirrorlist.split('\n') + file_content = list(filter(lambda x: x, file_content)) # filter out empty lines + first_srv_idx = [idx for idx, line in enumerate(file_content) if 'server' in line.lower()][0] + mirrors = file_content[first_srv_idx - 1:] -def re_rank_mirrors( - top: int = 10, - src: str = '/etc/pacman.d/mirrorlist', - dst: str = '/etc/pacman.d/mirrorlist', -) -> bool: - try: - cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}") - except SysCallError: - return False - with open(dst, 'w') as f: - f.write(str(cmd)) - return True + mirror_list: Dict[str, List[str]] = {} + for idx in range(0, len(mirrors), 2): + region = mirrors[idx].removeprefix('## ') + url = mirrors[idx + 1].removeprefix('#').removeprefix('Server = ') + mirror_list.setdefault(region, []).append(url) -def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: - regions: Dict[str, Dict[str, Any]] = {} + return mirror_list + + +def list_mirrors() -> Dict[str, List[str]]: + regions: Dict[str, List[str]] = {} if storage['arguments']['offline']: - with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh: - mirrorlist = fh.read() + with pathlib.Path('/etc/pacman.d/mirrorlist').open('r') as fp: + mirrorlist = fp.read() else: url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on" - try: - response = urllib.request.urlopen(url) - except urllib.error.URLError as err: + mirrorlist = fetch_data_from_url(url) + except ValueError as err: warn(f'Could not fetch an active mirror-list: {err}') return regions - mirrorlist = response.read() - - if sort_order: - mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order) - - region = 'Unknown region' - for line in mirrorlist.split(b'\n'): - if len(line.strip()) == 0: - continue - - clean_line = line.decode('UTF-8').strip('\n').strip('\r') - - if clean_line[:3] == '## ': - region = clean_line[3:] - elif clean_line[:10] == '#Server = ': - regions.setdefault(region, {}) - - url = clean_line.lstrip('#Server = ') - regions[region][url] = True - elif clean_line.startswith('Server = '): - regions.setdefault(region, {}) - - url = clean_line.lstrip('Server = ') - regions[region][url] = True + regions = _parse_mirror_list(mirrorlist) + sorted_regions = {} + for region, urls in regions.items(): + sorted_regions[region] = sorted(urls, reverse=True) - return regions + return sorted_regions -- cgit v1.2.3-70-g09d2