Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/__init__.py55
-rw-r--r--archinstall/lib/disk/blockdevice.py301
-rw-r--r--archinstall/lib/disk/btrfs/__init__.py56
-rw-r--r--archinstall/lib/disk/btrfs/btrfs_helpers.py136
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py109
-rw-r--r--archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py192
-rw-r--r--archinstall/lib/disk/device_handler.py809
-rw-r--r--archinstall/lib/disk/device_model.py1499
-rw-r--r--archinstall/lib/disk/disk_menu.py140
-rw-r--r--archinstall/lib/disk/diskinfo.py40
-rw-r--r--archinstall/lib/disk/dmcryptdev.py48
-rw-r--r--archinstall/lib/disk/encryption.py174
-rw-r--r--archinstall/lib/disk/encryption_menu.py288
-rw-r--r--archinstall/lib/disk/fido.py92
-rw-r--r--archinstall/lib/disk/filesystem.py622
-rw-r--r--archinstall/lib/disk/helpers.py556
-rw-r--r--archinstall/lib/disk/mapperdev.py92
-rw-r--r--archinstall/lib/disk/partition.py661
-rw-r--r--archinstall/lib/disk/partitioning_menu.py429
-rw-r--r--archinstall/lib/disk/subvolume_menu.py61
-rw-r--r--archinstall/lib/disk/user_guides.py240
-rw-r--r--archinstall/lib/disk/validators.py48
22 files changed, 3717 insertions, 2931 deletions
diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py
index 352d04b9..7f881273 100644
--- a/archinstall/lib/disk/__init__.py
+++ b/archinstall/lib/disk/__init__.py
@@ -1,7 +1,48 @@
-from .btrfs import *
-from .helpers import *
-from .blockdevice import BlockDevice
-from .filesystem import Filesystem, MBR, GPT
-from .partition import *
-from .user_guides import *
-from .validators import * \ No newline at end of file
+from .device_handler import device_handler, disk_layouts
+from .fido import Fido2
+from .filesystem import FilesystemHandler
+from .subvolume_menu import SubvolumeMenu
+from .partitioning_menu import (
+ manual_partitioning,
+ PartitioningList
+)
+from .device_model import (
+ _DeviceInfo,
+ BDevice,
+ DiskLayoutType,
+ DiskLayoutConfiguration,
+ LvmLayoutType,
+ LvmConfiguration,
+ LvmVolumeGroup,
+ LvmVolume,
+ LvmVolumeStatus,
+ PartitionTable,
+ Unit,
+ Size,
+ SectorSize,
+ SubvolumeModification,
+ DeviceGeometry,
+ PartitionType,
+ PartitionFlag,
+ FilesystemType,
+ ModificationStatus,
+ PartitionModification,
+ DeviceModification,
+ EncryptionType,
+ DiskEncryption,
+ Fido2Device,
+ LsblkInfo,
+ CleanType,
+ get_lsblk_info,
+ get_all_lsblk_info,
+ get_lsblk_by_mountpoint,
+)
+from .encryption_menu import (
+ select_encryption_type,
+ select_encrypted_password,
+ select_hsm,
+ select_partitions_to_encrypt,
+ DiskEncryptionMenu,
+)
+
+from .disk_menu import DiskLayoutConfigurationMenu
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
deleted file mode 100644
index 178b786a..00000000
--- a/archinstall/lib/disk/blockdevice.py
+++ /dev/null
@@ -1,301 +0,0 @@
-from __future__ import annotations
-import json
-import logging
-import time
-
-from collections import OrderedDict
-from dataclasses import dataclass
-from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING
-
-from ..exceptions import DiskError, SysCallError
-from ..output import log
-from ..general import SysCommand
-from ..storage import storage
-
-
-if TYPE_CHECKING:
- from .partition import Partition
- _: Any
-
-
-@dataclass
-class BlockSizeInfo:
- start: str
- end: str
- size: str
-
-
-@dataclass
-class BlockInfo:
- pttype: str
- ptuuid: str
- size: int
- tran: Optional[str]
- rota: bool
- free_space: Optional[List[BlockSizeInfo]]
-
-
-class BlockDevice:
- def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
- if not info:
- from .helpers import all_blockdevices
- # If we don't give any information, we need to auto-fill it.
- # Otherwise any subsequent usage will break.
- self.info = all_blockdevices(partitions=False)[path].info
- else:
- self.info = info
-
- self._path = path
- self.keep_partitions = True
- self._block_info = self._fetch_information()
- self._partitions: Dict[str, 'Partition'] = {}
-
- self._load_partitions()
-
- # TODO: Currently disk encryption is a BIT misleading.
- # It's actually partition-encryption, but for future-proofing this
- # I'm placing the encryption password on a BlockDevice level.
-
- def __repr__(self, *args :str, **kwargs :str) -> str:
- return self._str_repr
-
- @property
- def path(self) -> str:
- return self._path
-
- @property
- def _str_repr(self) -> str:
- return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})"
-
- def as_json(self) -> Dict[str, Any]:
- return {
- str(_('Device')): self._device_or_backfile,
- str(_('Size')): f'{self.size}GB',
- str(_('Free space')): f'{self._safe_free_space()}',
- str(_('Bus-type')): f'{self.bus_type}'
- }
-
- def __iter__(self) -> Iterator['Partition']:
- for partition in self.partitions:
- yield self.partitions[partition]
-
- def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any:
- if hasattr(self, key):
- return getattr(self, key)
-
- if self.info and key in self.info:
- return self.info[key]
-
- raise KeyError(f'{self.info} does not contain information: "{key}"')
-
- def __lt__(self, left_comparitor :'BlockDevice') -> bool:
- return self._path < left_comparitor.path
-
- def json(self) -> str:
- """
- json() has precedence over __dump__, so this is a way
- to give less/partial information for user readability.
- """
- return self._path
-
- def __dump__(self) -> Dict[str, Dict[str, Any]]:
- return {
- self._path: {
- 'partuuid': self.uuid,
- 'wipe': self.info.get('wipe', None),
- 'partitions': [part.__dump__() for part in self.partitions.values()]
- }
- }
-
- def _call_lsblk(self, path: str) -> Dict[str, Any]:
- output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8')
- if output:
- lsblk_info = json.loads(output)
- return lsblk_info
-
- raise DiskError(f'Failed to read disk "{self.path}" with lsblk')
-
- def _load_partitions(self):
- from .partition import Partition
-
- self._partitions.clear()
-
- lsblk_info = self._call_lsblk(self._path)
- device = lsblk_info['blockdevices'][0]
- self._partitions.clear()
-
- if children := device.get('children', None):
- root = f'/dev/{device["name"]}'
- for child in children:
- part_id = child['name'].removeprefix(device['name'])
- self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id)
-
- def _get_free_space(self) -> Optional[List[BlockSizeInfo]]:
- # NOTE: parted -s will default to `cancel` on prompt, skipping any partition
- # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
- # so the free will ignore the ESP partition and just give the "free" space.
- # Doesn't harm us, but worth noting in case something weird happens.
- try:
- output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8')
- if output:
- free_lines = [line for line in output.split('\n') if 'free' in line]
- sizes = []
- for free_space in free_lines:
- _, start, end, size, *_ = free_space.strip('\r\n;').split(':')
- sizes.append(BlockSizeInfo(start, end, size))
-
- return sizes
- except SysCallError as error:
- log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG)
-
- return None
-
- def _fetch_information(self) -> BlockInfo:
- lsblk_info = self._call_lsblk(self._path)
- device = lsblk_info['blockdevices'][0]
- free_space = self._get_free_space()
-
- return BlockInfo(
- pttype=device['pttype'],
- ptuuid=device['ptuuid'],
- size=device['size'],
- tran=device['tran'],
- rota=device['rota'],
- free_space=free_space
- )
-
- @property
- def _device_or_backfile(self) -> Optional[str]:
- """
- Returns the actual device-endpoint of the BlockDevice.
- If it's a loop-back-device it returns the back-file,
- For other types it return self.device
- """
- if self.info.get('type') == 'loop':
- return self.info['back-file']
- else:
- return self.device
-
- @property
- def mountpoint(self) -> None:
- """
- A dummy function to enable transparent comparisons of mountpoints.
- As blockdevices can't be mounted directly, this will always be None
- """
- return None
-
- @property
- def device(self) -> Optional[str]:
- """
- Returns the device file of the BlockDevice.
- If it's a loop-back-device it returns the /dev/X device,
- If it's a ATA-drive it returns the /dev/X device
- And if it's a crypto-device it returns the parent device
- """
- if "DEVTYPE" not in self.info:
- raise DiskError(f'Could not locate backplane info for "{self._path}"')
-
- if self.info['DEVTYPE'] in ['disk','loop']:
- return self._path
- elif self.info['DEVTYPE'][:4] == 'raid':
- # This should catch /dev/md## raid devices
- return self._path
- elif self.info['DEVTYPE'] == 'crypt':
- if 'pkname' not in self.info:
- raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.')
- return f"/dev/{self.info['pkname']}"
- else:
- log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
-
- return None
-
- @property
- def partition_type(self) -> str:
- return self._block_info.pttype
-
- @property
- def uuid(self) -> str:
- return self._block_info.ptuuid
-
- @property
- def size(self) -> float:
- from .helpers import convert_size_to_gb
- return convert_size_to_gb(self._block_info.size)
-
- @property
- def bus_type(self) -> Optional[str]:
- return self._block_info.tran
-
- @property
- def spinning(self) -> bool:
- return self._block_info.rota
-
- @property
- def partitions(self) -> Dict[str, 'Partition']:
- return OrderedDict(sorted(self._partitions.items()))
-
- @property
- def partition(self) -> List['Partition']:
- return list(self.partitions.values())
-
- @property
- def first_free_sector(self) -> str:
- if block_size := self._largest_free_space():
- return block_size.start
- else:
- return '512MB'
-
- @property
- def first_end_sector(self) -> str:
- if block_size := self._largest_free_space():
- return block_size.end
- else:
- return f"{self.size}GB"
-
- def _safe_free_space(self) -> str:
- if self._block_info.free_space:
- sizes = [free_space.size for free_space in self._block_info.free_space]
- return '+'.join(sizes)
- return '?'
-
- def _largest_free_space(self) -> Optional[BlockSizeInfo]:
- if self._block_info.free_space:
- sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True)
- return sorted_sizes[0]
- return None
-
- def _partprobe(self) -> bool:
- return SysCommand(['partprobe', self._path]).exit_code == 0
-
- def flush_cache(self) -> None:
- self._load_partitions()
-
- def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition:
- if not uuid and not partuuid:
- raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.")
-
- log(f"Retrieving partition PARTUUID={partuuid} or UUID={uuid}", level=logging.DEBUG, fg="gray")
-
- for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)):
- for partition_index, partition in self.partitions.items():
- try:
- if uuid and partition.uuid and partition.uuid.lower() == uuid.lower():
- log(f"Matched UUID={uuid} against {partition.uuid}", level=logging.DEBUG, fg="gray")
- return partition
- elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower():
- log(f"Matched PARTUUID={partuuid} against {partition.part_uuid}", level=logging.DEBUG, fg="gray")
- return partition
- except DiskError as error:
- # Most likely a blockdevice that doesn't support or use UUID's
- # (like Microsoft recovery partition)
- log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray")
- pass
-
- log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
- self.flush_cache()
- time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
-
- log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)
- log(f"Cache: {self._partitions}")
- log(f"Partitions: {self.partitions.items()}")
- raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.")
diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py
deleted file mode 100644
index a26e0160..00000000
--- a/archinstall/lib/disk/btrfs/__init__.py
+++ /dev/null
@@ -1,56 +0,0 @@
-from __future__ import annotations
-import pathlib
-import glob
-import logging
-from typing import Union, Dict, TYPE_CHECKING
-
-# https://stackoverflow.com/a/39757388/929999
-if TYPE_CHECKING:
- from ...installer import Installer
-
-from .btrfs_helpers import (
- subvolume_info_from_path as subvolume_info_from_path,
- find_parent_subvolume as find_parent_subvolume,
- setup_subvolumes as setup_subvolumes,
- mount_subvolume as mount_subvolume
-)
-from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume
-from .btrfspartition import BTRFSPartition as BTRFSPartition
-
-from ...exceptions import DiskError, Deprecated
-from ...general import SysCommand
-from ...output import log
-
-
-def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
- """
- This function uses btrfs to create a subvolume.
-
- @installation: archinstall.Installer instance
- @subvolume_location: a localized string or path inside the installation / or /boot for instance without specifying /mnt/boot
- """
-
- installation_mountpoint = installation.target
- if type(installation_mountpoint) == str:
- installation_mountpoint = pathlib.Path(installation_mountpoint)
- # Set up the required physical structure
- if type(subvolume_location) == str:
- subvolume_location = pathlib.Path(subvolume_location)
-
- target = installation_mountpoint / subvolume_location.relative_to(subvolume_location.anchor)
-
- # Difference from mount_subvolume:
- # We only check if the parent exists, since we'll run in to "target path already exists" otherwise
- if not target.parent.exists():
- target.parent.mkdir(parents=True)
-
- if glob.glob(str(target / '*')):
- raise DiskError(f"Cannot create subvolume at {target} because it contains data (non-empty folder target)")
-
- # Remove the target if it exists
- if target.exists():
- target.rmdir()
-
- log(f"Creating a subvolume on {target}", level=logging.INFO)
- if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
- raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py
deleted file mode 100644
index f6d2734a..00000000
--- a/archinstall/lib/disk/btrfs/btrfs_helpers.py
+++ /dev/null
@@ -1,136 +0,0 @@
-import logging
-import re
-from pathlib import Path
-from typing import Optional, Dict, Any, TYPE_CHECKING
-
-from ...models.subvolume import Subvolume
-from ...exceptions import SysCallError, DiskError
-from ...general import SysCommand
-from ...output import log
-from ...plugins import plugins
-from ..helpers import get_mount_info
-from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
-
-if TYPE_CHECKING:
- from .btrfspartition import BTRFSPartition
- from ...installer import Installer
-
-
-class fstab_btrfs_compression_plugin():
- def __init__(self, partition_dict):
- self.partition_dict = partition_dict
-
- def on_genfstab(self, installation):
- with open(f"{installation.target}/etc/fstab", 'r') as fh:
- fstab = fh.read()
-
- # Replace the {installation}/etc/fstab with entries
- # using the compress=zstd where the mountpoint has compression set.
- with open(f"{installation.target}/etc/fstab", 'w') as fh:
- for line in fstab.split('\n'):
- # So first we grab the mount options by using subvol=.*? as a locator.
- # And we also grab the mountpoint for the entry, for instance /var/log
- if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)):
- for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []):
- # We then locate the correct subvolume and check if it's compressed
- if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip():
- # We then sneak in the compress=zstd option if it doesn't already exist:
- # We skip entries where compression is already defined
- if ',compress=zstd,' not in line:
- line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}")
- break
-
- fh.write(f"{line}\n")
-
- return True
-
-
-def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume):
- # we normalize the subvolume name (getting rid of slash at the start if exists.
- # In our implementation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = subvolume.name.lstrip('/')
- mountpoint = Path(subvolume.mountpoint)
- installation_target = Path(installation.target)
-
- mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor)
- mountpoint.mkdir(parents=True, exist_ok=True)
- mount_options = subvolume.options + [f'subvol={name}']
-
- log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray")
- SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}")
-
-
-def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]):
- log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray")
-
- for subvolume in partition_dict['btrfs']['subvolumes']:
- # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load.
- # Every subvolume is created from the top of the hierarchy- and simplifies its further use
- name = subvolume.name.lstrip('/')
-
- # We create the subvolume using the BTRFSPartition instance.
- # That way we ensure not only easy access, but also accurate mount locations etc.
- partition_dict['device_instance'].create_subvolume(name, installation=installation)
-
- # Make the nodatacow processing now
- # It will be the main cause of creation of subvolumes which are not to be mounted
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- if subvolume.nodatacow:
- if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}")
-
- # Make the compress processing now
- # it is not an options which can be established by subvolume (but for whole file systems), and can be
- # set up via a simple attribute change in a directory (if empty). And here the directories are brand new
- # in this way only zstd compression is activaded
- # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated
-
- if subvolume.compress:
- if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]):
- if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0:
- raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}")
-
- if 'fstab_btrfs_compression_plugin' not in plugins:
- plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict)
-
-
-def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]:
- try:
- subvolume_name = ''
- result = {}
- for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")):
- if index == 0:
- subvolume_name = line.strip().decode('UTF-8')
- continue
-
- if b':' in line:
- key, value = line.strip().decode('UTF-8').split(':', 1)
-
- # A bit of a hack, until I figure out how @dataclass
- # allows for hooking in a pre-processor to do this we have to do it here:
- result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip()
-
- return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore
- except SysCallError as error:
- log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange")
-
- return None
-
-
-def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]:
- # A root path cannot have a parent
- if str(path) == '/':
- return None
-
- if found_mount := get_mount_info(str(path.parent), traverse=True, ignore=filters):
- if not (subvolume := subvolume_info_from_path(found_mount['target'])):
- if found_mount['target'] == '/':
- return None
-
- return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']])
-
- return subvolume
-
- return None
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
deleted file mode 100644
index d04c9b98..00000000
--- a/archinstall/lib/disk/btrfs/btrfspartition.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import glob
-import pathlib
-import logging
-from typing import Optional, TYPE_CHECKING
-
-from ...exceptions import DiskError
-from ...storage import storage
-from ...output import log
-from ...general import SysCommand
-from ..partition import Partition
-from ..helpers import findmnt
-from .btrfs_helpers import (
- subvolume_info_from_path
-)
-
-if TYPE_CHECKING:
- from ...installer import Installer
- from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
-
-
-class BTRFSPartition(Partition):
- def __init__(self, *args, **kwargs):
- Partition.__init__(self, *args, **kwargs)
-
- @property
- def subvolumes(self):
- for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
- if '[' in filesystem.get('source', ''):
- yield subvolume_info_from_path(filesystem['target'])
-
- def iterate_children(struct):
- for c in struct.get('children', []):
- if '[' in child.get('source', ''):
- yield subvolume_info_from_path(c['target'])
-
- for sub_child in iterate_children(c):
- yield sub_child
-
- for child in iterate_children(filesystem):
- yield child
-
- def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo':
- """
- Subvolumes have to be created within a mountpoint.
- This means we need to get the current installation target.
- After we get it, we need to verify it is a btrfs subvolume filesystem.
- Finally, the destination must be empty.
- """
-
- # Allow users to override the installation session
- if not installation:
- installation = storage.get('installation_session')
-
- # Determain if the path given, is an absolute path or a relative path.
- # We do this by checking if the path contains a known mountpoint.
- if str(subvolume)[0] == '/':
- if filesystems := findmnt(subvolume, traverse=True).get('filesystems'):
- if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target):
- # Path starts with a known mountpoint which isn't /
- # Which means it's an absolute path to a mounted location.
- pass
- else:
- # Since it's not an absolute position with a known start.
- # We omit the anchor ('/' basically) and make sure it's appendable
- # to the installation.target later
- subvolume = subvolume.relative_to(subvolume.anchor)
- # else: We don't need to do anything about relative paths, they should be appendable to installation.target as-is.
-
- # If the subvolume is not absolute, then we do two checks:
- # 1. Check if the partition itself is mounted somewhere, and use that as a root
- # 2. Use an active Installer().target as the root, assuming it's filesystem is btrfs
- # If both above fail, we need to warn the user that such setup is not supported.
- if str(subvolume)[0] != '/':
- if self.mountpoint is None and installation is None:
- raise DiskError("When creating a subvolume on BTRFSPartition()'s, you need to either initiate a archinstall.Installer() or give absolute paths when creating the subvoulme.")
- elif self.mountpoint:
- subvolume = self.mountpoint / subvolume
- elif installation:
- ongoing_installation_destination = installation.target
- if type(ongoing_installation_destination) == str:
- ongoing_installation_destination = pathlib.Path(ongoing_installation_destination)
-
- subvolume = ongoing_installation_destination / subvolume
-
- subvolume.parent.mkdir(parents=True, exist_ok=True)
-
- # <!--
- # We perform one more check from the given absolute position.
- # And we traverse backwards in order to locate any if possible subvolumes above
- # our new btrfs subvolume. This is because it needs to be mounted under it to properly
- # function.
- # if btrfs_parent := find_parent_subvolume(subvolume):
- # print('Found parent:', btrfs_parent)
- # -->
-
- log(f'Attempting to create subvolume at {subvolume}', level=logging.DEBUG, fg="grey")
-
- if glob.glob(str(subvolume / '*')):
- raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)")
- # Ideally we would like to check if the destination is already a subvolume.
- # But then we would need the mount-point at this stage as well.
- # So we'll comment out this check:
- # elif subvolinfo := subvolume_info_from_path(subvolume):
- # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}")
-
- # And deal with it here:
- SysCommand(f"btrfs subvolume create {subvolume}")
-
- return subvolume_info_from_path(subvolume)
diff --git a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
deleted file mode 100644
index 5f5bdea6..00000000
--- a/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py
+++ /dev/null
@@ -1,192 +0,0 @@
-import pathlib
-import datetime
-import logging
-import string
-import random
-import shutil
-from dataclasses import dataclass
-from typing import Optional, List# , TYPE_CHECKING
-from functools import cached_property
-
-# if TYPE_CHECKING:
-# from ..blockdevice import BlockDevice
-
-from ...exceptions import DiskError
-from ...general import SysCommand
-from ...output import log
-from ...storage import storage
-
-
-@dataclass
-class BtrfsSubvolumeInfo:
- full_path :pathlib.Path
- name :str
- uuid :str
- parent_uuid :str
- creation_time :datetime.datetime
- subvolume_id :int
- generation :int
- gen_at_creation :int
- parent_id :int
- top_level_id :int
- send_transid :int
- send_time :datetime.datetime
- receive_transid :int
- received_uuid :Optional[str] = None
- flags :Optional[str] = None
- receive_time :Optional[datetime.datetime] = None
- snapshots :Optional[List] = None
-
- def __post_init__(self):
- self.full_path = pathlib.Path(self.full_path)
-
- # Convert "-" entries to `None`
- if self.parent_uuid == "-":
- self.parent_uuid = None
- if self.received_uuid == "-":
- self.received_uuid = None
- if self.flags == "-":
- self.flags = None
- if self.receive_time == "-":
- self.receive_time = None
- if self.snapshots == "":
- self.snapshots = []
-
- # Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats)
- self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time))
- self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time))
- if self.receive_time:
- self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time))
-
- @property
- def parent_subvolume(self):
- from .btrfs_helpers import find_parent_subvolume
-
- return find_parent_subvolume(self.full_path)
-
- @property
- def root(self) -> bool:
- from .btrfs_helpers import subvolume_info_from_path
-
- # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
- # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
- # It would also be nice if it could use findmnt(self.full_path) and traverse backwards
- # finding the last occurrence of a subvolume which 'self' belongs to.
- if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
- return self.full_path == volume.full_path
-
- return False
-
- @cached_property
- def partition(self):
- from ..helpers import findmnt, get_parent_of_partition, all_blockdevices
- from ..partition import Partition
- from ..blockdevice import BlockDevice
- from ..mapperdev import MapperDev
- from .btrfspartition import BTRFSPartition
- from .btrfs_helpers import subvolume_info_from_path
-
- try:
- # If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device.
- if filesystem := findmnt(self.full_path).get('filesystems', []):
- if source := filesystem[0].get('source', None):
- # Strip away subvolume definitions from findmnt
- if '[' in source:
- source = source[:source.find('[')]
-
- if filesystem[0].get('fstype', '') == 'btrfs':
- return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
- elif filesystem[0].get('source', '').startswith('/dev/mapper'):
- return MapperDev(source)
- else:
- return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
- except DiskError:
- # Subvolume has never been mounted, we have no reliable way of finding where it is.
- # But we have the UUID of the partition, and can begin looking for it by mounting
- # all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices.
-
- log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING)
- for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items():
- if type(instance) in (Partition, MapperDev):
- we_mounted_it = False
- detection_mountpoint = instance.mountpoint
- if not detection_mountpoint:
- if type(instance) == Partition and instance.encrypted:
- # TODO: Perhaps support unlocking encrypted volumes?
- # This will cause a lot of potential user interactions tho.
- log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG)
- continue
-
- detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}")
- detection_mountpoint.mkdir(parents=True, exist_ok=True)
-
- instance.mount(str(detection_mountpoint))
- we_mounted_it = True
-
- if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])):
- if subvolume := subvolume_info_from_path(filesystem[0]['target']):
- if subvolume.uuid == self.uuid:
- # The top level subvolume matched of ourselves,
- # which means the instance we're iterating has the subvol we're looking for.
- log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
- return instance
-
- def iterate_children(struct):
- for child in struct.get('children', []):
- if '[' in child.get('source', ''):
- yield subvolume_info_from_path(child['target'])
-
- for sub_child in iterate_children(child):
- yield sub_child
-
- for child in iterate_children(filesystem[0]):
- if child.uuid == self.uuid:
- # We found a child within the instance that has the subvol we're looking for.
- log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
- return instance
-
- if we_mounted_it:
- instance.unmount()
- shutil.rmtree(detection_mountpoint)
-
- @cached_property
- def mount_options(self) -> Optional[List[str]]:
- from ..helpers import findmnt
-
- if filesystem := findmnt(self.full_path).get('filesystems', []):
- return filesystem[0].get('options').split(',')
-
- def convert_to_ISO_format(self, time_string):
- time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '')
- iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}"
- return iso_string
-
- def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True):
- from ..helpers import findmnt
-
- try:
- if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False):
- log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}")
- SysCommand(f"umount {mountpoint}")
- except DiskError:
- # No previously mounted device at the mountpoint
- pass
-
- if not options:
- options = []
-
- try:
- if include_previously_known_options and (cached_options := self.mount_options):
- options += cached_options
- except DiskError:
- pass
-
- if not any('subvol=' in x for x in options):
- options += f'subvol={self.name}'
-
- SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}")
- log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray")
-
- def unmount(self, recurse :bool = True):
- SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
- log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py
new file mode 100644
index 00000000..7ba70382
--- /dev/null
+++ b/archinstall/lib/disk/device_handler.py
@@ -0,0 +1,809 @@
+from __future__ import annotations
+
+import json
+import os
+import logging
+import time
+from pathlib import Path
+from typing import List, Dict, Any, Optional, TYPE_CHECKING, Literal, Iterable
+
+from parted import ( # type: ignore
+ Disk, Geometry, FileSystem,
+ PartitionException, DiskLabelException,
+ getDevice, getAllDevices, freshDisk, Partition, Device
+)
+
+from .device_model import (
+ DeviceModification, PartitionModification,
+ BDevice, _DeviceInfo, _PartitionInfo,
+ FilesystemType, Unit, PartitionTable,
+ ModificationStatus, get_lsblk_info, LsblkInfo,
+ _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption, LvmVolumeGroup, LvmVolume, Size, LvmGroupInfo,
+ SectorSize, LvmVolumeInfo, LvmPVInfo, SubvolumeModification, BtrfsMountOption
+)
+
+from ..exceptions import DiskError, UnknownFilesystemFormat
+from ..general import SysCommand, SysCallError, JSON, SysCommandWorker
+from ..luks import Luks2
+from ..output import debug, error, info, warn, log
+from ..utils.util import is_subpath
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class DeviceHandler(object):
+ _TMP_BTRFS_MOUNT = Path('/mnt/arch_btrfs')
+
+ def __init__(self):
+ self._devices: Dict[Path, BDevice] = {}
+ self.load_devices()
+
+ @property
+ def devices(self) -> List[BDevice]:
+ return list(self._devices.values())
+
+ def load_devices(self):
+ block_devices = {}
+
+ devices = getAllDevices()
+
+ try:
+ loop_devices = SysCommand(['losetup', '-a'])
+ for ld_info in str(loop_devices).splitlines():
+ loop_device = getDevice(ld_info.split(':', maxsplit=1)[0])
+ devices.append(loop_device)
+ except Exception as err:
+ debug(f'Failed to get loop devices: {err}')
+
+ for device in devices:
+ if get_lsblk_info(device.path).type == 'rom':
+ continue
+
+ try:
+ disk = Disk(device)
+ except DiskLabelException as err:
+ if 'unrecognised disk label' in getattr(error, 'message', str(err)):
+ disk = freshDisk(device, PartitionTable.GPT.value)
+ else:
+ debug(f'Unable to get disk from device: {device}')
+ continue
+
+ device_info = _DeviceInfo.from_disk(disk)
+ partition_infos = []
+
+ for partition in disk.partitions:
+ lsblk_info = get_lsblk_info(partition.path)
+ fs_type = self._determine_fs_type(partition, lsblk_info)
+ subvol_infos = []
+
+ if fs_type == FilesystemType.Btrfs:
+ subvol_infos = self.get_btrfs_info(partition.path)
+
+ partition_infos.append(
+ _PartitionInfo.from_partition(
+ partition,
+ fs_type,
+ lsblk_info.partn,
+ lsblk_info.partuuid,
+ lsblk_info.uuid,
+ lsblk_info.mountpoints,
+ subvol_infos
+ )
+ )
+
+ block_device = BDevice(disk, device_info, partition_infos)
+ block_devices[block_device.device_info.path] = block_device
+
+ self._devices = block_devices
+
+ def _determine_fs_type(
+ self,
+ partition: Partition,
+ lsblk_info: Optional[LsblkInfo] = None
+ ) -> Optional[FilesystemType]:
+ try:
+ if partition.fileSystem:
+ return FilesystemType(partition.fileSystem.type)
+ elif lsblk_info is not None:
+ return FilesystemType(lsblk_info.fstype) if lsblk_info.fstype else None
+ return None
+ except ValueError:
+ debug(f'Could not determine the filesystem: {partition.fileSystem}')
+
+ return None
+
+ def get_device(self, path: Path) -> Optional[BDevice]:
+ return self._devices.get(path, None)
+
+ def get_device_by_partition_path(self, partition_path: Path) -> Optional[BDevice]:
+ partition = self.find_partition(partition_path)
+ if partition:
+ device: Device = partition.disk.device
+ return self.get_device(Path(device.path))
+ return None
+
+ def find_partition(self, path: Path) -> Optional[_PartitionInfo]:
+ for device in self._devices.values():
+ part = next(filter(lambda x: str(x.path) == str(path), device.partition_infos), None)
+ if part is not None:
+ return part
+ return None
+
+ def get_parent_device_path(self, dev_path: Path) -> Path:
+ lsblk = get_lsblk_info(dev_path)
+ return Path(f'/dev/{lsblk.pkname}')
+
+ def get_unique_path_for_device(self, dev_path: Path) -> Optional[Path]:
+ paths = Path('/dev/disk/by-id').glob('*')
+ linked_targets = {p.resolve(): p for p in paths}
+ linked_wwn_targets = {p: linked_targets[p] for p in linked_targets
+ if p.name.startswith('wwn-') or p.name.startswith('nvme-eui.')}
+
+ if dev_path in linked_wwn_targets:
+ return linked_wwn_targets[dev_path]
+
+ if dev_path in linked_targets:
+ return linked_targets[dev_path]
+
+ return None
+
+ def get_uuid_for_path(self, path: Path) -> Optional[str]:
+ partition = self.find_partition(path)
+ return partition.partuuid if partition else None
+
+ def get_btrfs_info(self, dev_path: Path) -> List[_BtrfsSubvolumeInfo]:
+ lsblk_info = get_lsblk_info(dev_path)
+ subvol_infos: List[_BtrfsSubvolumeInfo] = []
+
+ if not lsblk_info.mountpoint:
+ self.mount(dev_path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
+ mountpoint = self._TMP_BTRFS_MOUNT
+ else:
+ # when multiple subvolumes are mounted then the lsblk output may look like
+ # "mountpoint": "/mnt/archinstall/.snapshots"
+ # "mountpoints": ["/mnt/archinstall/.snapshots", "/mnt/archinstall/home", ..]
+ # so we'll determine the minimum common path and assume that's the root
+ path_strings = [str(m) for m in lsblk_info.mountpoints]
+ common_prefix = os.path.commonprefix(path_strings)
+ mountpoint = Path(common_prefix)
+
+ try:
+ result = SysCommand(f'btrfs subvolume list {mountpoint}').decode()
+ except SysCallError as err:
+ debug(f'Failed to read btrfs subvolume information: {err}')
+ return subvol_infos
+
+ try:
+ # ID 256 gen 16 top level 5 path @
+ for line in result.splitlines():
+ # expected output format:
+ # ID 257 gen 8 top level 5 path @home
+ name = Path(line.split(' ')[-1])
+ sub_vol_mountpoint = lsblk_info.btrfs_subvol_info.get(name, None)
+ subvol_infos.append(_BtrfsSubvolumeInfo(name, sub_vol_mountpoint))
+ except json.decoder.JSONDecodeError as err:
+ error(f"Could not decode lsblk JSON: {result}")
+ raise err
+
+ if not lsblk_info.mountpoint:
+ self.umount(dev_path)
+
+ return subvol_infos
+
+ def format(
+ self,
+ fs_type: FilesystemType,
+ path: Path,
+ additional_parted_options: List[str] = []
+ ):
+ options = []
+ command = ''
+
+ match fs_type:
+ case FilesystemType.Btrfs:
+ options += ['-f']
+ command += 'mkfs.btrfs'
+ case FilesystemType.Fat16:
+ options += ['-F16']
+ command += 'mkfs.fat'
+ case FilesystemType.Fat32:
+ options += ['-F32']
+ command += 'mkfs.fat'
+ case FilesystemType.Ext2:
+ options += ['-F']
+ command += 'mkfs.ext2'
+ case FilesystemType.Ext3:
+ options += ['-F']
+ command += 'mkfs.ext3'
+ case FilesystemType.Ext4:
+ options += ['-F']
+ command += 'mkfs.ext4'
+ case FilesystemType.Xfs:
+ options += ['-f']
+ command += 'mkfs.xfs'
+ case FilesystemType.F2fs:
+ options += ['-f']
+ command += 'mkfs.f2fs'
+ case FilesystemType.Ntfs:
+ options += ['-f', '-Q']
+ command += 'mkfs.ntfs'
+ case FilesystemType.Reiserfs:
+ command += 'mkfs.reiserfs'
+ case _:
+ raise UnknownFilesystemFormat(f'Filetype "{fs_type.value}" is not supported')
+
+ options += additional_parted_options
+ options_str = ' '.join(options)
+
+ debug(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
+
+ try:
+ SysCommand(f"/usr/bin/{command} {options_str} {path}")
+ except SysCallError as err:
+ msg = f'Could not format {path} with {fs_type.value}: {err.message}'
+ error(msg)
+ raise DiskError(msg) from err
+
+ def encrypt(
+ self,
+ dev_path: Path,
+ mapper_name: Optional[str],
+ enc_password: str,
+ lock_after_create: bool = True
+ ) -> Luks2:
+ luks_handler = Luks2(
+ dev_path,
+ mapper_name=mapper_name,
+ password=enc_password
+ )
+
+ key_file = luks_handler.encrypt()
+
+ luks_handler.unlock(key_file=key_file)
+
+ if not luks_handler.mapper_dev:
+ raise DiskError('Failed to unlock luks device')
+
+ if lock_after_create:
+ debug(f'luks2 locking device: {dev_path}')
+ luks_handler.lock()
+
+ return luks_handler
+
+ def format_encrypted(
+ self,
+ dev_path: Path,
+ mapper_name: Optional[str],
+ fs_type: FilesystemType,
+ enc_conf: DiskEncryption
+ ):
+ luks_handler = Luks2(
+ dev_path,
+ mapper_name=mapper_name,
+ password=enc_conf.encryption_password
+ )
+
+ key_file = luks_handler.encrypt()
+
+ luks_handler.unlock(key_file=key_file)
+
+ if not luks_handler.mapper_dev:
+ raise DiskError('Failed to unlock luks device')
+
+ info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}')
+ self.format(fs_type, luks_handler.mapper_dev)
+
+ info(f'luks2 locking device: {dev_path}')
+ luks_handler.lock()
+
+ def _lvm_info(
+ self,
+ cmd: str,
+ info_type: Literal['lv', 'vg', 'pvseg']
+ ) -> Optional[Any]:
+ raw_info = SysCommand(cmd).decode().split('\n')
+
+ # for whatever reason the output sometimes contains
+ # "File descriptor X leaked leaked on vgs invocation
+ data = '\n'.join([raw for raw in raw_info if 'File descriptor' not in raw])
+
+ debug(f'LVM info: {data}')
+
+ reports = json.loads(data)
+
+ for report in reports['report']:
+ if len(report[info_type]) != 1:
+ raise ValueError(f'Report does not contain any entry')
+
+ entry = report[info_type][0]
+
+ match info_type:
+ case 'pvseg':
+ return LvmPVInfo(
+ pv_name=Path(entry['pv_name']),
+ lv_name=entry['lv_name'],
+ vg_name=entry['vg_name'],
+ )
+ case 'lv':
+ return LvmVolumeInfo(
+ lv_name=entry['lv_name'],
+ vg_name=entry['vg_name'],
+ lv_size=Size(int(entry[f'lv_size'][:-1]), Unit.B, SectorSize.default())
+ )
+ case 'vg':
+ return LvmGroupInfo(
+ vg_uuid=entry['vg_uuid'],
+ vg_size=Size(int(entry[f'vg_size'][:-1]), Unit.B, SectorSize.default())
+ )
+
+ return None
+
+ def _lvm_info_with_retry(self, cmd: str, info_type: Literal['lv', 'vg', 'pvseg']) -> Optional[Any]:
+ attempts = 3
+
+ for attempt_nr in range(attempts):
+ try:
+ return self._lvm_info(cmd, info_type)
+ except ValueError:
+ time.sleep(attempt_nr + 1)
+
+ raise ValueError(f'Failed to fetch {info_type} information')
+
+ def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]:
+ cmd = (
+ 'lvs --reportformat json '
+ '--unit B '
+ f'-S lv_name={lv_name}'
+ )
+
+ return self._lvm_info_with_retry(cmd, 'lv')
+
+ def lvm_group_info(self, vg_name: str) -> Optional[LvmGroupInfo]:
+ cmd = (
+ 'vgs --reportformat json '
+ '--unit B '
+ '-o vg_name,vg_uuid,vg_size '
+ f'-S vg_name={vg_name}'
+ )
+
+ return self._lvm_info_with_retry(cmd, 'vg')
+
+ def lvm_pvseg_info(self, vg_name: str, lv_name: str) -> Optional[LvmPVInfo]:
+ cmd = (
+ 'pvs '
+ '--segments -o+lv_name,vg_name '
+ f'-S vg_name={vg_name},lv_name={lv_name} '
+ '--reportformat json '
+ )
+
+ return self._lvm_info_with_retry(cmd, 'pvseg')
+
+ def lvm_vol_change(self, vol: LvmVolume, activate: bool):
+ active_flag = 'y' if activate else 'n'
+ cmd = f'lvchange -a {active_flag} {vol.safe_dev_path}'
+
+ debug(f'lvchange volume: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_export_vg(self, vg: LvmVolumeGroup):
+ cmd = f'vgexport {vg.name}'
+
+ debug(f'vgexport: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_import_vg(self, vg: LvmVolumeGroup):
+ cmd = f'vgimport {vg.name}'
+
+ debug(f'vgimport: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_vol_reduce(self, vol_path: Path, amount: Size):
+ val = amount.format_size(Unit.B, include_unit=False)
+ cmd = f'lvreduce -L -{val}B {vol_path}'
+
+ debug(f'Reducing LVM volume size: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_pv_create(self, pvs: Iterable[Path]):
+ cmd = 'pvcreate ' + ' '.join([str(pv) for pv in pvs])
+ debug(f'Creating LVM PVS: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str):
+ pvs_str = ' '.join([str(pv) for pv in pvs])
+ cmd = f'vgcreate --yes {vg_name} {pvs_str}'
+
+ debug(f'Creating LVM group: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None):
+ if offset is not None:
+ length = volume.length - offset
+ else:
+ length = volume.length
+
+ length_str = length.format_size(Unit.B, include_unit=False)
+ cmd = f'lvcreate --yes -L {length_str}B {vg_name} -n {volume.name}'
+
+ debug(f'Creating volume: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ volume.vg_name = vg_name
+ volume.dev_path = Path(f'/dev/{vg_name}/{volume.name}')
+
+ def _setup_partition(
+ self,
+ part_mod: PartitionModification,
+ block_device: BDevice,
+ disk: Disk,
+ requires_delete: bool
+ ):
+ # when we require a delete and the partition to be (re)created
+ # already exists then we have to delete it first
+ if requires_delete and part_mod.status in [ModificationStatus.Modify, ModificationStatus.Delete]:
+ info(f'Delete existing partition: {part_mod.safe_dev_path}')
+ part_info = self.find_partition(part_mod.safe_dev_path)
+
+ if not part_info:
+ raise DiskError(f'No partition for dev path found: {part_mod.safe_dev_path}')
+
+ disk.deletePartition(part_info.partition)
+
+ if part_mod.status == ModificationStatus.Delete:
+ return
+
+ start_sector = part_mod.start.convert(
+ Unit.sectors,
+ block_device.device_info.sector_size
+ )
+
+ length_sector = part_mod.length.convert(
+ Unit.sectors,
+ block_device.device_info.sector_size
+ )
+
+ geometry = Geometry(
+ device=block_device.disk.device,
+ start=start_sector.value,
+ length=length_sector.value
+ )
+
+ filesystem = FileSystem(type=part_mod.safe_fs_type.value, geometry=geometry)
+
+ partition = Partition(
+ disk=disk,
+ type=part_mod.type.get_partition_code(),
+ fs=filesystem,
+ geometry=geometry
+ )
+
+ for flag in part_mod.flags:
+ partition.setFlag(flag.value)
+
+ debug(f'\tType: {part_mod.type.value}')
+ debug(f'\tFilesystem: {part_mod.safe_fs_type.value}')
+ debug(f'\tGeometry: {start_sector.value} start sector, {length_sector.value} length')
+
+ try:
+ disk.addPartition(partition=partition, constraint=disk.device.optimalAlignedConstraint)
+ except PartitionException as ex:
+ raise DiskError(f'Unable to add partition, most likely due to overlapping sectors: {ex}') from ex
+
+ # the partition has a path now that it has been added
+ part_mod.dev_path = Path(partition.path)
+
+ def fetch_part_info(self, path: Path) -> LsblkInfo:
+ lsblk_info = get_lsblk_info(path)
+
+ if not lsblk_info.partn:
+ debug(f'Unable to determine new partition number: {path}\n{lsblk_info}')
+ raise DiskError(f'Unable to determine new partition number: {path}')
+
+ if not lsblk_info.partuuid:
+ debug(f'Unable to determine new partition uuid: {path}\n{lsblk_info}')
+ raise DiskError(f'Unable to determine new partition uuid: {path}')
+
+ if not lsblk_info.uuid:
+ debug(f'Unable to determine new uuid: {path}\n{lsblk_info}')
+ raise DiskError(f'Unable to determine new uuid: {path}')
+
+ debug(f'partition information found: {lsblk_info.json()}')
+
+ return lsblk_info
+
+ def create_lvm_btrfs_subvolumes(
+ self,
+ path: Path,
+ btrfs_subvols: List[SubvolumeModification],
+ mount_options: List[str]
+ ):
+ info(f'Creating subvolumes: {path}')
+
+ self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
+
+ for sub_vol in btrfs_subvols:
+ debug(f'Creating subvolume: {sub_vol.name}')
+
+ subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
+
+ SysCommand(f"btrfs subvolume create {subvol_path}")
+
+ if BtrfsMountOption.nodatacow.value in mount_options:
+ try:
+ SysCommand(f'chattr +C {subvol_path}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}')
+
+ if BtrfsMountOption.compress.value in mount_options:
+ try:
+ SysCommand(f'chattr +c {subvol_path}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}')
+
+ self.umount(path)
+
+ def create_btrfs_volumes(
+ self,
+ part_mod: PartitionModification,
+ enc_conf: Optional['DiskEncryption'] = None
+ ):
+ info(f'Creating subvolumes: {part_mod.safe_dev_path}')
+
+ luks_handler = None
+
+ # unlock the partition first if it's encrypted
+ if enc_conf is not None and part_mod in enc_conf.partitions:
+ if not part_mod.mapper_name:
+ raise ValueError('No device path specified for modification')
+
+ luks_handler = self.unlock_luks2_dev(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ enc_conf.encryption_password
+ )
+
+ if not luks_handler.mapper_dev:
+ raise DiskError('Failed to unlock luks device')
+
+ self.mount(
+ luks_handler.mapper_dev,
+ self._TMP_BTRFS_MOUNT,
+ create_target_mountpoint=True,
+ options=part_mod.mount_options
+ )
+ else:
+ self.mount(
+ part_mod.safe_dev_path,
+ self._TMP_BTRFS_MOUNT,
+ create_target_mountpoint=True,
+ options=part_mod.mount_options
+ )
+
+ for sub_vol in part_mod.btrfs_subvols:
+ debug(f'Creating subvolume: {sub_vol.name}')
+
+ if luks_handler is not None:
+ subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
+ else:
+ subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
+
+ SysCommand(f"btrfs subvolume create {subvol_path}")
+
+ if luks_handler is not None and luks_handler.mapper_dev is not None:
+ self.umount(luks_handler.mapper_dev)
+ luks_handler.lock()
+ else:
+ self.umount(part_mod.safe_dev_path)
+
+ def unlock_luks2_dev(self, dev_path: Path, mapper_name: str, enc_password: str) -> Luks2:
+ luks_handler = Luks2(dev_path, mapper_name=mapper_name, password=enc_password)
+
+ if not luks_handler.is_unlocked():
+ luks_handler.unlock()
+
+ if not luks_handler.is_unlocked():
+ raise DiskError(f'Failed to unlock luks2 device: {dev_path}')
+
+ return luks_handler
+
+ def umount_all_existing(self, device_path: Path):
+ debug(f'Unmounting all existing partitions: {device_path}')
+
+ existing_partitions = self._devices[device_path].partition_infos
+
+ for partition in existing_partitions:
+ debug(f'Unmounting: {partition.path}')
+
+ # un-mount for existing encrypted partitions
+ if partition.fs_type == FilesystemType.Crypto_luks:
+ Luks2(partition.path).lock()
+ else:
+ self.umount(partition.path, recursive=True)
+
+ def partition(
+ self,
+ modification: DeviceModification,
+ partition_table: Optional[PartitionTable] = None
+ ):
+ """
+ Create a partition table on the block device and create all partitions.
+ """
+ if modification.wipe:
+ if partition_table is None:
+ raise ValueError('Modification is marked as wipe but no partitioning table was provided')
+
+ if partition_table.MBR and len(modification.partitions) > 3:
+ raise DiskError('Too many partitions on disk, MBR disks can only have 3 primary partitions')
+
+ # make sure all devices are unmounted
+ self.umount_all_existing(modification.device_path)
+
+ # WARNING: the entire device will be wiped and all data lost
+ if modification.wipe:
+ self.wipe_dev(modification.device)
+ part_table = partition_table.value if partition_table else None
+ disk = freshDisk(modification.device.disk.device, part_table)
+ else:
+ info(f'Use existing device: {modification.device_path}')
+ disk = modification.device.disk
+
+ info(f'Creating partitions: {modification.device_path}')
+
+ # don't touch existing partitions
+ filtered_part = [p for p in modification.partitions if not p.exists()]
+
+ for part_mod in filtered_part:
+ # if the entire disk got nuked then we don't have to delete
+ # any existing partitions anymore because they're all gone already
+ requires_delete = modification.wipe is False
+ self._setup_partition(part_mod, modification.device, disk, requires_delete=requires_delete)
+
+ disk.commit()
+
+ def mount(
+ self,
+ dev_path: Path,
+ target_mountpoint: Path,
+ mount_fs: Optional[str] = None,
+ create_target_mountpoint: bool = True,
+ options: List[str] = []
+ ):
+ if create_target_mountpoint and not target_mountpoint.exists():
+ target_mountpoint.mkdir(parents=True, exist_ok=True)
+
+ if not target_mountpoint.exists():
+ raise ValueError('Target mountpoint does not exist')
+
+ lsblk_info = get_lsblk_info(dev_path)
+ if target_mountpoint in lsblk_info.mountpoints:
+ info(f'Device already mounted at {target_mountpoint}')
+ return
+
+ cmd = ['mount']
+
+ if len(options):
+ cmd.extend(('-o', ','.join(options)))
+ if mount_fs:
+ cmd.extend(('-t', mount_fs))
+
+ cmd.extend((str(dev_path), str(target_mountpoint)))
+
+ command = ' '.join(cmd)
+
+ debug(f'Mounting {dev_path}: {command}')
+
+ try:
+ SysCommand(command)
+ except SysCallError as err:
+ raise DiskError(f'Could not mount {dev_path}: {command}\n{err.message}')
+
+ def umount(self, mountpoint: Path, recursive: bool = False):
+ try:
+ lsblk_info = get_lsblk_info(mountpoint)
+ except SysCallError as ex:
+ # this could happen if before partitioning the device contained 3 partitions
+ # and after partitioning only 2 partitions were created, then the modifications object
+ # will have a reference to /dev/sX3 which is being tried to umount here now
+ if 'not a block device' in ex.message:
+ return
+ raise ex
+
+ if len(lsblk_info.mountpoints) > 0:
+ debug(f'Partition {mountpoint} is currently mounted at: {[str(m) for m in lsblk_info.mountpoints]}')
+
+ for mountpoint in lsblk_info.mountpoints:
+ debug(f'Unmounting mountpoint: {mountpoint}')
+
+ command = 'umount'
+
+ if recursive:
+ command += ' -R'
+
+ SysCommand(f'{command} {mountpoint}')
+
+ def detect_pre_mounted_mods(self, base_mountpoint: Path) -> List[DeviceModification]:
+ part_mods: Dict[Path, List[PartitionModification]] = {}
+
+ for device in self.devices:
+ for part_info in device.partition_infos:
+ for mountpoint in part_info.mountpoints:
+ if is_subpath(mountpoint, base_mountpoint):
+ path = Path(part_info.disk.device.path)
+ part_mods.setdefault(path, [])
+ part_mod = PartitionModification.from_existing_partition(part_info)
+ if part_mod.mountpoint:
+ part_mod.mountpoint = mountpoint.root / mountpoint.relative_to(base_mountpoint)
+ else:
+ for subvol in part_mod.btrfs_subvols:
+ if sm := subvol.mountpoint:
+ subvol.mountpoint = sm.root / sm.relative_to(base_mountpoint)
+ part_mods[path].append(part_mod)
+ break
+
+ device_mods: List[DeviceModification] = []
+ for device_path, mods in part_mods.items():
+ device_mod = DeviceModification(self._devices[device_path], False, mods)
+ device_mods.append(device_mod)
+
+ return device_mods
+
+ def partprobe(self, path: Optional[Path] = None):
+ if path is not None:
+ command = f'partprobe {path}'
+ else:
+ command = 'partprobe'
+
+ try:
+ debug(f'Calling partprobe: {command}')
+ SysCommand(command)
+ except SysCallError as err:
+ if 'have been written, but we have been unable to inform the kernel of the change' in str(err):
+ log(f"Partprobe was not able to inform the kernel of the new disk state (ignoring error): {err}", fg="gray", level=logging.INFO)
+ else:
+ error(f'"{command}" failed to run (continuing anyway): {err}')
+
+ def _wipe(self, dev_path: Path):
+ """
+ Wipe a device (partition or otherwise) of meta-data, be it file system, LVM, etc.
+ @param dev_path: Device path of the partition to be wiped.
+ @type dev_path: str
+ """
+ with open(dev_path, 'wb') as p:
+ p.write(bytearray(1024))
+
+ def wipe_dev(self, block_device: BDevice):
+ """
+ Wipe the block device of meta-data, be it file system, LVM, etc.
+ This is not intended to be secure, but rather to ensure that
+ auto-discovery tools don't recognize anything here.
+ """
+ info(f'Wiping partitions and metadata: {block_device.device_info.path}')
+ for partition in block_device.partition_infos:
+ self._wipe(partition.path)
+
+ self._wipe(block_device.device_info.path)
+
+
+device_handler = DeviceHandler()
+
+
+def disk_layouts() -> str:
+ try:
+ lsblk_info = get_all_lsblk_info()
+ return json.dumps(lsblk_info, indent=4, sort_keys=True, cls=JSON)
+ except SysCallError as err:
+ warn(f"Could not return disk layouts: {err}")
+ return ''
+ except json.decoder.JSONDecodeError as err:
+ warn(f"Could not return disk layouts: {err}")
+ return ''
diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py
new file mode 100644
index 00000000..f98d05fb
--- /dev/null
+++ b/archinstall/lib/disk/device_model.py
@@ -0,0 +1,1499 @@
+from __future__ import annotations
+
+import dataclasses
+import json
+import math
+import uuid
+from dataclasses import dataclass, field
+from enum import Enum
+from enum import auto
+from pathlib import Path
+from typing import Optional, List, Dict, TYPE_CHECKING, Any
+from typing import Union
+
+import parted # type: ignore
+import _ped # type: ignore
+from parted import Disk, Geometry, Partition
+
+from ..exceptions import DiskError, SysCallError
+from ..general import SysCommand
+from ..output import debug, error
+from ..storage import storage
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class DiskLayoutType(Enum):
+ Default = 'default_layout'
+ Manual = 'manual_partitioning'
+ Pre_mount = 'pre_mounted_config'
+
+ def display_msg(self) -> str:
+ match self:
+ case DiskLayoutType.Default: return str(_('Use a best-effort default partition layout'))
+ case DiskLayoutType.Manual: return str(_('Manual Partitioning'))
+ case DiskLayoutType.Pre_mount: return str(_('Pre-mounted configuration'))
+
+
+@dataclass
+class DiskLayoutConfiguration:
+ config_type: DiskLayoutType
+ device_modifications: List[DeviceModification] = field(default_factory=list)
+ lvm_config: Optional[LvmConfiguration] = None
+
+ # used for pre-mounted config
+ mountpoint: Optional[Path] = None
+
+ def json(self) -> Dict[str, Any]:
+ if self.config_type == DiskLayoutType.Pre_mount:
+ return {
+ 'config_type': self.config_type.value,
+ 'mountpoint': str(self.mountpoint)
+ }
+ else:
+ config: Dict[str, Any] = {
+ 'config_type': self.config_type.value,
+ 'device_modifications': [mod.json() for mod in self.device_modifications],
+ }
+
+ if self.lvm_config:
+ config['lvm_config'] = self.lvm_config.json()
+
+ return config
+
+ @classmethod
+ def parse_arg(cls, disk_config: Dict[str, Any]) -> Optional[DiskLayoutConfiguration]:
+ from .device_handler import device_handler
+
+ device_modifications: List[DeviceModification] = []
+ config_type = disk_config.get('config_type', None)
+
+ if not config_type:
+ raise ValueError('Missing disk layout configuration: config_type')
+
+ config = DiskLayoutConfiguration(
+ config_type=DiskLayoutType(config_type),
+ device_modifications=device_modifications
+ )
+
+ if config_type == DiskLayoutType.Pre_mount.value:
+ if not (mountpoint := disk_config.get('mountpoint')):
+ raise ValueError('Must set a mountpoint when layout type is pre-mount')
+
+ path = Path(str(mountpoint))
+
+ mods = device_handler.detect_pre_mounted_mods(path)
+ device_modifications.extend(mods)
+
+ storage['MOUNT_POINT'] = path
+
+ config.mountpoint = path
+
+ return config
+
+ for entry in disk_config.get('device_modifications', []):
+ device_path = Path(entry.get('device', None)) if entry.get('device', None) else None
+
+ if not device_path:
+ continue
+
+ device = device_handler.get_device(device_path)
+
+ if not device:
+ continue
+
+ device_modification = DeviceModification(
+ wipe=entry.get('wipe', False),
+ device=device
+ )
+
+ device_partitions: List[PartitionModification] = []
+
+ for partition in entry.get('partitions', []):
+ device_partition = PartitionModification(
+ status=ModificationStatus(partition['status']),
+ fs_type=FilesystemType(partition['fs_type']) if partition.get('fs_type') else None,
+ start=Size.parse_args(partition['start']),
+ length=Size.parse_args(partition['size']),
+ mount_options=partition['mount_options'],
+ mountpoint=Path(partition['mountpoint']) if partition['mountpoint'] else None,
+ dev_path=Path(partition['dev_path']) if partition['dev_path'] else None,
+ type=PartitionType(partition['type']),
+ flags=[PartitionFlag[f] for f in partition.get('flags', [])],
+ btrfs_subvols=SubvolumeModification.parse_args(partition.get('btrfs', [])),
+ )
+ # special 'invisible attr to internally identify the part mod
+ setattr(device_partition, '_obj_id', partition['obj_id'])
+ device_partitions.append(device_partition)
+
+ device_modification.partitions = device_partitions
+ device_modifications.append(device_modification)
+
+ # Parse LVM configuration from settings
+ if (lvm_arg := disk_config.get('lvm_config', None)) is not None:
+ config.lvm_config = LvmConfiguration.parse_arg(lvm_arg, config)
+
+ return config
+
+
+class PartitionTable(Enum):
+ GPT = 'gpt'
+ MBR = 'msdos'
+
+
+class Unit(Enum):
+ B = 1 # byte
+ kB = 1000 ** 1 # kilobyte
+ MB = 1000 ** 2 # megabyte
+ GB = 1000 ** 3 # gigabyte
+ TB = 1000 ** 4 # terabyte
+ PB = 1000 ** 5 # petabyte
+ EB = 1000 ** 6 # exabyte
+ ZB = 1000 ** 7 # zettabyte
+ YB = 1000 ** 8 # yottabyte
+
+ KiB = 1024 ** 1 # kibibyte
+ MiB = 1024 ** 2 # mebibyte
+ GiB = 1024 ** 3 # gibibyte
+ TiB = 1024 ** 4 # tebibyte
+ PiB = 1024 ** 5 # pebibyte
+ EiB = 1024 ** 6 # exbibyte
+ ZiB = 1024 ** 7 # zebibyte
+ YiB = 1024 ** 8 # yobibyte
+
+ sectors = 'sectors' # size in sector
+
+ @staticmethod
+ def get_all_units() -> List[str]:
+ return [u.name for u in Unit]
+
+ @staticmethod
+ def get_si_units() -> List[Unit]:
+ return [u for u in Unit if 'i' not in u.name and u.name != 'sectors']
+
+
+@dataclass
+class SectorSize:
+ value: int
+ unit: Unit
+
+ def __post_init__(self):
+ match self.unit:
+ case Unit.sectors:
+ raise ValueError('Unit type sector not allowed for SectorSize')
+
+ @staticmethod
+ def default() -> SectorSize:
+ return SectorSize(512, Unit.B)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'value': self.value,
+ 'unit': self.unit.name,
+ }
+
+ @classmethod
+ def parse_args(cls, arg: Dict[str, Any]) -> SectorSize:
+ return SectorSize(
+ arg['value'],
+ Unit[arg['unit']]
+ )
+
+ def normalize(self) -> int:
+ """
+ will normalize the value of the unit to Byte
+ """
+ return int(self.value * self.unit.value) # type: ignore
+
+
+@dataclass
+class Size:
+ value: int
+ unit: Unit
+ sector_size: SectorSize
+
+ def __post_init__(self):
+ if not isinstance(self.sector_size, SectorSize):
+ raise ValueError('sector size must be of type SectorSize')
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'value': self.value,
+ 'unit': self.unit.name,
+ 'sector_size': self.sector_size.json() if self.sector_size else None
+ }
+
+ @classmethod
+ def parse_args(cls, size_arg: Dict[str, Any]) -> Size:
+ sector_size = size_arg['sector_size']
+
+ return Size(
+ size_arg['value'],
+ Unit[size_arg['unit']],
+ SectorSize.parse_args(sector_size),
+ )
+
+ def convert(
+ self,
+ target_unit: Unit,
+ sector_size: Optional[SectorSize] = None
+ ) -> Size:
+ if target_unit == Unit.sectors and sector_size is None:
+ raise ValueError('If target has unit sector, a sector size must be provided')
+
+ if self.unit == target_unit:
+ return self
+ elif self.unit == Unit.sectors:
+ norm = self._normalize()
+ return Size(norm, Unit.B, self.sector_size).convert(target_unit, sector_size)
+ else:
+ if target_unit == Unit.sectors and sector_size is not None:
+ norm = self._normalize()
+ sectors = math.ceil(norm / sector_size.value)
+ return Size(sectors, Unit.sectors, sector_size)
+ else:
+ value = int(self._normalize() / target_unit.value) # type: ignore
+ return Size(value, target_unit, self.sector_size)
+
+ def as_text(self) -> str:
+ return self.format_size(
+ self.unit,
+ self.sector_size
+ )
+
+ def format_size(
+ self,
+ target_unit: Unit,
+ sector_size: Optional[SectorSize] = None,
+ include_unit: bool = True
+ ) -> str:
+ target_size = self.convert(target_unit, sector_size)
+
+ if include_unit:
+ return f'{target_size.value} {target_unit.name}'
+ return f'{target_size.value}'
+
+ def format_highest(self, include_unit: bool = True) -> str:
+ si_units = Unit.get_si_units()
+ all_si_values = [self.convert(si) for si in si_units]
+ filtered = filter(lambda x: x.value >= 1, all_si_values)
+
+ # we have to get the max by the unit value as we're interested
+ # in getting the value in the highest possible unit without floats
+ si_value = max(filtered, key=lambda x: x.unit.value)
+
+ if include_unit:
+ return f'{si_value.value} {si_value.unit.name}'
+ return f'{si_value.value}'
+
+ def _normalize(self) -> int:
+ """
+ will normalize the value of the unit to Byte
+ """
+ if self.unit == Unit.sectors and self.sector_size is not None:
+ return self.value * self.sector_size.normalize()
+ return int(self.value * self.unit.value) # type: ignore
+
+ def __sub__(self, other: Size) -> Size:
+ src_norm = self._normalize()
+ dest_norm = other._normalize()
+ return Size(abs(src_norm - dest_norm), Unit.B, self.sector_size)
+
+ def __add__(self, other: Size) -> Size:
+ src_norm = self._normalize()
+ dest_norm = other._normalize()
+ return Size(abs(src_norm + dest_norm), Unit.B, self.sector_size)
+
+ def __lt__(self, other):
+ return self._normalize() < other._normalize()
+
+ def __le__(self, other):
+ return self._normalize() <= other._normalize()
+
+ def __eq__(self, other):
+ return self._normalize() == other._normalize()
+
+ def __ne__(self, other):
+ return self._normalize() != other._normalize()
+
+ def __gt__(self, other):
+ return self._normalize() > other._normalize()
+
+ def __ge__(self, other):
+ return self._normalize() >= other._normalize()
+
+
+class BtrfsMountOption(Enum):
+ compress = 'compress=zstd'
+ nodatacow = 'nodatacow'
+
+
+@dataclass
+class _BtrfsSubvolumeInfo:
+ name: Path
+ mountpoint: Optional[Path]
+
+
+@dataclass
+class _PartitionInfo:
+ partition: Partition
+ name: str
+ type: PartitionType
+ fs_type: Optional[FilesystemType]
+ path: Path
+ start: Size
+ length: Size
+ flags: List[PartitionFlag]
+ partn: Optional[int]
+ partuuid: Optional[str]
+ uuid: Optional[str]
+ disk: Disk
+ mountpoints: List[Path]
+ btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = field(default_factory=list)
+
+ @property
+ def sector_size(self) -> SectorSize:
+ sector_size = self.partition.geometry.device.sectorSize
+ return SectorSize(sector_size, Unit.B)
+
+ def table_data(self) -> Dict[str, Any]:
+ end = self.start + self.length
+
+ part_info = {
+ 'Name': self.name,
+ 'Type': self.type.value,
+ 'Filesystem': self.fs_type.value if self.fs_type else str(_('Unknown')),
+ 'Path': str(self.path),
+ 'Start': self.start.format_size(Unit.sectors, self.sector_size, include_unit=False),
+ 'End': end.format_size(Unit.sectors, self.sector_size, include_unit=False),
+ 'Size': self.length.format_highest(),
+ 'Flags': ', '.join([f.name for f in self.flags])
+ }
+
+ if self.btrfs_subvol_infos:
+ part_info['Btrfs vol.'] = f'{len(self.btrfs_subvol_infos)} subvolumes'
+
+ return part_info
+
+ @classmethod
+ def from_partition(
+ cls,
+ partition: Partition,
+ fs_type: Optional[FilesystemType],
+ partn: Optional[int],
+ partuuid: Optional[str],
+ uuid: Optional[str],
+ mountpoints: List[Path],
+ btrfs_subvol_infos: List[_BtrfsSubvolumeInfo] = []
+ ) -> _PartitionInfo:
+ partition_type = PartitionType.get_type_from_code(partition.type)
+ flags = [f for f in PartitionFlag if partition.getFlag(f.value)]
+
+ start = Size(
+ partition.geometry.start,
+ Unit.sectors,
+ SectorSize(partition.disk.device.sectorSize, Unit.B)
+ )
+
+ length = Size(
+ int(partition.getLength(unit='B')),
+ Unit.B,
+ SectorSize(partition.disk.device.sectorSize, Unit.B)
+ )
+
+ return _PartitionInfo(
+ partition=partition,
+ name=partition.get_name(),
+ type=partition_type,
+ fs_type=fs_type,
+ path=partition.path,
+ start=start,
+ length=length,
+ flags=flags,
+ partn=partn,
+ partuuid=partuuid,
+ uuid=uuid,
+ disk=partition.disk,
+ mountpoints=mountpoints,
+ btrfs_subvol_infos=btrfs_subvol_infos
+ )
+
+
+@dataclass
+class _DeviceInfo:
+ model: str
+ path: Path
+ type: str
+ total_size: Size
+ free_space_regions: List[DeviceGeometry]
+ sector_size: SectorSize
+ read_only: bool
+ dirty: bool
+
+ def table_data(self) -> Dict[str, Any]:
+ total_free_space = sum([region.get_length(unit=Unit.MiB) for region in self.free_space_regions])
+ return {
+ 'Model': self.model,
+ 'Path': str(self.path),
+ 'Type': self.type,
+ 'Size': self.total_size.format_highest(),
+ 'Free space': int(total_free_space),
+ 'Sector size': self.sector_size.value,
+ 'Read only': self.read_only
+ }
+
+ @classmethod
+ def from_disk(cls, disk: Disk) -> _DeviceInfo:
+ device = disk.device
+ if device.type == 18:
+ device_type = 'loop'
+ elif device.type in parted.devices:
+ device_type = parted.devices[device.type]
+ else:
+ debug(f'Device code unknown: {device.type}')
+ device_type = parted.devices[parted.DEVICE_UNKNOWN]
+
+ sector_size = SectorSize(device.sectorSize, Unit.B)
+ free_space = [DeviceGeometry(g, sector_size) for g in disk.getFreeSpaceRegions()]
+
+ sector_size = SectorSize(device.sectorSize, Unit.B)
+
+ return _DeviceInfo(
+ model=device.model.strip(),
+ path=Path(device.path),
+ type=device_type,
+ sector_size=sector_size,
+ total_size=Size(int(device.getLength(unit='B')), Unit.B, sector_size),
+ free_space_regions=free_space,
+ read_only=device.readOnly,
+ dirty=device.dirty
+ )
+
+
+@dataclass
+class SubvolumeModification:
+ name: Path
+ mountpoint: Optional[Path] = None
+
+ @classmethod
+ def from_existing_subvol_info(cls, info: _BtrfsSubvolumeInfo) -> SubvolumeModification:
+ return SubvolumeModification(info.name, mountpoint=info.mountpoint)
+
+ @classmethod
+ def parse_args(cls, subvol_args: List[Dict[str, Any]]) -> List[SubvolumeModification]:
+ mods = []
+ for entry in subvol_args:
+ if not entry.get('name', None) or not entry.get('mountpoint', None):
+ debug(f'Subvolume arg is missing name: {entry}')
+ continue
+
+ mountpoint = Path(entry['mountpoint']) if entry['mountpoint'] else None
+
+ mods.append(SubvolumeModification(entry['name'], mountpoint))
+
+ return mods
+
+ @property
+ def relative_mountpoint(self) -> Path:
+ """
+ Will return the relative path based on the anchor
+ e.g. Path('/mnt/test') -> Path('mnt/test')
+ """
+ if self.mountpoint is not None:
+ return self.mountpoint.relative_to(self.mountpoint.anchor)
+
+ raise ValueError('Mountpoint is not specified')
+
+ def is_root(self) -> bool:
+ if self.mountpoint:
+ return self.mountpoint == Path('/')
+ return False
+
+ def json(self) -> Dict[str, Any]:
+ return {'name': str(self.name), 'mountpoint': str(self.mountpoint)}
+
+ def table_data(self) -> Dict[str, Any]:
+ return self.json()
+
+
+class DeviceGeometry:
+ def __init__(self, geometry: Geometry, sector_size: SectorSize):
+ self._geometry = geometry
+ self._sector_size = sector_size
+
+ @property
+ def start(self) -> int:
+ return self._geometry.start
+
+ @property
+ def end(self) -> int:
+ return self._geometry.end
+
+ def get_length(self, unit: Unit = Unit.sectors) -> int:
+ return self._geometry.getLength(unit.name)
+
+ def table_data(self) -> Dict[str, Any]:
+ start = Size(self._geometry.start, Unit.sectors, self._sector_size)
+ end = Size(self._geometry.end, Unit.sectors, self._sector_size)
+ length = Size(self._geometry.getLength(), Unit.sectors, self._sector_size)
+
+ start_str = f'{self._geometry.start} / {start.format_size(Unit.B, include_unit=False)}'
+ end_str = f'{self._geometry.end} / {end.format_size(Unit.B, include_unit=False)}'
+ length_str = f'{self._geometry.getLength()} / {length.format_size(Unit.B, include_unit=False)}'
+
+ return {
+ 'Sector size': self._sector_size.value,
+ 'Start (sector/B)': start_str,
+ 'End (sector/B)': end_str,
+ 'Size (sectors/B)': length_str
+ }
+
+
+@dataclass
+class BDevice:
+ disk: Disk
+ device_info: _DeviceInfo
+ partition_infos: List[_PartitionInfo]
+
+ def __hash__(self):
+ return hash(self.disk.device.path)
+
+
+class PartitionType(Enum):
+ Boot = 'boot'
+ Primary = 'primary'
+ _Unknown = 'unknown'
+
+ @classmethod
+ def get_type_from_code(cls, code: int) -> PartitionType:
+ if code == parted.PARTITION_NORMAL:
+ return PartitionType.Primary
+ else:
+ debug(f'Partition code not supported: {code}')
+ return PartitionType._Unknown
+
+ def get_partition_code(self) -> Optional[int]:
+ if self == PartitionType.Primary:
+ return parted.PARTITION_NORMAL
+ elif self == PartitionType.Boot:
+ return parted.PARTITION_BOOT
+ return None
+
+
+class PartitionFlag(Enum):
+ """
+ Flags are taken from _ped because pyparted uses this to look
+ up their flag definitions: https://github.com/dcantrell/pyparted/blob/c4e0186dad45c8efbe67c52b02c8c4319df8aa9b/src/parted/__init__.py#L200-L202
+ Which is the way libparted checks for its flags: https://git.savannah.gnu.org/gitweb/?p=parted.git;a=blob;f=libparted/labels/gpt.c;hb=4a0e468ed63fff85a1f9b923189f20945b32f4f1#l183
+ """
+ Boot = _ped.PARTITION_BOOT
+ XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
+ ESP = _ped.PARTITION_ESP
+
+
+# class PartitionGUIDs(Enum):
+# """
+# A list of Partition type GUIDs (lsblk -o+PARTTYPE) can be found here: https://en.wikipedia.org/wiki/GUID_Partition_Table#Partition_type_GUIDs
+# """
+# XBOOTLDR = 'bc13c2ff-59e6-4262-a352-b275fd6f7172'
+
+
+class FilesystemType(Enum):
+ Btrfs = 'btrfs'
+ Ext2 = 'ext2'
+ Ext3 = 'ext3'
+ Ext4 = 'ext4'
+ F2fs = 'f2fs'
+ Fat16 = 'fat16'
+ Fat32 = 'fat32'
+ Ntfs = 'ntfs'
+ Reiserfs = 'reiserfs'
+ Xfs = 'xfs'
+
+ # this is not a FS known to parted, so be careful
+ # with the usage from this enum
+ Crypto_luks = 'crypto_LUKS'
+
+ def is_crypto(self) -> bool:
+ return self == FilesystemType.Crypto_luks
+
+ @property
+ def fs_type_mount(self) -> str:
+ match self:
+ case FilesystemType.Ntfs: return 'ntfs3'
+ case FilesystemType.Fat32: return 'vfat'
+ case _: return self.value # type: ignore
+
+ @property
+ def installation_pkg(self) -> Optional[str]:
+ match self:
+ case FilesystemType.Btrfs: return 'btrfs-progs'
+ case FilesystemType.Xfs: return 'xfsprogs'
+ case FilesystemType.F2fs: return 'f2fs-tools'
+ case _: return None
+
+ @property
+ def installation_module(self) -> Optional[str]:
+ match self:
+ case FilesystemType.Btrfs: return 'btrfs'
+ case _: return None
+
+ @property
+ def installation_binary(self) -> Optional[str]:
+ match self:
+ case FilesystemType.Btrfs: return '/usr/bin/btrfs'
+ case _: return None
+
+ @property
+ def installation_hooks(self) -> Optional[str]:
+ match self:
+ case FilesystemType.Btrfs: return 'btrfs'
+ case _: return None
+
+
+class ModificationStatus(Enum):
+ Exist = 'existing'
+ Modify = 'modify'
+ Delete = 'delete'
+ Create = 'create'
+
+
+@dataclass
+class PartitionModification:
+ status: ModificationStatus
+ type: PartitionType
+ start: Size
+ length: Size
+ fs_type: Optional[FilesystemType] = None
+ mountpoint: Optional[Path] = None
+ mount_options: List[str] = field(default_factory=list)
+ flags: List[PartitionFlag] = field(default_factory=list)
+ btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+
+ # only set if the device was created or exists
+ dev_path: Optional[Path] = None
+ partn: Optional[int] = None
+ partuuid: Optional[str] = None
+ uuid: Optional[str] = None
+
+ _efi_indicator_flags = (PartitionFlag.Boot, PartitionFlag.ESP)
+ _boot_indicator_flags = (PartitionFlag.Boot, PartitionFlag.XBOOTLDR)
+
+ def __post_init__(self):
+ # needed to use the object as a dictionary key due to hash func
+ if not hasattr(self, '_obj_id'):
+ self._obj_id = uuid.uuid4()
+
+ if self.is_exists_or_modify() and not self.dev_path:
+ raise ValueError('If partition marked as existing a path must be set')
+
+ if self.fs_type is None and self.status == ModificationStatus.Modify:
+ raise ValueError('FS type must not be empty on modifications with status type modify')
+
+ def __hash__(self):
+ return hash(self._obj_id)
+
+ @property
+ def end(self) -> Size:
+ return self.start + self.length
+
+ @property
+ def obj_id(self) -> str:
+ if hasattr(self, '_obj_id'):
+ return str(self._obj_id)
+ return ''
+
+ @property
+ def safe_dev_path(self) -> Path:
+ if self.dev_path is None:
+ raise ValueError('Device path was not set')
+ return self.dev_path
+
+ @property
+ def safe_fs_type(self) -> FilesystemType:
+ if self.fs_type is None:
+ raise ValueError('File system type is not set')
+ return self.fs_type
+
+ @classmethod
+ def from_existing_partition(cls, partition_info: _PartitionInfo) -> PartitionModification:
+ if partition_info.btrfs_subvol_infos:
+ mountpoint = None
+ subvol_mods = []
+ for i in partition_info.btrfs_subvol_infos:
+ subvol_mods.append(
+ SubvolumeModification.from_existing_subvol_info(i)
+ )
+ else:
+ mountpoint = partition_info.mountpoints[0] if partition_info.mountpoints else None
+ subvol_mods = []
+
+ return PartitionModification(
+ status=ModificationStatus.Exist,
+ type=partition_info.type,
+ start=partition_info.start,
+ length=partition_info.length,
+ fs_type=partition_info.fs_type,
+ dev_path=partition_info.path,
+ partn=partition_info.partn,
+ partuuid=partition_info.partuuid,
+ uuid=partition_info.uuid,
+ flags=partition_info.flags,
+ mountpoint=mountpoint,
+ btrfs_subvols=subvol_mods
+ )
+
+ @property
+ def relative_mountpoint(self) -> Path:
+ """
+ Will return the relative path based on the anchor
+ e.g. Path('/mnt/test') -> Path('mnt/test')
+ """
+ if self.mountpoint:
+ return self.mountpoint.relative_to(self.mountpoint.anchor)
+
+ raise ValueError('Mountpoint is not specified')
+
+ def is_efi(self) -> bool:
+ return (
+ any(set(self.flags) & set(self._efi_indicator_flags))
+ and self.fs_type == FilesystemType.Fat32
+ and PartitionFlag.XBOOTLDR not in self.flags
+ )
+
+ def is_boot(self) -> bool:
+ """
+ Returns True if any of the boot indicator flags are found in self.flags
+ """
+ return any(set(self.flags) & set(self._boot_indicator_flags))
+
+ def is_root(self) -> bool:
+ if self.mountpoint is not None:
+ return Path('/') == self.mountpoint
+ else:
+ for subvol in self.btrfs_subvols:
+ if subvol.is_root():
+ return True
+
+ return False
+
+ def is_modify(self) -> bool:
+ return self.status == ModificationStatus.Modify
+
+ def exists(self) -> bool:
+ return self.status == ModificationStatus.Exist
+
+ def is_exists_or_modify(self) -> bool:
+ return self.status in [ModificationStatus.Exist, ModificationStatus.Modify]
+
+ def is_create_or_modify(self) -> bool:
+ return self.status in [ModificationStatus.Create, ModificationStatus.Modify]
+
+ @property
+ def mapper_name(self) -> Optional[str]:
+ if self.dev_path:
+ return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.dev_path.name}'
+ return None
+
+ def set_flag(self, flag: PartitionFlag):
+ if flag not in self.flags:
+ self.flags.append(flag)
+
+ def invert_flag(self, flag: PartitionFlag):
+ if flag in self.flags:
+ self.flags = [f for f in self.flags if f != flag]
+ else:
+ self.set_flag(flag)
+
+ def json(self) -> Dict[str, Any]:
+ """
+ Called for configuration settings
+ """
+ return {
+ 'obj_id': self.obj_id,
+ 'status': self.status.value,
+ 'type': self.type.value,
+ 'start': self.start.json(),
+ 'size': self.length.json(),
+ 'fs_type': self.fs_type.value if self.fs_type else None,
+ 'mountpoint': str(self.mountpoint) if self.mountpoint else None,
+ 'mount_options': self.mount_options,
+ 'flags': [f.name for f in self.flags],
+ 'dev_path': str(self.dev_path) if self.dev_path else None,
+ 'btrfs': [vol.json() for vol in self.btrfs_subvols]
+ }
+
+ def table_data(self) -> Dict[str, Any]:
+ """
+ Called for displaying data in table format
+ """
+ part_mod = {
+ 'Status': self.status.value,
+ 'Device': str(self.dev_path) if self.dev_path else '',
+ 'Type': self.type.value,
+ 'Start': self.start.format_size(Unit.sectors, self.start.sector_size, include_unit=False),
+ 'End': self.end.format_size(Unit.sectors, self.start.sector_size, include_unit=False),
+ 'Size': self.length.format_highest(),
+ 'FS type': self.fs_type.value if self.fs_type else 'Unknown',
+ 'Mountpoint': self.mountpoint if self.mountpoint else '',
+ 'Mount options': ', '.join(self.mount_options),
+ 'Flags': ', '.join([f.name for f in self.flags]),
+ }
+
+ if self.btrfs_subvols:
+ part_mod['Btrfs vol.'] = f'{len(self.btrfs_subvols)} subvolumes'
+
+ return part_mod
+
+
+class LvmLayoutType(Enum):
+ Default = 'default'
+
+ # Manual = 'manual_lvm'
+
+ def display_msg(self) -> str:
+ match self:
+ case LvmLayoutType.Default:
+ return str(_('Default layout'))
+ # case LvmLayoutType.Manual:
+ # return str(_('Manual configuration'))
+
+ raise ValueError(f'Unknown type: {self}')
+
+
+@dataclass
+class LvmVolumeGroup:
+ name: str
+ pvs: List[PartitionModification]
+ volumes: List[LvmVolume] = field(default_factory=list)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'lvm_pvs': [p.obj_id for p in self.pvs],
+ 'volumes': [vol.json() for vol in self.volumes]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmVolumeGroup:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmVolumeGroup(
+ arg['name'],
+ lvm_pvs,
+ [LvmVolume.parse_arg(vol) for vol in arg['volumes']]
+ )
+
+ def contains_lv(self, lv: LvmVolume) -> bool:
+ return lv in self.volumes
+
+
+class LvmVolumeStatus(Enum):
+ Exist = 'existing'
+ Modify = 'modify'
+ Delete = 'delete'
+ Create = 'create'
+
+
+@dataclass
+class LvmVolume:
+ status: LvmVolumeStatus
+ name: str
+ fs_type: FilesystemType
+ length: Size
+ mountpoint: Optional[Path]
+ mount_options: List[str] = field(default_factory=list)
+ btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+
+ # volume group name
+ vg_name: Optional[str] = None
+ # mapper device path /dev/<vg>/<vol>
+ dev_path: Optional[Path] = None
+
+ def __post_init__(self):
+ # needed to use the object as a dictionary key due to hash func
+ if not hasattr(self, '_obj_id'):
+ self._obj_id = uuid.uuid4()
+
+ def __hash__(self):
+ return hash(self._obj_id)
+
+ @property
+ def obj_id(self) -> str:
+ if hasattr(self, '_obj_id'):
+ return str(self._obj_id)
+ return ''
+
+ @property
+ def mapper_name(self) -> Optional[str]:
+ if self.dev_path:
+ return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.safe_dev_path.name}'
+ return None
+
+ @property
+ def mapper_path(self) -> Path:
+ if self.mapper_name:
+ return Path(f'/dev/mapper/{self.mapper_name}')
+
+ raise ValueError('No mapper path set')
+
+ @property
+ def safe_dev_path(self) -> Path:
+ if self.dev_path:
+ return self.dev_path
+ raise ValueError('No device path for volume defined')
+
+ @property
+ def safe_fs_type(self) -> FilesystemType:
+ if self.fs_type is None:
+ raise ValueError('File system type is not set')
+ return self.fs_type
+
+ @property
+ def relative_mountpoint(self) -> Path:
+ """
+ Will return the relative path based on the anchor
+ e.g. Path('/mnt/test') -> Path('mnt/test')
+ """
+ if self.mountpoint is not None:
+ return self.mountpoint.relative_to(self.mountpoint.anchor)
+
+ raise ValueError('Mountpoint is not specified')
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any]) -> LvmVolume:
+ volume = LvmVolume(
+ status=LvmVolumeStatus(arg['status']),
+ name=arg['name'],
+ fs_type=FilesystemType(arg['fs_type']),
+ length=Size.parse_args(arg['length']),
+ mountpoint=Path(arg['mountpoint']) if arg['mountpoint'] else None,
+ mount_options=arg.get('mount_options', []),
+ btrfs_subvols=SubvolumeModification.parse_args(arg.get('btrfs', []))
+ )
+
+ setattr(volume, '_obj_id', arg['obj_id'])
+
+ return volume
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'obj_id': self.obj_id,
+ 'status': self.status.value,
+ 'name': self.name,
+ 'fs_type': self.fs_type.value,
+ 'length': self.length.json(),
+ 'mountpoint': str(self.mountpoint) if self.mountpoint else None,
+ 'mount_options': self.mount_options,
+ 'btrfs': [vol.json() for vol in self.btrfs_subvols]
+ }
+
+ def table_data(self) -> Dict[str, Any]:
+ part_mod = {
+ 'Type': self.status.value,
+ 'Name': self.name,
+ 'Size': self.length.format_highest(),
+ 'FS type': self.fs_type.value,
+ 'Mountpoint': str(self.mountpoint) if self.mountpoint else '',
+ 'Mount options': ', '.join(self.mount_options),
+ 'Btrfs': '{} {}'.format(str(len(self.btrfs_subvols)), 'vol')
+ }
+ return part_mod
+
+ def is_modify(self) -> bool:
+ return self.status == LvmVolumeStatus.Modify
+
+ def exists(self) -> bool:
+ return self.status == LvmVolumeStatus.Exist
+
+ def is_exists_or_modify(self) -> bool:
+ return self.status in [LvmVolumeStatus.Exist, LvmVolumeStatus.Modify]
+
+ def is_root(self) -> bool:
+ if self.mountpoint is not None:
+ return Path('/') == self.mountpoint
+ else:
+ for subvol in self.btrfs_subvols:
+ if subvol.is_root():
+ return True
+
+ return False
+
+
+@dataclass
+class LvmGroupInfo:
+ vg_size: Size
+ vg_uuid: str
+
+
+@dataclass
+class LvmVolumeInfo:
+ lv_name: str
+ vg_name: str
+ lv_size: Size
+
+
+@dataclass
+class LvmPVInfo:
+ pv_name: Path
+ lv_name: str
+ vg_name: str
+
+
+@dataclass
+class LvmConfiguration:
+ config_type: LvmLayoutType
+ vol_groups: List[LvmVolumeGroup]
+
+ def __post_init__(self):
+ # make sure all volume groups have unique PVs
+ pvs = []
+ for group in self.vol_groups:
+ for pv in group.pvs:
+ if pv in pvs:
+ raise ValueError('A PV cannot be used in multiple volume groups')
+ pvs.append(pv)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'config_type': self.config_type.value,
+ 'vol_groups': [vol_gr.json() for vol_gr in self.vol_groups]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmConfiguration:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmConfiguration(
+ config_type=LvmLayoutType(arg['config_type']),
+ vol_groups=[LvmVolumeGroup.parse_arg(vol_group, disk_config) for vol_group in arg['vol_groups']],
+ )
+
+ def get_all_pvs(self) -> List[PartitionModification]:
+ pvs = []
+ for vg in self.vol_groups:
+ pvs += vg.pvs
+
+ return pvs
+
+ def get_all_volumes(self) -> List[LvmVolume]:
+ volumes = []
+
+ for vg in self.vol_groups:
+ volumes += vg.volumes
+
+ return volumes
+
+ def get_root_volume(self) -> Optional[LvmVolume]:
+ for vg in self.vol_groups:
+ filtered = next(filter(lambda x: x.is_root(), vg.volumes), None)
+ if filtered:
+ return filtered
+
+ return None
+
+
+# def get_lv_crypt_uuid(self, lv: LvmVolume, encryption: EncryptionType) -> str:
+# """
+# Find the LUKS superblock UUID for the device that
+# contains the given logical volume
+# """
+# for vg in self.vol_groups:
+# if vg.contains_lv(lv):
+
+
+@dataclass
+class DeviceModification:
+ device: BDevice
+ wipe: bool
+ partitions: List[PartitionModification] = field(default_factory=list)
+
+ @property
+ def device_path(self) -> Path:
+ return self.device.device_info.path
+
+ def add_partition(self, partition: PartitionModification):
+ self.partitions.append(partition)
+
+ def get_efi_partition(self) -> Optional[PartitionModification]:
+ """
+ Similar to get_boot_partition() but excludes XBOOTLDR partitions from it's candidates.
+ """
+ filtered = filter(lambda x: x.is_efi() and x.mountpoint, self.partitions)
+ return next(filtered, None)
+
+ def get_boot_partition(self) -> Optional[PartitionModification]:
+ """
+ Returns the first partition marked as XBOOTLDR (PARTTYPE id of bc13c2ff-...) or Boot and has a mountpoint.
+ Only returns XBOOTLDR if separate EFI is detected using self.get_efi_partition()
+ Will return None if no suitable partition is found.
+ """
+ if efi_partition := self.get_efi_partition():
+ filtered = filter(lambda x: x.is_boot() and x != efi_partition and x.mountpoint, self.partitions)
+ if boot_partition := next(filtered, None):
+ return boot_partition
+ return efi_partition
+ else:
+ filtered = filter(lambda x: x.is_boot() and x.mountpoint, self.partitions)
+ return next(filtered, None)
+
+ def get_root_partition(self) -> Optional[PartitionModification]:
+ filtered = filter(lambda x: x.is_root(), self.partitions)
+ return next(filtered, None)
+
+ def json(self) -> Dict[str, Any]:
+ """
+ Called when generating configuration files
+ """
+ return {
+ 'device': str(self.device.device_info.path),
+ 'wipe': self.wipe,
+ 'partitions': [p.json() for p in self.partitions]
+ }
+
+
+class EncryptionType(Enum):
+ NoEncryption = "no_encryption"
+ Luks = "luks"
+ LvmOnLuks = 'lvm_on_luks'
+ LuksOnLvm = 'luks_on_lvm'
+
+ @classmethod
+ def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']:
+ return {
+ str(_('No Encryption')): EncryptionType.NoEncryption,
+ str(_('LUKS')): EncryptionType.Luks,
+ str(_('LVM on LUKS')): EncryptionType.LvmOnLuks,
+ str(_('LUKS on LVM')): EncryptionType.LuksOnLvm
+ }
+
+ @classmethod
+ def text_to_type(cls, text: str) -> 'EncryptionType':
+ mapping = cls._encryption_type_mapper()
+ return mapping[text]
+
+ @classmethod
+ def type_to_text(cls, type_: 'EncryptionType') -> str:
+ mapping = cls._encryption_type_mapper()
+ type_to_text = {type_: text for text, type_ in mapping.items()}
+ return type_to_text[type_]
+
+
+@dataclass
+class DiskEncryption:
+ encryption_type: EncryptionType = EncryptionType.NoEncryption
+ encryption_password: str = ''
+ partitions: List[PartitionModification] = field(default_factory=list)
+ lvm_volumes: List[LvmVolume] = field(default_factory=list)
+ hsm_device: Optional[Fido2Device] = None
+
+ def __post_init__(self):
+ if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions:
+ raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined')
+
+ if self.encryption_type == EncryptionType.LuksOnLvm and not self.lvm_volumes:
+ raise ValueError('LuksOnLvm encryption require LMV volumes to be defined')
+
+ def should_generate_encryption_file(self, dev: PartitionModification | LvmVolume) -> bool:
+ if isinstance(dev, PartitionModification):
+ return dev in self.partitions and dev.mountpoint != Path('/')
+ elif isinstance(dev, LvmVolume):
+ return dev in self.lvm_volumes and dev.mountpoint != Path('/')
+ return False
+
+ def json(self) -> Dict[str, Any]:
+ obj: Dict[str, Any] = {
+ 'encryption_type': self.encryption_type.value,
+ 'partitions': [p.obj_id for p in self.partitions],
+ 'lvm_volumes': [vol.obj_id for vol in self.lvm_volumes]
+ }
+
+ if self.hsm_device:
+ obj['hsm_device'] = self.hsm_device.json()
+
+ return obj
+
+ @classmethod
+ def validate_enc(cls, disk_config: DiskLayoutConfiguration) -> bool:
+ partitions = []
+
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ partitions.append(part)
+
+ if len(partitions) > 2: # assume one boot and at least 2 additional
+ if disk_config.lvm_config:
+ return False
+
+ return True
+
+ @classmethod
+ def parse_arg(
+ cls,
+ disk_config: DiskLayoutConfiguration,
+ arg: Dict[str, Any],
+ password: str = ''
+ ) -> Optional['DiskEncryption']:
+ if not cls.validate_enc(disk_config):
+ return None
+
+ enc_partitions = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('partitions', []):
+ enc_partitions.append(part)
+
+ volumes = []
+ if disk_config.lvm_config:
+ for vol in disk_config.lvm_config.get_all_volumes():
+ if vol.obj_id in arg.get('lvm_volumes', []):
+ volumes.append(vol)
+
+ enc = DiskEncryption(
+ EncryptionType(arg['encryption_type']),
+ password,
+ enc_partitions,
+ volumes
+ )
+
+ if hsm := arg.get('hsm_device', None):
+ enc.hsm_device = Fido2Device.parse_arg(hsm)
+
+ return enc
+
+
+@dataclass
+class Fido2Device:
+ path: Path
+ manufacturer: str
+ product: str
+
+ def json(self) -> Dict[str, str]:
+ return {
+ 'path': str(self.path),
+ 'manufacturer': self.manufacturer,
+ 'product': self.product
+ }
+
+ def table_data(self) -> Dict[str, str]:
+ return {
+ 'Path': str(self.path),
+ 'Manufacturer': self.manufacturer,
+ 'Product': self.product
+ }
+
+ @classmethod
+ def parse_arg(cls, arg: Dict[str, str]) -> 'Fido2Device':
+ return Fido2Device(
+ Path(arg['path']),
+ arg['manufacturer'],
+ arg['product']
+ )
+
+
+@dataclass
+class LsblkInfo:
+ name: str = ''
+ path: Path = Path()
+ pkname: str = ''
+ size: Size = field(default_factory=lambda: Size(0, Unit.B, SectorSize.default()))
+ log_sec: int = 0
+ pttype: str = ''
+ ptuuid: str = ''
+ rota: bool = False
+ tran: Optional[str] = None
+ partn: Optional[int] = None
+ partuuid: Optional[str] = None
+ parttype: Optional[str] = None
+ uuid: Optional[str] = None
+ fstype: Optional[str] = None
+ fsver: Optional[str] = None
+ fsavail: Optional[str] = None
+ fsuse_percentage: Optional[str] = None
+ type: Optional[str] = None
+ mountpoint: Optional[Path] = None
+ mountpoints: List[Path] = field(default_factory=list)
+ fsroots: List[Path] = field(default_factory=list)
+ children: List[LsblkInfo] = field(default_factory=list)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'path': str(self.path),
+ 'pkname': self.pkname,
+ 'size': self.size.format_size(Unit.MiB),
+ 'log_sec': self.log_sec,
+ 'pttype': self.pttype,
+ 'ptuuid': self.ptuuid,
+ 'rota': self.rota,
+ 'tran': self.tran,
+ 'partn': self.partn,
+ 'partuuid': self.partuuid,
+ 'parttype': self.parttype,
+ 'uuid': self.uuid,
+ 'fstype': self.fstype,
+ 'fsver': self.fsver,
+ 'fsavail': self.fsavail,
+ 'fsuse_percentage': self.fsuse_percentage,
+ 'type': self.type,
+ 'mountpoint': self.mountpoint,
+ 'mountpoints': [str(m) for m in self.mountpoints],
+ 'fsroots': [str(r) for r in self.fsroots],
+ 'children': [c.json() for c in self.children]
+ }
+
+ @property
+ def btrfs_subvol_info(self) -> Dict[Path, Path]:
+ """
+ It is assumed that lsblk will contain the fields as
+
+ "mountpoints": ["/mnt/archinstall/log", "/mnt/archinstall/home", "/mnt/archinstall", ...]
+ "fsroots": ["/@log", "/@home", "/@"...]
+
+ we'll thereby map the fsroot, which are the mounted filesystem roots
+ to the corresponding mountpoints
+ """
+ return dict(zip(self.fsroots, self.mountpoints))
+
+ @classmethod
+ def exclude(cls) -> List[str]:
+ return ['children']
+
+ @classmethod
+ def fields(cls) -> List[str]:
+ return [f.name for f in dataclasses.fields(LsblkInfo) if f.name not in cls.exclude()]
+
+ @classmethod
+ def from_json(cls, blockdevice: Dict[str, Any]) -> LsblkInfo:
+ lsblk_info = cls()
+
+ for f in cls.fields():
+ lsblk_field = _clean_field(f, CleanType.Blockdevice)
+ data_field = _clean_field(f, CleanType.Dataclass)
+
+ val: Any = None
+ if isinstance(getattr(lsblk_info, data_field), Path):
+ val = Path(blockdevice[lsblk_field])
+ elif isinstance(getattr(lsblk_info, data_field), Size):
+ sector_size = SectorSize(blockdevice['log-sec'], Unit.B)
+ val = Size(blockdevice[lsblk_field], Unit.B, sector_size)
+ else:
+ val = blockdevice[lsblk_field]
+
+ setattr(lsblk_info, data_field, val)
+
+ lsblk_info.children = [LsblkInfo.from_json(child) for child in blockdevice.get('children', [])]
+
+ # sometimes lsblk returns 'mountpoints': [null]
+ lsblk_info.mountpoints = [Path(mnt) for mnt in lsblk_info.mountpoints if mnt]
+
+ fs_roots = []
+ for r in lsblk_info.fsroots:
+ if r:
+ path = Path(r)
+ # store the fsroot entries without the leading /
+ fs_roots.append(path.relative_to(path.anchor))
+ lsblk_info.fsroots = fs_roots
+
+ return lsblk_info
+
+
+class CleanType(Enum):
+ Blockdevice = auto()
+ Dataclass = auto()
+ Lsblk = auto()
+
+
+def _clean_field(name: str, clean_type: CleanType) -> str:
+ match clean_type:
+ case CleanType.Blockdevice:
+ return name.replace('_percentage', '%').replace('_', '-')
+ case CleanType.Dataclass:
+ return name.lower().replace('-', '_').replace('%', '_percentage')
+ case CleanType.Lsblk:
+ return name.replace('_percentage', '%').replace('_', '-')
+
+
+def _fetch_lsblk_info(
+ dev_path: Optional[Union[Path, str]] = None,
+ reverse: bool = False,
+ full_dev_path: bool = False
+) -> List[LsblkInfo]:
+ fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()]
+ cmd = ['lsblk', '--json', '--bytes', '--output', '+' + ','.join(fields)]
+
+ if dev_path:
+ cmd.append(str(dev_path))
+
+ if reverse:
+ cmd.append('--inverse')
+
+ if full_dev_path:
+ cmd.append('--paths')
+
+ try:
+ result = SysCommand(cmd).decode()
+ except SysCallError as err:
+ # Get the output minus the message/info from lsblk if it returns a non-zero exit code.
+ if err.worker:
+ err_str = err.worker.decode()
+ debug(f'Error calling lsblk: {err_str}')
+
+ if dev_path:
+ raise DiskError(f'Failed to read disk "{dev_path}" with lsblk')
+
+ raise err
+
+ try:
+ block_devices = json.loads(result)
+ except json.decoder.JSONDecodeError as err:
+ error(f"Could not decode lsblk JSON: {result}")
+ raise err
+
+ blockdevices = block_devices['blockdevices']
+ return [LsblkInfo.from_json(device) for device in blockdevices]
+
+
+def get_lsblk_info(
+ dev_path: Union[Path, str],
+ reverse: bool = False,
+ full_dev_path: bool = False
+) -> LsblkInfo:
+ if infos := _fetch_lsblk_info(dev_path, reverse=reverse, full_dev_path=full_dev_path):
+ return infos[0]
+
+ raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"')
+
+
+def get_all_lsblk_info() -> List[LsblkInfo]:
+ return _fetch_lsblk_info()
+
+
+def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]:
+ def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]:
+ devices = []
+ for entry in infos:
+ if as_prefix:
+ matches = [m for m in entry.mountpoints if str(m).startswith(str(mountpoint))]
+ if matches:
+ devices += [entry]
+ elif mountpoint in entry.mountpoints:
+ devices += [entry]
+
+ if len(entry.children) > 0:
+ if len(match := _check(entry.children)) > 0:
+ devices += match
+
+ return devices
+
+ all_info = get_all_lsblk_info()
+ return _check(all_info)
diff --git a/archinstall/lib/disk/disk_menu.py b/archinstall/lib/disk/disk_menu.py
new file mode 100644
index 00000000..a7d9ccc3
--- /dev/null
+++ b/archinstall/lib/disk/disk_menu.py
@@ -0,0 +1,140 @@
+from typing import Dict, Optional, Any, TYPE_CHECKING, List
+
+from . import DiskLayoutConfiguration, DiskLayoutType
+from .device_model import LvmConfiguration
+from ..disk import (
+ DeviceModification
+)
+from ..interactions import select_disk_config
+from ..interactions.disk_conf import select_lvm_config
+from ..menu import (
+ Selector,
+ AbstractSubMenu
+)
+from ..output import FormattedOutput
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class DiskLayoutConfigurationMenu(AbstractSubMenu):
+ def __init__(
+ self,
+ disk_layout_config: Optional[DiskLayoutConfiguration],
+ data_store: Dict[str, Any],
+ advanced: bool = False
+ ):
+ self._disk_layout_config = disk_layout_config
+ self._advanced = advanced
+
+ super().__init__(data_store=data_store, preview_size=0.5)
+
+ def setup_selection_menu_options(self):
+ self._menu_options['disk_config'] = \
+ Selector(
+ _('Partitioning'),
+ lambda x: self._select_disk_layout_config(x),
+ display_func=lambda x: self._display_disk_layout(x),
+ preview_func=self._prev_disk_layouts,
+ default=self._disk_layout_config,
+ enabled=True
+ )
+ self._menu_options['lvm_config'] = \
+ Selector(
+ _('Logical Volume Management (LVM)'),
+ lambda x: self._select_lvm_config(x),
+ display_func=lambda x: self.defined_text if x else '',
+ preview_func=self._prev_lvm_config,
+ default=self._disk_layout_config.lvm_config if self._disk_layout_config else None,
+ dependencies=[self._check_dep_lvm],
+ enabled=True
+ )
+
+ def run(self, allow_reset: bool = True) -> Optional[DiskLayoutConfiguration]:
+ super().run(allow_reset=allow_reset)
+
+ disk_layout_config: Optional[DiskLayoutConfiguration] = self._data_store.get('disk_config', None)
+
+ if disk_layout_config:
+ disk_layout_config.lvm_config = self._data_store.get('lvm_config', None)
+
+ return disk_layout_config
+
+ def _check_dep_lvm(self) -> bool:
+ disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+
+ if disk_layout_conf and disk_layout_conf.config_type == DiskLayoutType.Default:
+ return True
+
+ return False
+
+ def _select_disk_layout_config(
+ self,
+ preset: Optional[DiskLayoutConfiguration]
+ ) -> Optional[DiskLayoutConfiguration]:
+ disk_config = select_disk_config(preset, advanced_option=self._advanced)
+
+ if disk_config != preset:
+ self._menu_options['lvm_config'].set_current_selection(None)
+
+ return disk_config
+
+ def _select_lvm_config(self, preset: Optional[LvmConfiguration]) -> Optional[LvmConfiguration]:
+ disk_config: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+ if disk_config:
+ return select_lvm_config(disk_config, preset=preset)
+ return preset
+
+ def _display_disk_layout(self, current_value: Optional[DiskLayoutConfiguration] = None) -> str:
+ if current_value:
+ return current_value.config_type.display_msg()
+ return ''
+
+ def _prev_disk_layouts(self) -> Optional[str]:
+ disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+
+ if disk_layout_conf:
+ device_mods: List[DeviceModification] = \
+ list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications))
+
+ if device_mods:
+ output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg())
+ output_btrfs = ''
+
+ for mod in device_mods:
+ # create partition table
+ partition_table = FormattedOutput.as_table(mod.partitions)
+
+ output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n'
+ output_partition += partition_table + '\n'
+
+ # create btrfs table
+ btrfs_partitions = list(
+ filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions)
+ )
+ for partition in btrfs_partitions:
+ output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n'
+
+ output = output_partition + output_btrfs
+ return output.rstrip()
+
+ return None
+
+ def _prev_lvm_config(self) -> Optional[str]:
+ lvm_config: Optional[LvmConfiguration] = self._menu_options['lvm_config'].current_selection
+
+ if lvm_config:
+ output = '{}: {}\n'.format(str(_('Configuration')), lvm_config.config_type.display_msg())
+
+ for vol_gp in lvm_config.vol_groups:
+ pv_table = FormattedOutput.as_table(vol_gp.pvs)
+ output += '{}:\n{}'.format(str(_('Physical volumes')), pv_table)
+
+ output += f'\nVolume Group: {vol_gp.name}'
+
+ lvm_volumes = FormattedOutput.as_table(vol_gp.volumes)
+ output += '\n\n{}:\n{}'.format(str(_('Volumes')), lvm_volumes)
+
+ return output
+
+ return None
diff --git a/archinstall/lib/disk/diskinfo.py b/archinstall/lib/disk/diskinfo.py
deleted file mode 100644
index b56ba282..00000000
--- a/archinstall/lib/disk/diskinfo.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import dataclasses
-import json
-from dataclasses import dataclass, field
-from typing import Optional, List
-
-from ..general import SysCommand
-from ..exceptions import DiskError
-
-@dataclass
-class LsblkInfo:
- size: int = 0
- log_sec: int = 0
- pttype: Optional[str] = None
- rota: bool = False
- tran: Optional[str] = None
- ptuuid: Optional[str] = None
- partuuid: Optional[str] = None
- uuid: Optional[str] = None
- fstype: Optional[str] = None
- type: Optional[str] = None
- mountpoints: List[str] = field(default_factory=list)
-
-
-def get_lsblk_info(dev_path: str) -> LsblkInfo:
- fields = [f.name for f in dataclasses.fields(LsblkInfo)]
- lsblk_fields = ','.join([f.upper().replace('_', '-') for f in fields])
-
- output = SysCommand(f'lsblk --json -b -o+{lsblk_fields} {dev_path}').decode('UTF-8')
-
- if output:
- block_devices = json.loads(output)
- info = block_devices['blockdevices'][0]
- lsblk_info = LsblkInfo()
-
- for f in fields:
- setattr(lsblk_info, f, info[f.replace('_', '-')])
-
- return lsblk_info
-
- raise DiskError(f'Failed to read disk "{dev_path}" with lsblk')
diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py
deleted file mode 100644
index 63392ffb..00000000
--- a/archinstall/lib/disk/dmcryptdev.py
+++ /dev/null
@@ -1,48 +0,0 @@
-import pathlib
-import logging
-import json
-from dataclasses import dataclass
-from typing import Optional
-from ..exceptions import SysCallError
-from ..general import SysCommand
-from ..output import log
-from .mapperdev import MapperDev
-
-@dataclass
-class DMCryptDev:
- dev_path :pathlib.Path
-
- @property
- def name(self):
- with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh:
- return fh.read().strip()
-
- @property
- def path(self):
- return f"/dev/mapper/{self.dev_path}"
-
- @property
- def blockdev(self):
- pass
-
- @property
- def MapperDev(self):
- return MapperDev(mappername=self.name)
-
- @property
- def mountpoint(self) -> Optional[str]:
- try:
- data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode())
- for filesystem in data['filesystems']:
- return filesystem.get('target')
-
- except SysCallError as error:
- # Not mounted anywhere most likely
- log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow")
- pass
-
- return None
-
- @property
- def filesystem(self) -> Optional[str]:
- return self.MapperDev.filesystem \ No newline at end of file
diff --git a/archinstall/lib/disk/encryption.py b/archinstall/lib/disk/encryption.py
deleted file mode 100644
index c7496bfa..00000000
--- a/archinstall/lib/disk/encryption.py
+++ /dev/null
@@ -1,174 +0,0 @@
-from typing import Dict, Optional, Any, TYPE_CHECKING, List
-
-from ..menu.abstract_menu import Selector, AbstractSubMenu
-from ..menu.menu import MenuSelectionType
-from ..menu.table_selection_menu import TableMenu
-from ..models.disk_encryption import EncryptionType, DiskEncryption
-from ..user_interaction.partitioning_conf import current_partition_layout
-from ..user_interaction.utils import get_password
-from ..menu import Menu
-from ..general import secret
-from ..hsm.fido import Fido2Device, Fido2
-
-if TYPE_CHECKING:
- _: Any
-
-
-class DiskEncryptionMenu(AbstractSubMenu):
- def __init__(self, data_store: Dict[str, Any], preset: Optional[DiskEncryption], disk_layouts: Dict[str, Any]):
- if preset:
- self._preset = preset
- else:
- self._preset = DiskEncryption()
-
- self._disk_layouts = disk_layouts
- super().__init__(data_store=data_store)
-
- def _setup_selection_menu_options(self):
- self._menu_options['encryption_password'] = \
- Selector(
- _('Encryption password'),
- lambda x: select_encrypted_password(),
- display_func=lambda x: secret(x) if x else '',
- default=self._preset.encryption_password,
- enabled=True
- )
- self._menu_options['encryption_type'] = \
- Selector(
- _('Encryption type'),
- func=lambda preset: select_encryption_type(preset),
- display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
- dependencies=['encryption_password'],
- default=self._preset.encryption_type,
- enabled=True
- )
- self._menu_options['partitions'] = \
- Selector(
- _('Partitions'),
- func=lambda preset: select_partitions_to_encrypt(self._disk_layouts, preset),
- display_func=lambda x: f'{sum([len(y) for y in x.values()])} {_("Partitions")}' if x else None,
- dependencies=['encryption_password'],
- default=self._preset.partitions,
- preview_func=self._prev_disk_layouts,
- enabled=True
- )
- self._menu_options['HSM'] = \
- Selector(
- description=_('Use HSM to unlock encrypted drive'),
- func=lambda preset: select_hsm(preset),
- display_func=lambda x: self._display_hsm(x),
- dependencies=['encryption_password'],
- default=self._preset.hsm_device,
- enabled=True
- )
-
- def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]:
- super().run(allow_reset=allow_reset)
-
- if self._data_store.get('encryption_password', None):
- return DiskEncryption(
- encryption_password=self._data_store.get('encryption_password', None),
- encryption_type=self._data_store['encryption_type'],
- partitions=self._data_store.get('partitions', None),
- hsm_device=self._data_store.get('HSM', None)
- )
-
- return None
-
- def _display_hsm(self, device: Optional[Fido2Device]) -> Optional[str]:
- if device:
- return device.manufacturer
-
- if not Fido2.get_fido2_devices():
- return str(_('No HSM devices available'))
- return None
-
- def _prev_disk_layouts(self) -> Optional[str]:
- selector = self._menu_options['partitions']
- if selector.has_selection():
- partitions: Dict[str, Any] = selector.current_selection
-
- all_partitions = []
- for parts in partitions.values():
- all_partitions += parts
-
- output = str(_('Partitions to be encrypted')) + '\n'
- output += current_partition_layout(all_partitions, with_title=False)
- return output.rstrip()
- return None
-
-
-def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]:
- title = str(_('Select disk encryption option'))
- options = [
- # _type_to_text(EncryptionType.FullDiskEncryption),
- EncryptionType.type_to_text(EncryptionType.Partition)
- ]
-
- preset_value = EncryptionType.type_to_text(preset)
- choice = Menu(title, options, preset_values=preset_value).run()
-
- match choice.type_:
- case MenuSelectionType.Reset: return None
- case MenuSelectionType.Skip: return preset
- case MenuSelectionType.Selection: return EncryptionType.text_to_type(choice.value) # type: ignore
-
-
-def select_encrypted_password() -> Optional[str]:
- if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))):
- return passwd
- return None
-
-
-def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]:
- title = _('Select a FIDO2 device to use for HSM')
- fido_devices = Fido2.get_fido2_devices()
-
- if fido_devices:
- choice = TableMenu(title, data=fido_devices).run()
- match choice.type_:
- case MenuSelectionType.Reset:
- return None
- case MenuSelectionType.Skip:
- return preset
- case MenuSelectionType.Selection:
- return choice.value # type: ignore
-
- return None
-
-
-def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: Dict[str, Any]) -> Dict[str, Any]:
- # If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
- # Then we need to identify which partitions to encrypt. This will default to / (root).
- all_partitions = []
- for blockdevice in disk_layouts.values():
- if partitions := blockdevice.get('partitions'):
- partitions = [p for p in partitions if p['mountpoint'] != '/boot']
- all_partitions += partitions
-
- if all_partitions:
- title = str(_('Select which partitions to encrypt'))
- partition_table = current_partition_layout(all_partitions, with_title=False).strip()
-
- choice = TableMenu(
- title,
- table_data=(all_partitions, partition_table),
- multi=True
- ).run()
-
- match choice.type_:
- case MenuSelectionType.Reset:
- return {}
- case MenuSelectionType.Skip:
- return preset
- case MenuSelectionType.Selection:
- selections: List[Any] = choice.value # type: ignore
- partitions = {}
-
- for path, device in disk_layouts.items():
- for part in selections:
- if part in device.get('partitions', []):
- partitions.setdefault(path, []).append(part)
-
- return partitions
- return {}
diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py
new file mode 100644
index 00000000..b0e292ce
--- /dev/null
+++ b/archinstall/lib/disk/encryption_menu.py
@@ -0,0 +1,288 @@
+from pathlib import Path
+from typing import Dict, Optional, Any, TYPE_CHECKING, List
+
+from . import LvmConfiguration, LvmVolume
+from ..disk import (
+ DeviceModification,
+ DiskLayoutConfiguration,
+ PartitionModification,
+ DiskEncryption,
+ EncryptionType
+)
+from ..menu import (
+ Selector,
+ AbstractSubMenu,
+ MenuSelectionType,
+ TableMenu
+)
+from ..interactions.utils import get_password
+from ..menu import Menu
+from ..general import secret
+from .fido import Fido2Device, Fido2
+from ..output import FormattedOutput
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class DiskEncryptionMenu(AbstractSubMenu):
+ def __init__(
+ self,
+ disk_config: DiskLayoutConfiguration,
+ data_store: Dict[str, Any],
+ preset: Optional[DiskEncryption] = None
+ ):
+ if preset:
+ self._preset = preset
+ else:
+ self._preset = DiskEncryption()
+
+ self._disk_config = disk_config
+ super().__init__(data_store=data_store)
+
+ def setup_selection_menu_options(self):
+ self._menu_options['encryption_type'] = \
+ Selector(
+ _('Encryption type'),
+ func=lambda preset: select_encryption_type(self._disk_config, preset),
+ display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
+ default=self._preset.encryption_type,
+ enabled=True,
+ )
+ self._menu_options['encryption_password'] = \
+ Selector(
+ _('Encryption password'),
+ lambda x: select_encrypted_password(),
+ dependencies=[self._check_dep_enc_type],
+ display_func=lambda x: secret(x) if x else '',
+ default=self._preset.encryption_password,
+ enabled=True
+ )
+ self._menu_options['partitions'] = \
+ Selector(
+ _('Partitions'),
+ func=lambda preset: select_partitions_to_encrypt(self._disk_config.device_modifications, preset),
+ display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None,
+ dependencies=[self._check_dep_partitions],
+ default=self._preset.partitions,
+ preview_func=self._prev_partitions,
+ enabled=True
+ )
+ self._menu_options['lvm_vols'] = \
+ Selector(
+ _('LVM volumes'),
+ func=lambda preset: self._select_lvm_vols(preset),
+ display_func=lambda x: f'{len(x)} {_("LVM volumes")}' if x else None,
+ dependencies=[self._check_dep_lvm_vols],
+ default=self._preset.lvm_volumes,
+ preview_func=self._prev_lvm_vols,
+ enabled=True
+ )
+ self._menu_options['HSM'] = \
+ Selector(
+ description=_('Use HSM to unlock encrypted drive'),
+ func=lambda preset: select_hsm(preset),
+ display_func=lambda x: self._display_hsm(x),
+ preview_func=self._prev_hsm,
+ dependencies=[self._check_dep_enc_type],
+ default=self._preset.hsm_device,
+ enabled=True
+ )
+
+ def _select_lvm_vols(self, preset: List[LvmVolume]) -> List[LvmVolume]:
+ if self._disk_config.lvm_config:
+ return select_lvm_vols_to_encrypt(self._disk_config.lvm_config, preset=preset)
+ return []
+
+ def _check_dep_enc_type(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type != EncryptionType.NoEncryption:
+ return True
+ return False
+
+ def _check_dep_partitions(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks]:
+ return True
+ return False
+
+ def _check_dep_lvm_vols(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type == EncryptionType.LuksOnLvm:
+ return True
+ return False
+
+ def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]:
+ super().run(allow_reset=allow_reset)
+
+ enc_type = self._data_store.get('encryption_type', None)
+ enc_password = self._data_store.get('encryption_password', None)
+ enc_partitions = self._data_store.get('partitions', None)
+ enc_lvm_vols = self._data_store.get('lvm_vols', None)
+
+ if enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and enc_partitions:
+ enc_lvm_vols = []
+
+ if enc_type == EncryptionType.LuksOnLvm:
+ enc_partitions = []
+
+ if enc_type != EncryptionType.NoEncryption and enc_password and (enc_partitions or enc_lvm_vols):
+ return DiskEncryption(
+ encryption_password=enc_password,
+ encryption_type=enc_type,
+ partitions=enc_partitions,
+ lvm_volumes=enc_lvm_vols,
+ hsm_device=self._data_store.get('HSM', None)
+ )
+
+ return None
+
+ def _display_hsm(self, device: Optional[Fido2Device]) -> Optional[str]:
+ if device:
+ return device.manufacturer
+
+ return None
+
+ def _prev_partitions(self) -> Optional[str]:
+ partitions: Optional[List[PartitionModification]] = self._menu_options['partitions'].current_selection
+ if partitions:
+ output = str(_('Partitions to be encrypted')) + '\n'
+ output += FormattedOutput.as_table(partitions)
+ return output.rstrip()
+
+ return None
+
+ def _prev_lvm_vols(self) -> Optional[str]:
+ volumes: Optional[List[PartitionModification]] = self._menu_options['lvm_vols'].current_selection
+ if volumes:
+ output = str(_('LVM volumes to be encrypted')) + '\n'
+ output += FormattedOutput.as_table(volumes)
+ return output.rstrip()
+
+ return None
+
+ def _prev_hsm(self) -> Optional[str]:
+ try:
+ Fido2.get_fido2_devices()
+ except ValueError:
+ return str(_('Unable to determine fido2 devices. Is libfido2 installed?'))
+
+ fido_device: Optional[Fido2Device] = self._menu_options['HSM'].current_selection
+
+ if fido_device:
+ output = '{}: {}'.format(str(_('Path')), fido_device.path)
+ output += '{}: {}'.format(str(_('Manufacturer')), fido_device.manufacturer)
+ output += '{}: {}'.format(str(_('Product')), fido_device.product)
+ return output
+
+ return None
+
+
+def select_encryption_type(disk_config: DiskLayoutConfiguration, preset: EncryptionType) -> Optional[EncryptionType]:
+ title = str(_('Select disk encryption option'))
+
+ if disk_config.lvm_config:
+ options = [
+ EncryptionType.type_to_text(EncryptionType.LvmOnLuks),
+ EncryptionType.type_to_text(EncryptionType.LuksOnLvm)
+ ]
+ else:
+ options = [EncryptionType.type_to_text(EncryptionType.Luks)]
+
+ preset_value = EncryptionType.type_to_text(preset)
+
+ choice = Menu(title, options, preset_values=preset_value).run()
+
+ match choice.type_:
+ case MenuSelectionType.Reset: return None
+ case MenuSelectionType.Skip: return preset
+ case MenuSelectionType.Selection: return EncryptionType.text_to_type(choice.value) # type: ignore
+
+
+def select_encrypted_password() -> Optional[str]:
+ if passwd := get_password(prompt=str(_('Enter disk encryption password (leave blank for no encryption): '))):
+ return passwd
+ return None
+
+
+def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]:
+ title = _('Select a FIDO2 device to use for HSM')
+
+ try:
+ fido_devices = Fido2.get_fido2_devices()
+ except ValueError:
+ return None
+
+ if fido_devices:
+ choice = TableMenu(title, data=fido_devices).run()
+ match choice.type_:
+ case MenuSelectionType.Reset:
+ return None
+ case MenuSelectionType.Skip:
+ return preset
+ case MenuSelectionType.Selection:
+ return choice.value # type: ignore
+
+ return None
+
+
+def select_partitions_to_encrypt(
+ modification: List[DeviceModification],
+ preset: List[PartitionModification]
+) -> List[PartitionModification]:
+ partitions: List[PartitionModification] = []
+
+ # do not allow encrypting the boot partition
+ for mod in modification:
+ partitions += list(filter(lambda x: x.mountpoint != Path('/boot'), mod.partitions))
+
+ # do not allow encrypting existing partitions that are not marked as wipe
+ avail_partitions = list(filter(lambda x: not x.exists(), partitions))
+
+ if avail_partitions:
+ title = str(_('Select which partitions to encrypt'))
+ partition_table = FormattedOutput.as_table(avail_partitions)
+
+ choice = TableMenu(
+ title,
+ table_data=(avail_partitions, partition_table),
+ preset=preset,
+ multi=True
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Reset:
+ return []
+ case MenuSelectionType.Skip:
+ return preset
+ case MenuSelectionType.Selection:
+ return choice.multi_value
+ return []
+
+
+def select_lvm_vols_to_encrypt(
+ lvm_config: LvmConfiguration,
+ preset: List[LvmVolume]
+) -> List[LvmVolume]:
+ volumes: List[LvmVolume] = lvm_config.get_all_volumes()
+
+ if volumes:
+ title = str(_('Select which LVM volumes to encrypt'))
+ partition_table = FormattedOutput.as_table(volumes)
+
+ choice = TableMenu(
+ title,
+ table_data=(volumes, partition_table),
+ preset=preset,
+ multi=True
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Reset:
+ return []
+ case MenuSelectionType.Skip:
+ return preset
+ case MenuSelectionType.Selection:
+ return choice.multi_value
+
+ return []
diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py
new file mode 100644
index 00000000..5a139534
--- /dev/null
+++ b/archinstall/lib/disk/fido.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+import getpass
+from pathlib import Path
+from typing import List
+
+from .device_model import Fido2Device
+from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
+from ..output import error, info
+from ..exceptions import SysCallError
+
+
+class Fido2:
+ _loaded: bool = False
+ _fido2_devices: List[Fido2Device] = []
+
+ @classmethod
+ def get_fido2_devices(cls, reload: bool = False) -> List[Fido2Device]:
+ """
+ Uses systemd-cryptenroll to list the FIDO2 devices
+ connected that supports FIDO2.
+ Some devices might show up in udevadm as FIDO2 compliant
+ when they are in fact not.
+
+ The drawback of systemd-cryptenroll is that it uses human readable format.
+ That means we get this weird table like structure that is of no use.
+
+ So we'll look for `MANUFACTURER` and `PRODUCT`, we take their index
+ and we split each line based on those positions.
+
+ Output example:
+
+ PATH MANUFACTURER PRODUCT
+ /dev/hidraw1 Yubico YubiKey OTP+FIDO+CCID
+ """
+
+ # to prevent continuous reloading which will slow
+ # down moving the cursor in the menu
+ if not cls._loaded or reload:
+ try:
+ ret = SysCommand("systemd-cryptenroll --fido2-device=list").decode()
+ except SysCallError:
+ error('fido2 support is most likely not installed')
+ raise ValueError('HSM devices can not be detected, is libfido2 installed?')
+
+ fido_devices: str = clear_vt100_escape_codes(ret) # type: ignore
+
+ manufacturer_pos = 0
+ product_pos = 0
+ devices = []
+
+ for line in fido_devices.split('\r\n'):
+ if '/dev' not in line:
+ manufacturer_pos = line.find('MANUFACTURER')
+ product_pos = line.find('PRODUCT')
+ continue
+
+ path = line[:manufacturer_pos].rstrip()
+ manufacturer = line[manufacturer_pos:product_pos].rstrip()
+ product = line[product_pos:]
+
+ devices.append(
+ Fido2Device(Path(path), manufacturer, product)
+ )
+
+ cls._loaded = True
+ cls._fido2_devices = devices
+
+ return cls._fido2_devices
+
+ @classmethod
+ def fido2_enroll(
+ cls,
+ hsm_device: Fido2Device,
+ dev_path: Path,
+ password: str
+ ):
+ worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {dev_path}", peek_output=True)
+ pw_inputted = False
+ pin_inputted = False
+
+ while worker.is_alive():
+ if pw_inputted is False:
+ if bytes(f"please enter current passphrase for disk {dev_path}", 'UTF-8') in worker._trace_log.lower():
+ worker.write(bytes(password, 'UTF-8'))
+ pw_inputted = True
+ elif pin_inputted is False:
+ if bytes(f"please enter security token pin", 'UTF-8') in worker._trace_log.lower():
+ worker.write(bytes(getpass.getpass(" "), 'UTF-8'))
+ pin_inputted = True
+
+ info('You might need to touch the FIDO2 device to unlock it if no prompt comes up after 3 seconds')
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 1083df53..5c11896e 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,301 +1,381 @@
from __future__ import annotations
+
+import signal
+import sys
import time
-import logging
-import json
-import pathlib
-from typing import Optional, Dict, Any, TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-from ..models.disk_encryption import DiskEncryption
+from pathlib import Path
+from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set
+
+from .device_handler import device_handler
+from .device_model import (
+ DiskLayoutConfiguration, DiskLayoutType, PartitionTable,
+ FilesystemType, DiskEncryption, LvmVolumeGroup,
+ Size, Unit, SectorSize, PartitionModification, EncryptionType,
+ LvmVolume, LvmConfiguration
+)
+from ..hardware import SysInfo
+from ..luks import Luks2
+from ..menu import Menu
+from ..output import debug, info
+from ..general import SysCommand
if TYPE_CHECKING:
- from .blockdevice import BlockDevice
_: Any
-from .partition import Partition
-from .validators import valid_fs_type
-from ..exceptions import DiskError, SysCallError
-from ..general import SysCommand
-from ..output import log
-from ..storage import storage
-GPT = 0b00000001
-MBR = 0b00000010
+class FilesystemHandler:
+ def __init__(
+ self,
+ disk_config: DiskLayoutConfiguration,
+ enc_conf: Optional[DiskEncryption] = None
+ ):
+ self._disk_config = disk_config
+ self._enc_config = enc_conf
+
+ def perform_filesystem_operations(self, show_countdown: bool = True):
+ if self._disk_config.config_type == DiskLayoutType.Pre_mount:
+ debug('Disk layout configuration is set to pre-mount, not performing any operations')
+ return
+
+ device_mods = list(filter(lambda x: len(x.partitions) > 0, self._disk_config.device_modifications))
+
+ if not device_mods:
+ debug('No modifications required')
+ return
+
+ device_paths = ', '.join([str(mod.device.device_info.path) for mod in device_mods])
+
+ # Issue a final warning before we continue with something un-revertable.
+ # We mention the drive one last time, and count from 5 to 0.
+ print(str(_(' ! Formatting {} in ')).format(device_paths))
+
+ if show_countdown:
+ self._do_countdown()
+
+ # Setup the blockdevice, filesystem (and optionally encryption).
+ # Once that's done, we'll hand over to perform_installation()
+ partition_table = PartitionTable.GPT
+ if SysInfo.has_uefi() is False:
+ partition_table = PartitionTable.MBR
+
+ for mod in device_mods:
+ device_handler.partition(mod, partition_table=partition_table)
+
+ if self._disk_config.lvm_config:
+ for mod in device_mods:
+ if boot_part := mod.get_boot_partition():
+ debug(f'Formatting boot partition: {boot_part.dev_path}')
+ self._format_partitions(
+ [boot_part],
+ mod.device_path
+ )
+
+ self.perform_lvm_operations()
+ else:
+ for mod in device_mods:
+ self._format_partitions(
+ mod.partitions,
+ mod.device_path
+ )
-# A sane default is 5MiB, that allows for plenty of buffer for GRUB on MBR
-# but also 4MiB for memory cards for instance. And another 1MiB to avoid issues.
-# (we've been pestered by disk issues since the start, so please let this be here for a few versions)
-DEFAULT_PARTITION_START = '5MiB'
+ for part_mod in mod.partitions:
+ if part_mod.fs_type == FilesystemType.Btrfs:
+ device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config)
-class Filesystem:
- # TODO:
- # When instance of a HDD is selected, check all usages and gracefully unmount them
- # as well as close any crypto handles.
- def __init__(self, blockdevice :BlockDevice, mode :int):
- self.blockdevice = blockdevice
- self.mode = mode
+ def _format_partitions(
+ self,
+ partitions: List[PartitionModification],
+ device_path: Path
+ ):
+ """
+ Format can be given an overriding path, for instance /dev/null to test
+ the formatting functionality and in essence the support for the given filesystem.
+ """
- def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
- return self
+ # don't touch existing partitions
+ create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()]
- def __repr__(self) -> str:
- return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
+ self._validate_partitions(create_or_modify_parts)
- def __exit__(self, *args :str, **kwargs :str) -> bool:
- # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
- if len(args) >= 2 and args[1]:
- raise args[1]
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(device_path)
- SysCommand('sync')
- return True
+ for part_mod in create_or_modify_parts:
+ # partition will be encrypted
+ if self._enc_config is not None and part_mod in self._enc_config.partitions:
+ device_handler.format_encrypted(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ part_mod.safe_fs_type,
+ self._enc_config
+ )
+ else:
+ device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path)
+
+ # synchronize with udev before using lsblk
+ SysCommand('udevadm settle')
+
+ lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path)
+
+ part_mod.partn = lsblk_info.partn
+ part_mod.partuuid = lsblk_info.partuuid
+ part_mod.uuid = lsblk_info.uuid
+
+ def _validate_partitions(self, partitions: List[PartitionModification]):
+ checks = {
+ # verify that all partitions have a path set (which implies that they have been created)
+ lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
+ # crypto luks is not a valid file system type
+ lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError(
+ 'Crypto luks cannot be set as a filesystem type'),
+ # file system type must be set
+ lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
+ }
+
+ for check, exc in checks.items():
+ found = next(filter(check, partitions), None)
+ if found is not None:
+ raise exc
+
+ def perform_lvm_operations(self):
+ info('Setting up LVM config...')
+
+ if not self._disk_config.lvm_config:
+ return
+
+ if self._enc_config:
+ self._setup_lvm_encrypted(
+ self._disk_config.lvm_config,
+ self._enc_config
+ )
+ else:
+ self._setup_lvm(self._disk_config.lvm_config)
+ self._format_lvm_vols(self._disk_config.lvm_config)
+
+ def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption):
+ if enc_config.encryption_type == EncryptionType.LvmOnLuks:
+ enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False)
+
+ self._setup_lvm(lvm_config, enc_mods)
+ self._format_lvm_vols(lvm_config)
+
+ # export the lvm group safely otherwise the Luks cannot be closed
+ self._safely_close_lvm(lvm_config)
+
+ for luks in enc_mods.values():
+ luks.lock()
+ elif enc_config.encryption_type == EncryptionType.LuksOnLvm:
+ self._setup_lvm(lvm_config)
+ enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False)
+ self._format_lvm_vols(lvm_config, enc_vols)
+
+ for luks in enc_vols.values():
+ luks.lock()
+
+ self._safely_close_lvm(lvm_config)
+
+ def _safely_close_lvm(self, lvm_config: LvmConfiguration):
+ for vg in lvm_config.vol_groups:
+ for vol in vg.volumes:
+ device_handler.lvm_vol_change(vol, False)
+
+ device_handler.lvm_export_vg(vg)
+
+ def _setup_lvm(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ self._lvm_create_pvs(lvm_config, enc_mods)
+
+ for vg in lvm_config.vol_groups:
+ pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods)
+
+ device_handler.lvm_vg_create(pv_dev_paths, vg.name)
+
+ # figure out what the actual available size in the group is
+ vg_info = device_handler.lvm_group_info(vg.name)
+
+ if not vg_info:
+ raise ValueError('Unable to fetch VG info')
+
+ # the actual available LVM Group size will be smaller than the
+ # total PVs size due to reserved metadata storage etc.
+ # so we'll have a look at the total avail. size, check the delta
+ # to the desired sizes and subtract some equally from the actually
+ # created volume
+ avail_size = vg_info.vg_size
+ desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default()))
+
+ delta = desired_size - avail_size
+ max_vol_offset = delta.convert(Unit.B)
+
+ max_vol = max(vg.volumes, key=lambda x: x.length)
+
+ for lv in vg.volumes:
+ offset = max_vol_offset if lv == max_vol else None
+
+ debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}')
+ device_handler.lvm_vol_create(vg.name, lv, offset)
- def partuuid_to_index(self, uuid :str) -> Optional[int]:
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- self.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
-
- # We'll use unreliable lbslk to grab children under the /dev/<device>
- output = json.loads(SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- for index, partition in enumerate(device.get('children', [])):
- # But we'll use blkid to reliably grab the PARTUUID for that child device (partition)
- partition_uuid = SysCommand(f"blkid -s PARTUUID -o value /dev/{partition.get('name')}").decode().strip()
- if partition_uuid.lower() == uuid.lower():
- return index
-
- raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")
-
- def load_layout(self, layout :Dict[str, Any]) -> None:
- from ..luks import luks2
- from .btrfs import BTRFSPartition
-
- # If the layout tells us to wipe the drive, we do so
- if layout.get('wipe', False):
- if self.mode == GPT:
- if not self.parted_mklabel(self.blockdevice.device, "gpt"):
- raise KeyError(f"Could not create a GPT label on {self}")
- elif self.mode == MBR:
- if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MS-DOS label on {self}")
-
- self.blockdevice.flush_cache()
- time.sleep(3)
-
- prev_partition = None
- # We then iterate the partitions in order
- for partition in layout.get('partitions', []):
- # We don't want to re-add an existing partition (those containing a UUID already)
- if partition.get('wipe', False) and not partition.get('PARTUUID', None):
- start = partition.get('start') or (
- prev_partition and f'{prev_partition["device_instance"].end_sectors}s' or DEFAULT_PARTITION_START)
- partition['device_instance'] = self.add_partition(partition.get('type', 'primary'),
- start=start,
- end=partition.get('size', '100%'),
- partition_format=partition.get('filesystem', {}).get('format', 'btrfs'),
- skip_mklabel=layout.get('wipe', False) is not False)
-
- elif (partition_uuid := partition.get('PARTUUID')):
- # We try to deal with both UUID and PARTUUID of a partition when it's being re-used.
- # We should re-name or separate this logi based on partition.get('PARTUUID') and partition.get('UUID')
- # but for now, lets just attempt to deal with both.
- try:
- partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
- except DiskError:
- partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)
-
- log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
+ while True:
+ debug('Fetching LVM volume info')
+ lv_info = device_handler.lvm_vol_info(lv.name)
+ if lv_info is not None:
+ break
+
+ time.sleep(1)
+
+ self._lvm_vol_handle_e2scrub(vg)
+
+ def _format_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+ ):
+ for vol in lvm_config.get_all_volumes():
+ if enc_vol := enc_vols.get(vol, None):
+ if not enc_vol.mapper_dev:
+ raise ValueError('No mapper device defined')
+ path = enc_vol.mapper_dev
else:
- log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
- continue
-
- if partition.get('filesystem', {}).get('format', False):
- # needed for backward compatibility with the introduction of the new "format_options"
- format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[])
- disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')
-
- if disk_encryption and partition in disk_encryption.all_partitions:
- if not partition['device_instance']:
- raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
-
- if partition.get('mountpoint',None):
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
- else:
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
-
- partition['device_instance'].encrypt(password=disk_encryption.encryption_password)
- # Immediately unlock the encrypted device to format the inner volume
- with luks2(partition['device_instance'], loopdev, disk_encryption.encryption_password, auto_unmount=True) as unlocked_device:
- if not partition.get('wipe'):
- if storage['arguments'] == 'silent':
- raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}")
- else:
- if not partition.get('filesystem'):
- partition['filesystem'] = {}
-
- if not partition['filesystem'].get('format', False):
- while True:
- partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
- if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
- log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
- continue
- break
-
- unlocked_device.format(partition['filesystem']['format'], options=format_options)
-
- elif partition.get('wipe', False):
- if not partition['device_instance']:
- raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
-
- partition['device_instance'].format(partition['filesystem']['format'], options=format_options)
-
- if partition['filesystem']['format'] == 'btrfs':
- # We upgrade the device instance to a BTRFSPartition if we format it as such.
- # This is so that we can gain access to more features than otherwise available in Partition()
- partition['device_instance'] = BTRFSPartition(
- partition['device_instance'].path,
- block_device=partition['device_instance'].block_device,
- encrypted=False,
- filesystem='btrfs',
- autodetect_filesystem=False
- )
-
- if partition.get('boot', False):
- log(f"Marking partition {partition['device_instance']} as bootable.")
- self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on')
-
- prev_partition = partition
-
- def find_partition(self, mountpoint :str) -> Partition:
- for partition in self.blockdevice:
- if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
- return partition
-
- def partprobe(self) -> bool:
- try:
- SysCommand(f'partprobe {self.blockdevice.device}')
- except SysCallError as error:
- log(f"Could not execute partprobe: {error!r}", level=logging.ERROR, fg="red")
- raise DiskError(f"Could not run partprobe on {self.blockdevice.device}: {error!r}")
+ path = vol.safe_dev_path
- return True
+ # wait a bit otherwise the mkfs will fail as it can't
+ # find the mapper device yet
+ device_handler.format(vol.fs_type, path)
- def raw_parted(self, string: str) -> SysCommand:
- try:
- cmd_handle = SysCommand(f'/usr/bin/parted -s {string}')
- time.sleep(0.5)
- return cmd_handle
- except SysCallError as error:
- log(f"Parted ended with a bad exit code: {error.exit_code} ({error})", level=logging.ERROR, fg="red")
- return error
+ if vol.fs_type == FilesystemType.Btrfs:
+ device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options)
- def parted(self, string: str) -> bool:
- """
- Performs a parted execution of the given string
+ def _lvm_create_pvs(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ pv_paths: Set[Path] = set()
- :param string: A raw string passed to /usr/bin/parted -s <string>
- :type string: str
- """
- if (parted_handle := self.raw_parted(string)).exit_code == 0:
- return self.partprobe()
- else:
- raise DiskError(f"Parted failed to add a partition: {parted_handle}")
+ for vg in lvm_config.vol_groups:
+ pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods)
- def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
- # TODO: Implement this with declarative profiles instead.
- raise ValueError("Installation().use_entire_disk() has to be re-worked.")
+ device_handler.lvm_pv_create(pv_paths)
- def add_partition(
+ def _get_all_pv_dev_paths(
self,
- partition_type :str,
- start :str,
- end :str,
- partition_format :Optional[str] = None,
- skip_mklabel :bool = False
- ) -> Partition:
- log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
-
- if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
- # If it's a completely empty drive, and we're about to add partitions to it
- # we need to make sure there's a filesystem label.
- if self.mode == GPT:
- if not self.parted_mklabel(self.blockdevice.device, "gpt"):
- raise KeyError(f"Could not create a GPT label on {self}")
- elif self.mode == MBR:
- if not self.parted_mklabel(self.blockdevice.device, "msdos"):
- raise KeyError(f"Could not create a MS-DOS label on {self}")
-
- self.blockdevice.flush_cache()
-
- previous_partuuids = []
- for partition in self.blockdevice.partitions.values():
- try:
- previous_partuuids.append(partition.part_uuid)
- except DiskError:
- pass
-
- # TODO this check should probably run in the setup process rather than during the installation
- if self.mode == MBR:
- if len(self.blockdevice.partitions) > 3:
- DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions")
-
- if partition_format:
- parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}'
- else:
- parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}'
-
- log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG)
-
- if self.parted(parted_string):
- for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
- self.blockdevice.flush_cache()
-
- new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()]
- new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
-
- if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
- try:
- return self.blockdevice.get_partition(partuuid=new_partuuid)
- except Exception as err:
- log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
- log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
- log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
- log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
- log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
- raise err
- else:
- log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
- self.partprobe()
- time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
- else:
- print("Parted did not return True during partition creation")
+ pvs: List[PartitionModification],
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ) -> Set[Path]:
+ pv_paths: Set[Path] = set()
+
+ for pv in pvs:
+ if enc_pv := enc_mods.get(pv, None):
+ if mapper := enc_pv.mapper_dev:
+ pv_paths.add(mapper)
+ else:
+ pv_paths.add(pv.safe_dev_path)
+
+ return pv_paths
+
+ def _encrypt_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[LvmVolume, Luks2]:
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+
+ for vol in lvm_config.get_all_volumes():
+ if vol in enc_config.lvm_volumes:
+ luks_handler = device_handler.encrypt(
+ vol.safe_dev_path,
+ vol.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create
+ )
+
+ enc_vols[vol] = luks_handler
+
+ return enc_vols
+
+ def _encrypt_partitions(
+ self,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[PartitionModification, Luks2]:
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+
+ for mod in self._disk_config.device_modifications:
+ partitions = mod.partitions
+
+ # don't touch existing partitions
+ filtered_part = [p for p in partitions if not p.exists()]
+
+ self._validate_partitions(filtered_part)
+
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(mod.device_path)
+
+ enc_mods = {}
+
+ for part_mod in filtered_part:
+ if part_mod in enc_config.partitions:
+ luks_handler = device_handler.encrypt(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create=lock_after_create
+ )
+
+ enc_mods[part_mod] = luks_handler
- total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()])
- total_partitions.update(previous_partuuids)
+ return enc_mods
- # TODO: This should never be able to happen
- log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
- log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")
+ def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup):
+ # from arch wiki:
+ # If a logical volume will be formatted with ext4, leave at least 256 MiB
+ # free space in the volume group to allow using e2scrub
+ if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]):
+ largest_vol = max(vol_gp.volumes, key=lambda x: x.length)
- raise DiskError(f"Could not add partition using: {parted_string}")
+ device_handler.lvm_vol_reduce(
+ largest_vol.safe_dev_path,
+ Size(256, Unit.MiB, SectorSize.default())
+ )
- def set_name(self, partition: int, name: str) -> bool:
- return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
+ def _do_countdown(self) -> bool:
+ SIG_TRIGGER = False
- def set(self, partition: int, string: str) -> bool:
- log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
- return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
+ def kill_handler(sig: int, frame: Any) -> None:
+ print()
+ exit(0)
- def parted_mklabel(self, device: str, disk_label: str) -> bool:
- log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
- # Try to unmount devices before attempting to run mklabel
- try:
- SysCommand(f'bash -c "umount {device}?"')
- except:
- pass
+ def sig_handler(sig: int, frame: Any) -> None:
+ signal.signal(signal.SIGINT, kill_handler)
- self.partprobe()
- worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0
- self.partprobe()
+ original_sigint_handler = signal.getsignal(signal.SIGINT)
+ signal.signal(signal.SIGINT, sig_handler)
- return worked
+ for i in range(5, 0, -1):
+ print(f"{i}", end='')
+
+ for x in range(4):
+ sys.stdout.flush()
+ time.sleep(0.25)
+ print(".", end='')
+
+ if SIG_TRIGGER:
+ prompt = _('Do you really want to abort?')
+ choice = Menu(prompt, Menu.yes_no(), skip=False).run()
+ if choice.value == Menu.yes():
+ exit(0)
+
+ if SIG_TRIGGER is False:
+ sys.stdin.read()
+
+ SIG_TRIGGER = False
+ signal.signal(signal.SIGINT, sig_handler)
+
+ print()
+ signal.signal(signal.SIGINT, original_sigint_handler)
+
+ return True
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
deleted file mode 100644
index 80d0cb53..00000000
--- a/archinstall/lib/disk/helpers.py
+++ /dev/null
@@ -1,556 +0,0 @@
-from __future__ import annotations
-import json
-import logging
-import os # type: ignore
-import pathlib
-import re
-import time
-import glob
-
-from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-from .diskinfo import get_lsblk_info
-from ..models.subvolume import Subvolume
-
-from .blockdevice import BlockDevice
-from .dmcryptdev import DMCryptDev
-from .mapperdev import MapperDev
-from ..exceptions import SysCallError, DiskError
-from ..general import SysCommand
-from ..output import log
-from ..storage import storage
-
-if TYPE_CHECKING:
- from .partition import Partition
-
-
-ROOT_DIR_PATTERN = re.compile('^.*?/devices')
-GIGA = 2 ** 30
-
-def convert_size_to_gb(size :Union[int, float]) -> float:
- return round(size / GIGA,1)
-
-def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]:
- result = {device: 0 for device in block_devices}
-
- for device, weight in result.items():
- if device.spinning:
- weight -= 10
- else:
- weight += 5
-
- if device.bus_type == 'nvme':
- weight += 20
- elif device.bus_type == 'sata':
- weight += 10
-
- result[device] = weight
-
- return result
-
-def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]:
- for disk in devices:
- if disk.size >= gigabytes:
- yield disk
-
-def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
- if not filter_out:
- filter_out = []
-
- copy_devices = [*devices]
- for filter_device in filter_out:
- if filter_device in copy_devices:
- copy_devices.pop(copy_devices.index(filter_device))
-
- copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes))
-
- if not len(copy_devices):
- return None
-
- return max(copy_devices, key=(lambda device : device.size))
-
-def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
- if not filter_out:
- filter_out = []
-
- copy_devices = [*devices]
- for filter_device in filter_out:
- if filter_device in copy_devices:
- copy_devices.pop(copy_devices.index(filter_device))
-
- if not len(copy_devices):
- return None
-
- return min(copy_devices, key=(lambda device : abs(device.size - gigabytes)))
-
-def convert_to_gigabytes(string :str) -> float:
- unit = string.strip()[-1]
- size = float(string.strip()[:-1])
-
- if unit == 'M':
- size = size / 1024
- elif unit == 'T':
- size = size * 1024
-
- return size
-
-def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
- # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
- if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
- with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
- if f.read(1) == '1':
- return
-
- path = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/block/{}'.format(name)))
- hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire")
- for bus in hotplug_buses:
- if os.path.exists('/sys/bus/{}'.format(bus)):
- for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)):
- device_link = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus)))
- if re.search(device_link, path):
- return
- return True
-
-
-def cleanup_bash_escapes(data :str) -> str:
- return data.replace(r'\ ', ' ')
-
-def blkid(cmd :str) -> Dict[str, Any]:
- if '-o' in cmd and '-o export' not in cmd:
- raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.")
- elif '-o' not in cmd:
- cmd += ' -o export'
-
- try:
- raw_data = SysCommand(cmd).decode()
- except SysCallError as error:
- log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG)
- raise error
-
- result = {}
- # Process the raw result
- devname = None
- for line in raw_data.split('\r\n'):
- if not len(line):
- devname = None
- continue
-
- key, val = line.split('=', 1)
- if key.lower() == 'devname':
- devname = val
- # Lowercase for backwards compatibility with all_disks() previous use cases
- result[devname] = {
- "path": devname,
- "PATH": devname
- }
- continue
-
- result[devname][key] = cleanup_bash_escapes(val)
-
- return result
-
-def get_loop_info(path :str) -> Dict[str, Any]:
- for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == path:
- continue
-
- return {
- path: {
- **drive,
- 'type' : 'loop',
- 'TYPE' : 'loop',
- 'DEVTYPE' : 'loop',
- 'PATH' : drive['name'],
- 'path' : drive['name']
- }
- }
-
- return {}
-
-def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]:
- result = {}
- for device_path, device_information in information.items():
- dev_name = pathlib.Path(device_information['PATH']).name
- if not device_information.get('TYPE') or not device_information.get('DEVTYPE'):
- with open(f"/sys/class/block/{dev_name}/uevent") as fh:
- device_information.update(uevent(fh.read()))
-
- if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists():
- with dmcrypt_name.open('r') as fh:
- device_information['DMCRYPT_NAME'] = fh.read().strip()
-
- result[device_path] = device_information
-
- return result
-
-def uevent(data :str) -> Dict[str, Any]:
- information = {}
-
- for line in data.replace('\r\n', '\n').split('\n'):
- if len((line := line.strip())):
- key, val = line.split('=', 1)
- information[key] = val
-
- return information
-
-def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]:
- device_information = {}
- with open(f"/sys/class/block/{dev_name}/uevent") as fh:
- device_information.update(uevent(fh.read()))
-
- return {
- f"/dev/{dev_name}" : {
- **device_information,
- 'path' : f'/dev/{dev_name}',
- 'PATH' : f'/dev/{dev_name}',
- 'PTTYPE' : None
- }
- }
-
-
-def all_disks() -> List[BlockDevice]:
- log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
- return all_blockdevices(partitions=False, mappers=False)
-
-def get_blockdevice_info(device_path, exclude_iso_dev :bool = True) -> Dict[str, Any]:
- for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
- partprobe(device_path)
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt))
-
- try:
- if exclude_iso_dev:
- # exclude all devices associated with the iso boot locations
- iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt']
-
- try:
- lsblk_info = get_lsblk_info(device_path)
- except DiskError:
- continue
-
- if any([dev in lsblk_info.mountpoints for dev in iso_devs]):
- continue
-
- information = blkid(f'blkid -p -o export {device_path}')
- return enrich_blockdevice_information(information)
- except SysCallError as ex:
- if ex.exit_code == 2:
- # Assume that it's a loop device, and try to get info on it
- try:
- resolved_device_name = device_path.readlink().name
- except OSError:
- resolved_device_name = device_path.name
-
- try:
- information = get_loop_info(device_path)
- if not information:
- raise SysCallError(f"Could not get loop information for {resolved_device_name}", exit_code=1)
- return enrich_blockdevice_information(information)
-
- except SysCallError:
- information = get_blockdevice_uevent(resolved_device_name)
- return enrich_blockdevice_information(information)
- else:
- # We could not reliably get any information, perhaps the disk is clean of information?
- if retry_attempt == storage['DISK_RETRY_ATTEMPTS'] - 1:
- raise ex
-
-def all_blockdevices(
- mappers: bool = False,
- partitions: bool = False,
- error: bool = False,
- exclude_iso_dev: bool = True
-) -> Dict[str, Any]:
- """
- Returns BlockDevice() and Partition() objects for all available devices.
- """
- from .partition import Partition
-
- instances = {}
-
- # Due to lsblk being highly unreliable for this use case,
- # we'll iterate the /sys/class definitions and find the information
- # from there.
- for block_device in glob.glob("/sys/class/block/*"):
- try:
- device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
- except FileNotFoundError:
- log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
-
- if device_path.exists() is False:
- log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
- continue
-
- information = get_blockdevice_info(device_path)
- if not information:
- continue
-
- for path, path_info in information.items():
- if path_info.get('DMCRYPT_NAME'):
- instances[path] = DMCryptDev(dev_path=path)
- elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'):
- if partitions:
- instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path))))
- elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
- instances[path] = BlockDevice(path, path_info)
- elif path_info.get('TYPE') in ('squashfs', 'erofs'):
- # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
- continue
- else:
- log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow")
-
- if mappers:
- for block_device in glob.glob("/dev/mapper/*"):
- if (pathobj := pathlib.Path(block_device)).is_symlink():
- instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name)
-
- return instances
-
-
-def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path:
- partition_name = path.name
- pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve()
- return f"/dev/{pci_device.parent.name}"
-
-def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
- collection = all_blockdevices(partitions=False)
- for drive in collection:
- if size and convert_to_gigabytes(collection[drive]['size']) != size:
- continue
- if model and (collection[drive]['model'] is None or collection[drive]['model'].lower() != model.lower()):
- continue
-
- return collection[drive]
-
-def split_bind_name(path :Union[pathlib.Path, str]) -> list:
- # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow")
- # we check for the bind notation. if exist we'll only use the "true" device path
- if '[' in str(path) : # is a bind path (btrfs subvolume path)
- device_path, bind_path = str(path).split('[')
- bind_path = bind_path[:-1].strip() # remove the ]
- else:
- device_path = path
- bind_path = None
- return device_path,bind_path
-
-def find_mountpoint(device_path :str) -> Dict[str, Any]:
- try:
- for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']:
- yield filesystem
- except SysCallError:
- return {}
-
-def findmnt(path :pathlib.Path, traverse :bool = False, ignore :List = [], recurse :bool = True) -> Dict[str, Any]:
- for traversal in list(map(str, [str(path)] + list(path.parents))):
- if traversal in ignore:
- continue
-
- try:
- log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
- if (output := SysCommand(f"/usr/bin/findmnt --json {'--submounts' if recurse else ''} {traversal}").decode('UTF-8')):
- return json.loads(output)
-
- except SysCallError as error:
- log(f"Could not get mount information on {path} but continuing and ignoring: {error}", level=logging.INFO, fg="gray")
- pass
-
- if not traverse:
- break
-
- raise DiskError(f"Could not get mount information for path {path}")
-
-
-def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False, ignore :List = []) -> Dict[str, Any]:
- import traceback
-
- log(f"Deprecated: archinstall.get_mount_info(). Use archinstall.findmnt() instead, which does not do any automatic parsing. Please change at:\n{''.join(traceback.format_stack())}")
- device_path, bind_path = split_bind_name(path)
- output = {}
-
- for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
- if traversal in ignore:
- continue
-
- try:
- log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
- if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
- break
-
- except SysCallError as error:
- print('ERROR:', error)
- pass
-
- if not traverse:
- break
-
- if not output:
- raise DiskError(f"Could not get mount information for device path {device_path}")
-
- output = json.loads(output)
-
- # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
- # i.e. the subvolume filesystem we're searching for
- if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
- output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
-
- if 'filesystems' in output:
- if len(output['filesystems']) > 1:
- raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}")
-
- if return_real_path:
- return output['filesystems'][0], traversal
- else:
- return output['filesystems'][0]
-
- if return_real_path:
- return {}, traversal
- else:
- return {}
-
-
-def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]:
- for info in data:
- if info.get('target') not in filters:
- filters[info.get('target')] = None
-
- filters.update(get_all_targets(info.get('children', [])))
-
- return filters
-
-def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
- from .partition import Partition
-
- try:
- output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
- except SysCallError:
- return {}
-
- if not output:
- return {}
-
- output = json.loads(output)
-
- mounts = {}
-
- block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True)
-
- block_devices_mountpoints = {}
- for blockdev in block_devices_available.values():
- if not type(blockdev) in (Partition, MapperDev):
- continue
-
- if isinstance(blockdev, Partition):
- if blockdev.mountpoints:
- for blockdev_mountpoint in blockdev.mountpoints:
- block_devices_mountpoints[blockdev_mountpoint] = blockdev
- else:
- if blockdev.mount_information:
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
-
- log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
-
- for mountpoint in list(get_all_targets(output['filesystems']).keys()):
- # Since all_blockdevices() returns PosixPath objects, we need to convert
- # findmnt paths to pathlib.Path() first:
- mountpoint = pathlib.Path(mountpoint)
-
- if mountpoint in block_devices_mountpoints:
- if mountpoint not in mounts:
- mounts[mountpoint] = block_devices_mountpoints[mountpoint]
- # If the already defined mountpoint is a DMCryptDev, and the newly found
- # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition.
- elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev:
- mounts[mountpoint] = block_devices_mountpoints[mountpoint]
-
- log(f"Available partitions: {mounts}", level=logging.DEBUG)
-
- return mounts
-
-
-def get_filesystem_type(path :str) -> Optional[str]:
- try:
- return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip()
- except SysCallError:
- return None
-
-
-def disk_layouts() -> Optional[Dict[str, Any]]:
- try:
- if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0:
- return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()}
- else:
- log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow")
- return None
- except SysCallError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
- return None
- except json.decoder.JSONDecodeError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
- return None
-
-
-def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
- for device in block_devices:
- for partition in block_devices[device]['partitions']:
- if partition.get('mountpoint', None) == relative_mountpoint:
- return partition
-
-def partprobe(path :str = '') -> bool:
- try:
- if SysCommand(f'bash -c "partprobe {path}"').exit_code == 0:
- return True
- except SysCallError:
- pass
- return False
-
-def convert_device_to_uuid(path :str) -> str:
- device_name, bind_name = split_bind_name(path)
-
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- partprobe(device_name)
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) # TODO: Remove, we should be relying on blkid instead of lsblk
-
- # TODO: Convert lsblk to blkid
- # (lsblk supports BlockDev and Partition UUID grabbing, blkid requires you to pick PTUUID and PARTUUID)
- output = json.loads(SysCommand(f"lsblk --json -o+UUID {device_name}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- if (dev_uuid := device.get('uuid', None)):
- return dev_uuid
-
- raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")
-
-
-def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool:
- """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path)
- Coded for clarity rather than performance
-
- Input parms:
- :parm partition the partition we check
- :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema
-
- :parm target (a string representing a mount path we want to check for.
- :type str
-
- :parm strict if the check will be strict, target is exactly the mountpoint, or no, where the target is a leaf (f.i. to check if it is in /mnt/archinstall/). Not available for root check ('/') for obvious reasons
-
- """
- # we create the mountpoint list
- if isinstance(partition,dict):
- subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', [])
- mountpoints = [partition.get('mountpoint')]
- mountpoints += [volume.mountpoint for volume in subvolumes]
- else:
- mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes]
-
- # we check
- if strict or target == '/':
- if target in mountpoints:
- return True
- else:
- return False
- else:
- for mp in mountpoints:
- if mp and mp.endswith(target):
- return True
- return False
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
deleted file mode 100644
index bf1b3583..00000000
--- a/archinstall/lib/disk/mapperdev.py
+++ /dev/null
@@ -1,92 +0,0 @@
-import glob
-import pathlib
-import logging
-import json
-from dataclasses import dataclass
-from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING
-
-from ..exceptions import SysCallError
-from ..general import SysCommand
-from ..output import log
-
-if TYPE_CHECKING:
- from .btrfs import BtrfsSubvolumeInfo
-
-@dataclass
-class MapperDev:
- mappername :str
-
- @property
- def name(self):
- return self.mappername
-
- @property
- def path(self):
- return f"/dev/mapper/{self.mappername}"
-
- @property
- def part_uuid(self):
- return self.partition.part_uuid
-
- @property
- def partition(self):
- from .helpers import uevent, get_parent_of_partition
- from .partition import Partition
- from .blockdevice import BlockDevice
-
- for mapper in glob.glob('/dev/mapper/*'):
- path_obj = pathlib.Path(mapper)
- if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink():
- dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve()
-
- for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
- partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
-
- try:
- uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
- except SysCallError as error:
- log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
-
- information = uevent(uevent_data)
- block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
-
- return Partition(information['DEVNAME'], block_device=block_device)
-
- raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device")
-
- @property
- def mountpoint(self) -> Optional[pathlib.Path]:
- try:
- data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
- for filesystem in data['filesystems']:
- return pathlib.Path(filesystem.get('target'))
-
- except SysCallError as error:
- # Not mounted anywhere most likely
- log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow")
- pass
-
- return None
-
- @property
- def mountpoints(self) -> List[Dict[str, Any]]:
- return [obj['target'] for obj in self.mount_information]
-
- @property
- def mount_information(self) -> List[Dict[str, Any]]:
- from .helpers import find_mountpoint
- return [{**obj, 'target' : pathlib.Path(obj.get('target', '/dev/null'))} for obj in find_mountpoint(self.path)]
-
- @property
- def filesystem(self) -> Optional[str]:
- from .helpers import get_filesystem_type
- return get_filesystem_type(self.path)
-
- @property
- def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']:
- from .btrfs import subvolume_info_from_path
-
- for mountpoint in self.mount_information:
- if target := mountpoint.get('target'):
- if subvolume := subvolume_info_from_path(pathlib.Path(target)):
- yield subvolume
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
deleted file mode 100644
index 87eaa6a7..00000000
--- a/archinstall/lib/disk/partition.py
+++ /dev/null
@@ -1,661 +0,0 @@
-import glob
-import time
-import logging
-import json
-import os
-import hashlib
-import typing
-from dataclasses import dataclass, field
-from pathlib import Path
-from typing import Optional, Dict, Any, List, Union, Iterator
-
-from .blockdevice import BlockDevice
-from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name
-from ..storage import storage
-from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
-from ..output import log
-from ..general import SysCommand
-from .btrfs.btrfs_helpers import subvolume_info_from_path
-from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
-
-@dataclass
-class PartitionInfo:
- partition_object: 'Partition'
- device_path: str # This would be /dev/sda1 for instance
- bootable: bool
- size: float
- sector_size: int
- start: Optional[int]
- end: Optional[int]
- pttype: Optional[str]
- filesystem_type: Optional[str]
- partuuid: Optional[str]
- uuid: Optional[str]
- mountpoints: List[Path] = field(default_factory=list)
-
- def __post_init__(self):
- if not all([self.partuuid, self.uuid]):
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- lsblk_info = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
- try:
- lsblk_info = json.loads(lsblk_info)
- except json.decoder.JSONDecodeError:
- log(f"Could not decode JSON: {lsblk_info}", fg="red", level=logging.ERROR)
- raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')
-
- if not (device := lsblk_info.get('blockdevices', [None])[0]):
- raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')
-
- self.partuuid = device.get('partuuid')
- self.uuid = device.get('uuid')
-
- # Lets build a list of requirements that we would like
- # to retry and build (stuff that can take time between partprobes)
- requirements = []
- requirements.append(self.partuuid)
-
- # Unformatted partitions won't have a UUID
- if lsblk_info.get('fstype') is not None:
- requirements.append(self.uuid)
-
- if all(requirements):
- break
-
- self.partition_object.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
-
- def get_first_mountpoint(self) -> Optional[Path]:
- if len(self.mountpoints) > 0:
- return self.mountpoints[0]
- return None
-
-
-class Partition:
- def __init__(
- self,
- path: str,
- block_device: BlockDevice,
- part_id :Optional[str] = None,
- filesystem :Optional[str] = None,
- mountpoint :Optional[str] = None,
- encrypted :bool = False,
- autodetect_filesystem :bool = True,
- ):
- if not part_id:
- part_id = os.path.basename(path)
-
- if type(block_device) is str:
- raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
-
- self.block_device = block_device
- self._path = path
- self._part_id = part_id
- self._target_mountpoint = mountpoint
- self._encrypted = encrypted
- self._wipe = False
- self._type = 'primary'
-
- if mountpoint:
- self.mount(mountpoint)
-
- try:
- self._partition_info = self._fetch_information()
-
- if not autodetect_filesystem and filesystem:
- self._partition_info.filesystem_type = filesystem
-
- if self._partition_info.filesystem_type == 'crypto_LUKS':
- self._encrypted = True
- except DiskError:
- self._partition_info = None
-
- @typing.no_type_check # I hate doint this but I'm currently unsure where this is used.
- def __lt__(self, left_comparitor :BlockDevice) -> bool:
- if type(left_comparitor) == Partition:
- left_comparitor = left_comparitor.path
- else:
- left_comparitor = str(left_comparitor)
-
- # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
- return self._path < left_comparitor
-
- def __repr__(self, *args :str, **kwargs :str) -> str:
- mount_repr = ''
- if self._partition_info:
- if mountpoint := self._partition_info.get_first_mountpoint():
- mount_repr = f", mounted={mountpoint}"
- elif self._target_mountpoint:
- mount_repr = f", rel_mountpoint={self._target_mountpoint}"
-
- classname = self.__class__.__name__
-
- if not self._partition_info:
- return f'{classname}(path={self._path})'
- elif self._encrypted:
- return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
- else:
- return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
-
- def as_json(self) -> Dict[str, Any]:
- """
- this is used for the table representation of the partition (see FormattedOutput)
- """
- partition_info = {
- 'type': self._type,
- 'PARTUUID': self.part_uuid,
- 'wipe': self._wipe,
- 'boot': self.boot,
- 'ESP': self.boot,
- 'mountpoint': self._target_mountpoint,
- 'encrypted': self._encrypted,
- 'start': self.start,
- 'size': self.end,
- 'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown'
- }
-
- return partition_info
-
- def __dump__(self) -> Dict[str, Any]:
- # TODO remove this in favour of as_json
- return {
- 'type': self._type,
- 'PARTUUID': self.part_uuid,
- 'wipe': self._wipe,
- 'boot': self.boot,
- 'ESP': self.boot,
- 'mountpoint': self._target_mountpoint,
- 'encrypted': self._encrypted,
- 'start': self.start,
- 'size': self.end,
- 'filesystem': {
- 'format': self._partition_info.filesystem_type if self._partition_info else 'None'
- }
- }
-
- def _call_lsblk(self) -> Dict[str, Any]:
- for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
- self.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) # TODO: Remove, we should be relying on blkid instead of lsblk
- # This sleep might be overkill, but lsblk is known to
- # work against a chaotic cache that can change during call
- # causing no information to be returned (blkid is better)
- # time.sleep(1)
-
- # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
-
- try:
- output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
- except SysCallError as error:
- # Get the output minus the message/info from lsblk if it returns a non-zero exit code.
- output = error.worker.decode('UTF-8')
- if '{' in output:
- output = output[output.find('{'):]
-
- if output:
- try:
- lsblk_info = json.loads(output)
- return lsblk_info
- except json.decoder.JSONDecodeError:
- log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR)
-
- raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk')
-
- def _call_sfdisk(self) -> Dict[str, Any]:
- output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')
-
- if output:
- sfdisk_info = json.loads(output)
- partitions = sfdisk_info.get('partitiontable', {}).get('partitions', [])
- node = list(filter(lambda x: x['node'] == self._path, partitions))
-
- if len(node) > 0:
- return node[0]
-
- return {}
-
- raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk')
-
- def _fetch_information(self) -> PartitionInfo:
- lsblk_info = self._call_lsblk()
- sfdisk_info = self._call_sfdisk()
-
- if not (device := lsblk_info.get('blockdevices', [])):
- raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')
-
- # Grab the first (and only) block device in the list as we're targeting a specific partition
- device = device[0]
-
- mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint]
- bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
-
- return PartitionInfo(
- partition_object=self,
- device_path=self._path,
- pttype=device['pttype'],
- partuuid=device['partuuid'],
- uuid=device['uuid'],
- sector_size=device['log-sec'],
- size=convert_size_to_gb(device['size']),
- start=sfdisk_info.get('start', None),
- end=sfdisk_info.get('size', None),
- bootable=bootable,
- filesystem_type=device['fstype'],
- mountpoints=mountpoints
- )
-
- @property
- def target_mountpoint(self) -> Optional[str]:
- return self._target_mountpoint
-
- @property
- def path(self) -> str:
- return self._path
-
- @property
- def filesystem(self) -> str:
- if self._partition_info:
- return self._partition_info.filesystem_type
-
- @property
- def mountpoint(self) -> Optional[Path]:
- if len(self.mountpoints) > 0:
- return self.mountpoints[0]
- return None
-
- @property
- def mountpoints(self) -> List[Path]:
- if self._partition_info:
- return self._partition_info.mountpoints
-
- @property
- def sector_size(self) -> int:
- if self._partition_info:
- return self._partition_info.sector_size
-
- @property
- def start(self) -> Optional[int]:
- if self._partition_info:
- return self._partition_info.start
-
- @property
- def end(self) -> Optional[int]:
- if self._partition_info:
- return self._partition_info.end
-
- @property
- def end_sectors(self) -> Optional[int]:
- if self._partition_info:
- start = self._partition_info.start
- end = self._partition_info.end
- if start and end:
- return start + end
-
- @property
- def size(self) -> Optional[float]:
- if self._partition_info:
- return self._partition_info.size
-
- @property
- def boot(self) -> bool:
- if self._partition_info:
- return self._partition_info.bootable
-
- @property
- def partition_type(self) -> Optional[str]:
- if self._partition_info:
- return self._partition_info.pttype
-
- @property
- def part_uuid(self) -> str:
- if self._partition_info:
- return self._partition_info.partuuid
-
- @property
- def uuid(self) -> Optional[str]:
- """
- Returns the UUID as returned by lsblk for the **partition**.
- This is more reliable than relying on /dev/disk/by-uuid as
- it doesn't seam to be able to detect md raid partitions.
- For bind mounts all the subvolumes share the same uuid
- """
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- if not self.partprobe():
- raise DiskError(f"Could not perform partprobe on {self.device_path}")
-
- time.sleep(storage.get('DISK_TIMEOUTS', 1) * i)
-
- partuuid = self._safe_uuid
- if partuuid:
- return partuuid
-
- raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
-
- @property
- def _safe_uuid(self) -> Optional[str]:
- """
- A near copy of self.uuid but without any delays.
- This function should only be used where uuid is not crucial.
- For instance when you want to get a __repr__ of the class.
- """
- if not self.partprobe():
- if self.block_device.partition_type == 'iso9660':
- return None
-
- log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
-
- try:
- return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
- except SysCallError as error:
- if self.block_device.partition_type == 'iso9660':
- # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
- return None
-
- log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}")
-
- @property
- def _safe_part_uuid(self) -> Optional[str]:
- """
- A near copy of self.uuid but without any delays.
- This function should only be used where uuid is not crucial.
- For instance when you want to get a __repr__ of the class.
- """
- if not self.partprobe():
- if self.block_device.partition_type == 'iso9660':
- return None
-
- log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
-
- try:
- return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
- except SysCallError as error:
- if self.block_device.partition_type == 'iso9660':
- # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
- return None
-
- log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")
-
- if self._partition_info:
- return self._partition_info.uuid
-
- @property
- def encrypted(self) -> Union[bool, None]:
- return self._encrypted
-
- @property
- def parent(self) -> str:
- return self.real_device
-
- @property
- def real_device(self) -> str:
- output = SysCommand('lsblk -J').decode('UTF-8')
-
- if output:
- for blockdevice in json.loads(output)['blockdevices']:
- if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
- return f"/dev/{parent}"
- return self._path
-
- raise DiskError('Unable to get disk information for command "lsblk -J"')
-
- @property
- def device_path(self) -> str:
- """ for bind mounts returns the physical path of the partition
- """
- device_path, bind_name = split_bind_name(self._path)
- return device_path
-
- @property
- def bind_name(self) -> str:
- """ for bind mounts returns the bind name (subvolume path).
- Returns none if this property does not exist
- """
- device_path, bind_name = split_bind_name(self._path)
- return bind_name
-
- @property
- def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]:
- from .helpers import findmnt
-
- def iterate_children_recursively(information):
- for child in information.get('children', []):
- if target := child.get('target'):
- if child.get('fstype') == 'btrfs':
- if subvolume := subvolume_info_from_path(Path(target)):
- yield subvolume
-
- if child.get('children'):
- for subchild in iterate_children_recursively(child):
- yield subchild
-
- if self._partition_info.filesystem_type == 'btrfs':
- for mountpoint in self._partition_info.mountpoints:
- if result := findmnt(mountpoint):
- for filesystem in result.get('filesystems', []):
- if subvolume := subvolume_info_from_path(mountpoint):
- yield subvolume
-
- for child in iterate_children_recursively(filesystem):
- yield child
-
- def partprobe(self) -> bool:
- try:
- if self.block_device:
- return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code
- except SysCallError as error:
- log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG)
-
- return False
-
- def detect_inner_filesystem(self, password :str) -> Optional[str]:
- log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
- from ..luks import luks2
-
- try:
- with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device:
- return unlocked_device.filesystem
- except SysCallError:
- pass
- return None
-
- def has_content(self) -> bool:
- fs_type = self._partition_info.filesystem_type
- if not fs_type or "swap" in fs_type:
- return False
-
- temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest()
- temporary_path = Path(temporary_mountpoint)
-
- temporary_path.mkdir(parents=True, exist_ok=True)
- if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0:
- raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}')
-
- files = len(glob.glob(f"{temporary_mountpoint}/*"))
- iterations = 0
- while SysCommand(f"/usr/bin/umount -R {temporary_mountpoint}").exit_code != 0 and (iterations := iterations + 1) < 10:
- time.sleep(1)
-
- temporary_path.rmdir()
-
- return True if files > 0 else False
-
- def encrypt(self, password: Optional[str] = None) -> str:
- """
- A wrapper function for luks2() instances and the .encrypt() method of that instance.
- """
- from ..luks import luks2
-
- handle = luks2(self, None, None)
- return handle.encrypt(self, password=password)
-
- def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
- """
- Format can be given an overriding path, for instance /dev/null to test
- the formatting functionality and in essence the support for the given filesystem.
- """
- if filesystem is None:
- filesystem = self._partition_info.filesystem_type
-
- if path is None:
- path = self._path
-
- # This converts from fat32 -> vfat to unify filesystem names
- filesystem = get_mount_fs_type(filesystem)
-
- # To avoid "unable to open /dev/x: No such file or directory"
- start_wait = time.time()
- while Path(path).exists() is False and time.time() - start_wait < 10:
- time.sleep(0.025)
-
- if log_formatting:
- log(f'Formatting {path} -> {filesystem}', level=logging.INFO)
-
- try:
- if filesystem == 'btrfs':
- options = ['-f'] + options
-
- mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')
- if mkfs and 'UUID:' not in mkfs:
- raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
- self._partition_info.filesystem_type = filesystem
-
- elif filesystem == 'vfat':
- options = ['-F32'] + options
- log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")
- if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self._partition_info.filesystem_type = filesystem
-
- elif filesystem == 'ext4':
- options = ['-F'] + options
-
- if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self._partition_info.filesystem_type = filesystem
-
- elif filesystem == 'ext2':
- options = ['-F'] + options
-
- if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self._partition_info.filesystem_type = 'ext2'
- elif filesystem == 'xfs':
- options = ['-f'] + options
-
- if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self._partition_info.filesystem_type = filesystem
-
- elif filesystem == 'f2fs':
- options = ['-f'] + options
-
- if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self._partition_info.filesystem_type = filesystem
-
- elif filesystem == 'ntfs3':
- options = ['-f'] + options
-
- if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self._partition_info.filesystem_type = filesystem
-
- elif filesystem == 'crypto_LUKS':
- # from ..luks import luks2
- # encrypted_partition = luks2(self, None, None)
- # encrypted_partition.format(path)
- self._partition_info.filesystem_type = filesystem
-
- else:
- raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
- except SysCallError as error:
- log(f"Formatting ran in to an error: {error}", level=logging.WARNING, fg="orange")
- if retry is True:
- log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange")
- time.sleep(storage.get('DISK_TIMEOUTS', 1))
-
- return self.format(filesystem, path, log_formatting, options, retry=False)
-
- if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
- self._encrypted = True
- else:
- self._encrypted = False
-
- return True
-
- def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]:
- if data['name'] == name:
- return parent
- elif 'children' in data:
- for child in data['children']:
- if parent := self.find_parent_of(child, name, parent=data['name']):
- return parent
-
- return None
-
- def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
- if not self._partition_info.get_first_mountpoint():
- log(f'Mounting {self} to {target}', level=logging.INFO)
-
- if not fs:
- fs = self._partition_info.filesystem_type
-
- fs_type = get_mount_fs_type(fs)
-
- Path(target).mkdir(parents=True, exist_ok=True)
-
- if self.bind_name:
- device_path = self.device_path
- # TODO options should be better be a list than a string
- if options:
- options = f"{options},subvol={self.bind_name}"
- else:
- options = f"subvol={self.bind_name}"
- else:
- device_path = self._path
- try:
- if options:
- mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}")
- else:
- mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} {device_path} {target}")
-
- # TODO: Should be redundant to check for exit_code
- if mnt_handle.exit_code != 0:
- raise DiskError(f"Could not mount {self._path} to {target} using options {options}")
- except SysCallError as err:
- raise err
-
- # Update the partition info since the mount info has changed after this call.
- self._partition_info = self._fetch_information()
- return True
-
- return False
-
- def unmount(self) -> bool:
- SysCommand(f"/usr/bin/umount {self._path}")
-
- # Update the partition info since the mount info has changed after this call.
- self._partition_info = self._fetch_information()
- return True
-
- def filesystem_supported(self) -> bool:
- """
- The support for a filesystem (this partition) is tested by calling
- partition.format() with a path set to '/dev/null' which returns two exceptions:
- 1. SysCallError saying that /dev/null is not formattable - but the filesystem is supported
- 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
- """
- try:
- self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False)
- except (SysCallError, DiskError):
- pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code
- except UnknownFilesystemFormat as err:
- raise err
- return True
-
-
-def get_mount_fs_type(fs :str) -> str:
- if fs == 'ntfs':
- return 'ntfs3' # Needed to use the Paragon R/W NTFS driver
- elif fs == 'fat32':
- return 'vfat' # This is the actual type used for fat32 mounting
- return fs
diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py
new file mode 100644
index 00000000..fb1eb74b
--- /dev/null
+++ b/archinstall/lib/disk/partitioning_menu.py
@@ -0,0 +1,429 @@
+from __future__ import annotations
+
+import re
+from pathlib import Path
+from typing import Any, TYPE_CHECKING, List, Optional, Tuple
+from dataclasses import dataclass
+
+from .device_model import (
+ PartitionModification, FilesystemType, BDevice,
+ Size, Unit, PartitionType, PartitionFlag,
+ ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption
+)
+from ..hardware import SysInfo
+from ..menu import Menu, ListManager, MenuSelection, TextInput
+from ..output import FormattedOutput, warn
+from .subvolume_menu import SubvolumeMenu
+
+if TYPE_CHECKING:
+ _: Any
+
+
+@dataclass
+class DefaultFreeSector:
+ start: Size
+ end: Size
+
+
+class PartitioningList(ListManager):
+ """
+ subclass of ListManager for the managing of user accounts
+ """
+ def __init__(self, prompt: str, device: BDevice, device_partitions: List[PartitionModification]):
+ self._device = device
+ self._actions = {
+ 'create_new_partition': str(_('Create a new partition')),
+ 'suggest_partition_layout': str(_('Suggest partition layout')),
+ 'remove_added_partitions': str(_('Remove all newly added partitions')),
+ 'assign_mountpoint': str(_('Assign mountpoint')),
+ 'mark_formatting': str(_('Mark/Unmark to be formatted (wipes data)')),
+ 'mark_bootable': str(_('Mark/Unmark as bootable')),
+ 'set_filesystem': str(_('Change filesystem')),
+ 'btrfs_mark_compressed': str(_('Mark/Unmark as compressed')), # btrfs only
+ 'btrfs_mark_nodatacow': str(_('Mark/Unmark as nodatacow')), # btrfs only
+ 'btrfs_set_subvolumes': str(_('Set subvolumes')), # btrfs only
+ 'delete_partition': str(_('Delete partition'))
+ }
+
+ display_actions = list(self._actions.values())
+ super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:])
+
+ def selected_action_display(self, partition: PartitionModification) -> str:
+ return str(_('Partition'))
+
+ def filter_options(self, selection: PartitionModification, options: List[str]) -> List[str]:
+ not_filter = []
+
+ # only display formatting if the partition exists already
+ if not selection.exists():
+ not_filter += [self._actions['mark_formatting']]
+ else:
+ # only allow options if the existing partition
+ # was marked as formatting, otherwise we run into issues where
+ # 1. select a new fs -> potentially mark as wipe now
+ # 2. Switch back to old filesystem -> should unmark wipe now, but
+ # how do we know it was the original one?
+ not_filter += [
+ self._actions['set_filesystem'],
+ self._actions['mark_bootable'],
+ self._actions['btrfs_mark_compressed'],
+ self._actions['btrfs_mark_nodatacow'],
+ self._actions['btrfs_set_subvolumes']
+ ]
+
+ # non btrfs partitions shouldn't get btrfs options
+ if selection.fs_type != FilesystemType.Btrfs:
+ not_filter += [
+ self._actions['btrfs_mark_compressed'],
+ self._actions['btrfs_mark_nodatacow'],
+ self._actions['btrfs_set_subvolumes']
+ ]
+ else:
+ not_filter += [self._actions['assign_mountpoint']]
+
+ return [o for o in options if o not in not_filter]
+
+ def handle_action(
+ self,
+ action: str,
+ entry: Optional[PartitionModification],
+ data: List[PartitionModification]
+ ) -> List[PartitionModification]:
+ action_key = [k for k, v in self._actions.items() if v == action][0]
+
+ match action_key:
+ case 'create_new_partition':
+ new_partition = self._create_new_partition()
+ data += [new_partition]
+ case 'suggest_partition_layout':
+ new_partitions = self._suggest_partition_layout(data)
+ if len(new_partitions) > 0:
+ data = new_partitions
+ case 'remove_added_partitions':
+ choice = self._reset_confirmation()
+ if choice.value == Menu.yes():
+ data = [part for part in data if part.is_exists_or_modify()]
+ case 'assign_mountpoint' if entry:
+ entry.mountpoint = self._prompt_mountpoint()
+ if entry.mountpoint == Path('/boot'):
+ entry.set_flag(PartitionFlag.Boot)
+ if SysInfo.has_uefi():
+ entry.set_flag(PartitionFlag.ESP)
+ case 'mark_formatting' if entry:
+ self._prompt_formatting(entry)
+ case 'mark_bootable' if entry:
+ entry.invert_flag(PartitionFlag.Boot)
+ if SysInfo.has_uefi():
+ entry.invert_flag(PartitionFlag.ESP)
+ case 'set_filesystem' if entry:
+ fs_type = self._prompt_partition_fs_type()
+ if fs_type:
+ entry.fs_type = fs_type
+ # btrfs subvolumes will define mountpoints
+ if fs_type == FilesystemType.Btrfs:
+ entry.mountpoint = None
+ case 'btrfs_mark_compressed' if entry:
+ self._toggle_mount_option(entry, BtrfsMountOption.compress)
+ case 'btrfs_mark_nodatacow' if entry:
+ self._toggle_mount_option(entry, BtrfsMountOption.nodatacow)
+ case 'btrfs_set_subvolumes' if entry:
+ self._set_btrfs_subvolumes(entry)
+ case 'delete_partition' if entry:
+ data = self._delete_partition(entry, data)
+
+ return data
+
+ def _delete_partition(
+ self,
+ entry: PartitionModification,
+ data: List[PartitionModification]
+ ) -> List[PartitionModification]:
+ if entry.is_exists_or_modify():
+ entry.status = ModificationStatus.Delete
+ return data
+ else:
+ return [d for d in data if d != entry]
+
+ def _toggle_mount_option(
+ self,
+ partition: PartitionModification,
+ option: BtrfsMountOption
+ ):
+ if option.value not in partition.mount_options:
+ if option == BtrfsMountOption.compress:
+ partition.mount_options = [
+ o for o in partition.mount_options
+ if o != BtrfsMountOption.nodatacow.value
+ ]
+
+ partition.mount_options = [
+ o for o in partition.mount_options
+ if not o.startswith(BtrfsMountOption.compress.name)
+ ]
+
+ partition.mount_options.append(option.value)
+ else:
+ partition.mount_options = [
+ o for o in partition.mount_options if o != option.value
+ ]
+
+ def _set_btrfs_subvolumes(self, partition: PartitionModification):
+ partition.btrfs_subvols = SubvolumeMenu(
+ _("Manage btrfs subvolumes for current partition"),
+ partition.btrfs_subvols
+ ).run()
+
+ def _prompt_formatting(self, partition: PartitionModification):
+ # an existing partition can toggle between Exist or Modify
+ if partition.is_modify():
+ partition.status = ModificationStatus.Exist
+ return
+ elif partition.exists():
+ partition.status = ModificationStatus.Modify
+
+ # If we mark a partition for formatting, but the format is CRYPTO LUKS, there's no point in formatting it really
+ # without asking the user which inner-filesystem they want to use. Since the flag 'encrypted' = True is already set,
+ # it's safe to change the filesystem for this partition.
+ if partition.fs_type == FilesystemType.Crypto_luks:
+ prompt = str(_('This partition is currently encrypted, to format it a filesystem has to be specified'))
+ fs_type = self._prompt_partition_fs_type(prompt)
+ partition.fs_type = fs_type
+
+ if fs_type == FilesystemType.Btrfs:
+ partition.mountpoint = None
+
+ def _prompt_mountpoint(self) -> Path:
+ header = str(_('Partition mount-points are relative to inside the installation, the boot would be /boot as an example.')) + '\n'
+ header += str(_('If mountpoint /boot is set, then the partition will also be marked as bootable.')) + '\n'
+ prompt = str(_('Mountpoint: '))
+
+ print(header)
+
+ while True:
+ value = TextInput(prompt).run().strip()
+
+ if value:
+ mountpoint = Path(value)
+ break
+
+ return mountpoint
+
+ def _prompt_partition_fs_type(self, prompt: str = '') -> FilesystemType:
+ options = {fs.value: fs for fs in FilesystemType if fs != FilesystemType.Crypto_luks}
+
+ prompt = prompt + '\n' + str(_('Enter a desired filesystem type for the partition'))
+ choice = Menu(prompt, options, sort=False, skip=False).run()
+ return options[choice.single_value]
+
+ def _validate_value(
+ self,
+ sector_size: SectorSize,
+ total_size: Size,
+ text: str,
+ start: Optional[Size]
+ ) -> Optional[Size]:
+ match = re.match(r'([0-9]+)([a-zA-Z|%]*)', text, re.I)
+
+ if match:
+ str_value, unit = match.groups()
+
+ if unit == '%' and start:
+ available = total_size - start
+ value = int(available.value * (int(str_value) / 100))
+ unit = available.unit.name
+ else:
+ value = int(str_value)
+
+ if unit and unit not in Unit.get_all_units():
+ return None
+
+ unit = Unit[unit] if unit else Unit.sectors
+ return Size(value, unit, sector_size)
+
+ return None
+
+ def _enter_size(
+ self,
+ sector_size: SectorSize,
+ total_size: Size,
+ prompt: str,
+ default: Size,
+ start: Optional[Size],
+ ) -> Size:
+ while True:
+ value = TextInput(prompt).run().strip()
+ size: Optional[Size] = None
+ if not value:
+ size = default
+ else:
+ size = self._validate_value(sector_size, total_size, value, start)
+
+ if size:
+ return size
+
+ warn(f'Invalid value: {value}')
+
+ def _prompt_size(self) -> Tuple[Size, Size]:
+ device_info = self._device.device_info
+
+ text = str(_('Current free sectors on device {}:')).format(device_info.path) + '\n\n'
+ free_space_table = FormattedOutput.as_table(device_info.free_space_regions)
+ prompt = text + free_space_table + '\n'
+
+ total_sectors = device_info.total_size.format_size(Unit.sectors, device_info.sector_size)
+ total_bytes = device_info.total_size.format_size(Unit.B)
+
+ prompt += str(_('Total: {} / {}')).format(total_sectors, total_bytes) + '\n\n'
+ prompt += str(_('All entered values can be suffixed with a unit: %, B, KB, KiB, MB, MiB...')) + '\n'
+ prompt += str(_('If no unit is provided, the value is interpreted as sectors')) + '\n'
+ print(prompt)
+
+ default_free_sector = self._find_default_free_space()
+
+ if not default_free_sector:
+ default_free_sector = DefaultFreeSector(
+ Size(0, Unit.sectors, self._device.device_info.sector_size),
+ Size(0, Unit.sectors, self._device.device_info.sector_size)
+ )
+
+ # prompt until a valid start sector was entered
+ start_prompt = str(_('Enter start (default: sector {}): ')).format(default_free_sector.start.value)
+
+ start_size = self._enter_size(
+ device_info.sector_size,
+ device_info.total_size,
+ start_prompt,
+ default_free_sector.start,
+ None
+ )
+
+ if start_size.value == default_free_sector.start.value and default_free_sector.end.value != 0:
+ end_size = default_free_sector.end
+ else:
+ end_size = device_info.total_size
+
+ # prompt until valid end sector was entered
+ end_prompt = str(_('Enter end (default: {}): ')).format(end_size.as_text())
+ end_size = self._enter_size(
+ device_info.sector_size,
+ device_info.total_size,
+ end_prompt,
+ end_size,
+ start_size
+ )
+
+ return start_size, end_size
+
+ def _find_default_free_space(self) -> Optional[DefaultFreeSector]:
+ device_info = self._device.device_info
+
+ largest_free_area: Optional[DeviceGeometry] = None
+ largest_deleted_area: Optional[PartitionModification] = None
+
+ if len(device_info.free_space_regions) > 0:
+ largest_free_area = max(device_info.free_space_regions, key=lambda r: r.get_length())
+
+ deleted_partitions = list(filter(lambda x: x.status == ModificationStatus.Delete, self._data))
+ if len(deleted_partitions) > 0:
+ largest_deleted_area = max(deleted_partitions, key=lambda p: p.length)
+
+ def _free_space(space: DeviceGeometry) -> DefaultFreeSector:
+ start = Size(space.start, Unit.sectors, device_info.sector_size)
+ end = Size(space.end, Unit.sectors, device_info.sector_size)
+ return DefaultFreeSector(start, end)
+
+ def _free_deleted(space: PartitionModification) -> DefaultFreeSector:
+ start = space.start.convert(Unit.sectors, self._device.device_info.sector_size)
+ end = space.end.convert(Unit.sectors, self._device.device_info.sector_size)
+ return DefaultFreeSector(start, end)
+
+ if not largest_deleted_area and largest_free_area:
+ return _free_space(largest_free_area)
+ elif not largest_free_area and largest_deleted_area:
+ return _free_deleted(largest_deleted_area)
+ elif not largest_deleted_area and not largest_free_area:
+ return None
+ elif largest_free_area and largest_deleted_area:
+ free_space = _free_space(largest_free_area)
+ if free_space.start > largest_deleted_area.start:
+ return free_space
+ else:
+ return _free_deleted(largest_deleted_area)
+
+ return None
+
+ def _create_new_partition(self) -> PartitionModification:
+ fs_type = self._prompt_partition_fs_type()
+
+ start_size, end_size = self._prompt_size()
+ length = end_size - start_size
+
+ # new line for the next prompt
+ print()
+
+ mountpoint = None
+ if fs_type != FilesystemType.Btrfs:
+ mountpoint = self._prompt_mountpoint()
+
+ partition = PartitionModification(
+ status=ModificationStatus.Create,
+ type=PartitionType.Primary,
+ start=start_size,
+ length=length,
+ fs_type=fs_type,
+ mountpoint=mountpoint
+ )
+
+ if partition.mountpoint == Path('/boot'):
+ partition.set_flag(PartitionFlag.Boot)
+ if SysInfo.has_uefi():
+ partition.set_flag(PartitionFlag.ESP)
+
+ return partition
+
+ def _reset_confirmation(self) -> MenuSelection:
+ prompt = str(_('This will remove all newly added partitions, continue?'))
+ choice = Menu(prompt, Menu.yes_no(), default_option=Menu.no(), skip=False).run()
+ return choice
+
+ def _suggest_partition_layout(self, data: List[PartitionModification]) -> List[PartitionModification]:
+ # if modifications have been done already, inform the user
+ # that this operation will erase those modifications
+ if any([not entry.exists() for entry in data]):
+ choice = self._reset_confirmation()
+ if choice.value == Menu.no():
+ return []
+
+ from ..interactions.disk_conf import suggest_single_disk_layout
+
+ device_modification = suggest_single_disk_layout(self._device)
+ return device_modification.partitions
+
+
+def manual_partitioning(
+ device: BDevice,
+ prompt: str = '',
+ preset: List[PartitionModification] = []
+) -> List[PartitionModification]:
+ if not prompt:
+ prompt = str(_('Partition management: {}')).format(device.device_info.path) + '\n'
+ prompt += str(_('Total length: {}')).format(device.device_info.total_size.format_size(Unit.MiB))
+
+ manual_preset = []
+
+ if not preset:
+ # we'll display the existing partitions of the device
+ for partition in device.partition_infos:
+ manual_preset.append(
+ PartitionModification.from_existing_partition(partition)
+ )
+ else:
+ manual_preset = preset
+
+ menu_list = PartitioningList(prompt, device, manual_preset)
+ partitions: List[PartitionModification] = menu_list.run()
+
+ if menu_list.is_last_choice_cancel():
+ return preset
+
+ return partitions
diff --git a/archinstall/lib/disk/subvolume_menu.py b/archinstall/lib/disk/subvolume_menu.py
new file mode 100644
index 00000000..ea77149d
--- /dev/null
+++ b/archinstall/lib/disk/subvolume_menu.py
@@ -0,0 +1,61 @@
+from pathlib import Path
+from typing import List, Optional, Any, TYPE_CHECKING
+
+from .device_model import SubvolumeModification
+from ..menu import TextInput, ListManager
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class SubvolumeMenu(ListManager):
+ def __init__(self, prompt: str, btrfs_subvols: List[SubvolumeModification]):
+ self._actions = [
+ str(_('Add subvolume')),
+ str(_('Edit subvolume')),
+ str(_('Delete subvolume'))
+ ]
+ super().__init__(prompt, btrfs_subvols, [self._actions[0]], self._actions[1:])
+
+ def selected_action_display(self, subvolume: SubvolumeModification) -> str:
+ return str(subvolume.name)
+
+ def _add_subvolume(self, editing: Optional[SubvolumeModification] = None) -> Optional[SubvolumeModification]:
+ name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run()
+
+ if not name:
+ return None
+
+ mountpoint = TextInput(f'{_("Subvolume mountpoint")}: ', str(editing.mountpoint) if editing else '').run()
+
+ if not mountpoint:
+ return None
+
+ return SubvolumeModification(Path(name), Path(mountpoint))
+
+ def handle_action(
+ self,
+ action: str,
+ entry: Optional[SubvolumeModification],
+ data: List[SubvolumeModification]
+ ) -> List[SubvolumeModification]:
+ if action == self._actions[0]: # add
+ new_subvolume = self._add_subvolume()
+
+ if new_subvolume is not None:
+ # in case a user with the same username as an existing user
+ # was created we'll replace the existing one
+ data = [d for d in data if d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif entry is not None:
+ if action == self._actions[1]: # edit subvolume
+ new_subvolume = self._add_subvolume(entry)
+
+ if new_subvolume is not None:
+ # we'll remove the original subvolume and add the modified version
+ data = [d for d in data if d.name != entry.name and d.name != new_subvolume.name]
+ data += [new_subvolume]
+ elif action == self._actions[2]: # delete
+ data = [d for d in data if d != entry]
+
+ return data
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
deleted file mode 100644
index 5809c073..00000000
--- a/archinstall/lib/disk/user_guides.py
+++ /dev/null
@@ -1,240 +0,0 @@
-from __future__ import annotations
-import logging
-from typing import Optional, Dict, Any, List, TYPE_CHECKING
-
-# https://stackoverflow.com/a/39757388/929999
-from ..models.subvolume import Subvolume
-
-if TYPE_CHECKING:
- from .blockdevice import BlockDevice
- _: Any
-
-from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
-from ..hardware import has_uefi
-from ..output import log
-from ..menu import Menu
-
-
-def suggest_single_disk_layout(block_device :BlockDevice,
- default_filesystem :Optional[str] = None,
- advanced_options :bool = False) -> Dict[str, Any]:
-
- if not default_filesystem:
- from ..user_interaction import ask_for_main_filesystem_format
- default_filesystem = ask_for_main_filesystem_format(advanced_options)
-
- MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB
- using_subvolumes = False
- using_home_partition = False
- compression = False
-
- if default_filesystem == 'btrfs':
- prompt = str(_('Would you like to use BTRFS subvolumes with a default structure?'))
- choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
- using_subvolumes = choice.value == Menu.yes()
-
- prompt = str(_('Would you like to use BTRFS compression?'))
- choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
- compression = choice.value == Menu.yes()
-
- layout = {
- block_device.path : {
- "wipe" : True,
- "partitions" : []
- }
- }
-
- # Used for reference: https://wiki.archlinux.org/title/partitioning
-
- # 2 MiB is unallocated for GRUB on BIOS. Potentially unneeded for
- # other bootloaders?
-
- # TODO: On BIOS, /boot partition is only needed if the drive will
- # be encrypted, otherwise it is not recommended. We should probably
- # add a check for whether the drive will be encrypted or not.
- layout[block_device.path]['partitions'].append({
- # Boot
- "type" : "primary",
- "start" : "3MiB",
- "size" : "203MiB",
- "boot" : True,
- "encrypted" : False,
- "wipe" : True,
- "mountpoint" : "/boot",
- "filesystem" : {
- "format" : "fat32"
- }
- })
-
- # Increase the UEFI partition if UEFI is detected.
- # Also re-align the start to 1MiB since we don't need the first sectors
- # like we do in MBR layouts where the boot loader is installed traditionally.
- if has_uefi():
- layout[block_device.path]['partitions'][-1]['start'] = '1MiB'
- layout[block_device.path]['partitions'][-1]['size'] = '512MiB'
-
- layout[block_device.path]['partitions'].append({
- # Root
- "type" : "primary",
- "start" : "206MiB",
- "encrypted" : False,
- "wipe" : True,
- "mountpoint" : "/" if not using_subvolumes else None,
- "filesystem" : {
- "format" : default_filesystem,
- "mount_options" : ["compress=zstd"] if compression else []
- }
- })
-
- if has_uefi():
- layout[block_device.path]['partitions'][-1]['start'] = '513MiB'
-
- if not using_subvolumes and block_device.size >= MIN_SIZE_TO_ALLOW_HOME_PART:
- prompt = str(_('Would you like to create a separate partition for /home?'))
- choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
- using_home_partition = choice.value == Menu.yes()
-
- # Set a size for / (/root)
- if using_subvolumes or block_device.size < MIN_SIZE_TO_ALLOW_HOME_PART or not using_home_partition:
- # We'll use subvolumes
- # Or the disk size is too small to allow for a separate /home
- # Or the user doesn't want to create a separate partition for /home
- layout[block_device.path]['partitions'][-1]['size'] = '100%'
- else:
- layout[block_device.path]['partitions'][-1]['size'] = f"{min(block_device.size, 20)}GiB"
-
- if default_filesystem == 'btrfs' and using_subvolumes:
- # if input('Do you want to use a recommended structure? (Y/n): ').strip().lower() in ('', 'y', 'yes'):
- # https://btrfs.wiki.kernel.org/index.php/FAQ
- # https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash
- # https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh
- layout[block_device.path]['partitions'][1]['btrfs'] = {
- 'subvolumes': [
- Subvolume('@', '/'),
- Subvolume('@home', '/home'),
- Subvolume('@log', '/var/log'),
- Subvolume('@pkg', '/var/cache/pacman/pkg'),
- Subvolume('@.snapshots', '/.snapshots')
- ]
- }
- elif using_home_partition:
- # If we don't want to use subvolumes,
- # But we want to be able to re-use data between re-installs..
- # A second partition for /home would be nice if we have the space for it
- layout[block_device.path]['partitions'].append({
- # Home
- "type" : "primary",
- "start" : f"{min(block_device.size, 20)}GiB",
- "size" : "100%",
- "encrypted" : False,
- "wipe" : True,
- "mountpoint" : "/home",
- "filesystem" : {
- "format" : default_filesystem,
- "mount_options" : ["compress=zstd"] if compression else []
- }
- })
-
- return layout
-
-
-def suggest_multi_disk_layout(block_devices :List[BlockDevice], default_filesystem :Optional[str] = None, advanced_options :bool = False):
-
- if not default_filesystem:
- from ..user_interaction import ask_for_main_filesystem_format
- default_filesystem = ask_for_main_filesystem_format(advanced_options)
-
- # Not really a rock solid foundation of information to stand on, but it's a start:
- # https://www.reddit.com/r/btrfs/comments/m287gp/partition_strategy_for_two_physical_disks/
- # https://www.reddit.com/r/btrfs/comments/9us4hr/what_is_your_btrfs_partitionsubvolumes_scheme/
-
- MIN_SIZE_TO_ALLOW_HOME_PART = 40 # GiB
- ARCH_LINUX_INSTALLED_SIZE = 20 # GiB, rough estimate taking in to account user desktops etc. TODO: Catch user packages to detect size?
-
- block_devices = sort_block_devices_based_on_performance(block_devices).keys()
-
- home_device = select_largest_device(block_devices, gigabytes=MIN_SIZE_TO_ALLOW_HOME_PART)
- root_device = select_disk_larger_than_or_close_to(block_devices, gigabytes=ARCH_LINUX_INSTALLED_SIZE, filter_out=[home_device])
-
- if home_device is None or root_device is None:
- text = _('The selected drives do not have the minimum capacity required for an automatic suggestion\n')
- text += _('Minimum capacity for /home partition: {}GB\n').format(MIN_SIZE_TO_ALLOW_HOME_PART)
- text += _('Minimum capacity for Arch Linux partition: {}GB').format(ARCH_LINUX_INSTALLED_SIZE)
- Menu(str(text), [str(_('Continue'))], skip=False).run()
- return None
-
- compression = False
-
- if default_filesystem == 'btrfs':
- # prompt = 'Would you like to use BTRFS subvolumes with a default structure?'
- # choice = Menu(prompt, ['yes', 'no'], skip=False, default_option='yes').run()
- # using_subvolumes = choice == 'yes'
-
- prompt = str(_('Would you like to use BTRFS compression?'))
- choice = Menu(prompt, Menu.yes_no(), skip=False, default_option=Menu.yes()).run()
- compression = choice.value == Menu.yes()
-
- log(f"Suggesting multi-disk-layout using {len(block_devices)} disks, where {root_device} will be /root and {home_device} will be /home", level=logging.DEBUG)
-
- layout = {
- root_device.path : {
- "wipe" : True,
- "partitions" : []
- },
- home_device.path : {
- "wipe" : True,
- "partitions" : []
- },
- }
-
- # TODO: Same deal as with the single disk layout, we should
- # probably check if the drive will be encrypted.
- layout[root_device.path]['partitions'].append({
- # Boot
- "type" : "primary",
- "start" : "3MiB",
- "size" : "203MiB",
- "boot" : True,
- "encrypted" : False,
- "wipe" : True,
- "mountpoint" : "/boot",
- "filesystem" : {
- "format" : "fat32"
- }
- })
-
- if has_uefi():
- layout[root_device.path]['partitions'][-1]['start'] = '1MiB'
- layout[root_device.path]['partitions'][-1]['size'] = '512MiB'
-
- layout[root_device.path]['partitions'].append({
- # Root
- "type" : "primary",
- "start" : "206MiB",
- "size" : "100%",
- "encrypted" : False,
- "wipe" : True,
- "mountpoint" : "/",
- "filesystem" : {
- "format" : default_filesystem,
- "mount_options" : ["compress=zstd"] if compression else []
- }
- })
- if has_uefi():
- layout[root_device.path]['partitions'][-1]['start'] = '513MiB'
-
- layout[home_device.path]['partitions'].append({
- # Home
- "type" : "primary",
- "start" : "1MiB",
- "size" : "100%",
- "encrypted" : False,
- "wipe" : True,
- "mountpoint" : "/home",
- "filesystem" : {
- "format" : default_filesystem,
- "mount_options" : ["compress=zstd"] if compression else []
- }
- })
-
- return layout
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
deleted file mode 100644
index 076a8ba2..00000000
--- a/archinstall/lib/disk/validators.py
+++ /dev/null
@@ -1,48 +0,0 @@
-from typing import List
-
-def valid_parted_position(pos :str) -> bool:
- if not len(pos):
- return False
-
- if pos.isdigit():
- return True
-
- pos_lower = pos.lower()
-
- if (pos_lower.endswith('b') or pos_lower.endswith('s')) and pos[:-1].isdigit():
- return True
-
- if any(pos_lower.endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit()
- for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']):
- return True
-
- return False
-
-
-def fs_types() -> List[str]:
- # https://www.gnu.org/software/parted/manual/html_node/mkpart.html
- # Above link doesn't agree with `man parted` /mkpart documentation:
- """
- fs-type can
- be one of "btrfs", "ext2",
- "ext3", "ext4", "fat16",
- "fat32", "hfs", "hfs+",
- "linux-swap", "ntfs", "reis‐
- erfs", "udf", or "xfs".
- """
- return [
- "btrfs",
- "ext2",
- "ext3", "ext4", # `man parted` allows these
- "fat16", "fat32",
- "hfs", "hfs+", # "hfsx", not included in `man parted`
- "linux-swap",
- "ntfs",
- "reiserfs",
- "udf", # "ufs", not included in `man parted`
- "xfs", # `man parted` allows this
- ]
-
-
-def valid_fs_type(fstype :str) -> bool:
- return fstype.lower() in fs_types()