Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/helpers.py')
-rw-r--r--archinstall/lib/disk/helpers.py50
1 files changed, 37 insertions, 13 deletions
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 99856aad..f19125f4 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -8,6 +8,8 @@ import time
import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
+from ..models.subvolume import Subvolume
+
if TYPE_CHECKING:
from .partition import Partition
@@ -112,7 +114,7 @@ def cleanup_bash_escapes(data :str) -> str:
def blkid(cmd :str) -> Dict[str, Any]:
if '-o' in cmd and '-o export' not in cmd:
- raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.")
elif '-o' not in cmd:
cmd += ' -o export'
@@ -133,7 +135,7 @@ def blkid(cmd :str) -> Dict[str, Any]:
key, val = line.split('=', 1)
if key.lower() == 'devname':
devname = val
- # Lowercase for backwards compatability with all_disks() previous use cases
+ # Lowercase for backwards compatibility with all_disks() previous use cases
result[devname] = {
"path": devname,
"PATH": devname
@@ -218,7 +220,12 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
# we'll iterate the /sys/class definitions and find the information
# from there.
for block_device in glob.glob("/sys/class/block/*"):
- device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
+
+ if device_path.exists() is False:
+ log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
+ continue
+
try:
information = blkid(f'blkid -p -o export {device_path}')
except SysCallError as ex:
@@ -227,12 +234,17 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
try:
information = get_loop_info(device_path)
if not information:
+ print("Exit code for blkid -p -o export was:", ex.exit_code)
raise SysCallError("Could not get loop information", exit_code=1)
except SysCallError:
+ print("Not a loop device, trying uevent rules.")
information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
else:
+ # We could not reliably get any information, perhaps the disk is clean of information?
+ print("Raising ex because:", ex.exit_code)
raise ex
+ # return instances
information = enrich_blockdevice_information(information)
@@ -244,7 +256,7 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str,
instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path))))
elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
instances[path] = BlockDevice(path, path_info)
- elif path_info.get('TYPE') == 'squashfs':
+ elif path_info.get('TYPE') in ('squashfs', 'erofs'):
# We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
continue
else:
@@ -368,7 +380,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict
return filters
-def get_partitions_in_use(mountpoint :str) -> List[Partition]:
+def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
from .partition import Partition
try:
@@ -391,12 +403,20 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]:
if not type(blockdev) in (Partition, MapperDev):
continue
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
+ if isinstance(blockdev, Partition):
+ for blockdev_mountpoint in blockdev.mountpoints:
+ block_devices_mountpoints[blockdev_mountpoint] = blockdev
+ else:
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ # Since all_blockdevices() returns PosixPath objects, we need to convert
+ # findmnt paths to pathlib.Path() first:
+ mountpoint = pathlib.Path(mountpoint)
+
if mountpoint in block_devices_mountpoints:
if mountpoint not in mounts:
mounts[mountpoint] = block_devices_mountpoints[mountpoint]
@@ -433,9 +453,10 @@ def disk_layouts() -> Optional[Dict[str, Any]]:
def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool:
- for partition in blockdevices.values():
- if partition.get('encrypted', False):
- yield partition
+ for blockdevice in blockdevices.values():
+ for partition in blockdevice.get('partitions', []):
+ if partition.get('encrypted', False):
+ yield partition
def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
for device in block_devices:
@@ -468,13 +489,14 @@ def convert_device_to_uuid(path :str) -> str:
raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")
+
def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool:
""" Determine if a certain partition is mounted (or has a mountpoint) as specific target (path)
Coded for clarity rather than performance
Input parms:
:parm partition the partition we check
- :type Either a Partition object or a dict with the contents of a partition definiton in the disk_layouts schema
+ :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema
:parm target (a string representing a mount path we want to check for.
:type str
@@ -484,10 +506,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri
"""
# we create the mountpoint list
if isinstance(partition,dict):
- subvols = partition.get('btrfs',{}).get('subvolumes',{})
- mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols]
+ subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', [])
+ mountpoints = [partition.get('mountpoint')]
+ mountpoints += [volume.mountpoint for volume in subvolumes]
else:
mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes]
+
# we check
if strict or target == '/':
if target in mountpoints: