From ec4ecbcb7a839ab06b739f01ce42bfd18376c620 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Thu, 4 May 2023 00:36:46 +1000 Subject: Full mypy compliance and small fixes (#1777) * Fix mypy compliance --------- Co-authored-by: Daniel Girtler --- archinstall/lib/installer.py | 239 +++++++++++++++++++++---------------------- 1 file changed, 116 insertions(+), 123 deletions(-) (limited to 'archinstall/lib/installer.py') diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index ddbcc2f2..b6eaa797 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -7,7 +7,7 @@ import shutil import subprocess import time from pathlib import Path -from typing import Any, Iterator, List, Mapping, Optional, TYPE_CHECKING, Union, Dict +from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, Iterable from . import disk from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError @@ -36,32 +36,6 @@ __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "l __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] -class InstallationFile: - def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"): - self.installation = installation - self.filename = filename - self.owner = owner - self.mode = mode - self.fh = None - - def __enter__(self) -> 'InstallationFile': - self.fh = open(self.filename, self.mode) - return self - - def __exit__(self, *args :str) -> None: - self.fh.close() - self.installation.chown(self.owner, self.filename) - - def write(self, data: Union[str, bytes]) -> int: - return self.fh.write(data) - - def read(self, *args) -> Union[str, bytes]: - return self.fh.read(*args) - -# def poll(self, *args) -> bool: -# return self.fh.poll(*args) - - def accessibility_tools_in_use() -> bool: return os.system('systemctl is-active --quiet espeakup.service') == 0 @@ -106,15 +80,17 @@ class Installer: self.kernels = kernels self._disk_config = disk_config - self._disk_encryption = disk_encryption - if self._disk_encryption is None: + if disk_encryption is None: self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption) + else: + self._disk_encryption = disk_encryption + + self.target: Path = target - self.target = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) - self.helper_flags = {'base': False, 'bootloader': False} + self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} self.base_packages = base_packages for kernel in self.kernels: @@ -124,31 +100,33 @@ class Installer: if accessibility_tools_in_use(): self.base_packages.extend(__accessibility_packages__) - self.post_base_install = [] + self.post_base_install: List[Callable] = [] # TODO: Figure out which one of these two we'll use.. But currently we're mixing them.. storage['session'] = self storage['installation_session'] = self - self.MODULES = [] - self.BINARIES = [] - self.FILES = [] + self.modules: List[str] = [] + self._binaries: List[str] = [] + self._files: List[str] = [] + # systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override. - self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"] - self.KERNEL_PARAMS = [] - self.FSTAB_ENTRIES = [] + self._hooks: List[str] = [ + "base", "systemd", "autodetect", "keyboard", + "sd-vconsole", "modconf", "block", "filesystems", "fsck" + ] + self._kernel_params: List[str] = [] + self._fstab_entries: List[str] = [] self._zram_enabled = False - def __enter__(self, *args: str, **kwargs: str) -> 'Installer': + def __enter__(self) -> 'Installer': return self - def __exit__(self, *args :str, **kwargs :str) -> bool: - # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager - - if len(args) >= 2 and args[1]: - self.log(args[1], level=logging.ERROR, fg='red') + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + log(exc_val, fg='red', level=logging.ERROR) self.sync_log_to_install_medium() @@ -156,7 +134,7 @@ class Installer: # and then reboot, and a identical log file will be found in the ISO medium anyway. print(_("[!] A log file has been created here: {}").format(os.path.join(storage['LOG_PATH'], storage['LOG_FILE']))) print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")) - raise args[1] + raise exc_val if not (missing_steps := self.post_install_check()): self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO) @@ -164,6 +142,7 @@ class Installer: return True else: self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING) + for step in missing_steps: self.log(f' - {step}', fg='red', level=logging.WARNING) @@ -247,31 +226,32 @@ class Installer: luks_handlers = {} for part_mod in partitions: - luks_handler = disk.device_handler.unlock_luks2_dev( - part_mod.dev_path, - part_mod.mapper_name, - self._disk_encryption.encryption_password - ) - luks_handlers[part_mod] = luks_handler + if part_mod.mapper_name and part_mod.dev_path: + luks_handler = disk.device_handler.unlock_luks2_dev( + part_mod.dev_path, + part_mod.mapper_name, + self._disk_encryption.encryption_password + ) + luks_handlers[part_mod] = luks_handler return luks_handlers def _mount_partition(self, part_mod: disk.PartitionModification): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined - if part_mod.mountpoint is not None: + if part_mod.mountpoint and part_mod.dev_path: target = self.target / part_mod.relative_mountpoint disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options) - if part_mod.fs_type == disk.FilesystemType.Btrfs: + if part_mod.fs_type == disk.FilesystemType.Btrfs and part_mod.dev_path: self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols) def _mount_luks_partiton(self, part_mod: disk.PartitionModification, luks_handler: Luks2): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined - if part_mod.mountpoint is not None: + if part_mod.mountpoint and luks_handler.mapper_dev: target = self.target / part_mod.relative_mountpoint disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) - if part_mod.fs_type == disk.FilesystemType.Btrfs: + if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols) def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]): @@ -346,15 +326,15 @@ class Installer: SysCommand(f'chmod 0600 {self.target}{file}') SysCommand(f'mkswap {self.target}{file}') - self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0') + self._fstab_entries.append(f'{file} none swap defaults 0 0') if enable_resume: resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip() resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip() - self.HOOKS.append('resume') - self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}') - self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}') + self._hooks.append('resume') + self._kernel_params.append(f'resume=UUID={resume_uuid}') + self._kernel_params.append(f'resume_offset={resume_offset}') def post_install_check(self, *args :str, **kwargs :str) -> List[str]: return [step for step, flag in self.helper_flags.items() if flag is False] @@ -411,7 +391,7 @@ class Installer: else: pacman_conf.write(line) - def pacstrap(self, *packages: Union[str, List[str]], **kwargs :str) -> bool: + def _pacstrap(self, packages: Union[str, List[str]]) -> bool: if type(packages[0]) in (list, tuple): packages = packages[0] @@ -430,9 +410,9 @@ class Installer: if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self.pacstrap(*packages, **kwargs) + return self._pacstrap(packages) - raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red") + raise RequirementError(f'Could not sync mirrors: {error}') try: SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True) @@ -442,40 +422,44 @@ class Installer: if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): - return self.pacstrap(*packages, **kwargs) + return self._pacstrap(packages) raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") - def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: + def set_mirrors(self, mirrors: Dict[str, Iterable[str]]): for plugin in plugins.values(): if hasattr(plugin, 'on_mirrors'): if result := plugin.on_mirrors(mirrors): mirrors = result - return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') + destination = f'{self.target}/etc/pacman.d/mirrorlist' + use_mirrors(mirrors, destination=destination) def genfstab(self, flags :str = '-pU'): self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) try: - fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}') + gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode() except SysCallError as error: raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}') - with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: - fstab_fh.write(fstab.decode()) + if not gen_fstab: + raise RequirementError(f'Genrating fstab returned empty value') + + with open(f"{self.target}/etc/fstab", 'a') as fp: + fp.write(gen_fstab) if not os.path.isfile(f'{self.target}/etc/fstab'): - raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}') + raise RequirementError(f'Could not create fstab file') for plugin in plugins.values(): if hasattr(plugin, 'on_genfstab'): if plugin.on_genfstab(self) is True: break - with open(f"{self.target}/etc/fstab", 'a') as fstab_fh: - for entry in self.FSTAB_ENTRIES: - fstab_fh.write(f'{entry}\n') + with open(f"{self.target}/etc/fstab", 'a') as fp: + for entry in self._fstab_entries: + fp.write(f'{entry}\n') for mod in self._disk_config.device_modifications: for part_mod in mod.partitions: @@ -583,7 +567,7 @@ class Installer: # fstrim is owned by util-linux, a dependency of both base and systemd. self.enable_service("fstrim.timer") - def enable_service(self, *services: Union[str, List[str]]) -> None: + def enable_service(self, services: Union[str, List[str]]) -> None: if type(services[0]) in (list, tuple): services = services[0] @@ -611,19 +595,7 @@ class Installer: subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True) def configure_nic(self, network_config: NetworkConfiguration) -> None: - from .systemd import Networkd - - if network_config.dhcp: - conf = Networkd(Match={"Name": network_config.iface}, Network={"DHCP": "yes"}) - else: - network = {"Address": network_config.ip} - if network_config.gateway: - network["Gateway"] = network_config.gateway - if network_config.dns: - dns = network_config.dns - network["DNS"] = dns if isinstance(dns, list) else [dns] - - conf = Networkd(Match={"Name": network_config.iface}, Network=network) + conf = network_config.as_systemd_config() for plugin in plugins.values(): if hasattr(plugin, 'on_configure_nic'): @@ -663,7 +635,7 @@ class Installer: # Otherwise, we can go ahead and add the required package # and enable it's service: else: - self.pacstrap('iwd') + self._pacstrap('iwd') self.enable_service('iwd') for psk in psk_files: @@ -682,12 +654,12 @@ class Installer: if self.helper_flags.get('base', False) is False: def post_install_enable_networkd_resolved(*args :str, **kwargs :str): - self.enable_service('systemd-networkd', 'systemd-resolved') + self.enable_service(['systemd-networkd', 'systemd-resolved']) self.post_base_install.append(post_install_enable_networkd_resolved) # Otherwise, we can go ahead and enable the services else: - self.enable_service('systemd-networkd', 'systemd-resolved') + self.enable_service(['systemd-networkd', 'systemd-resolved']) return True @@ -704,9 +676,9 @@ class Installer: fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit: - mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n") - mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") - mkinit.write(f"FILES=({' '.join(self.FILES)})\n") + mkinit.write(f"MODULES=({' '.join(self.modules)})\n") + mkinit.write(f"BINARIES=({' '.join(self._binaries)})\n") + mkinit.write(f"FILES=({' '.join(self._files)})\n") if not self._disk_encryption.hsm_device: # For now, if we don't use HSM we revert to the old @@ -714,9 +686,9 @@ class Installer: # This is purely for stability reasons, we're going away from this. # * systemd -> udev # * sd-vconsole -> keymap - self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS] + self._hooks = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self._hooks] - mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") + mkinit.write(f"HOOKS=({' '.join(self._hooks)})\n") try: SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}') @@ -736,25 +708,25 @@ class Installer: if (pkg := part.fs_type.installation_pkg) is not None: self.base_packages.append(pkg) if (module := part.fs_type.installation_module) is not None: - self.MODULES.append(module) + self.modules.append(module) if (binary := part.fs_type.installation_binary) is not None: - self.BINARIES.append(binary) + self._binaries.append(binary) # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: - if 'fsck' in self.HOOKS: - self.HOOKS.remove('fsck') + if 'fsck' in self._hooks: + self._hooks.remove('fsck') if part in self._disk_encryption.partitions: if self._disk_encryption.hsm_device: # Required bby mkinitcpio to add support for fido2-device options - self.pacstrap('libfido2') + self._pacstrap('libfido2') - if 'sd-encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt') + if 'sd-encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') else: - if 'encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') + if 'encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') if not has_uefi(): self.base_packages.append('grub') @@ -786,7 +758,7 @@ class Installer: else: self.log("The testing flag is not set. This system will be installed without testing repositories enabled.") - self.pacstrap(self.base_packages) + self._pacstrap(self.base_packages) self.helper_flags['base-strapped'] = True # This handles making sure that the repositories we enabled persist on the installed system @@ -826,7 +798,7 @@ class Installer: def setup_swap(self, kind :str = 'zram'): if kind == 'zram': self.log(f"Setting up swap on zram") - self.pacstrap('zram-generator') + self._pacstrap('zram-generator') # We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813 # zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example' @@ -853,7 +825,7 @@ class Installer: return None def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): - self.pacstrap('efibootmgr') + self._pacstrap('efibootmgr') if not has_uefi(): raise HardwareIncompatibilityError @@ -919,7 +891,7 @@ class Installer: # blkid doesn't trigger on loopback devices really well, # so we'll use the old manual method until we get that sorted out. - options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self.KERNEL_PARAMS)}\n' + options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self._kernel_params)}\n' for sub_vol in root_partition.btrfs_subvols: if sub_vol.is_root(): @@ -958,7 +930,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self.pacstrap('grub') # no need? + self._pacstrap('grub') # no need? _file = "/etc/default/grub" @@ -977,7 +949,7 @@ class Installer: log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO) if has_uefi(): - self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) @@ -987,8 +959,20 @@ class Installer: except SysCallError as error: raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}") else: + device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path) + + if not device: + raise ValueError(f'Can not find block device: {boot_partition.safe_dev_path}') + try: - SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True) + cmd = f'/usr/bin/arch-chroot' \ + f' {self.target}' \ + f' grub-install' \ + f' --debug' \ + f' --target=i386-pc' \ + f' --recheck {device.device_info.path}' + + SysCommand(cmd, peek_output=True) except SysCallError as error: raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}") @@ -1004,7 +988,7 @@ class Installer: boot_partition: disk.PartitionModification, root_partition: disk.PartitionModification ): - self.pacstrap('efibootmgr') + self._pacstrap('efibootmgr') if not has_uefi(): raise HardwareIncompatibilityError @@ -1038,17 +1022,30 @@ class Installer: # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) - kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') + kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}') else: log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) - kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') + kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}') + + device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path) - device = disk.device_handler.get_device_by_partition_path(boot_partition.dev_path) - SysCommand(f'efibootmgr --disk {device.path} --part {device.path} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') + if not device: + raise ValueError(f'Unable to find block device: {boot_partition.safe_dev_path}') + + cmd = f'efibootmgr ' \ + f'--disk {device.device_info.path} ' \ + f'--part {boot_partition.safe_dev_path} ' \ + f'--create ' \ + f'--label "{label}" ' \ + f'--loader {loader} ' \ + f'--unicode \'{" ".join(kernel_parameters)}\' ' \ + f'--verbose' + + SysCommand(cmd) self.helper_flags['bootloader'] = "efistub" - def add_bootloader(self, bootloader: Bootloader) -> bool: + def add_bootloader(self, bootloader: Bootloader): """ Adds a bootloader to the installation instance. Archinstall supports one of three types: @@ -1056,8 +1053,7 @@ class Installer: * grub * efistub (beta) - :param bootloader: Can be one of the three strings - 'systemd-bootctl', 'grub' or 'efistub' (beta) + :param bootloader: Type of bootloader to be added """ for plugin in plugins.values(): @@ -1089,8 +1085,8 @@ class Installer: case Bootloader.Efistub: self._add_efistub_bootloader(boot_partition, root_partition) - def add_additional_packages(self, *packages: Union[str, List[str]]) -> bool: - return self.pacstrap(*packages) + def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: + return self._pacstrap(packages) def _enable_users(self, service: str, users: List[User]): for user in users: @@ -1201,9 +1197,6 @@ class Installer: except SysCallError: return False - def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: - return InstallationFile(self, filename, owner) - def set_keyboard_language(self, language: str) -> bool: log(f"Setting keyboard language to {language}", level=logging.INFO) if len(language.strip()): -- cgit v1.2.3-54-g00ecf