Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/mirrors.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/mirrors.py')
-rw-r--r--archinstall/lib/mirrors.py449
1 files changed, 290 insertions, 159 deletions
diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py
index f78a8b18..18ffffcd 100644
--- a/archinstall/lib/mirrors.py
+++ b/archinstall/lib/mirrors.py
@@ -1,187 +1,318 @@
-import logging
import pathlib
-import urllib.error
-import urllib.request
-from typing import Union, Mapping, Iterable, Dict, Any, List
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Dict, Any, List, Optional, TYPE_CHECKING
-from .general import SysCommand
-from .output import log
+from .menu import AbstractSubMenu, Selector, MenuSelectionType, Menu, ListManager, TextInput
+from .networking import fetch_data_from_url
+from .output import warn, FormattedOutput
from .storage import storage
-def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes:
- """
- This function can sort /etc/pacman.d/mirrorlist according to the
- mirror's URL prefix. By default places HTTPS before HTTP but it also
- preserves the country/rank-order.
-
- This assumes /etc/pacman.d/mirrorlist looks like the following:
-
- ## Comment
- Server = url
-
- or
-
- ## Comment
- #Server = url
-
- But the Comments need to start with double-hashmarks to be distringuished
- from server url definitions (commented or uncommented).
- """
- comments_and_whitespaces = b""
-
- categories = {key: [] for key in sort_order + ["Unknown"]}
- for line in raw_data.split(b"\n"):
- if line[0:2] in (b'##', b''):
- comments_and_whitespaces += line + b'\n'
- elif line[:6].lower() == b'server' or line[:7].lower() == b'#server':
- opening, url = line.split(b'=', 1)
- opening, url = opening.strip(), url.strip()
- if (category := url.split(b'://',1)[0].decode('UTF-8')) in categories:
- categories[category].append(comments_and_whitespaces)
- categories[category].append(opening + b' = ' + url + b'\n')
- else:
- categories["Unknown"].append(comments_and_whitespaces)
- categories["Unknown"].append(opening + b' = ' + url + b'\n')
-
- comments_and_whitespaces = b""
-
- new_raw_data = b''
- for category in sort_order + ["Unknown"]:
- for line in categories[category]:
- new_raw_data += line
-
- return new_raw_data
-
-
-def filter_mirrors_by_region(regions :str,
- destination :str = '/etc/pacman.d/mirrorlist',
- sort_order :List[str] = ["https", "http"],
- *args :str,
- **kwargs :str
-) -> Union[bool, bytes]:
+if TYPE_CHECKING:
+ _: Any
+
+
+class SignCheck(Enum):
+ Never = 'Never'
+ Optional = 'Optional'
+ Required = 'Required'
+
+
+class SignOption(Enum):
+ TrustedOnly = 'TrustedOnly'
+ TrustAll = 'TrustAll'
+
+
+@dataclass
+class CustomMirror:
+ name: str
+ url: str
+ sign_check: SignCheck
+ sign_option: SignOption
+
+ def table_data(self) -> Dict[str, str]:
+ return {
+ 'Name': self.name,
+ 'Url': self.url,
+ 'Sign check': self.sign_check.value,
+ 'Sign options': self.sign_option.value
+ }
+
+ def json(self) -> Dict[str, str]:
+ return {
+ 'name': self.name,
+ 'url': self.url,
+ 'sign_check': self.sign_check.value,
+ 'sign_option': self.sign_option.value
+ }
+
+ @classmethod
+ def parse_args(cls, args: List[Dict[str, str]]) -> List['CustomMirror']:
+ configs = []
+ for arg in args:
+ configs.append(
+ CustomMirror(
+ arg['name'],
+ arg['url'],
+ SignCheck(arg['sign_check']),
+ SignOption(arg['sign_option'])
+ )
+ )
+
+ return configs
+
+
+@dataclass
+class MirrorConfiguration:
+ mirror_regions: Dict[str, List[str]] = field(default_factory=dict)
+ custom_mirrors: List[CustomMirror] = field(default_factory=list)
+
+ @property
+ def regions(self) -> str:
+ return ', '.join(self.mirror_regions.keys())
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'mirror_regions': self.mirror_regions,
+ 'custom_mirrors': [c.json() for c in self.custom_mirrors]
+ }
+
+ def mirrorlist_config(self) -> str:
+ config = ''
+
+ for region, mirrors in self.mirror_regions.items():
+ for mirror in mirrors:
+ config += f'\n\n## {region}\nServer = {mirror}\n'
+
+ for cm in self.custom_mirrors:
+ config += f'\n\n## {cm.name}\nServer = {cm.url}\n'
+
+ return config
+
+ def pacman_config(self) -> str:
+ config = ''
+
+ for mirror in self.custom_mirrors:
+ config += f'\n\n[{mirror.name}]\n'
+ config += f'SigLevel = {mirror.sign_check.value} {mirror.sign_option.value}\n'
+ config += f'Server = {mirror.url}\n'
+
+ return config
+
+ @classmethod
+ def parse_args(cls, args: Dict[str, Any]) -> 'MirrorConfiguration':
+ config = MirrorConfiguration()
+
+ if 'mirror_regions' in args:
+ config.mirror_regions = args['mirror_regions']
+
+ if 'custom_mirrors' in args:
+ config.custom_mirrors = CustomMirror.parse_args(args['custom_mirrors'])
+
+ return config
+
+
+class CustomMirrorList(ListManager):
+ def __init__(self, prompt: str, custom_mirrors: List[CustomMirror]):
+ self._actions = [
+ str(_('Add a custom mirror')),
+ str(_('Change custom mirror')),
+ str(_('Delete custom mirror'))
+ ]
+ super().__init__(prompt, custom_mirrors, [self._actions[0]], self._actions[1:])
+
+ def selected_action_display(self, mirror: CustomMirror) -> str:
+ return mirror.name
+
+ def handle_action(
+ self,
+ action: str,
+ entry: Optional[CustomMirror],
+ data: List[CustomMirror]
+ ) -> List[CustomMirror]:
+ if action == self._actions[0]: # add
+ new_mirror = self._add_custom_mirror()
+ if new_mirror is not None:
+ data = [d for d in data if d.name != new_mirror.name]
+ data += [new_mirror]
+ elif action == self._actions[1] and entry: # modify mirror
+ new_mirror = self._add_custom_mirror(entry)
+ if new_mirror is not None:
+ data = [d for d in data if d.name != entry.name]
+ data += [new_mirror]
+ elif action == self._actions[2] and entry: # delete
+ data = [d for d in data if d != entry]
+
+ return data
+
+ def _add_custom_mirror(self, mirror: Optional[CustomMirror] = None) -> Optional[CustomMirror]:
+ prompt = '\n\n' + str(_('Enter name (leave blank to skip): '))
+ existing_name = mirror.name if mirror else ''
+
+ while True:
+ name = TextInput(prompt, existing_name).run()
+ if not name:
+ return mirror
+ break
+
+ prompt = '\n' + str(_('Enter url (leave blank to skip): '))
+ existing_url = mirror.url if mirror else ''
+
+ while True:
+ url = TextInput(prompt, existing_url).run()
+ if not url:
+ return mirror
+ break
+
+ sign_check_choice = Menu(
+ str(_('Select signature check option')),
+ [s.value for s in SignCheck],
+ skip=False,
+ clear_screen=False,
+ preset_values=mirror.sign_check.value if mirror else None
+ ).run()
+
+ sign_option_choice = Menu(
+ str(_('Select signature option')),
+ [s.value for s in SignOption],
+ skip=False,
+ clear_screen=False,
+ preset_values=mirror.sign_option.value if mirror else None
+ ).run()
+
+ return CustomMirror(
+ name,
+ url,
+ SignCheck(sign_check_choice.single_value),
+ SignOption(sign_option_choice.single_value)
+ )
+
+
+class MirrorMenu(AbstractSubMenu):
+ def __init__(
+ self,
+ data_store: Dict[str, Any],
+ preset: Optional[MirrorConfiguration] = None
+ ):
+ if preset:
+ self._preset = preset
+ else:
+ self._preset = MirrorConfiguration()
+
+ super().__init__(data_store=data_store)
+
+ def setup_selection_menu_options(self):
+ self._menu_options['mirror_regions'] = \
+ Selector(
+ _('Mirror region'),
+ lambda preset: select_mirror_regions(preset),
+ display_func=lambda x: ', '.join(x.keys()) if x else '',
+ default=self._preset.mirror_regions,
+ enabled=True)
+ self._menu_options['custom_mirrors'] = \
+ Selector(
+ _('Custom mirrors'),
+ lambda preset: select_custom_mirror(preset=preset),
+ display_func=lambda x: str(_('Defined')) if x else '',
+ preview_func=self._prev_custom_mirror,
+ default=self._preset.custom_mirrors,
+ enabled=True
+ )
+
+ def _prev_custom_mirror(self) -> Optional[str]:
+ selector = self._menu_options['custom_mirrors']
+
+ if selector.has_selection():
+ custom_mirrors: List[CustomMirror] = selector.current_selection # type: ignore
+ output = FormattedOutput.as_table(custom_mirrors)
+ return output.strip()
+
+ return None
+
+ def run(self, allow_reset: bool = True) -> Optional[MirrorConfiguration]:
+ super().run(allow_reset=allow_reset)
+
+ if self._data_store.get('mirror_regions', None) or self._data_store.get('custom_mirrors', None):
+ return MirrorConfiguration(
+ mirror_regions=self._data_store['mirror_regions'],
+ custom_mirrors=self._data_store['custom_mirrors'],
+ )
+
+ return None
+
+
+def select_mirror_regions(preset_values: Dict[str, List[str]] = {}) -> Dict[str, List[str]]:
"""
- This function will change the active mirrors on the live medium by
- filtering which regions are active based on `regions`.
+ Asks the user to select a mirror or region
+ Usually this is combined with :ref:`archinstall.list_mirrors`.
- :param regions: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States.
- :type regions: str
+ :return: The dictionary information about a mirror/region.
+ :rtype: dict
"""
- region_list = [f'country={region}' for region in regions.split(',')]
- response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux32.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'}))
- new_list = response.read().replace(b"#Server", b"Server")
-
- if sort_order:
- new_list = sort_mirrorlist(new_list, sort_order=sort_order)
+ if preset_values is None:
+ preselected = None
+ else:
+ preselected = list(preset_values.keys())
- if destination:
- with open(destination, "wb") as mirrorlist:
- mirrorlist.write(new_list)
+ mirrors = list_mirrors()
- return True
- else:
- return new_list.decode('UTF-8')
+ choice = Menu(
+ _('Select one of the regions to download packages from'),
+ list(mirrors.keys()),
+ preset_values=preselected,
+ multi=True,
+ allow_reset=True
+ ).run()
+ match choice.type_:
+ case MenuSelectionType.Reset:
+ return {}
+ case MenuSelectionType.Skip:
+ return preset_values
+ case MenuSelectionType.Selection:
+ return {selected: mirrors[selected] for selected in choice.multi_value}
-def add_custom_mirrors(mirrors: List[str], *args :str, **kwargs :str) -> bool:
- """
- This will append custom mirror definitions in pacman.conf
+ return {}
- :param mirrors: A list of mirror data according to: `{'url': 'http://url.com', 'signcheck': 'Optional', 'signoptions': 'TrustAll', 'name': 'testmirror'}`
- :type mirrors: dict
- """
- with open('/etc/pacman.conf', 'a') as pacman:
- for mirror in mirrors:
- pacman.write(f"[{mirror['name']}]\n")
- pacman.write(f"SigLevel = {mirror['signcheck']} {mirror['signoptions']}\n")
- pacman.write(f"Server = {mirror['url']}\n")
- return True
+def select_custom_mirror(prompt: str = '', preset: List[CustomMirror] = []):
+ custom_mirrors = CustomMirrorList(prompt, preset).run()
+ return custom_mirrors
-def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool:
- """
- This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
- It will not flush any other mirrors, just insert new ones.
+def _parse_mirror_list(mirrorlist: str) -> Dict[str, List[str]]:
+ file_content = mirrorlist.split('\n')
+ file_content = list(filter(lambda x: x, file_content)) # filter out empty lines
+ first_srv_idx = [idx for idx, line in enumerate(file_content) if 'server' in line.lower()][0]
+ mirrors = file_content[first_srv_idx - 1:]
- :param mirrors: A dictionary of `{'url' : 'country', 'url2' : 'country'}`
- :type mirrors: dict
- """
- original_mirrorlist = ''
- with open('/etc/pacman.d/mirrorlist', 'r') as original:
- original_mirrorlist = original.read()
-
- with open('/etc/pacman.d/mirrorlist', 'w') as new_mirrorlist:
- for mirror, country in mirrors.items():
- new_mirrorlist.write(f'## {country}\n')
- new_mirrorlist.write(f'Server = {mirror}\n')
- new_mirrorlist.write('\n')
- new_mirrorlist.write(original_mirrorlist)
-
- return True
-
-
-def use_mirrors(
- regions: Mapping[str, Iterable[str]],
- destination: str = '/etc/pacman.d/mirrorlist'
-) -> None:
- log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)
- with open(destination, 'w') as mirrorlist:
- for region, mirrors in regions.items():
- for mirror in mirrors:
- mirrorlist.write(f'## {region}\n')
- mirrorlist.write(f'Server = {mirror}\n')
+ mirror_list: Dict[str, List[str]] = {}
+ for idx in range(0, len(mirrors), 2):
+ region = mirrors[idx].removeprefix('## ')
+ url = mirrors[idx + 1].removeprefix('#').removeprefix('Server = ')
+ mirror_list.setdefault(region, []).append(url)
-def re_rank_mirrors(
- top: int = 10,
- src: str = '/etc/pacman.d/mirrorlist',
- dst: str = '/etc/pacman.d/mirrorlist',
-) -> bool:
- cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}")
- if cmd.exit_code != 0:
- return False
- with open(dst, 'w') as f:
- f.write(str(cmd))
- return True
+ return mirror_list
-def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
- regions = {}
+def list_mirrors() -> Dict[str, List[str]]:
+ regions: Dict[str, List[str]] = {}
if storage['arguments']['offline']:
- with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh:
- mirrorlist = fh.read()
+ with pathlib.Path('/etc/pacman.d/mirrorlist').open('r') as fp:
+ mirrorlist = fp.read()
else:
url = "https://archlinux32.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
-
try:
- response = urllib.request.urlopen(url)
- except urllib.error.URLError as err:
- log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange")
+ mirrorlist = fetch_data_from_url(url)
+ except ValueError as err:
+ warn(f'Could not fetch an active mirror-list: {err}')
return regions
- mirrorlist = response.read()
-
- if sort_order:
- mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order)
-
- region = 'Unknown region'
- for line in mirrorlist.split(b'\n'):
- if len(line.strip()) == 0:
- continue
-
- line = line.decode('UTF-8').strip('\n').strip('\r')
- if line[:3] == '## ':
- region = line[3:]
- elif line[:10] == '#Server = ':
- regions.setdefault(region, {})
-
- url = line.lstrip('#Server = ')
- regions[region][url] = True
- elif line.startswith('Server = '):
- regions.setdefault(region, {})
-
- url = line.lstrip('Server = ')
- regions[region][url] = True
+ regions = _parse_mirror_list(mirrorlist)
+ sorted_regions = {}
+ for region, urls in regions.items():
+ sorted_regions[region] = sorted(urls, reverse=True)
- return regions
+ return sorted_regions