import logging
import pathlib
import urllib.error
import urllib.parse
import urllib.request
from typing import Union, Mapping, Iterable, Dict, Any, List

from .general import SysCommand
from .output import log
from .storage import storage

# Line prefixes that introduce a mirror definition in a pacman mirrorlist.
# The commented-out form is listed first; neither prefix can match the other.
_SERVER_PREFIXES = ('#Server = ', 'Server = ')


def sort_mirrorlist(raw_data: bytes, sort_order: List[str] = ["https", "http"]) -> bytes:
    """
    Sort the contents of a pacman mirrorlist by the mirrors' URL scheme.

    By default HTTPS mirrors are placed before HTTP mirrors; within each
    scheme the original (country/rank) order is preserved. Mirrors whose
    scheme is not listed in `sort_order` are emitted last.

    This assumes the mirrorlist looks like the following:

        ## Comment
        Server = url

    or

        ## Comment
        #Server = url

    Comments need to start with double hash marks to be distinguished from
    server url definitions (commented or uncommented). Comment and blank
    lines are attached to the next server line that follows them; any that
    trail the last server line are dropped, as are lines that are neither
    a `##` comment, blank, nor a server definition.

    :param raw_data: The raw mirrorlist contents.
    :type raw_data: bytes

    :param sort_order: URL schemes in the order they should appear.
    :type sort_order: list

    :return: The re-ordered mirrorlist.
    :rtype: bytes
    """
    # De-duplicate while keeping order, so a caller passing "Unknown" (or a
    # repeated scheme) does not get the same entries emitted twice.
    order = list(dict.fromkeys([*sort_order, "Unknown"]))
    categories: Dict[str, List[bytes]] = {key: [] for key in order}
    pending = b""  # comments/blank lines waiting for their server line

    for line in raw_data.split(b"\n"):
        if line[:2] in (b'##', b''):
            pending += line + b'\n'
            continue

        if not line.lower().startswith((b'server', b'#server')):
            continue

        opening, separator, url = line.partition(b'=')
        if not separator:
            # Malformed server line without '=': skip it instead of raising.
            continue

        opening, url = opening.strip(), url.strip()
        # Undecodable bytes simply end up in the "Unknown" bucket.
        scheme = url.split(b'://', 1)[0].decode('UTF-8', errors='replace')
        bucket = categories.get(scheme, categories["Unknown"])
        bucket.append(pending)
        bucket.append(opening + b' = ' + url + b'\n')
        pending = b""

    return b"".join(chunk for key in order for chunk in categories[key])


def filter_mirrors_by_region(
    regions: str,
    destination: str = '/etc/pacman.d/mirrorlist',
    sort_order: List[str] = ["https", "http"],
    *args: str,
    **kwargs: str
) -> Union[bool, str]:
    """
    This function will change the active mirrors on the live medium by
    filtering which regions are active based on `regions`.

    :param regions: A series of country codes separated by `,`.
        For instance `SE,US` for Sweden and United States.
    :type regions: str

    :param destination: File to write the resulting mirrorlist to. When
        empty, nothing is written and the list is returned instead.
    :type destination: str

    :param sort_order: URL schemes used to order the mirrors; a falsy value
        disables sorting.
    :type sort_order: list

    :return: `True` when the list was written to `destination`, otherwise
        the mirrorlist decoded as a UTF-8 string.
    :rtype: bool or str
    """
    query = [('country', region.strip()) for region in regions.split(',') if region.strip()]
    query += [
        ('protocol', 'https'),
        ('protocol', 'http'),
        ('ip_version', '4'),
        ('ip_version', '6'),
        ('use_mirror_status', 'on'),
    ]
    request = urllib.request.Request(
        f"https://archlinux32.org/mirrorlist/?{urllib.parse.urlencode(query)}",
        headers={'User-Agent': 'ArchInstall'},
    )

    # Close the HTTP response deterministically once the body has been read.
    with urllib.request.urlopen(request) as response:
        # Every mirror is uncommented, i.e. activated.
        new_list = response.read().replace(b"#Server", b"Server")

    if sort_order:
        new_list = sort_mirrorlist(new_list, sort_order=sort_order)

    if not destination:
        return new_list.decode('UTF-8')

    with open(destination, "wb") as mirrorlist:
        mirrorlist.write(new_list)

    return True


def add_custom_mirrors(mirrors: List[Dict[str, str]], *args: str, **kwargs: str) -> bool:
    """
    This will append custom mirror definitions to `/etc/pacman.conf`.

    :param mirrors: A list of mirror definitions, each according to:
        `{'url': 'http://url.com', 'signcheck': 'Optional', 'signoptions': 'TrustAll', 'name': 'testmirror'}`
    :type mirrors: list

    :return: Always `True`.
    :rtype: bool
    """
    with open('/etc/pacman.conf', 'a') as pacman:
        for mirror in mirrors:
            pacman.write(f"[{mirror['name']}]\n")
            pacman.write(f"SigLevel = {mirror['signcheck']} {mirror['signoptions']}\n")
            pacman.write(f"Server = {mirror['url']}\n")

    return True


def insert_mirrors(mirrors: Dict[str, Any], *args: str, **kwargs: str) -> bool:
    """
    This function will insert a given mirror-list at the top of
    `/etc/pacman.d/mirrorlist`. It will not flush any other mirrors,
    just insert new ones.

    :param mirrors: A dictionary of `{'url' : 'country', 'url2' : 'country'}`
    :type mirrors: dict

    :return: Always `True`.
    :rtype: bool
    """
    # Read the current contents first: opening the file for writing truncates it.
    with open('/etc/pacman.d/mirrorlist', 'r') as original:
        original_mirrorlist = original.read()

    with open('/etc/pacman.d/mirrorlist', 'w') as new_mirrorlist:
        for mirror, country in mirrors.items():
            new_mirrorlist.write(f'## {country}\n')
            new_mirrorlist.write(f'Server = {mirror}\n')
        new_mirrorlist.write('\n')
        new_mirrorlist.write(original_mirrorlist)

    return True


def use_mirrors(
    regions: Mapping[str, Iterable[str]],
    destination: str = '/etc/pacman.d/mirrorlist'
) -> None:
    """
    Overwrite `destination` with the given mirrors.

    Each mirror is written as a `## <region>` comment line followed by a
    `Server = <mirror>` line.

    :param regions: A mapping of region name to the mirror URLs of that region.
    :type regions: dict

    :param destination: The mirrorlist file to (over)write.
    :type destination: str
    """
    with open(destination, 'w') as mirrorlist:
        for region, mirrors in regions.items():
            for mirror in mirrors:
                mirrorlist.write(f'## {region}\n')
                mirrorlist.write(f'Server = {mirror}\n')

    # Only announce the file once it has actually been written.
    log(f'A new package mirror-list has been created: {destination}', level=logging.INFO)


def re_rank_mirrors(
    top: int = 10,
    src: str = '/etc/pacman.d/mirrorlist',
    dst: str = '/etc/pacman.d/mirrorlist',
) -> bool:
    """
    Run `/usr/bin/rankmirrors -n <top> <src>` and store the result in `dst`.

    :param top: Value passed to the `-n` option of `rankmirrors`.
    :type top: int

    :param src: The mirrorlist file handed to `rankmirrors`.
    :type src: str

    :param dst: The file that receives the output.
    :type dst: str

    :return: `False` when the command reports a non-zero exit code (in which
        case `dst` is left untouched), otherwise `True`.
    :rtype: bool
    """
    cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}")
    if cmd.exit_code != 0:
        return False

    # NOTE(review): presumably str(cmd) is the captured output of the
    # command - confirm against SysCommand.
    with open(dst, 'w') as f:
        f.write(str(cmd))

    return True


def _parse_mirrorlist(mirrorlist: bytes) -> Dict[str, Any]:
    """Group the server URLs of a raw mirrorlist by their preceding `## region` line."""
    regions: Dict[str, Any] = {}
    region = 'Unknown region'

    for raw_line in mirrorlist.split(b'\n'):
        if not raw_line.strip():
            continue

        line = raw_line.decode('UTF-8').strip('\r')
        if line.startswith('## '):
            region = line[3:]
            continue

        for prefix in _SERVER_PREFIXES:
            if line.startswith(prefix):
                # Slice the prefix off. str.lstrip() would strip a *set of
                # characters* and eat the start of URLs such as "rsync://...".
                url = line[len(prefix):].strip()
                regions.setdefault(region, {})[url] = True
                break

    return regions


def list_mirrors(sort_order: List[str] = ["https", "http"]) -> Dict[str, Any]:
    """
    List the known mirrors, grouped by region.

    When `storage['arguments']['offline']` is set the local
    `/etc/pacman.d/mirrorlist` is used, otherwise the list is downloaded.
    Both active (`Server = `) and commented-out (`#Server = `) mirrors
    are included.

    :param sort_order: URL schemes used to order the mirrors; a falsy value
        disables sorting.
    :type sort_order: list

    :return: A dictionary of `{'region': {'url': True, ...}, ...}`. It is
        empty when the mirror-list could not be downloaded.
    :rtype: dict
    """
    if storage['arguments']['offline']:
        with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh:
            mirrorlist = fh.read()
    else:
        url = "https://archlinux32.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"

        try:
            # The context manager closes the HTTP response after reading.
            with urllib.request.urlopen(url) as response:
                mirrorlist = response.read()
        except urllib.error.URLError as err:
            log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange")
            return {}

    if sort_order:
        mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order)

    return _parse_mirrorlist(mirrorlist)