Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/installer.py
diff options
context:
space:
mode:
authorDaniel Girtler <blackrabbit256@gmail.com>2023-05-04 00:36:46 +1000
committerGitHub <noreply@github.com>2023-05-03 16:36:46 +0200
commitec4ecbcb7a839ab06b739f01ce42bfd18376c620 (patch)
treeb617783f2d7cb4d4bf32690590859e68666bf3d4 /archinstall/lib/installer.py
parente78ddb03e1bbc46e59fd6a9889699b12808d0fec (diff)
Full mypy compliance and small fixes (#1777)
* Fix mypy compliance --------- Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
Diffstat (limited to 'archinstall/lib/installer.py')
-rw-r--r--archinstall/lib/installer.py239
1 files changed, 116 insertions, 123 deletions
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index ddbcc2f2..b6eaa797 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -7,7 +7,7 @@ import shutil
import subprocess
import time
from pathlib import Path
-from typing import Any, Iterator, List, Mapping, Optional, TYPE_CHECKING, Union, Dict
+from typing import Any, List, Optional, TYPE_CHECKING, Union, Dict, Callable, Iterable
from . import disk
from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
@@ -36,32 +36,6 @@ __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "l
__accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"]
-class InstallationFile:
- def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"):
- self.installation = installation
- self.filename = filename
- self.owner = owner
- self.mode = mode
- self.fh = None
-
- def __enter__(self) -> 'InstallationFile':
- self.fh = open(self.filename, self.mode)
- return self
-
- def __exit__(self, *args :str) -> None:
- self.fh.close()
- self.installation.chown(self.owner, self.filename)
-
- def write(self, data: Union[str, bytes]) -> int:
- return self.fh.write(data)
-
- def read(self, *args) -> Union[str, bytes]:
- return self.fh.read(*args)
-
-# def poll(self, *args) -> bool:
-# return self.fh.poll(*args)
-
-
def accessibility_tools_in_use() -> bool:
return os.system('systemctl is-active --quiet espeakup.service') == 0
@@ -106,15 +80,17 @@ class Installer:
self.kernels = kernels
self._disk_config = disk_config
- self._disk_encryption = disk_encryption
- if self._disk_encryption is None:
+ if disk_encryption is None:
self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption)
+ else:
+ self._disk_encryption = disk_encryption
+
+ self.target: Path = target
- self.target = target
self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S')
self.milliseconds = int(str(time.time()).split('.')[1])
- self.helper_flags = {'base': False, 'bootloader': False}
+ self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None}
self.base_packages = base_packages
for kernel in self.kernels:
@@ -124,31 +100,33 @@ class Installer:
if accessibility_tools_in_use():
self.base_packages.extend(__accessibility_packages__)
- self.post_base_install = []
+ self.post_base_install: List[Callable] = []
# TODO: Figure out which one of these two we'll use.. But currently we're mixing them..
storage['session'] = self
storage['installation_session'] = self
- self.MODULES = []
- self.BINARIES = []
- self.FILES = []
+ self.modules: List[str] = []
+ self._binaries: List[str] = []
+ self._files: List[str] = []
+
# systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt
# if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override.
- self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"]
- self.KERNEL_PARAMS = []
- self.FSTAB_ENTRIES = []
+ self._hooks: List[str] = [
+ "base", "systemd", "autodetect", "keyboard",
+ "sd-vconsole", "modconf", "block", "filesystems", "fsck"
+ ]
+ self._kernel_params: List[str] = []
+ self._fstab_entries: List[str] = []
self._zram_enabled = False
- def __enter__(self, *args: str, **kwargs: str) -> 'Installer':
+ def __enter__(self) -> 'Installer':
return self
- def __exit__(self, *args :str, **kwargs :str) -> bool:
- # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
-
- if len(args) >= 2 and args[1]:
- self.log(args[1], level=logging.ERROR, fg='red')
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None:
+ log(exc_val, fg='red', level=logging.ERROR)
self.sync_log_to_install_medium()
@@ -156,7 +134,7 @@ class Installer:
# and then reboot, and a identical log file will be found in the ISO medium anyway.
print(_("[!] A log file has been created here: {}").format(os.path.join(storage['LOG_PATH'], storage['LOG_FILE'])))
print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues"))
- raise args[1]
+ raise exc_val
if not (missing_steps := self.post_install_check()):
self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO)
@@ -164,6 +142,7 @@ class Installer:
return True
else:
self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING)
+
for step in missing_steps:
self.log(f' - {step}', fg='red', level=logging.WARNING)
@@ -247,31 +226,32 @@ class Installer:
luks_handlers = {}
for part_mod in partitions:
- luks_handler = disk.device_handler.unlock_luks2_dev(
- part_mod.dev_path,
- part_mod.mapper_name,
- self._disk_encryption.encryption_password
- )
- luks_handlers[part_mod] = luks_handler
+ if part_mod.mapper_name and part_mod.dev_path:
+ luks_handler = disk.device_handler.unlock_luks2_dev(
+ part_mod.dev_path,
+ part_mod.mapper_name,
+ self._disk_encryption.encryption_password
+ )
+ luks_handlers[part_mod] = luks_handler
return luks_handlers
def _mount_partition(self, part_mod: disk.PartitionModification):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined
- if part_mod.mountpoint is not None:
+ if part_mod.mountpoint and part_mod.dev_path:
target = self.target / part_mod.relative_mountpoint
disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options)
- if part_mod.fs_type == disk.FilesystemType.Btrfs:
+ if part_mod.fs_type == disk.FilesystemType.Btrfs and part_mod.dev_path:
self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols)
def _mount_luks_partiton(self, part_mod: disk.PartitionModification, luks_handler: Luks2):
# it would be none if it's btrfs as the subvolumes will have the mountpoints defined
- if part_mod.mountpoint is not None:
+ if part_mod.mountpoint and luks_handler.mapper_dev:
target = self.target / part_mod.relative_mountpoint
disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options)
- if part_mod.fs_type == disk.FilesystemType.Btrfs:
+ if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev:
self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols)
def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]):
@@ -346,15 +326,15 @@ class Installer:
SysCommand(f'chmod 0600 {self.target}{file}')
SysCommand(f'mkswap {self.target}{file}')
- self.FSTAB_ENTRIES.append(f'{file} none swap defaults 0 0')
+ self._fstab_entries.append(f'{file} none swap defaults 0 0')
if enable_resume:
resume_uuid = SysCommand(f'findmnt -no UUID -T {self.target}{file}').decode('UTF-8').strip()
resume_offset = SysCommand(f'/usr/bin/filefrag -v {self.target}{file}').decode('UTF-8').split('0:', 1)[1].split(":", 1)[1].split("..", 1)[0].strip()
- self.HOOKS.append('resume')
- self.KERNEL_PARAMS.append(f'resume=UUID={resume_uuid}')
- self.KERNEL_PARAMS.append(f'resume_offset={resume_offset}')
+ self._hooks.append('resume')
+ self._kernel_params.append(f'resume=UUID={resume_uuid}')
+ self._kernel_params.append(f'resume_offset={resume_offset}')
def post_install_check(self, *args :str, **kwargs :str) -> List[str]:
return [step for step, flag in self.helper_flags.items() if flag is False]
@@ -411,7 +391,7 @@ class Installer:
else:
pacman_conf.write(line)
- def pacstrap(self, *packages: Union[str, List[str]], **kwargs :str) -> bool:
+ def _pacstrap(self, packages: Union[str, List[str]]) -> bool:
if type(packages[0]) in (list, tuple):
packages = packages[0]
@@ -430,9 +410,9 @@ class Installer:
if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
- return self.pacstrap(*packages, **kwargs)
+ return self._pacstrap(packages)
- raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red")
+ raise RequirementError(f'Could not sync mirrors: {error}')
try:
SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf -K {self.target} {" ".join(packages)} --noconfirm', peek_output=True)
@@ -442,40 +422,44 @@ class Installer:
if storage['arguments'].get('silent', False) is False:
if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'):
- return self.pacstrap(*packages, **kwargs)
+ return self._pacstrap(packages)
raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.")
- def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None:
+ def set_mirrors(self, mirrors: Dict[str, Iterable[str]]):
for plugin in plugins.values():
if hasattr(plugin, 'on_mirrors'):
if result := plugin.on_mirrors(mirrors):
mirrors = result
- return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist')
+ destination = f'{self.target}/etc/pacman.d/mirrorlist'
+ use_mirrors(mirrors, destination=destination)
def genfstab(self, flags :str = '-pU'):
self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO)
try:
- fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}')
+ gen_fstab = SysCommand(f'/usr/bin/genfstab {flags} {self.target}').decode()
except SysCallError as error:
raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {error}')
- with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
- fstab_fh.write(fstab.decode())
+ if not gen_fstab:
+ raise RequirementError(f'Genrating fstab returned empty value')
+
+ with open(f"{self.target}/etc/fstab", 'a') as fp:
+ fp.write(gen_fstab)
if not os.path.isfile(f'{self.target}/etc/fstab'):
- raise RequirementError(f'Could not generate fstab, strapping in packages most likely failed (disk out of space?)\n Error: {fstab}')
+ raise RequirementError(f'Could not create fstab file')
for plugin in plugins.values():
if hasattr(plugin, 'on_genfstab'):
if plugin.on_genfstab(self) is True:
break
- with open(f"{self.target}/etc/fstab", 'a') as fstab_fh:
- for entry in self.FSTAB_ENTRIES:
- fstab_fh.write(f'{entry}\n')
+ with open(f"{self.target}/etc/fstab", 'a') as fp:
+ for entry in self._fstab_entries:
+ fp.write(f'{entry}\n')
for mod in self._disk_config.device_modifications:
for part_mod in mod.partitions:
@@ -583,7 +567,7 @@ class Installer:
# fstrim is owned by util-linux, a dependency of both base and systemd.
self.enable_service("fstrim.timer")
- def enable_service(self, *services: Union[str, List[str]]) -> None:
+ def enable_service(self, services: Union[str, List[str]]) -> None:
if type(services[0]) in (list, tuple):
services = services[0]
@@ -611,19 +595,7 @@ class Installer:
subprocess.check_call(f"/usr/bin/arch-chroot {self.target}", shell=True)
def configure_nic(self, network_config: NetworkConfiguration) -> None:
- from .systemd import Networkd
-
- if network_config.dhcp:
- conf = Networkd(Match={"Name": network_config.iface}, Network={"DHCP": "yes"})
- else:
- network = {"Address": network_config.ip}
- if network_config.gateway:
- network["Gateway"] = network_config.gateway
- if network_config.dns:
- dns = network_config.dns
- network["DNS"] = dns if isinstance(dns, list) else [dns]
-
- conf = Networkd(Match={"Name": network_config.iface}, Network=network)
+ conf = network_config.as_systemd_config()
for plugin in plugins.values():
if hasattr(plugin, 'on_configure_nic'):
@@ -663,7 +635,7 @@ class Installer:
# Otherwise, we can go ahead and add the required package
# and enable it's service:
else:
- self.pacstrap('iwd')
+ self._pacstrap('iwd')
self.enable_service('iwd')
for psk in psk_files:
@@ -682,12 +654,12 @@ class Installer:
if self.helper_flags.get('base', False) is False:
def post_install_enable_networkd_resolved(*args :str, **kwargs :str):
- self.enable_service('systemd-networkd', 'systemd-resolved')
+ self.enable_service(['systemd-networkd', 'systemd-resolved'])
self.post_base_install.append(post_install_enable_networkd_resolved)
# Otherwise, we can go ahead and enable the services
else:
- self.enable_service('systemd-networkd', 'systemd-resolved')
+ self.enable_service(['systemd-networkd', 'systemd-resolved'])
return True
@@ -704,9 +676,9 @@ class Installer:
fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n")
with open(f'{self.target}/etc/mkinitcpio.conf', 'w') as mkinit:
- mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n")
- mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n")
- mkinit.write(f"FILES=({' '.join(self.FILES)})\n")
+ mkinit.write(f"MODULES=({' '.join(self.modules)})\n")
+ mkinit.write(f"BINARIES=({' '.join(self._binaries)})\n")
+ mkinit.write(f"FILES=({' '.join(self._files)})\n")
if not self._disk_encryption.hsm_device:
# For now, if we don't use HSM we revert to the old
@@ -714,9 +686,9 @@ class Installer:
# This is purely for stability reasons, we're going away from this.
# * systemd -> udev
# * sd-vconsole -> keymap
- self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS]
+ self._hooks = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self._hooks]
- mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n")
+ mkinit.write(f"HOOKS=({' '.join(self._hooks)})\n")
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}')
@@ -736,25 +708,25 @@ class Installer:
if (pkg := part.fs_type.installation_pkg) is not None:
self.base_packages.append(pkg)
if (module := part.fs_type.installation_module) is not None:
- self.MODULES.append(module)
+ self.modules.append(module)
if (binary := part.fs_type.installation_binary) is not None:
- self.BINARIES.append(binary)
+ self._binaries.append(binary)
# There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed.
if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target:
- if 'fsck' in self.HOOKS:
- self.HOOKS.remove('fsck')
+ if 'fsck' in self._hooks:
+ self._hooks.remove('fsck')
if part in self._disk_encryption.partitions:
if self._disk_encryption.hsm_device:
# Required bby mkinitcpio to add support for fido2-device options
- self.pacstrap('libfido2')
+ self._pacstrap('libfido2')
- if 'sd-encrypt' not in self.HOOKS:
- self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt')
+ if 'sd-encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt')
else:
- if 'encrypt' not in self.HOOKS:
- self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt')
+ if 'encrypt' not in self._hooks:
+ self._hooks.insert(self._hooks.index('filesystems'), 'encrypt')
if not has_uefi():
self.base_packages.append('grub')
@@ -786,7 +758,7 @@ class Installer:
else:
self.log("The testing flag is not set. This system will be installed without testing repositories enabled.")
- self.pacstrap(self.base_packages)
+ self._pacstrap(self.base_packages)
self.helper_flags['base-strapped'] = True
# This handles making sure that the repositories we enabled persist on the installed system
@@ -826,7 +798,7 @@ class Installer:
def setup_swap(self, kind :str = 'zram'):
if kind == 'zram':
self.log(f"Setting up swap on zram")
- self.pacstrap('zram-generator')
+ self._pacstrap('zram-generator')
# We could use the default example below, but maybe not the best idea: https://github.com/archlinux/archinstall/pull/678#issuecomment-962124813
# zram_example_location = '/usr/share/doc/zram-generator/zram-generator.conf.example'
@@ -853,7 +825,7 @@ class Installer:
return None
def _add_systemd_bootloader(self, root_partition: disk.PartitionModification):
- self.pacstrap('efibootmgr')
+ self._pacstrap('efibootmgr')
if not has_uefi():
raise HardwareIncompatibilityError
@@ -919,7 +891,7 @@ class Installer:
# blkid doesn't trigger on loopback devices really well,
# so we'll use the old manual method until we get that sorted out.
- options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self.KERNEL_PARAMS)}\n'
+ options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self._kernel_params)}\n'
for sub_vol in root_partition.btrfs_subvols:
if sub_vol.is_root():
@@ -958,7 +930,7 @@ class Installer:
boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification
):
- self.pacstrap('grub') # no need?
+ self._pacstrap('grub') # no need?
_file = "/etc/default/grub"
@@ -977,7 +949,7 @@ class Installer:
log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO)
if has_uefi():
- self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
+ self._pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
try:
SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True)
@@ -987,8 +959,20 @@ class Installer:
except SysCallError as error:
raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}")
else:
+ device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path)
+
+ if not device:
+ raise ValueError(f'Can not find block device: {boot_partition.safe_dev_path}')
+
try:
- SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True)
+ cmd = f'/usr/bin/arch-chroot' \
+ f' {self.target}' \
+ f' grub-install' \
+ f' --debug' \
+ f' --target=i386-pc' \
+ f' --recheck {device.device_info.path}'
+
+ SysCommand(cmd, peek_output=True)
except SysCallError as error:
raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}")
@@ -1004,7 +988,7 @@ class Installer:
boot_partition: disk.PartitionModification,
root_partition: disk.PartitionModification
):
- self.pacstrap('efibootmgr')
+ self._pacstrap('efibootmgr')
if not has_uefi():
raise HardwareIncompatibilityError
@@ -1038,17 +1022,30 @@ class Installer:
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)
log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
- kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}')
+ kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}')
else:
log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG)
- kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}')
+ kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self._kernel_params)}')
+
+ device = disk.device_handler.get_device_by_partition_path(boot_partition.safe_dev_path)
- device = disk.device_handler.get_device_by_partition_path(boot_partition.dev_path)
- SysCommand(f'efibootmgr --disk {device.path} --part {device.path} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
+ if not device:
+ raise ValueError(f'Unable to find block device: {boot_partition.safe_dev_path}')
+
+ cmd = f'efibootmgr ' \
+ f'--disk {device.device_info.path} ' \
+ f'--part {boot_partition.safe_dev_path} ' \
+ f'--create ' \
+ f'--label "{label}" ' \
+ f'--loader {loader} ' \
+ f'--unicode \'{" ".join(kernel_parameters)}\' ' \
+ f'--verbose'
+
+ SysCommand(cmd)
self.helper_flags['bootloader'] = "efistub"
- def add_bootloader(self, bootloader: Bootloader) -> bool:
+ def add_bootloader(self, bootloader: Bootloader):
"""
Adds a bootloader to the installation instance.
Archinstall supports one of three types:
@@ -1056,8 +1053,7 @@ class Installer:
* grub
* efistub (beta)
- :param bootloader: Can be one of the three strings
- 'systemd-bootctl', 'grub' or 'efistub' (beta)
+ :param bootloader: Type of bootloader to be added
"""
for plugin in plugins.values():
@@ -1089,8 +1085,8 @@ class Installer:
case Bootloader.Efistub:
self._add_efistub_bootloader(boot_partition, root_partition)
- def add_additional_packages(self, *packages: Union[str, List[str]]) -> bool:
- return self.pacstrap(*packages)
+ def add_additional_packages(self, packages: Union[str, List[str]]) -> bool:
+ return self._pacstrap(packages)
def _enable_users(self, service: str, users: List[User]):
for user in users:
@@ -1201,9 +1197,6 @@ class Installer:
except SysCallError:
return False
- def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile:
- return InstallationFile(self, filename, owner)
-
def set_keyboard_language(self, language: str) -> bool:
log(f"Setting keyboard language to {language}", level=logging.INFO)
if len(language.strip()):