index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Andreas Baumann <mail@andreasbaumann.cc> | 2022-05-28 10:36:38 +0200 |
---|---|---|
committer | Andreas Baumann <mail@andreasbaumann.cc> | 2022-05-28 10:36:38 +0200 |
commit | faf925de1882be722d2994d697a802918282e509 (patch) | |
tree | 4856c76b10b36e94875ce3c9add961960bb23bf0 /archinstall/lib | |
parent | 3801bee921d22e23435c781c469d9ec0adfa00bd (diff) | |
parent | 78449f75bc44f0e2b03cb9d909b9b78e4f7ca4c8 (diff) |
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py index c971768f..510f7103 100644 --- a/archinstall/lib/configuration.py +++ b/archinstall/lib/configuration.py @@ -1,12 +1,23 @@ import json import logging -from pathlib import Path +import pathlib from typing import Optional, Dict from .storage import storage from .general import JSON, UNSAFE_JSON from .output import log - +from .exceptions import RequirementError +from .hsm import get_fido2_devices + +def configuration_sanity_check(): + if storage['arguments'].get('HSM'): + if not get_fido2_devices(): + raise RequirementError( + f"In order to use HSM to pair with the disk encryption," + + f" one needs to be accessible through /dev/hidraw* and support" + + f" the FIDO2 protocol. You can check this by running" + + f" 'systemd-cryptenroll --fido2-device=list'." + ) class ConfigurationOutput: def __init__(self, config: Dict): @@ -21,12 +32,12 @@ class ConfigurationOutput: self._user_credentials = {} self._disk_layout = None self._user_config = {} - self._default_save_path = Path(storage.get('LOG_PATH', '.')) + self._default_save_path = pathlib.Path(storage.get('LOG_PATH', '.')) self._user_config_file = 'user_configuration.json' self._user_creds_file = "user_credentials.json" self._disk_layout_file = "user_disk_layout.json" - self._sensitive = ['!users', '!superusers', '!encryption-password'] + self._sensitive = ['!users', '!encryption-password'] self._ignore = ['abort', 'install', 'config', 'creds', 'dry_run'] self._process_config() @@ -84,7 +95,7 @@ class ConfigurationOutput: print() - def _is_valid_path(self, dest_path :Path) -> bool: + def _is_valid_path(self, dest_path :pathlib.Path) -> bool: if (not dest_path.exists()) or not (dest_path.is_dir()): log( 'Destination directory {} does not exist or is not a directory,\n Configuration files can not be saved'.format(dest_path.resolve()), @@ -93,26 +104,26 @@ class ConfigurationOutput: return False return True - def save_user_config(self, dest_path :Path = None): + def save_user_config(self, dest_path :pathlib.Path = None): if self._is_valid_path(dest_path): with open(dest_path / self._user_config_file, 'w') as config_file: config_file.write(self.user_config_to_json()) - def save_user_creds(self, dest_path :Path = None): + def save_user_creds(self, dest_path :pathlib.Path = None): if self._is_valid_path(dest_path): if user_creds := self.user_credentials_to_json(): target = dest_path / self._user_creds_file with open(target, 'w') as config_file: config_file.write(user_creds) - def save_disk_layout(self, dest_path :Path = None): + def save_disk_layout(self, dest_path :pathlib.Path = None): if self._is_valid_path(dest_path): if disk_layout := self.disk_layout_to_json(): target = dest_path / self._disk_layout_file with target.open('w') as config_file: config_file.write(disk_layout) - def save(self, dest_path :Path = None): + def save(self, dest_path :pathlib.Path = None): if not dest_path: dest_path = self._default_save_path diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py index bb6eb815..352d04b9 100644 --- a/archinstall/lib/disk/__init__.py +++ b/archinstall/lib/disk/__init__.py @@ -4,4 +4,4 @@ from .blockdevice import BlockDevice from .filesystem import Filesystem, MBR, GPT from .partition import * from .user_guides import * -from .validators import * +from .validators import *
\ No newline at end of file diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 206c3b7e..c7b69205 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -3,6 +3,7 @@ import os import json import logging import time +from functools import cached_property from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: @@ -32,7 +33,29 @@ class BlockDevice: # I'm placing the encryption password on a BlockDevice level. def __repr__(self, *args :str, **kwargs :str) -> str: - return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})" + return self._str_repr + + @cached_property + def _str_repr(self) -> str: + return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})" + + @cached_property + def display_info(self) -> str: + columns = { + str(_('Device')): self.device_or_backfile, + str(_('Size')): f'{self._safe_size}GB', + str(_('Free space')): f'{self._safe_free_space}', + str(_('Bus-type')): f'{self.bus_type}' + } + + padding = max([len(k) for k in columns.keys()]) + + pretty = '' + for k, v in columns.items(): + k = k.ljust(padding, ' ') + pretty += f'{k} = {v}\n' + + return pretty.rstrip() def __iter__(self) -> Iterator[Partition]: for partition in self.partitions: @@ -74,7 +97,7 @@ class BlockDevice: for device in output['blockdevices']: return device['pttype'] - @property + @cached_property def device_or_backfile(self) -> str: """ Returns the actual device-endpoint of the BlockDevice. @@ -157,7 +180,7 @@ class BlockDevice: from .filesystem import GPT return GPT - @property + @cached_property def uuid(self) -> str: log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') """ @@ -167,7 +190,19 @@ class BlockDevice: """ return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') - @property + @cached_property + def _safe_size(self) -> float: + from .helpers import convert_size_to_gb + + try: + output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) + except SysCallError: + return -1.0 + + for device in output['blockdevices']: + return convert_size_to_gb(device['size']) + + @cached_property def size(self) -> float: from .helpers import convert_size_to_gb @@ -176,22 +211,29 @@ class BlockDevice: for device in output['blockdevices']: return convert_size_to_gb(device['size']) - @property + @cached_property def bus_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['tran'] - @property + @cached_property def spinning(self) -> bool: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['rota'] is True - @property - def free_space(self) -> Tuple[str, str, str]: + @cached_property + def _safe_free_space(self) -> Tuple[str, ...]: + try: + return '+'.join(part[2] for part in self.free_space) + except SysCallError: + return '?' + + @cached_property + def free_space(self) -> Tuple[str, ...]: # NOTE: parted -s will default to `cancel` on prompt, skipping any partition # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, # so the free will ignore the ESP partition and just give the "free" space. @@ -204,7 +246,7 @@ class BlockDevice: except SysCallError as error: log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG) - @property + @cached_property def largest_free_space(self) -> List[str]: info = [] for space_info in self.free_space: @@ -216,7 +258,7 @@ class BlockDevice: info = space_info return info - @property + @cached_property def first_free_sector(self) -> str: if info := self.largest_free_space: start = info[0] @@ -224,7 +266,7 @@ class BlockDevice: start = '512MB' return start - @property + @cached_property def first_end_sector(self) -> str: if info := self.largest_free_space: end = info[1] @@ -247,19 +289,27 @@ class BlockDevice: def flush_cache(self) -> None: self.part_cache = {} - def get_partition(self, uuid :str) -> Partition: - count = 0 - while count < 5: - for partition_uuid, partition in self.partitions.items(): - if partition.uuid.lower() == uuid.lower(): - return partition - else: - log(f"uuid {uuid} not found. Waiting for {count +1} time",level=logging.DEBUG) - time.sleep(float(storage['arguments'].get('disk-sleep', 0.2))) - count += 1 - else: - log(f"Could not find {uuid} in disk after 5 retries",level=logging.INFO) - print(f"Cache: {self.part_cache}") - print(f"Partitions: {self.partitions.items()}") - print(f"UUID: {[uuid]}") - raise DiskError(f"New partition {uuid} never showed up after adding new partition on {self}") + def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition: + if not uuid and not partuuid: + raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.") + + for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)): + for partition_index, partition in self.partitions.items(): + try: + if uuid and partition.uuid.lower() == uuid.lower(): + return partition + elif partuuid and partition.part_uuid.lower() == partuuid.lower(): + return partition + except DiskError as error: + # Most likely a blockdevice that doesn't support or use UUID's + # (like Microsoft recovery partition) + log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray") + pass + + log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) + time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) + + log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) + log(f"Cache: {self.part_cache}") + log(f"Partitions: {self.partitions.items()}") + raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.") diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs/__init__.py index 33f59721..84b9c0f6 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs/__init__.py @@ -4,44 +4,25 @@ import glob import logging import re from typing import Union, Dict, TYPE_CHECKING, Any, Iterator -from dataclasses import dataclass # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: - from ..installer import Installer -from .helpers import get_mount_info -from ..exceptions import DiskError -from ..general import SysCommand -from ..output import log -from ..exceptions import SysCallError - -@dataclass -class BtrfsSubvolume: - target :str - source :str - fstype :str - name :str - options :str - root :bool = False - -def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]: - if '[' in struct['source']: - subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1] - struct['source'] = struct['source'].replace(f"[{subvolume}]", "") - yield BtrfsSubvolume( - target=struct['target'], - source=struct['source'], - fstype=struct['fstype'], - name=subvolume, - options=struct['options'], - root=index == 0 - ) - index += 1 - - for child in struct.get('children', []): - for item in get_subvolumes_from_findmnt(child, index=index): - yield item - index += 1 + from ...installer import Installer + +from .btrfs_helpers import ( + subvolume_info_from_path as subvolume_info_from_path, + find_parent_subvolume as find_parent_subvolume, + setup_subvolumes as setup_subvolumes, + mount_subvolume as mount_subvolume +) +from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume +from .btrfspartition import BTRFSPartition as BTRFSPartition + +from ..helpers import get_mount_info +from ...exceptions import DiskError, Deprecated +from ...general import SysCommand +from ...output import log +from ...exceptions import SysCallError def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: try: @@ -57,42 +38,6 @@ def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: return result -def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: - """ - This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name. - - @installation: archinstall.Installer instance - @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot - @force: overrides the check for weither or not the subvolume mountpoint is empty or not - - This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure. - Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name] - """ - log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING) - installation_mountpoint = installation.target - if type(installation_mountpoint) == str: - installation_mountpoint = pathlib.Path(installation_mountpoint) - # Set up the required physical structure - if type(subvolume_location) == str: - subvolume_location = pathlib.Path(subvolume_location) - - target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor) - - if not target.exists(): - target.mkdir(parents=True) - - if glob.glob(str(target / '*')) and force is False: - raise DiskError(f"Cannot mount subvolume to {target} because it contains data (non-empty folder target)") - - log(f"Mounting {target} as a subvolume", level=logging.INFO) - # Mount the logical volume to the physical structure - mount_information, mountpoint_device_real_path = get_mount_info(target, traverse=True, return_real_path=True) - if mountpoint_device_real_path == str(target): - log(f"Unmounting non-subvolume {mount_information['source']} previously mounted at {target}") - SysCommand(f"umount {mount_information['source']}") - - return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0 - def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -132,13 +77,18 @@ def _has_option(option :str,options :list) -> bool: """ if not options: return False + for item in options: if option in item: return True + return False def manage_btrfs_subvolumes(installation :Installer, partition :Dict[str, str],) -> list: + + raise Deprecated("Use setup_subvolumes() instead.") + from copy import deepcopy """ we do the magic with subvolumes in a centralized place parameters: diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py new file mode 100644 index 00000000..d577d82b --- /dev/null +++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py @@ -0,0 +1,132 @@ +import pathlib +import logging +from typing import Optional + +from ...exceptions import SysCallError, DiskError +from ...general import SysCommand +from ...output import log +from ..helpers import get_mount_info +from .btrfssubvolume import BtrfsSubvolume + + +def mount_subvolume(installation, device, name, subvolume_information): + # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = name.lstrip('/') + + # renormalize the right hand. + mountpoint = subvolume_information.get('mountpoint', None) + if not mountpoint: + return None + + if type(mountpoint) == str: + mountpoint = pathlib.Path(mountpoint) + + installation_target = installation.target + if type(installation_target) == str: + installation_target = pathlib.Path(installation_target) + + mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) + mountpoint.mkdir(parents=True, exist_ok=True) + + mount_options = subvolume_information.get('options', []) + if not any('subvol=' in x for x in mount_options): + mount_options += [f'subvol={name}'] + + log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") + SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") + + +def setup_subvolumes(installation, partition_dict): + """ + Taken from: ..user_guides.py + + partition['btrfs'] = { + "subvolumes" : { + "@": "/", + "@home": "/home", + "@log": "/var/log", + "@pkg": "/var/cache/pacman/pkg", + "@.snapshots": "/.snapshots" + } + } + """ + log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") + for name, right_hand in partition_dict['btrfs']['subvolumes'].items(): + # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = name.lstrip('/') + + # renormalize the right hand. + # mountpoint = None + subvol_options = [] + + match right_hand: + # case str(): # backwards-compatability + # mountpoint = right_hand + case dict(): + # mountpoint = right_hand.get('mountpoint', None) + subvol_options = right_hand.get('options', []) + + # We create the subvolume using the BTRFSPartition instance. + # That way we ensure not only easy access, but also accurate mount locations etc. + partition_dict['device_instance'].create_subvolume(name, installation=installation) + + # Make the nodatacow processing now + # It will be the main cause of creation of subvolumes which are not to be mounted + # it is not an options which can be established by subvolume (but for whole file systems), and can be + # set up via a simple attribute change in a directory (if empty). And here the directories are brand new + if 'nodatacow' in subvol_options: + if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so nodatacow doesn't propagate to the mount options + del subvol_options[subvol_options.index('nodatacow')] + # Make the compress processing now + # it is not an options which can be established by subvolume (but for whole file systems), and can be + # set up via a simple attribute change in a directory (if empty). And here the directories are brand new + # in this way only zstd compression is activaded + # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated + + if 'compress' in subvol_options: + if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): + if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so compress doesn't propagate to the mount options + del subvol_options[subvol_options.index('compress')] + +def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: + try: + subvolume_name = None + result = {} + for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): + if index == 0: + subvolume_name = line.strip().decode('UTF-8') + continue + + if b':' in line: + key, value = line.strip().decode('UTF-8').split(':', 1) + + # A bit of a hack, until I figure out how @dataclass + # allows for hooking in a pre-processor to do this we have to do it here: + result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() + + return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result}) + + except SysCallError as error: + log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange") + + return None + +def find_parent_subvolume(path :pathlib.Path, filters=[]): + # A root path cannot have a parent + if str(path) == '/': + return None + + if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters): + if not (subvolume := subvolume_info_from_path(found_mount['target'])): + if found_mount['target'] == '/': + return None + + return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']]) + + return subvolume
\ No newline at end of file diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py new file mode 100644 index 00000000..5020133d --- /dev/null +++ b/archinstall/lib/disk/btrfs/btrfspartition.py @@ -0,0 +1,116 @@ +import glob +import pathlib +import logging +from typing import Optional, TYPE_CHECKING + +from ...exceptions import DiskError +from ...storage import storage +from ...output import log +from ...general import SysCommand +from ..partition import Partition +from ..helpers import findmnt +from .btrfs_helpers import ( + subvolume_info_from_path +) + +if TYPE_CHECKING: + from ...installer import Installer + from .btrfssubvolume import BtrfsSubvolume + +class BTRFSPartition(Partition): + def __init__(self, *args, **kwargs): + Partition.__init__(self, *args, **kwargs) + + def __repr__(self, *args :str, **kwargs :str) -> str: + mount_repr = '' + if self.mountpoint: + mount_repr = f", mounted={self.mountpoint}" + elif self.target_mountpoint: + mount_repr = f", rel_mountpoint={self.target_mountpoint}" + + if self._encrypted: + return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' + else: + return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' + + @property + def subvolumes(self): + for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): + if '[' in filesystem.get('source', ''): + yield subvolume_info_from_path(filesystem['target']) + + def iterate_children(struct): + for child in struct.get('children', []): + if '[' in child.get('source', ''): + yield subvolume_info_from_path(child['target']) + + for sub_child in iterate_children(child): + yield sub_child + + for child in iterate_children(filesystem): + yield child + + def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume': + """ + Subvolumes have to be created within a mountpoint. + This means we need to get the current installation target. + After we get it, we need to verify it is a btrfs subvolume filesystem. + Finally, the destination must be empty. + """ + + # Allow users to override the installation session + if not installation: + installation = storage.get('installation_session') + + # Determain if the path given, is an absolute path or a releative path. + # We do this by checking if the path contains a known mountpoint. + if str(subvolume)[0] == '/': + if filesystems := findmnt(subvolume, traverse=True).get('filesystems'): + if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target): + # Path starts with a known mountpoint which isn't / + # Which means it's an absolut path to a mounted location. + pass + else: + # Since it's not an absolute position with a known start. + # We omit the anchor ('/' basically) and make sure it's appendable + # to the installation.target later + subvolume = subvolume.relative_to(subvolume.anchor) + # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is. + + # If the subvolume is not absolute, then we do two checks: + # 1. Check if the partition itself is mounted somewhere, and use that as a root + # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs + # If both above fail, we need to warn the user that such setup is not supported. + if str(subvolume)[0] != '/': + if self.mountpoint is None and installation is None: + raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.") + elif self.mountpoint: + subvolume = self.mountpoint / subvolume + elif installation: + ongoing_installation_destination = installation.target + if type(ongoing_installation_destination) == str: + ongoing_installation_destination = pathlib.Path(ongoing_installation_destination) + + subvolume = ongoing_installation_destination / subvolume + + subvolume.parent.mkdir(parents=True, exist_ok=True) + + # <!-- + # We perform one more check from the given absolute position. + # And we traverse backwards in order to locate any if possible subvolumes above + # our new btrfs subvolume. This is because it needs to be mounted under it to properly + # function. + # if btrfs_parent := find_parent_subvolume(subvolume): + # print('Found parent:', btrfs_parent) + # --> + + log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey") + + if glob.glob(str(subvolume / '*')): + raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)") + elif subvolinfo := subvolume_info_from_path(subvolume): + raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") + + SysCommand(f"btrfs subvolume create {subvolume}") + + return subvolume_info_from_path(subvolume)
\ No newline at end of file diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolume.py new file mode 100644 index 00000000..a96e2a94 --- /dev/null +++ b/archinstall/lib/disk/btrfs/btrfssubvolume.py @@ -0,0 +1,191 @@ +import pathlib +import datetime +import logging +import string +import random +import shutil +from dataclasses import dataclass +from typing import Optional, List# , TYPE_CHECKING +from functools import cached_property + +# if TYPE_CHECKING: +# from ..blockdevice import BlockDevice + +from ...exceptions import DiskError +from ...general import SysCommand +from ...output import log +from ...storage import storage + +@dataclass +class BtrfsSubvolume: + full_path :pathlib.Path + name :str + uuid :str + parent_uuid :str + creation_time :datetime.datetime + subvolume_id :int + generation :int + gen_at_creation :int + parent_id :int + top_level_id :int + send_transid :int + send_time :datetime.datetime + receive_transid :int + received_uuid :Optional[str] = None + flags :Optional[str] = None + receive_time :Optional[datetime.datetime] = None + snapshots :Optional[List] = None + + def __post_init__(self): + self.full_path = pathlib.Path(self.full_path) + + # Convert "-" entries to `None` + if self.parent_uuid == "-": + self.parent_uuid = None + if self.received_uuid == "-": + self.received_uuid = None + if self.flags == "-": + self.flags = None + if self.receive_time == "-": + self.receive_time = None + if self.snapshots == "": + self.snapshots = [] + + # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats) + self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time)) + self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time)) + if self.receive_time: + self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time)) + + @property + def parent_subvolume(self): + from .btrfs_helpers import find_parent_subvolume + + return find_parent_subvolume(self.full_path) + + @property + def root(self) -> bool: + from .btrfs_helpers import subvolume_info_from_path + + # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first + # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. + # It would also be nice if it could use findmnt(self.full_path) and traverse backwards + # finding the last occurance of a subvolume which 'self' belongs to. + if volume := subvolume_info_from_path(storage['MOUNT_POINT']): + return self.full_path == volume.full_path + + return False + + @cached_property + def partition(self): + from ..helpers import findmnt, get_parent_of_partition, all_blockdevices + from ..partition import Partition + from ..blockdevice import BlockDevice + from ..mapperdev import MapperDev + from .btrfspartition import BTRFSPartition + from .btrfs_helpers import subvolume_info_from_path + + try: + # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device. + if filesystem := findmnt(self.full_path).get('filesystems', []): + if source := filesystem[0].get('source', None): + # Strip away subvolume definitions from findmnt + if '[' in source: + source = source[:source.find('[')] + + if filesystem[0].get('fstype', '') == 'btrfs': + return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) + elif filesystem[0].get('source', '').startswith('/dev/mapper'): + return MapperDev(source) + else: + return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) + except DiskError: + # Subvolume has never been mounted, we have no reliable way of finding where it is. + # But we have the UUID of the partition, and can begin looking for it by mounting + # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices. + + log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING) + for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items(): + if type(instance) in (Partition, MapperDev): + we_mounted_it = False + detection_mountpoint = instance.mountpoint + if not detection_mountpoint: + if type(instance) == Partition and instance.encrypted: + # TODO: Perhaps support unlocking encrypted volumes? + # This will cause a lot of potential user interactions tho. + log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG) + continue + + detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}") + detection_mountpoint.mkdir(parents=True, exist_ok=True) + + instance.mount(str(detection_mountpoint)) + we_mounted_it = True + + if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])): + if subvolume := subvolume_info_from_path(filesystem[0]['target']): + if subvolume.uuid == self.uuid: + # The top level subvolume matched of ourselves, + # which means the instance we're iterating has the subvol we're looking for. + log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") + return instance + + def iterate_children(struct): + for child in struct.get('children', []): + if '[' in child.get('source', ''): + yield subvolume_info_from_path(child['target']) + + for sub_child in iterate_children(child): + yield sub_child + + for child in iterate_children(filesystem[0]): + if child.uuid == self.uuid: + # We found a child within the instance that has the subvol we're looking for. + log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") + return instance + + if we_mounted_it: + instance.unmount() + shutil.rmtree(detection_mountpoint) + + @cached_property + def mount_options(self) -> Optional[List[str]]: + from ..helpers import findmnt + + if filesystem := findmnt(self.full_path).get('filesystems', []): + return filesystem[0].get('options').split(',') + + def convert_to_ISO_format(self, time_string): + time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '') + iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}" + return iso_string + + def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True): + from ..helpers import findmnt + + try: + if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False): + log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}") + SysCommand(f"umount {mountpoint}") + except DiskError: + # No previously mounted device at the mountpoint + pass + + if not options: + options = [] + + try: + if include_previously_known_options and (cached_options := self.mount_options): + options += cached_options + except DiskError: + pass + + if not any('subvol=' in x for x in options): + options += f'subvol={self.name}' + + SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}") + log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray") + + def unmount(self, recurse :bool = True): + SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") + log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
\ No newline at end of file diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 8a531de0..f94b4b47 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -55,7 +55,7 @@ class Filesystem: output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8')) for device in output['blockdevices']: - for index, partition in enumerate(device['children']): + for index, partition in enumerate(device.get('children', [])): # But we'll use blkid to reliably grab the PARTUUID for that child device (partition) partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip() if partition_uuid.lower() == uuid.lower(): @@ -65,6 +65,7 @@ class Filesystem: def load_layout(self, layout :Dict[str, Any]) -> None: from ..luks import luks2 + from .btrfs import BTRFSPartition # If the layout tells us to wipe the drive, we do so if layout.get('wipe', False): @@ -76,27 +77,34 @@ class Filesystem: raise KeyError(f"Could not create a MSDOS label on {self}") self.blockdevice.flush_cache() + time.sleep(3) prev_partition = None # We then iterate the partitions in order for partition in layout.get('partitions', []): # We don't want to re-add an existing partition (those containing a UUID already) if partition.get('wipe', False) and not partition.get('PARTUUID', None): - print(_("Adding partition....")) start = partition.get('start') or ( prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START) partition['device_instance'] = self.add_partition(partition.get('type', 'primary'), start=start, end=partition.get('size', '100%'), - partition_format=partition.get('filesystem', {}).get('format', 'btrfs')) - # TODO: device_instance some times become None - # print('Device instance:', partition['device_instance']) - - elif (partition_uuid := partition.get('PARTUUID')) and (partition_instance := self.blockdevice.get_partition(uuid=partition_uuid)): - print(_("Re-using partition instance: {}").format(partition_instance)) - partition['device_instance'] = partition_instance + partition_format=partition.get('filesystem', {}).get('format', 'btrfs'), + skip_mklabel=layout.get('wipe', False) is not False) + + elif (partition_uuid := partition.get('PARTUUID')): + # We try to deal with both UUID and PARTUUID of a partition when it's being re-used. + # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID') + # but for now, lets just attempt to deal with both. + try: + partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid) + except DiskError: + partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid) + + log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray") else: - raise ValueError(f"{self}.load_layout() doesn't know how to continue without a new partition definition or a UUID ({partition.get('PARTUUID')}) on the device ({self.blockdevice.get_partition(uuid=partition.get('PARTUUID'))}).") + log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING) + continue if partition.get('filesystem', {}).get('format', False): @@ -137,20 +145,32 @@ class Filesystem: while True: partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip() if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False: - print(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")) + log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's.")) continue break unlocked_device.format(partition['filesystem']['format'], options=format_options) + elif partition.get('wipe', False): if not partition['device_instance']: raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!") partition['device_instance'].format(partition['filesystem']['format'], options=format_options) + if partition['filesystem']['format'] == 'btrfs': + # We upgrade the device instance to a BTRFSPartition if we format it as such. + # This is so that we can gain access to more features than otherwise available in Partition() + partition['device_instance'] = BTRFSPartition( + partition['device_instance'].path, + block_device=partition['device_instance'].block_device, + encrypted=False, + filesystem='btrfs', + autodetect_filesystem=False + ) + if partition.get('boot', False): log(f"Marking partition {partition['device_instance']} as bootable.") - self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on') + self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on') prev_partition = partition @@ -190,10 +210,27 @@ class Filesystem: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> Partition: + def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) - previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()} + if len(self.blockdevice.partitions) == 0 and skip_mklabel is False: + # If it's a completely empty drive, and we're about to add partitions to it + # we need to make sure there's a filesystem label. + if self.mode == GPT: + if not self.parted_mklabel(self.blockdevice.device, "gpt"): + raise KeyError(f"Could not create a GPT label on {self}") + elif self.mode == MBR: + if not self.parted_mklabel(self.blockdevice.device, "msdos"): + raise KeyError(f"Could not create a MSDOS label on {self}") + + self.blockdevice.flush_cache() + + previous_partuuids = [] + for partition in self.blockdevice.partitions.values(): + try: + previous_partuuids.append(partition.part_uuid) + except DiskError: + pass if self.mode == MBR: if len(self.blockdevice.partitions) > 3: @@ -207,36 +244,36 @@ class Filesystem: log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG) if self.parted(parted_string): - count = 0 - while count < 10: - new_uuid = None - new_uuid_set = (previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()}) + for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)): + self.partprobe() - if len(new_uuid_set) > 0: - new_uuid = new_uuid_set.pop() + new_partition_uuids = [] + for partition in self.blockdevice.partitions.values(): + try: + new_partition_uuids.append(partition.part_uuid) + except DiskError: + pass + + new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids)) - if new_uuid: + if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()): try: - return self.blockdevice.get_partition(new_uuid) + return self.blockdevice.get_partition(partuuid=new_partuuid) except Exception as err: log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red") log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red") - log(f'Partition set: {new_uuid_set}', level=logging.ERROR, fg="red") - log(f'New UUID: {[new_uuid]}', level=logging.ERROR, fg="red") + log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red") + log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red") log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red") raise err else: - count += 1 - log(f"Could not get UUID for partition. Waiting before retry attempt {count} of 10 ...",level=logging.DEBUG) - time.sleep(float(storage['arguments'].get('disk-sleep', 0.2))) - else: - log("Add partition is exiting due to excessive wait time", level=logging.ERROR, fg="red") - raise DiskError(f"New partition never showed up after adding new partition on {self}.") + log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG) + time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) # TODO: This should never be able to happen log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red") - log(f"Previous partitions: {previous_partition_uuids}", level=logging.ERROR, fg="red") - log(f"New partitions: {(previous_partition_uuids ^ {partition.uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red") + log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red") + log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red") raise DiskError(f"Could not add partition using: {parted_string}") def set_name(self, partition: int, name: str) -> bool: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index 0799cd49..99856aad 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -291,11 +291,37 @@ def find_mountpoint(device_path :str) -> Dict[str, Any]: except SysCallError: return {} -def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: +def findmnt(path :pathlib.Path, traverse :bool = False, ignore :List = [], recurse :bool = True) -> Dict[str, Any]: + for traversal in list(map(str, [str(path)] + list(path.parents))): + if traversal in ignore: + continue + + try: + log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) + if (output := SysCommand(f"/usr/bin/findmnt --json {'--submounts' if recurse else ''} {traversal}").decode('UTF-8')): + return json.loads(output) + + except SysCallError as error: + log(f"Could not get mount information on {path} but continuing and ignoring: {error}", level=logging.INFO, fg="gray") + pass + + if not traverse: + break + + raise DiskError(f"Could not get mount information for path {path}") + + +def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False, ignore :List = []) -> Dict[str, Any]: + import traceback + + log(f"Deprecated: archinstall.get_mount_info(). Use archinstall.findmnt() instead, which does not do any automatic parsing. Please change at:\n{''.join(traceback.format_stack())}") device_path, bind_path = split_bind_name(path) output = {} for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): + if traversal in ignore: + continue + try: log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')): @@ -385,9 +411,8 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]: def get_filesystem_type(path :str) -> Optional[str]: - device_name, bind_name = split_bind_name(path) try: - return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip() + return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip() except SysCallError: return None diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py index 32e3ac9b..913dbc13 100644 --- a/archinstall/lib/disk/mapperdev.py +++ b/archinstall/lib/disk/mapperdev.py @@ -51,11 +51,11 @@ class MapperDev: raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device") @property - def mountpoint(self) -> Optional[str]: + def mountpoint(self) -> Optional[pathlib.Path]: try: data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) for filesystem in data['filesystems']: - return filesystem.get('target') + return pathlib.Path(filesystem.get('target')) except SysCallError as error: # Not mounted anywhere most likely @@ -76,8 +76,9 @@ class MapperDev: @property def subvolumes(self) -> Iterator['BtrfsSubvolume']: - from .btrfs import get_subvolumes_from_findmnt - + from .btrfs import subvolume_info_from_path + for mountpoint in self.mount_information: - for result in get_subvolumes_from_findmnt(mountpoint): - yield result
\ No newline at end of file + if target := mountpoint.get('target'): + if subvolume := subvolume_info_from_path(pathlib.Path(target)): + yield subvolume
\ No newline at end of file diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index e7568258..73c88597 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -13,7 +13,8 @@ from ..storage import storage from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat from ..output import log from ..general import SysCommand -from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume +from .btrfs.btrfs_helpers import subvolume_info_from_path +from .btrfs.btrfssubvolume import BtrfsSubvolume class Partition: def __init__(self, @@ -96,11 +97,11 @@ class Partition: try: data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) for filesystem in data['filesystems']: - return filesystem.get('target') + return pathlib.Path(filesystem.get('target')) except SysCallError as error: # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG) + log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey") pass return None @@ -184,7 +185,7 @@ class Partition: return device['pttype'] @property - def uuid(self) -> Optional[str]: + def part_uuid(self) -> Optional[str]: """ Returns the PARTUUID as returned by lsblk. This is more reliable than relying on /dev/disk/by-partuuid as @@ -197,6 +198,26 @@ class Partition: time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) + partuuid = self._safe_part_uuid + if partuuid: + return partuuid + + raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") + + @property + def uuid(self) -> Optional[str]: + """ + Returns the UUID as returned by lsblk for the **partition**. + This is more reliable than relying on /dev/disk/by-uuid as + it doesn't seam to be able to detect md raid partitions. + For bind mounts all the subvolumes share the same uuid + """ + for i in range(storage['DISK_RETRY_ATTEMPTS']): + if not self.partprobe(): + raise DiskError(f"Could not perform partprobe on {self.device_path}") + + time.sleep(storage.get('DISK_TIMEOUTS', 1) * i) + partuuid = self._safe_uuid if partuuid: return partuuid @@ -217,6 +238,28 @@ class Partition: log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) try: + return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip() + except SysCallError as error: + if self.block_device.info.get('TYPE') == 'iso9660': + # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) + return None + + log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}") + + @property + def _safe_part_uuid(self) -> Optional[str]: + """ + A near copy of self.uuid but without any delays. + This function should only be used where uuid is not crucial. + For instance when you want to get a __repr__ of the class. + """ + if not self.partprobe(): + if self.block_device.info.get('TYPE') == 'iso9660': + return None + + log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) + + try: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() except SysCallError as error: if self.block_device.info.get('TYPE') == 'iso9660': @@ -262,9 +305,26 @@ class Partition: @property def subvolumes(self) -> Iterator[BtrfsSubvolume]: + from .helpers import findmnt + + def iterate_children_recursively(information): + for child in information.get('children', []): + if target := child.get('target'): + if subvolume := subvolume_info_from_path(pathlib.Path(target)): + yield subvolume + + if child.get('children'): + for subchild in iterate_children_recursively(child): + yield subchild + for mountpoint in self.mount_information: - for result in get_subvolumes_from_findmnt(mountpoint): - yield result + if result := findmnt(pathlib.Path(mountpoint['target'])): + for filesystem in result.get('filesystems', []): + if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])): + yield subvolume + + for child in iterate_children_recursively(filesystem): + yield child def partprobe(self) -> bool: try: @@ -315,7 +375,7 @@ class Partition: handle = luks2(self, None, None) return handle.encrypt(self, *args, **kwargs) - def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool: + def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool: """ Format can be given an overriding path, for instance /dev/null to test the formatting functionality and in essence the support for the given filesystem. @@ -337,63 +397,71 @@ class Partition: if log_formatting: log(f'Formatting {path} -> {filesystem}', level=logging.INFO) - if filesystem == 'btrfs': - options = ['-f'] + options + try: + if filesystem == 'btrfs': + options = ['-f'] + options - if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')): - raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') - self.filesystem = filesystem + if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')): + raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') + self.filesystem = filesystem - elif filesystem == 'vfat': - options = ['-F32'] + options + elif filesystem == 'vfat': + options = ['-F32'] + options - if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem - elif filesystem == 'ext4': - options = ['-F'] + options + elif filesystem == 'ext4': + options = ['-F'] + options - if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem - elif filesystem == 'ext2': - options = ['-F'] + options + elif filesystem == 'ext2': + options = ['-F'] + options - if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') - self.filesystem = 'ext2' + if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') + self.filesystem = 'ext2' - elif filesystem == 'xfs': - options = ['-f'] + options + elif filesystem == 'xfs': + options = ['-f'] + options - if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem - elif filesystem == 'f2fs': - options = ['-f'] + options + elif filesystem == 'f2fs': + options = ['-f'] + options - if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem - elif filesystem == 'ntfs3': - options = ['-f'] + options + elif filesystem == 'ntfs3': + options = ['-f'] + options - if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self.filesystem = filesystem - elif filesystem == 'crypto_LUKS': - # from ..luks import luks2 - # encrypted_partition = luks2(self, None, None) - # encrypted_partition.format(path) - self.filesystem = filesystem + elif filesystem == 'crypto_LUKS': + # from ..luks import luks2 + # encrypted_partition = luks2(self, None, None) + # encrypted_partition.format(path) + self.filesystem = filesystem - else: - raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") + else: + raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") + except SysCallError as error: + log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange") + if retry is True: + log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange") + time.sleep(storage.get('DISK_TIMEOUTS', 1)) + + return self.format(filesystem, path, log_formatting, options, retry=False) if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': self.encrypted = True @@ -413,6 +481,7 @@ class Partition: def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) + if not fs: if not self.filesystem: raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 63ec1d9b..5fa6bfdc 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -5,6 +5,7 @@ from typing import Optional, Dict, Any, List, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: from .blockdevice import BlockDevice + _: Any from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to from ..hardware import has_uefi @@ -26,13 +27,13 @@ def suggest_single_disk_layout(block_device :BlockDevice, compression = False if default_filesystem == 'btrfs': - prompt = 'Would you like to use BTRFS subvolumes with a default structure?' - choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() - using_subvolumes = choice == 'yes' + prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?')) + choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() + using_subvolumes = choice.value == Menu.yes() - prompt = 'Would you like to use BTRFS compression?' - choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() - compression = choice == 'yes' + prompt = str(_('Would you like to use BTRFS compression?')) + choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() + compression = choice.value == Menu.yes() layout = { block_device.path : { @@ -87,9 +88,9 @@ def suggest_single_disk_layout(block_device :BlockDevice, layout[block_device.path]['partitions'][-1]['start'] = '513MiB' if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART: - prompt = 'Would you like to create a separate partition for /home?' - choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() - using_home_partition = choice == 'yes' + prompt = str(_('Would you like to create a separate partition for /home?')) + choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() + using_home_partition = choice.value == Menu.yes() # Set a size for / (/root) if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition: @@ -138,9 +139,7 @@ def suggest_single_disk_layout(block_device :BlockDevice, return layout -def suggest_multi_disk_layout(block_devices :List[BlockDevice], - default_filesystem :Optional[str] = None, - advanced_options :bool = False) -> Dict[str, Any]: +def suggest_multi_disk_layout(block_devices :List[BlockDevice], default_filesystem :Optional[str] = None, advanced_options :bool = False): if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format @@ -158,6 +157,13 @@ def suggest_multi_disk_layout(block_devices :List[BlockDevice], home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART) root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device]) + if home_device is None or root_device is None: + text = _('The selected drives do not have the minimum capacity required for an automatic suggestion\n') + text += _('Minimum capacity for /home partition: {}GB\n').format(MIN_SIZE_TO_ALLOW_HOME_PART) + text += _('Minimum capacity for Arch Linux partition: {}GB').format(ARCH_LINUX_INSTALLED_SIZE) + Menu(str(text), [str(_('Continue'))], skip=False).run() + return None + compression = False if default_filesystem == 'btrfs': @@ -165,9 +171,9 @@ def suggest_multi_disk_layout(block_devices :List[BlockDevice], # choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() # using_subvolumes = choice == 'yes' - prompt = 'Would you like to use BTRFS compression?' - choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run() - compression = choice == 'yes' + prompt = str(_('Would you like to use BTRFS compression?')) + choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run() + compression = choice.value == Menu.yes() log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG) diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py index f6f58151..a16faa3f 100644 --- a/archinstall/lib/exceptions.py +++ b/archinstall/lib/exceptions.py @@ -48,4 +48,8 @@ class PackageError(BaseException): class TranslationError(BaseException): + pass + + +class Deprecated(BaseException): pass
\ No newline at end of file diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index 174acb8a..b99e4a45 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -10,6 +10,9 @@ import string import sys import time import re +import urllib.parse +import urllib.request +import pathlib from datetime import datetime, date from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 @@ -132,6 +135,8 @@ class JsonEncoder: return obj.isoformat() elif isinstance(obj, (list, set, tuple)): return [json.loads(json.dumps(item, cls=JSON)) for item in obj] + elif isinstance(obj, (pathlib.Path)): + return str(obj) else: return obj @@ -352,14 +357,13 @@ class SysCommandWorker: # only way to get the traceback without loosing it. self.pid, self.child_fd = pty.fork() - os.chdir(old_dir) # https://stackoverflow.com/questions/4022600/python-pty-fork-how-does-it-work if not self.pid: try: try: with open(f"{storage['LOG_PATH']}/cmd_history.txt", "a") as cmd_log: - cmd_log.write(f"{' '.join(self.cmd)}\n") + cmd_log.write(f"{self.cmd}\n") except PermissionError: pass @@ -371,6 +375,9 @@ class SysCommandWorker: log(f"{self.cmd[0]} does not exist.", level=logging.ERROR, fg="red") self.exit_code = 1 return False + else: + # Only parent process moves back to the original working directory + os.chdir(old_dir) self.started = time.time() self.poll_object.register(self.child_fd, EPOLLIN | EPOLLHUP) @@ -457,7 +464,14 @@ class SysCommand: if self.session: return self.session - with SysCommandWorker(self.cmd, callbacks=self._callbacks, peak_output=self.peak_output, environment_vars=self.environment_vars, remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines) as session: + with SysCommandWorker( + self.cmd, + callbacks=self._callbacks, + peak_output=self.peak_output, + environment_vars=self.environment_vars, + remove_vt100_escape_codes_from_lines=self.remove_vt100_escape_codes_from_lines, + working_directory=self.working_directory) as session: + if not self.session: self.session = session @@ -523,32 +537,40 @@ def run_custom_user_commands(commands :List[str], installation :Installer) -> No log(execution_output) os.unlink(f"{installation.target}/var/tmp/user-command.{index}.sh") -def json_stream_to_structure(id : str, stream :str, target :dict) -> bool : - """ Function to load a stream (file (as name) or valid JSON string into an existing dictionary +def json_stream_to_structure(configuration_identifier : str, stream :str, target :dict) -> bool : + """ + Function to load a stream (file (as name) or valid JSON string into an existing dictionary Returns true if it could be done Return false if operation could not be executed - +id is just a parameter to get meaningful, but not so long messages + +configuration_identifier is just a parameter to get meaningful, but not so long messages """ - from pathlib import Path - if Path(stream).exists(): - try: - with open(Path(stream)) as fh: - target.update(json.load(fh)) - except Exception as e: - log(f"{id} = {stream} does not contain a valid JSON format: {e}",level=logging.ERROR) - return False + + parsed_url = urllib.parse.urlparse(stream) + + if parsed_url.scheme: # The stream is in fact a URL that should be grabed + with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: + target.update(json.loads(response.read())) else: - log(f"{id} = {stream} does not exists in the filesystem. Trying as JSON stream",level=logging.DEBUG) - # NOTE: failure of this check doesn't make stream 'real' invalid JSON, just it first level entry is not an object (i.e. dict), so it is not a format we handle. - if stream.strip().startswith('{') and stream.strip().endswith('}'): + if pathlib.Path(stream).exists(): try: - target.update(json.loads(stream)) - except Exception as e: - log(f" {id} Contains an invalid JSON format : {e}",level=logging.ERROR) + with pathlib.Path(stream).open() as fh: + target.update(json.load(fh)) + except Exception as error: + log(f"{configuration_identifier} = {stream} does not contain a valid JSON format: {error}", level=logging.ERROR, fg="red") return False else: - log(f" {id} is neither a file nor is a JSON string:",level=logging.ERROR) - return False + # NOTE: This is a rudimentary check if what we're trying parse is a dict structure. + # Which is the only structure we tolerate anyway. + if stream.strip().startswith('{') and stream.strip().endswith('}'): + try: + target.update(json.loads(stream)) + except Exception as e: + log(f" {configuration_identifier} Contains an invalid JSON format : {e}",level=logging.ERROR, fg="red") + return False + else: + log(f" {configuration_identifier} is neither a file nor is a JSON string:",level=logging.ERROR, fg="red") + return False + return True def secret(x :str): diff --git a/archinstall/lib/hardware.py b/archinstall/lib/hardware.py index f183b0d3..8400d338 100644 --- a/archinstall/lib/hardware.py +++ b/archinstall/lib/hardware.py @@ -55,7 +55,7 @@ AVAILABLE_GFX_DRIVERS = { "mesa", "xf86-video-intel" ], - "Nvidia (open-source)": [ + "Nvidia (open-source nouveau driver)": [ "mesa", "xf86-video-nouveau", "libva-mesa-driver" diff --git a/archinstall/lib/hsm/__init__.py b/archinstall/lib/hsm/__init__.py new file mode 100644 index 00000000..c0888b04 --- /dev/null +++ b/archinstall/lib/hsm/__init__.py @@ -0,0 +1,4 @@ +from .fido import ( + get_fido2_devices, + fido2_enroll +)
\ No newline at end of file diff --git a/archinstall/lib/hsm/fido.py b/archinstall/lib/hsm/fido.py new file mode 100644 index 00000000..49f36957 --- /dev/null +++ b/archinstall/lib/hsm/fido.py @@ -0,0 +1,57 @@ +import typing +import pathlib +import getpass +import logging +from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes +from ..disk.partition import Partition +from ..general import log + +def get_fido2_devices() -> typing.Dict[str, typing.Dict[str, str]]: + """ + Uses systemd-cryptenroll to list the FIDO2 devices + connected that supports FIDO2. + Some devices might show up in udevadm as FIDO2 compliant + when they are in fact not. + + The drawback of systemd-cryptenroll is that it uses human readable format. + That means we get this weird table like structure that is of no use. + + So we'll look for `MANUFACTURER` and `PRODUCT`, we take their index + and we split each line based on those positions. + """ + worker = clear_vt100_escape_codes(SysCommand(f"systemd-cryptenroll --fido2-device=list").decode('UTF-8')) + + MANUFACTURER_POS = 0 + PRODUCT_POS = 0 + devices = {} + for line in worker.split('\r\n'): + if '/dev' not in line: + MANUFACTURER_POS = line.find('MANUFACTURER') + PRODUCT_POS = line.find('PRODUCT') + continue + + path = line[:MANUFACTURER_POS].rstrip() + manufacturer = line[MANUFACTURER_POS:PRODUCT_POS].rstrip() + product = line[PRODUCT_POS:] + + devices[path] = { + 'manufacturer' : manufacturer, + 'product' : product + } + + return devices + +def fido2_enroll(hsm_device_path :pathlib.Path, partition :Partition, password :str) -> bool: + worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device_path} {partition.real_device}", peak_output=True) + pw_inputted = False + pin_inputted = False + while worker.is_alive(): + if pw_inputted is False and bytes(f"please enter current passphrase for disk {partition.real_device}", 'UTF-8') in worker._trace_log.lower(): + worker.write(bytes(password, 'UTF-8')) + pw_inputted = True + + elif pin_inputted is False and bytes(f"please enter security token pin", 'UTF-8') in worker._trace_log.lower(): + worker.write(bytes(getpass.getpass(" "), 'UTF-8')) + pin_inputted = True + + log(f"You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds.", level=logging.INFO, fg="yellow")
\ No newline at end of file diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index 8b77317a..b2cd6306 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -13,16 +13,17 @@ from .disk import get_partitions_in_use, Partition from .general import SysCommand, generate_password from .hardware import has_uefi, is_vm, cpu_vendor from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout -from .disk.helpers import get_mount_info +from .disk.helpers import findmnt from .mirrors import use_mirrors from .plugins import plugins from .storage import storage # from .user_interaction import * from .output import log from .profiles import Profile -from .disk.btrfs import manage_btrfs_subvolumes from .disk.partition import get_mount_fs_type from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError +from .hsm import fido2_enroll +from .models.users import User if TYPE_CHECKING: _: Any @@ -126,7 +127,9 @@ class Installer: self.MODULES = [] self.BINARIES = [] self.FILES = [] - self.HOOKS = ["base", "udev", "autodetect", "keyboard", "keymap", "modconf", "block", "filesystems", "fsck"] + # systemd, sd-vconsole and sd-encrypt will be replaced by udev, keymap and encrypt + # if HSM is not used to encrypt the root volume. Check mkinitcpio() function for that override. + self.HOOKS = ["base", "systemd", "autodetect", "keyboard", "sd-vconsole", "modconf", "block", "filesystems", "fsck"] self.KERNEL_PARAMS = [] self._zram_enabled = False @@ -230,21 +233,26 @@ class Installer: def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None: from .luks import luks2 + from .disk.btrfs import setup_subvolumes, mount_subvolume + # set the partitions as a list not part of a tree (which we don't need anymore (i think) list_part = [] list_luks_handles = [] for blockdevice in layouts: list_part.extend(layouts[blockdevice]['partitions']) + # TODO: Implement a proper mount-queue system that does not depend on return values. + mount_queue = {} + # we manage the encrypted partititons for partition in [entry for entry in list_part if entry.get('encrypted', False)]: # open the luks device and all associate stuff if not (password := partition.get('!password', None)): raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}") - # i change a bit the naming conventions for the loop device loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" else: loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" + # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: if partition.get('generate-encryption-key-file',False) and not self._has_root(partition): @@ -252,38 +260,74 @@ class Installer: # this way all the requesrs will be to the dm_crypt device and not to the physical partition partition['device_instance'] = unlocked_device + if self._has_root(partition) and partition.get('generate-encryption-key-file', False) is False: + if storage['arguments'].get('HSM'): + hsm_device_path = storage['arguments']['HSM'] + fido2_enroll(hsm_device_path, partition['device_instance'], password) + # we manage the btrfs partitions - for partition in [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]: - if partition.get('filesystem',{}).get('mount_options',[]): - mount_options = ','.join(partition['filesystem']['mount_options']) - self.mount(partition['device_instance'], "/", options=mount_options) - else: - self.mount(partition['device_instance'], "/") - try: - new_mountpoints = manage_btrfs_subvolumes(self,partition) - except Exception as e: - # every exception unmounts the physical volume. Otherwise we let the system in an unstable state + if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]): + for partition in btrfs_subvolumes: + if mount_options := ','.join(partition.get('filesystem',{}).get('mount_options',[])): + self.mount(partition['device_instance'], "/", options=mount_options) + else: + self.mount(partition['device_instance'], "/") + + setup_subvolumes( + installation=self, + partition_dict=partition + ) + partition['device_instance'].unmount() - raise e - partition['device_instance'].unmount() - if new_mountpoints: - list_part.extend(new_mountpoints) - # we mount. We need to sort by mountpoint to get a good working order - for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']): + # We then handle any special cases, such as btrfs + if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]): + for partition_information in btrfs_subvolumes: + for name, mountpoint in sorted(partition_information['btrfs']['subvolumes'].items(), key=lambda item: item[1]): + btrfs_subvolume_information = {} + + match mountpoint: + case str(): # backwards-compatability + btrfs_subvolume_information['mountpoint'] = mountpoint + btrfs_subvolume_information['options'] = [] + case dict(): + btrfs_subvolume_information['mountpoint'] = mountpoint.get('mountpoint', None) + btrfs_subvolume_information['options'] = mountpoint.get('options', []) + case _: + continue + + if mountpoint_parsed := btrfs_subvolume_information.get('mountpoint'): + # We cache the mount call for later + mount_queue[mountpoint_parsed] = lambda device=partition_information['device_instance'], \ + name=name, \ + subvolume_information=btrfs_subvolume_information: mount_subvolume( + installation=self, + device=device, + name=name, + subvolume_information=subvolume_information + ) + + # We mount ordinary partitions, and we sort them by the mountpoint + for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']): mountpoint = partition['mountpoint'] log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) if partition.get('filesystem',{}).get('mount_options',[]): mount_options = ','.join(partition['filesystem']['mount_options']) - partition['device_instance'].mount(f"{self.target}{mountpoint}", options=mount_options) + mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}", options=mount_options: instance.mount(target, options=options) else: - partition['device_instance'].mount(f"{self.target}{mountpoint}") + mount_queue[mountpoint] = lambda instance=partition['device_instance'], target=f"{self.target}{mountpoint}": instance.mount(target) + + log(f"Using mount order: {list(sorted(mount_queue.items(), key=lambda item: item[0]))}", level=logging.INFO, fg="white") + + # We mount everything by sorting on the mountpoint itself. + for mountpoint, frozen_func in sorted(mount_queue.items(), key=lambda item: item[0]): + frozen_func() time.sleep(1) try: - get_mount_info(f"{self.target}{mountpoint}", traverse=False) + findmnt(pathlib.Path(f"{self.target}{mountpoint}"), traverse=False) except DiskError: raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).") @@ -365,15 +409,28 @@ class Installer: self.log(f'Installing packages: {packages}', level=logging.INFO) - if (sync_mirrors := run_pacman('-Syy', default_cmd='/usr/bin/pacman')).exit_code == 0: - if (pacstrap := SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf {self.target} {" ".join(packages)} --noconfirm', peak_output=True)).exit_code == 0: - return True - else: - self.log(f'Could not strap in packages: {pacstrap}', level=logging.ERROR, fg="red") - self.log(f'Could not strap in packages: {pacstrap.exit_code}', level=logging.ERROR, fg="red") - raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") - else: - self.log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=logging.INFO) + # TODO: We technically only need to run the -Syy once. + try: + run_pacman('-Syy', default_cmd='/usr/bin/pacman') + except SysCallError as error: + self.log(f'Could not sync a new package databse: {error}', level=logging.ERROR, fg="red") + + if storage['arguments'].get('silent', False) is False: + if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): + return self.pacstrap(*packages, **kwargs) + + raise RequirementError(f'Could not sync mirrors: {error}', level=logging.ERROR, fg="red") + + try: + return SysCommand(f'/usr/bin/pacstrap -C /etc/pacman.conf {self.target} {" ".join(packages)} --noconfirm', peak_output=True).exit_code == 0 + except SysCallError as error: + self.log(f'Could not strap in packages: {error}', level=logging.ERROR, fg="red") + + if storage['arguments'].get('silent', False) is False: + if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): + return self.pacstrap(*packages, **kwargs) + + raise RequirementError("Pacstrap failed. See /var/log/archinstall/install.log or above message for error details.") def set_mirrors(self, mirrors :Mapping[str, Iterator[str]]) -> None: for plugin in plugins.values(): @@ -596,6 +653,15 @@ class Installer: mkinit.write(f"MODULES=({' '.join(self.MODULES)})\n") mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") + + if not storage['arguments']['HSM']: + # For now, if we don't use HSM we revert to the old + # way of setting up encryption hooks for mkinitcpio. + # This is purely for stability reasons, we're going away from this. + # * systemd -> udev + # * sd-vconsole -> keymap + self.HOOKS = [hook.replace('systemd', 'udev').replace('sd-vconsole', 'keymap') for hook in self.HOOKS] + mkinit.write(f"HOOKS=({' '.join(self.HOOKS)})\n") return SysCommand(f'/usr/bin/arch-chroot {self.target} mkinitcpio {" ".join(flags)}').exit_code == 0 @@ -630,8 +696,15 @@ class Installer: self.HOOKS.remove('fsck') if self.detect_encryption(partition): - if 'encrypt' not in self.HOOKS: - self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') + if storage['arguments']['HSM']: + # Required bby mkinitcpio to add support for fido2-device options + self.pacstrap('libfido2') + + if 'sd-encrypt' not in self.HOOKS: + self.HOOKS.insert(self.HOOKS.index('filesystems'), 'sd-encrypt') + else: + if 'encrypt' not in self.HOOKS: + self.HOOKS.insert(self.HOOKS.index('filesystems'), 'encrypt') if not has_uefi(): self.base_packages.append('grub') @@ -687,6 +760,14 @@ class Installer: # TODO: Use python functions for this SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') + if storage['arguments']['HSM']: + # TODO: + # A bit of a hack, but we need to get vconsole.conf in there + # before running `mkinitcpio` because it expects it in HSM mode. + if (vconsole := pathlib.Path(f"{self.target}/etc/vconsole.conf")).exists() is False: + with vconsole.open('w') as fh: + fh.write(f"KEYMAP={storage['arguments']['keyboard-layout']}\n") + self.mkinitcpio('-P') self.helper_flags['base'] = True @@ -731,7 +812,9 @@ class Installer: # And in which case we should do some clean up. # Install the boot loader - if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0: + try: + SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install') + except SysCallError: # Fallback, try creating the boot loader without touching the EFI variables SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install') @@ -777,7 +860,7 @@ class Installer: elif vendor == "GenuineIntel": entry.write("initrd /intel-ucode.img\n") else: - self.log("unknow cpu vendor, not adding ucode to systemd-boot config") + self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to systemd-boot config.", level=logging.DEBUG) entry.write(f"initrd /initramfs-{kernel}.img\n") # blkid doesn't trigger on loopback devices really well, # so we'll use the old manual method until we get that sorted out. @@ -801,11 +884,23 @@ class Installer: if real_device := self.detect_encryption(root_partition): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) - entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev {options_entry}') + log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}/{real_device.part_uuid}'.", level=logging.DEBUG) + + kernel_options = f"options" + + if storage['arguments']['HSM']: + # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work + kernel_options += f" rd.luks.name={real_device.uuid}=luksdev" + # Note: tpm2-device and fido2-device don't play along very well: + # https://github.com/archlinux/archinstall/pull/1196#issuecomment-1129715645 + kernel_options += f" rd.luks.options=fido2-device=auto,password-echo=no" + else: + kernel_options += f" cryptdevice=PARTUUID={real_device.part_uuid}:luksdev" + + entry.write(f'{kernel_options} root=/dev/mapper/luksdev {options_entry}') else: - log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG) - entry.write(f'options root=PARTUUID={root_partition.uuid} {options_entry}') + log(f"Identifying root partition by PARTUUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) + entry.write(f'options root=PARTUUID={root_partition.part_uuid} {options_entry}') self.helper_flags['bootloader'] = "systemd" @@ -881,7 +976,7 @@ class Installer: elif vendor == "GenuineIntel": kernel_parameters.append("initrd=\\intel-ucode.img") else: - self.log("unknow cpu vendor, not adding ucode to firmware boot entry") + self.log(f"Unknown CPU vendor '{vendor}' detected. Archinstall won't add any ucode to firmware boot entry.", level=logging.DEBUG) kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img") @@ -890,11 +985,11 @@ class Installer: if real_device := self.detect_encryption(root_partition): # TODO: We need to detect if the encrypted device is a whole disk encryption, # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') + log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.part_uuid}'.", level=logging.DEBUG) + kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.part_uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') else: - log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') + log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.part_uuid}'.", level=logging.DEBUG) + kernel_parameters.append(f'root=PARTUUID={root_partition.part_uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') @@ -921,10 +1016,14 @@ class Installer: if plugin.on_add_bootloader(self): return True + if type(self.target) == str: + self.target = pathlib.Path(self.target) + boot_partition = None root_partition = None for partition in self.partitions: - if partition.mountpoint == os.path.join(self.target, 'boot'): + print(partition, [partition.mountpoint], [self.target]) + if partition.mountpoint == self.target / 'boot': boot_partition = partition elif partition.mountpoint == self.target: root_partition = partition @@ -963,10 +1062,10 @@ class Installer: if type(profile) == str: profile = Profile(self, profile) - self.log(f'Installing network profile {profile}', level=logging.INFO) + self.log(f'Installing archinstall profile {profile}', level=logging.INFO) return profile.install() - def enable_sudo(self, entity: str, group :bool = False) -> bool: + def enable_sudo(self, entity: str, group :bool = False): self.log(f'Enabling sudo permissions for {entity}.', level=logging.INFO) sudoers_dir = f"{self.target}/etc/sudoers.d" @@ -996,9 +1095,14 @@ class Installer: # Guarantees sudoer conf file recommended perms os.chmod(pathlib.Path(rule_file_name), 0o440) - return True + def create_users(self, users: Union[User, List[User]]): + if not isinstance(users, list): + users = [users] + + for user in users: + self.user_create(user.username, user.password, user.groups, user.sudo) - def user_create(self, user :str, password :Optional[str] = None, groups :Optional[str] = None, sudo :bool = False) -> None: + def user_create(self, user :str, password :Optional[str] = None, groups :Optional[List[str]] = None, sudo :bool = False) -> None: if groups is None: groups = [] diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index 710af01e..ac480b11 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -15,7 +15,10 @@ from .general import SysCommand, SysCommandWorker from .output import log from .exceptions import SysCallError, DiskError from .storage import storage +from .disk.helpers import get_filesystem_type from .disk.mapperdev import MapperDev +from .disk.btrfs import BTRFSPartition + class luks2: def __init__(self, @@ -149,7 +152,6 @@ class luks2: :param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev :type mountpoint: str """ - from .disk import get_filesystem_type if '/' in mountpoint: os.path.basename(mountpoint) # TODO: Raise exception instead? @@ -162,14 +164,22 @@ class luks2: if os.path.islink(f'/dev/mapper/{mountpoint}'): self.mapdev = f'/dev/mapper/{mountpoint}' - unlocked_partition = Partition( + if (filesystem_type := get_filesystem_type(pathlib.Path(self.mapdev))) == 'btrfs': + return BTRFSPartition( + self.mapdev, + block_device=MapperDev(mountpoint).partition.block_device, + encrypted=True, + filesystem=filesystem_type, + autodetect_filesystem=False + ) + + return Partition( self.mapdev, block_device=MapperDev(mountpoint).partition.block_device, encrypted=True, filesystem=get_filesystem_type(self.mapdev), autodetect_filesystem=False ) - return unlocked_partition def close(self, mountpoint :Optional[str] = None) -> bool: if not mountpoint: diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py index afccd45b..5cb27cab 100644 --- a/archinstall/lib/menu/global_menu.py +++ b/archinstall/lib/menu/global_menu.py @@ -1,6 +1,8 @@ from __future__ import annotations -from typing import Any, List, Optional, Union +from typing import Any, List, Optional, Union, Dict, TYPE_CHECKING + +import archinstall from ..menu import Menu from ..menu.selection_menu import Selector, GeneralMenu @@ -8,8 +10,7 @@ from ..general import SysCommand, secret from ..hardware import has_uefi from ..models import NetworkConfiguration from ..storage import storage -from ..output import log -from ..profiles import is_desktop_profile +from ..profiles import is_desktop_profile, Profile from ..disk import encrypted_partitions from ..user_interaction import get_password, ask_for_a_timezone, save_config @@ -20,7 +21,6 @@ from ..user_interaction import ask_hostname from ..user_interaction import ask_for_audio_selection from ..user_interaction import ask_additional_packages_to_install from ..user_interaction import ask_to_configure_network -from ..user_interaction import ask_for_superuser_account from ..user_interaction import ask_for_additional_users from ..user_interaction import select_language from ..user_interaction import select_mirror_regions @@ -32,126 +32,147 @@ from ..user_interaction import select_encrypted_partitions from ..user_interaction import select_harddrives from ..user_interaction import select_profile from ..user_interaction import select_additional_repositories +from ..models.users import User +from ..user_interaction.partitioning_conf import current_partition_layout +from ..output import FormattedOutput + +if TYPE_CHECKING: + _: Any + class GlobalMenu(GeneralMenu): def __init__(self,data_store): - super().__init__(data_store=data_store, auto_cursor=True) + super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3) def _setup_selection_menu_options(self): # archinstall.Language will not use preset values self._menu_options['archinstall-language'] = \ Selector( - _('Select Archinstall language'), - lambda x: self._select_archinstall_language('English'), + _('Archinstall language'), + lambda x: self._select_archinstall_language(x), default='English') self._menu_options['keyboard-layout'] = \ - Selector(_('Select keyboard layout'), lambda preset: select_language('us',preset), default='us') + Selector( + _('Keyboard layout'), + lambda preset: select_language(preset), + default='us') self._menu_options['mirror-region'] = \ Selector( - _('Select mirror region'), - select_mirror_regions, + _('Mirror region'), + lambda preset: select_mirror_regions(preset), display_func=lambda x: list(x.keys()) if x else '[]', default={}) self._menu_options['sys-language'] = \ - Selector(_('Select locale language'), lambda preset: select_locale_lang('en_US',preset), default='en_US') + Selector( + _('Locale language'), + lambda preset: select_locale_lang(preset), + default='en_US') self._menu_options['sys-encoding'] = \ - Selector(_('Select locale encoding'), lambda preset: select_locale_enc('utf-8',preset), default='utf-8') + Selector( + _('Locale encoding'), + lambda preset: select_locale_enc(preset), + default='UTF-8') self._menu_options['harddrives'] = \ Selector( - _('Select harddrives'), - self._select_harddrives) + _('Drive(s)'), + lambda preset: self._select_harddrives(preset), + display_func=lambda x: f'{len(x)} ' + str(_('Drive(s)')) if x is not None and len(x) > 0 else '', + preview_func=self._prev_harddrives, + ) self._menu_options['disk_layouts'] = \ Selector( - _('Select disk layout'), - lambda x: select_disk_layout( + _('Disk layout'), + lambda preset: select_disk_layout( + preset, storage['arguments'].get('harddrives', []), storage['arguments'].get('advanced', False) ), + preview_func=self._prev_disk_layouts, + display_func=lambda x: self._display_disk_layout(x), dependencies=['harddrives']) self._menu_options['!encryption-password'] = \ Selector( - _('Set encryption password'), + _('Encryption password'), lambda x: self._select_encrypted_password(), display_func=lambda x: secret(x) if x else 'None', dependencies=['harddrives']) + self._menu_options['HSM'] = Selector( + description=_('Use HSM to unlock encrypted drive'), + func=lambda preset: self._select_hsm(preset), + dependencies=['!encryption-password'], + default=None + ) self._menu_options['swap'] = \ Selector( - _('Use swap'), + _('Swap'), lambda preset: ask_for_swap(preset), default=True) self._menu_options['bootloader'] = \ Selector( - _('Select bootloader'), + _('Bootloader'), lambda preset: ask_for_bootloader(storage['arguments'].get('advanced', False),preset), default="systemd-bootctl" if has_uefi() else "grub-install") self._menu_options['hostname'] = \ Selector( - _('Specify hostname'), + _('Hostname'), ask_hostname, default='archlinux') # root password won't have preset value self._menu_options['!root-password'] = \ Selector( - _('Set root password'), + _('Root password'), lambda preset:self._set_root_password(), display_func=lambda x: secret(x) if x else 'None') - self._menu_options['!superusers'] = \ - Selector( - _('Specify superuser account'), - lambda preset: self._create_superuser_account(), - default={}, - exec_func=lambda n,v:self._users_resynch(), - dependencies_not=['!root-password'], - display_func=lambda x: self._display_superusers()) self._menu_options['!users'] = \ Selector( - _('Specify user account'), - lambda x: self._create_user_account(), + _('User account'), + lambda x: self._create_user_account(x), default={}, - exec_func=lambda n,v:self._users_resynch(), - display_func=lambda x: list(x.keys()) if x else '[]') + display_func=lambda x: f'{len(x)} {_("User(s)")}' if len(x) > 0 else None, + preview_func=self._prev_users) self._menu_options['profile'] = \ Selector( - _('Specify profile'), - lambda x: self._select_profile(), - display_func=lambda x: x if x else 'None') + _('Profile'), + lambda preset: self._select_profile(preset), + display_func=lambda x: x if x else 'None' + ) self._menu_options['audio'] = \ Selector( - _('Select audio'), + _('Audio'), lambda preset: ask_for_audio_selection(is_desktop_profile(storage['arguments'].get('profile', None)),preset), display_func=lambda x: x if x else 'None', default=None ) self._menu_options['kernels'] = \ Selector( - _('Select kernels'), + _('Kernels'), lambda preset: select_kernel(preset), default=['linux']) self._menu_options['packages'] = \ Selector( - _('Additional packages to install'), + _('Additional packages'), # lambda x: ask_additional_packages_to_install(storage['arguments'].get('packages', None)), ask_additional_packages_to_install, default=[]) self._menu_options['additional-repositories'] = \ Selector( - _('Additional repositories to enable'), + _('Optional repositories'), select_additional_repositories, default=[]) self._menu_options['nic'] = \ Selector( - _('Configure network'), + _('Network configuration'), ask_to_configure_network, display_func=lambda x: self._prev_network_configuration(x), default={}) self._menu_options['timezone'] = \ Selector( - _('Select timezone'), + _('Timezone'), lambda preset: ask_for_a_timezone(preset), default='UTC') self._menu_options['ntp'] = \ Selector( - _('Set automatic time sync (NTP)'), + _('Automatic time sync (NTP)'), lambda preset: self._select_ntp(preset), default=True) self._menu_options['__separator__'] = \ @@ -172,7 +193,7 @@ class GlobalMenu(GeneralMenu): def _update_install_text(self, name :str = None, result :Any = None): text = self._install_text() - self._menu_options.get('install').update_description(text) + self._menu_options['install'].update_description(text) def post_callback(self,name :str = None ,result :Any = None): self._update_install_text(name, result) @@ -182,14 +203,21 @@ class GlobalMenu(GeneralMenu): # If no partitions was marked as encrypted, but a password was supplied and we have some disks to format.. # Then we need to identify which partitions to encrypt. This will default to / (root). if len(list(encrypted_partitions(storage['arguments'].get('disk_layouts', [])))) == 0: - storage['arguments']['disk_layouts'] = select_encrypted_partitions( - storage['arguments']['disk_layouts'], storage['arguments']['!encryption-password']) + for blockdevice in storage['arguments']['disk_layouts']: + for partition_index in select_encrypted_partitions( + title="Select which partitions to encrypt:", + partitions=storage['arguments']['disk_layouts'][blockdevice]['partitions'] + ): + + partition = storage['arguments']['disk_layouts'][blockdevice]['partitions'][partition_index] + partition['encrypted'] = True + partition['!password'] = storage['arguments']['!encryption-password'] def _install_text(self): missing = len(self._missing_configs()) if missing > 0: return _('Install ({} config(s) missing)').format(missing) - return 'Install' + return _('Install') def _prev_network_configuration(self, cur_value: Union[NetworkConfiguration, List[NetworkConfiguration]]) -> str: if not cur_value: @@ -201,6 +229,35 @@ class GlobalMenu(GeneralMenu): else: return str(cur_value) + def _prev_harddrives(self) -> Optional[str]: + selector = self._menu_options['harddrives'] + if selector.has_selection(): + drives = selector.current_selection + return '\n\n'.join([d.display_info for d in drives]) + return None + + def _prev_disk_layouts(self) -> Optional[str]: + selector = self._menu_options['disk_layouts'] + if selector.has_selection(): + layouts: Dict[str, Dict[str, Any]] = selector.current_selection + + output = '' + for device, layout in layouts.items(): + output += f'{_("Device")}: {device}\n\n' + output += current_partition_layout(layout['partitions'], with_title=False) + output += '\n\n' + + return output.rstrip() + + return None + + def _display_disk_layout(self, current_value: Optional[Dict[str, Any]]) -> str: + if current_value: + total_partitions = [entry['partitions'] for entry in current_value.values()] + total_nr = sum([len(p) for p in total_partitions]) + return f'{total_nr} {_("Partitions")}' + return '' + def _prev_install_missing_config(self) -> Optional[str]: if missing := self._missing_configs(): text = str(_('Missing configurations:\n')) @@ -209,31 +266,42 @@ class GlobalMenu(GeneralMenu): return text[:-1] # remove last new line return None + def _prev_users(self) -> Optional[str]: + selector = self._menu_options['!users'] + if selector.has_selection(): + users: List[User] = selector.current_selection + return FormattedOutput.as_table(users) + return None + def _missing_configs(self) -> List[str]: def check(s): return self._menu_options.get(s).has_selection() + def has_superuser() -> bool: + users = self._menu_options['!users'].current_selection + return any([u.sudo for u in users]) + missing = [] if not check('bootloader'): missing += ['Bootloader'] if not check('hostname'): missing += ['Hostname'] - if not check('!root-password') and not check('!superusers'): - missing += [str(_('Either root-password or at least 1 superuser must be specified'))] + if not check('!root-password') and not has_superuser(): + missing += [str(_('Either root-password or at least 1 user with sudo privileges must be specified'))] if not check('harddrives'): missing += ['Hard drives'] if check('harddrives'): - if not self._menu_options.get('harddrives').is_empty() and not check('disk_layouts'): + if not self._menu_options['harddrives'].is_empty() and not check('disk_layouts'): missing += ['Disk layout'] return missing - def _set_root_password(self): + def _set_root_password(self) -> Optional[str]: prompt = str(_('Enter root password (leave blank to disable root): ')) password = get_password(prompt=prompt) return password - def _select_encrypted_password(self): + def _select_encrypted_password(self) -> Optional[str]: if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))): return passwd else: @@ -247,59 +315,75 @@ class GlobalMenu(GeneralMenu): return ntp - def _select_harddrives(self, old_harddrives : list) -> list: - # old_haddrives = storage['arguments'].get('harddrives', []) + def _select_harddrives(self, old_harddrives : list) -> List: harddrives = select_harddrives(old_harddrives) - # in case the harddrives got changed we have to reset the disk layout as well - if old_harddrives != harddrives: - self._menu_options.get('disk_layouts').set_current_selection(None) - storage['arguments']['disk_layouts'] = {} - - if not harddrives: + if len(harddrives) == 0: prompt = _( "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n" "WARNING: Archinstall won't check the suitability of this setup\n" "Do you wish to continue?" ).format(storage['MOUNT_POINT']) - choice = Menu(prompt, ['yes', 'no'], default_option='yes').run() + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run() - if choice == 'no': - exit(1) + if choice.value == Menu.no(): + return self._select_harddrives(old_harddrives) + + # in case the harddrives got changed we have to reset the disk layout as well + if old_harddrives != harddrives: + self._menu_options['disk_layouts'].set_current_selection(None) + storage['arguments']['disk_layouts'] = {} return harddrives - def _select_profile(self): - profile = select_profile() + def _select_profile(self, preset): + profile = select_profile(preset) + ret = None + + if profile is None: + if any([ + archinstall.storage.get('profile_minimal', False), + archinstall.storage.get('_selected_servers', None), + archinstall.storage.get('_desktop_profile', None), + archinstall.arguments.get('desktop-environment', None), + archinstall.arguments.get('gfx_driver_packages', None) + ]): + return preset + else: # ctrl+c was actioned and all profile settings have been reset + return None + + servers = archinstall.storage.get('_selected_servers', []) + desktop = archinstall.storage.get('_desktop_profile', None) + desktop_env = archinstall.arguments.get('desktop-environment', None) + gfx_driver = archinstall.arguments.get('gfx_driver_packages', None) # Check the potentially selected profiles preparations to get early checks if some additional questions are needed. if profile and profile.has_prep_function(): namespace = f'{profile.namespace}.py' with profile.load_instructions(namespace=namespace) as imported: - if not imported._prep_function(): - log(' * Profile\'s preparation requirements was not fulfilled.', fg='red') - exit(1) - - return profile - - def _create_superuser_account(self): - superusers = ask_for_superuser_account(str(_('Manage superuser accounts: '))) - return superusers if superusers else None - - def _create_user_account(self): - users = ask_for_additional_users(str(_('Manage ordinary user accounts: '))) + if imported._prep_function(servers=servers, desktop=desktop, desktop_env=desktop_env, gfx_driver=gfx_driver): + ret: Profile = profile + + match ret.name: + case 'minimal': + reset = ['_selected_servers', '_desktop_profile', 'desktop-environment', 'gfx_driver_packages'] + case 'server': + reset = ['_desktop_profile', 'desktop-environment'] + case 'desktop': + reset = ['_selected_servers'] + case 'xorg': + reset = ['_selected_servers', '_desktop_profile', 'desktop-environment'] + + for r in reset: + archinstall.storage[r] = None + else: + return self._select_profile(preset) + elif profile: + ret = profile + + return ret + + def _create_user_account(self, defined_users: List[User]) -> List[User]: + users = ask_for_additional_users(defined_users=defined_users) return users - - def _display_superusers(self): - superusers = self._data_store.get('!superusers', {}) - - if self._menu_options.get('!root-password').has_selection(): - return list(superusers.keys()) if superusers else '[]' - else: - return list(superusers.keys()) if superusers else '' - - def _users_resynch(self): - self.synch('!superusers') - self.synch('!users') - return False diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py index 4e6dffbe..cb567093 100644 --- a/archinstall/lib/menu/list_manager.py +++ b/archinstall/lib/menu/list_manager.py @@ -84,12 +84,12 @@ The contents in the base class of this methods serve for a very basic usage, and ``` """ +import copy +from os import system +from typing import Union, Any, TYPE_CHECKING, Dict, Optional from .text_input import TextInput from .menu import Menu -from os import system -from copy import copy -from typing import Union, Any, List, TYPE_CHECKING if TYPE_CHECKING: _: Any @@ -144,82 +144,99 @@ class ListManager: self.bottom_list = [self.confirm_action,self.cancel_action] self.bottom_item = [self.cancel_action] self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))] - self.base_data = base_list - self._data = copy(base_list) # as refs, changes are immediate + self._original_data = copy.deepcopy(base_list) + self._data = copy.deepcopy(base_list) # as refs, changes are immediate # default values for the null case - self.target = None + self.target: Optional[Any] = None self.action = self._null_action + if len(self._data) == 0 and self._null_action: - self.exec_action(self._data) + self._data = self.exec_action(self._data) def run(self): while True: - self._data_formatted = self.reformat(self._data) - options = self._data_formatted + [self.separator] + # this will return a dictionary with the key as the menu entry to be displayed + # and the value is the original value from the self._data container + data_formatted = self.reformat(self._data) + options = list(data_formatted.keys()) + options.append(self.separator) + if self._default_action: options += self._default_action + options += self.bottom_list + system('clear') - target = Menu(self._prompt, + + target = Menu( + self._prompt, options, sort=False, clear_screen=False, clear_menu_on_exit=False, header=self.header, - skip_empty_entries=True).run() + skip_empty_entries=True, + skip=False + ).run() - if not target or target in self.bottom_list: + if not target.value or target.value in self.bottom_list: self.action = target break - if target and target == self.separator: - continue - if target and target in self._default_action: - self.action = target - target = None + + if target.value and target.value in self._default_action: + self.action = target.value self.target = None - self.exec_action(self._data) + self._data = self.exec_action(self._data) continue + if isinstance(self._data,dict): - key = list(self._data.keys())[self._data_formatted.index(target)] - self.target = {key: self._data[key]} + data_key = data_formatted[target.value] + key = self._data[data_key] + self.target = {data_key: key} + elif isinstance(self._data, list): + self.target = [d for d in self._data if d == data_formatted[target.value]][0] else: - self.target = self._data[self._data_formatted.index(target)] + self.target = self._data[data_formatted[target.value]] + # Possible enhacement. If run_actions returns false a message line indicating the failure - self.run_actions(target) + self.run_actions(target.value) - if not target or target == self.cancel_action: # TODO dubious - return self.base_data # return the original list + if target.value == self.cancel_action: # TODO dubious + return self._original_data # return the original list else: return self._data def run_actions(self,prompt_data=None): options = self.action_list() + self.bottom_item prompt = _("Select an action for < {} >").format(prompt_data if prompt_data else self.target) - self.action = Menu( + choice = Menu( prompt, options, sort=False, clear_screen=False, clear_menu_on_exit=False, preset_values=self.bottom_item, - show_search_hint=False).run() - if not self.action or self.action == self.cancel_action: - return False - else: - return self.exec_action(self._data) + show_search_hint=False + ).run() + + self.action = choice.value + + if self.action and self.action != self.cancel_action: + self._data = self.exec_action(self._data) + """ The following methods are expected to be overwritten by the user if the needs of the list are beyond the simple case """ - def reformat(self, data: Any) -> List[Any]: + def reformat(self, data: Any) -> Dict[str, Any]: """ method to get the data in a format suitable to be shown It is executed once for run loop and processes the whole self._data structure """ if isinstance(data,dict): - return list(map(lambda x:f"{x} : {data[x]}",data)) + return {f'{k}: {v}': k for k, v in data.items()} else: - return list(map(lambda x:str(x),data)) + return {str(k): k for k in data} def action_list(self): """ @@ -238,18 +255,18 @@ class ListManager: # TODO guarantee unicity if isinstance(self._data,list): if self.action == str(_('Add')): - self.target = TextInput(_('Add :'),None).run() + self.target = TextInput(_('Add: '),None).run() self._data.append(self.target) if self.action == str(_('Copy')): while True: - target = TextInput(_('Copy to :'),self.target).run() + target = TextInput(_('Copy to: '),self.target).run() if target != self.target: self._data.append(self.target) break elif self.action == str(_('Edit')): tgt = self.target idx = self._data.index(self.target) - result = TextInput(_('Edite :'),tgt).run() + result = TextInput(_('Edit: '),tgt).run() self._data[idx] = result elif self.action == str(_('Delete')): del self._data[self._data.index(self.target)] @@ -261,8 +278,8 @@ class ListManager: origkey = None origval = None if self.action == str(_('Add')): - key = TextInput(_('Key :'),None).run() - value = TextInput(_('Value :'),None).run() + key = TextInput(_('Key: '),None).run() + value = TextInput(_('Value: '),None).run() self._data[key] = value if self.action == str(_('Copy')): while True: @@ -271,7 +288,9 @@ class ListManager: self._data[key] = origval break elif self.action == str(_('Edit')): - value = TextInput(_(f'Edit {origkey} :'),origval).run() + value = TextInput(_('Edit {}: ').format(origkey), origval).run() self._data[origkey] = value elif self.action == str(_('Delete')): del self._data[origkey] + + return self._data diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py index d254e0f9..c34814eb 100644 --- a/archinstall/lib/menu/menu.py +++ b/archinstall/lib/menu/menu.py @@ -1,4 +1,6 @@ -from typing import Dict, List, Union, Any, TYPE_CHECKING +from dataclasses import dataclass +from enum import Enum, auto +from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional from archinstall.lib.menu.simple_menu import TerminalMenu @@ -12,21 +14,49 @@ import logging if TYPE_CHECKING: _: Any + +class MenuSelectionType(Enum): + Selection = auto() + Esc = auto() + Ctrl_c = auto() + + +@dataclass +class MenuSelection: + type_: MenuSelectionType + value: Optional[Union[str, List[str]]] = None + + class Menu(TerminalMenu): + + @classmethod + def yes(cls): + return str(_('yes')) + + @classmethod + def no(cls): + return str(_('no')) + + @classmethod + def yes_no(cls): + return [cls.yes(), cls.no()] + def __init__( self, title :str, p_options :Union[List[str], Dict[str, Any]], skip :bool = True, multi :bool = False, - default_option :str = None, + default_option : Optional[str] = None, sort :bool = True, preset_values :Union[str, List[str]] = None, - cursor_index :int = None, + cursor_index : Optional[int] = None, preview_command=None, preview_size=0.75, preview_title='Info', header :Union[List[str],str] = None, + explode_on_interrupt :bool = False, + explode_warning :str = '', **kwargs ): """ @@ -66,9 +96,15 @@ class Menu(TerminalMenu): :param preview_title: Title of the preview window :type preview_title: str - param: header one or more header lines for the menu + param header: one or more header lines for the menu type param: string or list + param explode_on_interrupt: This will explicitly handle a ctrl+c instead and return that specific state + type param: bool + + param explode_warning: If explode_on_interrupt is True and this is non-empty, there will be a warning with a user confirmation displayed + type param: str + :param kwargs : any SimpleTerminal parameter """ # we guarantee the inmutability of the options outside the class. @@ -85,6 +121,8 @@ class Menu(TerminalMenu): log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) raise RequirementError("Menu() requires an iterable as option.") + self._default_str = str(_('(default)')) + if isinstance(p_options,dict): options = list(p_options.keys()) else: @@ -103,27 +141,40 @@ class Menu(TerminalMenu): if sort: options = sorted(options) - self.menu_options = options - self.skip = skip - self.default_option = default_option - self.multi = multi + self._menu_options = options + self._skip = skip + self._default_option = default_option + self._multi = multi + self._explode_on_interrupt = explode_on_interrupt + self._explode_warning = explode_warning + menu_title = f'\n{title}\n\n' + if header: - separator = '\n ' if not isinstance(header,(list,tuple)): - header = [header,] - if skip: - menu_title += str(_("Use ESC to skip\n")) - menu_title += separator + separator.join(header) - elif skip: - menu_title += str(_("Use ESC to skip\n\n")) + header = [header] + header = '\n'.join(header) + menu_title += f'\n{header}\n' + + action_info = '' + if skip: + action_info += str(_("Use ESC to skip")) + + if self._explode_on_interrupt: + if len(action_info) > 0: + action_info += '\n' + action_info += str(_('Use CTRL+C to reset current selection\n\n')) + + menu_title += action_info + if default_option: # if a default value was specified we move that one # to the top of the list and mark it as default as well - default = f'{default_option} (default)' - self.menu_options = [default] + [o for o in self.menu_options if default_option != o] + default = f'{default_option} {self._default_str}' + self._menu_options = [default] + [o for o in self._menu_options if default_option != o] + + self._preselection(preset_values,cursor_index) - self.preselection(preset_values,cursor_index) cursor = "> " main_menu_cursor_style = ("fg_cyan", "bold") main_menu_style = ("bg_blue", "fg_gray") @@ -131,8 +182,9 @@ class Menu(TerminalMenu): kwargs['clear_screen'] = kwargs.get('clear_screen',True) kwargs['show_search_hint'] = kwargs.get('show_search_hint',True) kwargs['cycle_cursor'] = kwargs.get('cycle_cursor',True) + super().__init__( - menu_entries=self.menu_options, + menu_entries=self._menu_options, title=menu_title, menu_cursor=cursor, menu_cursor_style=main_menu_cursor_style, @@ -146,31 +198,46 @@ class Menu(TerminalMenu): preview_command=preview_command, preview_size=preview_size, preview_title=preview_title, + explode_on_interrupt=self._explode_on_interrupt, + multi_select_select_on_accept=False, **kwargs, ) - def _show(self): - idx = self.show() + def _show(self) -> MenuSelection: + try: + idx = self.show() + except KeyboardInterrupt: + return MenuSelection(type_=MenuSelectionType.Ctrl_c) + + def check_default(elem): + if self._default_option is not None and f'{self._default_option} {self._default_str}' in elem: + return self._default_option + else: + return elem + if idx is not None: if isinstance(idx, (list, tuple)): - return [self.default_option if ' (default)' in self.menu_options[i] else self.menu_options[i] for i in idx] + results = [] + for i in idx: + option = check_default(self._menu_options[i]) + results.append(option) + return MenuSelection(type_=MenuSelectionType.Selection, value=results) else: - selected = self.menu_options[idx] - if ' (default)' in selected and self.default_option: - return self.default_option - return selected + result = check_default(self._menu_options[idx]) + return MenuSelection(type_=MenuSelectionType.Selection, value=result) else: - if self.default_option: - if self.multi: - return [self.default_option] - else: - return self.default_option - return None - - def run(self): + return MenuSelection(type_=MenuSelectionType.Esc) + + def run(self) -> MenuSelection: ret = self._show() - if ret is None and not self.skip: + if ret.type_ == MenuSelectionType.Ctrl_c: + if self._explode_on_interrupt and len(self._explode_warning) > 0: + response = Menu(self._explode_warning, Menu.yes_no(), skip=False).run() + if response.value == Menu.no(): + return self.run() + + if ret.type_ is not MenuSelectionType.Selection and not self._skip: return self.run() return ret @@ -185,15 +252,15 @@ class Menu(TerminalMenu): pos = self._menu_entries.index(value) self.set_cursor_pos(pos) - def preselection(self,preset_values :list = [],cursor_index :int = None): + def _preselection(self,preset_values :Union[str, List[str]] = [], cursor_index : Optional[int] = None): def from_preset_to_cursor(): if preset_values: # if the value is not extant return 0 as cursor index try: if isinstance(preset_values,str): - self.cursor_index = self.menu_options.index(self.preset_values) + self.cursor_index = self._menu_options.index(self.preset_values) else: # should return an error, but this is smoother - self.cursor_index = self.menu_options.index(self.preset_values[0]) + self.cursor_index = self._menu_options.index(self.preset_values[0]) except ValueError: self.cursor_index = 0 @@ -203,13 +270,13 @@ class Menu(TerminalMenu): return self.preset_values = preset_values - if self.default_option: - if isinstance(preset_values,str) and self.default_option == preset_values: - self.preset_values = f"{preset_values} (default)" - elif isinstance(preset_values,(list,tuple)) and self.default_option in preset_values: - idx = preset_values.index(self.default_option) - self.preset_values[idx] = f"{preset_values[idx]} (default)" - if cursor_index is None or not self.multi: + if self._default_option: + if isinstance(preset_values,str) and self._default_option == preset_values: + self.preset_values = f"{preset_values} {self._default_str}" + elif isinstance(preset_values,(list,tuple)) and self._default_option in preset_values: + idx = preset_values.index(self._default_option) + self.preset_values[idx] = f"{preset_values[idx]} {self._default_str}" + if cursor_index is None or not self._multi: from_preset_to_cursor() - if not self.multi: # Not supported by the infraestructure + if not self._multi: # Not supported by the infraestructure self.preset_values = None diff --git a/archinstall/lib/menu/selection_menu.py b/archinstall/lib/menu/selection_menu.py index b2f99423..57e290f1 100644 --- a/archinstall/lib/menu/selection_menu.py +++ b/archinstall/lib/menu/selection_menu.py @@ -2,23 +2,27 @@ from __future__ import annotations import logging import sys +import pathlib from typing import Callable, Any, List, Iterator, Tuple, Optional, Dict, TYPE_CHECKING -from .menu import Menu +from .menu import Menu, MenuSelectionType from ..locale_helpers import set_keyboard_language from ..output import log from ..translation import Translation +from ..hsm.fido import get_fido2_devices if TYPE_CHECKING: _: Any -def select_archinstall_language(default='English'): + +def select_archinstall_language(preset_value: str) -> Optional[Any]: """ copied from user_interaction/general_conf.py as a temporary measure """ - languages = Translation.get_all_names() - language = Menu(_('Select Archinstall language'), languages, default_option=default).run() - return language + languages = Translation.get_available_lang() + language = Menu(_('Archinstall language'), languages, preset_values=preset_value).run() + return language.value + class Selector: def __init__( @@ -91,6 +95,10 @@ class Selector: self._no_store = no_store @property + def description(self) -> str: + return self._description + + @property def dependencies(self) -> List: return self._dependencies @@ -115,7 +123,7 @@ class Selector: def update_description(self, description :str): self._description = description - def menu_text(self) -> str: + def menu_text(self, padding: int = 0) -> str: if self._description == '': # special menu option for __separator__ return '' @@ -128,14 +136,14 @@ class Selector: current = str(self._current_selection) if current: - padding = 35 - len(str(self._description)) - current = ' ' * padding + f'SET: {current}' - - return f'{self._description} {current}' + padding += 5 + description = str(self._description).ljust(padding, ' ') + current = str(_('set: {}').format(current)) + else: + description = self._description + current = '' - @property - def text(self): - return self.menu_text() + return f'{description} {current}' def set_current_selection(self, current :Optional[str]): self._current_selection = current @@ -262,8 +270,14 @@ class GeneralMenu: return preview() return None + def _get_menu_text_padding(self, entries: List[Selector]): + return max([len(str(selection.description)) for selection in entries]) + def _find_selection(self, selection_name: str) -> Tuple[str, Selector]: - option = [(k, v) for k, v in self._menu_options.items() if v.text.strip() == selection_name.strip()] + enabled_menus = self._menus_to_enable() + padding = self._get_menu_text_padding(list(enabled_menus.values())) + option = [(k, v) for k, v in self._menu_options.items() if v.menu_text(padding).strip() == selection_name.strip()] + if len(option) != 1: raise ValueError(f'Selection not found: {selection_name}') config_name = option[0][0] @@ -275,14 +289,18 @@ class GeneralMenu: # we synch all the options just in case for item in self.list_options(): self.synch(item) - self.post_callback # as all the values can vary i have to exec this callback + + self.post_callback() # as all the values can vary i have to exec this callback cursor_pos = None + while True: # Before continuing, set the preferred keyboard layout/language in the current terminal. # This will just help the user with the next following questions. self._set_kb_language() enabled_menus = self._menus_to_enable() - menu_options = [m.text for m in enabled_menus.values()] + + padding = self._get_menu_text_padding(list(enabled_menus.values())) + menu_options = [m.menu_text(padding) for m in enabled_menus.values()] selection = Menu( _('Set/Modify the below options'), @@ -291,18 +309,31 @@ class GeneralMenu: cursor_index=cursor_pos, preview_command=self._preview_display, preview_size=self.preview_size, - skip_empty_entries=True + skip_empty_entries=True, + skip=False ).run() - if selection and self.auto_cursor: - cursor_pos = menu_options.index(selection) + 1 # before the strip otherwise fails - if cursor_pos >= len(menu_options): - cursor_pos = len(menu_options) - 1 - selection = selection.strip() - if selection: - # if this calls returns false, we exit the menu. We allow for an callback for special processing on realeasing control - if not self._process_selection(selection): - break + if selection.type_ == MenuSelectionType.Selection: + value = selection.value + + if self.auto_cursor: + cursor_pos = menu_options.index(value) + 1 # before the strip otherwise fails + + # in case the new position lands on a "placeholder" we'll skip them as well + while True: + if cursor_pos >= len(menu_options): + cursor_pos = 0 + if len(menu_options[cursor_pos]) > 0: + break + cursor_pos += 1 + + value = value.strip() + + # if this calls returns false, we exit the menu + # we allow for an callback for special processing on realeasing control + if not self._process_selection(value): + break + if not self.is_context_mgr: self.__exit__() @@ -423,15 +454,41 @@ class GeneralMenu: def mandatory_overview(self) -> Tuple[int, int]: mandatory_fields = 0 mandatory_waiting = 0 - for field in self._menu_options: - option = self._menu_options[field] + for field, option in self._menu_options.items(): if option.is_mandatory(): mandatory_fields += 1 if not option.has_selection(): mandatory_waiting += 1 return mandatory_fields, mandatory_waiting - def _select_archinstall_language(self, default_lang): - language = select_archinstall_language(default_lang) - self._translation.activate(language) - return language + def _select_archinstall_language(self, preset_value: str) -> str: + language = select_archinstall_language(preset_value) + if language is not None: + self._translation.activate(language) + return language + + return preset_value + + def _select_hsm(self, preset :Optional[pathlib.Path] = None) -> Optional[pathlib.Path]: + title = _('Select which partitions to mark for formatting:') + title += '\n' + + fido_devices = get_fido2_devices() + + indexes = [] + for index, path in enumerate(fido_devices.keys()): + title += f"{index}: {path} ({fido_devices[path]['manufacturer']} - {fido_devices[path]['product']})" + indexes.append(f"{index}|{fido_devices[path]['product']}") + + title += '\n' + + choice = Menu(title, indexes, multi=False).run() + + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Selection: + selection: Any = choice.value + index = int(selection.split('|',1)[0]) + return pathlib.Path(list(fido_devices.keys())[index]) + + return None diff --git a/archinstall/lib/menu/simple_menu.py b/archinstall/lib/menu/simple_menu.py index a0a241bd..947259eb 100644 --- a/archinstall/lib/menu/simple_menu.py +++ b/archinstall/lib/menu/simple_menu.py @@ -596,7 +596,8 @@ class TerminalMenu: status_bar: Optional[Union[str, Iterable[str], Callable[[str], str]]] = None, status_bar_below_preview: bool = DEFAULT_STATUS_BAR_BELOW_PREVIEW, status_bar_style: Optional[Iterable[str]] = DEFAULT_STATUS_BAR_STYLE, - title: Optional[Union[str, Iterable[str]]] = None + title: Optional[Union[str, Iterable[str]]] = None, + explode_on_interrupt: bool = False ): def extract_shortcuts_menu_entries_and_preview_arguments( entries: Iterable[str], @@ -718,6 +719,7 @@ class TerminalMenu: self._search_case_sensitive = search_case_sensitive self._search_highlight_style = tuple(search_highlight_style) if search_highlight_style is not None else () self._search_key = search_key + self._explode_on_interrupt = explode_on_interrupt self._shortcut_brackets_highlight_style = ( tuple(shortcut_brackets_highlight_style) if shortcut_brackets_highlight_style is not None else () ) @@ -1538,7 +1540,9 @@ class TerminalMenu: # Only append `next_key` if it is a printable character and the first character is not the # `search_start` key self._search.search_text += next_key - except KeyboardInterrupt: + except KeyboardInterrupt as e: + if self._explode_on_interrupt: + raise e menu_was_interrupted = True finally: reset_signal_handling() @@ -1842,6 +1846,12 @@ def get_argumentparser() -> argparse.ArgumentParser: ) parser.add_argument("-t", "--title", action="store", dest="title", help="menu title") parser.add_argument( + "--explode-on-interrupt", + action="store_true", + dest="explode_on_interrupt", + help="Instead of quitting the menu, this will raise the KeyboardInterrupt Exception", + ) + parser.add_argument( "-V", "--version", action="store_true", dest="print_version", help="print the version number and exit" ) parser.add_argument("entries", action="store", nargs="*", help="the menu entries to show") @@ -1971,6 +1981,7 @@ def main() -> None: status_bar_below_preview=args.status_bar_below_preview, status_bar_style=args.status_bar_style, title=args.title, + explode_on_interrupt=args.explode_on_interrupt, ) except (InvalidParameterCombinationError, InvalidStyleError, UnknownMenuEntryError) as e: print(str(e), file=sys.stderr) diff --git a/archinstall/lib/models/network_configuration.py b/archinstall/lib/models/network_configuration.py index 16136177..4f135da5 100644 --- a/archinstall/lib/models/network_configuration.py +++ b/archinstall/lib/models/network_configuration.py @@ -5,6 +5,7 @@ from enum import Enum from typing import List, Optional, Dict, Union, Any, TYPE_CHECKING from ..output import log +from ..storage import storage if TYPE_CHECKING: _: Any @@ -77,7 +78,9 @@ class NetworkConfigurationHandler: installation.copy_iso_network_config( enable_services=True) # Sources the ISO network configuration to the install medium. elif self._configuration.is_network_manager(): - installation.add_additional_packages("networkmanager") + installation.add_additional_packages(["networkmanager"]) + if (profile := storage['arguments'].get('profile')) and profile.is_desktop_profile: + installation.add_additional_packages(["network-manager-applet"]) installation.enable_service('NetworkManager.service') def _backwards_compability_config(self, config: Union[str,Dict[str, str]]) -> Union[List[NetworkConfiguration], NetworkConfiguration, None]: diff --git a/archinstall/lib/models/users.py b/archinstall/lib/models/users.py new file mode 100644 index 00000000..6052b73a --- /dev/null +++ b/archinstall/lib/models/users.py @@ -0,0 +1,77 @@ +from dataclasses import dataclass +from typing import Dict, List, Union, Any, TYPE_CHECKING + +if TYPE_CHECKING: + _: Any + + +@dataclass +class User: + username: str + password: str + sudo: bool + + @property + def groups(self) -> List[str]: + # this property should be transferred into a class attr instead + # if it's every going to be used + return [] + + def json(self) -> Dict[str, Any]: + return { + 'username': self.username, + '!password': self.password, + 'sudo': self.sudo + } + + def display(self) -> str: + password = '*' * len(self.password) + return f'{_("Username")}: {self.username:16} {_("Password")}: {password:16} sudo: {str(self.sudo)}' + + @classmethod + def _parse(cls, config_users: List[Dict[str, Any]]) -> List['User']: + users = [] + + for entry in config_users: + username = entry.get('username', None) + password = entry.get('!password', '') + sudo = entry.get('sudo', False) + + if username is None: + continue + + user = User(username, password, sudo) + users.append(user) + + return users + + @classmethod + def _parse_backwards_compatible(cls, config_users: Dict, sudo: bool) -> List['User']: + if len(config_users.keys()) > 0: + username = list(config_users.keys())[0] + password = config_users[username]['!password'] + + if password: + return [User(username, password, sudo)] + + return [] + + @classmethod + def parse_arguments( + cls, + config_users: Union[List[Dict[str, str]], Dict[str, str]], + config_superusers: Union[List[Dict[str, str]], Dict[str, str]] + ) -> List['User']: + users = [] + + # backwards compability + if isinstance(config_users, dict): + users += cls._parse_backwards_compatible(config_users, False) + else: + users += cls._parse(config_users) + + # backwards compability + if isinstance(config_superusers, dict): + users += cls._parse_backwards_compatible(config_superusers, True) + + return users diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py index da41d16d..29b73bc4 100644 --- a/archinstall/lib/output.py +++ b/archinstall/lib/output.py @@ -2,11 +2,47 @@ import logging import os import sys from pathlib import Path -from typing import Dict, Union +from typing import Dict, Union, List, Any from .storage import storage +class FormattedOutput: + + @classmethod + def values(cls, o: Any) -> Dict[str, Any]: + if hasattr(o, 'json'): + return o.json() + else: + return o.__dict__ + + @classmethod + def as_table(cls, obj: List[Any]) -> str: + column_width: Dict[str, int] = {} + for o in obj: + for k, v in cls.values(o).items(): + column_width.setdefault(k, 0) + column_width[k] = max([column_width[k], len(str(v)), len(k)]) + + output = '' + for key, width in column_width.items(): + key = key.replace('!', '') + output += key.ljust(width) + ' | ' + + output = output[:-3] + '\n' + output += '-' * len(output) + '\n' + + for o in obj: + for k, v in cls.values(o).items(): + if '!' in k: + v = '*' * len(str(v)) + output += str(v).ljust(column_width[k]) + ' | ' + output = output[:-3] + output += '\n' + + return output + + class Journald: @staticmethod def log(message :str, level :int = logging.DEBUG) -> None: @@ -61,9 +97,11 @@ def stylize_output(text: str, *opts :str, **kwargs) -> str: 'magenta' : '5', 'cyan' : '6', 'white' : '7', - 'orange' : '8;5;208', # Extended 256-bit colors (not always supported) - 'darkorange' : '8;5;202',# https://www.lihaoyi.com/post/BuildyourownCommandLinewithANSIescapecodes.html#256-colors + 'teal' : '8;5;109', # Extended 256-bit colors (not always supported) + 'orange' : '8;5;208', # https://www.lihaoyi.com/post/BuildyourownCommandLinewithANSIescapecodes.html#256-colors + 'darkorange' : '8;5;202', 'gray' : '8;5;246', + 'grey' : '8;5;246', 'darkgray' : '8;5;240', 'lightgray' : '8;5;256' } diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index 7f920317..99e3811c 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -18,7 +18,7 @@ plugins = {} # 1: List archinstall.plugin definitions # 2: Load the plugin entrypoint # 3: Initiate the plugin and store it as .name in plugins -for plugin_definition in metadata.entry_points().get('archinstall.plugin', []): +for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'): plugin_entrypoint = plugin_definition.load() try: plugins[plugin_definition.name] = plugin_entrypoint() diff --git a/archinstall/lib/profiles.py b/archinstall/lib/profiles.py index 65a30b0b..a4fbe490 100644 --- a/archinstall/lib/profiles.py +++ b/archinstall/lib/profiles.py @@ -207,6 +207,14 @@ class Profile(Script): def __repr__(self, *args :str, **kwargs :str) -> str: return f'Profile({os.path.basename(self.profile)})' + @property + def name(self) -> str: + return os.path.basename(self.profile) + + @property + def is_desktop_profile(self) -> bool: + return is_desktop_profile(repr(self)) + def install(self) -> ModuleType: # Before installing, revert any temporary changes to the namespace. # This ensures that the namespace during installation is the original initiation namespace. diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py index 650b9c0e..dd7ddc88 100644 --- a/archinstall/lib/storage.py +++ b/archinstall/lib/storage.py @@ -23,7 +23,7 @@ storage: Dict[str, Any] = { 'MOUNT_POINT': '/mnt/archinstall', 'ENC_IDENTIFIER': 'ainst', 'DISK_TIMEOUTS' : 1, # seconds - 'DISK_RETRY_ATTEMPTS' : 20, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations + 'DISK_RETRY_ATTEMPTS' : 5, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations 'CMD_LOCALE':{'LC_ALL':'C'}, # default locale for execution commands. Can be overriden with set_cmd_locale() 'CMD_LOCALE_DEFAULT':{'LC_ALL':'C'}, # should be the same as the former. Not be used except in reset_cmd_locale() } diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py index 417870da..3d2f0385 100644 --- a/archinstall/lib/systemd.py +++ b/archinstall/lib/systemd.py @@ -91,20 +91,25 @@ class Boot: log(f"The error above occured in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red") shutdown = None + shutdown_exit_code = -1 try: shutdown = SysCommand(f'systemd-run --machine={self.container_name} --pty shutdown now') except SysCallError as error: - if error.exit_code == 256: - pass + shutdown_exit_code = error.exit_code + # if error.exit_code == 256: + # pass while self.session.is_alive(): time.sleep(0.25) - if self.session.exit_code == 0 or (shutdown and shutdown.exit_code == 0): + if shutdown: + shutdown_exit_code = shutdown.exit_code + + if self.session.exit_code == 0 or shutdown_exit_code == 0: storage['active_boot'] = None else: - raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {shutdown}", exit_code=shutdown.exit_code) + raise SysCallError(f"Could not shut down temporary boot of {self.instance}: {self.session.exit_code}/{shutdown_exit_code}", exit_code=next(filter(bool, [self.session.exit_code, shutdown_exit_code]))) def __iter__(self) -> Iterator[str]: if self.session: diff --git a/archinstall/lib/translation.py b/archinstall/lib/translation.py index 74ffd691..1a0e94e4 100644 --- a/archinstall/lib/translation.py +++ b/archinstall/lib/translation.py @@ -5,14 +5,14 @@ import os import gettext from pathlib import Path -from typing import List, Dict, Any, TYPE_CHECKING +from typing import List, Dict, Any, TYPE_CHECKING, Tuple from .exceptions import TranslationError if TYPE_CHECKING: _: Any -class Languages: +class LanguageDefinitions: def __init__(self): self._mappings = self._get_language_mappings() @@ -70,11 +70,11 @@ class Translation: def __init__(self, locales_dir): self._languages = {} - for name in self.get_all_names(): + for names in self._get_translation_lang(): try: - self._languages[name] = gettext.translation('base', localedir=locales_dir, languages=[name]) + self._languages[names[0]] = gettext.translation('base', localedir=locales_dir, languages=names) except FileNotFoundError as error: - raise TranslationError(f"Could not locate language file for '{name}': {error}") + raise TranslationError(f"Could not locate language file for '{names}': {error}") def activate(self, name): if language := self._languages.get(name, None): @@ -94,10 +94,19 @@ class Translation: return locales_dir @classmethod - def get_all_names(cls) -> List[str]: + def _defined_languages(cls) -> List[str]: locales_dir = cls.get_locales_dir() filenames = os.listdir(locales_dir) - def_languages = filter(lambda x: len(x) == 2, filenames) + return list(filter(lambda x: len(x) == 2, filenames)) - languages = Languages() + @classmethod + def _get_translation_lang(cls) -> List[Tuple[str, str]]: + def_languages = cls._defined_languages() + languages = LanguageDefinitions() + return [(languages.get_language(lang), lang) for lang in def_languages] + + @classmethod + def get_available_lang(cls) -> List[str]: + def_languages = cls._defined_languages() + languages = LanguageDefinitions() return [languages.get_language(lang) for lang in def_languages] diff --git a/archinstall/lib/udev/__init__.py b/archinstall/lib/udev/__init__.py new file mode 100644 index 00000000..86c8cc29 --- /dev/null +++ b/archinstall/lib/udev/__init__.py @@ -0,0 +1 @@ +from .udevadm import udevadm_info
\ No newline at end of file diff --git a/archinstall/lib/udev/udevadm.py b/archinstall/lib/udev/udevadm.py new file mode 100644 index 00000000..84ec9cfd --- /dev/null +++ b/archinstall/lib/udev/udevadm.py @@ -0,0 +1,17 @@ +import typing +import pathlib +from ..general import SysCommand + +def udevadm_info(path :pathlib.Path) -> typing.Dict[str, str]: + if path.resolve().exists() is False: + return {} + + result = SysCommand(f"udevadm info {path.resolve()}") + data = {} + for line in result: + if b': ' in line and b'=' in line: + _, obj = line.split(b': ', 1) + key, value = obj.split(b'=', 1) + data[key.decode('UTF-8').lower()] = value.decode('UTF-8').strip() + + return data
\ No newline at end of file diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py index b0174d94..8aba4b4d 100644 --- a/archinstall/lib/user_interaction/__init__.py +++ b/archinstall/lib/user_interaction/__init__.py @@ -1,5 +1,5 @@ from .save_conf import save_config -from .manage_users_conf import ask_for_superuser_account, ask_for_additional_users +from .manage_users_conf import ask_for_additional_users from .backwards_compatible_conf import generic_select, generic_multi_select from .locale_conf import select_locale_lang, select_locale_enc from .system_conf import select_kernel, select_harddrives, select_driver, ask_for_bootloader, ask_for_swap diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/user_interaction/disk_conf.py index 9238a766..371d052f 100644 --- a/archinstall/lib/user_interaction/disk_conf.py +++ b/archinstall/lib/user_interaction/disk_conf.py @@ -1,18 +1,18 @@ from __future__ import annotations -from typing import Any, Dict, TYPE_CHECKING +from typing import Any, Dict, TYPE_CHECKING, Optional from .partitioning_conf import manage_new_and_existing_partitions, get_default_partition_layout from ..disk import BlockDevice from ..exceptions import DiskError from ..menu import Menu -from ..output import log +from ..menu.menu import MenuSelectionType if TYPE_CHECKING: _: Any -def ask_for_main_filesystem_format(advanced_options=False): +def ask_for_main_filesystem_format(advanced_options=False) -> str: options = {'btrfs': 'btrfs', 'ext4': 'ext4', 'xfs': 'xfs', 'f2fs': 'f2fs'} advanced = {'ntfs': 'ntfs'} @@ -22,7 +22,7 @@ def ask_for_main_filesystem_format(advanced_options=False): prompt = _('Select which filesystem your main partition should use') choice = Menu(prompt, options, skip=False).run() - return choice + return choice.value def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: @@ -30,27 +30,36 @@ def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: for device in block_devices: layout = manage_new_and_existing_partitions(device) - result[device.path] = layout return result -def select_disk_layout(block_devices: list, advanced_options=False) -> Dict[str, Any]: +def select_disk_layout(preset: Optional[Dict[str, Any]], block_devices: list, advanced_options=False) -> Optional[Dict[str, Any]]: wipe_mode = str(_('Wipe all selected drives and use a best-effort default partition layout')) custome_mode = str(_('Select what to do with each individual drive (followed by partition usage)')) modes = [wipe_mode, custome_mode] - print(modes) - mode = Menu(_('Select what you wish to do with the selected block devices'), modes, skip=False).run() + warning = str(_('Are you sure you want to reset this setting?')) + + choice = Menu( + _('Select what you wish to do with the selected block devices'), + modes, + explode_on_interrupt=True, + explode_warning=warning + ).run() - if mode == wipe_mode: - return get_default_partition_layout(block_devices, advanced_options) - else: - return select_individual_blockdevice_usage(block_devices) + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Ctrl_c: return None + case MenuSelectionType.Selection: + if choice.value == wipe_mode: + return get_default_partition_layout(block_devices, advanced_options) + else: + return select_individual_blockdevice_usage(block_devices) -def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> BlockDevice: +def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> Optional[BlockDevice]: """ Asks the user to select a harddrive from the `dict_o_disks` selection. Usually this is combined with :ref:`archinstall.list_drives`. @@ -63,19 +72,15 @@ def select_disk(dict_o_disks: Dict[str, BlockDevice]) -> BlockDevice: """ drives = sorted(list(dict_o_disks.keys())) if len(drives) >= 1: - for index, drive in enumerate(drives): - print( - f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive].device, dict_o_disks[drive]['label']})" - ) + title = str(_('You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)')) + '\n' + title += str(_('Select one of the disks or skip and use /mnt as default')) - log("You can skip selecting a drive and partitioning and use whatever drive-setup is mounted at /mnt (experimental)", - fg="yellow") + choice = Menu(title, drives).run() - drive = Menu('Select one of the disks or skip and use "/mnt" as default"', drives).run() - if not drive: - return drive + if choice.type_ == MenuSelectionType.Esc: + return None - drive = dict_o_disks[drive] + drive = dict_o_disks[choice.value] return drive raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.') diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py index c42e9e27..d4dc60db 100644 --- a/archinstall/lib/user_interaction/general_conf.py +++ b/archinstall/lib/user_interaction/general_conf.py @@ -3,6 +3,9 @@ from __future__ import annotations import logging from typing import List, Any, Optional, Dict, TYPE_CHECKING +import archinstall + +from ..menu.menu import MenuSelectionType from ..menu.text_input import TextInput from ..locale_helpers import list_keyboard_languages, list_timezones @@ -22,11 +25,12 @@ def ask_ntp(preset: bool = True) -> bool: prompt = str(_('Would you like to use automatic time synchronization (NTP) with the default time servers?\n')) prompt += str(_('Hardware time and other post-configuration steps might be required in order for NTP to work.\nFor more information, please check the Arch wiki')) if preset: - preset_val = 'yes' + preset_val = Menu.yes() else: - preset_val = 'no' - choice = Menu(prompt, ['yes', 'no'], skip=False, preset_values=preset_val, default_option='yes').run() - return False if choice == 'no' else True + preset_val = Menu.no() + choice = Menu(prompt, Menu.yes_no(), skip=False, preset_values=preset_val, default_option=Menu.yes()).run() + + return False if choice.value == Menu.no() else True def ask_hostname(preset: str = None) -> str: @@ -38,23 +42,31 @@ def ask_for_a_timezone(preset: str = None) -> str: timezones = list_timezones() default = 'UTC' - selected_tz = Menu(_('Select a timezone'), - list(timezones), - skip=False, - preset_values=preset, - default_option=default).run() + choice = Menu( + _('Select a timezone'), + list(timezones), + preset_values=preset, + default_option=default + ).run() - return selected_tz + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Selection: return choice.value def ask_for_audio_selection(desktop: bool = True, preset: str = None) -> str: - audio = 'pipewire' if desktop else 'none' - choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', 'none'] - selected_audio = Menu(_('Choose an audio server'), choices, preset_values=preset, default_option=audio, skip=False).run() - return selected_audio + no_audio = str(_('No audio server')) + choices = ['pipewire', 'pulseaudio'] if desktop else ['pipewire', 'pulseaudio', no_audio] + default = 'pipewire' if desktop else no_audio + + choice = Menu(_('Choose an audio server'), choices, preset_values=preset, default_option=default).run() + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Selection: return choice.value -def select_language(default_value: str, preset_value: str = None) -> str: + +def select_language(preset_value: str = None) -> str: """ Asks the user to select a language Usually this is combined with :ref:`archinstall.list_keyboard_languages`. @@ -64,16 +76,19 @@ def select_language(default_value: str, preset_value: str = None) -> str: """ kb_lang = list_keyboard_languages() # sort alphabetically and then by length - # it's fine if the list is big because the Menu - # allows for searching anyways sorted_kb_lang = sorted(sorted(list(kb_lang)), key=len) - selected_lang = Menu(_('Select Keyboard layout'), - sorted_kb_lang, - default_option=default_value, - preset_values=preset_value, - sort=False).run() - return selected_lang + selected_lang = Menu( + _('Select keyboard layout'), + sorted_kb_lang, + preset_values=preset_value, + sort=False + ).run() + + if selected_lang.value is None: + return preset_value + + return selected_lang.value def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: @@ -89,24 +104,27 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: else: preselected = list(preset_values.keys()) mirrors = list_mirrors() - selected_mirror = Menu(_('Select one of the regions to download packages from'), - list(mirrors.keys()), - preset_values=preselected, - multi=True).run() + selected_mirror = Menu( + _('Select one of the regions to download packages from'), + list(mirrors.keys()), + preset_values=preselected, + multi=True, + explode_on_interrupt=True + ).run() - if selected_mirror is not None: - return {selected: mirrors[selected] for selected in selected_mirror} - - return {} + match selected_mirror.type_: + case MenuSelectionType.Ctrl_c: return {} + case MenuSelectionType.Esc: return preset_values + case _: return {selected: mirrors[selected] for selected in selected_mirror.value} def select_archinstall_language(default='English'): - languages = Translation.get_all_names() - language = Menu(_('Select Archinstall language'), languages, default_option=default).run() + languages = Translation.get_available_lang() + language = Menu(_('Archinstall language'), languages, default_option=default).run() return language -def select_profile() -> Optional[Profile]: +def select_profile(preset) -> Optional[Profile]: """ # Asks the user to select a profile from the available profiles. # @@ -124,13 +142,27 @@ def select_profile() -> Optional[Profile]: options[option] = profile title = _('This is a list of pre-programmed profiles, they might make it easier to install things like desktop environments') - - selection = Menu(title=title, p_options=list(options.keys())).run() - - if selection is not None: - return options[selection] - - return None + warning = str(_('Are you sure you want to reset this setting?')) + + selection = Menu( + title=title, + p_options=list(options.keys()), + explode_on_interrupt=True, + explode_warning=warning + ).run() + + match selection.type_: + case MenuSelectionType.Selection: + return options[selection.value] if selection.value is not None else None + case MenuSelectionType.Ctrl_c: + archinstall.storage['profile_minimal'] = False + archinstall.storage['_selected_servers'] = [] + archinstall.storage['_desktop_profile'] = None + archinstall.arguments['desktop-environment'] = None + archinstall.arguments['gfx_driver_packages'] = None + return None + case MenuSelectionType.Esc: + return None def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List[str]: @@ -171,14 +203,16 @@ def select_additional_repositories(preset: List[str]) -> List[str]: repositories = ["multilib", "testing"] - additional_repositories = Menu(_('Choose which optional additional repositories to enable'), - repositories, - sort=False, - multi=True, - preset_values=preset, - default_option=[]).run() - - if additional_repositories is not None: - return additional_repositories - - return [] + choice = Menu( + _('Choose which optional additional repositories to enable'), + repositories, + sort=False, + multi=True, + preset_values=preset, + explode_on_interrupt=True + ).run() + + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Ctrl_c: return [] + case MenuSelectionType.Selection: return choice.value diff --git a/archinstall/lib/user_interaction/locale_conf.py b/archinstall/lib/user_interaction/locale_conf.py index d48018cf..15720064 100644 --- a/archinstall/lib/user_interaction/locale_conf.py +++ b/archinstall/lib/user_interaction/locale_conf.py @@ -4,32 +4,39 @@ from typing import Any, TYPE_CHECKING from ..locale_helpers import list_locales from ..menu import Menu +from ..menu.menu import MenuSelectionType if TYPE_CHECKING: _: Any -def select_locale_lang(default: str, preset: str = None) -> str: +def select_locale_lang(preset: str = None) -> str: locales = list_locales() locale_lang = set([locale.split()[0] for locale in locales]) - selected_locale = Menu(_('Choose which locale language to use'), - locale_lang, - sort=True, - preset_values=preset, - default_option=default).run() + selected_locale = Menu( + _('Choose which locale language to use'), + list(locale_lang), + sort=True, + preset_values=preset + ).run() - return selected_locale + match selected_locale.type_: + case MenuSelectionType.Selection: return selected_locale.value + case MenuSelectionType.Esc: return preset -def select_locale_enc(default: str, preset: str = None) -> str: +def select_locale_enc(preset: str = None) -> str: locales = list_locales() locale_enc = set([locale.split()[1] for locale in locales]) - selected_locale = Menu(_('Choose which locale encoding to use'), - locale_enc, - sort=True, - preset_values=preset, - default_option=default).run() + selected_locale = Menu( + _('Choose which locale encoding to use'), + list(locale_enc), + sort=True, + preset_values=preset + ).run() - return selected_locale + match selected_locale.type_: + case MenuSelectionType.Selection: return selected_locale.value + case MenuSelectionType.Esc: return preset diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py index a6ff3111..567a2964 100644 --- a/archinstall/lib/user_interaction/manage_users_conf.py +++ b/archinstall/lib/user_interaction/manage_users_conf.py @@ -1,14 +1,12 @@ from __future__ import annotations -import logging import re -from typing import Any, Dict, TYPE_CHECKING, List +from typing import Any, Dict, TYPE_CHECKING, List, Optional +from .utils import get_password from ..menu import Menu from ..menu.list_manager import ListManager -from ..output import log -from ..storage import storage -from .utils import get_password +from ..models.users import User if TYPE_CHECKING: _: Any @@ -19,7 +17,7 @@ class UserList(ListManager): subclass of ListManager for the managing of user accounts """ - def __init__(self, prompt: str, lusers: dict, sudo: bool = None): + def __init__(self, prompt: str, lusers: List[User]): """ param: prompt type: str @@ -27,140 +25,83 @@ class UserList(ListManager): type: Dict param: sudo. boolean to determine if we handle superusers or users. If None handles both types """ - self.sudo = sudo - self.actions = [ + self._actions = [ str(_('Add a user')), str(_('Change password')), str(_('Promote/Demote user')), str(_('Delete User')) ] - super().__init__(prompt, lusers, self.actions, self.actions[0]) - - def reformat(self, data: Any) -> List[Any]: - def format_element(elem :str): - # secret gives away the length of the password - if data[elem].get('!password'): - pwd = '*' * 16 - else: - pwd = '' - if data[elem].get('sudoer'): - super_user = 'Superuser' - else: - super_user = ' ' - return f"{elem:16}: password {pwd:16} {super_user}" + super().__init__(prompt, lusers, self._actions, self._actions[0]) - return list(map(lambda x: format_element(x), data)) + def reformat(self, data: List[User]) -> Dict[str, User]: + return {e.display(): e for e in data} def action_list(self): - if self.target: - active_user = list(self.target.keys())[0] - else: - active_user = None - sudoer = self.target[active_user].get('sudoer', False) - if self.sudo is None: - return self.actions - if self.sudo and sudoer: - return self.actions - elif self.sudo and not sudoer: - return [self.actions[2]] - elif not self.sudo and sudoer: - return [self.actions[2]] + active_user = self.target if self.target else None + + if active_user is None: + return [self._actions[0]] else: - return self.actions + return self._actions[1:] - def exec_action(self, data: Any): + def exec_action(self, data: List[User]) -> List[User]: if self.target: - active_user = list(self.target.keys())[0] + active_user = self.target else: active_user = None - if self.action == self.actions[0]: # add - new_user = self.add_user() - # no unicity check, if exists will be replaced - data.update(new_user) - elif self.action == self.actions[1]: # change password - data[active_user]['!password'] = get_password( - prompt=str(_('Password for user "{}": ').format(active_user))) - elif self.action == self.actions[2]: # promote/demote - data[active_user]['sudoer'] = not data[active_user]['sudoer'] - elif self.action == self.actions[3]: # delete - del data[active_user] + if self.action == self._actions[0]: # add + new_user = self._add_user() + if new_user is not None: + # in case a user with the same username as an existing user + # was created we'll replace the existing one + data = [d for d in data if d.username != new_user.username] + data += [new_user] + elif self.action == self._actions[1]: # change password + prompt = str(_('Password for user "{}": ').format(active_user.username)) + new_password = get_password(prompt=prompt) + if new_password: + user = next(filter(lambda x: x == active_user, data), 1) + user.password = new_password + elif self.action == self._actions[2]: # promote/demote + user = next(filter(lambda x: x == active_user, data), 1) + user.sudo = False if user.sudo else True + elif self.action == self._actions[3]: # delete + data = [d for d in data if d != active_user] + + return data def _check_for_correct_username(self, username: str) -> bool: if re.match(r'^[a-z_][a-z0-9_-]*\$?$', username) and len(username) <= 32: return True - log("The username you entered is invalid. Try again", level=logging.WARNING, fg='red') return False - def add_user(self): + def _add_user(self) -> Optional[User]: print(_('\nDefine a new user\n')) - prompt = str(_("User Name : ")) + prompt = str(_('Enter username (leave blank to skip): ')) + while True: - userid = input(prompt).strip(' ') - if not userid: - return {} # end - if not self._check_for_correct_username(userid): - pass + username = input(prompt).strip(' ') + if not username: + return None + if not self._check_for_correct_username(username): + prompt = str(_("The username you entered is invalid. Try again")) + '\n' + prompt else: break - if self.sudo: - sudoer = True - elif self.sudo is not None and not self.sudo: - sudoer = False - else: - sudoer = False - sudo_choice = Menu(str(_('Should {} be a superuser (sudoer)?')).format(userid), ['yes', 'no'], - skip=False, - preset_values='yes' if sudoer else 'no', - default_option='no').run() - sudoer = True if sudo_choice == 'yes' else False - - password = get_password(prompt=str(_('Password for user "{}": ').format(userid))) - - return {userid: {"!password": password, "sudoer": sudoer}} - - -def manage_users(prompt: str, sudo: bool) -> tuple[dict, dict]: - # TODO Filtering and some kind of simpler code - lusers = {} - if storage['arguments'].get('!superusers', {}): - lusers.update({ - uid: { - '!password': storage['arguments']['!superusers'][uid].get('!password'), - 'sudoer': True - } - for uid in storage['arguments'].get('!superusers', {}) - }) - if storage['arguments'].get('!users', {}): - lusers.update({ - uid: { - '!password': storage['arguments']['!users'][uid].get('!password'), - 'sudoer': False - } - for uid in storage['arguments'].get('!users', {}) - }) - # processing - lusers = UserList(prompt, lusers, sudo).run() - # return data - superusers = { - uid: { - '!password': lusers[uid].get('!password') - } - for uid in lusers if lusers[uid].get('sudoer', False) - } - users = {uid: {'!password': lusers[uid].get('!password')} for uid in lusers if not lusers[uid].get('sudoer', False)} - storage['arguments']['!superusers'] = superusers - storage['arguments']['!users'] = users - return superusers, users - - -def ask_for_superuser_account(prompt: str) -> Dict[str, Dict[str, str]]: - prompt = prompt if prompt else str(_('Define users with sudo privilege, by username: ')) - superusers, dummy = manage_users(prompt, sudo=True) - return superusers - - -def ask_for_additional_users(prompt: str = '') -> Dict[str, Dict[str, str | None]]: - prompt = prompt if prompt else _('Any additional users to install (leave blank for no users): ') - dummy, users = manage_users(prompt, sudo=False) + + password = get_password(prompt=str(_('Password for user "{}": ').format(username))) + + choice = Menu( + str(_('Should "{}" be a superuser (sudo)?')).format(username), Menu.yes_no(), + skip=False, + default_option=Menu.no() + ).run() + + sudo = True if choice.value == Menu.yes() else False + return User(username, password, sudo) + + +def ask_for_additional_users(prompt: str = '', defined_users: List[User] = []) -> List[User]: + prompt = prompt if prompt else _('Enter username (leave blank to skip): ') + users = UserList(prompt, defined_users).run() return users diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py index 80c9106b..5154d8b1 100644 --- a/archinstall/lib/user_interaction/network_conf.py +++ b/archinstall/lib/user_interaction/network_conf.py @@ -4,6 +4,7 @@ import ipaddress import logging from typing import Any, Optional, TYPE_CHECKING, List, Union +from ..menu.menu import MenuSelectionType from ..menu.text_input import TextInput from ..models.network_configuration import NetworkConfiguration, NicType @@ -63,11 +64,17 @@ class ManualNetworkConfig(ListManager): elif self.action == self._action_delete: del data[iface_name] - def _select_iface(self, existing_ifaces: List[str]) -> Optional[str]: + return data + + def _select_iface(self, existing_ifaces: List[str]) -> Optional[Any]: all_ifaces = list_interfaces().values() available = set(all_ifaces) - set(existing_ifaces) - iface = Menu(str(_('Select interface to add')), list(available), skip=True).run() - return iface + choice = Menu(str(_('Select interface to add')), list(available), skip=True).run() + + if choice.type_ == MenuSelectionType.Esc: + return None + + return choice.value def _edit_iface(self, edit_iface :NetworkConfiguration): iface_name = edit_iface.iface @@ -75,9 +82,9 @@ class ManualNetworkConfig(ListManager): default_mode = 'DHCP (auto detect)' prompt = _('Select which mode to configure for "{}" or skip to use default mode "{}"').format(iface_name, default_mode) - mode = Menu(prompt, modes, default_option=default_mode).run() + mode = Menu(prompt, modes, default_option=default_mode, skip=False).run() - if mode == 'IP (static)': + if mode.value == 'IP (static)': while 1: prompt = _('Enter the IP and subnet for {} (example: 192.168.0.5/24): ').format(iface_name) ip = TextInput(prompt, edit_iface.ip).run().strip() @@ -89,14 +96,14 @@ class ManualNetworkConfig(ListManager): log("You need to enter a valid IP in IP-config mode.", level=logging.WARNING, fg='red') # Implemented new check for correct gateway IP address + gateway = None + while 1: - gateway = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '), + gateway_input = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '), edit_iface.gateway).run().strip() try: - if len(gateway) == 0: - gateway = None - else: - ipaddress.ip_address(gateway) + if len(gateway_input) > 0: + ipaddress.ip_address(gateway_input) break except ValueError: log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red') @@ -107,6 +114,7 @@ class ManualNetworkConfig(ListManager): display_dns = None dns_input = TextInput(_('Enter your DNS servers (space separated, blank for none): '), display_dns).run().strip() + dns = [] if len(dns_input): dns = dns_input.split(' ') @@ -135,23 +143,28 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw elif preset.type == 'network_manager': cursor_idx = 1 - nic = Menu(_( - 'Select one network interface to configure'), + warning = str(_('Are you sure you want to reset this setting?')) + + choice = Menu( + _('Select one network interface to configure'), list(network_options.values()), cursor_index=cursor_idx, - sort=False + sort=False, + explode_on_interrupt=True, + explode_warning=warning ).run() - if not nic: - return preset + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Ctrl_c: return None - if nic == network_options['none']: + if choice.value == network_options['none']: return None - elif nic == network_options['iso_config']: + elif choice.value == network_options['iso_config']: return NetworkConfiguration(NicType.ISO) - elif nic == network_options['network_manager']: + elif choice.value == network_options['network_manager']: return NetworkConfiguration(NicType.NM) - elif nic == network_options['manual']: + elif choice.value == network_options['manual']: manual = ManualNetworkConfig('Configure interfaces', preset) return manual.run_manual() diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py index af1d224f..bfff5705 100644 --- a/archinstall/lib/user_interaction/partitioning_conf.py +++ b/archinstall/lib/user_interaction/partitioning_conf.py @@ -1,12 +1,13 @@ from __future__ import annotations -from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable +import copy +from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable, Optional from ..menu import Menu +from ..menu.menu import MenuSelectionType from ..output import log from ..disk.validators import fs_types -from ..disk.helpers import has_mountpoint if TYPE_CHECKING: from ..disk import BlockDevice @@ -19,9 +20,9 @@ def partition_overlap(partitions: list, start: str, end: str) -> bool: return False -def _current_partition_layout(partitions: List[Partition], with_idx: bool = False) -> str: +def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool = False, with_title: bool = True) -> str: - def do_padding(name, max_len): + def do_padding(name: str, max_len: int): spaces = abs(len(str(name)) - max_len) + 2 pad_left = int(spaces / 2) pad_right = spaces - pad_left @@ -61,39 +62,54 @@ def _current_partition_layout(partitions: List[Partition], with_idx: bool = Fals current_layout += f'{row[:-1]}\n' - title = str(_('Current partition layout')) - return f'\n\n{title}:\n\n{current_layout}' + if with_title: + title = str(_('Current partition layout')) + return f'\n\n{title}:\n\n{current_layout}' + return current_layout -def select_partition(title :str, partitions :List[Partition], multiple :bool = False, filter :Callable = None) -> Union[int, List[int], None]: + +def _get_partitions(partitions :List[Partition], filter_ :Callable = None) -> List[str]: """ filter allows to filter out the indexes once they are set. Should return True if element is to be included """ partition_indexes = [] for i in range(len(partitions)): - if filter: - if filter(partitions[i]): + if filter_: + if filter_(partitions[i]): partition_indexes.append(str(i)) else: partition_indexes.append(str(i)) + + return partition_indexes + + +def select_partition( + title :str, + partitions :List[Partition], + multiple :bool = False, + filter_ :Callable = None +) -> Optional[int, List[int]]: + partition_indexes = _get_partitions(partitions, filter_) + if len(partition_indexes) == 0: return None - # old code without filter - # partition_indexes = list(map(str, range(len(partitions)))) - partition = Menu(title, partition_indexes, multi=multiple).run() + choice = Menu(title, partition_indexes, multi=multiple).run() - if partition is not None: - if isinstance(partition, list): - return [int(p) for p in partition] - else: - return int(partition) + if choice.type_ == MenuSelectionType.Esc: + return None - return None + if isinstance(choice.value, list): + return [int(p) for p in choice.value] + else: + return int(choice.value) -def get_default_partition_layout(block_devices: Union['BlockDevice', List['BlockDevice']], - advanced_options: bool = False) -> Dict[str, Any]: +def get_default_partition_layout( + block_devices: Union['BlockDevice', List['BlockDevice']], + advanced_options: bool = False +) -> Optional[Dict[str, Any]]: from ..disk import suggest_single_disk_layout, suggest_multi_disk_layout if len(block_devices) == 1: @@ -107,14 +123,15 @@ def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: for device in block_devices: layout = manage_new_and_existing_partitions(device) - result[device.path] = layout return result -def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50 +def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50 block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]} + original_layout = copy.deepcopy(block_device_struct) + # Test code: [part.__dump__() for part in block_device.partitions.values()] # TODO: Squeeze in BTRFS subvolumes here @@ -129,11 +146,13 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, mark_bootable = str(_('Mark/Unmark a partition as bootable (automatic for /boot)')) set_filesystem_partition = str(_('Set desired filesystem for a partition')) set_btrfs_subvolumes = str(_('Set desired subvolumes on a btrfs partition')) + save_and_exit = str(_('Save and exit')) + cancel = str(_('Cancel')) while True: modes = [new_partition, suggest_partition_layout] - if len(block_device_struct['partitions']): + if len(block_device_struct['partitions']) > 0: modes += [ delete_partition, delete_all_partitions, @@ -143,20 +162,31 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, mark_bootable, mark_compressed, set_filesystem_partition, - set_btrfs_subvolumes, ] + indexes = _get_partitions( + block_device_struct["partitions"], + filter_=lambda x: True if x.get('filesystem', {}).get('format') == 'btrfs' else False + ) + + if len(indexes) > 0: + modes += [set_btrfs_subvolumes] + title = _('Select what to do with\n{}').format(block_device) # show current partition layout: if len(block_device_struct["partitions"]): - title += _current_partition_layout(block_device_struct['partitions']) + '\n' + title += current_partition_layout(block_device_struct['partitions']) + '\n' - task = Menu(title, modes, sort=False).run() + modes += [save_and_exit, cancel] - if not task: - break + task = Menu(title, modes, sort=False, skip=False).run() + task = task.value + if task == cancel: + return original_layout + elif task == save_and_exit: + break if task == new_partition: from ..disk import valid_parted_position @@ -165,7 +195,10 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, # # https://www.gnu.org/software/parted/manual/html_node/mklabel.html # name = input("Enter a desired name for the partition: ").strip() - fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), skip=False).run() + fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run() + + if fs_choice.type_ == MenuSelectionType.Esc: + continue prompt = _('Enter the start sector (percentage or block number, default: {}): ').format( block_device.first_free_sector) @@ -197,7 +230,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, "mountpoint": None, "wipe": True, "filesystem": { - "format": fstype + "format": fs_choice.value } }) else: @@ -208,18 +241,15 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, from ..disk import suggest_single_disk_layout if len(block_device_struct["partitions"]): - prompt = _('{} contains queued partitions, this will remove those, are you sure?').format(block_device) - choice = Menu(prompt, ['yes', 'no'], default_option='no').run() + prompt = _('{}\ncontains queued partitions, this will remove those, are you sure?').format(block_device) + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run() - if choice == 'no': + if choice.value == Menu.no(): continue block_device_struct.update(suggest_single_disk_layout(block_device)[block_device.path]) - - elif task is None: - return block_device_struct else: - current_layout = _current_partition_layout(block_device_struct['partitions'], with_idx=True) + current_layout = current_partition_layout(block_device_struct['partitions'], with_idx=True) if task == delete_partition: title = _('{}\n\nSelect by index which partitions to delete').format(current_layout) @@ -243,15 +273,14 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, block_device_struct["partitions"][partition]["filesystem"]["mount_options"].append("compress=zstd") elif task == delete_all_partitions: block_device_struct["partitions"] = [] + block_device_struct["wipe"] = True elif task == assign_mount_point: title = _('{}\n\nSelect by index which partition to mount where').format(current_layout) partition = select_partition(title, block_device_struct["partitions"]) if partition is not None: - print( - _(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) - mountpoint = input( - _('Select where to mount partition (leave blank to remove mountpoint): ')).strip() + print(_(' * Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + mountpoint = input(_('Select where to mount partition (leave blank to remove mountpoint): ')).strip() if len(mountpoint): block_device_struct["partitions"][partition]['mountpoint'] = mountpoint @@ -273,14 +302,13 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, if not block_device_struct["partitions"][partition].get('filesystem', None): block_device_struct["partitions"][partition]['filesystem'] = {} - fstype = Menu(_('Enter a desired filesystem type for the partition'), fs_types(), - skip=False).run() + fs_choice = Menu(_('Enter a desired filesystem type for the partition'), fs_types()).run() - block_device_struct["partitions"][partition]['filesystem']['format'] = fstype + if fs_choice.type_ == MenuSelectionType.Selection: + block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value # Negate the current wipe marking - block_device_struct["partitions"][partition][ - 'wipe'] = not block_device_struct["partitions"][partition].get('wipe', False) + block_device_struct["partitions"][partition]['wipe'] = not block_device_struct["partitions"][partition].get('wipe', False) elif task == mark_encrypted: title = _('{}\n\nSelect which partition to mark as encrypted').format(current_layout) @@ -288,16 +316,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, if partition is not None: # Negate the current encryption marking - block_device_struct["partitions"][partition][ - 'encrypted'] = not block_device_struct["partitions"][partition].get('encrypted', False) + block_device_struct["partitions"][partition]['encrypted'] = \ + not block_device_struct["partitions"][partition].get('encrypted', False) elif task == mark_bootable: title = _('{}\n\nSelect which partition to mark as bootable').format(current_layout) partition = select_partition(title, block_device_struct["partitions"]) if partition is not None: - block_device_struct["partitions"][partition][ - 'boot'] = not block_device_struct["partitions"][partition].get('boot', False) + block_device_struct["partitions"][partition]['boot'] = \ + not block_device_struct["partitions"][partition].get('boot', False) elif task == set_filesystem_partition: title = _('{}\n\nSelect which partition to set a filesystem on').format(current_layout) @@ -308,16 +336,18 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, block_device_struct["partitions"][partition]['filesystem'] = {} fstype_title = _('Enter a desired filesystem type for the partition: ') - fstype = Menu(fstype_title, fs_types(), skip=False).run() + fs_choice = Menu(fstype_title, fs_types()).run() - block_device_struct["partitions"][partition]['filesystem']['format'] = fstype + if fs_choice.type_ == MenuSelectionType.Selection: + block_device_struct["partitions"][partition]['filesystem']['format'] = fs_choice.value elif task == set_btrfs_subvolumes: from .subvolume_config import SubvolumeList # TODO get preexisting partitions title = _('{}\n\nSelect which partition to set subvolumes on').format(current_layout) - partition = select_partition(title, block_device_struct["partitions"],filter=lambda x:True if x.get('filesystem',{}).get('format') == 'btrfs' else False) + partition = select_partition(title, block_device_struct["partitions"],filter_=lambda x:True if x.get('filesystem',{}).get('format') == 'btrfs' else False) + if partition is not None: if not block_device_struct["partitions"][partition].get('btrfs', {}): block_device_struct["partitions"][partition]['btrfs'] = {} @@ -333,19 +363,30 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, return block_device_struct +def select_encrypted_partitions( + title :str, + partitions :List[Partition], + multiple :bool = True, + filter_ :Callable = None +) -> Optional[int, List[int]]: + partition_indexes = _get_partitions(partitions, filter_) -def select_encrypted_partitions(block_devices: dict, password: str) -> dict: - for device in block_devices: - for partition in block_devices[device]['partitions']: - if partition.get('mountpoint', None) != '/boot': - partition['encrypted'] = True - partition['!password'] = password + if len(partition_indexes) == 0: + return None + + title = _('Select which partitions to mark for formatting:') - if not has_mountpoint(partition,'/'): - # Tell the upcoming steps to generate a key-file for non root mounts. - partition['generate-encryption-key-file'] = True + # show current partition layout: + if len(partitions): + title += current_partition_layout(partitions) + '\n' - return block_devices + choice = Menu(title, partition_indexes, multi=multiple).run() - # TODO: Next version perhaps we can support mixed multiple encrypted partitions - # Users might want to single out a partition for non-encryption to share between dualboot etc. + if choice.type_ == MenuSelectionType.Esc: + return None + + if isinstance(choice.value, list): + for partition_index in choice.value: + yield int(partition_index) + else: + yield (partition_index) diff --git a/archinstall/lib/user_interaction/save_conf.py b/archinstall/lib/user_interaction/save_conf.py index c52b97e2..f542bc9b 100644 --- a/archinstall/lib/user_interaction/save_conf.py +++ b/archinstall/lib/user_interaction/save_conf.py @@ -5,6 +5,7 @@ from typing import Any, Dict, TYPE_CHECKING from ..configuration import ConfigurationOutput from ..menu import Menu +from ..menu.menu import MenuSelectionType from ..output import log if TYPE_CHECKING: @@ -45,14 +46,16 @@ def save_config(config: Dict): 'all': str(_('Save all')) } - selection = Menu(_('Choose which configuration to save'), - list(options.values()), - sort=False, - skip=True, - preview_size=0.75, - preview_command=preview).run() + choice = Menu( + _('Choose which configuration to save'), + list(options.values()), + sort=False, + skip=True, + preview_size=0.75, + preview_command=preview + ).run() - if not selection: + if choice.type_ == MenuSelectionType.Esc: return while True: @@ -62,13 +65,13 @@ def save_config(config: Dict): break log(_('Not a valid directory: {}').format(dest_path), fg='red') - if options['user_config'] == selection: + if options['user_config'] == choice.value: config_output.save_user_config(dest_path) - elif options['user_creds'] == selection: + elif options['user_creds'] == choice.value: config_output.save_user_creds(dest_path) - elif options['disk_layout'] == selection: + elif options['disk_layout'] == choice.value: config_output.save_disk_layout(dest_path) - elif options['all'] == selection: + elif options['all'] == choice.value: config_output.save_user_config(dest_path) config_output.save_user_creds(dest_path) - config_output.save_disk_layout + config_output.save_disk_layout(dest_path) diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py index 0515876b..af783639 100644 --- a/archinstall/lib/user_interaction/subvolume_config.py +++ b/archinstall/lib/user_interaction/subvolume_config.py @@ -1,9 +1,11 @@ -from typing import List, Any, Dict +from typing import Dict, List from ..menu.list_manager import ListManager +from ..menu.menu import MenuSelectionType from ..menu.selection_menu import Selector, GeneralMenu from ..menu.text_input import TextInput from ..menu import Menu + """ UI classes """ @@ -14,7 +16,7 @@ class SubvolumeList(ListManager): self.ObjectDefaultAction = str(_('Add')) super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction) - def reformat(self, data: Any) -> List[Any]: + def reformat(self, data: Dict) -> Dict: def presentation(key :str, value :Dict): text = _(" Subvolume :{:16}").format(key) if isinstance(value,str): @@ -24,18 +26,20 @@ class SubvolumeList(ListManager): text += _(" mounted at {:16}").format(value['mountpoint']) else: text += (' ' * 28) + if value.get('options',[]): text += _(" with option {}").format(', '.join(value['options'])) return text - return sorted(list(map(lambda x:presentation(x,data[x]),data))) + formatted = {presentation(k, v): k for k, v in data.items()} + return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])} def action_list(self): return super().action_list() - def exec_action(self, data: Any): + def exec_action(self, data: Dict): if self.target: - origkey,origval = list(self.target.items())[0] + origkey, origval = list(self.target.items())[0] else: origkey = None @@ -46,13 +50,15 @@ class SubvolumeList(ListManager): self.target = {} print(_('\n Fill the desired values for a new subvolume \n')) with SubvolumeMenu(self.target,self.action) as add_menu: - for data in ['name','mountpoint','options']: - add_menu.exec_option(data) + for elem in ['name','mountpoint','options']: + add_menu.exec_option(elem) else: SubvolumeMenu(self.target,self.action).run() data.update(self.target) + return data + class SubvolumeMenu(GeneralMenu): def __init__(self,parameters,action=None): @@ -124,7 +130,17 @@ class SubvolumeMenu(GeneralMenu): def _select_subvolume_mount_point(self,value): return TextInput(str(_("Select a mount point :")),value).run() - def _select_subvolume_options(self,value): + def _select_subvolume_options(self,value) -> List[str]: # def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True): - return Menu(str(_("Select the desired subvolume options ")),['nodatacow','compress'], - skip=True,preset_values=value,multi=True).run() + choice = Menu( + str(_("Select the desired subvolume options ")), + ['nodatacow','compress'], + skip=True, + preset_values=value, + multi=True + ).run() + + if choice.type_ == MenuSelectionType.Selection: + return choice.value + + return [] diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py index 0284dc5f..78daa6a5 100644 --- a/archinstall/lib/user_interaction/system_conf.py +++ b/archinstall/lib/user_interaction/system_conf.py @@ -6,10 +6,9 @@ from ..disk import all_blockdevices from ..exceptions import RequirementError from ..hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics from ..menu import Menu +from ..menu.menu import MenuSelectionType from ..storage import storage -from ..translation import DeferredTranslation - if TYPE_CHECKING: _: Any @@ -25,13 +24,22 @@ def select_kernel(preset: List[str] = None) -> List[str]: kernels = ["linux", "linux-lts", "linux-zen", "linux-pae"] default_kernel = "linux" - selected_kernels = Menu(_('Choose which kernels to use or leave blank for default "{}"').format(default_kernel), - kernels, - sort=True, - multi=True, - preset_values=preset, - default_option=default_kernel).run() - return selected_kernels + warning = str(_('Are you sure you want to reset this setting?')) + + choice = Menu( + _('Choose which kernels to use or leave blank for default "{}"').format(default_kernel), + kernels, + sort=True, + multi=True, + preset_values=preset, + explode_on_interrupt=True, + explode_warning=warning + ).run() + + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Ctrl_c: return [] + case MenuSelectionType.Selection: return choice.value def select_harddrives(preset: List[str] = []) -> List[str]: @@ -49,18 +57,27 @@ def select_harddrives(preset: List[str] = []) -> List[str]: else: preset_disks = {} - selected_harddrive = Menu(_('Select one or more hard drives to use and configure'), - list(options.keys()), - preset_values=list(preset_disks.keys()), - multi=True).run() + title = str(_('Select one or more hard drives to use and configure\n')) + title += str(_('Any modifications to the existing setting will reset the disk layout!')) - if selected_harddrive and len(selected_harddrive) > 0: - return [options[i] for i in selected_harddrive] + warning = str(_('If you reset the harddrive selection this will also reset the current disk layout. Are you sure?')) - return [] + selected_harddrive = Menu( + title, + list(options.keys()), + preset_values=list(preset_disks.keys()), + multi=True, + explode_on_interrupt=True, + explode_warning=warning + ).run() + match selected_harddrive.type_: + case MenuSelectionType.Ctrl_c: return [] + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Selection: return [options[i] for i in selected_harddrive.value] -def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask: bool = False) -> str: + +def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS) -> str: """ Some what convoluted function, whose job is simple. Select a graphics driver from a pre-defined set of popular options. @@ -73,72 +90,85 @@ def select_driver(options: Dict[str, Any] = AVAILABLE_GFX_DRIVERS, force_ask: bo if drivers: arguments = storage.get('arguments', {}) - title = DeferredTranslation('') + title = '' if has_amd_graphics(): - title += _( + title += str(_( 'For the best compatibility with your AMD hardware, you may want to use either the all open-source or AMD / ATI options.' - ) + '\n' + )) + '\n' if has_intel_graphics(): - title += _( + title += str(_( 'For the best compatibility with your Intel hardware, you may want to use either the all open-source or Intel options.\n' - ) + )) if has_nvidia_graphics(): - title += _( + title += str(_( 'For the best compatibility with your Nvidia hardware, you may want to use the Nvidia proprietary driver.\n' - ) + )) - if not arguments.get('gfx_driver', None) or force_ask: - title += _('\n\nSelect a graphics driver or leave blank to install all open-source drivers') - arguments['gfx_driver'] = Menu(title, drivers).run() + title += str(_('\n\nSelect a graphics driver or leave blank to install all open-source drivers')) + choice = Menu(title, drivers).run() - if arguments.get('gfx_driver', None) is None: - arguments['gfx_driver'] = _("All open-source (default)") + if choice.type_ != MenuSelectionType.Selection: + return arguments.get('gfx_driver') - return options.get(arguments.get('gfx_driver')) + arguments['gfx_driver'] = choice.value + return options.get(choice.value) raise RequirementError("Selecting drivers require a least one profile to be given as an option.") def ask_for_bootloader(advanced_options: bool = False, preset: str = None) -> str: - if preset == 'systemd-bootctl': - preset_val = 'systemd-boot' if advanced_options else 'no' + preset_val = 'systemd-boot' if advanced_options else Menu.no() elif preset == 'grub-install': - preset_val = 'grub' if advanced_options else 'yes' + preset_val = 'grub' if advanced_options else Menu.yes() else: preset_val = preset bootloader = "systemd-bootctl" if has_uefi() else "grub-install" + if has_uefi(): if not advanced_options: - bootloader_choice = Menu(_('Would you like to use GRUB as a bootloader instead of systemd-boot?'), - ['yes', 'no'], - preset_values=preset_val, - default_option='no').run() - - if bootloader_choice == "yes": - bootloader = "grub-install" + selection = Menu( + _('Would you like to use GRUB as a bootloader instead of systemd-boot?'), + Menu.yes_no(), + preset_values=preset_val, + default_option=Menu.no() + ).run() + + match selection.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Selection: bootloader = 'grub-install' if selection.value == Menu.yes() else bootloader else: # We use the common names for the bootloader as the selection, and map it back to the expected values. choices = ['systemd-boot', 'grub', 'efistub'] selection = Menu(_('Choose a bootloader'), choices, preset_values=preset_val).run() - if selection != "": - if selection == 'systemd-boot': + + value = '' + match selection.type_: + case MenuSelectionType.Esc: value = preset_val + case MenuSelectionType.Selection: value = selection.value + + if value != "": + if value == 'systemd-boot': bootloader = 'systemd-bootctl' - elif selection == 'grub': + elif value == 'grub': bootloader = 'grub-install' else: - bootloader = selection + bootloader = value return bootloader def ask_for_swap(preset: bool = True) -> bool: if preset: - preset_val = 'yes' + preset_val = Menu.yes() else: - preset_val = 'no' + preset_val = Menu.no() + prompt = _('Would you like to use swap on zram?') - choice = Menu(prompt, ['yes', 'no'], default_option='yes', preset_values=preset_val).run() - return False if choice == 'no' else True + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), preset_values=preset_val).run() + + match choice.type_: + case MenuSelectionType.Esc: return preset + case MenuSelectionType.Selection: return False if choice.value == Menu.no() else True diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/user_interaction/utils.py index 48b55e8c..fa079bc2 100644 --- a/archinstall/lib/user_interaction/utils.py +++ b/archinstall/lib/user_interaction/utils.py @@ -28,12 +28,9 @@ def check_password_strong(passwd: str) -> bool: symbol_count += 40 if symbol_count**len(passwd) < 10e20: - - prompt = _("The password you are using seems to be weak,") - prompt += _("are you sure you want to use it?") - - choice = Menu(prompt, ["yes", "no"], default_option="yes").run() - return choice == "yes" + prompt = str(_("The password you are using seems to be weak, are you sure you want to use it?")) + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run() + return choice.value == Menu.yes() return True @@ -43,7 +40,6 @@ def get_password(prompt: str = '') -> Optional[str]: prompt = _("Enter a password: ") while passwd := getpass.getpass(prompt): - if len(passwd.strip()) <= 0: break @@ -56,6 +52,7 @@ def get_password(prompt: str = '') -> Optional[str]: continue return passwd + return None @@ -84,12 +81,13 @@ def do_countdown() -> bool: if SIG_TRIGGER: prompt = _('Do you really want to abort?') - choice = Menu(prompt, ['yes', 'no'], skip=False).run() - if choice == 'yes': + choice = Menu(prompt, Menu.yes_no(), skip=False).run() + if choice.value == Menu.yes(): exit(0) if SIG_TRIGGER is False: sys.stdin.read() + SIG_TRIGGER = False signal.signal(signal.SIGINT, sig_handler) |