From 00b0ae7ba439a5a420095175b3bedd52c569db51 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Wed, 19 Apr 2023 20:55:42 +1000 Subject: PyParted and a large rewrite of the underlying partitioning (#1604) * Invert mypy files * Add optional pre-commit hooks * New profile structure * Serialize profiles * Use profile instead of classmethod * Custom profile setup * Separator between back * Support profile import via url * Move profiles module * Refactor files * Remove symlink * Add user to docker group * Update schema description * Handle list services * mypy fixes * mypy fixes * Rename profilesv2 to profiles * flake8 * mypy again * Support selecting DM * Fix mypy * Cleanup * Update greeter setting * Update schema * Revert toml changes * Poc external dependencies * Dependency support * New encryption menu * flake8 * Mypy and flake8 * Unify lsblk command * Update bootloader configuration * Git hooks * Fix import * Pyparted * Remove custom font setting * flake8 * Remove default preview * Manual partitioning menu * Update structure * Disk configuration * Update filesystem * luks2 encryption * Everything works until installation * Btrfsutil * Btrfs handling * Update btrfs * Save encryption config * Fix pipewire issue * Update mypy version * Update all pre-commit * Update package versions * Revert audio/pipewire * Merge master PRs * Add master changes * Merge master changes * Small renaming * Pull master changes * Reset disk enc after disk config change * Generate locals * Update naming * Fix imports * Fix broken sync * Fix pre selection on table menu * Profile menu * Update profile * Fix post_install * Added python-pyparted to PKGBUILD, this requires [testing] to be enabled in order to run makepkg. Package still works via python -m build etc. * Swaped around some setuptools logic in pyproject Since we define `package-data` and `packages` there should be no need for: ``` [tool.setuptools.packages.find] where = ["archinstall", "archinstall.*"] ``` * Removed pyproject collisions. Duplicate definitions. * Made sure pyproject.toml includes languages * Add example and update README * Fix pyproject issues * Generate locale * Refactor imports * Simplify imports * Add profile description and package examples * Align code * Fix mypy * Simplify imports * Fix saving config * Fix wrong luks merge * Refactor installation * Fix cdrom device loading * Fix wrongly merged code * Fix imports and greeter * Don't terminate on partprobe error * Use specific path on partprobe from luks * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update archinstall/lib/disk/device_model.py Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> * Update github workflow to test archinstall installation * Update sway merge * Generate locales * Update workflow --------- Co-authored-by: Daniel Girtler Co-authored-by: Anton Hvornum Co-authored-by: Anton Hvornum Co-authored-by: codefiles <11915375+codefiles@users.noreply.github.com> --- archinstall/lib/installer.py | 670 ++++++++++++++++++++++--------------------- 1 file changed, 340 insertions(+), 330 deletions(-) (limited to 'archinstall/lib/installer.py') diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index b4d253b3..ddbcc2f2 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -1,30 +1,29 @@ -import time +import glob import logging import os import re -import shutil import shlex -import pathlib +import shutil import subprocess -import glob -from types import ModuleType -from typing import Union, Dict, Any, List, Optional, Iterator, Mapping, TYPE_CHECKING -from .disk import get_partitions_in_use, Partition -from .general import SysCommand, generate_password +import time +from pathlib import Path +from typing import Any, Iterator, List, Mapping, Optional, TYPE_CHECKING, Union, Dict + +from . import disk +from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError +from .general import SysCommand from .hardware import has_uefi, is_vm, cpu_vendor from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout -from .disk.helpers import findmnt +from .luks import Luks2 from .mirrors import use_mirrors -from .models.disk_encryption import DiskEncryption +from .models.bootloader import Bootloader +from .models.network_configuration import NetworkConfiguration +from .models.users import User +from .output import log +from .pacman import run_pacman from .plugins import plugins +from .services import service_state from .storage import storage -from .output import log -from .profiles import Profile -from .disk.partition import get_mount_fs_type -from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError -from .models.users import User -from .models.subvolume import Subvolume -from .hsm import Fido2 if TYPE_CHECKING: _: Any @@ -36,9 +35,6 @@ __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "l # Additional packages that are installed if the user is running the Live ISO with accessibility tools enabled __accessibility_packages__ = ["brltty", "espeakup", "alsa-utils"] -from .pacman import run_pacman -from .models.network_configuration import NetworkConfiguration - class InstallationFile: def __init__(self, installation :'Installer', filename :str, owner :str, mode :str = "w"): @@ -92,26 +88,35 @@ class Installer: :param hostname: The given /etc/hostname for the machine. :type hostname: str, optional - """ - - def __init__(self, target :str, *, base_packages :Optional[List[str]] = None, kernels :Optional[List[str]] = None): - if base_packages is None: + def __init__( + self, + target: Path, + disk_config: disk.DiskLayoutConfiguration, + disk_encryption: Optional[disk.DiskEncryption] = None, + base_packages: List[str] = [], + kernels: Optional[List[str]] = None + ): + if not base_packages: base_packages = __packages__[:3] + if kernels is None: self.kernels = ['linux'] else: self.kernels = kernels + + self._disk_config = disk_config + self._disk_encryption = disk_encryption + + if self._disk_encryption is None: + self._disk_encryption = disk.DiskEncryption(disk.EncryptionType.NoEncryption) + self.target = target self.init_time = time.strftime('%Y-%m-%d_%H-%M-%S') self.milliseconds = int(str(time.time()).split('.')[1]) + self.helper_flags = {'base': False, 'bootloader': False} + self.base_packages = base_packages - self.helper_flags = { - 'base': False, - 'bootloader': False - } - - self.base_packages = base_packages.split(' ') if type(base_packages) is str else base_packages for kernel in self.kernels: self.base_packages.append(kernel) @@ -136,19 +141,10 @@ class Installer: self._zram_enabled = False - self._disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption') - - def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str): - """ - installer.log() wraps output.log() mainly to set a default log-level for this install session. - Any manual override can be done per log() call. - """ - log(*args, level=level, **kwargs) - - def __enter__(self, *args :str, **kwargs :str) -> 'Installer': + def __enter__(self, *args: str, **kwargs: str) -> 'Installer': return self - def __exit__(self, *args :str, **kwargs :str) -> None: + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: @@ -165,7 +161,6 @@ class Installer: if not (missing_steps := self.post_install_check()): self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO) self.sync_log_to_install_medium() - return True else: self.log('Some required steps were not successfully installed/configured before leaving the installer:', fg='red', level=logging.WARNING) @@ -178,146 +173,168 @@ class Installer: self.sync_log_to_install_medium() return False - @property - def partitions(self) -> List[Partition]: - return get_partitions_in_use(self.target).values() + def log(self, *args :str, level :int = logging.DEBUG, **kwargs :str): + """ + installer.log() wraps output.log() mainly to set a default log-level for this install session. + Any manual override can be done per log() call. + """ + log(*args, level=level, **kwargs) - def sync_log_to_install_medium(self) -> bool: - # Copy over the install log (if there is one) to the install medium if - # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. - if self.helper_flags.get('base-strapped', False) is True: - if filename := storage.get('LOG_FILE', None): - absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) + def _verify_service_stop(self): + """ + Certain services might be running that affects the system during installation. + Currently, only one such service is "reflector.service" which updates /etc/pacman.d/mirrorlist + We need to wait for it before we continue since we opted in to use a custom mirror/region. + """ + log('Waiting for automatic mirror selection (reflector) to complete...', level=logging.INFO) + while service_state('reflector') not in ('dead', 'failed', 'exited'): + time.sleep(1) - if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"): - os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}") + log('Waiting pacman-init.service to complete.', level=logging.INFO) + while service_state('pacman-init') not in ('dead', 'failed', 'exited'): + time.sleep(1) - shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}") + log('Waiting Arch Linux keyring sync (archlinux-keyring-wkd-sync) to complete.', level=logging.INFO) + while service_state('archlinux-keyring-wkd-sync') not in ('dead', 'failed', 'exited'): + time.sleep(1) - return True + def _verify_boot_part(self): + """ + Check that mounted /boot device has at minimum size for installation + The reason this check is here is to catch pre-mounted device configuration and potentially + configured one that has not gone through any previous checks (e.g. --silence mode) - def _create_keyfile(self,luks_handle , partition :dict, password :str): - """ roiutine to create keyfiles, so it can be moved elsewhere + NOTE: this function should be run AFTER running the mount_ordered_layout function """ - if self._disk_encryption and self._disk_encryption.generate_encryption_file(partition): - if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): - cryptkey_dir.mkdir(parents=True) - # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key - # if we name the device to "xyzloop". - if partition.get('mountpoint',None): - encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key" - else: - encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key" - with open(f"{self.target}{encryption_key_path}", "w") as keyfile: - keyfile.write(generate_password(length=512)) + boot_mount = self.target / 'boot' + lsblk_info = disk.get_lsblk_by_mountpoint(boot_mount) + + if len(lsblk_info) > 0: + if lsblk_info[0].size < disk.Size(200, disk.Unit.MiB): + raise DiskError( + f'The boot partition mounted at {boot_mount} is not large enough to install a boot loader. ' + f'Please resize it to at least 200MiB and re-run the installation.' + ) - os.chmod(f"{self.target}{encryption_key_path}", 0o400) + def sanity_check(self): + self._verify_boot_part() + self._verify_service_stop() - luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password) - luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"]) + def mount_ordered_layout(self): + log('Mounting partitions in order', level=logging.INFO) - def _has_root(self, partition :dict) -> bool: - """ - Determine if an encrypted partition contains root in it - """ - if partition.get("mountpoint") is None: - if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})): - for mountpoint in [sub_list[subvolume].get("mountpoint") if isinstance(subvolume, dict) else subvolume.mountpoint for subvolume in sub_list]: - if mountpoint == '/': - return True - return False + for mod in self._disk_config.device_modifications: + # partitions have to mounted in the right order on btrfs the mountpoint will + # be empty as the actual subvolumes are getting mounted instead so we'll use + # '/' just for sorting + sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint if x.mountpoint else Path('/')) + + if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption: + enc_partitions = list(filter(lambda x: x in self._disk_encryption.partitions, sorted_part_mods)) else: - return False - elif partition.get("mountpoint") == '/': - return True - else: - return False + enc_partitions = [] - def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: - from .luks import luks2 - from .disk.btrfs import setup_subvolumes, mount_subvolume - - # set the partitions as a list not part of a tree (which we don't need anymore (i think) - list_part = [] - list_luks_handles = [] - for blockdevice in layouts: - list_part.extend(layouts[blockdevice]['partitions']) - - # TODO: Implement a proper mount-queue system that does not depend on return values. - mount_queue = {} - - # we manage the encrypted partititons - if self._disk_encryption: - for partition in self._disk_encryption.all_partitions: - # open the luks device and all associate stuff - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" - - # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used - with (luks_handle := luks2(partition['device_instance'], loopdev, self._disk_encryption.encryption_password, auto_unmount=False)) as unlocked_device: - if self._disk_encryption.generate_encryption_file(partition) and not self._has_root(partition): - list_luks_handles.append([luks_handle, partition, self._disk_encryption.encryption_password]) - # this way all the requesrs will be to the dm_crypt device and not to the physical partition - partition['device_instance'] = unlocked_device - - if self._has_root(partition) and self._disk_encryption.generate_encryption_file(partition) is False: - if self._disk_encryption.hsm_device: - Fido2.fido2_enroll(self._disk_encryption.hsm_device, partition['device_instance'], self._disk_encryption.encryption_password) - - btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])] - - for partition in btrfs_subvolumes: - device_instance = partition['device_instance'] - mount_options = partition.get('filesystem', {}).get('mount_options', []) - self.mount(device_instance, "/", options=','.join(mount_options)) - setup_subvolumes(installation=self, partition_dict=partition) - device_instance.unmount() - - # We then handle any special cases, such as btrfs - for partition in btrfs_subvolumes: - subvolumes: List[Subvolume] = partition['btrfs']['subvolumes'] - for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint): - # We cache the mount call for later - mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume( - installation=self, - device=device, - subvolume=sub_vol - ) + # attempt to decrypt all luks partitions + luks_handlers = self._prepare_luks_partitions(enc_partitions) - # We mount ordinary partitions, and we sort them by the mountpoint - for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']): - mountpoint = partition['mountpoint'] - log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) + for part_mod in sorted_part_mods: + if part_mod not in luks_handlers: # partition is not encrypted + self._mount_partition(part_mod) + else: # mount encrypted partition + self._mount_luks_partiton(part_mod, luks_handlers[part_mod]) - if partition.get('filesystem',{}).get('mount_options',[]): - mount_options = ','.join(partition['filesystem']['mount_options']) - mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}", options=mount_options: instance.mount(target, options=options) - else: - mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target) + def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[disk.PartitionModification, Luks2]: + luks_handlers = {} - log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.DEBUG, fg="white") + for part_mod in partitions: + luks_handler = disk.device_handler.unlock_luks2_dev( + part_mod.dev_path, + part_mod.mapper_name, + self._disk_encryption.encryption_password + ) + luks_handlers[part_mod] = luks_handler + + return luks_handlers + + def _mount_partition(self, part_mod: disk.PartitionModification): + # it would be none if it's btrfs as the subvolumes will have the mountpoints defined + if part_mod.mountpoint is not None: + target = self.target / part_mod.relative_mountpoint + disk.device_handler.mount(part_mod.dev_path, target, options=part_mod.mount_options) + + if part_mod.fs_type == disk.FilesystemType.Btrfs: + self._mount_btrfs_subvol(part_mod.dev_path, part_mod.btrfs_subvols) + + def _mount_luks_partiton(self, part_mod: disk.PartitionModification, luks_handler: Luks2): + # it would be none if it's btrfs as the subvolumes will have the mountpoints defined + if part_mod.mountpoint is not None: + target = self.target / part_mod.relative_mountpoint + disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) + + if part_mod.fs_type == disk.FilesystemType.Btrfs: + self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols) + + def _mount_btrfs_subvol(self, dev_path: Path, subvolumes: List[disk.SubvolumeModification]): + for subvol in subvolumes: + mountpoint = self.target / subvol.relative_mountpoint + mount_options = subvol.mount_options + [f'subvol={subvol.name}'] + disk.device_handler.mount(dev_path, mountpoint, options=mount_options) + + def generate_key_files(self): + for part_mod in self._disk_encryption.partitions: + gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod) + + luks_handler = Luks2( + part_mod.dev_path, + mapper_name=part_mod.mapper_name, + password=self._disk_encryption.encryption_password + ) - # We mount everything by sorting on the mountpoint itself. - for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]): - frozen_func() + if gen_enc_file and not part_mod.is_root(): + log(f'Creating key-file: {part_mod.dev_path}', level=logging.INFO) + luks_handler.create_keyfile(self.target) + if part_mod.is_root() and not gen_enc_file: + if self._disk_encryption.hsm_device: + disk.Fido2.fido2_enroll( + self._disk_encryption.hsm_device, + part_mod, + self._disk_encryption.encryption_password + ) + + def activate_ntp(self): + """ + If NTP is activated, confirm activiation in the ISO and at least one time-sync finishes + """ + SysCommand('timedatectl set-ntp true') + + logged = False + while service_state('dbus-org.freedesktop.timesync1.service') not in ['running']: + if not logged: + log(f"Waiting for dbus-org.freedesktop.timesync1.service to enter running state", level=logging.INFO) + logged = True time.sleep(1) - try: - findmnt(pathlib.Path(f"{self.target}{mountpoint}"), traverse=False) - except DiskError: - raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).") + logged = False + while 'Server: n/a' in SysCommand('timedatectl timesync-status --no-pager --property=Server --value'): + if not logged: + log(f"Waiting for timedatectl timesync-status to report a timesync against a server", level=logging.INFO) + logged = True + time.sleep(1) - # once everything is mounted, we generate the key files in the correct place - for handle in list_luks_handles: - ppath = handle[1]['device_instance'].path - log(f"creating key-file for {ppath}",level=logging.INFO) - self._create_keyfile(handle[0],handle[1],handle[2]) + def sync_log_to_install_medium(self) -> bool: + # Copy over the install log (if there is one) to the install medium if + # at least the base has been strapped in, otherwise we won't have a filesystem/structure to copy to. + if self.helper_flags.get('base-strapped', False) is True: + if filename := storage.get('LOG_FILE', None): + absolute_logfile = os.path.join(storage.get('LOG_PATH', './'), filename) + + if not os.path.isdir(f"{self.target}/{os.path.dirname(absolute_logfile)}"): + os.makedirs(f"{self.target}/{os.path.dirname(absolute_logfile)}") - def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True, options='') -> None: - if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'): - os.makedirs(f'{self.target}{mountpoint}') + shutil.copy2(absolute_logfile, f"{self.target}/{absolute_logfile}") - partition.mount(f'{self.target}{mountpoint}', options=options) + return True def add_swapfile(self, size='4G', enable_resume=True, file='/swapfile'): if file[:1] != '/': @@ -394,7 +411,7 @@ class Installer: else: pacman_conf.write(line) - def pacstrap(self, *packages :str, **kwargs :str) -> bool: + def pacstrap(self, *packages: Union[str, List[str]], **kwargs :str) -> bool: if type(packages[0]) in (list, tuple): packages = packages[0] @@ -437,7 +454,7 @@ class Installer: return use_mirrors(mirrors, destination=f'{self.target}/etc/pacman.d/mirrorlist') - def genfstab(self, flags :str = '-pU') -> bool: + def genfstab(self, flags :str = '-pU'): self.log(f"Updating {self.target}/etc/fstab", level=logging.INFO) try: @@ -460,7 +477,37 @@ class Installer: for entry in self.FSTAB_ENTRIES: fstab_fh.write(f'{entry}\n') - return True + for mod in self._disk_config.device_modifications: + for part_mod in mod.partitions: + if part_mod.fs_type != disk.FilesystemType.Btrfs: + continue + + fstab_file = Path(f'{self.target}/etc/fstab') + + with fstab_file.open('r') as fp: + fstab = fp.readlines() + + # Replace the {installation}/etc/fstab with entries + # using the compress=zstd where the mountpoint has compression set. + for index, line in enumerate(fstab): + # So first we grab the mount options by using subvol=.*? as a locator. + # And we also grab the mountpoint for the entry, for instance /var/log + subvoldef = re.findall(',.*?subvol=.*?[\t ]', line) + mountpoint = re.findall('[\t ]/.*?[\t ]', line) + + if not subvoldef or not mountpoint: + continue + + for sub_vol in part_mod.btrfs_subvols: + # We then locate the correct subvolume and check if it's compressed, + # and skip entries where compression is already defined + # We then sneak in the compress=zstd option if it doesn't already exist: + if sub_vol.compress and str(sub_vol.mountpoint) == Path(mountpoint[0].strip()) and ',compress=zstd,' not in line: + fstab[index] = line.replace(subvoldef[0], f',compress=zstd{subvoldef[0]}') + break + + with fstab_file.open('w') as fp: + fp.writelines(fstab) def set_hostname(self, hostname: str, *args :str, **kwargs :str) -> None: with open(f'{self.target}/etc/hostname', 'w') as fh: @@ -509,8 +556,8 @@ class Installer: if result := plugin.on_timezone(zone): zone = result - if (pathlib.Path("/usr") / "share" / "zoneinfo" / zone).exists(): - (pathlib.Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) + if (Path("/usr") / "share" / "zoneinfo" / zone).exists(): + (Path(self.target) / "etc" / "localtime").unlink(missing_ok=True) SysCommand(f'/usr/bin/arch-chroot {self.target} ln -s /usr/share/zoneinfo/{zone} /etc/localtime') return True @@ -523,10 +570,6 @@ class Installer: return False - def activate_ntp(self) -> None: - log(f"activate_ntp() is deprecated, use activate_time_syncronization()", fg="yellow", level=logging.INFO) - self.activate_time_syncronization() - def activate_time_syncronization(self) -> None: self.log('Activating systemd-timesyncd for time synchronization using Arch Linux and ntp.org NTP servers.', level=logging.INFO) self.enable_service('systemd-timesyncd') @@ -540,7 +583,10 @@ class Installer: # fstrim is owned by util-linux, a dependency of both base and systemd. self.enable_service("fstrim.timer") - def enable_service(self, *services :str) -> None: + def enable_service(self, *services: Union[str, List[str]]) -> None: + if type(services[0]) in (list, tuple): + services = services[0] + for service in services: self.log(f'Enabling service {service}', level=logging.INFO) try: @@ -552,10 +598,10 @@ class Installer: if hasattr(plugin, 'on_service'): plugin.on_service(service) - def run_command(self, cmd :str, *args :str, **kwargs :str) -> None: + def run_command(self, cmd :str, *args :str, **kwargs :str) -> SysCommand: return SysCommand(f'/usr/bin/arch-chroot {self.target} {cmd}') - def arch_chroot(self, cmd :str, run_as :Optional[str] = None): + def arch_chroot(self, cmd :str, run_as :Optional[str] = None) -> SysCommand: if run_as: cmd = f"su - {run_as} -c {shlex.quote(cmd)}" @@ -645,21 +691,6 @@ class Installer: return True - def detect_encryption(self, partition :Partition) -> bool: - from .disk.mapperdev import MapperDev - from .disk.dmcryptdev import DMCryptDev - from .disk.helpers import get_filesystem_type - - if type(partition) is MapperDev: - # Returns MapperDev.partition - return partition.partition - elif type(partition) is DMCryptDev: - return partition.MapperDev.partition - elif get_filesystem_type(partition.path) == 'crypto_LUKS': - return partition - - return False - def mkinitcpio(self, *flags :str) -> bool: for plugin in plugins.values(): if hasattr(plugin, 'on_mkinitcpio'): @@ -668,7 +699,7 @@ class Installer: return True # mkinitcpio will error out if there's no vconsole. - if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False: + if (vconsole := Path(f"{self.target}/etc/vconsole.conf")).exists() is False: with vconsole.open('w') as fh: fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") @@ -677,7 +708,7 @@ class Installer: mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") - if self._disk_encryption and not self._disk_encryption.hsm_device: + if not self._disk_encryption.hsm_device: # For now, if we don't use HSM we revert to the old # way of setting up encryption hooks for mkinitcpio. # This is purely for stability reasons, we're going away from this. @@ -694,46 +725,36 @@ class Installer: return False def minimal_installation( - self, testing: bool = False, multilib: bool = False, - hostname: str = 'archinstall', locales: List[str] = ['en_US.UTF-8 UTF-8']) -> bool: - # Add necessary packages if encrypting the drive - # (encrypted partitions default to btrfs for now, so we need btrfs-progs) - # TODO: Perhaps this should be living in the function which dictates - # the partitioning. Leaving here for now. - - for partition in self.partitions: - if partition.filesystem == 'btrfs': - # if partition.encrypted: - if 'btrfs-progs' not in self.base_packages: - self.base_packages.append('btrfs-progs') - if partition.filesystem == 'xfs': - if 'xfs' not in self.base_packages: - self.base_packages.append('xfsprogs') - if partition.filesystem == 'f2fs': - if 'f2fs' not in self.base_packages: - self.base_packages.append('f2fs-tools') - - # Configure mkinitcpio to handle some specific use cases. - if partition.filesystem == 'btrfs': - if 'btrfs' not in self.MODULES: - self.MODULES.append('btrfs') - if '/usr/bin/btrfs' not in self.BINARIES: - self.BINARIES.append('/usr/bin/btrfs') - # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. - if partition.filesystem == 'ntfs3' and partition.mountpoint == self.target: - if 'fsck' in self.HOOKS: - self.HOOKS.remove('fsck') - - if self.detect_encryption(partition): - if self._disk_encryption and self._disk_encryption.hsm_device: - # Required bby mkinitcpio to add support for fido2-device options - self.pacstrap('libfido2') - - if 'sd-encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt') - else: - if 'encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') + self, + testing: bool = False, + multilib: bool = False, + hostname: str = 'archinstall', + locales: List[str] = ['en_US.UTF-8 UTF-8'] + ): + for mod in self._disk_config.device_modifications: + for part in mod.partitions: + if (pkg := part.fs_type.installation_pkg) is not None: + self.base_packages.append(pkg) + if (module := part.fs_type.installation_module) is not None: + self.MODULES.append(module) + if (binary := part.fs_type.installation_binary) is not None: + self.BINARIES.append(binary) + + # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. + if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: + if 'fsck' in self.HOOKS: + self.HOOKS.remove('fsck') + + if part in self._disk_encryption.partitions: + if self._disk_encryption.hsm_device: + # Required bby mkinitcpio to add support for fido2-device options + self.pacstrap('libfido2') + + if 'sd-encrypt' not in self.HOOKS: + self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt') + else: + if 'encrypt' not in self.HOOKS: + self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') if not has_uefi(): self.base_packages.append('grub') @@ -742,11 +763,11 @@ class Installer: vendor = cpu_vendor() if vendor == "AuthenticAMD": self.base_packages.append("amd-ucode") - if (ucode := pathlib.Path(f"{self.target}/boot/amd-ucode.img")).exists(): + if (ucode := Path(f"{self.target}/boot/amd-ucode.img")).exists(): ucode.unlink() elif vendor == "GenuineIntel": self.base_packages.append("intel-ucode") - if (ucode := pathlib.Path(f"{self.target}/boot/intel-ucode.img")).exists(): + if (ucode := Path(f"{self.target}/boot/intel-ucode.img")).exists(): ucode.unlink() else: self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't install any ucode.", level=logging.DEBUG) @@ -802,9 +823,7 @@ class Installer: if hasattr(plugin, 'on_install'): plugin.on_install(self) - return True - - def setup_swap(self, kind :str = 'zram') -> bool: + def setup_swap(self, kind :str = 'zram'): if kind == 'zram': self.log(f"Setting up swap on zram") self.pacstrap('zram-generator') @@ -818,16 +837,27 @@ class Installer: self.enable_service('systemd-zram-setup@zram0.service') self._zram_enabled = True - - return True else: raise ValueError(f"Archinstall currently only supports setting up swap on zram") - def add_systemd_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: + def _get_boot_partition(self) -> Optional[disk.PartitionModification]: + for layout in self._disk_config.device_modifications: + if boot := layout.get_boot_partition(): + return boot + return None + + def _get_root_partition(self) -> Optional[disk.PartitionModification]: + for mod in self._disk_config.device_modifications: + if root := mod.get_root_partition(self._disk_config.relative_mountpoint): + return root + return None + + def _add_systemd_bootloader(self, root_partition: disk.PartitionModification): self.pacstrap('efibootmgr') if not has_uefi(): raise HardwareIncompatibilityError + # TODO: Ideally we would want to check if another config # points towards the same disk and/or partition. # And in which case we should do some clean up. @@ -882,74 +912,73 @@ class Installer: elif vendor == "GenuineIntel": entry.write("initrd /intel-ucode.img\n") else: - self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG) + self.log( + f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", + level=logging.DEBUG) entry.write(f"initrd /initramfs-{kernel}{variant}.img\n") # blkid doesn't trigger on loopback devices really well, # so we'll use the old manual method until we get that sorted out. - root_fs_type = get_mount_fs_type(root_partition.filesystem) - if root_fs_type is not None: - options_entry = f'rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n' - else: - options_entry = f'rw {" ".join(self.KERNEL_PARAMS)}\n' + options_entry = f'rw rootfstype={root_partition.fs_type.fs_type_mount} {" ".join(self.KERNEL_PARAMS)}\n' - for subvolume in root_partition.subvolumes: - if subvolume.root is True and subvolume.name != '': - options_entry = f"rootflags=subvol={subvolume.name} " + options_entry + for sub_vol in root_partition.btrfs_subvols: + if sub_vol.is_root(): + options_entry = f"rootflags=subvol={sub_vol.name} " + options_entry # Zswap should be disabled when using zram. - # # https://github.com/archlinux/archinstall/issues/881 if self._zram_enabled: options_entry = "zswap.enabled=0 " + options_entry - if real_device := self.detect_encryption(root_partition): + if root_partition.fs_type.is_crypto(): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG) + log('Root partition is an encrypted device, identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) kernel_options = f"options" if self._disk_encryption and self._disk_encryption.hsm_device: # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work - kernel_options += f" rd.luks.name={real_device.uuid}=luksdev" + kernel_options += f' rd.luks.name={root_partition.uuid}=luksdev' # Note: tpm2-device and fido2-device don't play along very well: # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 - kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no" + kernel_options += f' rd.luks.options=fido2-device=auto,password-echo=no' else: - kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev" + kernel_options += f' cryptdevice=PARTUUID={root_partition.partuuid}:luksdev' entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}') else: - log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) - entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}') + log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) + entry.write(f'options root=PARTUUID={root_partition.partuuid} {options_entry}') - self.helper_flags['bootloader'] = "systemd" - - return True + self.helper_flags['bootloader'] = 'systemd' - def add_grub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: + def _add_grub_bootloader( + self, + boot_partition: disk.PartitionModification, + root_partition: disk.PartitionModification + ): self.pacstrap('grub') # no need? - root_fs_type = get_mount_fs_type(root_partition.filesystem) + _file = "/etc/default/grub" - if real_device := self.detect_encryption(root_partition): - root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip() - _file = "/etc/default/grub" - add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'" - enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'" + if root_partition.fs_type.is_crypto(): + log(f"Using UUID {root_partition.uuid} as encrypted root identifier", level=logging.DEBUG) - log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO) - SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") - SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}") + cmd_line_linux = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_partition.uuid}:cryptlvm rootfstype={root_partition.fs_type.value}\"/'" + enable_cryptdisk = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'" + + SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_cryptdisk} {_file}") else: - _file = "/etc/default/grub" - add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'" - SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") + cmd_line_linux = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_partition.fs_type.value}\"/'" + + SysCommand(f"/usr/bin/arch-chroot {self.target} {cmd_line_linux} {_file}") + + log(f"GRUB boot partition: {boot_partition.dev_path}", level=logging.INFO) - log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO) if has_uefi(): self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable', peek_output=True) except SysCallError: @@ -961,7 +990,7 @@ class Installer: try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --debug --target=i386-pc --recheck {boot_partition.parent}', peek_output=True) except SysCallError as error: - raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}") + raise DiskError(f"Failed to install GRUB boot on {boot_partition.dev_path}: {error}") try: SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg') @@ -970,22 +999,22 @@ class Installer: self.helper_flags['bootloader'] = "grub" - return True - - def add_efistub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: + def _add_efistub_bootloader( + self, + boot_partition: disk.PartitionModification, + root_partition: disk.PartitionModification + ): self.pacstrap('efibootmgr') if not has_uefi(): raise HardwareIncompatibilityError + # TODO: Ideally we would want to check if another config # points towards the same disk and/or partition. # And in which case we should do some clean up. - root_fs_type = get_mount_fs_type(root_partition.filesystem) - for kernel in self.kernels: # Setup the firmware entry - label = f'Arch Linux ({kernel})' loader = f"/vmlinuz-{kernel}" @@ -1004,22 +1033,22 @@ class Installer: # blkid doesn't trigger on loopback devices really well, # so we'll use the old manual method until we get that sorted out. - if real_device := self.detect_encryption(root_partition): + + if root_partition.fs_type.is_crypto(): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.part_uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.part_uuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') + log(f'Identifying root partition by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) + kernel_parameters.append(f'cryptdevice=PARTUUID={root_partition.partuuid}:luksdev root=/dev/mapper/luksdev rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') else: - log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'root=PARTUUID={root_partition.part_uuid} rw rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') + log(f'Root partition is an encrypted device identifying by PARTUUID: {root_partition.partuuid}', level=logging.DEBUG) + kernel_parameters.append(f'root=PARTUUID={root_partition.partuuid} rw rootfstype={root_partition.fs_type.value} {" ".join(self.KERNEL_PARAMS)}') - SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') + device = disk.device_handler.get_device_by_partition_path(boot_partition.dev_path) + SysCommand(f'efibootmgr --disk {device.path} --part {device.path} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') self.helper_flags['bootloader'] = "efistub" - return True - - def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool: + def add_bootloader(self, bootloader: Bootloader) -> bool: """ Adds a bootloader to the installation instance. Archinstall supports one of three types: @@ -1039,52 +1068,33 @@ class Installer: return True if type(self.target) == str: - self.target = pathlib.Path(self.target) - - boot_partition = None - root_partition = None - for partition in self.partitions: - if self.target / 'boot' in partition.mountpoints: - boot_partition = partition - elif self.target in partition.mountpoints: - root_partition = partition - - if boot_partition is None or root_partition is None: - raise ValueError(f"Could not detect root ({root_partition}) or boot ({boot_partition}) in {self.target} based on: {self.partitions}") - - self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO) - - if bootloader == 'systemd-bootctl': - self.add_systemd_bootloader(boot_partition, root_partition) - elif bootloader == "grub-install": - self.add_grub_bootloader(boot_partition, root_partition) - elif bootloader == 'efistub': - self.add_efistub_bootloader(boot_partition, root_partition) - else: - raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}") + self.target = Path(self.target) - return True + boot_partition = self._get_boot_partition() + root_partition = self._get_root_partition() - def add_additional_packages(self, *packages :str) -> bool: - return self.pacstrap(*packages) + if boot_partition is None: + raise ValueError(f'Could not detect boot at mountpoint {self.target}') - def install_profile(self, profile :str) -> ModuleType: - """ - Installs a archinstall profile script (.py file). - This profile can be either local, remote or part of the library. + if root_partition is None: + raise ValueError(f'Could not detect root at mountpoint {self.target}') - :param profile: Can be a local path or a remote path (URL) - :return: Returns the imported script as a module, this way - you can access any remaining functions exposed by the profile. - :rtype: module - """ - storage['installation_session'] = self + self.log(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}', level=logging.INFO) + + match bootloader: + case Bootloader.Systemd: + self._add_systemd_bootloader(root_partition) + case Bootloader.Grub: + self._add_grub_bootloader(boot_partition, root_partition) + case Bootloader.Efistub: + self._add_efistub_bootloader(boot_partition, root_partition) - if type(profile) == str: - profile = Profile(self, profile) + def add_additional_packages(self, *packages: Union[str, List[str]]) -> bool: + return self.pacstrap(*packages) - self.log(f'Installing archinstall profile {profile}', level=logging.INFO) - return profile.install() + def _enable_users(self, service: str, users: List[User]): + for user in users: + self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username) def enable_sudo(self, entity: str, group :bool = False): self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) @@ -1092,7 +1102,7 @@ class Installer: sudoers_dir = f"{self.target}/etc/sudoers.d" # Creates directory if not exists - if not (sudoers_path := pathlib.Path(sudoers_dir)).exists(): + if not (sudoers_path := Path(sudoers_dir)).exists(): sudoers_path.mkdir(parents=True) # Guarantees sudoer confs directory recommended perms os.chmod(sudoers_dir, 0o440) @@ -1114,7 +1124,7 @@ class Installer: sudoers.write(f'{"%" if group else ""}{entity} ALL=(ALL) ALL\n') # Guarantees sudoer conf file recommended perms - os.chmod(pathlib.Path(rule_file_name), 0o440) + os.chmod(Path(rule_file_name), 0o440) def create_users(self, users: Union[User, List[User]]): if not isinstance(users, list): -- cgit v1.2.3-54-g00ecf