Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/btrfs
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/btrfs')
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py56
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py136
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py109
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py192
4 files changed, 0 insertions, 493 deletions
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py
deleted file mode 100644
index a26e0160..00000000
--- a/archinstall/lib/disk/btrfs/__init__.py
+++ /dev/null
@@ -1,56 +0,0 @@
-from __future__ import annotations
-import pathlib
-import glob
-import logging
-from typing import Union, Dict, TYPE_CHECKING
-
-# https://stackoverflow.com/a/39757388/929999
-if TYPE_CHECKING:
- from ...installer import Installer
-
-from .btrfs_helpers import (
- subvolume_info_from_path as subvolume_info_from_path,
- find_parent_subvolume as find_parent_subvolume,
- setup_subvolumes as setup_subvolumes,
- mount_subvolume as mount_subvolume
-)
-from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume
-from .btrfspartition import BTRFSPartition as BTRFSPartition
-
-from ...exceptions import DiskError, Deprecated
-from ...general import SysCommand
-from ...output import log
-
-
-def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
- """
- This function uses btrfs to create a subvolume.
-
- @installation: archinstall.Installer instance
- @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
- """
-
- installation_mountpoint = installation.target
- if type(installation_mountpoint) == str:
- installation_mountpoint = pathlib.Path(installation_mountpoint)
- # Set up the required physical structure
- if type(subvolume_location) == str:
- subvolume_location = pathlib.Path(subvolume_location)
-
- target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor)
-
- # Difference from mount_subvolume:
- # We only check if the parent exists, since we'll run in to "target path already exists" otherwise
- if not target.parent.exists():
- target.parent.mkdir(parents=True)
-
- if glob.glob(str(target / '*')):
- raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)")
-
- # Remove the target if it exists
- if target.exists():
- target.rmdir()
-
- log(f"Creating a subvolume on {target}", level=logging.INFO)
- if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
- raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
deleted file mode 100644
index f6d2734a..00000000
--- a/archinstall/lib/disk/btrfs/btrfs_helpers.py
+++ /dev/null
@@ -1,136 +0,0 @@
-import logging
-import re
-from pathlib import Path
-from typing import Optional, Dict, Any, TYPE_CHECKING
-
-from ...models.subvolume import Subvolume
-from ...exceptions import SysCallError, DiskError
-from ...general import SysCommand
-from ...output import log
-from ...plugins import plugins
-from ..helpers import get_mount_info
-from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
-
-if TYPE_CHECKING:
- from .btrfspartition import BTRFSPartition
- from ...installer import Installer
-
-
-class fstab_btrfs_compression_plugin():
- def __init__(self, partition_dict):
- self.partition_dict = partition_dict
-
- def on_genfstab(self, installation):
- with open(f"{installation.target}/etc/fstab", 'r') as fh:
- fstab = fh.read()
-
- # Replace the {installation}/etc/fstab with entries
- # using the compress=zstd where the mountpoint has compression set.
- with open(f"{installation.target}/etc/fstab", 'w') as fh:
- for line in fstab.split('\n'):
- # So first we grab the mount options by using subvol=.*? as a locator.
- # And we also grab the mountpoint for the entry, for instance /var/log
- if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)):
- for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []):
- # We then locate the correct subvolume and check if it's compressed
- if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip():
- # We then sneak in the compress=zstd option if it doesn't already exist:
- # We skip entries where compression is already defined
- if ',compress=zstd,' not in line:
- line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}")
- break
-
- fh.write(f"{line}\n")
-
- return True
-
-
-def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
- # we normalize the subvolume name (getting rid of slash at the start if exists.
- # In our implementation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = subvolume.name.lstrip('/')
- mountpoint = Path(subvolume.mountpoint)
- installation_target = Path(installation.target)
-
- mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
- mountpoint.mkdir(parents=True, exist_ok=True)
- mount_options = subvolume.options + [f'subvol={name}']
-
- log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
- SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
-
-
-def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
- log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
-
- for subvolume in partition_dict['btrfs']['subvolumes']:
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = subvolume.name.lstrip('/')
-
- # We create the subvolume using the BTRFSPartition instance.
- # That way we ensure not only easy access, but also accurate mount locations etc.
- partition_dict['device_instance'].create_subvolume(name, installation=installation)
-
- # Make the nodatacow processing now
- # It will be the main cause of creation of subvolumes which are not to be mounted
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if subvolume.nodatacow:
- if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
-
- # Make the compress processing now
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- # in this way only zstd compression is activaded
- # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
-
- if subvolume.compress:
- if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
- if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
-
- if 'fstab_btrfs_compression_plugin' not in plugins:
- plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict)
-
-
-def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
- try:
- subvolume_name = ''
- result = {}
- for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
- if index == 0:
- subvolume_name = line.strip().decode('UTF-8')
- continue
-
- if b':' in line:
- key, value = line.strip().decode('UTF-8').split(':', 1)
-
- # A bit of a hack, until I figure out how @dataclass
- # allows for hooking in a pre-processor to do this we have to do it here:
- result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
-
- return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
- except SysCallError as error:
- log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
-
- return None
-
-
-def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
- # A root path cannot have a parent
- if str(path) == '/':
- return None
-
- if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters):
- if not (subvolume := subvolume_info_from_path(found_mount['target'])):
- if found_mount['target'] == '/':
- return None
-
- return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
-
- return subvolume
-
- return None
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
deleted file mode 100644
index d04c9b98..00000000
--- a/archinstall/lib/disk/btrfs/btrfspartition.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import glob
-import pathlib
-import logging
-from typing import Optional, TYPE_CHECKING
-
-from ...exceptions import DiskError
-from ...storage import storage
-from ...output import log
-from ...general import SysCommand
-from ..partition import Partition
-from ..helpers import findmnt
-from .btrfs_helpers import (
- subvolume_info_from_path
-)
-
-if TYPE_CHECKING:
- from ...installer import Installer
- from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
-
-
-class BTRFSPartition(Partition):
- def __init__(self, *args, **kwargs):
- Partition.__init__(self, *args, **kwargs)
-
- @property
- def subvolumes(self):
- for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
- if '[' in filesystem.get('source', ''):
- yield subvolume_info_from_path(filesystem['target'])
-
- def iterate_children(struct):
- for c in struct.get('children', []):
- if '[' in child.get('source', ''):
- yield subvolume_info_from_path(c['target'])
-
- for sub_child in iterate_children(c):
- yield sub_child
-
- for child in iterate_children(filesystem):
- yield child
-
- def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo':
- """
- Subvolumes have to be created within a mountpoint.
- This means we need to get the current installation target.
- After we get it, we need to verify it is a btrfs subvolume filesystem.
- Finally, the destination must be empty.
- """
-
- # Allow users to override the installation session
- if not installation:
- installation = storage.get('installation_session')
-
- # Determain if the path given, is an absolute path or a relative path.
- # We do this by checking if the path contains a known mountpoint.
- if str(subvolume)[0] == '/':
- if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
- if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
- # Path starts with a known mountpoint which isn't /
- # Which means it's an absolute path to a mounted location.
- pass
- else:
- # Since it's not an absolute position with a known start.
- # We omit the anchor ('/' basically) and make sure it's appendable
- # to the installation.target later
- subvolume = subvolume.relative_to(subvolume.anchor)
- # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is.
-
- # If the subvolume is not absolute, then we do two checks:
- # 1. Check if the partition itself is mounted somewhere, and use that as a root
- # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs
- # If both above fail, we need to warn the user that such setup is not supported.
- if str(subvolume)[0] != '/':
- if self.mountpoint is None and installation is None:
- raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.")
- elif self.mountpoint:
- subvolume = self.mountpoint / subvolume
- elif installation:
- ongoing_installation_destination = installation.target
- if type(ongoing_installation_destination) == str:
- ongoing_installation_destination = pathlib.Path(ongoing_installation_destination)
-
- subvolume = ongoing_installation_destination / subvolume
-
- subvolume.parent.mkdir(parents=True, exist_ok=True)
-
- # <!--
- # We perform one more check from the given absolute position.
- # And we traverse backwards in order to locate any if possible subvolumes above
- # our new btrfs subvolume. This is because it needs to be mounted under it to properly
- # function.
- # if btrfs_parent := find_parent_subvolume(subvolume):
- # print('Found parent:', btrfs_parent)
- # -->
-
- log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey")
-
- if glob.glob(str(subvolume / '*')):
- raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
- # Ideally we would like to check if the destination is already a subvolume.
- # But then we would need the mount-point at this stage as well.
- # So we'll comment out this check:
- # elif subvolinfo := subvolume_info_from_path(subvolume):
- # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
-
- # And deal with it here:
- SysCommand(f"btrfs subvolume create {subvolume}")
-
- return subvolume_info_from_path(subvolume)
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
deleted file mode 100644
index 5f5bdea6..00000000
--- a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
+++ /dev/null
@@ -1,192 +0,0 @@
-import pathlib
-import datetime
-import logging
-import string
-import random
-import shutil
-from dataclasses import dataclass
-from typing import Optional, List# , TYPE_CHECKING
-from functools import cached_property
-
-# if TYPE_CHECKING:
-# from ..blockdevice import BlockDevice
-
-from ...exceptions import DiskError
-from ...general import SysCommand
-from ...output import log
-from ...storage import storage
-
-
-@dataclass
-class BtrfsSubvolumeInfo:
- full_path :pathlib.Path
- name :str
- uuid :str
- parent_uuid :str
- creation_time :datetime.datetime
- subvolume_id :int
- generation :int
- gen_at_creation :int
- parent_id :int
- top_level_id :int
- send_transid :int
- send_time :datetime.datetime
- receive_transid :int
- received_uuid :Optional[str] = None
- flags :Optional[str] = None
- receive_time :Optional[datetime.datetime] = None
- snapshots :Optional[List] = None
-
- def __post_init__(self):
- self.full_path = pathlib.Path(self.full_path)
-
- # Convert "-" entries to `None`
- if self.parent_uuid == "-":
- self.parent_uuid = None
- if self.received_uuid == "-":
- self.received_uuid = None
- if self.flags == "-":
- self.flags = None
- if self.receive_time == "-":
- self.receive_time = None
- if self.snapshots == "":
- self.snapshots = []
-
- # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats)
- self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time))
- self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time))
- if self.receive_time:
- self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time))
-
- @property
- def parent_subvolume(self):
- from .btrfs_helpers import find_parent_subvolume
-
- return find_parent_subvolume(self.full_path)
-
- @property
- def root(self) -> bool:
- from .btrfs_helpers import subvolume_info_from_path
-
- # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
- # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
- # It would also be nice if it could use findmnt(self.full_path) and traverse backwards
- # finding the last occurrence of a subvolume which 'self' belongs to.
- if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
- return self.full_path == volume.full_path
-
- return False
-
- @cached_property
- def partition(self):
- from ..helpers import findmnt, get_parent_of_partition, all_blockdevices
- from ..partition import Partition
- from ..blockdevice import BlockDevice
- from ..mapperdev import MapperDev
- from .btrfspartition import BTRFSPartition
- from .btrfs_helpers import subvolume_info_from_path
-
- try:
- # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device.
- if filesystem := findmnt(self.full_path).get('filesystems', []):
- if source := filesystem[0].get('source', None):
- # Strip away subvolume definitions from findmnt
- if '[' in source:
- source = source[:source.find('[')]
-
- if filesystem[0].get('fstype', '') == 'btrfs':
- return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
- elif filesystem[0].get('source', '').startswith('/dev/mapper'):
- return MapperDev(source)
- else:
- return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
- except DiskError:
- # Subvolume has never been mounted, we have no reliable way of finding where it is.
- # But we have the UUID of the partition, and can begin looking for it by mounting
- # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices.
-
- log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING)
- for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items():
- if type(instance) in (Partition, MapperDev):
- we_mounted_it = False
- detection_mountpoint = instance.mountpoint
- if not detection_mountpoint:
- if type(instance) == Partition and instance.encrypted:
- # TODO: Perhaps support unlocking encrypted volumes?
- # This will cause a lot of potential user interactions tho.
- log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG)
- continue
-
- detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}")
- detection_mountpoint.mkdir(parents=True, exist_ok=True)
-
- instance.mount(str(detection_mountpoint))
- we_mounted_it = True
-
- if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])):
- if subvolume := subvolume_info_from_path(filesystem[0]['target']):
- if subvolume.uuid == self.uuid:
- # The top level subvolume matched of ourselves,
- # which means the instance we're iterating has the subvol we're looking for.
- log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
- return instance
-
- def iterate_children(struct):
- for child in struct.get('children', []):
- if '[' in child.get('source', ''):
- yield subvolume_info_from_path(child['target'])
-
- for sub_child in iterate_children(child):
- yield sub_child
-
- for child in iterate_children(filesystem[0]):
- if child.uuid == self.uuid:
- # We found a child within the instance that has the subvol we're looking for.
- log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
- return instance
-
- if we_mounted_it:
- instance.unmount()
- shutil.rmtree(detection_mountpoint)
-
- @cached_property
- def mount_options(self) -> Optional[List[str]]:
- from ..helpers import findmnt
-
- if filesystem := findmnt(self.full_path).get('filesystems', []):
- return filesystem[0].get('options').split(',')
-
- def convert_to_ISO_format(self, time_string):
- time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '')
- iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}"
- return iso_string
-
- def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True):
- from ..helpers import findmnt
-
- try:
- if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False):
- log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}")
- SysCommand(f"umount {mountpoint}")
- except DiskError:
- # No previously mounted device at the mountpoint
- pass
-
- if not options:
- options = []
-
- try:
- if include_previously_known_options and (cached_options := self.mount_options):
- options += cached_options
- except DiskError:
- pass
-
- if not any('subvol=' in x for x in options):
- options += f'subvol={self.name}'
-
- SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}")
- log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray")
-
- def unmount(self, recurse :bool = True):
- SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
- log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")