From 8a292a163ea2e643a8ac5d4cfada8a27076de630 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Mon, 15 May 2023 17:16:18 +1000 Subject: Add custom mirror support (#1816) Co-authored-by: Daniel Girtler --- archinstall/lib/mirrors.py | 442 +++++++++++++++++++++++++++++---------------- 1 file changed, 290 insertions(+), 152 deletions(-) (limited to 'archinstall/lib/mirrors.py') diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index 62a0b081..521a8e5b 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,99 +1,279 @@ import pathlib -import urllib.error -import urllib.request -from typing import Union, Iterable, Dict, Any, List -from dataclasses import dataclass - -from .general import SysCommand -from .output import info, warn -from .exceptions import SysCallError +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, Any, List, Optional, TYPE_CHECKING + +from .menu import AbstractSubMenu, Selector, MenuSelectionType, Menu, ListManager, TextInput +from .networking import fetch_data_from_url +from .output import info, warn, FormattedOutput from .storage import storage +if TYPE_CHECKING: + _: Any -@dataclass -class CustomMirror: - url: str - signcheck: str - signoptions: str - name: str +class SignCheck(Enum): + Never = 'Never' + Optional = 'Optional' + Required = 'Required' -def sort_mirrorlist(raw_data :bytes, sort_order: List[str] = ['https', 'http']) -> bytes: - """ - This function can sort /etc/pacman.d/mirrorlist according to the - mirror's URL prefix. By default places HTTPS before HTTP but it also - preserves the country/rank-order. - This assumes /etc/pacman.d/mirrorlist looks like the following: +class SignOption(Enum): + TrustedOnly = 'TrustedOnly' + TrustAll = 'TrustAll' - ## Comment - Server = url - or +@dataclass +class CustomMirror: + name: str + url: str + sign_check: SignCheck + sign_option: SignOption + + def as_json(self) -> Dict[str, str]: + return { + 'Name': self.name, + 'Url': self.url, + 'Sign check': self.sign_check.value, + 'Sign options': self.sign_option.value + } + + def json(self) -> Dict[str, str]: + return { + 'name': self.name, + 'url': self.url, + 'sign_check': self.sign_check.value, + 'sign_option': self.sign_option.value + } + + @classmethod + def parse_args(cls, args: List[Dict[str, str]]) -> List['CustomMirror']: + configs = [] + for arg in args: + configs.append( + CustomMirror( + arg['name'], + arg['url'], + SignCheck(arg['sign_check']), + SignOption(arg['sign_option']) + ) + ) + + return configs - ## Comment - #Server = url - But the Comments need to start with double-hashmarks to be distringuished - from server url definitions (commented or uncommented). - """ - comments_and_whitespaces = b"" - sort_order += ['Unknown'] - categories: Dict[str, List] = {key: [] for key in sort_order} - - for line in raw_data.split(b"\n"): - if line[0:2] in (b'##', b''): - comments_and_whitespaces += line + b'\n' - elif line[:6].lower() == b'server' or line[:7].lower() == b'#server': - opening, url = line.split(b'=', 1) - opening, url = opening.strip(), url.strip() - if (category := url.split(b'://',1)[0].decode('UTF-8')) in categories: - categories[category].append(comments_and_whitespaces) - categories[category].append(opening + b' = ' + url + b'\n') - else: - categories["Unknown"].append(comments_and_whitespaces) - categories["Unknown"].append(opening + b' = ' + url + b'\n') - - comments_and_whitespaces = b"" - - new_raw_data = b'' - for category in sort_order + ["Unknown"]: - for line in categories[category]: - new_raw_data += line - - return new_raw_data - - -def filter_mirrors_by_region(regions :str, - destination :str = '/etc/pacman.d/mirrorlist', - sort_order :List[str] = ["https", "http"], - *args :str, - **kwargs :str -) -> Union[bool, bytes]: +@dataclass +class MirrorConfiguration: + mirror_regions: Dict[str, List[str]] = field(default_factory=dict) + custom_mirrors: List[CustomMirror] = field(default_factory=list) + + @property + def regions(self) -> str: + return ', '.join(self.mirror_regions.keys()) + + def json(self) -> Dict[str, Any]: + return { + 'mirror_regions': self.mirror_regions, + 'custom_mirrors': [c.json() for c in self.custom_mirrors] + } + + @classmethod + def parse_args(cls, args: Dict[str, Any]) -> 'MirrorConfiguration': + config = MirrorConfiguration() + + if 'mirror_regions' in args: + config.mirror_regions = args['mirror_regions'] + + if 'custom_mirrors' in args: + config.custom_mirrors = CustomMirror.parse_args(args['custom_mirrors']) + + return config + + +class CustomMirrorList(ListManager): + def __init__(self, prompt: str, custom_mirrors: List[CustomMirror]): + self._actions = [ + str(_('Add a custom mirror')), + str(_('Change custom mirror')), + str(_('Delete custom mirror')) + ] + super().__init__(prompt, custom_mirrors, [self._actions[0]], self._actions[1:]) + + def reformat(self, data: List[CustomMirror]) -> Dict[str, Any]: + table = FormattedOutput.as_table(data) + rows = table.split('\n') + + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[CustomMirror]] = {f' {rows[0]}': None, f' {rows[1]}': None} + + for row, user in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = user + + return display_data + + def selected_action_display(self, mirror: CustomMirror) -> str: + return mirror.name + + def handle_action( + self, + action: str, + entry: Optional[CustomMirror], + data: List[CustomMirror] + ) -> List[CustomMirror]: + if action == self._actions[0]: # add + new_mirror = self._add_custom_mirror() + if new_mirror is not None: + data = [d for d in data if d.name != new_mirror.name] + data += [new_mirror] + elif action == self._actions[1] and entry: # modify mirror + new_mirror = self._add_custom_mirror(entry) + if new_mirror is not None: + data = [d for d in data if d.name != entry.name] + data += [new_mirror] + elif action == self._actions[2] and entry: # delete + data = [d for d in data if d != entry] + + return data + + def _add_custom_mirror(self, mirror: Optional[CustomMirror] = None) -> Optional[CustomMirror]: + prompt = '\n\n' + str(_('Enter name (leave blank to skip): ')) + existing_name = mirror.name if mirror else '' + + while True: + name = TextInput(prompt, existing_name).run() + if not name: + return mirror + break + + prompt = '\n' + str(_('Enter url (leave blank to skip): ')) + existing_url = mirror.url if mirror else '' + + while True: + url = TextInput(prompt, existing_url).run() + if not url: + return mirror + break + + sign_check_choice = Menu( + str(_('Select signature check option')), + [s.value for s in SignCheck], + skip=False, + clear_screen=False, + preset_values=mirror.sign_check.value if mirror else None + ).run() + + sign_option_choice = Menu( + str(_('Select signature option')), + [s.value for s in SignOption], + skip=False, + clear_screen=False, + preset_values=mirror.sign_option.value if mirror else None + ).run() + + return CustomMirror( + name, + url, + SignCheck(sign_check_choice.single_value), + SignOption(sign_option_choice.single_value) + ) + + +class MirrorMenu(AbstractSubMenu): + def __init__( + self, + data_store: Dict[str, Any], + preset: Optional[MirrorConfiguration] = None + ): + if preset: + self._preset = preset + else: + self._preset = MirrorConfiguration() + + super().__init__(data_store=data_store) + + def setup_selection_menu_options(self): + self._menu_options['mirror_regions'] = \ + Selector( + _('Mirror region'), + lambda preset: select_mirror_regions(preset), + display_func=lambda x: ', '.join(x.keys()) if x else '', + default=self._preset.mirror_regions, + enabled=True) + self._menu_options['custom_mirrors'] = \ + Selector( + _('Custom mirrors'), + lambda preset: select_custom_mirror(preset=preset), + display_func=lambda x: str(_('Defined')) if x else '', + preview_func=self._prev_custom_mirror, + default=self._preset.custom_mirrors, + enabled=True + ) + + def _prev_custom_mirror(self) -> Optional[str]: + selector = self._menu_options['custom_mirrors'] + + if selector.has_selection(): + custom_mirrors: List[CustomMirror] = selector.current_selection # type: ignore + output = FormattedOutput.as_table(custom_mirrors) + return output.strip() + + return None + + def run(self, allow_reset: bool = True) -> Optional[MirrorConfiguration]: + super().run(allow_reset=allow_reset) + + if self._data_store.get('mirror_regions', None) or self._data_store.get('custom_mirrors', None): + return MirrorConfiguration( + mirror_regions=self._data_store['mirror_regions'], + custom_mirrors=self._data_store['custom_mirrors'], + ) + + return None + + +def select_mirror_regions(preset_values: Dict[str, List[str]] = {}) -> Dict[str, List[str]]: """ - This function will change the active mirrors on the live medium by - filtering which regions are active based on `regions`. + Asks the user to select a mirror or region + Usually this is combined with :ref:`archinstall.list_mirrors`. - :param regions: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States. - :type regions: str + :return: The dictionary information about a mirror/region. + :rtype: dict """ - region_list = [f'country={region}' for region in regions.split(',')] - response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'})) - new_list = response.read().replace(b"#Server", b"Server") + if preset_values is None: + preselected = None + else: + preselected = list(preset_values.keys()) - if sort_order: - new_list = sort_mirrorlist(new_list, sort_order=sort_order) + mirrors = list_mirrors() - if destination: - with open(destination, "wb") as mirrorlist: - mirrorlist.write(new_list) + choice = Menu( + _('Select one of the regions to download packages from'), + list(mirrors.keys()), + preset_values=preselected, + multi=True, + allow_reset=True + ).run() - return True - else: - return new_list.decode('UTF-8') + match choice.type_: + case MenuSelectionType.Reset: + return {} + case MenuSelectionType.Skip: + return preset_values + case MenuSelectionType.Selection: + return {selected: mirrors[selected] for selected in choice.multi_value} + + return {} -def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool: +def select_custom_mirror(prompt: str = '', preset: List[CustomMirror] = []): + custom_mirrors = CustomMirrorList(prompt, preset).run() + return custom_mirrors + + +def add_custom_mirrors(mirrors: List[CustomMirror]): """ This will append custom mirror definitions in pacman.conf @@ -102,99 +282,57 @@ def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool: """ with open('/etc/pacman.conf', 'a') as pacman: for mirror in mirrors: - pacman.write(f"[{mirror.name}]\n") - pacman.write(f"SigLevel = {mirror.signcheck} {mirror.signoptions}\n") + pacman.write(f"\n\n[{mirror.name}]\n") + pacman.write(f"SigLevel = {mirror.sign_check.value} {mirror.sign_option.value}\n") pacman.write(f"Server = {mirror.url}\n") - return True - - -def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool: - """ - This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`. - It will not flush any other mirrors, just insert new ones. - - :param mirrors: A dictionary of `{'url' : 'country', 'url2' : 'country'}` - :type mirrors: dict - """ - original_mirrorlist = '' - with open('/etc/pacman.d/mirrorlist', 'r') as original: - original_mirrorlist = original.read() - - with open('/etc/pacman.d/mirrorlist', 'w') as new_mirrorlist: - for mirror, country in mirrors.items(): - new_mirrorlist.write(f'## {country}\n') - new_mirrorlist.write(f'Server = {mirror}\n') - new_mirrorlist.write('\n') - new_mirrorlist.write(original_mirrorlist) - - return True - def use_mirrors( - regions: Dict[str, Iterable[str]], + regions: Dict[str, List[str]], destination: str = '/etc/pacman.d/mirrorlist' ): - info(f'A new package mirror-list has been created: {destination}') - with open(destination, 'w') as mirrorlist: + with open(destination, 'w') as fp: for region, mirrors in regions.items(): for mirror in mirrors: - mirrorlist.write(f'## {region}\n') - mirrorlist.write(f'Server = {mirror}\n') + fp.write(f'## {region}\n') + fp.write(f'Server = {mirror}\n') + + info(f'A new package mirror-list has been created: {destination}') + +def _parse_mirror_list(mirrorlist: str) -> Dict[str, List[str]]: + file_content = mirrorlist.split('\n') + file_content = list(filter(lambda x: x, file_content)) # filter out empty lines + first_srv_idx = [idx for idx, line in enumerate(file_content) if 'server' in line.lower()][0] + mirrors = file_content[first_srv_idx - 1:] -def re_rank_mirrors( - top: int = 10, - src: str = '/etc/pacman.d/mirrorlist', - dst: str = '/etc/pacman.d/mirrorlist', -) -> bool: - try: - cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}") - except SysCallError: - return False - with open(dst, 'w') as f: - f.write(str(cmd)) - return True + mirror_list: Dict[str, List[str]] = {} + for idx in range(0, len(mirrors), 2): + region = mirrors[idx].removeprefix('## ') + url = mirrors[idx + 1].removeprefix('#').removeprefix('Server = ') + mirror_list.setdefault(region, []).append(url) -def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: - regions: Dict[str, Dict[str, Any]] = {} + return mirror_list + + +def list_mirrors() -> Dict[str, List[str]]: + regions: Dict[str, List[str]] = {} if storage['arguments']['offline']: - with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh: - mirrorlist = fh.read() + with pathlib.Path('/etc/pacman.d/mirrorlist').open('r') as fp: + mirrorlist = fp.read() else: url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on" - try: - response = urllib.request.urlopen(url) - except urllib.error.URLError as err: + mirrorlist = fetch_data_from_url(url) + except ValueError as err: warn(f'Could not fetch an active mirror-list: {err}') return regions - mirrorlist = response.read() - - if sort_order: - mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order) - - region = 'Unknown region' - for line in mirrorlist.split(b'\n'): - if len(line.strip()) == 0: - continue - - clean_line = line.decode('UTF-8').strip('\n').strip('\r') - - if clean_line[:3] == '## ': - region = clean_line[3:] - elif clean_line[:10] == '#Server = ': - regions.setdefault(region, {}) - - url = clean_line.lstrip('#Server = ') - regions[region][url] = True - elif clean_line.startswith('Server = '): - regions.setdefault(region, {}) - - url = clean_line.lstrip('Server = ') - regions[region][url] = True + regions = _parse_mirror_list(mirrorlist) + sorted_regions = {} + for region, urls in regions.items(): + sorted_regions[region] = sorted(urls, reverse=True) - return regions + return sorted_regions -- cgit v1.2.3-54-g00ecf