From c93482a8b943a593608d8bae7156e357ed0002d5 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Thu, 26 May 2022 18:46:10 +0200 Subject: Rework btrfs handling (#1234) * Restructuring btrfs.py into lib/btrfs/*.py * Reworking how BTRFS subvolumes get represented, and worked with. Subvolumes are now their own entity which can be used to access it's information, parents or mount location. * Added BtrfsSubvolume.partition and other stuff. * Reworking the way luks2().unlock and .format() returns device instances. They should now return BTRFSSubvolume where appropriate. * Fixed a missing import * Fixed an issue where mkfs.btrfs wouldn't trigger due to busy disk. * Fixing subvol mounting without creating a fake instance. * Added creation of mountpint for btrfs subvolume * Fixed root detection * Re-worked mounting into a queue system using frozen mounting calls using lambda * Removed old mount_subvolume() function * Removed get_subvolumes_from_findmnt() * Fixed Partition().subvolumes iteration * Adding .root to BtrfsSubvolume * Fixed issue in SysCommandWorker where log output would break and crash execution due to cmd being a string vs list * Changed return-value from MapperDev.mountpoint to pathlib.Path --- archinstall/lib/disk/btrfs/__init__.py | 182 +++++++++++++++++++++++++ archinstall/lib/disk/btrfs/btrfs_helpers.py | 132 ++++++++++++++++++ archinstall/lib/disk/btrfs/btrfspartition.py | 116 ++++++++++++++++ archinstall/lib/disk/btrfs/btrfssubvolume.py | 191 +++++++++++++++++++++++++++ 4 files changed, 621 insertions(+) create mode 100644 archinstall/lib/disk/btrfs/__init__.py create mode 100644 archinstall/lib/disk/btrfs/btrfs_helpers.py create mode 100644 archinstall/lib/disk/btrfs/btrfspartition.py create mode 100644 archinstall/lib/disk/btrfs/btrfssubvolume.py (limited to 'archinstall/lib/disk/btrfs') diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py new file mode 100644 index 00000000..84b9c0f6 --- /dev/null +++ b/archinstall/lib/disk/btrfs/__init__.py @@ -0,0 +1,182 @@ +from __future__ import annotations +import pathlib +import glob +import logging +import re +from typing import Union, Dict, TYPE_CHECKING, Any, Iterator + +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from ...installer import Installer + +from .btrfs_helpers import ( + subvolume_info_from_path as subvolume_info_from_path, + find_parent_subvolume as find_parent_subvolume, + setup_subvolumes as setup_subvolumes, + mount_subvolume as mount_subvolume +) +from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume +from .btrfspartition import BTRFSPartition as BTRFSPartition + +from ..helpers import get_mount_info +from ...exceptions import DiskError, Deprecated +from ...general import SysCommand +from ...output import log +from ...exceptions import SysCallError + +def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: + try: + output = SysCommand(f"btrfs subvol show {path}").decode() + except SysCallError as error: + print('Error:', error) + + result = {} + for line in output.replace('\r\n', '\n').split('\n'): + if ':' in line: + key, val = line.replace('\t', '').split(':', 1) + result[key.strip().lower().replace(' ', '_')] = val.strip() + + return result + +def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: + """ + This function uses btrfs to create a subvolume. + + @installation: archinstall.Installer instance + @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot + """ + + installation_mountpoint = installation.target + if type(installation_mountpoint) == str: + installation_mountpoint = pathlib.Path(installation_mountpoint) + # Set up the required physical structure + if type(subvolume_location) == str: + subvolume_location = pathlib.Path(subvolume_location) + + target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor) + + # Difference from mount_subvolume: + # We only check if the parent exists, since we'll run in to "target path already exists" otherwise + if not target.parent.exists(): + target.parent.mkdir(parents=True) + + if glob.glob(str(target / '*')): + raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)") + + # Remove the target if it exists + if target.exists(): + target.rmdir() + + log(f"Creating a subvolume on {target}", level=logging.INFO) + if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: + raise DiskError(f"Could not create a subvolume at {target}: {cmd}") + +def _has_option(option :str,options :list) -> bool: + """ auxiliary routine to check if an option is present in a list. + we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...) + """ + if not options: + return False + + for item in options: + if option in item: + return True + + return False + +def manage_btrfs_subvolumes(installation :Installer, + partition :Dict[str, str],) -> list: + + raise Deprecated("Use setup_subvolumes() instead.") + + from copy import deepcopy + """ we do the magic with subvolumes in a centralized place + parameters: + * the installation object + * the partition dictionary entry which represents the physical partition + returns + * mountpoinst, the list which contains all the "new" partititon to be mounted + + We expect the partition has been mounted as / , and it to be unmounted after the processing + Then we create all the subvolumes inside btrfs as demand + We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs + Then we return a list of "new" partitions to be processed as "normal" partitions + # TODO For encrypted devices we need some special processing prior to it + """ + # We process each of the pairs + # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list + # of mount options (or similar used by brtfs) + mountpoints = [] + subvolumes = partition['btrfs']['subvolumes'] + for name, right_hand in subvolumes.items(): + try: + # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use + if name.startswith('/'): + name = name[1:] + # renormalize the right hand. + location = None + subvol_options = [] + # no contents, so it is not to be mounted + if not right_hand: + location = None + # just a string. per backward compatibility the mount point + elif isinstance(right_hand,str): + location = right_hand + # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿? + elif isinstance(right_hand,dict): + location = right_hand.get('mountpoint',None) + subvol_options = right_hand.get('options',[]) + # we create the subvolume + create_subvolume(installation,name) + # Make the nodatacow processing now + # It will be the main cause of creation of subvolumes which are not to be mounted + # it is not an options which can be established by subvolume (but for whole file systems), and can be + # set up via a simple attribute change in a directory (if empty). And here the directories are brand new + if 'nodatacow' in subvol_options: + if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so nodatacow doesn't propagate to the mount options + del subvol_options[subvol_options.index('nodatacow')] + # Make the compress processing now + # it is not an options which can be established by subvolume (but for whole file systems), and can be + # set up via a simple attribute change in a directory (if empty). And here the directories are brand new + # in this way only zstd compression is activaded + # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated + if 'compress' in subvol_options: + if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])): + if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so compress doesn't propagate to the mount options + del subvol_options[subvol_options.index('compress')] + # END compress processing. + # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume + if not partition['mountpoint'] and location is not None: + # we begin to create a fake partition entry. First we copy the original -the one that corresponds to + # the primary partition. We make a deepcopy to avoid altering the original content in any case + fake_partition = deepcopy(partition) + # we start to modify entries in the "fake partition" to match the needs of the subvolumes + # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy + del fake_partition['btrfs'] + fake_partition['encrypted'] = False + fake_partition['generate-encryption-key-file'] = False + # Mount destination. As of now the right hand part + fake_partition['mountpoint'] = location + # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path. + fake_partition['subvolume'] = name + # here we add the special mount options for the subvolume, if any. + # if the original partition['options'] is not a list might give trouble + if fake_partition.get('filesystem',{}).get('mount_options',[]): + fake_partition['filesystem']['mount_options'].extend(subvol_options) + else: + fake_partition['filesystem']['mount_options'] = subvol_options + # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. + # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes + # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless + fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" + + # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, + # as "normal" ones + mountpoints.append(fake_partition) + except Exception as e: + raise e + return mountpoints diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py new file mode 100644 index 00000000..d529478f --- /dev/null +++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py @@ -0,0 +1,132 @@ +import pathlib +import logging +from typing import Optional + +from ...exceptions import SysCallError, DiskError +from ...general import SysCommand +from ...output import log +from ..helpers import get_mount_info +from .btrfssubvolume import BtrfsSubvolume + + +def mount_subvolume(installation, device, name, subvolume_information): + # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = name.lstrip('/') + + # renormalize the right hand. + mountpoint = subvolume_information.get('mountpoint', None) + if not mountpoint: + return None + + if type(mountpoint) == str: + mountpoint = pathlib.Path(mountpoint) + + installation_target = installation.target + if type(installation_target) == str: + installation_target = pathlib.Path(installation_target) + + mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) + mountpoint.mkdir(parents=True, exist_ok=True) + + mount_options = subvolume_information.get('options', []) + if not any('subvol=' in x for x in mount_options): + mount_options += [f'subvol={name}'] + + log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") + SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") + + +def setup_subvolumes(installation, partition_dict): + """ + Taken from: ..user_guides.py + + partition['btrfs'] = { + "subvolumes" : { + "@": "/", + "@home": "/home", + "@log": "/var/log", + "@pkg": "/var/cache/pacman/pkg", + "@.snapshots": "/.snapshots" + } + } + """ + log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") + for name, right_hand in partition_dict['btrfs']['subvolumes'].items(): + # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = name.lstrip('/') + + # renormalize the right hand. + # mountpoint = None + subvol_options = [] + + match right_hand: + # case str(): # backwards-compatability + # mountpoint = right_hand + case dict(): + # mountpoint = right_hand.get('mountpoint', None) + subvol_options = right_hand.get('options', []) + + # We create the subvolume using the BTRFSPartition instance. + # That way we ensure not only easy access, but also accurate mount locations etc. + partition_dict['device_instance'].create_subvolume(name, installation=installation) + + # Make the nodatacow processing now + # It will be the main cause of creation of subvolumes which are not to be mounted + # it is not an options which can be established by subvolume (but for whole file systems), and can be + # set up via a simple attribute change in a directory (if empty). And here the directories are brand new + if 'nodatacow' in subvol_options: + if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so nodatacow doesn't propagate to the mount options + del subvol_options[subvol_options.index('nodatacow')] + # Make the compress processing now + # it is not an options which can be established by subvolume (but for whole file systems), and can be + # set up via a simple attribute change in a directory (if empty). And here the directories are brand new + # in this way only zstd compression is activaded + # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated + + if 'compress' in subvol_options: + if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): + if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: + raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") + # entry is deleted so compress doesn't propagate to the mount options + del subvol_options[subvol_options.index('compress')] + +def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: + try: + subvolume_name = None + result = {} + for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): + if index == 0: + subvolume_name = line.strip().decode('UTF-8') + continue + + if b':' in line: + key, value = line.strip().decode('UTF-8').split(':', 1) + + # A bit of a hack, until I figure out how @dataclass + # allows for hooking in a pre-processor to do this we have to do it here: + result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() + + return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result}) + + except SysCallError: + pass + + return None + +def find_parent_subvolume(path :pathlib.Path, filters=[]): + # A root path cannot have a parent + if str(path) == '/': + return None + + if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters): + if not (subvolume := subvolume_info_from_path(found_mount['target'])): + if found_mount['target'] == '/': + return None + + return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']]) + + return subvolume \ No newline at end of file diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py new file mode 100644 index 00000000..5020133d --- /dev/null +++ b/archinstall/lib/disk/btrfs/btrfspartition.py @@ -0,0 +1,116 @@ +import glob +import pathlib +import logging +from typing import Optional, TYPE_CHECKING + +from ...exceptions import DiskError +from ...storage import storage +from ...output import log +from ...general import SysCommand +from ..partition import Partition +from ..helpers import findmnt +from .btrfs_helpers import ( + subvolume_info_from_path +) + +if TYPE_CHECKING: + from ...installer import Installer + from .btrfssubvolume import BtrfsSubvolume + +class BTRFSPartition(Partition): + def __init__(self, *args, **kwargs): + Partition.__init__(self, *args, **kwargs) + + def __repr__(self, *args :str, **kwargs :str) -> str: + mount_repr = '' + if self.mountpoint: + mount_repr = f", mounted={self.mountpoint}" + elif self.target_mountpoint: + mount_repr = f", rel_mountpoint={self.target_mountpoint}" + + if self._encrypted: + return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' + else: + return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' + + @property + def subvolumes(self): + for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): + if '[' in filesystem.get('source', ''): + yield subvolume_info_from_path(filesystem['target']) + + def iterate_children(struct): + for child in struct.get('children', []): + if '[' in child.get('source', ''): + yield subvolume_info_from_path(child['target']) + + for sub_child in iterate_children(child): + yield sub_child + + for child in iterate_children(filesystem): + yield child + + def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume': + """ + Subvolumes have to be created within a mountpoint. + This means we need to get the current installation target. + After we get it, we need to verify it is a btrfs subvolume filesystem. + Finally, the destination must be empty. + """ + + # Allow users to override the installation session + if not installation: + installation = storage.get('installation_session') + + # Determain if the path given, is an absolute path or a releative path. + # We do this by checking if the path contains a known mountpoint. + if str(subvolume)[0] == '/': + if filesystems := findmnt(subvolume, traverse=True).get('filesystems'): + if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target): + # Path starts with a known mountpoint which isn't / + # Which means it's an absolut path to a mounted location. + pass + else: + # Since it's not an absolute position with a known start. + # We omit the anchor ('/' basically) and make sure it's appendable + # to the installation.target later + subvolume = subvolume.relative_to(subvolume.anchor) + # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is. + + # If the subvolume is not absolute, then we do two checks: + # 1. Check if the partition itself is mounted somewhere, and use that as a root + # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs + # If both above fail, we need to warn the user that such setup is not supported. + if str(subvolume)[0] != '/': + if self.mountpoint is None and installation is None: + raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.") + elif self.mountpoint: + subvolume = self.mountpoint / subvolume + elif installation: + ongoing_installation_destination = installation.target + if type(ongoing_installation_destination) == str: + ongoing_installation_destination = pathlib.Path(ongoing_installation_destination) + + subvolume = ongoing_installation_destination / subvolume + + subvolume.parent.mkdir(parents=True, exist_ok=True) + + # + + log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey") + + if glob.glob(str(subvolume / '*')): + raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)") + elif subvolinfo := subvolume_info_from_path(subvolume): + raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") + + SysCommand(f"btrfs subvolume create {subvolume}") + + return subvolume_info_from_path(subvolume) \ No newline at end of file diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolume.py new file mode 100644 index 00000000..a96e2a94 --- /dev/null +++ b/archinstall/lib/disk/btrfs/btrfssubvolume.py @@ -0,0 +1,191 @@ +import pathlib +import datetime +import logging +import string +import random +import shutil +from dataclasses import dataclass +from typing import Optional, List# , TYPE_CHECKING +from functools import cached_property + +# if TYPE_CHECKING: +# from ..blockdevice import BlockDevice + +from ...exceptions import DiskError +from ...general import SysCommand +from ...output import log +from ...storage import storage + +@dataclass +class BtrfsSubvolume: + full_path :pathlib.Path + name :str + uuid :str + parent_uuid :str + creation_time :datetime.datetime + subvolume_id :int + generation :int + gen_at_creation :int + parent_id :int + top_level_id :int + send_transid :int + send_time :datetime.datetime + receive_transid :int + received_uuid :Optional[str] = None + flags :Optional[str] = None + receive_time :Optional[datetime.datetime] = None + snapshots :Optional[List] = None + + def __post_init__(self): + self.full_path = pathlib.Path(self.full_path) + + # Convert "-" entries to `None` + if self.parent_uuid == "-": + self.parent_uuid = None + if self.received_uuid == "-": + self.received_uuid = None + if self.flags == "-": + self.flags = None + if self.receive_time == "-": + self.receive_time = None + if self.snapshots == "": + self.snapshots = [] + + # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats) + self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time)) + self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time)) + if self.receive_time: + self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time)) + + @property + def parent_subvolume(self): + from .btrfs_helpers import find_parent_subvolume + + return find_parent_subvolume(self.full_path) + + @property + def root(self) -> bool: + from .btrfs_helpers import subvolume_info_from_path + + # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first + # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. + # It would also be nice if it could use findmnt(self.full_path) and traverse backwards + # finding the last occurance of a subvolume which 'self' belongs to. + if volume := subvolume_info_from_path(storage['MOUNT_POINT']): + return self.full_path == volume.full_path + + return False + + @cached_property + def partition(self): + from ..helpers import findmnt, get_parent_of_partition, all_blockdevices + from ..partition import Partition + from ..blockdevice import BlockDevice + from ..mapperdev import MapperDev + from .btrfspartition import BTRFSPartition + from .btrfs_helpers import subvolume_info_from_path + + try: + # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device. + if filesystem := findmnt(self.full_path).get('filesystems', []): + if source := filesystem[0].get('source', None): + # Strip away subvolume definitions from findmnt + if '[' in source: + source = source[:source.find('[')] + + if filesystem[0].get('fstype', '') == 'btrfs': + return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) + elif filesystem[0].get('source', '').startswith('/dev/mapper'): + return MapperDev(source) + else: + return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source)))) + except DiskError: + # Subvolume has never been mounted, we have no reliable way of finding where it is. + # But we have the UUID of the partition, and can begin looking for it by mounting + # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices. + + log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING) + for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items(): + if type(instance) in (Partition, MapperDev): + we_mounted_it = False + detection_mountpoint = instance.mountpoint + if not detection_mountpoint: + if type(instance) == Partition and instance.encrypted: + # TODO: Perhaps support unlocking encrypted volumes? + # This will cause a lot of potential user interactions tho. + log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG) + continue + + detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}") + detection_mountpoint.mkdir(parents=True, exist_ok=True) + + instance.mount(str(detection_mountpoint)) + we_mounted_it = True + + if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])): + if subvolume := subvolume_info_from_path(filesystem[0]['target']): + if subvolume.uuid == self.uuid: + # The top level subvolume matched of ourselves, + # which means the instance we're iterating has the subvol we're looking for. + log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") + return instance + + def iterate_children(struct): + for child in struct.get('children', []): + if '[' in child.get('source', ''): + yield subvolume_info_from_path(child['target']) + + for sub_child in iterate_children(child): + yield sub_child + + for child in iterate_children(filesystem[0]): + if child.uuid == self.uuid: + # We found a child within the instance that has the subvol we're looking for. + log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray") + return instance + + if we_mounted_it: + instance.unmount() + shutil.rmtree(detection_mountpoint) + + @cached_property + def mount_options(self) -> Optional[List[str]]: + from ..helpers import findmnt + + if filesystem := findmnt(self.full_path).get('filesystems', []): + return filesystem[0].get('options').split(',') + + def convert_to_ISO_format(self, time_string): + time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '') + iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}" + return iso_string + + def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True): + from ..helpers import findmnt + + try: + if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False): + log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}") + SysCommand(f"umount {mountpoint}") + except DiskError: + # No previously mounted device at the mountpoint + pass + + if not options: + options = [] + + try: + if include_previously_known_options and (cached_options := self.mount_options): + options += cached_options + except DiskError: + pass + + if not any('subvol=' in x for x in options): + options += f'subvol={self.name}' + + SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}") + log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray") + + def unmount(self, recurse :bool = True): + SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") + log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file -- cgit v1.2.3-54-g00ecf