Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/helpers.py')
-rw-r--r--archinstall/lib/disk/helpers.py556
1 files changed, 0 insertions, 556 deletions
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
deleted file mode 100644
index 80d0cb53..00000000
--- a/archinstall/lib/disk/helpers.py
+++ /dev/null
@@ -1,556 +0,0 @@
-from __future__ import annotations
-import json
-import logging
-import os # type: ignore
-import pathlib
-import re
-import time
-import glob
-
-from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
-# https://stackoverflow.com/a/39757388/929999
-from .diskinfo import get_lsblk_info
-from ..models.subvolume import Subvolume
-
-from .blockdevice import BlockDevice
-from .dmcryptdev import DMCryptDev
-from .mapperdev import MapperDev
-from ..exceptions import SysCallError, DiskError
-from ..general import SysCommand
-from ..output import log
-from ..storage import storage
-
-if TYPE_CHECKING:
- from .partition import Partition
-
-
-ROOT_DIR_PATTERN = re.compile('^.*?/devices')
-GIGA = 2 ** 30
-
-def convert_size_to_gb(size :Union[int, float]) -> float:
- return round(size / GIGA,1)
-
-def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]:
- result = {device: 0 for device in block_devices}
-
- for device, weight in result.items():
- if device.spinning:
- weight -= 10
- else:
- weight += 5
-
- if device.bus_type == 'nvme':
- weight += 20
- elif device.bus_type == 'sata':
- weight += 10
-
- result[device] = weight
-
- return result
-
-def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]:
- for disk in devices:
- if disk.size >= gigabytes:
- yield disk
-
-def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
- if not filter_out:
- filter_out = []
-
- copy_devices = [*devices]
- for filter_device in filter_out:
- if filter_device in copy_devices:
- copy_devices.pop(copy_devices.index(filter_device))
-
- copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes))
-
- if not len(copy_devices):
- return None
-
- return max(copy_devices, key=(lambda device : device.size))
-
-def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
- if not filter_out:
- filter_out = []
-
- copy_devices = [*devices]
- for filter_device in filter_out:
- if filter_device in copy_devices:
- copy_devices.pop(copy_devices.index(filter_device))
-
- if not len(copy_devices):
- return None
-
- return min(copy_devices, key=(lambda device : abs(device.size - gigabytes)))
-
-def convert_to_gigabytes(string :str) -> float:
- unit = string.strip()[-1]
- size = float(string.strip()[:-1])
-
- if unit == 'M':
- size = size / 1024
- elif unit == 'T':
- size = size * 1024
-
- return size
-
-def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
- # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
- if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
- with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
- if f.read(1) == '1':
- return
-
- path = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/block/{}'.format(name)))
- hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire")
- for bus in hotplug_buses:
- if os.path.exists('/sys/bus/{}'.format(bus)):
- for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)):
- device_link = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus)))
- if re.search(device_link, path):
- return
- return True
-
-
-def cleanup_bash_escapes(data :str) -> str:
- return data.replace(r'\ ', ' ')
-
-def blkid(cmd :str) -> Dict[str, Any]:
- if '-o' in cmd and '-o export' not in cmd:
- raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.")
- elif '-o' not in cmd:
- cmd += ' -o export'
-
- try:
- raw_data = SysCommand(cmd).decode()
- except SysCallError as error:
- log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG)
- raise error
-
- result = {}
- # Process the raw result
- devname = None
- for line in raw_data.split('\r\n'):
- if not len(line):
- devname = None
- continue
-
- key, val = line.split('=', 1)
- if key.lower() == 'devname':
- devname = val
- # Lowercase for backwards compatibility with all_disks() previous use cases
- result[devname] = {
- "path": devname,
- "PATH": devname
- }
- continue
-
- result[devname][key] = cleanup_bash_escapes(val)
-
- return result
-
-def get_loop_info(path :str) -> Dict[str, Any]:
- for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == path:
- continue
-
- return {
- path: {
- **drive,
- 'type' : 'loop',
- 'TYPE' : 'loop',
- 'DEVTYPE' : 'loop',
- 'PATH' : drive['name'],
- 'path' : drive['name']
- }
- }
-
- return {}
-
-def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]:
- result = {}
- for device_path, device_information in information.items():
- dev_name = pathlib.Path(device_information['PATH']).name
- if not device_information.get('TYPE') or not device_information.get('DEVTYPE'):
- with open(f"/sys/class/block/{dev_name}/uevent") as fh:
- device_information.update(uevent(fh.read()))
-
- if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists():
- with dmcrypt_name.open('r') as fh:
- device_information['DMCRYPT_NAME'] = fh.read().strip()
-
- result[device_path] = device_information
-
- return result
-
-def uevent(data :str) -> Dict[str, Any]:
- information = {}
-
- for line in data.replace('\r\n', '\n').split('\n'):
- if len((line := line.strip())):
- key, val = line.split('=', 1)
- information[key] = val
-
- return information
-
-def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]:
- device_information = {}
- with open(f"/sys/class/block/{dev_name}/uevent") as fh:
- device_information.update(uevent(fh.read()))
-
- return {
- f"/dev/{dev_name}" : {
- **device_information,
- 'path' : f'/dev/{dev_name}',
- 'PATH' : f'/dev/{dev_name}',
- 'PTTYPE' : None
- }
- }
-
-
-def all_disks() -> List[BlockDevice]:
- log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
- return all_blockdevices(partitions=False, mappers=False)
-
-def get_blockdevice_info(device_path, exclude_iso_dev :bool = True) -> Dict[str, Any]:
- for retry_attempt in range(storage['DISK_RETRY_ATTEMPTS']):
- partprobe(device_path)
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * retry_attempt))
-
- try:
- if exclude_iso_dev:
- # exclude all devices associated with the iso boot locations
- iso_devs = ['/run/archiso/airootfs', '/run/archiso/bootmnt']
-
- try:
- lsblk_info = get_lsblk_info(device_path)
- except DiskError:
- continue
-
- if any([dev in lsblk_info.mountpoints for dev in iso_devs]):
- continue
-
- information = blkid(f'blkid -p -o export {device_path}')
- return enrich_blockdevice_information(information)
- except SysCallError as ex:
- if ex.exit_code == 2:
- # Assume that it's a loop device, and try to get info on it
- try:
- resolved_device_name = device_path.readlink().name
- except OSError:
- resolved_device_name = device_path.name
-
- try:
- information = get_loop_info(device_path)
- if not information:
- raise SysCallError(f"Could not get loop information for {resolved_device_name}", exit_code=1)
- return enrich_blockdevice_information(information)
-
- except SysCallError:
- information = get_blockdevice_uevent(resolved_device_name)
- return enrich_blockdevice_information(information)
- else:
- # We could not reliably get any information, perhaps the disk is clean of information?
- if retry_attempt == storage['DISK_RETRY_ATTEMPTS'] - 1:
- raise ex
-
-def all_blockdevices(
- mappers: bool = False,
- partitions: bool = False,
- error: bool = False,
- exclude_iso_dev: bool = True
-) -> Dict[str, Any]:
- """
- Returns BlockDevice() and Partition() objects for all available devices.
- """
- from .partition import Partition
-
- instances = {}
-
- # Due to lsblk being highly unreliable for this use case,
- # we'll iterate the /sys/class definitions and find the information
- # from there.
- for block_device in glob.glob("/sys/class/block/*"):
- try:
- device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}")
- except FileNotFoundError:
- log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
-
- if device_path.exists() is False:
- log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow")
- continue
-
- information = get_blockdevice_info(device_path)
- if not information:
- continue
-
- for path, path_info in information.items():
- if path_info.get('DMCRYPT_NAME'):
- instances[path] = DMCryptDev(dev_path=path)
- elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'):
- if partitions:
- instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path))))
- elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
- instances[path] = BlockDevice(path, path_info)
- elif path_info.get('TYPE') in ('squashfs', 'erofs'):
- # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
- continue
- else:
- log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow")
-
- if mappers:
- for block_device in glob.glob("/dev/mapper/*"):
- if (pathobj := pathlib.Path(block_device)).is_symlink():
- instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name)
-
- return instances
-
-
-def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path:
- partition_name = path.name
- pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve()
- return f"/dev/{pci_device.parent.name}"
-
-def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
- collection = all_blockdevices(partitions=False)
- for drive in collection:
- if size and convert_to_gigabytes(collection[drive]['size']) != size:
- continue
- if model and (collection[drive]['model'] is None or collection[drive]['model'].lower() != model.lower()):
- continue
-
- return collection[drive]
-
-def split_bind_name(path :Union[pathlib.Path, str]) -> list:
- # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow")
- # we check for the bind notation. if exist we'll only use the "true" device path
- if '[' in str(path) : # is a bind path (btrfs subvolume path)
- device_path, bind_path = str(path).split('[')
- bind_path = bind_path[:-1].strip() # remove the ]
- else:
- device_path = path
- bind_path = None
- return device_path,bind_path
-
-def find_mountpoint(device_path :str) -> Dict[str, Any]:
- try:
- for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']:
- yield filesystem
- except SysCallError:
- return {}
-
-def findmnt(path :pathlib.Path, traverse :bool = False, ignore :List = [], recurse :bool = True) -> Dict[str, Any]:
- for traversal in list(map(str, [str(path)] + list(path.parents))):
- if traversal in ignore:
- continue
-
- try:
- log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
- if (output := SysCommand(f"/usr/bin/findmnt --json {'--submounts' if recurse else ''} {traversal}").decode('UTF-8')):
- return json.loads(output)
-
- except SysCallError as error:
- log(f"Could not get mount information on {path} but continuing and ignoring: {error}", level=logging.INFO, fg="gray")
- pass
-
- if not traverse:
- break
-
- raise DiskError(f"Could not get mount information for path {path}")
-
-
-def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False, ignore :List = []) -> Dict[str, Any]:
- import traceback
-
- log(f"Deprecated: archinstall.get_mount_info(). Use archinstall.findmnt() instead, which does not do any automatic parsing. Please change at:\n{''.join(traceback.format_stack())}")
- device_path, bind_path = split_bind_name(path)
- output = {}
-
- for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
- if traversal in ignore:
- continue
-
- try:
- log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
- if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
- break
-
- except SysCallError as error:
- print('ERROR:', error)
- pass
-
- if not traverse:
- break
-
- if not output:
- raise DiskError(f"Could not get mount information for device path {device_path}")
-
- output = json.loads(output)
-
- # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
- # i.e. the subvolume filesystem we're searching for
- if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
- output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
-
- if 'filesystems' in output:
- if len(output['filesystems']) > 1:
- raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}")
-
- if return_real_path:
- return output['filesystems'][0], traversal
- else:
- return output['filesystems'][0]
-
- if return_real_path:
- return {}, traversal
- else:
- return {}
-
-
-def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]:
- for info in data:
- if info.get('target') not in filters:
- filters[info.get('target')] = None
-
- filters.update(get_all_targets(info.get('children', [])))
-
- return filters
-
-def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
- from .partition import Partition
-
- try:
- output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
- except SysCallError:
- return {}
-
- if not output:
- return {}
-
- output = json.loads(output)
-
- mounts = {}
-
- block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True)
-
- block_devices_mountpoints = {}
- for blockdev in block_devices_available.values():
- if not type(blockdev) in (Partition, MapperDev):
- continue
-
- if isinstance(blockdev, Partition):
- if blockdev.mountpoints:
- for blockdev_mountpoint in blockdev.mountpoints:
- block_devices_mountpoints[blockdev_mountpoint] = blockdev
- else:
- if blockdev.mount_information:
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
-
- log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
-
- for mountpoint in list(get_all_targets(output['filesystems']).keys()):
- # Since all_blockdevices() returns PosixPath objects, we need to convert
- # findmnt paths to pathlib.Path() first:
- mountpoint = pathlib.Path(mountpoint)
-
- if mountpoint in block_devices_mountpoints:
- if mountpoint not in mounts:
- mounts[mountpoint] = block_devices_mountpoints[mountpoint]
- # If the already defined mountpoint is a DMCryptDev, and the newly found
- # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition.
- elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev:
- mounts[mountpoint] = block_devices_mountpoints[mountpoint]
-
- log(f"Available partitions: {mounts}", level=logging.DEBUG)
-
- return mounts
-
-
-def get_filesystem_type(path :str) -> Optional[str]:
- try:
- return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip()
- except SysCallError:
- return None
-
-
-def disk_layouts() -> Optional[Dict[str, Any]]:
- try:
- if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0:
- return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()}
- else:
- log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow")
- return None
- except SysCallError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
- return None
- except json.decoder.JSONDecodeError as err:
- log(f"Could not return disk layouts: {err}", level=logging.WARNING, fg="yellow")
- return None
-
-
-def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
- for device in block_devices:
- for partition in block_devices[device]['partitions']:
- if partition.get('mountpoint', None) == relative_mountpoint:
- return partition
-
-def partprobe(path :str = '') -> bool:
- try:
- if SysCommand(f'bash -c "partprobe {path}"').exit_code == 0:
- return True
- except SysCallError:
- pass
- return False
-
-def convert_device_to_uuid(path :str) -> str:
- device_name, bind_name = split_bind_name(path)
-
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- partprobe(device_name)
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) # TODO: Remove, we should be relying on blkid instead of lsblk
-
- # TODO: Convert lsblk to blkid
- # (lsblk supports BlockDev and Partition UUID grabbing, blkid requires you to pick PTUUID and PARTUUID)
- output = json.loads(SysCommand(f"lsblk --json -o+UUID {device_name}").decode('UTF-8'))
-
- for device in output['blockdevices']:
- if (dev_uuid := device.get('uuid', None)):
- return dev_uuid
-
- raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.")
-
-
-def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool:
- """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path)
- Coded for clarity rather than performance
-
- Input parms:
- :parm partition the partition we check
- :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema
-
- :parm target (a string representing a mount path we want to check for.
- :type str
-
- :parm strict if the check will be strict, target is exactly the mountpoint, or no, where the target is a leaf (f.i. to check if it is in /mnt/archinstall/). Not available for root check ('/') for obvious reasons
-
- """
- # we create the mountpoint list
- if isinstance(partition,dict):
- subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', [])
- mountpoints = [partition.get('mountpoint')]
- mountpoints += [volume.mountpoint for volume in subvolumes]
- else:
- mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes]
-
- # we check
- if strict or target == '/':
- if target in mountpoints:
- return True
- else:
- return False
- else:
- for mp in mountpoints:
- if mp and mp.endswith(target):
- return True
- return False