From 7ac06d75d0785da07d270dc38a7581d74c285cc5 Mon Sep 17 00:00:00 2001 From: Anton Hvornum Date: Fri, 22 Oct 2021 20:43:01 +0200 Subject: Restructured disk.py into lib/disk/.py instead. Shouldn't be any broken links as we expose all the functions through __init__.py - but you never know so I'll keep an eye for issues with this. --- archinstall/lib/disk/helpers.py | 178 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 archinstall/lib/disk/helpers.py (limited to 'archinstall/lib/disk/helpers.py') diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py new file mode 100644 index 00000000..d37dfd9b --- /dev/null +++ b/archinstall/lib/disk/helpers.py @@ -0,0 +1,178 @@ +import re +import json +from ..exceptions import SysCallError +from ..general import SysCommand +from ..output import log + +ROOT_DIR_PATTERN = re.compile('^.*?/devices') + +def sort_block_devices_based_on_performance(block_devices): + result = {device: 0 for device in block_devices} + + for device, weight in result.items(): + if device.spinning: + weight -= 10 + else: + weight += 5 + + if device.bus_type == 'nvme': + weight += 20 + elif device.bus_type == 'sata': + weight += 10 + + result[device] = weight + + return result + +def filter_disks_below_size_in_gb(devices, gigabytes): + for disk in devices: + if disk.size >= gigabytes: + yield disk + +def select_largest_device(devices, gigabytes, filter_out=None): + if not filter_out: + filter_out = [] + + copy_devices = [*devices] + for filter_device in filter_out: + if filter_device in copy_devices: + copy_devices.pop(copy_devices.index(filter_device)) + + copy_devices = list(filter_disks_below_size_in_gb(copy_devices, gigabytes)) + + if not len(copy_devices): + return None + + return max(copy_devices, key=(lambda device : device.size)) + +def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): + if not filter_out: + filter_out = [] + + copy_devices = [*devices] + for filter_device in filter_out: + if filter_device in copy_devices: + copy_devices.pop(copy_devices.index(filter_device)) + + if not len(copy_devices): + return None + + return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) + +def convert_to_gigabytes(string): + unit = string.strip()[-1] + size = float(string.strip()[:-1]) + + if unit == 'M': + size = size / 1024 + elif unit == 'T': + size = size * 1024 + + return size + +def device_state(name, *args, **kwargs): + # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 + if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): + with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: + if f.read(1) == '1': + return + + path = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/block/{}'.format(name))) + hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire") + for bus in hotplug_buses: + if os.path.exists('/sys/bus/{}'.format(bus)): + for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)): + device_link = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus))) + if re.search(device_link, path): + return + return True + +# lsblk --json -l -n -o path +def all_disks(*args, **kwargs): + kwargs.setdefault("partitions", False) + drives = {} + + lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8')) + for drive in lsblk['blockdevices']: + if not kwargs['partitions'] and drive['type'] == 'part': + continue + + drives[drive['path']] = BlockDevice(drive['path'], drive) + + return drives + + +def harddrive(size=None, model=None, fuzzy=False): + collection = all_disks() + for drive in collection: + if size and convert_to_gigabytes(collection[drive]['size']) != size: + continue + if model and (collection[drive]['model'] is None or collection[drive]['model'].lower() != model.lower()): + continue + + return collection[drive] + + +def get_mount_info(path) -> dict: + try: + output = SysCommand(f'/usr/bin/findmnt --json {path}').decode('UTF-8') + except SysCallError: + return {} + + if not output: + return {} + + output = json.loads(output) + if 'filesystems' in output: + if len(output['filesystems']) > 1: + raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}") + + return output['filesystems'][0] + + +def get_partitions_in_use(mountpoint) -> list: + try: + output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') + except SysCallError: + return [] + + mounts = [] + + if not output: + return [] + + output = json.loads(output) + for target in output.get('filesystems', []): + mounts.append(Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])) + + for child in target.get('children', []): + mounts.append(Partition(child['source'], None, filesystem=child.get('fstype', None), mountpoint=child['target'])) + + return mounts + + +def get_filesystem_type(path): + try: + return SysCommand(f"blkid -o value -s TYPE {path}").decode('UTF-8').strip() + except SysCallError: + return None + + +def disk_layouts(): + try: + return json.loads(SysCommand("lsblk -f -o+TYPE,SIZE -J").decode('UTF-8')) + except SysCallError as err: + log(f"Could not return disk layouts: {err}") + return None + + +def encrypted_partitions(blockdevices :dict) -> bool: + for partition in blockdevices.values(): + if partition.get('encrypted', False): + yield partition + +def find_partition_by_mountpoint(block_devices, relative_mountpoint :str): + for device in block_devices: + for partition in block_devices[device]['partitions']: + if partition.get('mountpoint', None) == relative_mountpoint: + return partition \ No newline at end of file -- cgit v1.2.3-54-g00ecf