Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/helpers.py')
-rw-r--r--archinstall/lib/disk/helpers.py241
1 files changed, 206 insertions, 35 deletions
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index b04e2740..afaf9e5e 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -5,12 +5,15 @@ import os
import pathlib
import re
import time
+import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .partition import Partition
from .blockdevice import BlockDevice
+from .dmcryptdev import DMCryptDev
+from .mapperdev import MapperDev
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
from ..output import log
@@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
return
return True
-# lsblk --json -l -n -o path
-def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
- kwargs.setdefault("partitions", False)
- drives = {}
- lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8'))
- for drive in lsblk['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part':
+def cleanup_bash_escapes(data :str) -> str:
+ return data.replace(r'\ ', ' ')
+
+def blkid(cmd :str) -> Dict[str, Any]:
+ if '-o' in cmd and '-o export' not in cmd:
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ elif '-o' not in cmd:
+ cmd += ' -o export'
+
+ try:
+ raw_data = SysCommand(cmd).decode()
+ except SysCallError as error:
+ log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG)
+ raise error
+
+ result = {}
+ # Process the raw result
+ devname = None
+ for line in raw_data.split('\r\n'):
+ if not len(line):
+ devname = None
+ continue
+
+ key, val = line.split('=', 1)
+ if key.lower() == 'devname':
+ devname = val
+ # Lowercase for backwards compatability with all_disks() previous use cases
+ result[devname] = {
+ "path": devname,
+ "PATH": devname
+ }
+ continue
+
+ result[devname][key] = cleanup_bash_escapes(val)
+
+ return result
+
+def get_loop_info(path :str) -> Dict[str, Any]:
+ for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
+ if not drive['name'] == path:
continue
- drives[drive['path']] = BlockDevice(drive['path'], drive)
+ return {
+ path: {
+ **drive,
+ 'type' : 'loop',
+ 'TYPE' : 'loop',
+ 'DEVTYPE' : 'loop',
+ 'PATH' : drive['name'],
+ 'path' : drive['name']
+ }
+ }
+
+ return {}
+
+def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]:
+ result = {}
+ for device_path, device_information in information.items():
+ dev_name = pathlib.Path(device_information['PATH']).name
+ if not device_information.get('TYPE') or not device_information.get('DEVTYPE'):
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists():
+ with dmcrypt_name.open('r') as fh:
+ device_information['DMCRYPT_NAME'] = fh.read().strip()
+
+ result[device_path] = device_information
+
+ return result
+
+def uevent(data :str) -> Dict[str, Any]:
+ information = {}
+
+ for line in data.replace('\r\n', '\n').split('\n'):
+ if len((line := line.strip())):
+ key, val = line.split('=', 1)
+ information[key] = val
+
+ return information
+
+def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]:
+ device_information = {}
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ return {
+ f"/dev/{dev_name}" : {
+ **device_information,
+ 'path' : f'/dev/{dev_name}',
+ 'PATH' : f'/dev/{dev_name}',
+ 'PTTYPE' : None
+ }
+ }
+
+def all_disks() -> List[BlockDevice]:
+ log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
+ return all_blockdevices(partitions=False, mappers=False)
+
+def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]:
+ """
+ Returns BlockDevice() and Partition() objects for all available devices.
+ """
+ from .partition import Partition
- return drives
+ instances = {}
+ # Due to lsblk being highly unreliable for this use case,
+ # we'll iterate the /sys/class definitions and find the information
+ # from there.
+ for block_device in glob.glob("/sys/class/block/*"):
+ device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ try:
+ information = blkid(f'blkid -p -o export {device_path}')
+
+ # TODO: No idea why F841 is raised here:
+ except SysCallError as error: # noqa: F841
+ if error.exit_code in (512, 2):
+ # Assume that it's a loop device, and try to get info on it
+ try:
+ information = get_loop_info(device_path)
+ if not information:
+ raise SysCallError("Could not get loop information", exit_code=1)
+
+ except SysCallError:
+ information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
+ else:
+ raise error
+
+ information = enrich_blockdevice_information(information)
+
+ for path, path_info in information.items():
+ if path_info.get('DMCRYPT_NAME'):
+ instances[path] = DMCryptDev(dev_path=path)
+ elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'):
+ if partitions:
+ instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path))))
+ elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
+ instances[path] = BlockDevice(path, path_info)
+ elif path_info.get('TYPE') == 'squashfs':
+ # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
+ continue
+ else:
+ log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow")
+
+ if mappers:
+ for block_device in glob.glob("/dev/mapper/*"):
+ if (pathobj := pathlib.Path(block_device)).is_symlink():
+ instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name)
+
+ return instances
+
+
+def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path:
+ partition_name = path.name
+ pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve()
+ return f"/dev/{pci_device.parent.name}"
def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
- collection = all_disks()
+ collection = all_blockdevices(partitions=False)
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
continue
@@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :
return collection[drive]
def split_bind_name(path :Union[pathlib.Path, str]) -> list:
+ # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow")
# we check for the bind notation. if exist we'll only use the "true" device path
if '[' in str(path) : # is a bind path (btrfs subvolume path)
device_path, bind_path = str(path).split('[')
@@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
+def find_mountpoint(device_path :str) -> Dict[str, Any]:
+ try:
+ for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']:
+ yield filesystem
+ except SysCallError:
+ return {}
+
def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
- device_path,bind_path = split_bind_name(path)
+ device_path, bind_path = split_bind_name(path)
output = {}
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
- log(f"Getting mount information for device path {traversal}", level=logging.INFO)
+ log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
break
- except SysCallError:
+
+ except SysCallError as error:
+ print('ERROR:', error)
pass
if not traverse:
break
if not output:
- raise DiskError(f"Could not get mount information for device path {path}")
+ raise DiskError(f"Could not get mount information for device path {device_path}")
output = json.loads(output)
+
# for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
# i.e. the subvolume filesystem we're searching for
if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
+
if 'filesystems' in output:
if len(output['filesystems']) > 1:
- raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
+ raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}")
if return_real_path:
return output['filesystems'][0], traversal
@@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur
return {}
+def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]:
+ for info in data:
+ if info.get('target') not in filters:
+ filters[info.get('target')] = None
+
+ filters.update(get_all_targets(info.get('children', [])))
+
+ return filters
+
def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
except SysCallError:
- return []
-
- mounts = []
+ return {}
if not output:
- return []
+ return {}
output = json.loads(output)
- for target in output.get('filesystems', []):
- # We need to create a BlockDevice() instead of 'None' here when creaiting Partition()
- # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None.
+ # print(output)
- # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device
- # Note: doing print(partition) here will break because the above mentioned issue.
- partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])
- partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ mounts = {}
+
+ block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True)
+
+ block_devices_mountpoints = {}
+ for blockdev in block_devices_available.values():
+ if not type(blockdev) in (Partition, MapperDev):
+ continue
- # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using
- # (lsblk pkname lists both the partition and blockdevice, BD being the last entry)
- result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1]
- block_device = BlockDevice(f"/dev/{result}")
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
- # Once we figured the block device out, we can properly create the partition object
- partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
- mounts.append(partition)
+ for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ if mountpoint in block_devices_mountpoints:
+ if mountpoint not in mounts:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
+ # If the already defined mountpoint is a DMCryptDev, and the newly found
+ # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition.
+ elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
- for child in target.get('children', []):
- mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target']))
+ log(f"Available partitions: {mounts}", level=logging.DEBUG)
return mounts