From a7ca037a26de53fd242f89bc6a90fd53337b4d13 Mon Sep 17 00:00:00 2001 From: Daniel Girtler Date: Tue, 7 Jun 2022 01:28:46 +1000 Subject: Update the subvolume menu - fix for #1278 (#1297) * Update subvolume * Add mypy compliance Co-authored-by: Daniel Girtler Co-authored-by: Anton Hvornum --- archinstall/lib/disk/btrfs/__init__.py | 130 +------------ archinstall/lib/disk/btrfs/btrfs_helpers.py | 94 ++++------ archinstall/lib/disk/btrfs/btrfspartition.py | 6 +- archinstall/lib/disk/btrfs/btrfssubvolume.py | 191 -------------------- archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py | 192 ++++++++++++++++++++ archinstall/lib/disk/helpers.py | 9 +- archinstall/lib/disk/mapperdev.py | 10 +- archinstall/lib/disk/partition.py | 10 +- archinstall/lib/disk/user_guides.py | 19 +- archinstall/lib/installer.py | 55 ++---- archinstall/lib/menu/global_menu.py | 33 ++-- archinstall/lib/menu/list_manager.py | 39 ++-- archinstall/lib/models/subvolume.py | 68 +++++++ archinstall/lib/models/users.py | 6 +- .../lib/user_interaction/partitioning_conf.py | 12 +- .../lib/user_interaction/subvolume_config.py | 201 +++++++-------------- 16 files changed, 458 insertions(+), 617 deletions(-) delete mode 100644 archinstall/lib/disk/btrfs/btrfssubvolume.py create mode 100644 archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py create mode 100644 archinstall/lib/models/subvolume.py (limited to 'archinstall/lib') diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py index 90c58145..3c183112 100644 --- a/archinstall/lib/disk/btrfs/__init__.py +++ b/archinstall/lib/disk/btrfs/__init__.py @@ -2,8 +2,7 @@ from __future__ import annotations import pathlib import glob import logging -import re -from typing import Union, Dict, TYPE_CHECKING, Any, Iterator +from typing import Union, Dict, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: @@ -15,30 +14,15 @@ from .btrfs_helpers import ( setup_subvolumes as setup_subvolumes, mount_subvolume as mount_subvolume ) -from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume +from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume from .btrfspartition import BTRFSPartition as BTRFSPartition -from ..helpers import get_mount_info from ...exceptions import DiskError, Deprecated from ...general import SysCommand from ...output import log -from ...exceptions import SysCallError -def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: - try: - output = SysCommand(f"btrfs subvol show {path}").decode() - except SysCallError as error: - print('Error:', error) - result = {} - for line in output.replace('\r\n', '\n').split('\n'): - if ':' in line: - key, val = line.replace('\t', '').split(':', 1) - result[key.strip().lower().replace(' ', '_')] = val.strip() - - return result - -def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: +def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -71,112 +55,6 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib. if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: raise DiskError(f"Could not create a subvolume at {target}: {cmd}") -def _has_option(option :str,options :list) -> bool: - """ auxiliary routine to check if an option is present in a list. - we check if the string appears in one of the options, 'cause it can appear in several forms (option, option=val,...) - """ - if not options: - return False - - for item in options: - if option in item: - return True - - return False - -def manage_btrfs_subvolumes(installation :Installer, - partition :Dict[str, str],) -> list: +def manage_btrfs_subvolumes(installation :Installer, partition :Dict[str, str]) -> list: raise Deprecated("Use setup_subvolumes() instead.") - - from copy import deepcopy - """ we do the magic with subvolumes in a centralized place - parameters: - * the installation object - * the partition dictionary entry which represents the physical partition - returns - * mountpoinst, the list which contains all the "new" partititon to be mounted - - We expect the partition has been mounted as / , and it to be unmounted after the processing - Then we create all the subvolumes inside btrfs as demand - We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs - Then we return a list of "new" partitions to be processed as "normal" partitions - # TODO For encrypted devices we need some special processing prior to it - """ - # We process each of the pairs - # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list - # of mount options (or similar used by brtfs) - mountpoints = [] - subvolumes = partition['btrfs']['subvolumes'] - for name, right_hand in subvolumes.items(): - try: - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use - if name.startswith('/'): - name = name[1:] - # renormalize the right hand. - location = None - subvol_options = [] - # no contents, so it is not to be mounted - if not right_hand: - location = None - # just a string. per backward compatibility the mount point - elif isinstance(right_hand,str): - location = right_hand - # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿? - elif isinstance(right_hand,dict): - location = right_hand.get('mountpoint',None) - subvol_options = right_hand.get('options',[]) - # we create the subvolume - create_subvolume(installation,name) - # Make the nodatacow processing now - # It will be the main cause of creation of subvolumes which are not to be mounted - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in subvol_options: - if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del subvol_options[subvol_options.index('nodatacow')] - # Make the compress processing now - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - # in this way only zstd compression is activaded - # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in subvol_options: - if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])): - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so compress doesn't propagate to the mount options - del subvol_options[subvol_options.index('compress')] - # END compress processing. - # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume - if not partition['mountpoint'] and location is not None: - # we begin to create a fake partition entry. First we copy the original -the one that corresponds to - # the primary partition. We make a deepcopy to avoid altering the original content in any case - fake_partition = deepcopy(partition) - # we start to modify entries in the "fake partition" to match the needs of the subvolumes - # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy - del fake_partition['btrfs'] - fake_partition['encrypted'] = False - fake_partition['generate-encryption-key-file'] = False - # Mount destination. As of now the right hand part - fake_partition['mountpoint'] = location - # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path. - fake_partition['subvolume'] = name - # here we add the special mount options for the subvolume, if any. - # if the original partition['options'] is not a list might give trouble - if fake_partition.get('filesystem',{}).get('mount_options',[]): - fake_partition['filesystem']['mount_options'].extend(subvol_options) - else: - fake_partition['filesystem']['mount_options'] = subvol_options - # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. - # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes - # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless - fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" - - # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, - # as "normal" ones - mountpoints.append(fake_partition) - except Exception as e: - raise e - return mountpoints diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py index 5fa94314..ab528388 100644 --- a/archinstall/lib/disk/btrfs/btrfs_helpers.py +++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py @@ -1,72 +1,42 @@ -import pathlib import logging -from typing import Optional +from pathlib import Path +from typing import Optional, Dict, Any, TYPE_CHECKING +from ...models.subvolume import Subvolume from ...exceptions import SysCallError, DiskError from ...general import SysCommand from ...output import log from ..helpers import get_mount_info -from .btrfssubvolume import BtrfsSubvolume +from .btrfssubvolumeinfo import BtrfsSubvolumeInfo +if TYPE_CHECKING: + from .btrfspartition import BTRFSPartition + from ...installer import Installer -def mount_subvolume(installation, device, name, subvolume_information): - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = name.lstrip('/') - - # renormalize the right hand. - mountpoint = subvolume_information.get('mountpoint', None) - if not mountpoint: - return None - - if type(mountpoint) == str: - mountpoint = pathlib.Path(mountpoint) - installation_target = installation.target - if type(installation_target) == str: - installation_target = pathlib.Path(installation_target) +def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume): + # we normalize the subvolume name (getting rid of slash at the start if exists. + # In our implementation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = subvolume.name.lstrip('/') + mountpoint = Path(subvolume.mountpoint) + installation_target = Path(installation.target) mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) mountpoint.mkdir(parents=True, exist_ok=True) - - mount_options = subvolume_information.get('options', []) - if not any('subvol=' in x for x in mount_options): - mount_options += [f'subvol={name}'] + mount_options = subvolume.options + [f'subvol={name}'] log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") -def setup_subvolumes(installation, partition_dict): - """ - Taken from: ..user_guides.py - - partition['btrfs'] = { - "subvolumes" : { - "@": "/", - "@home": "/home", - "@log": "/var/log", - "@pkg": "/var/cache/pacman/pkg", - "@.snapshots": "/.snapshots" - } - } - """ +def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]): log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") - for name, right_hand in partition_dict['btrfs']['subvolumes'].items(): + + for subvolume in partition_dict['btrfs']['subvolumes']: # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load. # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = name.lstrip('/') - - # renormalize the right hand. - # mountpoint = None - subvol_options = [] - - match right_hand: - # case str(): # backwards-compatability - # mountpoint = right_hand - case dict(): - # mountpoint = right_hand.get('mountpoint', None) - subvol_options = right_hand.get('options', []) + name = subvolume.name.lstrip('/') # We create the subvolume using the BTRFSPartition instance. # That way we ensure not only easy access, but also accurate mount locations etc. @@ -76,27 +46,25 @@ def setup_subvolumes(installation, partition_dict): # It will be the main cause of creation of subvolumes which are not to be mounted # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in subvol_options: + if subvolume.nodatacow: if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del subvol_options[subvol_options.index('nodatacow')] + # Make the compress processing now # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new # in this way only zstd compression is activaded # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in subvol_options: + if subvolume.compress: if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so compress doesn't propagate to the mount options - del subvol_options[subvol_options.index('compress')] -def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: + +def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]: try: - subvolume_name = None + subvolume_name = '' result = {} for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): if index == 0: @@ -110,14 +78,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: # allows for hooking in a pre-processor to do this we have to do it here: result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() - return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result}) - + return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore except SysCallError as error: log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange") return None -def find_parent_subvolume(path :pathlib.Path, filters=[]): + +def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]: # A root path cannot have a parent if str(path) == '/': return None @@ -127,6 +95,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]): if found_mount['target'] == '/': return None - return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']]) + return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']]) + + return subvolume - return subvolume \ No newline at end of file + return None diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py index 6f7487e4..a05f1527 100644 --- a/archinstall/lib/disk/btrfs/btrfspartition.py +++ b/archinstall/lib/disk/btrfs/btrfspartition.py @@ -15,7 +15,7 @@ from .btrfs_helpers import ( if TYPE_CHECKING: from ...installer import Installer - from .btrfssubvolume import BtrfsSubvolume + from .btrfssubvolumeinfo import BtrfsSubvolumeInfo class BTRFSPartition(Partition): def __init__(self, *args, **kwargs): @@ -50,7 +50,7 @@ class BTRFSPartition(Partition): for child in iterate_children(filesystem): yield child - def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume': + def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo': """ Subvolumes have to be created within a mountpoint. This means we need to get the current installation target. @@ -117,4 +117,4 @@ class BTRFSPartition(Partition): # And deal with it here: SysCommand(f"btrfs subvolume create {subvolume}") - return subvolume_info_from_path(subvolume) \ No newline at end of file + return subvolume_info_from_path(subvolume) diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolume.py deleted file mode 100644 index bc7db612..00000000 --- a/archinstall/lib/disk/btrfs/btrfssubvolume.py +++ /dev/null @@ -1,191 +0,0 @@ -import pathlib -import datetime -import logging -import string -import random -import shutil -from dataclasses import dataclass -from typing import Optional, List# , TYPE_CHECKING -from functools import cached_property - -# if TYPE_CHECKING: -# from ..blockdevice import BlockDevice - -from ...exceptions import DiskError -from ...general import SysCommand -from ...output import log -from ...storage import storage - -@dataclass -class BtrfsSubvolume: - full_path :pathlib.Path - name :str - uuid :str - parent_uuid :str - creation_time :datetime.datetime - subvolume_id :int - generation :int - gen_at_creation :int - parent_id :int - top_level_id :int - send_transid :int - send_time :datetime.datetime - receive_transid :int - received_uuid :Optional[str] = None - flags :Optional[str] = None - receive_time :Optional[datetime.datetime] = None - snapshots :Optional[List] = None - - def __post_init__(self): - self.full_path = pathlib.Path(self.full_path) - - # Convert "-" entries to `None` - if self.parent_uuid == "-": - self.parent_uuid = None - if self.received_uuid == "-": - self.received_uuid = None - if self.flags == "-": - self.flags = None - if self.receive_time == "-": - self.receive_time = None - if self.snapshots == "": - self.snapshots = [] - - # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats) - self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time)) - self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time)) - if self.receive_time: - self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time)) - - @property - def parent_subvolume(self): - from .btrfs_helpers import find_parent_subvolume - - return find_parent_subvolume(self.full_path) - - @property - def root(self) -> bool: - from .btrfs_helpers import subvolume_info_from_path - - # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first - # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. - # It would also be nice if it could use findmnt(self.full_path) and traverse backwards - # finding the last occurrence of a subvolume which 'self' belongs to. - if volume := subvolume_info_from_path(storage['MOUNT_POINT']): - return self.full_path == volume.full_path - - return False - - @cached_property - def partition(self): - from ..helpers import findmnt, get_parent_of_partition, all_blockdevices - from ..partition import Partition - from ..blockdevice import BlockDevice - from ..mapperdev import MapperDev - from .btrfspartition import BTRFSPartition - from .btrfs_helpers import subvolume_info_from_path - - try: - # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device. - if filesystem := findmnt(self.full_path).get('filesystems', []): - if source := filesystem[0].get('source', None): - # Strip away subvolume definitions from findmnt - if '[' in source: - source = source[:source.find('[')] - - if filesystem[0].get('fstype', '') == 'btrfs': - return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - elif filesystem[0].get('source', '').startswith('/dev/mapper'): - return MapperDev(source) - else: - return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) - except DiskError: - # Subvolume has never been mounted, we have no reliable way of finding where it is. - # But we have the UUID of the partition, and can begin looking for it by mounting - # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices. - - log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING) - for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items(): - if type(instance) in (Partition, MapperDev): - we_mounted_it = False - detection_mountpoint = instance.mountpoint - if not detection_mountpoint: - if type(instance) == Partition and instance.encrypted: - # TODO: Perhaps support unlocking encrypted volumes? - # This will cause a lot of potential user interactions tho. - log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG) - continue - - detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}") - detection_mountpoint.mkdir(parents=True, exist_ok=True) - - instance.mount(str(detection_mountpoint)) - we_mounted_it = True - - if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])): - if subvolume := subvolume_info_from_path(filesystem[0]['target']): - if subvolume.uuid == self.uuid: - # The top level subvolume matched of ourselves, - # which means the instance we're iterating has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - def iterate_children(struct): - for child in struct.get('children', []): - if '[' in child.get('source', ''): - yield subvolume_info_from_path(child['target']) - - for sub_child in iterate_children(child): - yield sub_child - - for child in iterate_children(filesystem[0]): - if child.uuid == self.uuid: - # We found a child within the instance that has the subvol we're looking for. - log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") - return instance - - if we_mounted_it: - instance.unmount() - shutil.rmtree(detection_mountpoint) - - @cached_property - def mount_options(self) -> Optional[List[str]]: - from ..helpers import findmnt - - if filesystem := findmnt(self.full_path).get('filesystems', []): - return filesystem[0].get('options').split(',') - - def convert_to_ISO_format(self, time_string): - time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '') - iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}" - return iso_string - - def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True): - from ..helpers import findmnt - - try: - if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False): - log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}") - SysCommand(f"umount {mountpoint}") - except DiskError: - # No previously mounted device at the mountpoint - pass - - if not options: - options = [] - - try: - if include_previously_known_options and (cached_options := self.mount_options): - options += cached_options - except DiskError: - pass - - if not any('subvol=' in x for x in options): - options += f'subvol={self.name}' - - SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}") - log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray") - - def unmount(self, recurse :bool = True): - SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") - log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file diff --git a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py new file mode 100644 index 00000000..5f5bdea6 --- /dev/null +++ b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py @@ -0,0 +1,192 @@ +import pathlib +import datetime +import logging +import string +import random +import shutil +from dataclasses import dataclass +from typing import Optional, List# , TYPE_CHECKING +from functools import cached_property + +# if TYPE_CHECKING: +# from ..blockdevice import BlockDevice + +from ...exceptions import DiskError +from ...general import SysCommand +from ...output import log +from ...storage import storage + + +@dataclass +class BtrfsSubvolumeInfo: + full_path :pathlib.Path + name :str + uuid :str + parent_uuid :str + creation_time :datetime.datetime + subvolume_id :int + generation :int + gen_at_creation :int + parent_id :int + top_level_id :int + send_transid :int + send_time :datetime.datetime + receive_transid :int + received_uuid :Optional[str] = None + flags :Optional[str] = None + receive_time :Optional[datetime.datetime] = None + snapshots :Optional[List] = None + + def __post_init__(self): + self.full_path = pathlib.Path(self.full_path) + + # Convert "-" entries to `None` + if self.parent_uuid == "-": + self.parent_uuid = None + if self.received_uuid == "-": + self.received_uuid = None + if self.flags == "-": + self.flags = None + if self.receive_time == "-": + self.receive_time = None + if self.snapshots == "": + self.snapshots = [] + + # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats) + self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time)) + self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time)) + if self.receive_time: + self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time)) + + @property + def parent_subvolume(self): + from .btrfs_helpers import find_parent_subvolume + + return find_parent_subvolume(self.full_path) + + @property + def root(self) -> bool: + from .btrfs_helpers import subvolume_info_from_path + + # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first + # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. + # It would also be nice if it could use findmnt(self.full_path) and traverse backwards + # finding the last occurrence of a subvolume which 'self' belongs to. + if volume := subvolume_info_from_path(storage['MOUNT_POINT']): + return self.full_path == volume.full_path + + return False + + @cached_property + def partition(self): + from ..helpers import findmnt, get_parent_of_partition, all_blockdevices + from ..partition import Partition + from ..blockdevice import BlockDevice + from ..mapperdev import MapperDev + from .btrfspartition import BTRFSPartition + from .btrfs_helpers import subvolume_info_from_path + + try: + # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device. + if filesystem := findmnt(self.full_path).get('filesystems', []): + if source := filesystem[0].get('source', None): + # Strip away subvolume definitions from findmnt + if '[' in source: + source = source[:source.find('[')] + + if filesystem[0].get('fstype', '') == 'btrfs': + return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) + elif filesystem[0].get('source', '').startswith('/dev/mapper'): + return MapperDev(source) + else: + return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) + except DiskError: + # Subvolume has never been mounted, we have no reliable way of finding where it is. + # But we have the UUID of the partition, and can begin looking for it by mounting + # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices. + + log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING) + for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items(): + if type(instance) in (Partition, MapperDev): + we_mounted_it = False + detection_mountpoint = instance.mountpoint + if not detection_mountpoint: + if type(instance) == Partition and instance.encrypted: + # TODO: Perhaps support unlocking encrypted volumes? + # This will cause a lot of potential user interactions tho. + log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG) + continue + + detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}") + detection_mountpoint.mkdir(parents=True, exist_ok=True) + + instance.mount(str(detection_mountpoint)) + we_mounted_it = True + + if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])): + if subvolume := subvolume_info_from_path(filesystem[0]['target']): + if subvolume.uuid == self.uuid: + # The top level subvolume matched of ourselves, + # which means the instance we're iterating has the subvol we're looking for. + log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") + return instance + + def iterate_children(struct): + for child in struct.get('children', []): + if '[' in child.get('source', ''): + yield subvolume_info_from_path(child['target']) + + for sub_child in iterate_children(child): + yield sub_child + + for child in iterate_children(filesystem[0]): + if child.uuid == self.uuid: + # We found a child within the instance that has the subvol we're looking for. + log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") + return instance + + if we_mounted_it: + instance.unmount() + shutil.rmtree(detection_mountpoint) + + @cached_property + def mount_options(self) -> Optional[List[str]]: + from ..helpers import findmnt + + if filesystem := findmnt(self.full_path).get('filesystems', []): + return filesystem[0].get('options').split(',') + + def convert_to_ISO_format(self, time_string): + time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '') + iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}" + return iso_string + + def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True): + from ..helpers import findmnt + + try: + if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False): + log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}") + SysCommand(f"umount {mountpoint}") + except DiskError: + # No previously mounted device at the mountpoint + pass + + if not options: + options = [] + + try: + if include_previously_known_options and (cached_options := self.mount_options): + options += cached_options + except DiskError: + pass + + if not any('subvol=' in x for x in options): + options += f'subvol={self.name}' + + SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}") + log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray") + + def unmount(self, recurse :bool = True): + SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") + log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index 85c0390f..660594ed 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -8,6 +8,8 @@ import time import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 +from ..models.subvolume import Subvolume + if TYPE_CHECKING: from .partition import Partition @@ -469,6 +471,7 @@ def convert_device_to_uuid(path :str) -> str: raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.") + def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool: """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path) Coded for clarity rather than performance @@ -485,10 +488,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri """ # we create the mountpoint list if isinstance(partition,dict): - subvols = partition.get('btrfs',{}).get('subvolumes',{}) - mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols] + subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', []) + mountpoints = [partition.get('mountpoint')] + mountpoints += [volume.mountpoint for volume in subvolumes] else: mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes] + # we check if strict or target == '/': if target in mountpoints: diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py index 913dbc13..49137ae9 100644 --- a/archinstall/lib/disk/mapperdev.py +++ b/archinstall/lib/disk/mapperdev.py @@ -10,7 +10,7 @@ from ..general import SysCommand from ..output import log if TYPE_CHECKING: - from .btrfs import BtrfsSubvolume + from .btrfs import BtrfsSubvolumeInfo @dataclass class MapperDev: @@ -37,12 +37,12 @@ class MapperDev: for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"): partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name - + try: uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode() except SysCallError as error: log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red") - + information = uevent(uevent_data) block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME']))) @@ -75,10 +75,10 @@ class MapperDev: return get_filesystem_type(self.path) @property - def subvolumes(self) -> Iterator['BtrfsSubvolume']: + def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']: from .btrfs import subvolume_info_from_path for mountpoint in self.mount_information: if target := mountpoint.get('target'): if subvolume := subvolume_info_from_path(pathlib.Path(target)): - yield subvolume \ No newline at end of file + yield subvolume diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 2c9f50c2..6f25a5f7 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -14,7 +14,7 @@ from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat from ..output import log from ..general import SysCommand from .btrfs.btrfs_helpers import subvolume_info_from_path -from .btrfs.btrfssubvolume import BtrfsSubvolume +from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo class Partition: def __init__(self, @@ -185,7 +185,7 @@ class Partition: for i in range(storage['DISK_RETRY_ATTEMPTS']): if not self.partprobe(): raise DiskError(f"Could not perform partprobe on {self.device_path}") - + time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) partuuid = self._safe_part_uuid @@ -294,9 +294,9 @@ class Partition: return bind_name @property - def subvolumes(self) -> Iterator[BtrfsSubvolume]: + def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]: from .helpers import findmnt - + def iterate_children_recursively(information): for child in information.get('children', []): if target := child.get('target'): @@ -452,7 +452,7 @@ class Partition: if retry is True: log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange") time.sleep(storage.get('DISK_TIMEOUTS', 1)) - + return self.format(filesystem, path, log_formatting, options, retry=False) if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 5fa6bfdc..5809c073 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -3,6 +3,8 @@ import logging from typing import Optional, Dict, Any, List, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 +from ..models.subvolume import Subvolume + if TYPE_CHECKING: from .blockdevice import BlockDevice _: Any @@ -107,17 +109,14 @@ def suggest_single_disk_layout(block_device :BlockDevice, # https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash # https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh layout[block_device.path]['partitions'][1]['btrfs'] = { - "subvolumes" : { - "@":"/", - "@home": "/home", - "@log": "/var/log", - "@pkg": "/var/cache/pacman/pkg", - "@.snapshots": "/.snapshots" - } + 'subvolumes': [ + Subvolume('@', '/'), + Subvolume('@home', '/home'), + Subvolume('@log', '/var/log'), + Subvolume('@pkg', '/var/cache/pacman/pkg'), + Subvolume('@.snapshots', '/.snapshots') + ] } - # else: - # pass # ... implement a guided setup - elif using_home_partition: # If we don't want to use subvolumes, # But we want to be able to re-use data between re-installs.. diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index bf296c2e..97c2492d 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -24,6 +24,7 @@ from .disk.partition import get_mount_fs_type from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError from .hsm import fido2_enroll from .models.users import User +from .models.subvolume import Subvolume if TYPE_CHECKING: _: Any @@ -263,47 +264,25 @@ class Installer: hsm_device_path = storage['arguments']['HSM'] fido2_enroll(hsm_device_path, partition['device_instance'], password) - # we manage the btrfs partitions - if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]): - for partition in btrfs_subvolumes: - if mount_options := ','.join(partition.get('filesystem',{}).get('mount_options',[])): - self.mount(partition['device_instance'], "/", options=mount_options) - else: - self.mount(partition['device_instance'], "/") - - setup_subvolumes( - installation=self, - partition_dict=partition - ) + btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])] - partition['device_instance'].unmount() + for partition in btrfs_subvolumes: + device_instance = partition['device_instance'] + mount_options = partition.get('filesystem', {}).get('mount_options', []) + self.mount(device_instance, "/", options=','.join(mount_options)) + setup_subvolumes(installation=self, partition_dict=partition) + device_instance.unmount() # We then handle any special cases, such as btrfs - if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]): - for partition_information in btrfs_subvolumes: - for name, mountpoint in sorted(partition_information['btrfs']['subvolumes'].items(), key=lambda item: item[1]): - btrfs_subvolume_information = {} - - match mountpoint: - case str(): # backwards-compatability - btrfs_subvolume_information['mountpoint'] = mountpoint - btrfs_subvolume_information['options'] = [] - case dict(): - btrfs_subvolume_information['mountpoint'] = mountpoint.get('mountpoint', None) - btrfs_subvolume_information['options'] = mountpoint.get('options', []) - case _: - continue - - if mountpoint_parsed := btrfs_subvolume_information.get('mountpoint'): - # We cache the mount call for later - mount_queue[mountpoint_parsed] = lambda device=partition_information['device_instance'], \ - name=name, \ - subvolume_information=btrfs_subvolume_information: mount_subvolume( - installation=self, - device=device, - name=name, - subvolume_information=subvolume_information - ) + for partition in btrfs_subvolumes: + subvolumes: List[Subvolume] = partition['btrfs']['subvolumes'] + for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint): + # We cache the mount call for later + mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume( + installation=self, + device=device, + subvolume=sub_vol + ) # We mount ordinary partitions, and we sort them by the mountpoint for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']): diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py index cb61168d..49083517 100644 --- a/archinstall/lib/menu/global_menu.py +++ b/archinstall/lib/menu/global_menu.py @@ -325,22 +325,23 @@ class GlobalMenu(GeneralMenu): def _select_harddrives(self, old_harddrives : list) -> List: harddrives = select_harddrives(old_harddrives) - if len(harddrives) == 0: - prompt = _( - "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n" - "WARNING: Archinstall won't check the suitability of this setup\n" - "Do you wish to continue?" - ).format(storage['MOUNT_POINT']) - - choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run() - - if choice.value == Menu.no(): - return self._select_harddrives(old_harddrives) - - # in case the harddrives got changed we have to reset the disk layout as well - if old_harddrives != harddrives: - self._menu_options['disk_layouts'].set_current_selection(None) - storage['arguments']['disk_layouts'] = {} + if harddrives: + if len(harddrives) == 0: + prompt = _( + "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n" + "WARNING: Archinstall won't check the suitability of this setup\n" + "Do you wish to continue?" + ).format(storage['MOUNT_POINT']) + + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run() + + if choice.value == Menu.no(): + return self._select_harddrives(old_harddrives) + + # in case the harddrives got changed we have to reset the disk layout as well + if old_harddrives != harddrives: + self._menu_options['disk_layouts'].set_current_selection(None) + storage['arguments']['disk_layouts'] = {} return harddrives diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py index 7e051528..40d01ce3 100644 --- a/archinstall/lib/menu/list_manager.py +++ b/archinstall/lib/menu/list_manager.py @@ -137,34 +137,35 @@ class ListManager: else: self._default_action = [str(default_action),] - self.header = header if header else None - self.cancel_action = str(_('Cancel')) - self.confirm_action = str(_('Confirm and exit')) - self.separator = '' - self.bottom_list = [self.confirm_action,self.cancel_action] - self.bottom_item = [self.cancel_action] - self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))] + self._header = header if header else None + self._cancel_action = str(_('Cancel')) + self._confirm_action = str(_('Confirm and exit')) + self._separator = '' + self._bottom_list = [self._confirm_action, self._cancel_action] + self._bottom_item = [self._cancel_action] + self._base_actions = base_actions if base_actions else [str(_('Add')), str(_('Copy')), str(_('Edit')), str(_('Delete'))] self._original_data = copy.deepcopy(base_list) self._data = copy.deepcopy(base_list) # as refs, changes are immediate + # default values for the null case self.target: Optional[Any] = None self.action = self._null_action - if len(self._data) == 0 and self._null_action: - self._data = self.exec_action(self._data) - def run(self): while True: # this will return a dictionary with the key as the menu entry to be displayed # and the value is the original value from the self._data container + data_formatted = self.reformat(self._data) options = list(data_formatted.keys()) - options.append(self.separator) + + if len(options) > 0: + options.append(self._separator) if self._default_action: options += self._default_action - options += self.bottom_list + options += self._bottom_list system('clear') @@ -174,12 +175,12 @@ class ListManager: sort=False, clear_screen=False, clear_menu_on_exit=False, - header=self.header, + header=self._header, skip_empty_entries=True, skip=False ).run() - if not target.value or target.value in self.bottom_list: + if not target.value or target.value in self._bottom_list: self.action = target break @@ -201,13 +202,13 @@ class ListManager: # Possible enhancement. If run_actions returns false a message line indicating the failure self.run_actions(target.value) - if target.value == self.cancel_action: # TODO dubious + if target.value == self._cancel_action: # TODO dubious return self._original_data # return the original list else: return self._data def run_actions(self,prompt_data=None): - options = self.action_list() + self.bottom_item + options = self.action_list() + self._bottom_item prompt = _("Select an action for < {} >").format(prompt_data if prompt_data else self.target) choice = Menu( prompt, @@ -215,13 +216,13 @@ class ListManager: sort=False, clear_screen=False, clear_menu_on_exit=False, - preset_values=self.bottom_item, + preset_values=self._bottom_item, show_search_hint=False ).run() self.action = choice.value - if self.action and self.action != self.cancel_action: + if self.action and self.action != self._cancel_action: self._data = self.exec_action(self._data) """ @@ -243,7 +244,7 @@ class ListManager: can define alternate action list or customize the list for each item. Executed after any item is selected, contained in self.target """ - return self.base_actions + return self._base_actions def exec_action(self, data: Any): """ diff --git a/archinstall/lib/models/subvolume.py b/archinstall/lib/models/subvolume.py new file mode 100644 index 00000000..34a09227 --- /dev/null +++ b/archinstall/lib/models/subvolume.py @@ -0,0 +1,68 @@ +from dataclasses import dataclass +from typing import List, Any, Dict + + +@dataclass +class Subvolume: + name: str + mountpoint: str + compress: bool = False + nodatacow: bool = False + + def display(self) -> str: + options_str = ','.join(self.options) + return f'{_("Subvolume")}: {self.name:15} {_("Mountpoint")}: {self.mountpoint:20} {_("Options")}: {options_str}' + + @property + def options(self) -> List[str]: + options = [ + 'compress' if self.compress else '', + 'nodatacow' if self.nodatacow else '' + ] + return [o for o in options if len(o)] + + def json(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'mountpoint': self.mountpoint, + 'compress': self.compress, + 'nodatacow': self.nodatacow + } + + @classmethod + def _parse(cls, config_subvolumes: List[Dict[str, Any]]) -> List['Subvolume']: + subvolumes = [] + for entry in config_subvolumes: + if not entry.get('name', None) or not entry.get('mountpoint', None): + continue + + subvolumes.append( + Subvolume( + entry['name'], + entry['mountpoint'], + entry.get('compress', False), + entry.get('nodatacow', False) + ) + ) + + return subvolumes + + @classmethod + def _parse_backwards_compatible(cls, config_subvolumes) -> List['Subvolume']: + subvolumes = [] + for name, mountpoint in config_subvolumes.items(): + if not name or not mountpoint: + continue + + subvolumes.append(Subvolume(name, mountpoint)) + + return subvolumes + + @classmethod + def parse_arguments(cls, config_subvolumes: Any) -> List['Subvolume']: + if isinstance(config_subvolumes, list): + return cls._parse(config_subvolumes) + elif isinstance(config_subvolumes, dict): + return cls._parse_backwards_compatible(config_subvolumes) + + raise ValueError('Unknown disk layout btrfs subvolume format') diff --git a/archinstall/lib/models/users.py b/archinstall/lib/models/users.py index f72cabde..a8feb9ef 100644 --- a/archinstall/lib/models/users.py +++ b/archinstall/lib/models/users.py @@ -27,8 +27,10 @@ class User: } def display(self) -> str: - strength = PasswordStrength.strength(self.password) - password = '*' * len(self.password) + f' ({strength.value})' + password = '*' * (len(self.password) if self.password else 0) + if password: + strength = PasswordStrength.strength(self.password) + password += f' ({strength.value})' return f'{_("Username")}: {self.username:16} {_("Password")}: {password:20} sudo: {str(self.sudo)}' @classmethod diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py index caf5f5df..63b7c7df 100644 --- a/archinstall/lib/user_interaction/partitioning_conf.py +++ b/archinstall/lib/user_interaction/partitioning_conf.py @@ -351,18 +351,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, if partition is not None: if not block_device_struct["partitions"][partition].get('btrfs', {}): block_device_struct["partitions"][partition]['btrfs'] = {} - if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', {}): - block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = {} + if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []): + block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = [] prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes'] - result = SubvolumeList(_("Manage btrfs subvolumes for current partition"),prev).run() - if result: - block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result - else: - del block_device_struct["partitions"][partition]['btrfs'] + result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run() + block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result return block_device_struct + def select_encrypted_partitions( title :str, partitions :List[Partition], diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py index 94e6f5d7..a54ec891 100644 --- a/archinstall/lib/user_interaction/subvolume_config.py +++ b/archinstall/lib/user_interaction/subvolume_config.py @@ -1,155 +1,94 @@ -from typing import Dict, List +from typing import Dict, List, Optional, Any, TYPE_CHECKING from ..menu.list_manager import ListManager from ..menu.menu import MenuSelectionType -from ..menu.selection_menu import Selector, GeneralMenu from ..menu.text_input import TextInput from ..menu import Menu +from ..models.subvolume import Subvolume + +if TYPE_CHECKING: + _: Any -""" -UI classes -""" class SubvolumeList(ListManager): - def __init__(self,prompt,list): - self.ObjectNullAction = None # str(_('Add')) - self.ObjectDefaultAction = str(_('Add')) - super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction) - - def reformat(self, data: Dict) -> Dict: - def presentation(key :str, value :Dict): - text = _(" Subvolume :{:16}").format(key) - if isinstance(value,str): - text += _(" mounted at {:16}").format(value) - else: - if value.get('mountpoint'): - text += _(" mounted at {:16}").format(value['mountpoint']) - else: - text += (' ' * 28) - - if value.get('options',[]): - text += _(" with option {}").format(', '.join(value['options'])) - return text - - formatted = {presentation(k, v): k for k, v in data.items()} - return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])} + def __init__(self, prompt: str, current_volumes: List[Subvolume]): + self._actions = [ + str(_('Add subvolume')), + str(_('Edit subvolume')), + str(_('Delete subvolume')) + ] + super().__init__(prompt, current_volumes, self._actions, self._actions[0]) - def action_list(self): - return super().action_list() + def reformat(self, data: List[Subvolume]) -> Dict[str, Subvolume]: + return {e.display(): e for e in data} - def exec_action(self, data: Dict): - if self.target: - origkey, origval = list(self.target.items())[0] - else: - origkey = None + def action_list(self): + active_user = self.target if self.target else None - if self.action == str(_('Delete')): - del data[origkey] + if active_user is None: + return [self._actions[0]] else: - if self.action == str(_('Add')): - self.target = {} - print(_('\n Fill the desired values for a new subvolume \n')) - with SubvolumeMenu(self.target,self.action) as add_menu: - for elem in ['name','mountpoint','options']: - add_menu.exec_option(elem) - else: - SubvolumeMenu(self.target,self.action).run() + return self._actions[1:] - data.update(self.target) + def _prompt_options(self, editing: Optional[Subvolume] = None) -> List[str]: + preset_options = [] + if editing: + preset_options = editing.options - return data - - -class SubvolumeMenu(GeneralMenu): - def __init__(self,parameters,action=None): - self.data = parameters - self.action = action - self.ds = {} - self.ds['name'] = None - self.ds['mountpoint'] = None - self.ds['options'] = None - if self.data: - origkey,origval = list(self.data.items())[0] - self.ds['name'] = origkey - if isinstance(origval,str): - self.ds['mountpoint'] = origval - else: - self.ds['mountpoint'] = self.data[origkey].get('mountpoint') - self.ds['options'] = self.data[origkey].get('options') - - super().__init__(data_store=self.ds) - - def _setup_selection_menu_options(self): - self._menu_options['name'] = Selector( - str(_('Subvolume name ')), - self._select_subvolume_name if not self.action or self.action in (str(_('Add')), str(_('Copy'))) else None, - mandatory=True, - enabled=True) - - self._menu_options['mountpoint'] = Selector( - str(_('Subvolume mountpoint')), - self._select_subvolume_mount_point if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None, - enabled=True) - - self._menu_options['options'] = Selector( - str(_('Subvolume options')), - self._select_subvolume_options if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None, - enabled=True) - - self._menu_options['save'] = Selector( - str(_('Save')), - exec_func=lambda n,v:True, - enabled=True) - - self._menu_options['cancel'] = Selector( - str(_('Cancel')), - # func = lambda pre:True, - exec_func=lambda n,v:self.fast_exit(n), - enabled=True) - - self.cancel_action = 'cancel' - self.save_action = 'save' - self.bottom_list = [self.save_action,self.cancel_action] - - def fast_exit(self,accion): - if self.option(accion).get_selection(): - for item in self.list_options(): - if self.option(item).is_mandatory(): - self.option(item).set_mandatory(False) - return True - - def exit_callback(self): - # we exit without moving data - if self.option(self.cancel_action).get_selection(): - return - if not self.ds['name']: - return - else: - key = self.ds['name'] - value = {} - if self.ds['mountpoint']: - value['mountpoint'] = self.ds['mountpoint'] - if self.ds['options']: - value['options'] = self.ds['options'] - self.data.update({key : value}) - - def _select_subvolume_name(self,value): - return TextInput(str(_("Subvolume name :")),value).run() - - def _select_subvolume_mount_point(self,value): - return TextInput(str(_("Select a mount point :")),value).run() - - def _select_subvolume_options(self,value) -> List[str]: - # def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True): choice = Menu( str(_("Select the desired subvolume options ")), ['nodatacow','compress'], skip=True, - preset_values=value, + preset_values=preset_options, multi=True ).run() if choice.type_ == MenuSelectionType.Selection: - return choice.value + return choice.value # type: ignore return [] + + def _add_subvolume(self, editing: Optional[Subvolume] = None) -> Optional[Subvolume]: + name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run() + + if not name: + return None + + mountpoint = TextInput(f'\n{_("Subvolume mountpoint")}: ', editing.mountpoint if editing else '').run() + + if not mountpoint: + return None + + options = self._prompt_options(editing) + + subvolume = Subvolume(name, mountpoint) + subvolume.compress = 'compress' in options + subvolume.nodatacow = 'nodatacow' in options + + return subvolume + + def exec_action(self, data: List[Subvolume]) -> List[Subvolume]: + if self.target: + active_subvolume = self.target + else: + active_subvolume = None + + if self.action == self._actions[0]: # add + new_subvolume = self._add_subvolume() + + if new_subvolume is not None: + # in case a user with the same username as an existing user + # was created we'll replace the existing one + data = [d for d in data if d.name != new_subvolume.name] + data += [new_subvolume] + elif self.action == self._actions[1]: # edit subvolume + new_subvolume = self._add_subvolume(active_subvolume) + + if new_subvolume is not None: + # we'll remove the original subvolume and add the modified version + data = [d for d in data if d.name != active_subvolume.name and d.name != new_subvolume.name] + data += [new_subvolume] + elif self.action == self._actions[2]: # delete + data = [d for d in data if d != active_subvolume] + + return data -- cgit v1.2.3-54-g00ecf