From ec4ecbcb7a839ab06b739f01ce42bfd18376c620 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Thu, 4 May 2023 00:36:46 +1000 Subject: Full mypy compliance and small fixes (#1777) * Fix mypy compliance --------- Co-authored-by: Daniel Girtler --- archinstall/__init__.py | 3 +- archinstall/lib/disk/device_handler.py | 18 +- archinstall/lib/disk/device_model.py | 2 +- archinstall/lib/disk/fido.py | 9 +- archinstall/lib/general.py | 55 +++--- archinstall/lib/hardware.py | 55 +++--- archinstall/lib/installer.py | 239 +++++++++++------------ archinstall/lib/menu/abstract_menu.py | 4 +- archinstall/lib/menu/menu.py | 104 ++++++---- archinstall/lib/mirrors.py | 46 +++-- archinstall/lib/models/network_configuration.py | 78 ++++---- archinstall/lib/plugins.py | 66 ++++--- archinstall/lib/profile/profiles_handler.py | 16 +- archinstall/lib/systemd.py | 71 ++----- archinstall/lib/user_interaction/general_conf.py | 68 ++++--- archinstall/lib/user_interaction/locale_conf.py | 22 ++- archinstall/scripts/guided.py | 2 +- archinstall/scripts/swiss.py | 4 +- 18 files changed, 450 insertions(+), 412 deletions(-) (limited to 'archinstall') diff --git a/archinstall/__init__.py b/archinstall/__init__.py index 3d0768a5..29b70b7a 100644 --- a/archinstall/__init__.py +++ b/archinstall/__init__.py @@ -233,7 +233,8 @@ def post_process_arguments(arguments): log(f"Warning: --debug mode will write certain credentials to {storage['LOG_PATH']}/{storage['LOG_FILE']}!", fg="red", level=logging.WARNING) if arguments.get('plugin', None): - load_plugin(arguments['plugin']) + path = arguments['plugin'] + load_plugin(path) load_config() diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py index ba325cda..8f92cf3b 100644 --- a/archinstall/lib/disk/device_handler.py +++ b/archinstall/lib/disk/device_handler.py @@ -269,13 +269,13 @@ class DeviceHandler(object): # partition will be encrypted if enc_conf is not None and part_mod in enc_conf.partitions: self._perform_enc_formatting( - part_mod.real_dev_path, + part_mod.safe_dev_path, part_mod.mapper_name, part_mod.fs_type, enc_conf ) else: - self._perform_formatting(part_mod.fs_type, part_mod.real_dev_path) + self._perform_formatting(part_mod.fs_type, part_mod.safe_dev_path) def _perform_partitioning( self, @@ -287,11 +287,11 @@ class DeviceHandler(object): # when we require a delete and the partition to be (re)created # already exists then we have to delete it first if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]: - log(f'Delete existing partition: {part_mod.real_dev_path}', level=logging.INFO) - part_info = self.find_partition(part_mod.real_dev_path) + log(f'Delete existing partition: {part_mod.safe_dev_path}', level=logging.INFO) + part_info = self.find_partition(part_mod.safe_dev_path) if not part_info: - raise DiskError(f'No partition for dev path found: {part_mod.real_dev_path}') + raise DiskError(f'No partition for dev path found: {part_mod.safe_dev_path}') disk.deletePartition(part_info.partition) disk.commit() @@ -375,7 +375,7 @@ class DeviceHandler(object): part_mod: PartitionModification, enc_conf: Optional['DiskEncryption'] = None ): - log(f'Creating subvolumes: {part_mod.real_dev_path}', level=logging.INFO) + log(f'Creating subvolumes: {part_mod.safe_dev_path}', level=logging.INFO) luks_handler = None @@ -385,7 +385,7 @@ class DeviceHandler(object): raise ValueError('No device path specified for modification') luks_handler = self.unlock_luks2_dev( - part_mod.real_dev_path, + part_mod.safe_dev_path, part_mod.mapper_name, enc_conf.encryption_password ) @@ -395,7 +395,7 @@ class DeviceHandler(object): self.mount(luks_handler.mapper_dev, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) else: - self.mount(part_mod.real_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) + self.mount(part_mod.safe_dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) for sub_vol in part_mod.btrfs_subvols: log(f'Creating subvolume: {sub_vol.name}', level=logging.DEBUG) @@ -419,7 +419,7 @@ class DeviceHandler(object): self.umount(luks_handler.mapper_dev) luks_handler.lock() else: - self.umount(part_mod.real_dev_path) + self.umount(part_mod.safe_dev_path) def unlock_luks2_dev(self, dev_path: Path, mapper_name: str, enc_password: str) -> Luks2: luks_handler = Luks2(dev_path, mapper_name=mapper_name, password=enc_password) diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py index 0270a4dd..987a1e8a 100644 --- a/archinstall/lib/disk/device_model.py +++ b/archinstall/lib/disk/device_model.py @@ -603,7 +603,7 @@ class PartitionModification: return '' @property - def real_dev_path(self) -> Path: + def safe_dev_path(self) -> Path: if self.dev_path is None: raise ValueError('Device path was not set') return self.dev_path diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py index 436be4d4..2a53b551 100644 --- a/archinstall/lib/disk/fido.py +++ b/archinstall/lib/disk/fido.py @@ -2,7 +2,8 @@ from __future__ import annotations import getpass import logging -from typing import List +from pathlib import Path +from typing import List, Optional from .device_model import PartitionModification, Fido2Device from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes @@ -36,12 +37,12 @@ class Fido2: # to prevent continous reloading which will slow # down moving the cursor in the menu if not cls._loaded or reload: - ret = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8') + ret: Optional[str] = SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8') if not ret: log('Unable to retrieve fido2 devices', level=logging.ERROR) return [] - fido_devices = clear_vt100_escape_codes(ret) + fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore manufacturer_pos = 0 product_pos = 0 @@ -58,7 +59,7 @@ class Fido2: product = line[product_pos:] devices.append( - Fido2Device(path, manufacturer, product) + Fido2Device(Path(path), manufacturer, product) ) cls._loaded = True diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index 57f13288..997b7d67 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -19,9 +19,15 @@ import pathlib from datetime import datetime, date from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING +from .exceptions import RequirementError, SysCallError +from .output import log +from .storage import storage + + if TYPE_CHECKING: from .installer import Installer + if sys.platform == 'linux': from select import epoll, EPOLLIN, EPOLLHUP else: @@ -53,30 +59,15 @@ else: except OSError: return [] -from .exceptions import RequirementError, SysCallError -from .output import log -from .storage import storage def gen_uid(entropy_length :int = 256) -> str: return hashlib.sha512(os.urandom(entropy_length)).hexdigest() + def generate_password(length :int = 64) -> str: haystack = string.printable # digits, ascii_letters, punctiation (!"#$[] etc) and whitespace return ''.join(secrets.choice(haystack) for i in range(length)) -def multisplit(s :str, splitters :List[str]) -> str: - s = [s, ] - for key in splitters: - ns = [] - for obj in s: - x = obj.split(key) - for index, part in enumerate(x): - if len(part): - ns.append(part) - if index < len(x) - 1: - ns.append(key) - s = ns - return s def locate_binary(name :str) -> str: for PATH in os.environ['PATH'].split(':'): @@ -88,20 +79,20 @@ def locate_binary(name :str) -> str: raise RequirementError(f"Binary {name} does not exist.") -def clear_vt100_escape_codes(data :Union[bytes, str]): + +def clear_vt100_escape_codes(data :Union[bytes, str]) -> Union[bytes, str]: # https://stackoverflow.com/a/43627833/929999 if type(data) == bytes: - vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8') - else: + byte_vt100_escape_regex = bytes(r'\x1B\[[?0-9;]*[a-zA-Z]', 'UTF-8') + data = re.sub(byte_vt100_escape_regex, b'', data) + elif type(data) == str: vt100_escape_regex = r'\x1B\[[?0-9;]*[a-zA-Z]' - - for match in re.findall(vt100_escape_regex, data, re.IGNORECASE): - data = data.replace(match, '' if type(data) == str else b'') + data = re.sub(vt100_escape_regex, '', data) + else: + raise ValueError(f'Unsupported data type: {type(data)}') return data -def json_dumps(*args :str, **kwargs :str) -> str: - return json.dumps(*args, **{**kwargs, 'cls': JSON}) class JsonEncoder: @staticmethod @@ -245,10 +236,12 @@ class SysCommandWorker: def __iter__(self, *args :str, **kwargs :Dict[str, Any]) -> Iterator[bytes]: for line in self._trace_log[self._trace_log_pos:self._trace_log.rfind(b'\n')].split(b'\n'): if line: + escaped_line: bytes = line + if self.remove_vt100_escape_codes_from_lines: - line = clear_vt100_escape_codes(line) + escaped_line = clear_vt100_escape_codes(line) # type: ignore - yield line + b'\n' + yield escaped_line + b'\n' self._trace_log_pos = self._trace_log.rfind(b'\n') @@ -279,7 +272,11 @@ class SysCommandWorker: log(args[1], level=logging.DEBUG, fg='red') if self.exit_code != 0: - raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code, worker=self) + raise SysCallError( + f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {str(self._trace_log[-500:])}", + self.exit_code, + worker=self + ) def is_alive(self) -> bool: self.poll() @@ -328,7 +325,7 @@ class SysCommandWorker: change_perm = True with peak_logfile.open("a") as peek_output_log: - peek_output_log.write(output) + peek_output_log.write(str(output)) if change_perm: os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) @@ -497,7 +494,7 @@ class SysCommand: clears any printed output if ``.peek_output=True``. """ if self.session: - return self.session + return True with SysCommandWorker( self.cmd, diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py index 9660ea95..3759725f 100644 --- a/archinstall/lib/hardware.py +++ b/archinstall/lib/hardware.py @@ -2,7 +2,7 @@ import os import logging from functools import partial from pathlib import Path -from typing import Iterator, Optional, Union +from typing import Iterator, Optional, Dict from .general import SysCommand from .networking import list_interfaces, enrich_iface_types @@ -61,15 +61,15 @@ AVAILABLE_GFX_DRIVERS = { "VMware / VirtualBox (open-source)": ["mesa", "xf86-video-vmware"], } -CPUINFO = Path("/proc/cpuinfo") -MEMINFO = Path("/proc/meminfo") - def cpuinfo() -> Iterator[dict[str, str]]: - """Yields information about the CPUs of the system.""" - cpu = {} + """ + Yields information about the CPUs of the system + """ + cpu_info_path = Path("/proc/cpuinfo") + cpu: Dict[str, str] = {} - with CPUINFO.open() as file: + with cpu_info_path.open() as file: for line in file: if not (line := line.strip()): yield cpu @@ -80,24 +80,31 @@ def cpuinfo() -> Iterator[dict[str, str]]: cpu[key.strip()] = value.strip() -def meminfo(key: Optional[str] = None) -> Union[dict[str, int], Optional[int]]: - """Returns a dict with memory info if called with no args +def all_meminfo() -> Dict[str, int]: + """ + Returns a dict with memory info if called with no args or the value of the given key of said dict. """ - with MEMINFO.open() as file: - mem_info = { - (columns := line.strip().split())[0].rstrip(':'): int(columns[1]) - for line in file - } + mem_info_path = Path("/proc/meminfo") + mem_info: Dict[str, int] = {} - if key is None: - return mem_info + with mem_info_path.open() as file: + for line in file: + key, value = line.strip().split(':') + num = value.split()[0] + mem_info[key] = int(num) + + return mem_info - return mem_info.get(key) + +def meminfo_for_key(key: str) -> int: + info = all_meminfo() + return info[key] def has_wifi() -> bool: - return 'WIRELESS' in enrich_iface_types(list_interfaces().values()).values() + ifaces = list(list_interfaces().values()) + return 'WIRELESS' in enrich_iface_types(ifaces).values() def has_cpu_vendor(vendor_id: str) -> bool: @@ -160,15 +167,15 @@ def product_name() -> Optional[str]: def mem_available() -> Optional[int]: - return meminfo('MemAvailable') + return meminfo_for_key('MemAvailable') def mem_free() -> Optional[int]: - return meminfo('MemFree') + return meminfo_for_key('MemFree') def mem_total() -> Optional[int]: - return meminfo('MemTotal') + return meminfo_for_key('MemTotal') def virtualization() -> Optional[str]: @@ -182,9 +189,9 @@ def virtualization() -> Optional[str]: def is_vm() -> bool: try: - return b"none" not in b"".join(SysCommand("systemd-detect-virt")).lower() + result = SysCommand("systemd-detect-virt") + return b"none" not in b"".join(result).lower() except SysCallError as error: log(f"System is not running in a VM: {error}", level=logging.DEBUG) - return None -# TODO: Add more identifiers + return False diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index ddbcc2f2..b6eaa797 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -7,7 +7,7 @@ import shutil import subprocess import time from pathlib import Path -from typing import Any, Iterator, List, Mapping, Optional, TYPE_CHECKING, Union, Dict +from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, Iterable from . import disk from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError @@ -36,32 +36,6 @@ __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "l __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] -class InstallationFile: - def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"): - self.installation = installation - self.filename = filename - self.owner = owner - self.mode = mode - self.fh = None - - def __enter__(self) -> 'InstallationFile': - self.fh = open(self.filename, self.mode) - return self - - def __exit__(self, *args :str) -> None: - self.fh.close() - self.installation.chown(self.owner, self.filename) - - def write(self, data: Union[str, bytes]) -> int: - return self.fh.write(data) - - def read(self, *args) -> Union[str, bytes]: - return self.fh.read(*args) - -# def poll(self, *args) -> bool: -# return self.fh.poll(*args) - - def accessibility_tools_in_use() -> bool: return os.system('systemctl is-active --quiet espeakup.service') == 0 @@ -106,15 +80,17 @@ class Installer: self.kernels = kernels self._disk_config = disk_config - self._disk_encryption = disk_encryption - if self._disk_encryption is None: + if disk_encryption is None: self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption) + else: + self._disk_encryption = disk_encryption + + self.target: Path = target - self.target = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) - self.helper_flags = {'base': False, 'bootloader': False} + self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} self.base_packages = base_packages for kernel in self.kernels: @@ -124,31 +100,33 @@ class Installer: if accessibility_tools_in_use(): self.base_packages.extend(__accessibility_packages__) - self.post_base_install = [] + self.post_base_install: List[Callable] = [] # TODO: Figure out which one of these two we'll use.. But currently we're mixing them.. storage['session'] = self storage['installation_session'] = self - self.MODULES = [] - self.BINARIES = [] - self.FILES = [] + self.modules: List[str] = [] + self._binaries: List[str] = [] + self._files: List[str] = [] + # systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override. - self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"] - self.KERNEL_PARAMS = [] - self.FSTAB_ENTRIES = [] + self._hooks: List[str] = [ + "base", "systemd", "autodetect", "keyboard", + "sd-vconsole", "modconf", "block", "filesystems", "fsck" + ] + self._kernel_params: List[str] = [] + self._fstab_entries: List[str] = [] self._zram_enabled = False - def __enter__(self, *args: str, **kwargs: str) -> 'Installer': + def __enter__(self) -> 'Installer': return self - def __exit__(self, *args :str, **kwargs :str) -> bool: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - - if len(args) >= 2 and args[1]: - self.log(args[1], level=logging.ERROR, fg='red') + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + log(exc_val, fg='red', level=logging.ERROR) self.sync_log_to_install_medium() @@ -156,7 +134,7 @@ class Installer: # and then reboot, and a identical log file will be found in the ISO medium anyway. print(_("[!] A log file has been created here: {}").format(os.path.join(storage['LOG_PATH'], storage['LOG_FILE']))) print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")) - raise args[1] + raise exc_val if not (missing_steps := self.post_install_check()): self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO) @@ -164,6 +142,7 @@ class Installer: return True else: self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING) + for step in missing_steps: self.log(f' - {step}', fg='red', level=logging.WARNING) @@ -247,31 +226,32 @@ class Installer: luks_handlers = {} for part_mod in partitions: - luks_handler = disk.device_handler.unlock_luks2_dev( - part_mod.dev_path, - part_mod.mapper_name, - self._disk_encryption.encryption_password - ) - luks_handlers[part_mod] = luks_handler + if part_mod.mapper_name and part_mod.dev_path: + luks_handler = disk.device_handler.unlock_luks2_dev( + part_mod.dev_path, + part_mod.mapper_name, + self._disk_encryption.encryption_password + ) + luks_handlers[part_mod] = luks_handler return luks_handlers def _mount_partition(self, part_mod: disk.PartitionModification): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined - if part_mod.mountpoint is not None: + if part_mod.mountpoint and part_mod.dev_path: target = self.target / part_mod.relative_mountpoint disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options) - if part_mod.fs_type == disk.FilesystemType.Btrfs: + if part_mod.fs_type == disk.FilesystemType.Btrfs and part_mod.dev_path: self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols) def _mount_luks_partiton(self, part_mod: disk.PartitionModification, luks_handler: Luks2): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined - if part_mod.mountpoint is not None: + if part_mod.mountpoint and luks_handler.mapper_dev: target = self.target / part_mod.relative_mountpoint disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) - if part_mod.fs_type == disk.FilesystemType.Btrfs: + if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols) def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]): @@ -346,15 +326,15 @@ class Installer: SysCommand(f'chmod 0600 {self.target}{file}') SysCommand(f'mkswap {self.target}{file}') - self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0') + self._fstab_entries.append(f'{file} none swap defaults 0 0') if enable_resume: resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip() resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip() - self.HOOKS.append('resume') - self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}') - self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}') + self._hooks.append('resume') + self._kernel_params.append(f'resume=UUID={resume_uuid}') + self._kernel_params.append(f'resume_offset={resume_offset}') def post_install_check(self, *args :str, **kwargs :str) -> List[str]: return [step for step, flag in self.helper_flags.items() if flag is False] @@ -411,7 +391,7 @@ class Installer: else: pacman_conf.write(line) - def pacstrap(self, *packages: Union[str, List[str]], **kwargs :str) -> bool: + def _pacstrap(self, packages: Union[str, List[str]]) -> bool: if type(packages[0]) in (list, tuple): packages = packages[0] @@ -430,9 +410,9 @@ class Installer: if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self.pacstrap(*packages, **kwargs) + return self._pacstrap(packages) - raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red") + raise RequirementError(f'Could not sync mirrors: {error}') try: SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) @@ -442,40 +422,44 @@ class Installer: if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self.pacstrap(*packages, **kwargs) + return self._pacstrap(packages) raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") - def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: + def set_mirrors(self, mirrors: Dict[str, Iterable[str]]): for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): if result := plugin.on_mirrors(mirrors): mirrors = result - return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') + destination = f'{self.target}/etc/pacman.d/mirrorlist' + use_mirrors(mirrors, destination=destination) def genfstab(self, flags :str = '-pU'): self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) try: - fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}') + gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode() except SysCallError as error: raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}') - with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: - fstab_fh.write(fstab.decode()) + if not gen_fstab: + raise RequirementError(f'Genrating fstab returned empty value') + + with open(f"{self.target}/etc/fstab", 'a') as fp: + fp.write(gen_fstab) if not os.path.isfile(f'{self.target}/etc/fstab'): - raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}') + raise RequirementError(f'Could not create fstab file') for plugin in plugins.values(): if hasattr(plugin, 'on_genfstab'): if plugin.on_genfstab(self) is True: break - with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: - for entry in self.FSTAB_ENTRIES: - fstab_fh.write(f'{entry}\n') + with open(f"{self.target}/etc/fstab", 'a') as fp: + for entry in self._fstab_entries: + fp.write(f'{entry}\n') for mod in self._disk_config.device_modifications: for part_mod in mod.partitions: @@ -583,7 +567,7 @@ class Installer: # fstrim is owned by util-linux, a dependency of both base and systemd. self.enable_service("fstrim.timer") - def enable_service(self, *services: Union[str, List[str]]) -> None: + def enable_service(self, services: Union[str, List[str]]) -> None: if type(services[0]) in (list, tuple): services = services[0] @@ -611,19 +595,7 @@ class Installer: subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) def configure_nic(self, network_config: NetworkConfiguration) -> None: - from .systemd import Networkd - - if network_config.dhcp: - conf = Networkd(Match={"Name": network_config.iface}, Network={"DHCP": "yes"}) - else: - network = {"Address": network_config.ip} - if network_config.gateway: - network["Gateway"] = network_config.gateway - if network_config.dns: - dns = network_config.dns - network["DNS"] = dns if isinstance(dns, list) else [dns] - - conf = Networkd(Match={"Name": network_config.iface}, Network=network) + conf = network_config.as_systemd_config() for plugin in plugins.values(): if hasattr(plugin, 'on_configure_nic'): @@ -663,7 +635,7 @@ class Installer: # Otherwise, we can go ahead and add the required package # and enable it's service: else: - self.pacstrap('iwd') + self._pacstrap('iwd') self.enable_service('iwd') for psk in psk_files: @@ -682,12 +654,12 @@ class Installer: if self.helper_flags.get('base', False) is False: def post_install_enable_networkd_resolved(*args :str, **kwargs :str): - self.enable_service('systemd-networkd', 'systemd-resolved') + self.enable_service(['systemd-networkd', 'systemd-resolved']) self.post_base_install.append(post_install_enable_networkd_resolved) # Otherwise, we can go ahead and enable the services else: - self.enable_service('systemd-networkd', 'systemd-resolved') + self.enable_service(['systemd-networkd', 'systemd-resolved']) return True @@ -704,9 +676,9 @@ class Installer: fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit: - mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n") - mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") - mkinit.write(f"FILES=({' '.join(self.FILES)})\n") + mkinit.write(f"MODULES=({' '.join(self.modules)})\n") + mkinit.write(f"BINARIES=({' '.join(self._binaries)})\n") + mkinit.write(f"FILES=({' '.join(self._files)})\n") if not self._disk_encryption.hsm_device: # For now, if we don't use HSM we revert to the old @@ -714,9 +686,9 @@ class Installer: # This is purely for stability reasons, we're going away from this. # * systemd -> udev # * sd-vconsole -> keymap - self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS] + self._hooks = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self._hooks] - mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") + mkinit.write(f"HOOKS=({' '.join(self._hooks)})\n") try: SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') @@ -736,25 +708,25 @@ class Installer: if (pkg := part.fs_type.installation_pkg) is not None: self.base_packages.append(pkg) if (module := part.fs_type.installation_module) is not None: - self.MODULES.append(module) + self.modules.append(module) if (binary := part.fs_type.installation_binary) is not None: - self.BINARIES.append(binary) + self._binaries.append(binary) # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: - if 'fsck' in self.HOOKS: - self.HOOKS.remove('fsck') + if 'fsck' in self._hooks: + self._hooks.remove('fsck') if part in self._disk_encryption.partitions: if self._disk_encryption.hsm_device: # Required bby mkinitcpio to add support for fido2-device options - self.pacstrap('libfido2') + self._pacstrap('libfido2') - if 'sd-encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt') + if 'sd-encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') else: - if 'encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') + if 'encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') if not has_uefi(): self.base_packages.append('grub') @@ -786,7 +758,7 @@ class Installer: else: self.log("The testing flag is not set. This system will be installed without testing repositories enabled.") - self.pacstrap(self.base_packages) + self._pacstrap(self.base_packages) self.helper_flags['base-strapped'] = True # This handles making sure that the repositories we enabled persist on the installed system @@ -826,7 +798,7 @@ class Installer: def setup_swap(self, kind :str = 'zram'): if kind == 'zram': self.log(f"Setting up swap on zram") - self.pacstrap('zram-generator') + self._pacstrap('zram-generator') # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' @@ -853,7 +825,7 @@ class Installer: return None def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): - self.pacstrap('efibootmgr') + self._pacstrap('efibootmgr') if not has_uefi(): raise HardwareIncompatibilityError @@ -919,7 +891,7 @@ class Installer: # blkid doesn't trigger on loopback devices really well, # so we'll use the old manual method until we get that sorted out. - options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self.KERNEL_PARAMS)}\n' + options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self._kernel_params)}\n' for sub_vol in root_partition.btrfs_subvols: if sub_vol.is_root(): @@ -958,7 +930,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self.pacstrap('grub') # no need? + self._pacstrap('grub') # no need? _file = "/etc/default/grub" @@ -977,7 +949,7 @@ class Installer: log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO) if has_uefi(): - self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) @@ -987,8 +959,20 @@ class Installer: except SysCallError as error: raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}") else: + device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path) + + if not device: + raise ValueError(f'Can not find block device: {boot_partition.safe_dev_path}') + try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True) + cmd = f'/usr/bin/arch-chroot' \ + f' {self.target}' \ + f' grub-install' \ + f' --debug' \ + f' --target=i386-pc' \ + f' --recheck {device.device_info.path}' + + SysCommand(cmd, peek_output=True) except SysCallError as error: raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}") @@ -1004,7 +988,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self.pacstrap('efibootmgr') + self._pacstrap('efibootmgr') if not has_uefi(): raise HardwareIncompatibilityError @@ -1038,17 +1022,30 @@ class Installer: # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) - kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') + kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}') else: log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) - kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') + kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}') + + device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path) - device = disk.device_handler.get_device_by_partition_path(boot_partition.dev_path) - SysCommand(f'efibootmgr --disk {device.path} --part {device.path} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') + if not device: + raise ValueError(f'Unable to find block device: {boot_partition.safe_dev_path}') + + cmd = f'efibootmgr ' \ + f'--disk {device.device_info.path} ' \ + f'--part {boot_partition.safe_dev_path} ' \ + f'--create ' \ + f'--label "{label}" ' \ + f'--loader {loader} ' \ + f'--unicode \'{" ".join(kernel_parameters)}\' ' \ + f'--verbose' + + SysCommand(cmd) self.helper_flags['bootloader'] = "efistub" - def add_bootloader(self, bootloader: Bootloader) -> bool: + def add_bootloader(self, bootloader: Bootloader): """ Adds a bootloader to the installation instance. Archinstall supports one of three types: @@ -1056,8 +1053,7 @@ class Installer: * grub * efistub (beta) - :param bootloader: Can be one of the three strings - 'systemd-bootctl', 'grub' or 'efistub' (beta) + :param bootloader: Type of bootloader to be added """ for plugin in plugins.values(): @@ -1089,8 +1085,8 @@ class Installer: case Bootloader.Efistub: self._add_efistub_bootloader(boot_partition, root_partition) - def add_additional_packages(self, *packages: Union[str, List[str]]) -> bool: - return self.pacstrap(*packages) + def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: + return self._pacstrap(packages) def _enable_users(self, service: str, users: List[User]): for user in users: @@ -1201,9 +1197,6 @@ class Installer: except SysCallError: return False - def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: - return InstallationFile(self, filename, owner) - def set_keyboard_language(self, language: str) -> bool: log(f"Setting keyboard language to {language}", level=logging.INFO) if len(language.strip()): diff --git a/archinstall/lib/menu/abstract_menu.py b/archinstall/lib/menu/abstract_menu.py index 53816655..e44d65a4 100644 --- a/archinstall/lib/menu/abstract_menu.py +++ b/archinstall/lib/menu/abstract_menu.py @@ -482,9 +482,9 @@ class AbstractMenu: if item in self._menus_to_enable(): yield item - def _select_archinstall_language(self, preset_value: Language) -> Language: + def _select_archinstall_language(self, preset: Language) -> Language: from ..user_interaction.general_conf import select_archinstall_language - language = select_archinstall_language(self.translation_handler.translated_languages, preset_value) + language = select_archinstall_language(self.translation_handler.translated_languages, preset) self._translation_handler.activate(language) return language diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py index 44ac33a6..12dbf1f5 100644 --- a/archinstall/lib/menu/menu.py +++ b/archinstall/lib/menu/menu.py @@ -3,7 +3,7 @@ from enum import Enum, auto from os import system from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional, Callable -from simple_term_menu import TerminalMenu +from simple_term_menu import TerminalMenu # type: ignore from ..exceptions import RequirementError from ..output import log @@ -29,11 +29,11 @@ class MenuSelection: @property def single_value(self) -> Any: - return self.value + return self.value # type: ignore @property def multi_value(self) -> List[Any]: - return self.value + return self.value # type: ignore class Menu(TerminalMenu): @@ -67,7 +67,7 @@ class Menu(TerminalMenu): preview_command: Optional[Callable] = None, preview_size: float = 0.0, preview_title: str = 'Info', - header: Union[List[str],str] = None, + header: Union[List[str], str] = [], allow_reset: bool = False, allow_reset_warning_msg: Optional[str] = None, clear_screen: bool = True, @@ -141,8 +141,6 @@ class Menu(TerminalMenu): log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) raise RequirementError("Menu() requires an iterable as option.") - self._default_str = str(_('(default)')) - if isinstance(p_options,dict): options = list(p_options.keys()) else: @@ -193,8 +191,7 @@ class Menu(TerminalMenu): if default_option: # if a default value was specified we move that one # to the top of the list and mark it as default as well - default = f'{default_option} {self._default_str}' - self._menu_options = [default] + [o for o in self._menu_options if default_option != o] + self._menu_options = [self._default_menu_value] + [o for o in self._menu_options if default_option != o] if display_back_option and not multi and skip: skip_empty_entries = True @@ -204,7 +201,18 @@ class Menu(TerminalMenu): skip_empty_entries = True self._menu_options += [''] - self._preselection(preset_values,cursor_index) + preset_list: Optional[List[str]] = None + + if preset_values and isinstance(preset_values, str): + preset_list = [preset_values] + + calc_cursor_idx = self._determine_cursor_pos(preset_list, cursor_index) + + # when we're not in multi selection mode we don't care about + # passing the pre-selection list to the menu as the position + # of the cursor is the one determining the pre-selection + if not self._multi: + preset_values = None cursor = "> " main_menu_cursor_style = ("fg_cyan", "bold") @@ -217,8 +225,8 @@ class Menu(TerminalMenu): menu_cursor_style=main_menu_cursor_style, menu_highlight_style=main_menu_style, multi_select=multi, - preselected_entries=self.preset_values, - cursor_index=self.cursor_index, + preselected_entries=preset_values, + cursor_index=calc_cursor_idx, preview_command=lambda x: self._show_preview(preview_command, x), preview_size=preview_size, preview_title=preview_title, @@ -231,12 +239,17 @@ class Menu(TerminalMenu): skip_empty_entries=skip_empty_entries ) + @property + def _default_menu_value(self) -> str: + default_str = str(_('(default)')) + return f'{self._default_option} {default_str}' + def _show_preview(self, preview_command: Optional[Callable], selection: str) -> Optional[str]: if selection == self.back(): return None if preview_command: - if self._default_option is not None and f'{self._default_option} {self._default_str}' == selection: + if self._default_option is not None and self._default_menu_value == selection: selection = self._default_option return preview_command(selection) @@ -249,7 +262,7 @@ class Menu(TerminalMenu): return MenuSelection(type_=MenuSelectionType.Reset) def check_default(elem): - if self._default_option is not None and f'{self._default_option} {self._default_str}' in elem: + if self._default_option is not None and self._default_menu_value in elem: return self._default_option else: return elem @@ -297,31 +310,44 @@ class Menu(TerminalMenu): pos = self._menu_entries.index(value) self.set_cursor_pos(pos) - def _preselection(self,preset_values :Union[str, List[str]] = [], cursor_index : Optional[int] = None): - def from_preset_to_cursor(): - if preset_values: - # if the value is not extant return 0 as cursor index + def _determine_cursor_pos( + self, + preset: Optional[List[str]] = None, + cursor_index: Optional[int] = None + ) -> Optional[int]: + """ + The priority order to determine the cursor position is: + 1. A static cursor position was provided + 2. Preset values have been provided so the cursor will be + positioned on those + 3. A default value for a selection is given so the cursor + will be placed on such + """ + if cursor_index: + return cursor_index + + if preset: + indexes = [] + + for p in preset: try: - if isinstance(preset_values,str): - self.cursor_index = self._menu_options.index(self.preset_values) - else: # should return an error, but this is smoother - self.cursor_index = self._menu_options.index(self.preset_values[0]) - except ValueError: - self.cursor_index = 0 - - self.cursor_index = cursor_index - if not preset_values: - self.preset_values = None - return - - self.preset_values = preset_values + # the options of the table selection menu + # are already escaped so we have to escape + # the preset values as well for the comparison + if '|' in p: + p = p.replace('|', '\\|') + + idx = self._menu_options.index(p) + indexes.append(idx) + except (IndexError, ValueError): + log(f'Error finding index of {p}: {self._menu_options}', level=logging.DEBUG) + + if len(indexes) == 0: + indexes.append(0) + + return indexes[0] + if self._default_option: - if isinstance(preset_values,str) and self._default_option == preset_values: - self.preset_values = f"{preset_values} {self._default_str}" - elif isinstance(preset_values,(list,tuple)) and self._default_option in preset_values: - idx = preset_values.index(self._default_option) - self.preset_values[idx] = f"{preset_values[idx]} {self._default_str}" - if cursor_index is None or not self._multi: - from_preset_to_cursor() - if not self._multi: # Not supported by the infraestructure - self.preset_values = None + return self._menu_options.index(self._default_menu_value) + + return None diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index 4bae6d8b..15d0fd6b 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -3,12 +3,22 @@ import pathlib import urllib.error import urllib.request from typing import Union, Iterable, Dict, Any, List +from dataclasses import dataclass from .general import SysCommand from .output import log from .storage import storage -def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: + +@dataclass +class CustomMirror: + url: str + signcheck: str + signoptions: str + name: str + + +def sort_mirrorlist(raw_data :bytes, sort_order: List[str] = ['https', 'http']) -> bytes: """ This function can sort /etc/pacman.d/mirrorlist according to the mirror's URL prefix. By default places HTTPS before HTTP but it also @@ -28,8 +38,9 @@ def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: from server url definitions (commented or uncommented). """ comments_and_whitespaces = b"" + sort_order += ['Unknown'] + categories: Dict[str, List] = {key: [] for key in sort_order} - categories = {key: [] for key in sort_order + ["Unknown"]} for line in raw_data.split(b"\n"): if line[0:2] in (b'##', b''): comments_and_whitespaces += line + b'\n' @@ -82,18 +93,18 @@ def filter_mirrors_by_region(regions :str, return new_list.decode('UTF-8') -def add_custom_mirrors(mirrors: List[str], *args :str, **kwargs :str) -> bool: +def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool: """ This will append custom mirror definitions in pacman.conf - :param mirrors: A list of mirror data according to: `{'url': 'http://url.com', 'signcheck': 'Optional', 'signoptions': 'TrustAll', 'name': 'testmirror'}` - :type mirrors: dict + :param mirrors: A list of custom mirrors + :type mirrors: List[CustomMirror] """ with open('/etc/pacman.conf', 'a') as pacman: for mirror in mirrors: - pacman.write(f"[{mirror['name']}]\n") - pacman.write(f"SigLevel = {mirror['signcheck']} {mirror['signoptions']}\n") - pacman.write(f"Server = {mirror['url']}\n") + pacman.write(f"[{mirror.name}]\n") + pacman.write(f"SigLevel = {mirror.signcheck} {mirror.signoptions}\n") + pacman.write(f"Server = {mirror.url}\n") return True @@ -123,7 +134,7 @@ def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool: def use_mirrors( regions: Dict[str, Iterable[str]], destination: str = '/etc/pacman.d/mirrorlist' -) -> None: +): log(f'A new package mirror-list has been created: {destination}', level=logging.INFO) with open(destination, 'w') as mirrorlist: for region, mirrors in regions.items(): @@ -146,7 +157,7 @@ def re_rank_mirrors( def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: - regions = {} + regions: Dict[str, Dict[str, Any]] = {} if storage['arguments']['offline']: with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh: @@ -170,18 +181,19 @@ def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: if len(line.strip()) == 0: continue - line = line.decode('UTF-8').strip('\n').strip('\r') - if line[:3] == '## ': - region = line[3:] - elif line[:10] == '#Server = ': + clean_line = line.decode('UTF-8').strip('\n').strip('\r') + + if clean_line[:3] == '## ': + region = clean_line[3:] + elif clean_line[:10] == '#Server = ': regions.setdefault(region, {}) - url = line.lstrip('#Server = ') + url = clean_line.lstrip('#Server = ') regions[region][url] = True - elif line.startswith('Server = '): + elif clean_line.startswith('Server = '): regions.setdefault(region, {}) - url = line.lstrip('Server = ') + url = clean_line.lstrip('Server = ') regions[region][url] = True return regions diff --git a/archinstall/lib/models/network_configuration.py b/archinstall/lib/models/network_configuration.py index b7ab690d..66230e24 100644 --- a/archinstall/lib/models/network_configuration.py +++ b/archinstall/lib/models/network_configuration.py @@ -1,8 +1,9 @@ from __future__ import annotations -from dataclasses import dataclass +import logging +from dataclasses import dataclass, field from enum import Enum -from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING +from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING, Tuple from ..output import log from ..storage import storage @@ -24,7 +25,7 @@ class NetworkConfiguration: ip: Optional[str] = None dhcp: bool = True gateway: Optional[str] = None - dns: Union[None, List[str]] = None + dns: List[str] = field(default_factory=list) def __str__(self): if self.is_iso(): @@ -53,6 +54,33 @@ class NetworkConfiguration: return data + def as_systemd_config(self) -> str: + match: List[Tuple[str, str]] = [] + network: List[Tuple[str, str]] = [] + + if self.iface: + match.append(('Name', self.iface)) + + if self.dhcp: + network.append(('DHCP', 'yes')) + else: + if self.ip: + network.append(('Address', self.ip)) + if self.gateway: + network.append(('Gateway', self.gateway)) + for dns in self.dns: + network.append(('DNS', dns)) + + config = {'Match': match, 'Network': network} + + config_str = '' + for top, entries in config.items(): + config_str += f'[{top}]\n' + config_str += '\n'.join([f'{k}={v}' for k, v in entries]) + config_str += '\n\n' + + return config_str + def json(self) -> Dict: # for json serialization when calling json.dumps(...) on this class return self.__dict__ @@ -90,41 +118,14 @@ class NetworkConfigurationHandler: # Perform a copy of the config if self._configuration.is_iso(): installation.copy_iso_network_config( - enable_services=True) # Sources the ISO network configuration to the install medium. + enable_services=True # Sources the ISO network configuration to the install medium. + ) elif self._configuration.is_network_manager(): installation.add_additional_packages(["networkmanager"]) if (profile := storage['arguments'].get('profile_config')) and profile.is_desktop_type_profile: installation.add_additional_packages(["network-manager-applet"]) installation.enable_service('NetworkManager.service') - def _backwards_compability_config(self, config: Union[str,Dict[str, str]]) -> Union[List[NetworkConfiguration], NetworkConfiguration, None]: - def get(config: Dict[str, str], key: str) -> List[str]: - if (value := config.get(key, None)) is not None: - return [value] - return [] - - if isinstance(config, str): # is a ISO network - return NetworkConfiguration(NicType.ISO) - elif config.get('NetworkManager'): # is a network manager configuration - return NetworkConfiguration(NicType.NM) - elif 'ip' in config: - return [NetworkConfiguration( - NicType.MANUAL, - iface=config.get('nic', ''), - ip=config.get('ip'), - gateway=config.get('gateway', ''), - dns=get(config, 'dns'), - dhcp=False - )] - elif 'nic' in config: - return [NetworkConfiguration( - NicType.MANUAL, - iface=config.get('nic', ''), - dhcp=True - )] - else: # not recognized - return None - def _parse_manual_config(self, configs: List[Dict[str, Any]]) -> Optional[List[NetworkConfiguration]]: configurations = [] @@ -145,13 +146,17 @@ class NetworkConfigurationHandler: log(_('Manual nic configuration with no auto DHCP requires an IP address'), fg='red') exit(1) + dns = manual_config.get('dns', []) + if not isinstance(dns, list): + dns = [dns] + configurations.append( NetworkConfiguration( NicType.MANUAL, iface=iface, ip=ip, gateway=manual_config.get('gateway', ''), - dns=manual_config.get('dns', []), + dns=dns, dhcp=False ) ) @@ -176,8 +181,5 @@ class NetworkConfigurationHandler: self._configuration = NetworkConfiguration(type_) else: # manual configuration settings self._configuration = self._parse_manual_config([config]) - else: # old style definitions - network_config = self._backwards_compability_config(config) - if network_config: - return network_config - return None + else: + log(f'Unable to parse network configuration: {config}', level=logging.DEBUG) diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index 0ff63610..b1ece04f 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -3,77 +3,86 @@ import importlib import logging import os import sys -import pathlib import urllib.parse import urllib.request from importlib import metadata +from pathlib import Path from typing import Optional, List -from types import ModuleType from .output import log from .storage import storage plugins = {} + # 1: List archinstall.plugin definitions # 2: Load the plugin entrypoint # 3: Initiate the plugin and store it as .name in plugins for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'): plugin_entrypoint = plugin_definition.load() + try: plugins[plugin_definition.name] = plugin_entrypoint() except Exception as err: - log(err, level=logging.ERROR) + log(f'Error: {err}', level=logging.ERROR) log(f"The above error was detected when loading the plugin: {plugin_definition}", fg="red", level=logging.ERROR) -# The following functions and core are support structures for load_plugin() -def localize_path(profile_path :str) -> str: - if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'): - converted_path = f"/tmp/{os.path.basename(profile_path).replace('.py', '')}_{hashlib.md5(os.urandom(12)).hexdigest()}.py" +def localize_path(path: Path) -> Path: + """ + Support structures for load_plugin() + """ + url = urllib.parse.urlparse(str(path)) + + if url.scheme and url.scheme in ('https', 'http'): + converted_path = Path(f'/tmp/{path.stem}_{hashlib.md5(os.urandom(12)).hexdigest()}.py') with open(converted_path, "w") as temp_file: temp_file.write(urllib.request.urlopen(url.geturl()).read().decode('utf-8')) return converted_path else: - return profile_path + return path -def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType: +def import_via_path(path: Path, namespace: Optional[str] = None) -> Optional[str]: if not namespace: namespace = os.path.basename(path) if namespace == '__init__.py': - path = pathlib.PurePath(path) namespace = path.parent.name try: spec = importlib.util.spec_from_file_location(namespace, path) - imported = importlib.util.module_from_spec(spec) - sys.modules[namespace] = imported - spec.loader.exec_module(sys.modules[namespace]) + if spec and spec.loader: + imported = importlib.util.module_from_spec(spec) + sys.modules[namespace] = imported + spec.loader.exec_module(sys.modules[namespace]) return namespace except Exception as err: - log(err, level=logging.ERROR) + log(f'Error: {err}', level=logging.ERROR) log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR) try: - del(sys.modules[namespace]) # noqa: E275 - except: + del sys.modules[namespace] + except Exception: pass -def find_nth(haystack :List[str], needle :str, n :int) -> int: - start = haystack.find(needle) - while start >= 0 and n > 1: - start = haystack.find(needle, start + len(needle)) - n -= 1 - return start + return namespace + + +def find_nth(haystack: List[str], needle: str, n: int) -> Optional[int]: + indices = [idx for idx, elem in enumerate(haystack) if elem == needle] + if n <= len(indices): + return indices[n - 1] + return None + -def load_plugin(path :str) -> ModuleType: - parsed_url = urllib.parse.urlparse(path) - log(f"Loading plugin {parsed_url}.", fg="gray", level=logging.INFO) +def load_plugin(path: Path): + namespace: Optional[str] = None + parsed_url = urllib.parse.urlparse(str(path)) + log(f"Loading plugin from url {parsed_url}.", level=logging.INFO) # The Profile was not a direct match on a remote URL if not parsed_url.scheme: @@ -81,9 +90,10 @@ def load_plugin(path :str) -> ModuleType: if os.path.isfile(path): namespace = import_via_path(path) elif parsed_url.scheme in ('https', 'http'): - namespace = import_via_path(localize_path(path)) + localized = localize_path(path) + namespace = import_via_path(localized) - if namespace in sys.modules: + if namespace and namespace in sys.modules: # Version dependency via __archinstall__version__ variable (if present) in the plugin # Any errors in version inconsistency will be handled through normal error handling if not defined. if hasattr(sys.modules[namespace], '__archinstall__version__'): @@ -99,7 +109,7 @@ def load_plugin(path :str) -> ModuleType: plugins[namespace] = sys.modules[namespace].Plugin() log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO) except Exception as err: - log(err, level=logging.ERROR) + log(f'Error: {err}', level=logging.ERROR) log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR) else: log(f"Plugin '{path}' is missing a valid entry-point or is corrupt.", fg="yellow", level=logging.WARNING) diff --git a/archinstall/lib/profile/profiles_handler.py b/archinstall/lib/profile/profiles_handler.py index a8b5cc22..824849c3 100644 --- a/archinstall/lib/profile/profiles_handler.py +++ b/archinstall/lib/profile/profiles_handler.py @@ -194,23 +194,23 @@ class ProfileHandler: install_session.add_additional_packages(f"{kernel}-headers") # I've had kernel regen fail if it wasn't installed before nvidia-dkms - install_session.add_additional_packages("dkms xorg-server xorg-xinit nvidia-dkms") + install_session.add_additional_packages(['dkms', 'xorg-server', 'xorg-xinit', 'nvidia-dkms']) return elif 'amdgpu' in driver_pkgs: # The order of these two are important if amdgpu is installed #808 - if 'amdgpu' in install_session.MODULES: - install_session.MODULES.remove('amdgpu') - install_session.MODULES.append('amdgpu') + if 'amdgpu' in install_session.modules: + install_session.modules.remove('amdgpu') + install_session.modules.append('amdgpu') - if 'radeon' in install_session.MODULES: - install_session.MODULES.remove('radeon') - install_session.MODULES.append('radeon') + if 'radeon' in install_session.modules: + install_session.modules.remove('radeon') + install_session.modules.append('radeon') install_session.add_additional_packages(additional_pkg) except Exception as err: log(f"Could not handle nvidia and linuz-zen specific situations during xorg installation: {err}", level=logging.WARNING, fg="yellow") # Prep didn't run, so there's no driver to install - install_session.add_additional_packages("xorg-server xorg-xinit") + install_session.add_additional_packages(['xorg-server', 'xorg-xinit']) def install_profile_config(self, install_session: 'Installer', profile_config: ProfileConfiguration): profile = profile_config.profile diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py index 64ffcae4..6ccbc5f6 100644 --- a/archinstall/lib/systemd.py +++ b/archinstall/lib/systemd.py @@ -1,6 +1,6 @@ import logging import time -from typing import Iterator +from typing import Iterator, Optional from .exceptions import SysCallError from .general import SysCommand, SysCommandWorker, locate_binary from .installer import Installer @@ -8,51 +8,11 @@ from .output import log from .storage import storage -class Ini: - def __init__(self, *args :str, **kwargs :str): - """ - Limited INI handler for now. - Supports multiple keywords through dictionary list items. - """ - self.kwargs = kwargs - - def __str__(self) -> str: - result = '' - first_row_done = False - for top_level in self.kwargs: - if first_row_done: - result += f"\n[{top_level}]\n" - else: - result += f"[{top_level}]\n" - first_row_done = True - - for key, val in self.kwargs[top_level].items(): - if type(val) == list: - for item in val: - result += f"{key}={item}\n" - else: - result += f"{key}={val}\n" - - return result - - -class Systemd(Ini): - """ - Placeholder class to do systemd specific setups. - """ - - -class Networkd(Systemd): - """ - Placeholder class to do systemd-network specific setups. - """ - - class Boot: def __init__(self, installation: Installer): self.instance = installation self.container_name = 'archinstall' - self.session = None + self.session: Optional[SysCommandWorker] = None self.ready = False def __enter__(self) -> 'Boot': @@ -63,17 +23,18 @@ class Boot: self.session = existing_session.session self.ready = existing_session.ready else: + # '-P' or --console=pipe could help us not having to do a bunch + # of os.write() calls, but instead use pipes (stdin, stdout and stderr) as usual. self.session = SysCommandWorker([ '/usr/bin/systemd-nspawn', - '-D', self.instance.target, + '-D', str(self.instance.target), '--timezone=off', '-b', '--no-pager', '--machine', self.container_name ]) - # '-P' or --console=pipe could help us not having to do a bunch of os.write() calls, but instead use pipes (stdin, stdout and stderr) as usual. - if not self.ready: + if not self.ready and self.session: while self.session.is_alive(): if b' login:' in self.session: self.ready = True @@ -91,25 +52,31 @@ class Boot: log(f"The error above occurred in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red") shutdown = None - shutdown_exit_code = -1 + shutdown_exit_code: Optional[int] = -1 try: shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now') except SysCallError as error: shutdown_exit_code = error.exit_code - while self.session.is_alive(): - time.sleep(0.25) + if self.session: + while self.session.is_alive(): + time.sleep(0.25) - if shutdown: + if shutdown and shutdown.exit_code: shutdown_exit_code = shutdown.exit_code - if self.session.exit_code == 0 or shutdown_exit_code == 0: + if self.session and (self.session.exit_code == 0 or shutdown_exit_code == 0): storage['active_boot'] = None else: - raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {self.session.exit_code}/{shutdown_exit_code}", exit_code=next(filter(bool, [self.session.exit_code, shutdown_exit_code]))) + session_exit_code = self.session.exit_code if self.session else -1 + + raise SysCallError( + f"Could not shut down temporary boot of {self.instance}: {session_exit_code}/{shutdown_exit_code}", + exit_code=next(filter(bool, [session_exit_code, shutdown_exit_code])) + ) - def __iter__(self) -> Iterator[str]: + def __iter__(self) -> Iterator[bytes]: if self.session: for value in self.session: yield value diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py index 7a6bb358..9722dc4d 100644 --- a/archinstall/lib/user_interaction/general_conf.py +++ b/archinstall/lib/user_interaction/general_conf.py @@ -3,7 +3,6 @@ from __future__ import annotations import logging import pathlib from typing import List, Any, Optional, Dict, TYPE_CHECKING -from typing import Union from ..locale_helpers import list_keyboard_languages, list_timezones from ..menu import MenuSelectionType, Menu, TextInput @@ -29,13 +28,18 @@ def ask_ntp(preset: bool = True) -> bool: return False if choice.value == Menu.no() else True -def ask_hostname(preset: str = None) -> str: +def ask_hostname(preset: str = '') -> str: while True: - hostname = TextInput(_('Desired hostname for the installation: '), preset).run().strip() + hostname = TextInput( + str(_('Desired hostname for the installation: ')), + preset + ).run().strip() + if hostname: return hostname -def ask_for_a_timezone(preset: str = None) -> str: + +def ask_for_a_timezone(preset: Optional[str] = None) -> Optional[str]: timezones = list_timezones() default = 'UTC' @@ -48,10 +52,12 @@ def ask_for_a_timezone(preset: str = None) -> str: match choice.type_: case MenuSelectionType.Skip: return preset - case MenuSelectionType.Selection: return choice.value + case MenuSelectionType.Selection: return choice.single_value + + return None -def ask_for_audio_selection(desktop: bool = True, preset: Union[str, None] = None) -> Union[str, None]: +def ask_for_audio_selection(desktop: bool = True, preset: Optional[str] = None) -> Optional[str]: no_audio = str(_('No audio server')) choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', no_audio] default = 'pipewire' if desktop else no_audio @@ -60,10 +66,12 @@ def ask_for_audio_selection(desktop: bool = True, preset: Union[str, None] = Non match choice.type_: case MenuSelectionType.Skip: return preset - case MenuSelectionType.Selection: return choice.value + case MenuSelectionType.Selection: return choice.single_value + return None -def select_language(preset_value: str = None) -> str: + +def select_language(preset: Optional[str] = None) -> Optional[str]: """ Asks the user to select a language Usually this is combined with :ref:`archinstall.list_keyboard_languages`. @@ -75,17 +83,18 @@ def select_language(preset_value: str = None) -> str: # sort alphabetically and then by length sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len) - selected_lang = Menu( + choice = Menu( _('Select keyboard layout'), sorted_kb_lang, - preset_values=preset_value, + preset_values=preset, sort=False ).run() - if selected_lang.value is None: - return preset_value + match choice.type_: + case MenuSelectionType.Skip: return preset + case MenuSelectionType.Selection: return choice.single_value - return selected_lang.value + return None def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: @@ -100,8 +109,10 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: preselected = None else: preselected = list(preset_values.keys()) + mirrors = list_mirrors() - selected_mirror = Menu( + + choice = Menu( _('Select one of the regions to download packages from'), list(mirrors.keys()), preset_values=preselected, @@ -109,13 +120,18 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: allow_reset=True ).run() - match selected_mirror.type_: - case MenuSelectionType.Reset: return {} - case MenuSelectionType.Skip: return preset_values - case _: return {selected: mirrors[selected] for selected in selected_mirror.value} + match choice.type_: + case MenuSelectionType.Reset: + return {} + case MenuSelectionType.Skip: + return preset_values + case MenuSelectionType.Selection: + return {selected: mirrors[selected] for selected in choice.multi_value} + + return {} -def select_archinstall_language(languages: List[Language], preset_value: Language) -> Language: +def select_archinstall_language(languages: List[Language], preset: Language) -> Language: # these are the displayed language names which can either be # the english name of a language or, if present, the # name of the language in its own language @@ -128,15 +144,15 @@ def select_archinstall_language(languages: List[Language], preset_value: Languag choice = Menu( title, list(options.keys()), - default_option=preset_value.display_name, + default_option=preset.display_name, preview_size=0.5 ).run() match choice.type_: - case MenuSelectionType.Skip: - return preset_value - case MenuSelectionType.Selection: - return options[choice.value] + case MenuSelectionType.Skip: return preset + case MenuSelectionType.Selection: return options[choice.single_value] + + raise ValueError('Language selection not handled') def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List[str]: @@ -223,4 +239,6 @@ def select_additional_repositories(preset: List[str]) -> List[str]: match choice.type_: case MenuSelectionType.Skip: return preset case MenuSelectionType.Reset: return [] - case MenuSelectionType.Selection: return choice.value + case MenuSelectionType.Selection: return choice.single_value + + return [] diff --git a/archinstall/lib/user_interaction/locale_conf.py b/archinstall/lib/user_interaction/locale_conf.py index 88aec64e..cdc3423a 100644 --- a/archinstall/lib/user_interaction/locale_conf.py +++ b/archinstall/lib/user_interaction/locale_conf.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, TYPE_CHECKING +from typing import Any, TYPE_CHECKING, Optional from ..locale_helpers import list_locales from ..menu import Menu, MenuSelectionType @@ -9,33 +9,37 @@ if TYPE_CHECKING: _: Any -def select_locale_lang(preset: str = None) -> str: +def select_locale_lang(preset: Optional[str] = None) -> Optional[str]: locales = list_locales() locale_lang = set([locale.split()[0] for locale in locales]) - selected_locale = Menu( + choice = Menu( _('Choose which locale language to use'), list(locale_lang), sort=True, preset_values=preset ).run() - match selected_locale.type_: - case MenuSelectionType.Selection: return selected_locale.value + match choice.type_: + case MenuSelectionType.Selection: return choice.single_value case MenuSelectionType.Skip: return preset + return None -def select_locale_enc(preset: str = None) -> str: + +def select_locale_enc(preset: Optional[str] = None) -> Optional[str]: locales = list_locales() locale_enc = set([locale.split()[1] for locale in locales]) - selected_locale = Menu( + choice = Menu( _('Choose which locale encoding to use'), list(locale_enc), sort=True, preset_values=preset ).run() - match selected_locale.type_: - case MenuSelectionType.Selection: return selected_locale.value + match choice.type_: + case MenuSelectionType.Selection: return choice.single_value case MenuSelectionType.Skip: return preset + + return None diff --git a/archinstall/scripts/guided.py b/archinstall/scripts/guided.py index d9c5837c..48d141b7 100644 --- a/archinstall/scripts/guided.py +++ b/archinstall/scripts/guided.py @@ -223,7 +223,7 @@ def perform_installation(mountpoint: Path): # If the user provided a list of services to be enabled, pass the list to the enable_service function. # Note that while it's called enable_service, it can actually take a list of services and iterate it. if archinstall.arguments.get('services', None): - installation.enable_service(*archinstall.arguments['services']) + installation.enable_service(archinstall.arguments.get('services', [])) # If the user provided custom commands to be run post-installation, execute them now. if archinstall.arguments.get('custom-commands', None): diff --git a/archinstall/scripts/swiss.py b/archinstall/scripts/swiss.py index e2ee6fcb..34e4c022 100644 --- a/archinstall/scripts/swiss.py +++ b/archinstall/scripts/swiss.py @@ -239,7 +239,7 @@ def perform_installation(mountpoint: Path, exec_mode: ExecutionMode): handler.config_installer(installation) if archinstall.arguments.get('packages', None) and archinstall.arguments.get('packages', None)[0] != '': - installation.add_additional_packages(archinstall.arguments.get('packages', None)) + installation.add_additional_packages(archinstall.arguments.get('packages', [])) if users := archinstall.arguments.get('!users', None): installation.create_users(users) @@ -278,7 +278,7 @@ def perform_installation(mountpoint: Path, exec_mode: ExecutionMode): # If the user provided a list of services to be enabled, pass the list to the enable_service function. # Note that while it's called enable_service, it can actually take a list of services and iterate it. if archinstall.arguments.get('services', None): - installation.enable_service(*archinstall.arguments['services']) + installation.enable_service(archinstall.arguments.get('services', [])) # If the user provided custom commands to be run post-installation, execute them now. if archinstall.arguments.get('custom-commands', None): -- cgit v1.2.3-54-g00ecf