From b470b16ec923260cfd9c5b9f2b88e0a39611b463 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Mon, 15 Apr 2024 18:49:00 +1000 Subject: LVM support (#2104) * Submenu for disk configuration * Update * Add LVM manual config * PV selection * LVM volume menu * Update * Fix mypy * Update * Update * Update * Update * Update * Update * Update * Update * Update LVM * Update * Update * Btrfs support * Refactor * LVM on Luks * Luks on LVM * Update * LVM on Luks * Update * Update * mypy * Update * Fix bug with LuksOnLvm and Btrfs * Update * Update * Info -> Debug output --- archinstall/lib/disk/__init__.py | 9 +- archinstall/lib/disk/device_handler.py | 264 ++++++++++--- archinstall/lib/disk/device_model.py | 408 +++++++++++++++++++-- archinstall/lib/disk/disk_menu.py | 140 +++++++ archinstall/lib/disk/encryption_menu.py | 131 +++++-- archinstall/lib/disk/fido.py | 8 +- archinstall/lib/disk/filesystem.py | 295 ++++++++++++++- archinstall/lib/disk/partitioning_menu.py | 18 +- archinstall/lib/disk/subvolume_menu.py | 18 +- archinstall/lib/global_menu.py | 70 ++-- archinstall/lib/installer.py | 427 +++++++++++++++++----- archinstall/lib/interactions/disk_conf.py | 134 ++++++- archinstall/lib/interactions/manage_users_conf.py | 18 +- archinstall/lib/luks.py | 30 +- archinstall/lib/menu/abstract_menu.py | 93 ++--- archinstall/lib/menu/list_manager.py | 28 +- archinstall/lib/menu/menu.py | 8 +- archinstall/lib/menu/table_selection_menu.py | 4 +- archinstall/lib/mirrors.py | 15 - archinstall/scripts/guided.py | 2 +- examples/interactive_installation.py | 2 +- 21 files changed, 1711 insertions(+), 411 deletions(-) create mode 100644 archinstall/lib/disk/disk_menu.py diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py index 24dafef5..7f881273 100644 --- a/archinstall/lib/disk/__init__.py +++ b/archinstall/lib/disk/__init__.py @@ -11,6 +11,11 @@ from .device_model import ( BDevice, DiskLayoutType, DiskLayoutConfiguration, + LvmLayoutType, + LvmConfiguration, + LvmVolumeGroup, + LvmVolume, + LvmVolumeStatus, PartitionTable, Unit, Size, @@ -30,7 +35,7 @@ from .device_model import ( CleanType, get_lsblk_info, get_all_lsblk_info, - get_lsblk_by_mountpoint + get_lsblk_by_mountpoint, ) from .encryption_menu import ( select_encryption_type, @@ -39,3 +44,5 @@ from .encryption_menu import ( select_partitions_to_encrypt, DiskEncryptionMenu, ) + +from .disk_menu import DiskLayoutConfigurationMenu diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py index 6e91ac2e..7ba70382 100644 --- a/archinstall/lib/disk/device_handler.py +++ b/archinstall/lib/disk/device_handler.py @@ -3,8 +3,9 @@ from __future__ import annotations import json import os import logging +import time from pathlib import Path -from typing import List, Dict, Any, Optional, TYPE_CHECKING +from typing import List, Dict, Any, Optional, TYPE_CHECKING, Literal, Iterable from parted import ( # type: ignore Disk, Geometry, FileSystem, @@ -17,11 +18,12 @@ from .device_model import ( BDevice, _DeviceInfo, _PartitionInfo, FilesystemType, Unit, PartitionTable, ModificationStatus, get_lsblk_info, LsblkInfo, - _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption + _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption, LvmVolumeGroup, LvmVolume, Size, LvmGroupInfo, + SectorSize, LvmVolumeInfo, LvmPVInfo, SubvolumeModification, BtrfsMountOption ) from ..exceptions import DiskError, UnknownFilesystemFormat -from ..general import SysCommand, SysCallError, JSON +from ..general import SysCommand, SysCallError, JSON, SysCommandWorker from ..luks import Luks2 from ..output import debug, error, info, warn, log from ..utils.util import is_subpath @@ -189,7 +191,7 @@ class DeviceHandler(object): return subvol_infos - def _perform_formatting( + def format( self, fs_type: FilesystemType, path: Path, @@ -234,7 +236,7 @@ class DeviceHandler(object): options += additional_parted_options options_str = ' '.join(options) - info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') + debug(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}') try: SysCommand(f"/usr/bin/{command} {options_str} {path}") @@ -243,7 +245,33 @@ class DeviceHandler(object): error(msg) raise DiskError(msg) from err - def _perform_enc_formatting( + def encrypt( + self, + dev_path: Path, + mapper_name: Optional[str], + enc_password: str, + lock_after_create: bool = True + ) -> Luks2: + luks_handler = Luks2( + dev_path, + mapper_name=mapper_name, + password=enc_password + ) + + key_file = luks_handler.encrypt() + + luks_handler.unlock(key_file=key_file) + + if not luks_handler.mapper_dev: + raise DiskError('Failed to unlock luks device') + + if lock_after_create: + debug(f'luks2 locking device: {dev_path}') + luks_handler.lock() + + return luks_handler + + def format_encrypted( self, dev_path: Path, mapper_name: Optional[str], @@ -258,71 +286,160 @@ class DeviceHandler(object): key_file = luks_handler.encrypt() - debug(f'Unlocking luks2 device: {dev_path}') luks_handler.unlock(key_file=key_file) if not luks_handler.mapper_dev: raise DiskError('Failed to unlock luks device') info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}') - self._perform_formatting(fs_type, luks_handler.mapper_dev) + self.format(fs_type, luks_handler.mapper_dev) info(f'luks2 locking device: {dev_path}') luks_handler.lock() - def _validate_partitions(self, partitions: List[PartitionModification]): - checks = { - # verify that all partitions have a path set (which implies that they have been created) - lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'), - # crypto luks is not a valid file system type - lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError('Crypto luks cannot be set as a filesystem type'), - # file system type must be set - lambda x: x.fs_type is None: ValueError('File system type must be set for modification') - } - - for check, exc in checks.items(): - found = next(filter(check, partitions), None) - if found is not None: - raise exc - - def format( + def _lvm_info( self, - device_mod: DeviceModification, - enc_conf: Optional['DiskEncryption'] = None - ): - """ - Format can be given an overriding path, for instance /dev/null to test - the formatting functionality and in essence the support for the given filesystem. - """ + cmd: str, + info_type: Literal['lv', 'vg', 'pvseg'] + ) -> Optional[Any]: + raw_info = SysCommand(cmd).decode().split('\n') - # only verify partitions that are being created or modified - create_or_modify_parts = [p for p in device_mod.partitions if p.is_create_or_modify()] + # for whatever reason the output sometimes contains + # "File descriptor X leaked leaked on vgs invocation + data = '\n'.join([raw for raw in raw_info if 'File descriptor' not in raw]) - self._validate_partitions(create_or_modify_parts) + debug(f'LVM info: {data}') - # make sure all devices are unmounted - self._umount_all_existing(device_mod.device_path) - - for part_mod in create_or_modify_parts: - # partition will be encrypted - if enc_conf is not None and part_mod in enc_conf.partitions: - self._perform_enc_formatting( - part_mod.safe_dev_path, - part_mod.mapper_name, - part_mod.safe_fs_type, - enc_conf - ) - else: - self._perform_formatting(part_mod.safe_fs_type, part_mod.safe_dev_path) + reports = json.loads(data) + + for report in reports['report']: + if len(report[info_type]) != 1: + raise ValueError(f'Report does not contain any entry') - # synchronize with udev before using lsblk - SysCommand('udevadm settle') + entry = report[info_type][0] - lsblk_info = self._fetch_part_info(part_mod.safe_dev_path) + match info_type: + case 'pvseg': + return LvmPVInfo( + pv_name=Path(entry['pv_name']), + lv_name=entry['lv_name'], + vg_name=entry['vg_name'], + ) + case 'lv': + return LvmVolumeInfo( + lv_name=entry['lv_name'], + vg_name=entry['vg_name'], + lv_size=Size(int(entry[f'lv_size'][:-1]), Unit.B, SectorSize.default()) + ) + case 'vg': + return LvmGroupInfo( + vg_uuid=entry['vg_uuid'], + vg_size=Size(int(entry[f'vg_size'][:-1]), Unit.B, SectorSize.default()) + ) + + return None - part_mod.partn = lsblk_info.partn - part_mod.partuuid = lsblk_info.partuuid - part_mod.uuid = lsblk_info.uuid + def _lvm_info_with_retry(self, cmd: str, info_type: Literal['lv', 'vg', 'pvseg']) -> Optional[Any]: + attempts = 3 + + for attempt_nr in range(attempts): + try: + return self._lvm_info(cmd, info_type) + except ValueError: + time.sleep(attempt_nr + 1) + + raise ValueError(f'Failed to fetch {info_type} information') + + def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]: + cmd = ( + 'lvs --reportformat json ' + '--unit B ' + f'-S lv_name={lv_name}' + ) + + return self._lvm_info_with_retry(cmd, 'lv') + + def lvm_group_info(self, vg_name: str) -> Optional[LvmGroupInfo]: + cmd = ( + 'vgs --reportformat json ' + '--unit B ' + '-o vg_name,vg_uuid,vg_size ' + f'-S vg_name={vg_name}' + ) + + return self._lvm_info_with_retry(cmd, 'vg') + + def lvm_pvseg_info(self, vg_name: str, lv_name: str) -> Optional[LvmPVInfo]: + cmd = ( + 'pvs ' + '--segments -o+lv_name,vg_name ' + f'-S vg_name={vg_name},lv_name={lv_name} ' + '--reportformat json ' + ) + + return self._lvm_info_with_retry(cmd, 'pvseg') + + def lvm_vol_change(self, vol: LvmVolume, activate: bool): + active_flag = 'y' if activate else 'n' + cmd = f'lvchange -a {active_flag} {vol.safe_dev_path}' + + debug(f'lvchange volume: {cmd}') + SysCommand(cmd) + + def lvm_export_vg(self, vg: LvmVolumeGroup): + cmd = f'vgexport {vg.name}' + + debug(f'vgexport: {cmd}') + SysCommand(cmd) + + def lvm_import_vg(self, vg: LvmVolumeGroup): + cmd = f'vgimport {vg.name}' + + debug(f'vgimport: {cmd}') + SysCommand(cmd) + + def lvm_vol_reduce(self, vol_path: Path, amount: Size): + val = amount.format_size(Unit.B, include_unit=False) + cmd = f'lvreduce -L -{val}B {vol_path}' + + debug(f'Reducing LVM volume size: {cmd}') + SysCommand(cmd) + + def lvm_pv_create(self, pvs: Iterable[Path]): + cmd = 'pvcreate ' + ' '.join([str(pv) for pv in pvs]) + debug(f'Creating LVM PVS: {cmd}') + + worker = SysCommandWorker(cmd) + worker.poll() + worker.write(b'y\n', line_ending=False) + + def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str): + pvs_str = ' '.join([str(pv) for pv in pvs]) + cmd = f'vgcreate --yes {vg_name} {pvs_str}' + + debug(f'Creating LVM group: {cmd}') + + worker = SysCommandWorker(cmd) + worker.poll() + worker.write(b'y\n', line_ending=False) + + def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None): + if offset is not None: + length = volume.length - offset + else: + length = volume.length + + length_str = length.format_size(Unit.B, include_unit=False) + cmd = f'lvcreate --yes -L {length_str}B {vg_name} -n {volume.name}' + + debug(f'Creating volume: {cmd}') + + worker = SysCommandWorker(cmd) + worker.poll() + worker.write(b'y\n', line_ending=False) + + volume.vg_name = vg_name + volume.dev_path = Path(f'/dev/{vg_name}/{volume.name}') def _setup_partition( self, @@ -385,7 +502,7 @@ class DeviceHandler(object): # the partition has a path now that it has been added part_mod.dev_path = Path(partition.path) - def _fetch_part_info(self, path: Path) -> LsblkInfo: + def fetch_part_info(self, path: Path) -> LsblkInfo: lsblk_info = get_lsblk_info(path) if not lsblk_info.partn: @@ -404,6 +521,37 @@ class DeviceHandler(object): return lsblk_info + def create_lvm_btrfs_subvolumes( + self, + path: Path, + btrfs_subvols: List[SubvolumeModification], + mount_options: List[str] + ): + info(f'Creating subvolumes: {path}') + + self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True) + + for sub_vol in btrfs_subvols: + debug(f'Creating subvolume: {sub_vol.name}') + + subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name + + SysCommand(f"btrfs subvolume create {subvol_path}") + + if BtrfsMountOption.nodatacow.value in mount_options: + try: + SysCommand(f'chattr +C {subvol_path}') + except SysCallError as err: + raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}') + + if BtrfsMountOption.compress.value in mount_options: + try: + SysCommand(f'chattr +c {subvol_path}') + except SysCallError as err: + raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}') + + self.umount(path) + def create_btrfs_volumes( self, part_mod: PartitionModification, @@ -468,8 +616,8 @@ class DeviceHandler(object): return luks_handler - def _umount_all_existing(self, device_path: Path): - info(f'Unmounting all existing partitions: {device_path}') + def umount_all_existing(self, device_path: Path): + debug(f'Unmounting all existing partitions: {device_path}') existing_partitions = self._devices[device_path].partition_infos @@ -498,7 +646,7 @@ class DeviceHandler(object): raise DiskError('Too many partitions on disk, MBR disks can only have 3 primary partitions') # make sure all devices are unmounted - self._umount_all_existing(modification.device_path) + self.umount_all_existing(modification.device_path) # WARNING: the entire device will be wiped and all data lost if modification.wipe: diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py index fe96203c..1cd3d674 100644 --- a/archinstall/lib/disk/device_model.py +++ b/archinstall/lib/disk/device_model.py @@ -41,6 +41,8 @@ class DiskLayoutType(Enum): class DiskLayoutConfiguration: config_type: DiskLayoutType device_modifications: List[DeviceModification] = field(default_factory=list) + lvm_config: Optional[LvmConfiguration] = None + # used for pre-mounted config mountpoint: Optional[Path] = None @@ -51,13 +53,18 @@ class DiskLayoutConfiguration: 'mountpoint': str(self.mountpoint) } else: - return { + config: Dict[str, Any] = { 'config_type': self.config_type.value, - 'device_modifications': [mod.json() for mod in self.device_modifications] + 'device_modifications': [mod.json() for mod in self.device_modifications], } + if self.lvm_config: + config['lvm_config'] = self.lvm_config.json() + + return config + @classmethod - def parse_arg(cls, disk_config: Dict[str, List[Dict[str, Any]]]) -> Optional[DiskLayoutConfiguration]: + def parse_arg(cls, disk_config: Dict[str, Any]) -> Optional[DiskLayoutConfiguration]: from .device_handler import device_handler device_modifications: List[DeviceModification] = [] @@ -124,6 +131,10 @@ class DiskLayoutConfiguration: device_modification.partitions = device_partitions device_modifications.append(device_modification) + # Parse LVM configuration from settings + if (lvm_arg := disk_config.get('lvm_config', None)) is not None: + config.lvm_config = LvmConfiguration.parse_arg(lvm_arg, config) + return config @@ -133,24 +144,24 @@ class PartitionTable(Enum): class Unit(Enum): - B = 1 # byte - kB = 1000**1 # kilobyte - MB = 1000**2 # megabyte - GB = 1000**3 # gigabyte - TB = 1000**4 # terabyte - PB = 1000**5 # petabyte - EB = 1000**6 # exabyte - ZB = 1000**7 # zettabyte - YB = 1000**8 # yottabyte - - KiB = 1024**1 # kibibyte - MiB = 1024**2 # mebibyte - GiB = 1024**3 # gibibyte - TiB = 1024**4 # tebibyte - PiB = 1024**5 # pebibyte - EiB = 1024**6 # exbibyte - ZiB = 1024**7 # zebibyte - YiB = 1024**8 # yobibyte + B = 1 # byte + kB = 1000 ** 1 # kilobyte + MB = 1000 ** 2 # megabyte + GB = 1000 ** 3 # gigabyte + TB = 1000 ** 4 # terabyte + PB = 1000 ** 5 # petabyte + EB = 1000 ** 6 # exabyte + ZB = 1000 ** 7 # zettabyte + YB = 1000 ** 8 # yottabyte + + KiB = 1024 ** 1 # kibibyte + MiB = 1024 ** 2 # mebibyte + GiB = 1024 ** 3 # gibibyte + TiB = 1024 ** 4 # tebibyte + PiB = 1024 ** 5 # pebibyte + EiB = 1024 ** 6 # exbibyte + ZiB = 1024 ** 7 # zebibyte + YiB = 1024 ** 8 # yobibyte sectors = 'sectors' # size in sector @@ -575,7 +586,7 @@ class PartitionFlag(Enum): Which is the way libparted checks for its flags: https://git.savannah.gnu.org/gitweb/?p=parted.git;a=blob;f=libparted/labels/gpt.c;hb=4a0e468ed63fff85a1f9b923189f20945b32f4f1#l183 """ Boot = _ped.PARTITION_BOOT - XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot + XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot ESP = _ped.PARTITION_ESP @@ -658,6 +669,10 @@ class PartitionModification: flags: List[PartitionFlag] = field(default_factory=list) btrfs_subvols: List[SubvolumeModification] = field(default_factory=list) + # only set when modification was created from an existing + # partition info object to be able to reference it back + part_info: Optional[_PartitionInfo] = None + # only set if the device was created or exists dev_path: Optional[Path] = None partn: Optional[int] = None @@ -724,7 +739,8 @@ class PartitionModification: uuid=partition_info.uuid, flags=partition_info.flags, mountpoint=mountpoint, - btrfs_subvols=subvol_mods + btrfs_subvols=subvol_mods, + part_info=partition_info ) @property @@ -832,6 +848,270 @@ class PartitionModification: return part_mod +class LvmLayoutType(Enum): + Default = 'default' + + # Manual = 'manual_lvm' + + def display_msg(self) -> str: + match self: + case LvmLayoutType.Default: + return str(_('Default layout')) + # case LvmLayoutType.Manual: + # return str(_('Manual configuration')) + + raise ValueError(f'Unknown type: {self}') + + +@dataclass +class LvmVolumeGroup: + name: str + pvs: List[PartitionModification] + volumes: List[LvmVolume] = field(default_factory=list) + + def json(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'lvm_pvs': [p.obj_id for p in self.pvs], + 'volumes': [vol.json() for vol in self.volumes] + } + + @staticmethod + def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmVolumeGroup: + lvm_pvs = [] + for mod in disk_config.device_modifications: + for part in mod.partitions: + if part.obj_id in arg.get('lvm_pvs', []): + lvm_pvs.append(part) + + return LvmVolumeGroup( + arg['name'], + lvm_pvs, + [LvmVolume.parse_arg(vol) for vol in arg['volumes']] + ) + + def contains_lv(self, lv: LvmVolume) -> bool: + return lv in self.volumes + + +class LvmVolumeStatus(Enum): + Exist = 'existing' + Modify = 'modify' + Delete = 'delete' + Create = 'create' + + +@dataclass +class LvmVolume: + status: LvmVolumeStatus + name: str + fs_type: FilesystemType + length: Size + mountpoint: Optional[Path] + mount_options: List[str] = field(default_factory=list) + btrfs_subvols: List[SubvolumeModification] = field(default_factory=list) + + # volume group name + vg_name: Optional[str] = None + # mapper device path /dev// + dev_path: Optional[Path] = None + + def __post_init__(self): + # needed to use the object as a dictionary key due to hash func + if not hasattr(self, '_obj_id'): + self._obj_id = uuid.uuid4() + + def __hash__(self): + return hash(self._obj_id) + + @property + def obj_id(self) -> str: + if hasattr(self, '_obj_id'): + return str(self._obj_id) + return '' + + @property + def mapper_name(self) -> Optional[str]: + if self.dev_path: + return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.safe_dev_path.name}' + return None + + @property + def mapper_path(self) -> Path: + if self.mapper_name: + return Path(f'/dev/mapper/{self.mapper_name}') + + raise ValueError('No mapper path set') + + @property + def safe_dev_path(self) -> Path: + if self.dev_path: + return self.dev_path + raise ValueError('No device path for volume defined') + + @property + def safe_fs_type(self) -> FilesystemType: + if self.fs_type is None: + raise ValueError('File system type is not set') + return self.fs_type + + @property + def relative_mountpoint(self) -> Path: + """ + Will return the relative path based on the anchor + e.g. Path('/mnt/test') -> Path('mnt/test') + """ + if self.mountpoint is not None: + return self.mountpoint.relative_to(self.mountpoint.anchor) + + raise ValueError('Mountpoint is not specified') + + @staticmethod + def parse_arg(arg: Dict[str, Any]) -> LvmVolume: + volume = LvmVolume( + status=LvmVolumeStatus(arg['status']), + name=arg['name'], + fs_type=FilesystemType(arg['fs_type']), + length=Size.parse_args(arg['length']), + mountpoint=Path(arg['mountpoint']) if arg['mountpoint'] else None, + mount_options=arg.get('mount_options', []), + btrfs_subvols=SubvolumeModification.parse_args(arg.get('btrfs', [])) + ) + + setattr(volume, '_obj_id', arg['obj_id']) + + return volume + + def json(self) -> Dict[str, Any]: + return { + 'obj_id': self.obj_id, + 'status': self.status.value, + 'name': self.name, + 'fs_type': self.fs_type.value, + 'length': self.length.json(), + 'mountpoint': str(self.mountpoint) if self.mountpoint else None, + 'mount_options': self.mount_options, + 'btrfs': [vol.json() for vol in self.btrfs_subvols] + } + + def table_data(self) -> Dict[str, Any]: + part_mod = { + 'Type': self.status.value, + 'Name': self.name, + 'Size': self.length.format_highest(), + 'FS type': self.fs_type.value, + 'Mountpoint': str(self.mountpoint) if self.mountpoint else '', + 'Mount options': ', '.join(self.mount_options), + 'Btrfs': '{} {}'.format(str(len(self.btrfs_subvols)), 'vol') + } + return part_mod + + def is_modify(self) -> bool: + return self.status == LvmVolumeStatus.Modify + + def exists(self) -> bool: + return self.status == LvmVolumeStatus.Exist + + def is_exists_or_modify(self) -> bool: + return self.status in [LvmVolumeStatus.Exist, LvmVolumeStatus.Modify] + + def is_root(self) -> bool: + if self.mountpoint is not None: + return Path('/') == self.mountpoint + else: + for subvol in self.btrfs_subvols: + if subvol.is_root(): + return True + + return False + + +@dataclass +class LvmGroupInfo: + vg_size: Size + vg_uuid: str + + +@dataclass +class LvmVolumeInfo: + lv_name: str + vg_name: str + lv_size: Size + + +@dataclass +class LvmPVInfo: + pv_name: Path + lv_name: str + vg_name: str + + +@dataclass +class LvmConfiguration: + config_type: LvmLayoutType + vol_groups: List[LvmVolumeGroup] + + def __post_init__(self): + # make sure all volume groups have unique PVs + pvs = [] + for group in self.vol_groups: + for pv in group.pvs: + if pv in pvs: + raise ValueError('A PV cannot be used in multiple volume groups') + pvs.append(pv) + + def json(self) -> Dict[str, Any]: + return { + 'config_type': self.config_type.value, + 'vol_groups': [vol_gr.json() for vol_gr in self.vol_groups] + } + + @staticmethod + def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmConfiguration: + lvm_pvs = [] + for mod in disk_config.device_modifications: + for part in mod.partitions: + if part.obj_id in arg.get('lvm_pvs', []): + lvm_pvs.append(part) + + return LvmConfiguration( + config_type=LvmLayoutType(arg['config_type']), + vol_groups=[LvmVolumeGroup.parse_arg(vol_group, disk_config) for vol_group in arg['vol_groups']], + ) + + def get_all_pvs(self) -> List[PartitionModification]: + pvs = [] + for vg in self.vol_groups: + pvs += vg.pvs + + return pvs + + def get_all_volumes(self) -> List[LvmVolume]: + volumes = [] + + for vg in self.vol_groups: + volumes += vg.volumes + + return volumes + + def get_root_volume(self) -> Optional[LvmVolume]: + for vg in self.vol_groups: + filtered = next(filter(lambda x: x.is_root(), vg.volumes), None) + if filtered: + return filtered + + return None + + +# def get_lv_crypt_uuid(self, lv: LvmVolume, encryption: EncryptionType) -> str: +# """ +# Find the LUKS superblock UUID for the device that +# contains the given logical volume +# """ +# for vg in self.vol_groups: +# if vg.contains_lv(lv): + + @dataclass class DeviceModification: device: BDevice @@ -885,11 +1165,16 @@ class DeviceModification: class EncryptionType(Enum): NoEncryption = "no_encryption" Luks = "luks" + LvmOnLuks = 'lvm_on_luks' + LuksOnLvm = 'luks_on_lvm' @classmethod def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']: return { - 'Luks': EncryptionType.Luks + str(_('No Encryption')): EncryptionType.NoEncryption, + str(_('LUKS')): EncryptionType.Luks, + str(_('LVM on LUKS')): EncryptionType.LvmOnLuks, + str(_('LUKS on LVM')): EncryptionType.LuksOnLvm } @classmethod @@ -906,18 +1191,31 @@ class EncryptionType(Enum): @dataclass class DiskEncryption: - encryption_type: EncryptionType = EncryptionType.Luks + encryption_type: EncryptionType = EncryptionType.NoEncryption encryption_password: str = '' partitions: List[PartitionModification] = field(default_factory=list) + lvm_volumes: List[LvmVolume] = field(default_factory=list) hsm_device: Optional[Fido2Device] = None - def should_generate_encryption_file(self, part_mod: PartitionModification) -> bool: - return part_mod in self.partitions and part_mod.mountpoint != Path('/') + def __post_init__(self): + if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions: + raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined') + + if self.encryption_type == EncryptionType.LuksOnLvm and not self.lvm_volumes: + raise ValueError('LuksOnLvm encryption require LMV volumes to be defined') + + def should_generate_encryption_file(self, dev: PartitionModification | LvmVolume) -> bool: + if isinstance(dev, PartitionModification): + return dev in self.partitions and dev.mountpoint != Path('/') + elif isinstance(dev, LvmVolume): + return dev in self.lvm_volumes and dev.mountpoint != Path('/') + return False def json(self) -> Dict[str, Any]: obj: Dict[str, Any] = { 'encryption_type': self.encryption_type.value, - 'partitions': [p.obj_id for p in self.partitions] + 'partitions': [p.obj_id for p in self.partitions], + 'lvm_volumes': [vol.obj_id for vol in self.lvm_volumes] } if self.hsm_device: @@ -925,23 +1223,47 @@ class DiskEncryption: return obj + @classmethod + def validate_enc(cls, disk_config: DiskLayoutConfiguration) -> bool: + partitions = [] + + for mod in disk_config.device_modifications: + for part in mod.partitions: + partitions.append(part) + + if len(partitions) > 2: # assume one boot and at least 2 additional + if disk_config.lvm_config: + return False + + return True + @classmethod def parse_arg( cls, disk_config: DiskLayoutConfiguration, arg: Dict[str, Any], password: str = '' - ) -> 'DiskEncryption': + ) -> Optional['DiskEncryption']: + if not cls.validate_enc(disk_config): + return None + enc_partitions = [] for mod in disk_config.device_modifications: for part in mod.partitions: if part.obj_id in arg.get('partitions', []): enc_partitions.append(part) + volumes = [] + if disk_config.lvm_config: + for vol in disk_config.lvm_config.get_all_volumes(): + if vol.obj_id in arg.get('lvm_volumes', []): + volumes.append(vol) + enc = DiskEncryption( EncryptionType(arg['encryption_type']), password, - enc_partitions + enc_partitions, + volumes ) if hsm := arg.get('hsm_device', None): @@ -992,7 +1314,7 @@ class LsblkInfo: tran: Optional[str] = None partn: Optional[int] = None partuuid: Optional[str] = None - parttype :Optional[str] = None + parttype: Optional[str] = None uuid: Optional[str] = None fstype: Optional[str] = None fsver: Optional[str] = None @@ -1017,7 +1339,7 @@ class LsblkInfo: 'tran': self.tran, 'partn': self.partn, 'partuuid': self.partuuid, - 'parttype' : self.parttype, + 'parttype': self.parttype, 'uuid': self.uuid, 'fstype': self.fstype, 'fsver': self.fsver, @@ -1102,13 +1424,24 @@ def _clean_field(name: str, clean_type: CleanType) -> str: return name.replace('_percentage', '%').replace('_', '-') -def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[LsblkInfo]: +def _fetch_lsblk_info( + dev_path: Optional[Union[Path, str]] = None, + reverse: bool = False, + full_dev_path: bool = False, + retry: int = 3 +) -> List[LsblkInfo]: fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()] cmd = ['lsblk', '--json', '--bytes', '--output', '+' + ','.join(fields)] if dev_path: cmd.append(str(dev_path)) + if reverse: + cmd.append('--inverse') + + if full_dev_path: + cmd.append('--paths') + try: result = SysCommand(cmd).decode() except SysCallError as err: @@ -1132,8 +1465,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[Lsblk return [LsblkInfo.from_json(device) for device in blockdevices] -def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo: - if infos := _fetch_lsblk_info(dev_path): +def get_lsblk_info( + dev_path: Union[Path, str], + reverse: bool = False, + full_dev_path: bool = False +) -> LsblkInfo: + if infos := _fetch_lsblk_info(dev_path, reverse=reverse, full_dev_path=full_dev_path): return infos[0] raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"') @@ -1142,6 +1479,7 @@ def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo: def get_all_lsblk_info() -> List[LsblkInfo]: return _fetch_lsblk_info() + def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]: def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]: devices = [] diff --git a/archinstall/lib/disk/disk_menu.py b/archinstall/lib/disk/disk_menu.py new file mode 100644 index 00000000..a7d9ccc3 --- /dev/null +++ b/archinstall/lib/disk/disk_menu.py @@ -0,0 +1,140 @@ +from typing import Dict, Optional, Any, TYPE_CHECKING, List + +from . import DiskLayoutConfiguration, DiskLayoutType +from .device_model import LvmConfiguration +from ..disk import ( + DeviceModification +) +from ..interactions import select_disk_config +from ..interactions.disk_conf import select_lvm_config +from ..menu import ( + Selector, + AbstractSubMenu +) +from ..output import FormattedOutput + +if TYPE_CHECKING: + _: Any + + +class DiskLayoutConfigurationMenu(AbstractSubMenu): + def __init__( + self, + disk_layout_config: Optional[DiskLayoutConfiguration], + data_store: Dict[str, Any], + advanced: bool = False + ): + self._disk_layout_config = disk_layout_config + self._advanced = advanced + + super().__init__(data_store=data_store, preview_size=0.5) + + def setup_selection_menu_options(self): + self._menu_options['disk_config'] = \ + Selector( + _('Partitioning'), + lambda x: self._select_disk_layout_config(x), + display_func=lambda x: self._display_disk_layout(x), + preview_func=self._prev_disk_layouts, + default=self._disk_layout_config, + enabled=True + ) + self._menu_options['lvm_config'] = \ + Selector( + _('Logical Volume Management (LVM)'), + lambda x: self._select_lvm_config(x), + display_func=lambda x: self.defined_text if x else '', + preview_func=self._prev_lvm_config, + default=self._disk_layout_config.lvm_config if self._disk_layout_config else None, + dependencies=[self._check_dep_lvm], + enabled=True + ) + + def run(self, allow_reset: bool = True) -> Optional[DiskLayoutConfiguration]: + super().run(allow_reset=allow_reset) + + disk_layout_config: Optional[DiskLayoutConfiguration] = self._data_store.get('disk_config', None) + + if disk_layout_config: + disk_layout_config.lvm_config = self._data_store.get('lvm_config', None) + + return disk_layout_config + + def _check_dep_lvm(self) -> bool: + disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + + if disk_layout_conf and disk_layout_conf.config_type == DiskLayoutType.Default: + return True + + return False + + def _select_disk_layout_config( + self, + preset: Optional[DiskLayoutConfiguration] + ) -> Optional[DiskLayoutConfiguration]: + disk_config = select_disk_config(preset, advanced_option=self._advanced) + + if disk_config != preset: + self._menu_options['lvm_config'].set_current_selection(None) + + return disk_config + + def _select_lvm_config(self, preset: Optional[LvmConfiguration]) -> Optional[LvmConfiguration]: + disk_config: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + if disk_config: + return select_lvm_config(disk_config, preset=preset) + return preset + + def _display_disk_layout(self, current_value: Optional[DiskLayoutConfiguration] = None) -> str: + if current_value: + return current_value.config_type.display_msg() + return '' + + def _prev_disk_layouts(self) -> Optional[str]: + disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + + if disk_layout_conf: + device_mods: List[DeviceModification] = \ + list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications)) + + if device_mods: + output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg()) + output_btrfs = '' + + for mod in device_mods: + # create partition table + partition_table = FormattedOutput.as_table(mod.partitions) + + output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n' + output_partition += partition_table + '\n' + + # create btrfs table + btrfs_partitions = list( + filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions) + ) + for partition in btrfs_partitions: + output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n' + + output = output_partition + output_btrfs + return output.rstrip() + + return None + + def _prev_lvm_config(self) -> Optional[str]: + lvm_config: Optional[LvmConfiguration] = self._menu_options['lvm_config'].current_selection + + if lvm_config: + output = '{}: {}\n'.format(str(_('Configuration')), lvm_config.config_type.display_msg()) + + for vol_gp in lvm_config.vol_groups: + pv_table = FormattedOutput.as_table(vol_gp.pvs) + output += '{}:\n{}'.format(str(_('Physical volumes')), pv_table) + + output += f'\nVolume Group: {vol_gp.name}' + + lvm_volumes = FormattedOutput.as_table(vol_gp.volumes) + output += '\n\n{}:\n{}'.format(str(_('Volumes')), lvm_volumes) + + return output + + return None diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py index c3a1c32f..b0e292ce 100644 --- a/archinstall/lib/disk/encryption_menu.py +++ b/archinstall/lib/disk/encryption_menu.py @@ -1,6 +1,7 @@ from pathlib import Path from typing import Dict, Optional, Any, TYPE_CHECKING, List +from . import LvmConfiguration, LvmVolume from ..disk import ( DeviceModification, DiskLayoutConfiguration, @@ -40,31 +41,41 @@ class DiskEncryptionMenu(AbstractSubMenu): super().__init__(data_store=data_store) def setup_selection_menu_options(self): + self._menu_options['encryption_type'] = \ + Selector( + _('Encryption type'), + func=lambda preset: select_encryption_type(self._disk_config, preset), + display_func=lambda x: EncryptionType.type_to_text(x) if x else None, + default=self._preset.encryption_type, + enabled=True, + ) self._menu_options['encryption_password'] = \ Selector( _('Encryption password'), lambda x: select_encrypted_password(), + dependencies=[self._check_dep_enc_type], display_func=lambda x: secret(x) if x else '', default=self._preset.encryption_password, enabled=True ) - self._menu_options['encryption_type'] = \ - Selector( - _('Encryption type'), - func=lambda preset: select_encryption_type(preset), - display_func=lambda x: EncryptionType.type_to_text(x) if x else None, - dependencies=['encryption_password'], - default=self._preset.encryption_type, - enabled=True - ) self._menu_options['partitions'] = \ Selector( _('Partitions'), func=lambda preset: select_partitions_to_encrypt(self._disk_config.device_modifications, preset), display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None, - dependencies=['encryption_password'], + dependencies=[self._check_dep_partitions], default=self._preset.partitions, - preview_func=self._prev_disk_layouts, + preview_func=self._prev_partitions, + enabled=True + ) + self._menu_options['lvm_vols'] = \ + Selector( + _('LVM volumes'), + func=lambda preset: self._select_lvm_vols(preset), + display_func=lambda x: f'{len(x)} {_("LVM volumes")}' if x else None, + dependencies=[self._check_dep_lvm_vols], + default=self._preset.lvm_volumes, + preview_func=self._prev_lvm_vols, enabled=True ) self._menu_options['HSM'] = \ @@ -73,19 +84,54 @@ class DiskEncryptionMenu(AbstractSubMenu): func=lambda preset: select_hsm(preset), display_func=lambda x: self._display_hsm(x), preview_func=self._prev_hsm, - dependencies=['encryption_password'], + dependencies=[self._check_dep_enc_type], default=self._preset.hsm_device, enabled=True ) + def _select_lvm_vols(self, preset: List[LvmVolume]) -> List[LvmVolume]: + if self._disk_config.lvm_config: + return select_lvm_vols_to_encrypt(self._disk_config.lvm_config, preset=preset) + return [] + + def _check_dep_enc_type(self) -> bool: + enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection + if enc_type and enc_type != EncryptionType.NoEncryption: + return True + return False + + def _check_dep_partitions(self) -> bool: + enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection + if enc_type and enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks]: + return True + return False + + def _check_dep_lvm_vols(self) -> bool: + enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection + if enc_type and enc_type == EncryptionType.LuksOnLvm: + return True + return False + def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]: super().run(allow_reset=allow_reset) - if self._data_store.get('encryption_password', None): + enc_type = self._data_store.get('encryption_type', None) + enc_password = self._data_store.get('encryption_password', None) + enc_partitions = self._data_store.get('partitions', None) + enc_lvm_vols = self._data_store.get('lvm_vols', None) + + if enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and enc_partitions: + enc_lvm_vols = [] + + if enc_type == EncryptionType.LuksOnLvm: + enc_partitions = [] + + if enc_type != EncryptionType.NoEncryption and enc_password and (enc_partitions or enc_lvm_vols): return DiskEncryption( - encryption_password=self._data_store.get('encryption_password', None), - encryption_type=self._data_store['encryption_type'], - partitions=self._data_store.get('partitions', None), + encryption_password=enc_password, + encryption_type=enc_type, + partitions=enc_partitions, + lvm_volumes=enc_lvm_vols, hsm_device=self._data_store.get('HSM', None) ) @@ -97,7 +143,7 @@ class DiskEncryptionMenu(AbstractSubMenu): return None - def _prev_disk_layouts(self) -> Optional[str]: + def _prev_partitions(self) -> Optional[str]: partitions: Optional[List[PartitionModification]] = self._menu_options['partitions'].current_selection if partitions: output = str(_('Partitions to be encrypted')) + '\n' @@ -106,6 +152,15 @@ class DiskEncryptionMenu(AbstractSubMenu): return None + def _prev_lvm_vols(self) -> Optional[str]: + volumes: Optional[List[PartitionModification]] = self._menu_options['lvm_vols'].current_selection + if volumes: + output = str(_('LVM volumes to be encrypted')) + '\n' + output += FormattedOutput.as_table(volumes) + return output.rstrip() + + return None + def _prev_hsm(self) -> Optional[str]: try: Fido2.get_fido2_devices() @@ -123,13 +178,19 @@ class DiskEncryptionMenu(AbstractSubMenu): return None -def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]: +def select_encryption_type(disk_config: DiskLayoutConfiguration, preset: EncryptionType) -> Optional[EncryptionType]: title = str(_('Select disk encryption option')) - options = [ - EncryptionType.type_to_text(EncryptionType.Luks) - ] + + if disk_config.lvm_config: + options = [ + EncryptionType.type_to_text(EncryptionType.LvmOnLuks), + EncryptionType.type_to_text(EncryptionType.LuksOnLvm) + ] + else: + options = [EncryptionType.type_to_text(EncryptionType.Luks)] preset_value = EncryptionType.type_to_text(preset) + choice = Menu(title, options, preset_values=preset_value).run() match choice.type_: @@ -197,3 +258,31 @@ def select_partitions_to_encrypt( case MenuSelectionType.Selection: return choice.multi_value return [] + + +def select_lvm_vols_to_encrypt( + lvm_config: LvmConfiguration, + preset: List[LvmVolume] +) -> List[LvmVolume]: + volumes: List[LvmVolume] = lvm_config.get_all_volumes() + + if volumes: + title = str(_('Select which LVM volumes to encrypt')) + partition_table = FormattedOutput.as_table(volumes) + + choice = TableMenu( + title, + table_data=(volumes, partition_table), + preset=preset, + multi=True + ).run() + + match choice.type_: + case MenuSelectionType.Reset: + return [] + case MenuSelectionType.Skip: + return preset + case MenuSelectionType.Selection: + return choice.multi_value + + return [] diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py index 49904c17..5a139534 100644 --- a/archinstall/lib/disk/fido.py +++ b/archinstall/lib/disk/fido.py @@ -4,7 +4,7 @@ import getpass from pathlib import Path from typing import List -from .device_model import PartitionModification, Fido2Device +from .device_model import Fido2Device from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes from ..output import error, info from ..exceptions import SysCallError @@ -72,16 +72,16 @@ class Fido2: def fido2_enroll( cls, hsm_device: Fido2Device, - part_mod: PartitionModification, + dev_path: Path, password: str ): - worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {part_mod.dev_path}", peek_output=True) + worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {dev_path}", peek_output=True) pw_inputted = False pin_inputted = False while worker.is_alive(): if pw_inputted is False: - if bytes(f"please enter current passphrase for disk {part_mod.dev_path}", 'UTF-8') in worker._trace_log.lower(): + if bytes(f"please enter current passphrase for disk {dev_path}", 'UTF-8') in worker._trace_log.lower(): worker.write(bytes(password, 'UTF-8')) pw_inputted = True elif pin_inputted is False: diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 9c6e6d35..5c11896e 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -3,13 +3,21 @@ from __future__ import annotations import signal import sys import time -from typing import Any, Optional, TYPE_CHECKING +from pathlib import Path +from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set -from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption from .device_handler import device_handler +from .device_model import ( + DiskLayoutConfiguration, DiskLayoutType, PartitionTable, + FilesystemType, DiskEncryption, LvmVolumeGroup, + Size, Unit, SectorSize, PartitionModification, EncryptionType, + LvmVolume, LvmConfiguration +) from ..hardware import SysInfo -from ..output import debug +from ..luks import Luks2 from ..menu import Menu +from ..output import debug, info +from ..general import SysCommand if TYPE_CHECKING: _: Any @@ -52,13 +60,288 @@ class FilesystemHandler: for mod in device_mods: device_handler.partition(mod, partition_table=partition_table) - device_handler.format(mod, enc_conf=self._enc_config) - for part_mod in mod.partitions: - if part_mod.is_create_or_modify(): + if self._disk_config.lvm_config: + for mod in device_mods: + if boot_part := mod.get_boot_partition(): + debug(f'Formatting boot partition: {boot_part.dev_path}') + self._format_partitions( + [boot_part], + mod.device_path + ) + + self.perform_lvm_operations() + else: + for mod in device_mods: + self._format_partitions( + mod.partitions, + mod.device_path + ) + + for part_mod in mod.partitions: if part_mod.fs_type == FilesystemType.Btrfs: device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config) + def _format_partitions( + self, + partitions: List[PartitionModification], + device_path: Path + ): + """ + Format can be given an overriding path, for instance /dev/null to test + the formatting functionality and in essence the support for the given filesystem. + """ + + # don't touch existing partitions + create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()] + + self._validate_partitions(create_or_modify_parts) + + # make sure all devices are unmounted + device_handler.umount_all_existing(device_path) + + for part_mod in create_or_modify_parts: + # partition will be encrypted + if self._enc_config is not None and part_mod in self._enc_config.partitions: + device_handler.format_encrypted( + part_mod.safe_dev_path, + part_mod.mapper_name, + part_mod.safe_fs_type, + self._enc_config + ) + else: + device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path) + + # synchronize with udev before using lsblk + SysCommand('udevadm settle') + + lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path) + + part_mod.partn = lsblk_info.partn + part_mod.partuuid = lsblk_info.partuuid + part_mod.uuid = lsblk_info.uuid + + def _validate_partitions(self, partitions: List[PartitionModification]): + checks = { + # verify that all partitions have a path set (which implies that they have been created) + lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'), + # crypto luks is not a valid file system type + lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError( + 'Crypto luks cannot be set as a filesystem type'), + # file system type must be set + lambda x: x.fs_type is None: ValueError('File system type must be set for modification') + } + + for check, exc in checks.items(): + found = next(filter(check, partitions), None) + if found is not None: + raise exc + + def perform_lvm_operations(self): + info('Setting up LVM config...') + + if not self._disk_config.lvm_config: + return + + if self._enc_config: + self._setup_lvm_encrypted( + self._disk_config.lvm_config, + self._enc_config + ) + else: + self._setup_lvm(self._disk_config.lvm_config) + self._format_lvm_vols(self._disk_config.lvm_config) + + def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption): + if enc_config.encryption_type == EncryptionType.LvmOnLuks: + enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False) + + self._setup_lvm(lvm_config, enc_mods) + self._format_lvm_vols(lvm_config) + + # export the lvm group safely otherwise the Luks cannot be closed + self._safely_close_lvm(lvm_config) + + for luks in enc_mods.values(): + luks.lock() + elif enc_config.encryption_type == EncryptionType.LuksOnLvm: + self._setup_lvm(lvm_config) + enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False) + self._format_lvm_vols(lvm_config, enc_vols) + + for luks in enc_vols.values(): + luks.lock() + + self._safely_close_lvm(lvm_config) + + def _safely_close_lvm(self, lvm_config: LvmConfiguration): + for vg in lvm_config.vol_groups: + for vol in vg.volumes: + device_handler.lvm_vol_change(vol, False) + + device_handler.lvm_export_vg(vg) + + def _setup_lvm( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + self._lvm_create_pvs(lvm_config, enc_mods) + + for vg in lvm_config.vol_groups: + pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods) + + device_handler.lvm_vg_create(pv_dev_paths, vg.name) + + # figure out what the actual available size in the group is + vg_info = device_handler.lvm_group_info(vg.name) + + if not vg_info: + raise ValueError('Unable to fetch VG info') + + # the actual available LVM Group size will be smaller than the + # total PVs size due to reserved metadata storage etc. + # so we'll have a look at the total avail. size, check the delta + # to the desired sizes and subtract some equally from the actually + # created volume + avail_size = vg_info.vg_size + desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default())) + + delta = desired_size - avail_size + max_vol_offset = delta.convert(Unit.B) + + max_vol = max(vg.volumes, key=lambda x: x.length) + + for lv in vg.volumes: + offset = max_vol_offset if lv == max_vol else None + + debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}') + device_handler.lvm_vol_create(vg.name, lv, offset) + + while True: + debug('Fetching LVM volume info') + lv_info = device_handler.lvm_vol_info(lv.name) + if lv_info is not None: + break + + time.sleep(1) + + self._lvm_vol_handle_e2scrub(vg) + + def _format_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_vols: Dict[LvmVolume, Luks2] = {} + ): + for vol in lvm_config.get_all_volumes(): + if enc_vol := enc_vols.get(vol, None): + if not enc_vol.mapper_dev: + raise ValueError('No mapper device defined') + path = enc_vol.mapper_dev + else: + path = vol.safe_dev_path + + # wait a bit otherwise the mkfs will fail as it can't + # find the mapper device yet + device_handler.format(vol.fs_type, path) + + if vol.fs_type == FilesystemType.Btrfs: + device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options) + + def _lvm_create_pvs( + self, + lvm_config: LvmConfiguration, + enc_mods: Dict[PartitionModification, Luks2] = {} + ): + pv_paths: Set[Path] = set() + + for vg in lvm_config.vol_groups: + pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods) + + device_handler.lvm_pv_create(pv_paths) + + def _get_all_pv_dev_paths( + self, + pvs: List[PartitionModification], + enc_mods: Dict[PartitionModification, Luks2] = {} + ) -> Set[Path]: + pv_paths: Set[Path] = set() + + for pv in pvs: + if enc_pv := enc_mods.get(pv, None): + if mapper := enc_pv.mapper_dev: + pv_paths.add(mapper) + else: + pv_paths.add(pv.safe_dev_path) + + return pv_paths + + def _encrypt_lvm_vols( + self, + lvm_config: LvmConfiguration, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[LvmVolume, Luks2]: + enc_vols: Dict[LvmVolume, Luks2] = {} + + for vol in lvm_config.get_all_volumes(): + if vol in enc_config.lvm_volumes: + luks_handler = device_handler.encrypt( + vol.safe_dev_path, + vol.mapper_name, + enc_config.encryption_password, + lock_after_create + ) + + enc_vols[vol] = luks_handler + + return enc_vols + + def _encrypt_partitions( + self, + enc_config: DiskEncryption, + lock_after_create: bool = True + ) -> Dict[PartitionModification, Luks2]: + enc_mods: Dict[PartitionModification, Luks2] = {} + + for mod in self._disk_config.device_modifications: + partitions = mod.partitions + + # don't touch existing partitions + filtered_part = [p for p in partitions if not p.exists()] + + self._validate_partitions(filtered_part) + + # make sure all devices are unmounted + device_handler.umount_all_existing(mod.device_path) + + enc_mods = {} + + for part_mod in filtered_part: + if part_mod in enc_config.partitions: + luks_handler = device_handler.encrypt( + part_mod.safe_dev_path, + part_mod.mapper_name, + enc_config.encryption_password, + lock_after_create=lock_after_create + ) + + enc_mods[part_mod] = luks_handler + + return enc_mods + + def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup): + # from arch wiki: + # If a logical volume will be formatted with ext4, leave at least 256 MiB + # free space in the volume group to allow using e2scrub + if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]): + largest_vol = max(vol_gp.volumes, key=lambda x: x.length) + + device_handler.lvm_vol_reduce( + largest_vol.safe_dev_path, + Size(256, Unit.MiB, SectorSize.default()) + ) + def _do_countdown(self) -> bool: SIG_TRIGGER = False diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py index 823605e3..330f61a3 100644 --- a/archinstall/lib/disk/partitioning_menu.py +++ b/archinstall/lib/disk/partitioning_menu.py @@ -2,7 +2,7 @@ from __future__ import annotations import re from pathlib import Path -from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple +from typing import Any, TYPE_CHECKING, List, Optional, Tuple from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \ ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption @@ -38,21 +38,6 @@ class PartitioningList(ListManager): display_actions = list(self._actions.values()) super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:]) - def reformat(self, data: List[PartitionModification]) -> Dict[str, Optional[PartitionModification]]: - table = FormattedOutput.as_table(data) - rows = table.split('\n') - - # these are the header rows of the table and do not map to any User obviously - # we're adding 2 spaces as prefix because the menu selector '> ' will be put before - # the selectable rows so the header has to be aligned - display_data: Dict[str, Optional[PartitionModification]] = {f' {rows[0]}': None, f' {rows[1]}': None} - - for row, user in zip(rows[2:], data): - row = row.replace('|', '\\|') - display_data[row] = user - - return display_data - def selected_action_display(self, partition: PartitionModification) -> str: return str(_('Partition')) @@ -258,7 +243,6 @@ class PartitioningList(ListManager): while True: value = TextInput(prompt).run().strip() size: Optional[Size] = None - if not value: size = default else: diff --git a/archinstall/lib/disk/subvolume_menu.py b/archinstall/lib/disk/subvolume_menu.py index 48afa829..ea77149d 100644 --- a/archinstall/lib/disk/subvolume_menu.py +++ b/archinstall/lib/disk/subvolume_menu.py @@ -1,9 +1,8 @@ from pathlib import Path -from typing import Dict, List, Optional, Any, TYPE_CHECKING +from typing import List, Optional, Any, TYPE_CHECKING from .device_model import SubvolumeModification from ..menu import TextInput, ListManager -from ..output import FormattedOutput if TYPE_CHECKING: _: Any @@ -18,21 +17,6 @@ class SubvolumeMenu(ListManager): ] super().__init__(prompt, btrfs_subvols, [self._actions[0]], self._actions[1:]) - def reformat(self, data: List[SubvolumeModification]) -> Dict[str, Optional[SubvolumeModification]]: - table = FormattedOutput.as_table(data) - rows = table.split('\n') - - # these are the header rows of the table and do not map to any User obviously - # we're adding 2 spaces as prefix because the menu selector '> ' will be put before - # the selectable rows so the header has to be aligned - display_data: Dict[str, Optional[SubvolumeModification]] = {f' {rows[0]}': None, f' {rows[1]}': None} - - for row, subvol in zip(rows[2:], data): - row = row.replace('|', '\\|') - display_data[row] = subvol - - return display_data - def selected_action_display(self, subvolume: SubvolumeModification) -> str: return str(subvolume.name) diff --git a/archinstall/lib/global_menu.py b/archinstall/lib/global_menu.py index e65915db..1b5e779b 100644 --- a/archinstall/lib/global_menu.py +++ b/archinstall/lib/global_menu.py @@ -14,7 +14,6 @@ from .models.audio_configuration import Audio, AudioConfiguration from .models.users import User from .output import FormattedOutput from .profile.profile_menu import ProfileConfiguration -from .storage import storage from .configuration import save_config from .interactions import add_number_of_parallel_downloads from .interactions import ask_additional_packages_to_install @@ -30,7 +29,6 @@ from .interactions import select_additional_repositories from .interactions import select_kernel from .utils.util import format_cols from .interactions import ask_ntp -from .interactions.disk_conf import select_disk_config if TYPE_CHECKING: _: Any @@ -38,7 +36,6 @@ if TYPE_CHECKING: class GlobalMenu(AbstractMenu): def __init__(self, data_store: Dict[str, Any]): - self._defined_text = str(_('Defined')) super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3) def setup_selection_menu_options(self): @@ -54,20 +51,20 @@ class GlobalMenu(AbstractMenu): _('Locales'), lambda preset: self._locale_selection(preset), preview_func=self._prev_locale, - display_func=lambda x: self._defined_text if x else '') + display_func=lambda x: self.defined_text if x else '') self._menu_options['mirror_config'] = \ Selector( _('Mirrors'), lambda preset: self._mirror_configuration(preset), - display_func=lambda x: self._defined_text if x else '', + display_func=lambda x: self.defined_text if x else '', preview_func=self._prev_mirror_config ) self._menu_options['disk_config'] = \ Selector( _('Disk configuration'), lambda preset: self._select_disk_config(preset), - preview_func=self._prev_disk_layouts, - display_func=lambda x: self._display_disk_layout(x), + preview_func=self._prev_disk_config, + display_func=lambda x: self.defined_text if x else '', ) self._menu_options['disk_encryption'] = \ Selector( @@ -75,7 +72,8 @@ class GlobalMenu(AbstractMenu): lambda preset: self._disk_encryption(preset), preview_func=self._prev_disk_encryption, display_func=lambda x: self._display_disk_encryption(x), - dependencies=['disk_config']) + dependencies=['disk_config'] + ) self._menu_options['swap'] = \ Selector( _('Swap'), @@ -140,7 +138,7 @@ class GlobalMenu(AbstractMenu): Selector( _('Additional packages'), lambda preset: ask_additional_packages_to_install(preset), - display_func=lambda x: self._defined_text if x else '', + display_func=lambda x: self.defined_text if x else '', preview_func=self._prev_additional_pkgs, default=[]) self._menu_options['additional-repositories'] = \ @@ -247,14 +245,17 @@ class GlobalMenu(AbstractMenu): return config.type.display_msg() def _disk_encryption(self, preset: Optional[disk.DiskEncryption]) -> Optional[disk.DiskEncryption]: - mods: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + disk_config: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection - if not mods: + if not disk_config: # this should not happen as the encryption menu has the disk_config as dependency raise ValueError('No disk layout specified') + if not disk.DiskEncryption.validate_enc(disk_config): + return None + data_store: Dict[str, Any] = {} - disk_encryption = disk.DiskEncryptionMenu(mods, data_store, preset=preset).run() + disk_encryption = disk.DiskEncryptionMenu(disk_config, data_store, preset=preset).run() return disk_encryption def _locale_selection(self, preset: LocaleConfiguration) -> LocaleConfiguration: @@ -287,44 +288,35 @@ class GlobalMenu(AbstractMenu): return format_cols(packages, None) return None - def _prev_disk_layouts(self) -> Optional[str]: + def _prev_disk_config(self) -> Optional[str]: selector = self._menu_options['disk_config'] disk_layout_conf: Optional[disk.DiskLayoutConfiguration] = selector.current_selection + output = '' if disk_layout_conf: - device_mods: List[disk.DeviceModification] = \ - list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications)) - - if device_mods: - output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg()) - output_btrfs = '' + output += str(_('Configuration type: {}')).format(disk_layout_conf.config_type.display_msg()) - for mod in device_mods: - # create partition table - partition_table = FormattedOutput.as_table(mod.partitions) - - output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n' - output_partition += partition_table + '\n' - - # create btrfs table - btrfs_partitions = list( - filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions) - ) - for partition in btrfs_partitions: - output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n' + if disk_layout_conf.lvm_config: + output += '\n{}: {}'.format(str(_('LVM configuration type')), disk_layout_conf.lvm_config.config_type.display_msg()) - output = output_partition + output_btrfs - return output.rstrip() + if output: + return output return None - def _display_disk_layout(self, current_value: Optional[disk.DiskLayoutConfiguration] = None) -> str: + def _display_disk_config(self, current_value: Optional[disk.DiskLayoutConfiguration] = None) -> str: if current_value: return current_value.config_type.display_msg() return '' def _prev_disk_encryption(self) -> Optional[str]: + disk_config: Optional[disk.DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection + + if disk_config and not disk.DiskEncryption.validate_enc(disk_config): + return str(_('LVM disk encryption with more than 2 partitions is currently not supported')) + encryption: Optional[disk.DiskEncryption] = self._menu_options['disk_encryption'].current_selection + if encryption: enc_type = disk.EncryptionType.type_to_text(encryption.encryption_type) output = str(_('Encryption type')) + f': {enc_type}\n' @@ -332,6 +324,8 @@ class GlobalMenu(AbstractMenu): if encryption.partitions: output += 'Partitions: {} selected'.format(len(encryption.partitions)) + '\n' + elif encryption.lvm_volumes: + output += 'LVM volumes: {} selected'.format(len(encryption.lvm_volumes)) + '\n' if encryption.hsm_device: output += f'HSM: {encryption.hsm_device.manufacturer}' @@ -425,10 +419,8 @@ class GlobalMenu(AbstractMenu): self, preset: Optional[disk.DiskLayoutConfiguration] = None ) -> Optional[disk.DiskLayoutConfiguration]: - disk_config = select_disk_config( - preset, - storage['arguments'].get('advanced', False) - ) + data_store: Dict[str, Any] = {} + disk_config = disk.DiskLayoutConfigurationMenu(preset, data_store).run() if disk_config != preset: self._menu_options['disk_encryption'].set_current_selection(None) diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 37121118..8292a3be 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -52,7 +52,7 @@ class Installer: `Installer()` is the wrapper for most basic installation steps. It also wraps :py:func:`~archinstall.Installer.pacstrap` among other things. """ - self.base_packages = base_packages or __packages__[:3] + self._base_packages = base_packages or __packages__[:3] self.kernels = kernels or ['linux'] self._disk_config = disk_config @@ -64,11 +64,11 @@ class Installer: self.helper_flags: Dict[str, Any] = {'base': False, 'bootloader': None} for kernel in self.kernels: - self.base_packages.append(kernel) + self._base_packages.append(kernel) # If using accessibility tools in the live environment, append those to the packages list if accessibility_tools_in_use(): - self.base_packages.extend(__accessibility_packages__) + self._base_packages.extend(__accessibility_packages__) self.post_base_install: List[Callable] = [] @@ -90,6 +90,8 @@ class Installer: self._fstab_entries: List[str] = [] self._zram_enabled = False + self._disable_fstrim = False + self.pacman = Pacman(self.target, storage['arguments'].get('silent', False)) def __enter__(self) -> 'Installer': @@ -198,31 +200,71 @@ class Installer: self._verify_service_stop() def mount_ordered_layout(self): - info('Mounting partitions in order') + debug('Mounting ordered layout') + + luks_handlers: Dict[Any, Luks2] = {} + + match self._disk_encryption.encryption_type: + case disk.EncryptionType.NoEncryption: + self._mount_lvm_layout() + case disk.EncryptionType.Luks: + luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions) + case disk.EncryptionType.LvmOnLuks: + luks_handlers = self._prepare_luks_partitions(self._disk_encryption.partitions) + self._import_lvm() + self._mount_lvm_layout(luks_handlers) + case disk.EncryptionType.LuksOnLvm: + self._import_lvm() + luks_handlers = self._prepare_luks_lvm(self._disk_encryption.lvm_volumes) + self._mount_lvm_layout(luks_handlers) + + # mount all regular partitions + self._mount_partition_layout(luks_handlers) + + def _mount_partition_layout(self, luks_handlers: Dict[Any, Luks2]): + debug('Mounting partition layout') + + # do not mount any PVs part of the LVM configuration + pvs = [] + if self._disk_config.lvm_config: + pvs = self._disk_config.lvm_config.get_all_pvs() for mod in self._disk_config.device_modifications: + not_pv_part_mods = list(filter(lambda x: x not in pvs, mod.partitions)) + # partitions have to mounted in the right order on btrfs the mountpoint will # be empty as the actual subvolumes are getting mounted instead so we'll use # '/' just for sorting - sorted_part_mods = sorted(mod.partitions, key=lambda x: x.mountpoint or Path('/')) - - enc_partitions = [] - if self._disk_encryption.encryption_type is not disk.EncryptionType.NoEncryption: - enc_partitions = list(set(sorted_part_mods) & set(self._disk_encryption.partitions)) - - # attempt to decrypt all luks partitions - luks_handlers = self._prepare_luks_partitions(enc_partitions) + sorted_part_mods = sorted(not_pv_part_mods, key=lambda x: x.mountpoint or Path('/')) for part_mod in sorted_part_mods: if luks_handler := luks_handlers.get(part_mod): - # mount encrypted partition self._mount_luks_partition(part_mod, luks_handler) else: - # partition is not encrypted self._mount_partition(part_mod) - def _prepare_luks_partitions(self, partitions: List[disk.PartitionModification]) -> Dict[ - disk.PartitionModification, Luks2]: + def _mount_lvm_layout(self, luks_handlers: Dict[Any, Luks2] = {}): + lvm_config = self._disk_config.lvm_config + + if not lvm_config: + debug('No lvm config defined to be mounted') + return + + debug('Mounting LVM layout') + + for vg in lvm_config.vol_groups: + sorted_vol = sorted(vg.volumes, key=lambda x: x.mountpoint or Path('/')) + + for vol in sorted_vol: + if luks_handler := luks_handlers.get(vol): + self._mount_luks_volume(vol, luks_handler) + else: + self._mount_lvm_vol(vol) + + def _prepare_luks_partitions( + self, + partitions: List[disk.PartitionModification] + ) -> Dict[disk.PartitionModification, Luks2]: return { part_mod: disk.device_handler.unlock_luks2_dev( part_mod.dev_path, @@ -233,6 +275,33 @@ class Installer: if part_mod.mapper_name and part_mod.dev_path } + def _import_lvm(self): + lvm_config = self._disk_config.lvm_config + + if not lvm_config: + debug('No lvm config defined to be imported') + return + + for vg in lvm_config.vol_groups: + disk.device_handler.lvm_import_vg(vg) + + for vol in vg.volumes: + disk.device_handler.lvm_vol_change(vol, True) + + def _prepare_luks_lvm( + self, + lvm_volumes: List[disk.LvmVolume] + ) -> Dict[disk.LvmVolume, Luks2]: + return { + vol: disk.device_handler.unlock_luks2_dev( + vol.dev_path, + vol.mapper_name, + self._disk_encryption.encryption_password + ) + for vol in lvm_volumes + if vol.mapper_name and vol.dev_path + } + def _mount_partition(self, part_mod: disk.PartitionModification): # it would be none if it's btrfs as the subvolumes will have the mountpoints defined if part_mod.mountpoint and part_mod.dev_path: @@ -246,14 +315,32 @@ class Installer: part_mod.mount_options ) + def _mount_lvm_vol(self, volume: disk.LvmVolume): + if volume.fs_type != disk.FilesystemType.Btrfs: + if volume.mountpoint and volume.dev_path: + target = self.target / volume.relative_mountpoint + disk.device_handler.mount(volume.dev_path, target, options=volume.mount_options) + + if volume.fs_type == disk.FilesystemType.Btrfs and volume.dev_path: + self._mount_btrfs_subvol(volume.dev_path, volume.btrfs_subvols, volume.mount_options) + def _mount_luks_partition(self, part_mod: disk.PartitionModification, luks_handler: Luks2): - # it would be none if it's btrfs as the subvolumes will have the mountpoints defined - if part_mod.mountpoint and luks_handler.mapper_dev: - target = self.target / part_mod.relative_mountpoint - disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) + if part_mod.fs_type != disk.FilesystemType.Btrfs: + if part_mod.mountpoint and luks_handler.mapper_dev: + target = self.target / part_mod.relative_mountpoint + disk.device_handler.mount(luks_handler.mapper_dev, target, options=part_mod.mount_options) if part_mod.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: - self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols) + self._mount_btrfs_subvol(luks_handler.mapper_dev, part_mod.btrfs_subvols, part_mod.mount_options) + + def _mount_luks_volume(self, volume: disk.LvmVolume, luks_handler: Luks2): + if volume.fs_type != disk.FilesystemType.Btrfs: + if volume.mountpoint and luks_handler.mapper_dev: + target = self.target / volume.relative_mountpoint + disk.device_handler.mount(luks_handler.mapper_dev, target, options=volume.mount_options) + + if volume.fs_type == disk.FilesystemType.Btrfs and luks_handler.mapper_dev: + self._mount_btrfs_subvol(luks_handler.mapper_dev, volume.btrfs_subvols, volume.mount_options) def _mount_btrfs_subvol( self, @@ -262,13 +349,23 @@ class Installer: mount_options: List[str] = [] ): for subvol in subvolumes: - disk.device_handler.mount( - dev_path, - self.target / subvol.relative_mountpoint, - options=mount_options + [f'subvol={subvol.name}'] - ) + mountpoint = self.target / subvol.relative_mountpoint + mount_options = mount_options + [f'subvol={subvol.name}'] + disk.device_handler.mount(dev_path, mountpoint, options=mount_options) def generate_key_files(self): + match self._disk_encryption.encryption_type: + case disk.EncryptionType.Luks: + self._generate_key_files_partitions() + case disk.EncryptionType.LuksOnLvm: + self._generate_key_file_lvm_volumes() + case disk.EncryptionType.LvmOnLuks: + # currently LvmOnLuks only supports a single + # partitioning layout (boot + partition) + # so we won't need any keyfile generation atm + pass + + def _generate_key_files_partitions(self): for part_mod in self._disk_encryption.partitions: gen_enc_file = self._disk_encryption.should_generate_encryption_file(part_mod) @@ -279,14 +376,36 @@ class Installer: ) if gen_enc_file and not part_mod.is_root(): - info(f'Creating key-file: {part_mod.dev_path}') + debug(f'Creating key-file: {part_mod.dev_path}') luks_handler.create_keyfile(self.target) if part_mod.is_root() and not gen_enc_file: if self._disk_encryption.hsm_device: disk.Fido2.fido2_enroll( self._disk_encryption.hsm_device, - part_mod, + part_mod.safe_dev_path, + self._disk_encryption.encryption_password + ) + + def _generate_key_file_lvm_volumes(self): + for vol in self._disk_encryption.lvm_volumes: + gen_enc_file = self._disk_encryption.should_generate_encryption_file(vol) + + luks_handler = Luks2( + vol.safe_dev_path, + mapper_name=vol.mapper_name, + password=self._disk_encryption.encryption_password + ) + + if gen_enc_file and not vol.is_root(): + info(f'Creating key-file: {vol.dev_path}') + luks_handler.create_keyfile(self.target) + + if vol.is_root() and not gen_enc_file: + if self._disk_encryption.hsm_device: + disk.Fido2.fido2_enroll( + self._disk_encryption.hsm_device, + vol.safe_dev_path, self._disk_encryption.encryption_password ) @@ -393,7 +512,7 @@ class Installer: for entry in self._fstab_entries: fp.write(f'{entry}\n') - def set_hostname(self, hostname: str, *args: str, **kwargs: str) -> None: + def set_hostname(self, hostname: str): with open(f'{self.target}/etc/hostname', 'w') as fh: fh.write(hostname + '\n') @@ -444,7 +563,7 @@ class Installer: (self.target / 'etc/locale.conf').write_text(f'LANG={lang_value}\n') return True - def set_timezone(self, zone: str, *args: str, **kwargs: str) -> bool: + def set_timezone(self, zone: str) -> bool: if not zone: return True if not len(zone): @@ -532,7 +651,7 @@ class Installer: if enable_services: # If we haven't installed the base yet (function called pre-maturely) if self.helper_flags.get('base', False) is False: - self.base_packages.append('iwd') + self._base_packages.append('iwd') # This function will be called after minimal_installation() # as a hook for post-installs. This hook is only needed if @@ -608,51 +727,98 @@ class Installer: return vendor.get_ucode() return None - def minimal_installation( - self, - testing: bool = False, - multilib: bool = False, - mkinitcpio: bool = True, - hostname: str = 'archinstall', - locale_config: LocaleConfiguration = LocaleConfiguration.default() - ): - _disable_fstrim = False + def _handle_partition_installation(self): + pvs = [] + if self._disk_config.lvm_config: + pvs = self._disk_config.lvm_config.get_all_pvs() + for mod in self._disk_config.device_modifications: for part in mod.partitions: - if part.fs_type is not None: - if (pkg := part.fs_type.installation_pkg) is not None: - self.base_packages.append(pkg) - if (module := part.fs_type.installation_module) is not None: + if part in pvs or part.fs_type is None: + continue + + if (pkg := part.fs_type.installation_pkg) is not None: + self._base_packages.append(pkg) + if (module := part.fs_type.installation_module) is not None: + self._modules.append(module) + if (binary := part.fs_type.installation_binary) is not None: + self._binaries.append(binary) + + # https://github.com/archlinux/archinstall/issues/1837 + if part.fs_type.fs_type_mount == 'btrfs': + self._disable_fstrim = True + + # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. + if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: + if 'fsck' in self._hooks: + self._hooks.remove('fsck') + + if part in self._disk_encryption.partitions: + if self._disk_encryption.hsm_device: + # Required by mkinitcpio to add support for fido2-device options + self.pacman.strap('libfido2') + + if 'sd-encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') + else: + if 'encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') + + def _handle_lvm_installation(self): + if not self._disk_config.lvm_config: + return + + self.add_additional_packages('lvm2') + self._hooks.insert(self._hooks.index('filesystems') - 1, 'lvm2') + + for vg in self._disk_config.lvm_config.vol_groups: + for vol in vg.volumes: + if vol.fs_type is not None: + if (pkg := vol.fs_type.installation_pkg) is not None: + self._base_packages.append(pkg) + if (module := vol.fs_type.installation_module) is not None: self._modules.append(module) - if (binary := part.fs_type.installation_binary) is not None: + if (binary := vol.fs_type.installation_binary) is not None: self._binaries.append(binary) - # https://github.com/archlinux/archinstall/issues/1837 - if part.fs_type.fs_type_mount == 'btrfs': - _disable_fstrim = True + if vol.fs_type.fs_type_mount == 'btrfs': + self._disable_fstrim = True # There is not yet an fsck tool for NTFS. If it's being used for the root filesystem, the hook should be removed. - if part.fs_type.fs_type_mount == 'ntfs3' and part.mountpoint == self.target: + if vol.fs_type.fs_type_mount == 'ntfs3' and vol.mountpoint == self.target: if 'fsck' in self._hooks: self._hooks.remove('fsck') - if part in self._disk_encryption.partitions: - if self._disk_encryption.hsm_device: - # Required by mkinitcpio to add support for fido2-device options - self.pacman.strap('libfido2') + if self._disk_encryption.encryption_type in [disk.EncryptionType.LvmOnLuks, disk.EncryptionType.LuksOnLvm]: + if self._disk_encryption.hsm_device: + # Required by mkinitcpio to add support for fido2-device options + self.pacman.strap('libfido2') + + if 'sd-encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('lvm2') - 1, 'sd-encrypt') + else: + if 'encrypt' not in self._hooks: + self._hooks.insert(self._hooks.index('lvm2') - 1, 'encrypt') - if 'sd-encrypt' not in self._hooks: - self._hooks.insert(self._hooks.index('filesystems'), 'sd-encrypt') - else: - if 'encrypt' not in self._hooks: - self._hooks.insert(self._hooks.index('filesystems'), 'encrypt') + def minimal_installation( + self, + testing: bool = False, + multilib: bool = False, + mkinitcpio: bool = True, + hostname: str = 'archinstall', + locale_config: LocaleConfiguration = LocaleConfiguration.default() + ): + if self._disk_config.lvm_config: + self._handle_lvm_installation() + else: + self._handle_partition_installation() if not SysInfo.has_uefi(): - self.base_packages.append('grub') + self._base_packages.append('grub') if ucode := self._get_microcode(): (self.target / 'boot' / ucode).unlink(missing_ok=True) - self.base_packages.append(ucode.stem) + self._base_packages.append(ucode.stem) else: debug('Archinstall will not install any ucode.') @@ -673,7 +839,7 @@ class Installer: pacman_conf.apply() - self.pacman.strap(self.base_packages) + self.pacman.strap(self._base_packages) self.helper_flags['base-strapped'] = True pacman_conf.persist() @@ -685,7 +851,7 @@ class Installer: # https://github.com/archlinux/archinstall/issues/880 # https://github.com/archlinux/archinstall/issues/1837 # https://github.com/archlinux/archinstall/issues/1841 - if not _disable_fstrim: + if not self._disable_fstrim: self.enable_periodic_trim() # TODO: Support locale and timezone @@ -742,13 +908,24 @@ class Installer: return boot return None - def _get_root_partition(self) -> Optional[disk.PartitionModification]: - for mod in self._disk_config.device_modifications: - if root := mod.get_root_partition(): - return root + def _get_root(self) -> Optional[disk.PartitionModification | disk.LvmVolume]: + if self._disk_config.lvm_config: + return self._disk_config.lvm_config.get_root_volume() + else: + for mod in self._disk_config.device_modifications: + if root := mod.get_root_partition(): + return root return None - def _get_kernel_params( + def _get_luks_uuid_from_mapper_dev(self, mapper_dev_path: Path) -> str: + lsblk_info = disk.get_lsblk_info(mapper_dev_path, reverse=True, full_dev_path=True) + + if not lsblk_info.children or not lsblk_info.children[0].uuid: + raise ValueError('Unable to determine UUID of luks superblock') + + return lsblk_info.children[0].uuid + + def _get_kernel_params_partition( self, root_partition: disk.PartitionModification, id_root: bool = True, @@ -784,20 +961,74 @@ class Installer: debug(f'Identifying root partition by UUID: {root_partition.uuid}') kernel_parameters.append(f'root=UUID={root_partition.uuid}') + return kernel_parameters + + def _get_kernel_params_lvm( + self, + lvm: disk.LvmVolume + ) -> List[str]: + kernel_parameters = [] + + match self._disk_encryption.encryption_type: + case disk.EncryptionType.LvmOnLuks: + if not lvm.vg_name: + raise ValueError(f'Unable to determine VG name for {lvm.name}') + + pv_seg_info = disk.device_handler.lvm_pvseg_info(lvm.vg_name, lvm.name) + + if not pv_seg_info: + raise ValueError(f'Unable to determine PV segment info for {lvm.vg_name}/{lvm.name}') + + uuid = self._get_luks_uuid_from_mapper_dev(pv_seg_info.pv_name) + + if self._disk_encryption.hsm_device: + debug(f'LvmOnLuks, encrypted root partition, HSM, identifying by UUID: {uuid}') + kernel_parameters.append(f'rd.luks.name={uuid}=cryptlvm root={lvm.safe_dev_path}') + else: + debug(f'LvmOnLuks, encrypted root partition, identifying by UUID: {uuid}') + kernel_parameters.append(f'cryptdevice=UUID={uuid}:cryptlvm root={lvm.safe_dev_path}') + case disk.EncryptionType.LuksOnLvm: + uuid = self._get_luks_uuid_from_mapper_dev(lvm.mapper_path) + + if self._disk_encryption.hsm_device: + debug(f'LuksOnLvm, encrypted root partition, HSM, identifying by UUID: {uuid}') + kernel_parameters.append(f'rd.luks.name={uuid}=root root=/dev/mapper/root') + else: + debug(f'LuksOnLvm, encrypted root partition, identifying by UUID: {uuid}') + kernel_parameters.append(f'cryptdevice=UUID={uuid}:root root=/dev/mapper/root') + case disk.EncryptionType.NoEncryption: + debug(f'Identifying root lvm by mapper device: {lvm.dev_path}') + kernel_parameters.append(f'root={lvm.safe_dev_path}') + + return kernel_parameters + + def _get_kernel_params( + self, + root: disk.PartitionModification | disk.LvmVolume, + id_root: bool = True, + partuuid: bool = True + ) -> List[str]: + kernel_parameters = [] + + if isinstance(root, disk.LvmVolume): + kernel_parameters = self._get_kernel_params_lvm(root) + else: + kernel_parameters = self._get_kernel_params_partition(root, id_root, partuuid) + # Zswap should be disabled when using zram. # https://github.com/archlinux/archinstall/issues/881 if self._zram_enabled: kernel_parameters.append('zswap.enabled=0') if id_root: - for sub_vol in root_partition.btrfs_subvols: + for sub_vol in root.btrfs_subvols: if sub_vol.is_root(): kernel_parameters.append(f'rootflags=subvol={sub_vol.name}') break kernel_parameters.append('rw') - kernel_parameters.append(f'rootfstype={root_partition.safe_fs_type.fs_type_mount}') + kernel_parameters.append(f'rootfstype={root.safe_fs_type.fs_type_mount}') kernel_parameters.extend(self._kernel_params) debug(f'kernel parameters: {" ".join(kernel_parameters)}') @@ -807,10 +1038,12 @@ class Installer: def _add_systemd_bootloader( self, boot_partition: disk.PartitionModification, - root_partition: disk.PartitionModification, + root: disk.PartitionModification | disk.LvmVolume, efi_partition: Optional[disk.PartitionModification], uki_enabled: bool = False ): + debug('Installing systemd bootloader') + self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): @@ -882,7 +1115,7 @@ class Installer: f'# Created on: {self.init_time}' ) - options = 'options ' + ' '.join(self._get_kernel_params(root_partition)) + options = 'options ' + ' '.join(self._get_kernel_params(root)) for kernel in self.kernels: for variant in ("", "-fallback"): @@ -904,15 +1137,17 @@ class Installer: def _add_grub_bootloader( self, boot_partition: disk.PartitionModification, - root_partition: disk.PartitionModification, + root: disk.PartitionModification | disk.LvmVolume, efi_partition: Optional[disk.PartitionModification] ): + debug('Installing grub bootloader') + self.pacman.strap('grub') # no need? grub_default = self.target / 'etc/default/grub' config = grub_default.read_text() - kernel_parameters = ' '.join(self._get_kernel_params(root_partition, False, False)) + kernel_parameters = ' '.join(self._get_kernel_params(root, False, False)) config = re.sub(r'(GRUB_CMDLINE_LINUX=")("\n)', rf'\1{kernel_parameters}\2', config, 1) grub_default.write_text(config) @@ -934,7 +1169,7 @@ class Installer: info(f"GRUB EFI partition: {efi_partition.dev_path}") - self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + self.pacman.strap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? boot_dir_arg = [] if boot_partition.mountpoint and boot_partition.mountpoint != boot_dir: @@ -988,8 +1223,10 @@ class Installer: self, boot_partition: disk.PartitionModification, efi_partition: Optional[disk.PartitionModification], - root_partition: disk.PartitionModification + root: disk.PartitionModification | disk.LvmVolume ): + debug('Installing limine bootloader') + self.pacman.strap('limine') info(f"Limine boot partition: {boot_partition.dev_path}") @@ -1052,7 +1289,7 @@ Exec = /bin/sh -c "{hook_command}" hook_path = hooks_dir / '99-limine.hook' hook_path.write_text(hook_contents) - kernel_params = ' '.join(self._get_kernel_params(root_partition)) + kernel_params = ' '.join(self._get_kernel_params(root)) config_contents = 'TIMEOUT=5\n' for kernel in self.kernels: @@ -1075,9 +1312,11 @@ Exec = /bin/sh -c "{hook_command}" def _add_efistub_bootloader( self, boot_partition: disk.PartitionModification, - root_partition: disk.PartitionModification, + root: disk.PartitionModification | disk.LvmVolume, uki_enabled: bool = False ): + debug('Installing efistub bootloader') + self.pacman.strap('efibootmgr') if not SysInfo.has_uefi(): @@ -1092,7 +1331,7 @@ Exec = /bin/sh -c "{hook_command}" entries = ( 'initrd=/initramfs-{kernel}.img', - *self._get_kernel_params(root_partition) + *self._get_kernel_params(root) ) cmdline = [' '.join(entries)] @@ -1122,7 +1361,7 @@ Exec = /bin/sh -c "{hook_command}" def _config_uki( self, - root_partition: disk.PartitionModification, + root: disk.PartitionModification | disk.LvmVolume, efi_partition: Optional[disk.PartitionModification] ): if not efi_partition or not efi_partition.mountpoint: @@ -1130,7 +1369,7 @@ Exec = /bin/sh -c "{hook_command}" # Set up kernel command line with open(self.target / 'etc/kernel/cmdline', 'w') as cmdline: - kernel_parameters = self._get_kernel_params(root_partition) + kernel_parameters = self._get_kernel_params(root) cmdline.write(' '.join(kernel_parameters) + '\n') diff_mountpoint = None @@ -1191,37 +1430,33 @@ Exec = /bin/sh -c "{hook_command}" efi_partition = self._get_efi_partition() boot_partition = self._get_boot_partition() - root_partition = self._get_root_partition() + root = self._get_root() if boot_partition is None: raise ValueError(f'Could not detect boot at mountpoint {self.target}') - if root_partition is None: + if root is None: raise ValueError(f'Could not detect root at mountpoint {self.target}') info(f'Adding bootloader {bootloader.value} to {boot_partition.dev_path}') if uki_enabled: - self._config_uki(root_partition, efi_partition) + self._config_uki(root, efi_partition) match bootloader: case Bootloader.Systemd: - self._add_systemd_bootloader(boot_partition, root_partition, efi_partition, uki_enabled) + self._add_systemd_bootloader(boot_partition, root, efi_partition, uki_enabled) case Bootloader.Grub: - self._add_grub_bootloader(boot_partition, root_partition, efi_partition) + self._add_grub_bootloader(boot_partition, root, efi_partition) case Bootloader.Efistub: - self._add_efistub_bootloader(boot_partition, root_partition, uki_enabled) + self._add_efistub_bootloader(boot_partition, root, uki_enabled) case Bootloader.Limine: - self._add_limine_bootloader(boot_partition, efi_partition, root_partition) + self._add_limine_bootloader(boot_partition, efi_partition, root) def add_additional_packages(self, packages: Union[str, List[str]]) -> bool: return self.pacman.strap(packages) - def _enable_users(self, service: str, users: List[User]): - for user in users: - self.arch_chroot(f'systemctl enable --user {service}', run_as=user.username) - - def enable_sudo(self, entity: str, group :bool = False): + def enable_sudo(self, entity: str, group: bool = False): info(f'Enabling sudo permissions for {entity}') sudoers_dir = f"{self.target}/etc/sudoers.d" @@ -1237,7 +1472,7 @@ Exec = /bin/sh -c "{hook_command}" # We count how many files are there already so we know which number to prefix the file with num_of_rules_already = len(os.listdir(sudoers_dir)) - file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc + file_num_str = "{:02d}".format(num_of_rules_already) # We want 00_user1, 01_user2, etc # Guarantees that entity str does not contain invalid characters for a linux file name: # \ / : * ? " < > | @@ -1293,7 +1528,7 @@ Exec = /bin/sh -c "{hook_command}" if sudo and self.enable_sudo(user): self.helper_flags['user'] = True - def user_set_pw(self, user :str, password :str) -> bool: + def user_set_pw(self, user: str, password: str) -> bool: info(f'Setting password for {user}') if user == 'root': @@ -1310,7 +1545,7 @@ Exec = /bin/sh -c "{hook_command}" except SysCallError: return False - def user_set_shell(self, user :str, shell :str) -> bool: + def user_set_shell(self, user: str, shell: str) -> bool: info(f'Setting shell for {user} to {shell}') try: @@ -1319,7 +1554,7 @@ Exec = /bin/sh -c "{hook_command}" except SysCallError: return False - def chown(self, owner :str, path :str, options :List[str] = []) -> bool: + def chown(self, owner: str, path: str, options: List[str] = []) -> bool: cleaned_path = path.replace('\'', '\\\'') try: SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'") diff --git a/archinstall/lib/interactions/disk_conf.py b/archinstall/lib/interactions/disk_conf.py index 9d0042d6..f80af9ca 100644 --- a/archinstall/lib/interactions/disk_conf.py +++ b/archinstall/lib/interactions/disk_conf.py @@ -58,7 +58,7 @@ def select_devices(preset: List[disk.BDevice] = []) -> List[disk.BDevice]: case MenuSelectionType.Reset: return [] case MenuSelectionType.Skip: return preset case MenuSelectionType.Selection: - selected_device_info: List[disk._DeviceInfo] = choice.value # type: ignore + selected_device_info: List[disk._DeviceInfo] = choice.single_value selected_devices = [] for device in devices: @@ -73,7 +73,6 @@ def get_default_partition_layout( filesystem_type: Optional[disk.FilesystemType] = None, advanced_option: bool = False ) -> List[disk.DeviceModification]: - if len(devices) == 1: device_modification = suggest_single_disk_layout( devices[0], @@ -133,7 +132,7 @@ def select_disk_config( case MenuSelectionType.Reset: return None case MenuSelectionType.Selection: if choice.single_value == pre_mount_mode: - output = "You will use whatever drive-setup is mounted at the specified directory\n" + output = 'You will use whatever drive-setup is mounted at the specified directory\n' output += "WARNING: Archinstall won't check the suitability of this setup\n" try: @@ -151,7 +150,6 @@ def select_disk_config( ) preset_devices = [mod.device for mod in preset.device_modifications] if preset else [] - devices = select_devices(preset_devices) if not devices: @@ -177,6 +175,36 @@ def select_disk_config( return None +def select_lvm_config( + disk_config: disk.DiskLayoutConfiguration, + preset: Optional[disk.LvmConfiguration] = None, +) -> Optional[disk.LvmConfiguration]: + default_mode = disk.LvmLayoutType.Default.display_msg() + + options = [default_mode] + + preset_value = preset.config_type.display_msg() if preset else None + warning = str(_('Are you sure you want to reset this setting?')) + + choice = Menu( + _('Select a LVM option'), + options, + allow_reset=True, + allow_reset_warning_msg=warning, + sort=False, + preview_size=0.2, + preset_values=preset_value + ).run() + + match choice.type_: + case MenuSelectionType.Skip: return preset + case MenuSelectionType.Reset: return None + case MenuSelectionType.Selection: + if choice.single_value == default_mode: + return suggest_lvm_layout(disk_config) + return preset + + def _boot_partition(sector_size: disk.SectorSize, using_gpt: bool) -> disk.PartitionModification: flags = [disk.PartitionFlag.Boot] if using_gpt: @@ -199,7 +227,7 @@ def _boot_partition(sector_size: disk.SectorSize, using_gpt: bool) -> disk.Parti ) -def select_main_filesystem_format(advanced_options=False) -> disk.FilesystemType: +def select_main_filesystem_format(advanced_options: bool = False) -> disk.FilesystemType: options = { 'btrfs': disk.FilesystemType.Btrfs, 'ext4': disk.FilesystemType.Ext4, @@ -250,7 +278,6 @@ def suggest_single_disk_layout( prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?')) choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() using_subvolumes = choice.value == Menu.yes() - mount_options = select_mount_options() device_modification = disk.DeviceModification(device, wipe=True) @@ -288,7 +315,11 @@ def suggest_single_disk_layout( root_start = boot_partition.start + boot_partition.length # Set a size for / (/root) - if using_subvolumes or device_size_gib < min_size_to_allow_home_part or not using_home_partition: + if ( + using_subvolumes + or device_size_gib < min_size_to_allow_home_part + or not using_home_partition + ): root_length = device.device_info.total_size - root_start else: root_length = min(device.device_info.total_size, root_partition_size) @@ -305,6 +336,7 @@ def suggest_single_disk_layout( fs_type=filesystem_type, mount_options=mount_options ) + device_modification.add_partition(root_partition) if using_subvolumes: @@ -388,9 +420,9 @@ def suggest_multi_disk_layout( device_paths = ', '.join([str(d.device_info.path) for d in devices]) - debug(f"Suggesting multi-disk-layout for devices: {device_paths}") - debug(f"/root: {root_device.device_info.path}") - debug(f"/home: {home_device.device_info.path}") + debug(f'Suggesting multi-disk-layout for devices: {device_paths}') + debug(f'/root: {root_device.device_info.path}') + debug(f'/home: {home_device.device_info.path}') root_device_modification = disk.DeviceModification(root_device, wipe=True) home_device_modification = disk.DeviceModification(home_device, wipe=True) @@ -444,3 +476,85 @@ def suggest_multi_disk_layout( home_device_modification.add_partition(home_partition) return [root_device_modification, home_device_modification] + + +def suggest_lvm_layout( + disk_config: disk.DiskLayoutConfiguration, + filesystem_type: Optional[disk.FilesystemType] = None, + vg_grp_name: str = 'ArchinstallVg', +) -> disk.LvmConfiguration: + if disk_config.config_type != disk.DiskLayoutType.Default: + raise ValueError('LVM suggested volumes are only available for default partitioning') + + using_subvolumes = False + btrfs_subvols = [] + home_volume = True + mount_options = [] + + if not filesystem_type: + filesystem_type = select_main_filesystem_format() + + if filesystem_type == disk.FilesystemType.Btrfs: + prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?')) + choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() + using_subvolumes = choice.value == Menu.yes() + + mount_options = select_mount_options() + + if using_subvolumes: + btrfs_subvols = [ + disk.SubvolumeModification(Path('@'), Path('/')), + disk.SubvolumeModification(Path('@home'), Path('/home')), + disk.SubvolumeModification(Path('@log'), Path('/var/log')), + disk.SubvolumeModification(Path('@pkg'), Path('/var/cache/pacman/pkg')), + disk.SubvolumeModification(Path('@.snapshots'), Path('/.snapshots')), + ] + + home_volume = False + + boot_part: Optional[disk.PartitionModification] = None + other_part: List[disk.PartitionModification] = [] + + for mod in disk_config.device_modifications: + for part in mod.partitions: + if part.is_boot(): + boot_part = part + else: + other_part.append(part) + + if not boot_part: + raise ValueError('Unable to find boot partition in partition modifications') + + total_vol_available = sum( + [p.length for p in other_part], + disk.Size(0, disk.Unit.B, disk.SectorSize.default()), + ) + root_vol_size = disk.Size(20, disk.Unit.GiB, disk.SectorSize.default()) + home_vol_size = total_vol_available - root_vol_size + + lvm_vol_group = disk.LvmVolumeGroup(vg_grp_name, pvs=other_part, ) + + root_vol = disk.LvmVolume( + status=disk.LvmVolumeStatus.Create, + name='root', + fs_type=filesystem_type, + length=root_vol_size, + mountpoint=Path('/'), + btrfs_subvols=btrfs_subvols, + mount_options=mount_options + ) + + lvm_vol_group.volumes.append(root_vol) + + if home_volume: + home_vol = disk.LvmVolume( + status=disk.LvmVolumeStatus.Create, + name='home', + fs_type=filesystem_type, + length=home_vol_size, + mountpoint=Path('/home'), + ) + + lvm_vol_group.volumes.append(home_vol) + + return disk.LvmConfiguration(disk.LvmLayoutType.Default, [lvm_vol_group]) diff --git a/archinstall/lib/interactions/manage_users_conf.py b/archinstall/lib/interactions/manage_users_conf.py index ca912283..886f85b6 100644 --- a/archinstall/lib/interactions/manage_users_conf.py +++ b/archinstall/lib/interactions/manage_users_conf.py @@ -1,12 +1,11 @@ from __future__ import annotations import re -from typing import Any, Dict, TYPE_CHECKING, List, Optional +from typing import Any, TYPE_CHECKING, List, Optional from .utils import get_password from ..menu import Menu, ListManager from ..models.users import User -from ..output import FormattedOutput if TYPE_CHECKING: _: Any @@ -26,21 +25,6 @@ class UserList(ListManager): ] super().__init__(prompt, lusers, [self._actions[0]], self._actions[1:]) - def reformat(self, data: List[User]) -> Dict[str, Any]: - table = FormattedOutput.as_table(data) - rows = table.split('\n') - - # these are the header rows of the table and do not map to any User obviously - # we're adding 2 spaces as prefix because the menu selector '> ' will be put before - # the selectable rows so the header has to be aligned - display_data: Dict[str, Optional[User]] = {f' {rows[0]}': None, f' {rows[1]}': None} - - for row, user in zip(rows[2:], data): - row = row.replace('|', '\\|') - display_data[row] = user - - return display_data - def selected_action_display(self, user: User) -> str: return user.username diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index c917420e..50e15cee 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -60,7 +60,7 @@ class Luks2: iter_time: int = 10000, key_file: Optional[Path] = None ) -> Path: - info(f'Luks2 encrypting: {self.luks_dev_path}') + debug(f'Luks2 encrypting: {self.luks_dev_path}') byte_password = self._password_bytes() @@ -87,12 +87,15 @@ class Luks2: 'luksFormat', str(self.luks_dev_path), ]) + debug(f'cryptsetup format: {cryptsetup_args}') + # Retry formatting the volume because archinstall can some times be too quick # which generates a "Device /dev/sdX does not exist or access denied." between # setting up partitions and us trying to encrypt it. for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS'] + 1): try: - SysCommand(cryptsetup_args) + result = SysCommand(cryptsetup_args).decode() + debug(f'cryptsetup luksFormat output: {result}') break except SysCallError as err: time.sleep(storage['DISK_TIMEOUTS']) @@ -106,10 +109,13 @@ class Luks2: self.lock() # Then try again to set up the crypt-device - SysCommand(cryptsetup_args) + result = SysCommand(cryptsetup_args).decode() + debug(f'cryptsetup luksFormat output: {result}') else: raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {err}') + self.key_file = key_file + return key_file def _get_luks_uuid(self) -> str: @@ -152,7 +158,15 @@ class Luks2: while Path(self.luks_dev_path).exists() is False and time.time() - wait_timer < 10: time.sleep(0.025) - SysCommand(f'/usr/bin/cryptsetup open {self.luks_dev_path} {self.mapper_name} --key-file {key_file} --type luks2') + result = SysCommand( + '/usr/bin/cryptsetup open ' + f'{self.luks_dev_path} ' + f'{self.mapper_name} ' + f'--key-file {key_file} ' + f'--type luks2' + ).decode() + + debug(f'cryptsetup open output: {result}') if not self.mapper_dev or not self.mapper_dev.is_symlink(): raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}') @@ -199,8 +213,8 @@ class Luks2: key_file.parent.mkdir(parents=True, exist_ok=True) - with open(key_file, "w") as keyfile: - keyfile.write(generate_password(length=512)) + pwd = generate_password(length=512) + key_file.write_text(pwd) key_file.chmod(0o400) @@ -208,7 +222,7 @@ class Luks2: self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"]) def _add_key(self, key_file: Path): - info(f'Adding additional key-file {key_file}') + debug(f'Adding additional key-file {key_file}') command = f'/usr/bin/cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}' worker = SysCommandWorker(command, environment_vars={'LC_ALL': 'C'}) @@ -228,7 +242,7 @@ class Luks2: key_file: Path, options: List[str] ) -> None: - info(f'Adding crypttab entry for key {key_file}') + debug(f'Adding crypttab entry for key {key_file}') with open(crypttab_path, 'a') as crypttab: opt = ','.join(options) diff --git a/archinstall/lib/menu/abstract_menu.py b/archinstall/lib/menu/abstract_menu.py index 14db98ca..ee55f5c9 100644 --- a/archinstall/lib/menu/abstract_menu.py +++ b/archinstall/lib/menu/abstract_menu.py @@ -10,6 +10,7 @@ from ..translationhandler import TranslationHandler, Language if TYPE_CHECKING: _: Any + class Selector: def __init__( self, @@ -68,42 +69,19 @@ class Selector: :param no_store: A boolean which determines that the field should or shouldn't be stored in the data storage :type no_store: bool """ - self._description = description - self.func = func self._display_func = display_func - self._current_selection = default + self._no_store = no_store + + self.description = description + self.func = func + self.current_selection = default self.enabled = enabled - self._dependencies = dependencies - self._dependencies_not = dependencies_not + self.dependencies = dependencies + self.dependencies_not = dependencies_not self.exec_func = exec_func - self._preview_func = preview_func + self.preview_func = preview_func self.mandatory = mandatory - self._no_store = no_store - self._default = default - - @property - def default(self) -> Any: - return self._default - - @property - def description(self) -> str: - return self._description - - @property - def dependencies(self) -> List: - return self._dependencies - - @property - def dependencies_not(self) -> List: - return self._dependencies_not - - @property - def current_selection(self) -> Optional[Any]: - return self._current_selection - - @property - def preview_func(self): - return self._preview_func + self.default = default def do_store(self) -> bool: return self._no_store is False @@ -112,45 +90,45 @@ class Selector: self.enabled = status def update_description(self, description: str): - self._description = description + self.description = description def menu_text(self, padding: int = 0) -> str: - if self._description == '': # special menu option for __separator__ + if self.description == '': # special menu option for __separator__ return '' current = '' if self._display_func: - current = self._display_func(self._current_selection) + current = self._display_func(self.current_selection) else: - if self._current_selection is not None: - current = str(self._current_selection) + if self.current_selection is not None: + current = str(self.current_selection) if current: padding += 5 - description = unicode_ljust(str(self._description), padding, ' ') + description = unicode_ljust(str(self.description), padding, ' ') current = current else: - description = self._description + description = self.description current = '' return f'{description} {current}' def set_current_selection(self, current: Optional[Any]): - self._current_selection = current + self.current_selection = current def has_selection(self) -> bool: - if not self._current_selection: + if not self.current_selection: return False return True def get_selection(self) -> Any: - return self._current_selection + return self.current_selection def is_empty(self) -> bool: - if self._current_selection is None: + if self.current_selection is None: return True - elif isinstance(self._current_selection, (str, list, dict)) and len(self._current_selection) == 0: + elif isinstance(self.current_selection, (str, list, dict)) and len(self.current_selection) == 0: return True return False @@ -197,6 +175,8 @@ class AbstractMenu: self._sync_all() self._populate_default_values() + self.defined_text = str(_('Defined')) + @property def last_choice(self): return self._last_choice @@ -382,9 +362,10 @@ class AbstractMenu: result = None if selector.func is not None: - presel_val = self.option(config_name).get_selection() - result = selector.func(presel_val) + cur_value = self.option(config_name).get_selection() + result = selector.func(cur_value) self._menu_options[config_name].set_current_selection(result) + if selector.do_store(): self._data_store[config_name] = result @@ -398,19 +379,23 @@ class AbstractMenu: return True def _verify_selection_enabled(self, selection_name: str) -> bool: - """ general """ if selection := self._menu_options.get(selection_name, None): if not selection.enabled: return False if len(selection.dependencies) > 0: - for d in selection.dependencies: - if not self._verify_selection_enabled(d) or self._menu_options[d].is_empty(): - return False + for dep in selection.dependencies: + if isinstance(dep, str): + if not self._verify_selection_enabled(dep) or self._menu_options[dep].is_empty(): + return False + elif callable(dep): # callable dependency eval + return dep() + else: + raise ValueError(f'Unsupported dependency: {selection_name}') if len(selection.dependencies_not) > 0: - for d in selection.dependencies_not: - if not self._menu_options[d].is_empty(): + for dep in selection.dependencies_not: + if not self._menu_options[dep].is_empty(): return False return True @@ -454,8 +439,8 @@ class AbstractMenu: class AbstractSubMenu(AbstractMenu): - def __init__(self, data_store: Dict[str, Any] = {}): - super().__init__(data_store=data_store) + def __init__(self, data_store: Dict[str, Any] = {}, preview_size: float = 0.2): + super().__init__(data_store=data_store, preview_size=preview_size) self._menu_options['__separator__'] = Selector('') self._menu_options['back'] = \ diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py index 54fb6a1b..de18791c 100644 --- a/archinstall/lib/menu/list_manager.py +++ b/archinstall/lib/menu/list_manager.py @@ -3,6 +3,7 @@ from os import system from typing import Any, TYPE_CHECKING, Dict, Optional, Tuple, List from .menu import Menu +from ..output import FormattedOutput if TYPE_CHECKING: _: Any @@ -127,18 +128,29 @@ class ListManager: if choice.value and choice.value != self._cancel_action: self._data = self.handle_action(choice.value, entry, self._data) - def selected_action_display(self, selection: Any) -> str: + def reformat(self, data: List[Any]) -> Dict[str, Optional[Any]]: """ - this will return the value to be displayed in the - "Select an action for '{}'" string + Default implementation of the table to be displayed. + Override if any custom formatting is needed """ - raise NotImplementedError('Please implement me in the child class') + table = FormattedOutput.as_table(data) + rows = table.split('\n') - def reformat(self, data: List[Any]) -> Dict[str, Optional[Any]]: + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[Any]] = {f' {rows[0]}': None, f' {rows[1]}': None} + + for row, entry in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = entry + + return display_data + + def selected_action_display(self, selection: Any) -> str: """ - this should return a dictionary of display string to actual data entry - mapping; if the value for a given display string is None it will be used - in the header value (useful when displaying tables) + this will return the value to be displayed in the + "Select an action for '{}'" string """ raise NotImplementedError('Please implement me in the child class') diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py index f14b855d..38301d3a 100644 --- a/archinstall/lib/menu/menu.py +++ b/archinstall/lib/menu/menu.py @@ -66,7 +66,7 @@ class Menu(TerminalMenu): sort: bool = True, preset_values: Optional[Union[str, List[str]]] = None, cursor_index: Optional[int] = None, - preview_command: Optional[Callable] = None, + preview_command: Optional[Callable[[Any], str | None]] = None, preview_size: float = 0.0, preview_title: str = 'Info', header: Union[List[str], str] = [], @@ -228,7 +228,11 @@ class Menu(TerminalMenu): default_str = str(_('(default)')) return f'{self._default_option} {default_str}' - def _show_preview(self, preview_command: Optional[Callable], selection: str) -> Optional[str]: + def _show_preview( + self, + preview_command: Optional[Callable[[Any], str | None]], + selection: str + ) -> Optional[str]: if selection == self.back(): return None diff --git a/archinstall/lib/menu/table_selection_menu.py b/archinstall/lib/menu/table_selection_menu.py index 4cff7216..fec6ae59 100644 --- a/archinstall/lib/menu/table_selection_menu.py +++ b/archinstall/lib/menu/table_selection_menu.py @@ -19,6 +19,7 @@ class TableMenu(Menu): preview_size: float = 0.0, allow_reset: bool = True, allow_reset_warning_msg: Optional[str] = None, + skip: bool = True ): """ param title: Text that will be displayed above the menu @@ -81,7 +82,8 @@ class TableMenu(Menu): preview_title=preview_title, extra_bottom_space=extra_bottom_space, allow_reset=allow_reset, - allow_reset_warning_msg=allow_reset_warning_msg + allow_reset_warning_msg=allow_reset_warning_msg, + skip=skip ) def _preset_values(self, preset: List[Any]) -> List[str]: diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index 61f3c568..c9094669 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -121,21 +121,6 @@ class CustomMirrorList(ListManager): ] super().__init__(prompt, custom_mirrors, [self._actions[0]], self._actions[1:]) - def reformat(self, data: List[CustomMirror]) -> Dict[str, Any]: - table = FormattedOutput.as_table(data) - rows = table.split('\n') - - # these are the header rows of the table and do not map to any User obviously - # we're adding 2 spaces as prefix because the menu selector '> ' will be put before - # the selectable rows so the header has to be aligned - display_data: Dict[str, Optional[CustomMirror]] = {f' {rows[0]}': None, f' {rows[1]}': None} - - for row, user in zip(rows[2:], data): - row = row.replace('|', '\\|') - display_data[row] = user - - return display_data - def selected_action_display(self, mirror: CustomMirror) -> str: return mirror.name diff --git a/archinstall/scripts/guided.py b/archinstall/scripts/guided.py index b1fc8fd9..385ff500 100644 --- a/archinstall/scripts/guided.py +++ b/archinstall/scripts/guided.py @@ -104,7 +104,7 @@ def perform_installation(mountpoint: Path): Only requirement is that the block devices are formatted and setup prior to entering this function. """ - info('Starting installation') + info('Starting installation...') disk_config: disk.DiskLayoutConfiguration = archinstall.arguments['disk_config'] # Retrieve list of additional repositories and set boolean values appropriately diff --git a/examples/interactive_installation.py b/examples/interactive_installation.py index 3c9a5876..4513b6f2 100644 --- a/examples/interactive_installation.py +++ b/examples/interactive_installation.py @@ -82,7 +82,7 @@ def perform_installation(mountpoint: Path): Only requirement is that the block devices are formatted and setup prior to entering this function. """ - info('Starting installation') + info('Starting installation...') disk_config: disk.DiskLayoutConfiguration = archinstall.arguments['disk_config'] # Retrieve list of additional repositories and set boolean values appropriately -- cgit v1.2.3-70-g09d2