Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/device_model.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/device_model.py')
-rw-r--r--archinstall/lib/disk/device_model.py408
1 files changed, 373 insertions, 35 deletions
diff --git a/archinstall/lib/disk/device_model.py b/archinstall/lib/disk/device_model.py
index fe96203c..1cd3d674 100644
--- a/archinstall/lib/disk/device_model.py
+++ b/archinstall/lib/disk/device_model.py
@@ -41,6 +41,8 @@ class DiskLayoutType(Enum):
class DiskLayoutConfiguration:
config_type: DiskLayoutType
device_modifications: List[DeviceModification] = field(default_factory=list)
+ lvm_config: Optional[LvmConfiguration] = None
+
# used for pre-mounted config
mountpoint: Optional[Path] = None
@@ -51,13 +53,18 @@ class DiskLayoutConfiguration:
'mountpoint': str(self.mountpoint)
}
else:
- return {
+ config: Dict[str, Any] = {
'config_type': self.config_type.value,
- 'device_modifications': [mod.json() for mod in self.device_modifications]
+ 'device_modifications': [mod.json() for mod in self.device_modifications],
}
+ if self.lvm_config:
+ config['lvm_config'] = self.lvm_config.json()
+
+ return config
+
@classmethod
- def parse_arg(cls, disk_config: Dict[str, List[Dict[str, Any]]]) -> Optional[DiskLayoutConfiguration]:
+ def parse_arg(cls, disk_config: Dict[str, Any]) -> Optional[DiskLayoutConfiguration]:
from .device_handler import device_handler
device_modifications: List[DeviceModification] = []
@@ -124,6 +131,10 @@ class DiskLayoutConfiguration:
device_modification.partitions = device_partitions
device_modifications.append(device_modification)
+ # Parse LVM configuration from settings
+ if (lvm_arg := disk_config.get('lvm_config', None)) is not None:
+ config.lvm_config = LvmConfiguration.parse_arg(lvm_arg, config)
+
return config
@@ -133,24 +144,24 @@ class PartitionTable(Enum):
class Unit(Enum):
- B = 1 # byte
- kB = 1000**1 # kilobyte
- MB = 1000**2 # megabyte
- GB = 1000**3 # gigabyte
- TB = 1000**4 # terabyte
- PB = 1000**5 # petabyte
- EB = 1000**6 # exabyte
- ZB = 1000**7 # zettabyte
- YB = 1000**8 # yottabyte
-
- KiB = 1024**1 # kibibyte
- MiB = 1024**2 # mebibyte
- GiB = 1024**3 # gibibyte
- TiB = 1024**4 # tebibyte
- PiB = 1024**5 # pebibyte
- EiB = 1024**6 # exbibyte
- ZiB = 1024**7 # zebibyte
- YiB = 1024**8 # yobibyte
+ B = 1 # byte
+ kB = 1000 ** 1 # kilobyte
+ MB = 1000 ** 2 # megabyte
+ GB = 1000 ** 3 # gigabyte
+ TB = 1000 ** 4 # terabyte
+ PB = 1000 ** 5 # petabyte
+ EB = 1000 ** 6 # exabyte
+ ZB = 1000 ** 7 # zettabyte
+ YB = 1000 ** 8 # yottabyte
+
+ KiB = 1024 ** 1 # kibibyte
+ MiB = 1024 ** 2 # mebibyte
+ GiB = 1024 ** 3 # gibibyte
+ TiB = 1024 ** 4 # tebibyte
+ PiB = 1024 ** 5 # pebibyte
+ EiB = 1024 ** 6 # exbibyte
+ ZiB = 1024 ** 7 # zebibyte
+ YiB = 1024 ** 8 # yobibyte
sectors = 'sectors' # size in sector
@@ -575,7 +586,7 @@ class PartitionFlag(Enum):
Which is the way libparted checks for its flags: https://git.savannah.gnu.org/gitweb/?p=parted.git;a=blob;f=libparted/labels/gpt.c;hb=4a0e468ed63fff85a1f9b923189f20945b32f4f1#l183
"""
Boot = _ped.PARTITION_BOOT
- XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
+ XBOOTLDR = _ped.PARTITION_BLS_BOOT # Note: parted calls this bls_boot
ESP = _ped.PARTITION_ESP
@@ -658,6 +669,10 @@ class PartitionModification:
flags: List[PartitionFlag] = field(default_factory=list)
btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+ # only set when modification was created from an existing
+ # partition info object to be able to reference it back
+ part_info: Optional[_PartitionInfo] = None
+
# only set if the device was created or exists
dev_path: Optional[Path] = None
partn: Optional[int] = None
@@ -724,7 +739,8 @@ class PartitionModification:
uuid=partition_info.uuid,
flags=partition_info.flags,
mountpoint=mountpoint,
- btrfs_subvols=subvol_mods
+ btrfs_subvols=subvol_mods,
+ part_info=partition_info
)
@property
@@ -832,6 +848,270 @@ class PartitionModification:
return part_mod
+class LvmLayoutType(Enum):
+ Default = 'default'
+
+ # Manual = 'manual_lvm'
+
+ def display_msg(self) -> str:
+ match self:
+ case LvmLayoutType.Default:
+ return str(_('Default layout'))
+ # case LvmLayoutType.Manual:
+ # return str(_('Manual configuration'))
+
+ raise ValueError(f'Unknown type: {self}')
+
+
+@dataclass
+class LvmVolumeGroup:
+ name: str
+ pvs: List[PartitionModification]
+ volumes: List[LvmVolume] = field(default_factory=list)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'name': self.name,
+ 'lvm_pvs': [p.obj_id for p in self.pvs],
+ 'volumes': [vol.json() for vol in self.volumes]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmVolumeGroup:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmVolumeGroup(
+ arg['name'],
+ lvm_pvs,
+ [LvmVolume.parse_arg(vol) for vol in arg['volumes']]
+ )
+
+ def contains_lv(self, lv: LvmVolume) -> bool:
+ return lv in self.volumes
+
+
+class LvmVolumeStatus(Enum):
+ Exist = 'existing'
+ Modify = 'modify'
+ Delete = 'delete'
+ Create = 'create'
+
+
+@dataclass
+class LvmVolume:
+ status: LvmVolumeStatus
+ name: str
+ fs_type: FilesystemType
+ length: Size
+ mountpoint: Optional[Path]
+ mount_options: List[str] = field(default_factory=list)
+ btrfs_subvols: List[SubvolumeModification] = field(default_factory=list)
+
+ # volume group name
+ vg_name: Optional[str] = None
+ # mapper device path /dev/<vg>/<vol>
+ dev_path: Optional[Path] = None
+
+ def __post_init__(self):
+ # needed to use the object as a dictionary key due to hash func
+ if not hasattr(self, '_obj_id'):
+ self._obj_id = uuid.uuid4()
+
+ def __hash__(self):
+ return hash(self._obj_id)
+
+ @property
+ def obj_id(self) -> str:
+ if hasattr(self, '_obj_id'):
+ return str(self._obj_id)
+ return ''
+
+ @property
+ def mapper_name(self) -> Optional[str]:
+ if self.dev_path:
+ return f'{storage.get("ENC_IDENTIFIER", "ai")}{self.safe_dev_path.name}'
+ return None
+
+ @property
+ def mapper_path(self) -> Path:
+ if self.mapper_name:
+ return Path(f'/dev/mapper/{self.mapper_name}')
+
+ raise ValueError('No mapper path set')
+
+ @property
+ def safe_dev_path(self) -> Path:
+ if self.dev_path:
+ return self.dev_path
+ raise ValueError('No device path for volume defined')
+
+ @property
+ def safe_fs_type(self) -> FilesystemType:
+ if self.fs_type is None:
+ raise ValueError('File system type is not set')
+ return self.fs_type
+
+ @property
+ def relative_mountpoint(self) -> Path:
+ """
+ Will return the relative path based on the anchor
+ e.g. Path('/mnt/test') -> Path('mnt/test')
+ """
+ if self.mountpoint is not None:
+ return self.mountpoint.relative_to(self.mountpoint.anchor)
+
+ raise ValueError('Mountpoint is not specified')
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any]) -> LvmVolume:
+ volume = LvmVolume(
+ status=LvmVolumeStatus(arg['status']),
+ name=arg['name'],
+ fs_type=FilesystemType(arg['fs_type']),
+ length=Size.parse_args(arg['length']),
+ mountpoint=Path(arg['mountpoint']) if arg['mountpoint'] else None,
+ mount_options=arg.get('mount_options', []),
+ btrfs_subvols=SubvolumeModification.parse_args(arg.get('btrfs', []))
+ )
+
+ setattr(volume, '_obj_id', arg['obj_id'])
+
+ return volume
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'obj_id': self.obj_id,
+ 'status': self.status.value,
+ 'name': self.name,
+ 'fs_type': self.fs_type.value,
+ 'length': self.length.json(),
+ 'mountpoint': str(self.mountpoint) if self.mountpoint else None,
+ 'mount_options': self.mount_options,
+ 'btrfs': [vol.json() for vol in self.btrfs_subvols]
+ }
+
+ def table_data(self) -> Dict[str, Any]:
+ part_mod = {
+ 'Type': self.status.value,
+ 'Name': self.name,
+ 'Size': self.length.format_highest(),
+ 'FS type': self.fs_type.value,
+ 'Mountpoint': str(self.mountpoint) if self.mountpoint else '',
+ 'Mount options': ', '.join(self.mount_options),
+ 'Btrfs': '{} {}'.format(str(len(self.btrfs_subvols)), 'vol')
+ }
+ return part_mod
+
+ def is_modify(self) -> bool:
+ return self.status == LvmVolumeStatus.Modify
+
+ def exists(self) -> bool:
+ return self.status == LvmVolumeStatus.Exist
+
+ def is_exists_or_modify(self) -> bool:
+ return self.status in [LvmVolumeStatus.Exist, LvmVolumeStatus.Modify]
+
+ def is_root(self) -> bool:
+ if self.mountpoint is not None:
+ return Path('/') == self.mountpoint
+ else:
+ for subvol in self.btrfs_subvols:
+ if subvol.is_root():
+ return True
+
+ return False
+
+
+@dataclass
+class LvmGroupInfo:
+ vg_size: Size
+ vg_uuid: str
+
+
+@dataclass
+class LvmVolumeInfo:
+ lv_name: str
+ vg_name: str
+ lv_size: Size
+
+
+@dataclass
+class LvmPVInfo:
+ pv_name: Path
+ lv_name: str
+ vg_name: str
+
+
+@dataclass
+class LvmConfiguration:
+ config_type: LvmLayoutType
+ vol_groups: List[LvmVolumeGroup]
+
+ def __post_init__(self):
+ # make sure all volume groups have unique PVs
+ pvs = []
+ for group in self.vol_groups:
+ for pv in group.pvs:
+ if pv in pvs:
+ raise ValueError('A PV cannot be used in multiple volume groups')
+ pvs.append(pv)
+
+ def json(self) -> Dict[str, Any]:
+ return {
+ 'config_type': self.config_type.value,
+ 'vol_groups': [vol_gr.json() for vol_gr in self.vol_groups]
+ }
+
+ @staticmethod
+ def parse_arg(arg: Dict[str, Any], disk_config: DiskLayoutConfiguration) -> LvmConfiguration:
+ lvm_pvs = []
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ if part.obj_id in arg.get('lvm_pvs', []):
+ lvm_pvs.append(part)
+
+ return LvmConfiguration(
+ config_type=LvmLayoutType(arg['config_type']),
+ vol_groups=[LvmVolumeGroup.parse_arg(vol_group, disk_config) for vol_group in arg['vol_groups']],
+ )
+
+ def get_all_pvs(self) -> List[PartitionModification]:
+ pvs = []
+ for vg in self.vol_groups:
+ pvs += vg.pvs
+
+ return pvs
+
+ def get_all_volumes(self) -> List[LvmVolume]:
+ volumes = []
+
+ for vg in self.vol_groups:
+ volumes += vg.volumes
+
+ return volumes
+
+ def get_root_volume(self) -> Optional[LvmVolume]:
+ for vg in self.vol_groups:
+ filtered = next(filter(lambda x: x.is_root(), vg.volumes), None)
+ if filtered:
+ return filtered
+
+ return None
+
+
+# def get_lv_crypt_uuid(self, lv: LvmVolume, encryption: EncryptionType) -> str:
+# """
+# Find the LUKS superblock UUID for the device that
+# contains the given logical volume
+# """
+# for vg in self.vol_groups:
+# if vg.contains_lv(lv):
+
+
@dataclass
class DeviceModification:
device: BDevice
@@ -885,11 +1165,16 @@ class DeviceModification:
class EncryptionType(Enum):
NoEncryption = "no_encryption"
Luks = "luks"
+ LvmOnLuks = 'lvm_on_luks'
+ LuksOnLvm = 'luks_on_lvm'
@classmethod
def _encryption_type_mapper(cls) -> Dict[str, 'EncryptionType']:
return {
- 'Luks': EncryptionType.Luks
+ str(_('No Encryption')): EncryptionType.NoEncryption,
+ str(_('LUKS')): EncryptionType.Luks,
+ str(_('LVM on LUKS')): EncryptionType.LvmOnLuks,
+ str(_('LUKS on LVM')): EncryptionType.LuksOnLvm
}
@classmethod
@@ -906,18 +1191,31 @@ class EncryptionType(Enum):
@dataclass
class DiskEncryption:
- encryption_type: EncryptionType = EncryptionType.Luks
+ encryption_type: EncryptionType = EncryptionType.NoEncryption
encryption_password: str = ''
partitions: List[PartitionModification] = field(default_factory=list)
+ lvm_volumes: List[LvmVolume] = field(default_factory=list)
hsm_device: Optional[Fido2Device] = None
- def should_generate_encryption_file(self, part_mod: PartitionModification) -> bool:
- return part_mod in self.partitions and part_mod.mountpoint != Path('/')
+ def __post_init__(self):
+ if self.encryption_type in [EncryptionType.Luks, EncryptionType.LvmOnLuks] and not self.partitions:
+ raise ValueError('Luks or LvmOnLuks encryption require partitions to be defined')
+
+ if self.encryption_type == EncryptionType.LuksOnLvm and not self.lvm_volumes:
+ raise ValueError('LuksOnLvm encryption require LMV volumes to be defined')
+
+ def should_generate_encryption_file(self, dev: PartitionModification | LvmVolume) -> bool:
+ if isinstance(dev, PartitionModification):
+ return dev in self.partitions and dev.mountpoint != Path('/')
+ elif isinstance(dev, LvmVolume):
+ return dev in self.lvm_volumes and dev.mountpoint != Path('/')
+ return False
def json(self) -> Dict[str, Any]:
obj: Dict[str, Any] = {
'encryption_type': self.encryption_type.value,
- 'partitions': [p.obj_id for p in self.partitions]
+ 'partitions': [p.obj_id for p in self.partitions],
+ 'lvm_volumes': [vol.obj_id for vol in self.lvm_volumes]
}
if self.hsm_device:
@@ -926,22 +1224,46 @@ class DiskEncryption:
return obj
@classmethod
+ def validate_enc(cls, disk_config: DiskLayoutConfiguration) -> bool:
+ partitions = []
+
+ for mod in disk_config.device_modifications:
+ for part in mod.partitions:
+ partitions.append(part)
+
+ if len(partitions) > 2: # assume one boot and at least 2 additional
+ if disk_config.lvm_config:
+ return False
+
+ return True
+
+ @classmethod
def parse_arg(
cls,
disk_config: DiskLayoutConfiguration,
arg: Dict[str, Any],
password: str = ''
- ) -> 'DiskEncryption':
+ ) -> Optional['DiskEncryption']:
+ if not cls.validate_enc(disk_config):
+ return None
+
enc_partitions = []
for mod in disk_config.device_modifications:
for part in mod.partitions:
if part.obj_id in arg.get('partitions', []):
enc_partitions.append(part)
+ volumes = []
+ if disk_config.lvm_config:
+ for vol in disk_config.lvm_config.get_all_volumes():
+ if vol.obj_id in arg.get('lvm_volumes', []):
+ volumes.append(vol)
+
enc = DiskEncryption(
EncryptionType(arg['encryption_type']),
password,
- enc_partitions
+ enc_partitions,
+ volumes
)
if hsm := arg.get('hsm_device', None):
@@ -992,7 +1314,7 @@ class LsblkInfo:
tran: Optional[str] = None
partn: Optional[int] = None
partuuid: Optional[str] = None
- parttype :Optional[str] = None
+ parttype: Optional[str] = None
uuid: Optional[str] = None
fstype: Optional[str] = None
fsver: Optional[str] = None
@@ -1017,7 +1339,7 @@ class LsblkInfo:
'tran': self.tran,
'partn': self.partn,
'partuuid': self.partuuid,
- 'parttype' : self.parttype,
+ 'parttype': self.parttype,
'uuid': self.uuid,
'fstype': self.fstype,
'fsver': self.fsver,
@@ -1102,13 +1424,24 @@ def _clean_field(name: str, clean_type: CleanType) -> str:
return name.replace('_percentage', '%').replace('_', '-')
-def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[LsblkInfo]:
+def _fetch_lsblk_info(
+ dev_path: Optional[Union[Path, str]] = None,
+ reverse: bool = False,
+ full_dev_path: bool = False,
+ retry: int = 3
+) -> List[LsblkInfo]:
fields = [_clean_field(f, CleanType.Lsblk) for f in LsblkInfo.fields()]
cmd = ['lsblk', '--json', '--bytes', '--output', '+' + ','.join(fields)]
if dev_path:
cmd.append(str(dev_path))
+ if reverse:
+ cmd.append('--inverse')
+
+ if full_dev_path:
+ cmd.append('--paths')
+
try:
result = SysCommand(cmd).decode()
except SysCallError as err:
@@ -1132,8 +1465,12 @@ def _fetch_lsblk_info(dev_path: Optional[Union[Path, str]] = None) -> List[Lsblk
return [LsblkInfo.from_json(device) for device in blockdevices]
-def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
- if infos := _fetch_lsblk_info(dev_path):
+def get_lsblk_info(
+ dev_path: Union[Path, str],
+ reverse: bool = False,
+ full_dev_path: bool = False
+) -> LsblkInfo:
+ if infos := _fetch_lsblk_info(dev_path, reverse=reverse, full_dev_path=full_dev_path):
return infos[0]
raise DiskError(f'lsblk failed to retrieve information for "{dev_path}"')
@@ -1142,6 +1479,7 @@ def get_lsblk_info(dev_path: Union[Path, str]) -> LsblkInfo:
def get_all_lsblk_info() -> List[LsblkInfo]:
return _fetch_lsblk_info()
+
def get_lsblk_by_mountpoint(mountpoint: Path, as_prefix: bool = False) -> List[LsblkInfo]:
def _check(infos: List[LsblkInfo]) -> List[LsblkInfo]:
devices = []