Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
authorDaniel Girtler <girtler.daniel@gmail.com>2024-04-15 18:49:00 +1000
committerGitHub <noreply@github.com>2024-04-15 18:49:00 +1000
commitb470b16ec923260cfd9c5b9f2b88e0a39611b463 (patch)
tree25a32fd904f739e181a62a62451637bcf7cd6588 /archinstall/lib/disk
parent7d9e9d8ba0bcba888ec46554f87dfc414c73f9c4 (diff)
LVM support (#2104)
* Submenu for disk configuration * Update * Add LVM manual config * PV selection * LVM volume menu * Update * Fix mypy * Update * Update * Update * Update * Update * Update * Update * Update * Update LVM * Update * Update * Btrfs support * Refactor * LVM on Luks * Luks on LVM * Update * LVM on Luks * Update * Update * mypy * Update * Fix bug with LuksOnLvm and Btrfs * Update * Update * Info -> Debug output
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/__init__.py9
-rw-r--r--archinstall/lib/disk/device_handler.py264
-rw-r--r--archinstall/lib/disk/device_model.py408
-rw-r--r--archinstall/lib/disk/disk_menu.py140
-rw-r--r--archinstall/lib/disk/encryption_menu.py131
-rw-r--r--archinstall/lib/disk/fido.py8
-rw-r--r--archinstall/lib/disk/filesystem.py295
-rw-r--r--archinstall/lib/disk/partitioning_menu.py18
-rw-r--r--archinstall/lib/disk/subvolume_menu.py18
9 files changed, 1132 insertions, 159 deletions
diff --git a/archinstall/lib/disk/__init__.py b/archinstall/lib/disk/__init__.py
index 24dafef5..7f881273 100644
--- a/archinstall/lib/disk/__init__.py
+++ b/archinstall/lib/disk/__init__.py
@@ -11,6 +11,11 @@ from .device_model import (
BDevice,
DiskLayoutType,
DiskLayoutConfiguration,
+ LvmLayoutType,
+ LvmConfiguration,
+ LvmVolumeGroup,
+ LvmVolume,
+ LvmVolumeStatus,
PartitionTable,
Unit,
Size,
@@ -30,7 +35,7 @@ from .device_model import (
CleanType,
get_lsblk_info,
get_all_lsblk_info,
- get_lsblk_by_mountpoint
+ get_lsblk_by_mountpoint,
)
from .encryption_menu import (
select_encryption_type,
@@ -39,3 +44,5 @@ from .encryption_menu import (
select_partitions_to_encrypt,
DiskEncryptionMenu,
)
+
+from .disk_menu import DiskLayoutConfigurationMenu
diff --git a/archinstall/lib/disk/device_handler.py b/archinstall/lib/disk/device_handler.py
index 6e91ac2e..7ba70382 100644
--- a/archinstall/lib/disk/device_handler.py
+++ b/archinstall/lib/disk/device_handler.py
@@ -3,8 +3,9 @@ from __future__ import annotations
import json
import os
import logging
+import time
from pathlib import Path
-from typing import List, Dict, Any, Optional, TYPE_CHECKING
+from typing import List, Dict, Any, Optional, TYPE_CHECKING, Literal, Iterable
from parted import ( # type: ignore
Disk, Geometry, FileSystem,
@@ -17,11 +18,12 @@ from .device_model import (
BDevice, _DeviceInfo, _PartitionInfo,
FilesystemType, Unit, PartitionTable,
ModificationStatus, get_lsblk_info, LsblkInfo,
- _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption
+ _BtrfsSubvolumeInfo, get_all_lsblk_info, DiskEncryption, LvmVolumeGroup, LvmVolume, Size, LvmGroupInfo,
+ SectorSize, LvmVolumeInfo, LvmPVInfo, SubvolumeModification, BtrfsMountOption
)
from ..exceptions import DiskError, UnknownFilesystemFormat
-from ..general import SysCommand, SysCallError, JSON
+from ..general import SysCommand, SysCallError, JSON, SysCommandWorker
from ..luks import Luks2
from ..output import debug, error, info, warn, log
from ..utils.util import is_subpath
@@ -189,7 +191,7 @@ class DeviceHandler(object):
return subvol_infos
- def _perform_formatting(
+ def format(
self,
fs_type: FilesystemType,
path: Path,
@@ -234,7 +236,7 @@ class DeviceHandler(object):
options += additional_parted_options
options_str = ' '.join(options)
- info(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
+ debug(f'Formatting filesystem: /usr/bin/{command} {options_str} {path}')
try:
SysCommand(f"/usr/bin/{command} {options_str} {path}")
@@ -243,7 +245,33 @@ class DeviceHandler(object):
error(msg)
raise DiskError(msg) from err
- def _perform_enc_formatting(
+ def encrypt(
+ self,
+ dev_path: Path,
+ mapper_name: Optional[str],
+ enc_password: str,
+ lock_after_create: bool = True
+ ) -> Luks2:
+ luks_handler = Luks2(
+ dev_path,
+ mapper_name=mapper_name,
+ password=enc_password
+ )
+
+ key_file = luks_handler.encrypt()
+
+ luks_handler.unlock(key_file=key_file)
+
+ if not luks_handler.mapper_dev:
+ raise DiskError('Failed to unlock luks device')
+
+ if lock_after_create:
+ debug(f'luks2 locking device: {dev_path}')
+ luks_handler.lock()
+
+ return luks_handler
+
+ def format_encrypted(
self,
dev_path: Path,
mapper_name: Optional[str],
@@ -258,71 +286,160 @@ class DeviceHandler(object):
key_file = luks_handler.encrypt()
- debug(f'Unlocking luks2 device: {dev_path}')
luks_handler.unlock(key_file=key_file)
if not luks_handler.mapper_dev:
raise DiskError('Failed to unlock luks device')
info(f'luks2 formatting mapper dev: {luks_handler.mapper_dev}')
- self._perform_formatting(fs_type, luks_handler.mapper_dev)
+ self.format(fs_type, luks_handler.mapper_dev)
info(f'luks2 locking device: {dev_path}')
luks_handler.lock()
- def _validate_partitions(self, partitions: List[PartitionModification]):
- checks = {
- # verify that all partitions have a path set (which implies that they have been created)
- lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
- # crypto luks is not a valid file system type
- lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError('Crypto luks cannot be set as a filesystem type'),
- # file system type must be set
- lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
- }
-
- for check, exc in checks.items():
- found = next(filter(check, partitions), None)
- if found is not None:
- raise exc
-
- def format(
+ def _lvm_info(
self,
- device_mod: DeviceModification,
- enc_conf: Optional['DiskEncryption'] = None
- ):
- """
- Format can be given an overriding path, for instance /dev/null to test
- the formatting functionality and in essence the support for the given filesystem.
- """
+ cmd: str,
+ info_type: Literal['lv', 'vg', 'pvseg']
+ ) -> Optional[Any]:
+ raw_info = SysCommand(cmd).decode().split('\n')
- # only verify partitions that are being created or modified
- create_or_modify_parts = [p for p in device_mod.partitions if p.is_create_or_modify()]
+ # for whatever reason the output sometimes contains
+ # "File descriptor X leaked leaked on vgs invocation
+ data = '\n'.join([raw for raw in raw_info if 'File descriptor' not in raw])
- self._validate_partitions(create_or_modify_parts)
+ debug(f'LVM info: {data}')
- # make sure all devices are unmounted
- self._umount_all_existing(device_mod.device_path)
-
- for part_mod in create_or_modify_parts:
- # partition will be encrypted
- if enc_conf is not None and part_mod in enc_conf.partitions:
- self._perform_enc_formatting(
- part_mod.safe_dev_path,
- part_mod.mapper_name,
- part_mod.safe_fs_type,
- enc_conf
- )
- else:
- self._perform_formatting(part_mod.safe_fs_type, part_mod.safe_dev_path)
+ reports = json.loads(data)
+
+ for report in reports['report']:
+ if len(report[info_type]) != 1:
+ raise ValueError(f'Report does not contain any entry')
- # synchronize with udev before using lsblk
- SysCommand('udevadm settle')
+ entry = report[info_type][0]
- lsblk_info = self._fetch_part_info(part_mod.safe_dev_path)
+ match info_type:
+ case 'pvseg':
+ return LvmPVInfo(
+ pv_name=Path(entry['pv_name']),
+ lv_name=entry['lv_name'],
+ vg_name=entry['vg_name'],
+ )
+ case 'lv':
+ return LvmVolumeInfo(
+ lv_name=entry['lv_name'],
+ vg_name=entry['vg_name'],
+ lv_size=Size(int(entry[f'lv_size'][:-1]), Unit.B, SectorSize.default())
+ )
+ case 'vg':
+ return LvmGroupInfo(
+ vg_uuid=entry['vg_uuid'],
+ vg_size=Size(int(entry[f'vg_size'][:-1]), Unit.B, SectorSize.default())
+ )
+
+ return None
- part_mod.partn = lsblk_info.partn
- part_mod.partuuid = lsblk_info.partuuid
- part_mod.uuid = lsblk_info.uuid
+ def _lvm_info_with_retry(self, cmd: str, info_type: Literal['lv', 'vg', 'pvseg']) -> Optional[Any]:
+ attempts = 3
+
+ for attempt_nr in range(attempts):
+ try:
+ return self._lvm_info(cmd, info_type)
+ except ValueError:
+ time.sleep(attempt_nr + 1)
+
+ raise ValueError(f'Failed to fetch {info_type} information')
+
+ def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]:
+ cmd = (
+ 'lvs --reportformat json '
+ '--unit B '
+ f'-S lv_name={lv_name}'
+ )
+
+ return self._lvm_info_with_retry(cmd, 'lv')
+
+ def lvm_group_info(self, vg_name: str) -> Optional[LvmGroupInfo]:
+ cmd = (
+ 'vgs --reportformat json '
+ '--unit B '
+ '-o vg_name,vg_uuid,vg_size '
+ f'-S vg_name={vg_name}'
+ )
+
+ return self._lvm_info_with_retry(cmd, 'vg')
+
+ def lvm_pvseg_info(self, vg_name: str, lv_name: str) -> Optional[LvmPVInfo]:
+ cmd = (
+ 'pvs '
+ '--segments -o+lv_name,vg_name '
+ f'-S vg_name={vg_name},lv_name={lv_name} '
+ '--reportformat json '
+ )
+
+ return self._lvm_info_with_retry(cmd, 'pvseg')
+
+ def lvm_vol_change(self, vol: LvmVolume, activate: bool):
+ active_flag = 'y' if activate else 'n'
+ cmd = f'lvchange -a {active_flag} {vol.safe_dev_path}'
+
+ debug(f'lvchange volume: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_export_vg(self, vg: LvmVolumeGroup):
+ cmd = f'vgexport {vg.name}'
+
+ debug(f'vgexport: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_import_vg(self, vg: LvmVolumeGroup):
+ cmd = f'vgimport {vg.name}'
+
+ debug(f'vgimport: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_vol_reduce(self, vol_path: Path, amount: Size):
+ val = amount.format_size(Unit.B, include_unit=False)
+ cmd = f'lvreduce -L -{val}B {vol_path}'
+
+ debug(f'Reducing LVM volume size: {cmd}')
+ SysCommand(cmd)
+
+ def lvm_pv_create(self, pvs: Iterable[Path]):
+ cmd = 'pvcreate ' + ' '.join([str(pv) for pv in pvs])
+ debug(f'Creating LVM PVS: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ def lvm_vg_create(self, pvs: Iterable[Path], vg_name: str):
+ pvs_str = ' '.join([str(pv) for pv in pvs])
+ cmd = f'vgcreate --yes {vg_name} {pvs_str}'
+
+ debug(f'Creating LVM group: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size] = None):
+ if offset is not None:
+ length = volume.length - offset
+ else:
+ length = volume.length
+
+ length_str = length.format_size(Unit.B, include_unit=False)
+ cmd = f'lvcreate --yes -L {length_str}B {vg_name} -n {volume.name}'
+
+ debug(f'Creating volume: {cmd}')
+
+ worker = SysCommandWorker(cmd)
+ worker.poll()
+ worker.write(b'y\n', line_ending=False)
+
+ volume.vg_name = vg_name
+ volume.dev_path = Path(f'/dev/{vg_name}/{volume.name}')
def _setup_partition(
self,
@@ -385,7 +502,7 @@ class DeviceHandler(object):
# the partition has a path now that it has been added
part_mod.dev_path = Path(partition.path)
- def _fetch_part_info(self, path: Path) -> LsblkInfo:
+ def fetch_part_info(self, path: Path) -> LsblkInfo:
lsblk_info = get_lsblk_info(path)
if not lsblk_info.partn:
@@ -404,6 +521,37 @@ class DeviceHandler(object):
return lsblk_info
+ def create_lvm_btrfs_subvolumes(
+ self,
+ path: Path,
+ btrfs_subvols: List[SubvolumeModification],
+ mount_options: List[str]
+ ):
+ info(f'Creating subvolumes: {path}')
+
+ self.mount(path, self._TMP_BTRFS_MOUNT, create_target_mountpoint=True)
+
+ for sub_vol in btrfs_subvols:
+ debug(f'Creating subvolume: {sub_vol.name}')
+
+ subvol_path = self._TMP_BTRFS_MOUNT / sub_vol.name
+
+ SysCommand(f"btrfs subvolume create {subvol_path}")
+
+ if BtrfsMountOption.nodatacow.value in mount_options:
+ try:
+ SysCommand(f'chattr +C {subvol_path}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set nodatacow attribute at {subvol_path}: {err}')
+
+ if BtrfsMountOption.compress.value in mount_options:
+ try:
+ SysCommand(f'chattr +c {subvol_path}')
+ except SysCallError as err:
+ raise DiskError(f'Could not set compress attribute at {subvol_path}: {err}')
+
+ self.umount(path)
+
def create_btrfs_volumes(
self,
part_mod: PartitionModification,
@@ -468,8 +616,8 @@ class DeviceHandler(object):
return luks_handler
- def _umount_all_existing(self, device_path: Path):
- info(f'Unmounting all existing partitions: {device_path}')
+ def umount_all_existing(self, device_path: Path):
+ debug(f'Unmounting all existing partitions: {device_path}')
existing_partitions = self._devices[device_path].partition_infos
@@ -498,7 +646,7 @@ class DeviceHandler(object):
raise DiskError('Too many partitions on disk, MBR disks can only have 3 primary partitions')
# make sure all devices are unmounted
- self._umount_all_existing(modification.device_path)
+ self.umount_all_existing(modification.device_path)
# WARNING: the entire device will be wiped and all data lost
if modification.wipe:
diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py
index fe96203c..1cd3d674 100644
--- a/archinstall/lib/disk/device_model.py
+++ b/archinstall/lib/disk/device_model.py
@@ -41,6 +41,8 @@ class DiskLayoutType(Enum):
class DiskLayoutConfiguration:
config_type: DiskLayoutType
device_modifications: List[DeviceModification] = field(default_factory=list)
+ lvm_config: Optional[LvmConfiguration] = None
+
# used for pre-mounted config
mountpoint: Optional[Path] = None
@@ -51,13 +53,18 @@ class DiskLayoutConfiguration:
'mountpoint': str(self.mountpoint)
}
else:
- return {
+ config: Dict[str, Any] = {
'config_type': self.config_type.value,
- 'device_modifications': [mod.json() for mod in self.device_modifications]
+ 'device_modifications': [mod.json() for mod in self.device_modifications],
}
+ if self.lvm_config:
+ config['lvm_config'] = self.lvm_config.json()
+
+ return config
+
@classmethod
- def parse_arg(cls, disk_config: Dict[str, List[Dict[str, Any]]]) -> Optional[DiskLayoutConfiguration]:
+ def parse_arg(cls, disk_config: Dict[str, Any]) -> Optional[DiskLayoutConfiguration]:
from .device_handler import device_handler
device_modifications: List[DeviceModification] = []
@@ -124,6 +131,10 @@ class DiskLayoutConfiguration:
device_modification.partitions = device_partitions
device_modifications.append(device_modification)
+ # Parse LVM configuration from settings
+ if (lvm_arg := disk_config.get('lvm_config', None)) is not None:
+ config.lvm_config = LvmConfiguration.parse_arg(lvm_arg, config)
+
return config
@@ -133,24 +144,24 @@ class PartitionTable(Enum):
class Unit(Enum):
- B = 1 # byte
- kB = 1000**1 # kilobyte
- MB = 1000**2 # megabyte
- GB = 1000**3 # gigabyte
- TB = 1000**4 # terabyte
- PB = 1000**5 # petabyte
- EB = 1000**6 # exabyte
- ZB = 1000**7 # zettabyte
- YB = 1000**8 # yottabyte
-
- KiB = 1024**1 # kibibyte
- MiB = 1024**2 # mebibyte
- GiB = 1024**3 # gibibyte
- TiB = 1024**4 # tebibyte
- PiB = 1024**5 # pebibyte
- EiB = 1024**6 # exbibyte
- ZiB = 1024**7 # zebibyte
- YiB = 1024**8 # yobibyte
+ B = 1 # byte
+ kB = 1000 ** 1 # kilobyte
+ MB = 1000 ** 2 # megabyte
+ GB = 1000 ** 3 # gigabyte
+ TB = 1000 ** 4 # terabyte
+ PB = 1000 ** 5 # petabyte
+ EB = 1000 ** 6 # exabyte
+ ZB = 1000 ** 7 # zettabyte
+ YB = 1000 ** 8 # yottabyte
+
+ KiB = 1024 ** 1 # kibibyte
+ MiB = 1024 ** 2 # mebibyte
+ GiB = 1024 ** 3 # gibibyte
+ TiB = 1024 ** 4 # tebibyte
+ PiB = 1024 ** 5 # pebibyte
+ EiB = 1024 ** 6 # exbibyte
+ ZiB = 1024 ** 7 # zebibyte
+ YiB = 1024 ** 8 # yobibyte
sectors = 'sectors' # size in sector
@@ -575,7 +586,7 @@ class PartitionFlag(Enum):
Which is the way libparted checks for its flags: https://git.savannah.gnu.org/gitweb/?p=parted.git;a=blob;f=libparted/labels/gpt.c;hb=4a0e468ed63fff85a1f9b923189f20945b32f4f1#l183
"""
Boot = _ped.PARTITION_BOOT
- XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
+ XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
ESP = _ped.PARTITION_ESP
@@ -658,6 +669,10 @@ class PartitionModification:
flags: List[PartitionFlag] = field(default_factory=list)
btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+ # only set when modification was created from an existing
+ # partition info object to be able to reference it back
+ part_info: Optional[_PartitionInfo] = None
+
# only set if the device was created or exists
dev_path: Optional[Path] = None
partn: Optional[int] = None
@@ -724,7 +739,8 @@ class PartitionModification:
uuid=partition_info.uuid,
flags=partition_info.flags,
mountpoint=mountpoint,
- btrfs_subvols=subvol_mods
+ btrfs_subvols=subvol_mods,
+ part_info=partition_info
)
@property
@@ -832,6 +848,270 @@ class PartitionModification:
return part_mod
+class LvmLayoutType(Enum):
+ Default = 'default'
+
+ # Manual = 'manual_lvm'
+
+ def display_msg(self) -> str:
+ match self:
+ case LvmLayoutType.Default:
+ return str(_('Default layout'))
+ # case LvmLayoutType.Manual:
+ # return str(_('Manual configuration'))
+
+ raise ValueError(f'Unknown type: {self}')
+
+
+@dataclass
+class LvmVolumeGroup:
+ name: str
+ pvs: List[PartitionModification]
+ volumes: List[LvmVolume] = field(default_factory=list)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'lvm_pvs': [p.obj_id for p in self.pvs],
+ 'volumes': [vol.json() for vol in self.volumes]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmVolumeGroup:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmVolumeGroup(
+ arg['name'],
+ lvm_pvs,
+ [LvmVolume.parse_arg(vol) for vol in arg['volumes']]
+ )
+
+ def contains_lv(self, lv: LvmVolume) -> bool:
+ return lv in self.volumes
+
+
+class LvmVolumeStatus(Enum):
+ Exist = 'existing'
+ Modify = 'modify'
+ Delete = 'delete'
+ Create = 'create'
+
+
+@dataclass
+class LvmVolume:
+ status: LvmVolumeStatus
+ name: str
+ fs_type: FilesystemType
+ length: Size
+ mountpoint: Optional[Path]
+ mount_options: List[str] = field(default_factory=list)
+ btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+
+ # volume group name
+ vg_name: Optional[str] = None
+ # mapper device path /dev/<vg>/<vol>
+ dev_path: Optional[Path] = None
+
+ def __post_init__(self):
+ # needed to use the object as a dictionary key due to hash func
+ if not hasattr(self, '_obj_id'):
+ self._obj_id = uuid.uuid4()
+
+ def __hash__(self):
+ return hash(self._obj_id)
+
+ @property
+ def obj_id(self) -> str:
+ if hasattr(self, '_obj_id'):
+ return str(self._obj_id)
+ return ''
+
+ @property
+ def mapper_name(self) -> Optional[str]:
+ if self.dev_path:
+ return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.safe_dev_path.name}'
+ return None
+
+ @property
+ def mapper_path(self) -> Path:
+ if self.mapper_name:
+ return Path(f'/dev/mapper/{self.mapper_name}')
+
+ raise ValueError('No mapper path set')
+
+ @property
+ def safe_dev_path(self) -> Path:
+ if self.dev_path:
+ return self.dev_path
+ raise ValueError('No device path for volume defined')
+
+ @property
+ def safe_fs_type(self) -> FilesystemType:
+ if self.fs_type is None:
+ raise ValueError('File system type is not set')
+ return self.fs_type
+
+ @property
+ def relative_mountpoint(self) -> Path:
+ """
+ Will return the relative path based on the anchor
+ e.g. Path('/mnt/test') -> Path('mnt/test')
+ """
+ if self.mountpoint is not None:
+ return self.mountpoint.relative_to(self.mountpoint.anchor)
+
+ raise ValueError('Mountpoint is not specified')
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any]) -> LvmVolume:
+ volume = LvmVolume(
+ status=LvmVolumeStatus(arg['status']),
+ name=arg['name'],
+ fs_type=FilesystemType(arg['fs_type']),
+ length=Size.parse_args(arg['length']),
+ mountpoint=Path(arg['mountpoint']) if arg['mountpoint'] else None,
+ mount_options=arg.get('mount_options', []),
+ btrfs_subvols=SubvolumeModification.parse_args(arg.get('btrfs', []))
+ )
+
+ setattr(volume, '_obj_id', arg['obj_id'])
+
+ return volume
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'obj_id': self.obj_id,
+ 'status': self.status.value,
+ 'name': self.name,
+ 'fs_type': self.fs_type.value,
+ 'length': self.length.json(),
+ 'mountpoint': str(self.mountpoint) if self.mountpoint else None,
+ 'mount_options': self.mount_options,
+ 'btrfs': [vol.json() for vol in self.btrfs_subvols]
+ }
+
+ def table_data(self) -> Dict[str, Any]:
+ part_mod = {
+ 'Type': self.status.value,
+ 'Name': self.name,
+ 'Size': self.length.format_highest(),
+ 'FS type': self.fs_type.value,
+ 'Mountpoint': str(self.mountpoint) if self.mountpoint else '',
+ 'Mount options': ', '.join(self.mount_options),
+ 'Btrfs': '{} {}'.format(str(len(self.btrfs_subvols)), 'vol')
+ }
+ return part_mod
+
+ def is_modify(self) -> bool:
+ return self.status == LvmVolumeStatus.Modify
+
+ def exists(self) -> bool:
+ return self.status == LvmVolumeStatus.Exist
+
+ def is_exists_or_modify(self) -> bool:
+ return self.status in [LvmVolumeStatus.Exist, LvmVolumeStatus.Modify]
+
+ def is_root(self) -> bool:
+ if self.mountpoint is not None:
+ return Path('/') == self.mountpoint
+ else:
+ for subvol in self.btrfs_subvols:
+ if subvol.is_root():
+ return True
+
+ return False
+
+
+@dataclass
+class LvmGroupInfo:
+ vg_size: Size
+ vg_uuid: str
+
+
+@dataclass
+class LvmVolumeInfo:
+ lv_name: str
+ vg_name: str
+ lv_size: Size
+
+
+@dataclass
+class LvmPVInfo:
+ pv_name: Path
+ lv_name: str
+ vg_name: str
+
+
+@dataclass
+class LvmConfiguration:
+ config_type: LvmLayoutType
+ vol_groups: List[LvmVolumeGroup]
+
+ def __post_init__(self):
+ # make sure all volume groups have unique PVs
+ pvs = []
+ for group in self.vol_groups:
+ for pv in group.pvs:
+ if pv in pvs:
+ raise ValueError('A PV cannot be used in multiple volume groups')
+ pvs.append(pv)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'config_type': self.config_type.value,
+ 'vol_groups': [vol_gr.json() for vol_gr in self.vol_groups]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmConfiguration:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmConfiguration(
+ config_type=LvmLayoutType(arg['config_type']),
+ vol_groups=[LvmVolumeGroup.parse_arg(vol_group, disk_config) for vol_group in arg['vol_groups']],
+ )
+
+ def get_all_pvs(self) -> List[PartitionModification]:
+ pvs = []
+ for vg in self.vol_groups:
+ pvs += vg.pvs
+
+ return pvs
+
+ def get_all_volumes(self) -> List[LvmVolume]:
+ volumes = []
+
+ for vg in self.vol_groups:
+ volumes += vg.volumes
+
+ return volumes
+
+ def get_root_volume(self) -> Optional[LvmVolume]:
+ for vg in self.vol_groups:
+ filtered = next(filter(lambda x: x.is_root(), vg.volumes), None)
+ if filtered:
+ return filtered
+
+ return None
+
+
+# def get_lv_crypt_uuid(self, lv: LvmVolume, encryption: EncryptionType) -> str:
+# """
+# Find the LUKS superblock UUID for the device that
+# contains the given logical volume
+# """
+# for vg in self.vol_groups:
+# if vg.contains_lv(lv):
+
+
@dataclass
class DeviceModification:
device: BDevice
@@ -885,11 +1165,16 @@ class DeviceModification:
class EncryptionType(Enum):
NoEncryption = "no_encryption"
Luks = "luks"
+ LvmOnLuks = 'lvm_on_luks'
+ LuksOnLvm = 'luks_on_lvm'
@classmethod
def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']:
return {
- 'Luks': EncryptionType.Luks
+ str(_('No Encryption')): EncryptionType.NoEncryption,
+ str(_('LUKS')): EncryptionType.Luks,
+ str(_('LVM on LUKS')): EncryptionType.LvmOnLuks,
+ str(_('LUKS on LVM')): EncryptionType.LuksOnLvm
}
@classmethod
@@ -906,18 +1191,31 @@ class EncryptionType(Enum):
@dataclass
class DiskEncryption:
- encryption_type: EncryptionType = EncryptionType.Luks
+ encryption_type: EncryptionType = EncryptionType.NoEncryption
encryption_password: str = ''
partitions: List[PartitionModification] = field(default_factory=list)
+ lvm_volumes: List[LvmVolume] = field(default_factory=list)
hsm_device: Optional[Fido2Device] = None
- def should_generate_encryption_file(self, part_mod: PartitionModification) -> bool:
- return part_mod in self.partitions and part_mod.mountpoint != Path('/')
+ def __post_init__(self):
+ if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions:
+ raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined')
+
+ if self.encryption_type == EncryptionType.LuksOnLvm and not self.lvm_volumes:
+ raise ValueError('LuksOnLvm encryption require LMV volumes to be defined')
+
+ def should_generate_encryption_file(self, dev: PartitionModification | LvmVolume) -> bool:
+ if isinstance(dev, PartitionModification):
+ return dev in self.partitions and dev.mountpoint != Path('/')
+ elif isinstance(dev, LvmVolume):
+ return dev in self.lvm_volumes and dev.mountpoint != Path('/')
+ return False
def json(self) -> Dict[str, Any]:
obj: Dict[str, Any] = {
'encryption_type': self.encryption_type.value,
- 'partitions': [p.obj_id for p in self.partitions]
+ 'partitions': [p.obj_id for p in self.partitions],
+ 'lvm_volumes': [vol.obj_id for vol in self.lvm_volumes]
}
if self.hsm_device:
@@ -926,22 +1224,46 @@ class DiskEncryption:
return obj
@classmethod
+ def validate_enc(cls, disk_config: DiskLayoutConfiguration) -> bool:
+ partitions = []
+
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ partitions.append(part)
+
+ if len(partitions) > 2: # assume one boot and at least 2 additional
+ if disk_config.lvm_config:
+ return False
+
+ return True
+
+ @classmethod
def parse_arg(
cls,
disk_config: DiskLayoutConfiguration,
arg: Dict[str, Any],
password: str = ''
- ) -> 'DiskEncryption':
+ ) -> Optional['DiskEncryption']:
+ if not cls.validate_enc(disk_config):
+ return None
+
enc_partitions = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
if part.obj_id in arg.get('partitions', []):
enc_partitions.append(part)
+ volumes = []
+ if disk_config.lvm_config:
+ for vol in disk_config.lvm_config.get_all_volumes():
+ if vol.obj_id in arg.get('lvm_volumes', []):
+ volumes.append(vol)
+
enc = DiskEncryption(
EncryptionType(arg['encryption_type']),
password,
- enc_partitions
+ enc_partitions,
+ volumes
)
if hsm := arg.get('hsm_device', None):
@@ -992,7 +1314,7 @@ class LsblkInfo:
tran: Optional[str] = None
partn: Optional[int] = None
partuuid: Optional[str] = None
- parttype :Optional[str] = None
+ parttype: Optional[str] = None
uuid: Optional[str] = None
fstype: Optional[str] = None
fsver: Optional[str] = None
@@ -1017,7 +1339,7 @@ class LsblkInfo:
'tran': self.tran,
'partn': self.partn,
'partuuid': self.partuuid,
- 'parttype' : self.parttype,
+ 'parttype': self.parttype,
'uuid': self.uuid,
'fstype': self.fstype,
'fsver': self.fsver,
@@ -1102,13 +1424,24 @@ def _clean_field(name: str, clean_type: CleanType) -> str:
return name.replace('_percentage', '%').replace('_', '-')
-def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[LsblkInfo]:
+def _fetch_lsblk_info(
+ dev_path: Optional[Union[Path, str]] = None,
+ reverse: bool = False,
+ full_dev_path: bool = False,
+ retry: int = 3
+) -> List[LsblkInfo]:
fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()]
cmd = ['lsblk', '--json', '--bytes', '--output', '+' + ','.join(fields)]
if dev_path:
cmd.append(str(dev_path))
+ if reverse:
+ cmd.append('--inverse')
+
+ if full_dev_path:
+ cmd.append('--paths')
+
try:
result = SysCommand(cmd).decode()
except SysCallError as err:
@@ -1132,8 +1465,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[Lsblk
return [LsblkInfo.from_json(device) for device in blockdevices]
-def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
- if infos := _fetch_lsblk_info(dev_path):
+def get_lsblk_info(
+ dev_path: Union[Path, str],
+ reverse: bool = False,
+ full_dev_path: bool = False
+) -> LsblkInfo:
+ if infos := _fetch_lsblk_info(dev_path, reverse=reverse, full_dev_path=full_dev_path):
return infos[0]
raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"')
@@ -1142,6 +1479,7 @@ def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
def get_all_lsblk_info() -> List[LsblkInfo]:
return _fetch_lsblk_info()
+
def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]:
def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]:
devices = []
diff --git a/archinstall/lib/disk/disk_menu.py b/archinstall/lib/disk/disk_menu.py
new file mode 100644
index 00000000..a7d9ccc3
--- /dev/null
+++ b/archinstall/lib/disk/disk_menu.py
@@ -0,0 +1,140 @@
+from typing import Dict, Optional, Any, TYPE_CHECKING, List
+
+from . import DiskLayoutConfiguration, DiskLayoutType
+from .device_model import LvmConfiguration
+from ..disk import (
+ DeviceModification
+)
+from ..interactions import select_disk_config
+from ..interactions.disk_conf import select_lvm_config
+from ..menu import (
+ Selector,
+ AbstractSubMenu
+)
+from ..output import FormattedOutput
+
+if TYPE_CHECKING:
+ _: Any
+
+
+class DiskLayoutConfigurationMenu(AbstractSubMenu):
+ def __init__(
+ self,
+ disk_layout_config: Optional[DiskLayoutConfiguration],
+ data_store: Dict[str, Any],
+ advanced: bool = False
+ ):
+ self._disk_layout_config = disk_layout_config
+ self._advanced = advanced
+
+ super().__init__(data_store=data_store, preview_size=0.5)
+
+ def setup_selection_menu_options(self):
+ self._menu_options['disk_config'] = \
+ Selector(
+ _('Partitioning'),
+ lambda x: self._select_disk_layout_config(x),
+ display_func=lambda x: self._display_disk_layout(x),
+ preview_func=self._prev_disk_layouts,
+ default=self._disk_layout_config,
+ enabled=True
+ )
+ self._menu_options['lvm_config'] = \
+ Selector(
+ _('Logical Volume Management (LVM)'),
+ lambda x: self._select_lvm_config(x),
+ display_func=lambda x: self.defined_text if x else '',
+ preview_func=self._prev_lvm_config,
+ default=self._disk_layout_config.lvm_config if self._disk_layout_config else None,
+ dependencies=[self._check_dep_lvm],
+ enabled=True
+ )
+
+ def run(self, allow_reset: bool = True) -> Optional[DiskLayoutConfiguration]:
+ super().run(allow_reset=allow_reset)
+
+ disk_layout_config: Optional[DiskLayoutConfiguration] = self._data_store.get('disk_config', None)
+
+ if disk_layout_config:
+ disk_layout_config.lvm_config = self._data_store.get('lvm_config', None)
+
+ return disk_layout_config
+
+ def _check_dep_lvm(self) -> bool:
+ disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+
+ if disk_layout_conf and disk_layout_conf.config_type == DiskLayoutType.Default:
+ return True
+
+ return False
+
+ def _select_disk_layout_config(
+ self,
+ preset: Optional[DiskLayoutConfiguration]
+ ) -> Optional[DiskLayoutConfiguration]:
+ disk_config = select_disk_config(preset, advanced_option=self._advanced)
+
+ if disk_config != preset:
+ self._menu_options['lvm_config'].set_current_selection(None)
+
+ return disk_config
+
+ def _select_lvm_config(self, preset: Optional[LvmConfiguration]) -> Optional[LvmConfiguration]:
+ disk_config: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+ if disk_config:
+ return select_lvm_config(disk_config, preset=preset)
+ return preset
+
+ def _display_disk_layout(self, current_value: Optional[DiskLayoutConfiguration] = None) -> str:
+ if current_value:
+ return current_value.config_type.display_msg()
+ return ''
+
+ def _prev_disk_layouts(self) -> Optional[str]:
+ disk_layout_conf: Optional[DiskLayoutConfiguration] = self._menu_options['disk_config'].current_selection
+
+ if disk_layout_conf:
+ device_mods: List[DeviceModification] = \
+ list(filter(lambda x: len(x.partitions) > 0, disk_layout_conf.device_modifications))
+
+ if device_mods:
+ output_partition = '{}: {}\n'.format(str(_('Configuration')), disk_layout_conf.config_type.display_msg())
+ output_btrfs = ''
+
+ for mod in device_mods:
+ # create partition table
+ partition_table = FormattedOutput.as_table(mod.partitions)
+
+ output_partition += f'{mod.device_path}: {mod.device.device_info.model}\n'
+ output_partition += partition_table + '\n'
+
+ # create btrfs table
+ btrfs_partitions = list(
+ filter(lambda p: len(p.btrfs_subvols) > 0, mod.partitions)
+ )
+ for partition in btrfs_partitions:
+ output_btrfs += FormattedOutput.as_table(partition.btrfs_subvols) + '\n'
+
+ output = output_partition + output_btrfs
+ return output.rstrip()
+
+ return None
+
+ def _prev_lvm_config(self) -> Optional[str]:
+ lvm_config: Optional[LvmConfiguration] = self._menu_options['lvm_config'].current_selection
+
+ if lvm_config:
+ output = '{}: {}\n'.format(str(_('Configuration')), lvm_config.config_type.display_msg())
+
+ for vol_gp in lvm_config.vol_groups:
+ pv_table = FormattedOutput.as_table(vol_gp.pvs)
+ output += '{}:\n{}'.format(str(_('Physical volumes')), pv_table)
+
+ output += f'\nVolume Group: {vol_gp.name}'
+
+ lvm_volumes = FormattedOutput.as_table(vol_gp.volumes)
+ output += '\n\n{}:\n{}'.format(str(_('Volumes')), lvm_volumes)
+
+ return output
+
+ return None
diff --git a/archinstall/lib/disk/encryption_menu.py b/archinstall/lib/disk/encryption_menu.py
index c3a1c32f..b0e292ce 100644
--- a/archinstall/lib/disk/encryption_menu.py
+++ b/archinstall/lib/disk/encryption_menu.py
@@ -1,6 +1,7 @@
from pathlib import Path
from typing import Dict, Optional, Any, TYPE_CHECKING, List
+from . import LvmConfiguration, LvmVolume
from ..disk import (
DeviceModification,
DiskLayoutConfiguration,
@@ -40,31 +41,41 @@ class DiskEncryptionMenu(AbstractSubMenu):
super().__init__(data_store=data_store)
def setup_selection_menu_options(self):
+ self._menu_options['encryption_type'] = \
+ Selector(
+ _('Encryption type'),
+ func=lambda preset: select_encryption_type(self._disk_config, preset),
+ display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
+ default=self._preset.encryption_type,
+ enabled=True,
+ )
self._menu_options['encryption_password'] = \
Selector(
_('Encryption password'),
lambda x: select_encrypted_password(),
+ dependencies=[self._check_dep_enc_type],
display_func=lambda x: secret(x) if x else '',
default=self._preset.encryption_password,
enabled=True
)
- self._menu_options['encryption_type'] = \
- Selector(
- _('Encryption type'),
- func=lambda preset: select_encryption_type(preset),
- display_func=lambda x: EncryptionType.type_to_text(x) if x else None,
- dependencies=['encryption_password'],
- default=self._preset.encryption_type,
- enabled=True
- )
self._menu_options['partitions'] = \
Selector(
_('Partitions'),
func=lambda preset: select_partitions_to_encrypt(self._disk_config.device_modifications, preset),
display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None,
- dependencies=['encryption_password'],
+ dependencies=[self._check_dep_partitions],
default=self._preset.partitions,
- preview_func=self._prev_disk_layouts,
+ preview_func=self._prev_partitions,
+ enabled=True
+ )
+ self._menu_options['lvm_vols'] = \
+ Selector(
+ _('LVM volumes'),
+ func=lambda preset: self._select_lvm_vols(preset),
+ display_func=lambda x: f'{len(x)} {_("LVM volumes")}' if x else None,
+ dependencies=[self._check_dep_lvm_vols],
+ default=self._preset.lvm_volumes,
+ preview_func=self._prev_lvm_vols,
enabled=True
)
self._menu_options['HSM'] = \
@@ -73,19 +84,54 @@ class DiskEncryptionMenu(AbstractSubMenu):
func=lambda preset: select_hsm(preset),
display_func=lambda x: self._display_hsm(x),
preview_func=self._prev_hsm,
- dependencies=['encryption_password'],
+ dependencies=[self._check_dep_enc_type],
default=self._preset.hsm_device,
enabled=True
)
+ def _select_lvm_vols(self, preset: List[LvmVolume]) -> List[LvmVolume]:
+ if self._disk_config.lvm_config:
+ return select_lvm_vols_to_encrypt(self._disk_config.lvm_config, preset=preset)
+ return []
+
+ def _check_dep_enc_type(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type != EncryptionType.NoEncryption:
+ return True
+ return False
+
+ def _check_dep_partitions(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks]:
+ return True
+ return False
+
+ def _check_dep_lvm_vols(self) -> bool:
+ enc_type: Optional[EncryptionType] = self._menu_options['encryption_type'].current_selection
+ if enc_type and enc_type == EncryptionType.LuksOnLvm:
+ return True
+ return False
+
def run(self, allow_reset: bool = True) -> Optional[DiskEncryption]:
super().run(allow_reset=allow_reset)
- if self._data_store.get('encryption_password', None):
+ enc_type = self._data_store.get('encryption_type', None)
+ enc_password = self._data_store.get('encryption_password', None)
+ enc_partitions = self._data_store.get('partitions', None)
+ enc_lvm_vols = self._data_store.get('lvm_vols', None)
+
+ if enc_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and enc_partitions:
+ enc_lvm_vols = []
+
+ if enc_type == EncryptionType.LuksOnLvm:
+ enc_partitions = []
+
+ if enc_type != EncryptionType.NoEncryption and enc_password and (enc_partitions or enc_lvm_vols):
return DiskEncryption(
- encryption_password=self._data_store.get('encryption_password', None),
- encryption_type=self._data_store['encryption_type'],
- partitions=self._data_store.get('partitions', None),
+ encryption_password=enc_password,
+ encryption_type=enc_type,
+ partitions=enc_partitions,
+ lvm_volumes=enc_lvm_vols,
hsm_device=self._data_store.get('HSM', None)
)
@@ -97,7 +143,7 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
- def _prev_disk_layouts(self) -> Optional[str]:
+ def _prev_partitions(self) -> Optional[str]:
partitions: Optional[List[PartitionModification]] = self._menu_options['partitions'].current_selection
if partitions:
output = str(_('Partitions to be encrypted')) + '\n'
@@ -106,6 +152,15 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
+ def _prev_lvm_vols(self) -> Optional[str]:
+ volumes: Optional[List[PartitionModification]] = self._menu_options['lvm_vols'].current_selection
+ if volumes:
+ output = str(_('LVM volumes to be encrypted')) + '\n'
+ output += FormattedOutput.as_table(volumes)
+ return output.rstrip()
+
+ return None
+
def _prev_hsm(self) -> Optional[str]:
try:
Fido2.get_fido2_devices()
@@ -123,13 +178,19 @@ class DiskEncryptionMenu(AbstractSubMenu):
return None
-def select_encryption_type(preset: EncryptionType) -> Optional[EncryptionType]:
+def select_encryption_type(disk_config: DiskLayoutConfiguration, preset: EncryptionType) -> Optional[EncryptionType]:
title = str(_('Select disk encryption option'))
- options = [
- EncryptionType.type_to_text(EncryptionType.Luks)
- ]
+
+ if disk_config.lvm_config:
+ options = [
+ EncryptionType.type_to_text(EncryptionType.LvmOnLuks),
+ EncryptionType.type_to_text(EncryptionType.LuksOnLvm)
+ ]
+ else:
+ options = [EncryptionType.type_to_text(EncryptionType.Luks)]
preset_value = EncryptionType.type_to_text(preset)
+
choice = Menu(title, options, preset_values=preset_value).run()
match choice.type_:
@@ -197,3 +258,31 @@ def select_partitions_to_encrypt(
case MenuSelectionType.Selection:
return choice.multi_value
return []
+
+
+def select_lvm_vols_to_encrypt(
+ lvm_config: LvmConfiguration,
+ preset: List[LvmVolume]
+) -> List[LvmVolume]:
+ volumes: List[LvmVolume] = lvm_config.get_all_volumes()
+
+ if volumes:
+ title = str(_('Select which LVM volumes to encrypt'))
+ partition_table = FormattedOutput.as_table(volumes)
+
+ choice = TableMenu(
+ title,
+ table_data=(volumes, partition_table),
+ preset=preset,
+ multi=True
+ ).run()
+
+ match choice.type_:
+ case MenuSelectionType.Reset:
+ return []
+ case MenuSelectionType.Skip:
+ return preset
+ case MenuSelectionType.Selection:
+ return choice.multi_value
+
+ return []
diff --git a/archinstall/lib/disk/fido.py b/archinstall/lib/disk/fido.py
index 49904c17..5a139534 100644
--- a/archinstall/lib/disk/fido.py
+++ b/archinstall/lib/disk/fido.py
@@ -4,7 +4,7 @@ import getpass
from pathlib import Path
from typing import List
-from .device_model import PartitionModification, Fido2Device
+from .device_model import Fido2Device
from ..general import SysCommand, SysCommandWorker, clear_vt100_escape_codes
from ..output import error, info
from ..exceptions import SysCallError
@@ -72,16 +72,16 @@ class Fido2:
def fido2_enroll(
cls,
hsm_device: Fido2Device,
- part_mod: PartitionModification,
+ dev_path: Path,
password: str
):
- worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {part_mod.dev_path}", peek_output=True)
+ worker = SysCommandWorker(f"systemd-cryptenroll --fido2-device={hsm_device.path} {dev_path}", peek_output=True)
pw_inputted = False
pin_inputted = False
while worker.is_alive():
if pw_inputted is False:
- if bytes(f"please enter current passphrase for disk {part_mod.dev_path}", 'UTF-8') in worker._trace_log.lower():
+ if bytes(f"please enter current passphrase for disk {dev_path}", 'UTF-8') in worker._trace_log.lower():
worker.write(bytes(password, 'UTF-8'))
pw_inputted = True
elif pin_inputted is False:
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 9c6e6d35..5c11896e 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -3,13 +3,21 @@ from __future__ import annotations
import signal
import sys
import time
-from typing import Any, Optional, TYPE_CHECKING
+from pathlib import Path
+from typing import Any, Optional, TYPE_CHECKING, List, Dict, Set
-from .device_model import DiskLayoutConfiguration, DiskLayoutType, PartitionTable, FilesystemType, DiskEncryption
from .device_handler import device_handler
+from .device_model import (
+ DiskLayoutConfiguration, DiskLayoutType, PartitionTable,
+ FilesystemType, DiskEncryption, LvmVolumeGroup,
+ Size, Unit, SectorSize, PartitionModification, EncryptionType,
+ LvmVolume, LvmConfiguration
+)
from ..hardware import SysInfo
-from ..output import debug
+from ..luks import Luks2
from ..menu import Menu
+from ..output import debug, info
+from ..general import SysCommand
if TYPE_CHECKING:
_: Any
@@ -52,13 +60,288 @@ class FilesystemHandler:
for mod in device_mods:
device_handler.partition(mod, partition_table=partition_table)
- device_handler.format(mod, enc_conf=self._enc_config)
- for part_mod in mod.partitions:
- if part_mod.is_create_or_modify():
+ if self._disk_config.lvm_config:
+ for mod in device_mods:
+ if boot_part := mod.get_boot_partition():
+ debug(f'Formatting boot partition: {boot_part.dev_path}')
+ self._format_partitions(
+ [boot_part],
+ mod.device_path
+ )
+
+ self.perform_lvm_operations()
+ else:
+ for mod in device_mods:
+ self._format_partitions(
+ mod.partitions,
+ mod.device_path
+ )
+
+ for part_mod in mod.partitions:
if part_mod.fs_type == FilesystemType.Btrfs:
device_handler.create_btrfs_volumes(part_mod, enc_conf=self._enc_config)
+ def _format_partitions(
+ self,
+ partitions: List[PartitionModification],
+ device_path: Path
+ ):
+ """
+ Format can be given an overriding path, for instance /dev/null to test
+ the formatting functionality and in essence the support for the given filesystem.
+ """
+
+ # don't touch existing partitions
+ create_or_modify_parts = [p for p in partitions if p.is_create_or_modify()]
+
+ self._validate_partitions(create_or_modify_parts)
+
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(device_path)
+
+ for part_mod in create_or_modify_parts:
+ # partition will be encrypted
+ if self._enc_config is not None and part_mod in self._enc_config.partitions:
+ device_handler.format_encrypted(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ part_mod.safe_fs_type,
+ self._enc_config
+ )
+ else:
+ device_handler.format(part_mod.safe_fs_type, part_mod.safe_dev_path)
+
+ # synchronize with udev before using lsblk
+ SysCommand('udevadm settle')
+
+ lsblk_info = device_handler.fetch_part_info(part_mod.safe_dev_path)
+
+ part_mod.partn = lsblk_info.partn
+ part_mod.partuuid = lsblk_info.partuuid
+ part_mod.uuid = lsblk_info.uuid
+
+ def _validate_partitions(self, partitions: List[PartitionModification]):
+ checks = {
+ # verify that all partitions have a path set (which implies that they have been created)
+ lambda x: x.dev_path is None: ValueError('When formatting, all partitions must have a path set'),
+ # crypto luks is not a valid file system type
+ lambda x: x.fs_type is FilesystemType.Crypto_luks: ValueError(
+ 'Crypto luks cannot be set as a filesystem type'),
+ # file system type must be set
+ lambda x: x.fs_type is None: ValueError('File system type must be set for modification')
+ }
+
+ for check, exc in checks.items():
+ found = next(filter(check, partitions), None)
+ if found is not None:
+ raise exc
+
+ def perform_lvm_operations(self):
+ info('Setting up LVM config...')
+
+ if not self._disk_config.lvm_config:
+ return
+
+ if self._enc_config:
+ self._setup_lvm_encrypted(
+ self._disk_config.lvm_config,
+ self._enc_config
+ )
+ else:
+ self._setup_lvm(self._disk_config.lvm_config)
+ self._format_lvm_vols(self._disk_config.lvm_config)
+
+ def _setup_lvm_encrypted(self, lvm_config: LvmConfiguration, enc_config: DiskEncryption):
+ if enc_config.encryption_type == EncryptionType.LvmOnLuks:
+ enc_mods = self._encrypt_partitions(enc_config, lock_after_create=False)
+
+ self._setup_lvm(lvm_config, enc_mods)
+ self._format_lvm_vols(lvm_config)
+
+ # export the lvm group safely otherwise the Luks cannot be closed
+ self._safely_close_lvm(lvm_config)
+
+ for luks in enc_mods.values():
+ luks.lock()
+ elif enc_config.encryption_type == EncryptionType.LuksOnLvm:
+ self._setup_lvm(lvm_config)
+ enc_vols = self._encrypt_lvm_vols(lvm_config, enc_config, False)
+ self._format_lvm_vols(lvm_config, enc_vols)
+
+ for luks in enc_vols.values():
+ luks.lock()
+
+ self._safely_close_lvm(lvm_config)
+
+ def _safely_close_lvm(self, lvm_config: LvmConfiguration):
+ for vg in lvm_config.vol_groups:
+ for vol in vg.volumes:
+ device_handler.lvm_vol_change(vol, False)
+
+ device_handler.lvm_export_vg(vg)
+
+ def _setup_lvm(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ self._lvm_create_pvs(lvm_config, enc_mods)
+
+ for vg in lvm_config.vol_groups:
+ pv_dev_paths = self._get_all_pv_dev_paths(vg.pvs, enc_mods)
+
+ device_handler.lvm_vg_create(pv_dev_paths, vg.name)
+
+ # figure out what the actual available size in the group is
+ vg_info = device_handler.lvm_group_info(vg.name)
+
+ if not vg_info:
+ raise ValueError('Unable to fetch VG info')
+
+ # the actual available LVM Group size will be smaller than the
+ # total PVs size due to reserved metadata storage etc.
+ # so we'll have a look at the total avail. size, check the delta
+ # to the desired sizes and subtract some equally from the actually
+ # created volume
+ avail_size = vg_info.vg_size
+ desired_size = sum([vol.length for vol in vg.volumes], Size(0, Unit.B, SectorSize.default()))
+
+ delta = desired_size - avail_size
+ max_vol_offset = delta.convert(Unit.B)
+
+ max_vol = max(vg.volumes, key=lambda x: x.length)
+
+ for lv in vg.volumes:
+ offset = max_vol_offset if lv == max_vol else None
+
+ debug(f'vg: {vg.name}, vol: {lv.name}, offset: {offset}')
+ device_handler.lvm_vol_create(vg.name, lv, offset)
+
+ while True:
+ debug('Fetching LVM volume info')
+ lv_info = device_handler.lvm_vol_info(lv.name)
+ if lv_info is not None:
+ break
+
+ time.sleep(1)
+
+ self._lvm_vol_handle_e2scrub(vg)
+
+ def _format_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+ ):
+ for vol in lvm_config.get_all_volumes():
+ if enc_vol := enc_vols.get(vol, None):
+ if not enc_vol.mapper_dev:
+ raise ValueError('No mapper device defined')
+ path = enc_vol.mapper_dev
+ else:
+ path = vol.safe_dev_path
+
+ # wait a bit otherwise the mkfs will fail as it can't
+ # find the mapper device yet
+ device_handler.format(vol.fs_type, path)
+
+ if vol.fs_type == FilesystemType.Btrfs:
+ device_handler.create_lvm_btrfs_subvolumes(path, vol.btrfs_subvols, vol.mount_options)
+
+ def _lvm_create_pvs(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ):
+ pv_paths: Set[Path] = set()
+
+ for vg in lvm_config.vol_groups:
+ pv_paths |= self._get_all_pv_dev_paths(vg.pvs, enc_mods)
+
+ device_handler.lvm_pv_create(pv_paths)
+
+ def _get_all_pv_dev_paths(
+ self,
+ pvs: List[PartitionModification],
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+ ) -> Set[Path]:
+ pv_paths: Set[Path] = set()
+
+ for pv in pvs:
+ if enc_pv := enc_mods.get(pv, None):
+ if mapper := enc_pv.mapper_dev:
+ pv_paths.add(mapper)
+ else:
+ pv_paths.add(pv.safe_dev_path)
+
+ return pv_paths
+
+ def _encrypt_lvm_vols(
+ self,
+ lvm_config: LvmConfiguration,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[LvmVolume, Luks2]:
+ enc_vols: Dict[LvmVolume, Luks2] = {}
+
+ for vol in lvm_config.get_all_volumes():
+ if vol in enc_config.lvm_volumes:
+ luks_handler = device_handler.encrypt(
+ vol.safe_dev_path,
+ vol.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create
+ )
+
+ enc_vols[vol] = luks_handler
+
+ return enc_vols
+
+ def _encrypt_partitions(
+ self,
+ enc_config: DiskEncryption,
+ lock_after_create: bool = True
+ ) -> Dict[PartitionModification, Luks2]:
+ enc_mods: Dict[PartitionModification, Luks2] = {}
+
+ for mod in self._disk_config.device_modifications:
+ partitions = mod.partitions
+
+ # don't touch existing partitions
+ filtered_part = [p for p in partitions if not p.exists()]
+
+ self._validate_partitions(filtered_part)
+
+ # make sure all devices are unmounted
+ device_handler.umount_all_existing(mod.device_path)
+
+ enc_mods = {}
+
+ for part_mod in filtered_part:
+ if part_mod in enc_config.partitions:
+ luks_handler = device_handler.encrypt(
+ part_mod.safe_dev_path,
+ part_mod.mapper_name,
+ enc_config.encryption_password,
+ lock_after_create=lock_after_create
+ )
+
+ enc_mods[part_mod] = luks_handler
+
+ return enc_mods
+
+ def _lvm_vol_handle_e2scrub(self, vol_gp: LvmVolumeGroup):
+ # from arch wiki:
+ # If a logical volume will be formatted with ext4, leave at least 256 MiB
+ # free space in the volume group to allow using e2scrub
+ if any([vol.fs_type == FilesystemType.Ext4 for vol in vol_gp.volumes]):
+ largest_vol = max(vol_gp.volumes, key=lambda x: x.length)
+
+ device_handler.lvm_vol_reduce(
+ largest_vol.safe_dev_path,
+ Size(256, Unit.MiB, SectorSize.default())
+ )
+
def _do_countdown(self) -> bool:
SIG_TRIGGER = False
diff --git a/archinstall/lib/disk/partitioning_menu.py b/archinstall/lib/disk/partitioning_menu.py
index 823605e3..330f61a3 100644
--- a/archinstall/lib/disk/partitioning_menu.py
+++ b/archinstall/lib/disk/partitioning_menu.py
@@ -2,7 +2,7 @@ from __future__ import annotations
import re
from pathlib import Path
-from typing import Any, Dict, TYPE_CHECKING, List, Optional, Tuple
+from typing import Any, TYPE_CHECKING, List, Optional, Tuple
from .device_model import PartitionModification, FilesystemType, BDevice, Size, Unit, PartitionType, PartitionFlag, \
ModificationStatus, DeviceGeometry, SectorSize, BtrfsMountOption
@@ -38,21 +38,6 @@ class PartitioningList(ListManager):
display_actions = list(self._actions.values())
super().__init__(prompt, device_partitions, display_actions[:2], display_actions[3:])
- def reformat(self, data: List[PartitionModification]) -> Dict[str, Optional[PartitionModification]]:
- table = FormattedOutput.as_table(data)
- rows = table.split('\n')
-
- # these are the header rows of the table and do not map to any User obviously
- # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
- # the selectable rows so the header has to be aligned
- display_data: Dict[str, Optional[PartitionModification]] = {f' {rows[0]}': None, f' {rows[1]}': None}
-
- for row, user in zip(rows[2:], data):
- row = row.replace('|', '\\|')
- display_data[row] = user
-
- return display_data
-
def selected_action_display(self, partition: PartitionModification) -> str:
return str(_('Partition'))
@@ -258,7 +243,6 @@ class PartitioningList(ListManager):
while True:
value = TextInput(prompt).run().strip()
size: Optional[Size] = None
-
if not value:
size = default
else:
diff --git a/archinstall/lib/disk/subvolume_menu.py b/archinstall/lib/disk/subvolume_menu.py
index 48afa829..ea77149d 100644
--- a/archinstall/lib/disk/subvolume_menu.py
+++ b/archinstall/lib/disk/subvolume_menu.py
@@ -1,9 +1,8 @@
from pathlib import Path
-from typing import Dict, List, Optional, Any, TYPE_CHECKING
+from typing import List, Optional, Any, TYPE_CHECKING
from .device_model import SubvolumeModification
from ..menu import TextInput, ListManager
-from ..output import FormattedOutput
if TYPE_CHECKING:
_: Any
@@ -18,21 +17,6 @@ class SubvolumeMenu(ListManager):
]
super().__init__(prompt, btrfs_subvols, [self._actions[0]], self._actions[1:])
- def reformat(self, data: List[SubvolumeModification]) -> Dict[str, Optional[SubvolumeModification]]:
- table = FormattedOutput.as_table(data)
- rows = table.split('\n')
-
- # these are the header rows of the table and do not map to any User obviously
- # we're adding 2 spaces as prefix because the menu selector '> ' will be put before
- # the selectable rows so the header has to be aligned
- display_data: Dict[str, Optional[SubvolumeModification]] = {f' {rows[0]}': None, f' {rows[1]}': None}
-
- for row, subvol in zip(rows[2:], data):
- row = row.replace('|', '\\|')
- display_data[row] = subvol
-
- return display_data
-
def selected_action_display(self, subvolume: SubvolumeModification) -> str:
return str(subvolume.name)