Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/encryption.py26
-rw-r--r--archinstall/lib/disk/filesystem.py2
-rw-r--r--archinstall/lib/disk/helpers.py88
-rw-r--r--archinstall/lib/disk/mapperdev.py4
-rw-r--r--archinstall/lib/disk/partition.py133
-rw-r--r--archinstall/lib/disk/validators.py6
6 files changed, 155 insertions, 104 deletions
diff --git a/archinstall/lib/disk/encryption.py b/archinstall/lib/disk/encryption.py
index 67f656c8..c7496bfa 100644
--- a/archinstall/lib/disk/encryption.py
+++ b/archinstall/lib/disk/encryption.py
@@ -46,7 +46,7 @@ class DiskEncryptionMenu(AbstractSubMenu):
Selector(
_('Partitions'),
func=lambda preset: select_partitions_to_encrypt(self._disk_layouts, preset),
- display_func=lambda x: f'{len(x)} {_("Partitions")}' if x else None,
+ display_func=lambda x: f'{sum([len(y) for y in x.values()])} {_("Partitions")}' if x else None,
dependencies=['encryption_password'],
default=self._preset.partitions,
preview_func=self._prev_disk_layouts,
@@ -86,9 +86,14 @@ class DiskEncryptionMenu(AbstractSubMenu):
def _prev_disk_layouts(self) -> Optional[str]:
selector = self._menu_options['partitions']
if selector.has_selection():
- partitions: List[Any] = selector.current_selection
+ partitions: Dict[str, Any] = selector.current_selection
+
+ all_partitions = []
+ for parts in partitions.values():
+ all_partitions += parts
+
output = str(_('Partitions to be encrypted')) + '\n'
- output += current_partition_layout(partitions, with_title=False)
+ output += current_partition_layout(all_partitions, with_title=False)
return output.rstrip()
return None
@@ -132,7 +137,7 @@ def select_hsm(preset: Optional[Fido2Device] = None) -> Optional[Fido2Device]:
return None
-def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: List[Any]) -> List[Any]:
+def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: Dict[str, Any]) -> Dict[str, Any]:
# If no partitions was marked as encrypted, but a password was supplied and we have some disks to format..
# Then we need to identify which partitions to encrypt. This will default to / (root).
all_partitions = []
@@ -153,10 +158,17 @@ def select_partitions_to_encrypt(disk_layouts: Dict[str, Any], preset: List[Any]
match choice.type_:
case MenuSelectionType.Reset:
- return []
+ return {}
case MenuSelectionType.Skip:
return preset
case MenuSelectionType.Selection:
- return choice.value # type: ignore
+ selections: List[Any] = choice.value # type: ignore
+ partitions = {}
+
+ for path, device in disk_layouts.items():
+ for part in selections:
+ if part in device.get('partitions', []):
+ partitions.setdefault(path, []).append(part)
- return []
+ return partitions
+ return {}
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index bdfa502a..1083df53 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -113,7 +113,7 @@ class Filesystem:
format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[])
disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')
- if partition in disk_encryption.partitions:
+ if disk_encryption and partition in disk_encryption.all_partitions:
if not partition['device_instance']:
raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index a5164b76..80d0cb53 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -212,6 +212,47 @@ def all_disks() -> List[BlockDevice]:
log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
return all_blockdevices(partitions=False, mappers=False)
+def get_blockdevice_info(device_path, exclude_iso_dev :bool = True) -> Dict[str, Any]:
+ for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
+ partprobe(device_path)
+ time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt))
+
+ try:
+ if exclude_iso_dev:
+ # exclude all devices associated with the iso boot locations
+ iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt']
+
+ try:
+ lsblk_info = get_lsblk_info(device_path)
+ except DiskError:
+ continue
+
+ if any([dev in lsblk_info.mountpoints for dev in iso_devs]):
+ continue
+
+ information = blkid(f'blkid -p -o export {device_path}')
+ return enrich_blockdevice_information(information)
+ except SysCallError as ex:
+ if ex.exit_code == 2:
+ # Assume that it's a loop device, and try to get info on it
+ try:
+ resolved_device_name = device_path.readlink().name
+ except OSError:
+ resolved_device_name = device_path.name
+
+ try:
+ information = get_loop_info(device_path)
+ if not information:
+ raise SysCallError(f"Could not get loop information for {resolved_device_name}", exit_code=1)
+ return enrich_blockdevice_information(information)
+
+ except SysCallError:
+ information = get_blockdevice_uevent(resolved_device_name)
+ return enrich_blockdevice_information(information)
+ else:
+ # We could not reliably get any information, perhaps the disk is clean of information?
+ if retry_attempt == storage['DISK_RETRY_ATTEMPTS'] - 1:
+ raise ex
def all_blockdevices(
mappers: bool = False,
@@ -230,40 +271,18 @@ def all_blockdevices(
# we'll iterate the /sys/class definitions and find the information
# from there.
for block_device in glob.glob("/sys/class/block/*"):
- device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
+ try:
+ device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
+ except FileNotFoundError:
+ log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
if device_path.exists() is False:
log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
continue
- try:
- if exclude_iso_dev:
- # exclude all devices associated with the iso boot locations
- iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt']
- lsblk_info = get_lsblk_info(device_path)
- if any([dev in lsblk_info.mountpoints for dev in iso_devs]):
- continue
-
- information = blkid(f'blkid -p -o export {device_path}')
- except SysCallError as ex:
- if ex.exit_code in (512, 2):
- # Assume that it's a loop device, and try to get info on it
- try:
- information = get_loop_info(device_path)
- if not information:
- print("Exit code for blkid -p -o export was:", ex.exit_code)
- raise SysCallError("Could not get loop information", exit_code=1)
-
- except SysCallError:
- print("Not a loop device, trying uevent rules.")
- information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
- else:
- # We could not reliably get any information, perhaps the disk is clean of information?
- print("Raising ex because:", ex.exit_code)
- raise ex
- # return instances
-
- information = enrich_blockdevice_information(information)
+ information = get_blockdevice_info(device_path)
+ if not information:
+ continue
for path, path_info in information.items():
if path_info.get('DMCRYPT_NAME'):
@@ -409,7 +428,6 @@ def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
return {}
output = json.loads(output)
- # print(output)
mounts = {}
@@ -421,11 +439,13 @@ def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
continue
if isinstance(blockdev, Partition):
- for blockdev_mountpoint in blockdev.mountpoints:
- block_devices_mountpoints[blockdev_mountpoint] = blockdev
+ if blockdev.mountpoints:
+ for blockdev_mountpoint in blockdev.mountpoints:
+ block_devices_mountpoints[blockdev_mountpoint] = blockdev
else:
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
+ if blockdev.mount_information:
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
index 71ef2a79..bf1b3583 100644
--- a/archinstall/lib/disk/mapperdev.py
+++ b/archinstall/lib/disk/mapperdev.py
@@ -25,6 +25,10 @@ class MapperDev:
return f"/dev/mapper/{self.mappername}"
@property
+ def part_uuid(self):
+ return self.partition.part_uuid
+
+ @property
def partition(self):
from .helpers import uevent, get_parent_of_partition
from .partition import Partition
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 9febf102..87eaa6a7 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -98,17 +98,18 @@ class Partition:
if mountpoint:
self.mount(mountpoint)
- self._partition_info = self._fetch_information()
-
- if not autodetect_filesystem and filesystem:
- self._partition_info.filesystem_type = filesystem
+ try:
+ self._partition_info = self._fetch_information()
+
+ if not autodetect_filesystem and filesystem:
+ self._partition_info.filesystem_type = filesystem
- if self._partition_info.filesystem_type == 'crypto_LUKS':
- self._encrypted = True
+ if self._partition_info.filesystem_type == 'crypto_LUKS':
+ self._encrypted = True
+ except DiskError:
+ self._partition_info = None
- # I hate doint this but I'm currently unsure where this
- # is acutally used to be able to fix the typing issues properly
- @typing.no_type_check
+ @typing.no_type_check # I hate doint this but I'm currently unsure where this is used.
def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
@@ -120,14 +121,17 @@ class Partition:
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
- if mountpoint := self._partition_info.get_first_mountpoint():
- mount_repr = f", mounted={mountpoint}"
- elif self._target_mountpoint:
- mount_repr = f", rel_mountpoint={self._target_mountpoint}"
+ if self._partition_info:
+ if mountpoint := self._partition_info.get_first_mountpoint():
+ mount_repr = f", mounted={mountpoint}"
+ elif self._target_mountpoint:
+ mount_repr = f", rel_mountpoint={self._target_mountpoint}"
classname = self.__class__.__name__
- if self._encrypted:
+ if not self._partition_info:
+ return f'{classname}(path={self._path})'
+ elif self._encrypted:
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
else:
return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
@@ -146,7 +150,7 @@ class Partition:
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
- 'filesystem': self._partition_info.filesystem_type
+ 'filesystem': self._partition_info.filesystem_type if self._partition_info else 'Unknown'
}
return partition_info
@@ -164,34 +168,37 @@ class Partition:
'start': self.start,
'size': self.end,
'filesystem': {
- 'format': self._partition_info.filesystem_type
+ 'format': self._partition_info.filesystem_type if self._partition_info else 'None'
}
}
def _call_lsblk(self) -> Dict[str, Any]:
- self.partprobe()
- # This sleep might be overkill, but lsblk is known to
- # work against a chaotic cache that can change during call
- # causing no information to be returned (blkid is better)
- # time.sleep(1)
+ for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
+ self.partprobe()
+ time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt)) # TODO: Remove, we should be relying on blkid instead of lsblk
+ # This sleep might be overkill, but lsblk is known to
+ # work against a chaotic cache that can change during call
+ # causing no information to be returned (blkid is better)
+ # time.sleep(1)
- # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
+ # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1)))
- try:
- output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
- except SysCallError as error:
- # It appears as if lsblk can return exit codes like 8192 to indicate something.
- # But it does return output so we'll try to catch it.
- output = error.worker.decode('UTF-8')
-
- if output:
try:
- lsblk_info = json.loads(output)
- return lsblk_info
- except json.decoder.JSONDecodeError:
- log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR)
-
- raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk')
+ output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
+ except SysCallError as error:
+ # Get the output minus the message/info from lsblk if it returns a non-zero exit code.
+ output = error.worker.decode('UTF-8')
+ if '{' in output:
+ output = output[output.find('{'):]
+
+ if output:
+ try:
+ lsblk_info = json.loads(output)
+ return lsblk_info
+ except json.decoder.JSONDecodeError:
+ log(f"Could not decode JSON: {output}", fg="red", level=logging.ERROR)
+
+ raise DiskError(f'Failed to get partition information "{self.device_path}" with lsblk')
def _call_sfdisk(self) -> Dict[str, Any]:
output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')
@@ -212,9 +219,12 @@ class Partition:
lsblk_info = self._call_lsblk()
sfdisk_info = self._call_sfdisk()
- if not (device := lsblk_info.get('blockdevices', [None])[0]):
+ if not (device := lsblk_info.get('blockdevices', [])):
raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk')
+ # Grab the first (and only) block device in the list as we're targeting a specific partition
+ device = device[0]
+
mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint]
bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
@@ -243,7 +253,8 @@ class Partition:
@property
def filesystem(self) -> str:
- return self._partition_info.filesystem_type
+ if self._partition_info:
+ return self._partition_info.filesystem_type
@property
def mountpoint(self) -> Optional[Path]:
@@ -253,43 +264,51 @@ class Partition:
@property
def mountpoints(self) -> List[Path]:
- return self._partition_info.mountpoints
+ if self._partition_info:
+ return self._partition_info.mountpoints
@property
def sector_size(self) -> int:
- return self._partition_info.sector_size
+ if self._partition_info:
+ return self._partition_info.sector_size
@property
def start(self) -> Optional[int]:
- return self._partition_info.start
+ if self._partition_info:
+ return self._partition_info.start
@property
def end(self) -> Optional[int]:
- return self._partition_info.end
+ if self._partition_info:
+ return self._partition_info.end
@property
def end_sectors(self) -> Optional[int]:
- start = self._partition_info.start
- end = self._partition_info.end
- if start and end:
- return start + end
- return None
+ if self._partition_info:
+ start = self._partition_info.start
+ end = self._partition_info.end
+ if start and end:
+ return start + end
@property
def size(self) -> Optional[float]:
- return self._partition_info.size
+ if self._partition_info:
+ return self._partition_info.size
@property
def boot(self) -> bool:
- return self._partition_info.bootable
+ if self._partition_info:
+ return self._partition_info.bootable
@property
def partition_type(self) -> Optional[str]:
- return self._partition_info.pttype
+ if self._partition_info:
+ return self._partition_info.pttype
@property
def part_uuid(self) -> str:
- return self._partition_info.partuuid
+ if self._partition_info:
+ return self._partition_info.partuuid
@property
def uuid(self) -> Optional[str]:
@@ -355,7 +374,8 @@ class Partition:
log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")
- return self._partition_info.uuid
+ if self._partition_info:
+ return self._partition_info.uuid
@property
def encrypted(self) -> Union[bool, None]:
@@ -611,14 +631,7 @@ class Partition:
return False
def unmount(self) -> bool:
- worker = SysCommand(f"/usr/bin/umount {self._path}")
- exit_code = worker.exit_code
-
- # Without to much research, it seams that low error codes are errors.
- # And above 8k is indicators such as "/dev/x not mounted.".
- # So anything in between 0 and 8k are errors (?).
- if exit_code and 0 < exit_code < 8000:
- raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
+ SysCommand(f"/usr/bin/umount {self._path}")
# Update the partition info since the mount info has changed after this call.
self._partition_info = self._fetch_information()
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
index 54808886..076a8ba2 100644
--- a/archinstall/lib/disk/validators.py
+++ b/archinstall/lib/disk/validators.py
@@ -7,10 +7,12 @@ def valid_parted_position(pos :str) -> bool:
if pos.isdigit():
return True
- if pos.lower().endswith('b') and pos[:-1].isdigit():
+ pos_lower = pos.lower()
+
+ if (pos_lower.endswith('b') or pos_lower.endswith('s')) and pos[:-1].isdigit():
return True
- if any(pos.lower().endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit()
+ if any(pos_lower.endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit()
for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']):
return True