Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/btrfs
diff options
context:
space:
mode:
authorAnton Hvornum <anton@hvornum.se>2022-05-26 18:46:10 +0200
committerGitHub <noreply@github.com>2022-05-26 18:46:10 +0200
commitc93482a8b943a593608d8bae7156e357ed0002d5 (patch)
treeba86096368c74ae48212ed4751d06e7883b229e6 /archinstall/lib/disk/btrfs
parentf1608e76647578c731e51a7c1303656519aa2e5e (diff)
Rework btrfs handling (#1234)
* Restructuring btrfs.py into lib/btrfs/*.py * Reworking how BTRFS subvolumes get represented, and worked with. Subvolumes are now their own entity which can be used to access it's information, parents or mount location. * Added BtrfsSubvolume.partition and other stuff. * Reworking the way luks2().unlock and .format() returns device instances. They should now return BTRFSSubvolume where appropriate. * Fixed a missing import * Fixed an issue where mkfs.btrfs wouldn't trigger due to busy disk. * Fixing subvol mounting without creating a fake instance. * Added creation of mountpint for btrfs subvolume * Fixed root detection * Re-worked mounting into a queue system using frozen mounting calls using lambda * Removed old mount_subvolume() function * Removed get_subvolumes_from_findmnt() * Fixed Partition().subvolumes iteration * Adding .root to BtrfsSubvolume * Fixed issue in SysCommandWorker where log output would break and crash execution due to cmd being a string vs list * Changed return-value from MapperDev.mountpoint to pathlib.Path
Diffstat (limited to 'archinstall/lib/disk/btrfs')
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py182
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py132
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py116
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolume.py191
4 files changed, 621 insertions, 0 deletions
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py
new file mode 100644
index 00000000..84b9c0f6
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/__init__.py
@@ -0,0 +1,182 @@
+from __future__ import annotations
+import pathlib
+import glob
+import logging
+import re
+from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from ...installer import Installer
+
+from .btrfs_helpers import (
+ subvolume_info_from_path as subvolume_info_from_path,
+ find_parent_subvolume as find_parent_subvolume,
+ setup_subvolumes as setup_subvolumes,
+ mount_subvolume as mount_subvolume
+)
+from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume
+from .btrfspartition import BTRFSPartition as BTRFSPartition
+
+from ..helpers import get_mount_info
+from ...exceptions import DiskError, Deprecated
+from ...general import SysCommand
+from ...output import log
+from ...exceptions import SysCallError
+
+def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
+ try:
+ output = SysCommand(f"btrfs subvol show {path}").decode()
+ except SysCallError as error:
+ print('Error:', error)
+
+ result = {}
+ for line in output.replace('\r\n', '\n').split('\n'):
+ if ':' in line:
+ key, val = line.replace('\t', '').split(':', 1)
+ result[key.strip().lower().replace(' ', '_')] = val.strip()
+
+ return result
+
+def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
+ """
+ This function uses btrfs to create a subvolume.
+
+ @installation: archinstall.Installer instance
+ @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
+ """
+
+ installation_mountpoint = installation.target
+ if type(installation_mountpoint) == str:
+ installation_mountpoint = pathlib.Path(installation_mountpoint)
+ # Set up the required physical structure
+ if type(subvolume_location) == str:
+ subvolume_location = pathlib.Path(subvolume_location)
+
+ target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor)
+
+ # Difference from mount_subvolume:
+ # We only check if the parent exists, since we'll run in to "target path already exists" otherwise
+ if not target.parent.exists():
+ target.parent.mkdir(parents=True)
+
+ if glob.glob(str(target / '*')):
+ raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)")
+
+ # Remove the target if it exists
+ if target.exists():
+ target.rmdir()
+
+ log(f"Creating a subvolume on {target}", level=logging.INFO)
+ if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
+ raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
+
+def _has_option(option :str,options :list) -> bool:
+ """ auxiliary routine to check if an option is present in a list.
+ we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...)
+ """
+ if not options:
+ return False
+
+ for item in options:
+ if option in item:
+ return True
+
+ return False
+
+def manage_btrfs_subvolumes(installation :Installer,
+ partition :Dict[str, str],) -> list:
+
+ raise Deprecated("Use setup_subvolumes() instead.")
+
+ from copy import deepcopy
+ """ we do the magic with subvolumes in a centralized place
+ parameters:
+ * the installation object
+ * the partition dictionary entry which represents the physical partition
+ returns
+ * mountpoinst, the list which contains all the "new" partititon to be mounted
+
+ We expect the partition has been mounted as / , and it to be unmounted after the processing
+ Then we create all the subvolumes inside btrfs as demand
+ We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs
+ Then we return a list of "new" partitions to be processed as "normal" partitions
+ # TODO For encrypted devices we need some special processing prior to it
+ """
+ # We process each of the pairs <subvolume name: mount point | None | mount info dict>
+ # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list
+ # of mount options (or similar used by brtfs)
+ mountpoints = []
+ subvolumes = partition['btrfs']['subvolumes']
+ for name, right_hand in subvolumes.items():
+ try:
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use
+ if name.startswith('/'):
+ name = name[1:]
+ # renormalize the right hand.
+ location = None
+ subvol_options = []
+ # no contents, so it is not to be mounted
+ if not right_hand:
+ location = None
+ # just a string. per backward compatibility the mount point
+ elif isinstance(right_hand,str):
+ location = right_hand
+ # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿?
+ elif isinstance(right_hand,dict):
+ location = right_hand.get('mountpoint',None)
+ subvol_options = right_hand.get('options',[])
+ # we create the subvolume
+ create_subvolume(installation,name)
+ # Make the nodatacow processing now
+ # It will be the main cause of creation of subvolumes which are not to be mounted
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ if 'nodatacow' in subvol_options:
+ if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so nodatacow doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('nodatacow')]
+ # Make the compress processing now
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ # in this way only zstd compression is activaded
+ # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
+ if 'compress' in subvol_options:
+ if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])):
+ if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so compress doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('compress')]
+ # END compress processing.
+ # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume
+ if not partition['mountpoint'] and location is not None:
+ # we begin to create a fake partition entry. First we copy the original -the one that corresponds to
+ # the primary partition. We make a deepcopy to avoid altering the original content in any case
+ fake_partition = deepcopy(partition)
+ # we start to modify entries in the "fake partition" to match the needs of the subvolumes
+ # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy
+ del fake_partition['btrfs']
+ fake_partition['encrypted'] = False
+ fake_partition['generate-encryption-key-file'] = False
+ # Mount destination. As of now the right hand part
+ fake_partition['mountpoint'] = location
+ # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path.
+ fake_partition['subvolume'] = name
+ # here we add the special mount options for the subvolume, if any.
+ # if the original partition['options'] is not a list might give trouble
+ if fake_partition.get('filesystem',{}).get('mount_options',[]):
+ fake_partition['filesystem']['mount_options'].extend(subvol_options)
+ else:
+ fake_partition['filesystem']['mount_options'] = subvol_options
+ # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer.
+ # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
+ # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
+ fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
+
+ # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
+ # as "normal" ones
+ mountpoints.append(fake_partition)
+ except Exception as e:
+ raise e
+ return mountpoints
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
new file mode 100644
index 00000000..d529478f
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py
@@ -0,0 +1,132 @@
+import pathlib
+import logging
+from typing import Optional
+
+from ...exceptions import SysCallError, DiskError
+from ...general import SysCommand
+from ...output import log
+from ..helpers import get_mount_info
+from .btrfssubvolume import BtrfsSubvolume
+
+
+def mount_subvolume(installation, device, name, subvolume_information):
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = name.lstrip('/')
+
+ # renormalize the right hand.
+ mountpoint = subvolume_information.get('mountpoint', None)
+ if not mountpoint:
+ return None
+
+ if type(mountpoint) == str:
+ mountpoint = pathlib.Path(mountpoint)
+
+ installation_target = installation.target
+ if type(installation_target) == str:
+ installation_target = pathlib.Path(installation_target)
+
+ mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
+ mountpoint.mkdir(parents=True, exist_ok=True)
+
+ mount_options = subvolume_information.get('options', [])
+ if not any('subvol=' in x for x in mount_options):
+ mount_options += [f'subvol={name}']
+
+ log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
+ SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
+
+
+def setup_subvolumes(installation, partition_dict):
+ """
+ Taken from: ..user_guides.py
+
+ partition['btrfs'] = {
+ "subvolumes" : {
+ "@": "/",
+ "@home": "/home",
+ "@log": "/var/log",
+ "@pkg": "/var/cache/pacman/pkg",
+ "@.snapshots": "/.snapshots"
+ }
+ }
+ """
+ log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
+ for name, right_hand in partition_dict['btrfs']['subvolumes'].items():
+ # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load.
+ # Every subvolume is created from the top of the hierarchy- and simplifies its further use
+ name = name.lstrip('/')
+
+ # renormalize the right hand.
+ # mountpoint = None
+ subvol_options = []
+
+ match right_hand:
+ # case str(): # backwards-compatability
+ # mountpoint = right_hand
+ case dict():
+ # mountpoint = right_hand.get('mountpoint', None)
+ subvol_options = right_hand.get('options', [])
+
+ # We create the subvolume using the BTRFSPartition instance.
+ # That way we ensure not only easy access, but also accurate mount locations etc.
+ partition_dict['device_instance'].create_subvolume(name, installation=installation)
+
+ # Make the nodatacow processing now
+ # It will be the main cause of creation of subvolumes which are not to be mounted
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ if 'nodatacow' in subvol_options:
+ if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so nodatacow doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('nodatacow')]
+ # Make the compress processing now
+ # it is not an options which can be established by subvolume (but for whole file systems), and can be
+ # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
+ # in this way only zstd compression is activaded
+ # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
+
+ if 'compress' in subvol_options:
+ if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
+ if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
+ raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
+ # entry is deleted so compress doesn't propagate to the mount options
+ del subvol_options[subvol_options.index('compress')]
+
+def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]:
+ try:
+ subvolume_name = None
+ result = {}
+ for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
+ if index == 0:
+ subvolume_name = line.strip().decode('UTF-8')
+ continue
+
+ if b':' in line:
+ key, value = line.strip().decode('UTF-8').split(':', 1)
+
+ # A bit of a hack, until I figure out how @dataclass
+ # allows for hooking in a pre-processor to do this we have to do it here:
+ result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
+
+ return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result})
+
+ except SysCallError:
+ pass
+
+ return None
+
+def find_parent_subvolume(path :pathlib.Path, filters=[]):
+ # A root path cannot have a parent
+ if str(path) == '/':
+ return None
+
+ if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters):
+ if not (subvolume := subvolume_info_from_path(found_mount['target'])):
+ if found_mount['target'] == '/':
+ return None
+
+ return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']])
+
+ return subvolume \ No newline at end of file
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
new file mode 100644
index 00000000..5020133d
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -0,0 +1,116 @@
+import glob
+import pathlib
+import logging
+from typing import Optional, TYPE_CHECKING
+
+from ...exceptions import DiskError
+from ...storage import storage
+from ...output import log
+from ...general import SysCommand
+from ..partition import Partition
+from ..helpers import findmnt
+from .btrfs_helpers import (
+ subvolume_info_from_path
+)
+
+if TYPE_CHECKING:
+ from ...installer import Installer
+ from .btrfssubvolume import BtrfsSubvolume
+
+class BTRFSPartition(Partition):
+ def __init__(self, *args, **kwargs):
+ Partition.__init__(self, *args, **kwargs)
+
+ def __repr__(self, *args :str, **kwargs :str) -> str:
+ mount_repr = ''
+ if self.mountpoint:
+ mount_repr = f", mounted={self.mountpoint}"
+ elif self.target_mountpoint:
+ mount_repr = f", rel_mountpoint={self.target_mountpoint}"
+
+ if self._encrypted:
+ return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
+ else:
+ return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
+
+ @property
+ def subvolumes(self):
+ for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
+ if '[' in filesystem.get('source', ''):
+ yield subvolume_info_from_path(filesystem['target'])
+
+ def iterate_children(struct):
+ for child in struct.get('children', []):
+ if '[' in child.get('source', ''):
+ yield subvolume_info_from_path(child['target'])
+
+ for sub_child in iterate_children(child):
+ yield sub_child
+
+ for child in iterate_children(filesystem):
+ yield child
+
+ def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume':
+ """
+ Subvolumes have to be created within a mountpoint.
+ This means we need to get the current installation target.
+ After we get it, we need to verify it is a btrfs subvolume filesystem.
+ Finally, the destination must be empty.
+ """
+
+ # Allow users to override the installation session
+ if not installation:
+ installation = storage.get('installation_session')
+
+ # Determain if the path given, is an absolute path or a releative path.
+ # We do this by checking if the path contains a known mountpoint.
+ if str(subvolume)[0] == '/':
+ if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
+ if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
+ # Path starts with a known mountpoint which isn't /
+ # Which means it's an absolut path to a mounted location.
+ pass
+ else:
+ # Since it's not an absolute position with a known start.
+ # We omit the anchor ('/' basically) and make sure it's appendable
+ # to the installation.target later
+ subvolume = subvolume.relative_to(subvolume.anchor)
+ # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is.
+
+ # If the subvolume is not absolute, then we do two checks:
+ # 1. Check if the partition itself is mounted somewhere, and use that as a root
+ # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs
+ # If both above fail, we need to warn the user that such setup is not supported.
+ if str(subvolume)[0] != '/':
+ if self.mountpoint is None and installation is None:
+ raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.")
+ elif self.mountpoint:
+ subvolume = self.mountpoint / subvolume
+ elif installation:
+ ongoing_installation_destination = installation.target
+ if type(ongoing_installation_destination) == str:
+ ongoing_installation_destination = pathlib.Path(ongoing_installation_destination)
+
+ subvolume = ongoing_installation_destination / subvolume
+
+ subvolume.parent.mkdir(parents=True, exist_ok=True)
+
+ # <!--
+ # We perform one more check from the given absolute position.
+ # And we traverse backwards in order to locate any if possible subvolumes above
+ # our new btrfs subvolume. This is because it needs to be mounted under it to properly
+ # function.
+ # if btrfs_parent := find_parent_subvolume(subvolume):
+ # print('Found parent:', btrfs_parent)
+ # -->
+
+ log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey")
+
+ if glob.glob(str(subvolume / '*')):
+ raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
+ elif subvolinfo := subvolume_info_from_path(subvolume):
+ raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
+
+ SysCommand(f"btrfs subvolume create {subvolume}")
+
+ return subvolume_info_from_path(subvolume) \ No newline at end of file
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolume.py
new file mode 100644
index 00000000..a96e2a94
--- /dev/null
+++ b/archinstall/lib/disk/btrfs/btrfssubvolume.py
@@ -0,0 +1,191 @@
+import pathlib
+import datetime
+import logging
+import string
+import random
+import shutil
+from dataclasses import dataclass
+from typing import Optional, List# , TYPE_CHECKING
+from functools import cached_property
+
+# if TYPE_CHECKING:
+# from ..blockdevice import BlockDevice
+
+from ...exceptions import DiskError
+from ...general import SysCommand
+from ...output import log
+from ...storage import storage
+
+@dataclass
+class BtrfsSubvolume:
+ full_path :pathlib.Path
+ name :str
+ uuid :str
+ parent_uuid :str
+ creation_time :datetime.datetime
+ subvolume_id :int
+ generation :int
+ gen_at_creation :int
+ parent_id :int
+ top_level_id :int
+ send_transid :int
+ send_time :datetime.datetime
+ receive_transid :int
+ received_uuid :Optional[str] = None
+ flags :Optional[str] = None
+ receive_time :Optional[datetime.datetime] = None
+ snapshots :Optional[List] = None
+
+ def __post_init__(self):
+ self.full_path = pathlib.Path(self.full_path)
+
+ # Convert "-" entries to `None`
+ if self.parent_uuid == "-":
+ self.parent_uuid = None
+ if self.received_uuid == "-":
+ self.received_uuid = None
+ if self.flags == "-":
+ self.flags = None
+ if self.receive_time == "-":
+ self.receive_time = None
+ if self.snapshots == "":
+ self.snapshots = []
+
+ # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats)
+ self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time))
+ self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time))
+ if self.receive_time:
+ self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time))
+
+ @property
+ def parent_subvolume(self):
+ from .btrfs_helpers import find_parent_subvolume
+
+ return find_parent_subvolume(self.full_path)
+
+ @property
+ def root(self) -> bool:
+ from .btrfs_helpers import subvolume_info_from_path
+
+ # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
+ # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
+ # It would also be nice if it could use findmnt(self.full_path) and traverse backwards
+ # finding the last occurance of a subvolume which 'self' belongs to.
+ if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
+ return self.full_path == volume.full_path
+
+ return False
+
+ @cached_property
+ def partition(self):
+ from ..helpers import findmnt, get_parent_of_partition, all_blockdevices
+ from ..partition import Partition
+ from ..blockdevice import BlockDevice
+ from ..mapperdev import MapperDev
+ from .btrfspartition import BTRFSPartition
+ from .btrfs_helpers import subvolume_info_from_path
+
+ try:
+ # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device.
+ if filesystem := findmnt(self.full_path).get('filesystems', []):
+ if source := filesystem[0].get('source', None):
+ # Strip away subvolume definitions from findmnt
+ if '[' in source:
+ source = source[:source.find('[')]
+
+ if filesystem[0].get('fstype', '') == 'btrfs':
+ return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
+ elif filesystem[0].get('source', '').startswith('/dev/mapper'):
+ return MapperDev(source)
+ else:
+ return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
+ except DiskError:
+ # Subvolume has never been mounted, we have no reliable way of finding where it is.
+ # But we have the UUID of the partition, and can begin looking for it by mounting
+ # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices.
+
+ log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING)
+ for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items():
+ if type(instance) in (Partition, MapperDev):
+ we_mounted_it = False
+ detection_mountpoint = instance.mountpoint
+ if not detection_mountpoint:
+ if type(instance) == Partition and instance.encrypted:
+ # TODO: Perhaps support unlocking encrypted volumes?
+ # This will cause a lot of potential user interactions tho.
+ log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG)
+ continue
+
+ detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}")
+ detection_mountpoint.mkdir(parents=True, exist_ok=True)
+
+ instance.mount(str(detection_mountpoint))
+ we_mounted_it = True
+
+ if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])):
+ if subvolume := subvolume_info_from_path(filesystem[0]['target']):
+ if subvolume.uuid == self.uuid:
+ # The top level subvolume matched of ourselves,
+ # which means the instance we're iterating has the subvol we're looking for.
+ log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
+ return instance
+
+ def iterate_children(struct):
+ for child in struct.get('children', []):
+ if '[' in child.get('source', ''):
+ yield subvolume_info_from_path(child['target'])
+
+ for sub_child in iterate_children(child):
+ yield sub_child
+
+ for child in iterate_children(filesystem[0]):
+ if child.uuid == self.uuid:
+ # We found a child within the instance that has the subvol we're looking for.
+ log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
+ return instance
+
+ if we_mounted_it:
+ instance.unmount()
+ shutil.rmtree(detection_mountpoint)
+
+ @cached_property
+ def mount_options(self) -> Optional[List[str]]:
+ from ..helpers import findmnt
+
+ if filesystem := findmnt(self.full_path).get('filesystems', []):
+ return filesystem[0].get('options').split(',')
+
+ def convert_to_ISO_format(self, time_string):
+ time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '')
+ iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}"
+ return iso_string
+
+ def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True):
+ from ..helpers import findmnt
+
+ try:
+ if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False):
+ log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}")
+ SysCommand(f"umount {mountpoint}")
+ except DiskError:
+ # No previously mounted device at the mountpoint
+ pass
+
+ if not options:
+ options = []
+
+ try:
+ if include_previously_known_options and (cached_options := self.mount_options):
+ options += cached_options
+ except DiskError:
+ pass
+
+ if not any('subvol=' in x for x in options):
+ options += f'subvol={self.name}'
+
+ SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}")
+ log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray")
+
+ def unmount(self, recurse :bool = True):
+ SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
+ log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") \ No newline at end of file