index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
author | Anton Hvornum <anton@hvornum.se> | 2022-01-06 22:01:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-06 22:01:15 +0100 |
commit | e32cf71ae7dacbf9674262705cb2e8e1a5a2d206 (patch) | |
tree | d33e6202d10813221a9935dcdc49145f6ffca83e /archinstall/lib/disk | |
parent | 015cd2a59fbdf3316ddfb7546b884157ad00c7fe (diff) |
-rw-r--r-- | archinstall/lib/disk/blockdevice.py | 64 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs.py | 18 | ||||
-rw-r--r-- | archinstall/lib/disk/filesystem.py | 44 | ||||
-rw-r--r-- | archinstall/lib/disk/helpers.py | 47 | ||||
-rw-r--r-- | archinstall/lib/disk/partition.py | 94 | ||||
-rw-r--r-- | archinstall/lib/disk/user_guides.py | 16 | ||||
-rw-r--r-- | archinstall/lib/disk/validators.py | 6 |
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index 2be31375..5288f92b 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,14 +1,20 @@ +from __future__ import annotations import os import json import logging import time +from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from ..exceptions import DiskError from ..output import log from ..general import SysCommand from ..storage import storage class BlockDevice: - def __init__(self, path, info=None): + def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: from .helpers import all_disks # If we don't give any information, we need to auto-fill it. @@ -24,32 +30,32 @@ class BlockDevice: # It's actually partition-encryption, but for future-proofing this # I'm placing the encryption password on a BlockDevice level. - def __repr__(self, *args, **kwargs): + def __repr__(self, *args :str, **kwargs :str) -> str: return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})" - def __iter__(self): + def __iter__(self) -> Iterator[Partition]: for partition in self.partitions: yield self.partitions[partition] - def __getitem__(self, key, *args, **kwargs): + def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: if key not in self.info: raise KeyError(f'{self} does not contain information: "{key}"') return self.info[key] - def __len__(self): + def __len__(self) -> int: return len(self.partitions) - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :'BlockDevice') -> bool: return self.path < left_comparitor.path - def json(self): + def json(self) -> str: """ json() has precedence over __dump__, so this is a way to give less/partial information for user readability. """ return self.path - def __dump__(self): + def __dump__(self) -> Dict[str, Dict[str, Any]]: return { self.path : { 'partuuid' : self.uuid, @@ -59,14 +65,14 @@ class BlockDevice: } @property - def partition_type(self): + def partition_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['pttype'] @property - def device_or_backfile(self): + def device_or_backfile(self) -> str: """ Returns the actual device-endpoint of the BlockDevice. If it's a loop-back-device it returns the back-file, @@ -82,7 +88,7 @@ class BlockDevice: return self.device @property - def device(self): + def device(self) -> str: """ Returns the device file of the BlockDevice. If it's a loop-back-device it returns the /dev/X device, @@ -108,7 +114,7 @@ class BlockDevice: # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @property - def partitions(self): + def partitions(self) -> Dict[str, Partition]: from .filesystem import Partition self.partprobe() @@ -133,17 +139,19 @@ class BlockDevice: return {k: self.part_cache[k] for k in sorted(self.part_cache)} @property - def partition(self): + def partition(self) -> Partition: all_partitions = self.partitions return [all_partitions[k] for k in all_partitions] @property - def partition_table_type(self): + def partition_table_type(self) -> int: + # TODO: Don't hardcode :) + # Remove if we don't use this function anywhere from .filesystem import GPT return GPT @property - def uuid(self): + def uuid(self) -> str: log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') """ Returns the disk UUID as returned by lsblk. @@ -153,7 +161,7 @@ class BlockDevice: return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') @property - def size(self): + def size(self) -> float: from .helpers import convert_size_to_gb output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) @@ -162,21 +170,21 @@ class BlockDevice: return convert_size_to_gb(device['size']) @property - def bus_type(self): + def bus_type(self) -> str: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['tran'] @property - def spinning(self): + def spinning(self) -> bool: output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) for device in output['blockdevices']: return device['rota'] is True @property - def free_space(self): + def free_space(self) -> Tuple[str, str, str]: # NOTE: parted -s will default to `cancel` on prompt, skipping any partition # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, # so the free will ignore the ESP partition and just give the "free" space. @@ -187,7 +195,7 @@ class BlockDevice: yield (start, end, size) @property - def largest_free_space(self): + def largest_free_space(self) -> List[str]: info = [] for space_info in self.free_space: if not info: @@ -199,7 +207,7 @@ class BlockDevice: return info @property - def first_free_sector(self): + def first_free_sector(self) -> str: if info := self.largest_free_space: start = info[0] else: @@ -207,29 +215,29 @@ class BlockDevice: return start @property - def first_end_sector(self): + def first_end_sector(self) -> str: if info := self.largest_free_space: end = info[1] else: end = f"{self.size}GB" return end - def partprobe(self): - SysCommand(['partprobe', self.path]) + def partprobe(self) -> bool: + return SysCommand(['partprobe', self.path]).exit_code == 0 - def has_partitions(self): + def has_partitions(self) -> int: return len(self.partitions) - def has_mount_point(self, mountpoint): + def has_mount_point(self, mountpoint :str) -> bool: for partition in self.partitions: if self.partitions[partition].mountpoint == mountpoint: return True return False - def flush_cache(self): + def flush_cache(self) -> None: self.part_cache = {} - def get_partition(self, uuid): + def get_partition(self, uuid :str) -> Partition: count = 0 while count < 5: for partition_uuid, partition in self.partitions.items(): diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py index fb9712f8..084e85d2 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs.py @@ -1,7 +1,12 @@ +from __future__ import annotations import pathlib import glob import logging -from typing import Union +from typing import Union, Dict, TYPE_CHECKING + +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from ..installer import Installer from .helpers import get_mount_info from ..exceptions import DiskError from ..general import SysCommand @@ -9,7 +14,7 @@ from ..output import log from .partition import Partition -def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: +def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: """ This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name. @@ -42,7 +47,7 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0 -def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) -> bool: +def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -75,7 +80,12 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: raise DiskError(f"Could not create a subvolume at {target}: {cmd}") -def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None): +def manage_btrfs_subvolumes(installation :Installer, + partition :Dict[str, str], + mountpoints :Dict[str, str], + subvolumes :Dict[str, str], + unlocked_device :Dict[str, str] = None +) -> None: """ we do the magic with subvolumes in a centralized place parameters: * the installation object diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index 51ef949b..e6e965f1 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -1,7 +1,13 @@ +from __future__ import annotations import time import logging import json import pathlib +from typing import Optional, Dict, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .partition import Partition from .validators import valid_fs_type from ..exceptions import DiskError @@ -16,24 +22,25 @@ class Filesystem: # TODO: # When instance of a HDD is selected, check all usages and gracefully unmount them # as well as close any crypto handles. - def __init__(self, blockdevice, mode): + def __init__(self, blockdevice :BlockDevice, mode :int): self.blockdevice = blockdevice self.mode = mode - def __enter__(self, *args, **kwargs): + def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem': return self - def __repr__(self): + def __repr__(self) -> str: return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})" - def __exit__(self, *args, **kwargs): + def __exit__(self, *args :str, **kwargs :str) -> bool: # TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager if len(args) >= 2 and args[1]: raise args[1] + SysCommand('sync') return True - def partuuid_to_index(self, uuid): + def partuuid_to_index(self, uuid :str) -> Optional[int]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() time.sleep(5) @@ -50,7 +57,7 @@ class Filesystem: raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}") - def load_layout(self, layout :dict): + def load_layout(self, layout :Dict[str, Any]) -> None: from ..luks import luks2 # If the layout tells us to wipe the drive, we do so @@ -127,21 +134,21 @@ class Filesystem: log(f"Marking partition {partition['device_instance']} as bootable.") self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on') - def find_partition(self, mountpoint): + def find_partition(self, mountpoint :str) -> Partition: for partition in self.blockdevice: if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint: return partition - def partprobe(self): - SysCommand(f'bash -c "partprobe"') + def partprobe(self) -> bool: + return SysCommand(f'bash -c "partprobe"').exit_code == 0 - def raw_parted(self, string: str): + def raw_parted(self, string: str) -> SysCommand: if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0: log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red") time.sleep(0.5) return cmd_handle - def parted(self, string: str): + def parted(self, string: str) -> bool: """ Performs a parted execution of the given string @@ -149,16 +156,17 @@ class Filesystem: :type string: str """ if (parted_handle := self.raw_parted(string)).exit_code == 0: - self.partprobe() - return True + if self.partprobe(): + return True + return False else: raise DiskError(f"Parted failed to add a partition: {parted_handle}") - def use_entire_disk(self, root_filesystem_type='ext4') -> Partition: + def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type, start, end, partition_format=None): + def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()} @@ -197,14 +205,14 @@ class Filesystem: log("Add partition is exiting due to excessive wait time",level=logging.INFO) raise DiskError(f"New partition never showed up after adding new partition on {self}.") - def set_name(self, partition: int, name: str): + def set_name(self, partition: int, name: str) -> bool: return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0 - def set(self, partition: int, string: str): + def set(self, partition: int, string: str) -> bool: log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO) return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0 - def parted_mklabel(self, device: str, disk_label: str): + def parted_mklabel(self, device: str, disk_label: str) -> bool: log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow") # Try to unmount devices before attempting to run mklabel try: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index ba29744f..e9f6bc10 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -1,10 +1,15 @@ +from __future__ import annotations import json import logging import os import pathlib import re import time -from typing import Union +from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .partition import Partition + from .blockdevice import BlockDevice from ..exceptions import SysCallError, DiskError from ..general import SysCommand @@ -14,10 +19,10 @@ from ..storage import storage ROOT_DIR_PATTERN = re.compile('^.*?/devices') GIGA = 2 ** 30 -def convert_size_to_gb(size): +def convert_size_to_gb(size :Union[int, float]) -> float: return round(size / GIGA,1) -def sort_block_devices_based_on_performance(block_devices): +def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]: result = {device: 0 for device in block_devices} for device, weight in result.items(): @@ -35,12 +40,12 @@ def sort_block_devices_based_on_performance(block_devices): return result -def filter_disks_below_size_in_gb(devices, gigabytes): +def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]: for disk in devices: if disk.size >= gigabytes: yield disk -def select_largest_device(devices, gigabytes, filter_out=None): +def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -56,7 +61,7 @@ def select_largest_device(devices, gigabytes, filter_out=None): return max(copy_devices, key=(lambda device : device.size)) -def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): +def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice: if not filter_out: filter_out = [] @@ -70,7 +75,7 @@ def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None): return min(copy_devices, key=(lambda device : abs(device.size - gigabytes))) -def convert_to_gigabytes(string): +def convert_to_gigabytes(string :str) -> float: unit = string.strip()[-1] size = float(string.strip()[:-1]) @@ -81,7 +86,7 @@ def convert_to_gigabytes(string): return size -def device_state(name, *args, **kwargs): +def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709 if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)): with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f: @@ -99,7 +104,7 @@ def device_state(name, *args, **kwargs): return True # lsblk --json -l -n -o path -def all_disks(*args, **kwargs): +def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]: kwargs.setdefault("partitions", False) drives = {} @@ -113,7 +118,7 @@ def all_disks(*args, **kwargs): return drives -def harddrive(size=None, model=None, fuzzy=False): +def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: collection = all_disks() for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: @@ -133,7 +138,7 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list: bind_path = None return device_path,bind_path -def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict: +def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: device_path,bind_path = split_bind_name(path) for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: @@ -170,7 +175,7 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p return {} -def get_partitions_in_use(mountpoint) -> list: +def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: @@ -193,7 +198,7 @@ def get_partitions_in_use(mountpoint) -> list: return mounts -def get_filesystem_type(path): +def get_filesystem_type(path :str) -> Optional[str]: device_name, bind_name = split_bind_name(path) try: return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip() @@ -201,10 +206,10 @@ def get_filesystem_type(path): return None -def disk_layouts(): +def disk_layouts() -> Optional[Dict[str, Any]]: try: if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0: - return json.loads(handle.decode('UTF-8')) + return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()} else: log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow") return None @@ -216,20 +221,22 @@ def disk_layouts(): return None -def encrypted_partitions(blockdevices :dict) -> bool: +def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: for partition in blockdevices.values(): if partition.get('encrypted', False): yield partition -def find_partition_by_mountpoint(block_devices, relative_mountpoint :str): +def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: for partition in block_devices[device]['partitions']: if partition.get('mountpoint', None) == relative_mountpoint: return partition -def partprobe(): - SysCommand(f'bash -c "partprobe"') - time.sleep(5) +def partprobe() -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk + return True + return False def convert_device_to_uuid(path :str) -> str: device_name, bind_name = split_bind_name(path) diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index bb6f2d53..b8fa2b79 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -5,7 +5,8 @@ import logging import json import os import hashlib -from typing import Optional +from typing import Optional, Dict, Any, List, Union + from .blockdevice import BlockDevice from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage @@ -15,7 +16,15 @@ from ..general import SysCommand class Partition: - def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True): + def __init__(self, + path: str, + block_device: BlockDevice, + part_id :Optional[str] = None, + filesystem :Optional[str] = None, + mountpoint :Optional[str] = None, + encrypted :bool = False, + autodetect_filesystem :bool = True): + if not part_id: part_id = os.path.basename(path) @@ -50,14 +59,16 @@ class Partition: if self.filesystem == 'crypto_LUKS': self.encrypted = True - def __lt__(self, left_comparitor): + def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path else: left_comparitor = str(left_comparitor) - return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct. - def __repr__(self, *args, **kwargs): + # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 + return self.path < left_comparitor + + def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' if self.mountpoint: mount_repr = f", mounted={self.mountpoint}" @@ -69,7 +80,7 @@ class Partition: else: return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - def __dump__(self): + def __dump__(self) -> Dict[str, Any]: return { 'type': 'primary', 'PARTUUID': self._safe_uuid, @@ -86,14 +97,14 @@ class Partition: } @property - def sector_size(self): + def sector_size(self) -> Optional[int]: output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) for device in output['blockdevices']: return device.get('log-sec', None) @property - def start(self): + def start(self) -> Optional[str]: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) for partition in output.get('partitiontable', {}).get('partitions', []): @@ -101,7 +112,7 @@ class Partition: return partition['start'] # * self.sector_size @property - def end(self): + def end(self) -> Optional[str]: # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) @@ -110,7 +121,7 @@ class Partition: return partition['size'] # * self.sector_size @property - def size(self): + def size(self) -> Optional[float]: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() @@ -123,7 +134,7 @@ class Partition: time.sleep(storage['DISK_TIMEOUTS']) @property - def boot(self): + def boot(self) -> bool: output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) # Get the bootable flag from the sfdisk output: @@ -143,7 +154,7 @@ class Partition: return False @property - def partition_type(self): + def partition_type(self) -> Optional[str]: lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) for device in lsblk['blockdevices']: @@ -179,19 +190,19 @@ class Partition: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() @property - def encrypted(self): + def encrypted(self) -> Union[bool, None]: return self._encrypted @encrypted.setter - def encrypted(self, value: bool): + def encrypted(self, value: bool) -> None: self._encrypted = value @property - def parent(self): + def parent(self) -> str: return self.real_device @property - def real_device(self): + def real_device(self) -> str: for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): return f"/dev/{parent}" @@ -199,25 +210,27 @@ class Partition: return self.path @property - def device_path(self): + def device_path(self) -> str: """ for bind mounts returns the phisical path of the partition """ device_path, bind_name = split_bind_name(self.path) return device_path @property - def bind_name(self): + def bind_name(self) -> str: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ device_path, bind_name = split_bind_name(self.path) return bind_name - def partprobe(self): - SysCommand(f'bash -c "partprobe"') - time.sleep(1) + def partprobe(self) -> bool: + if SysCommand(f'bash -c "partprobe"').exit_code == 0: + time.sleep(1) + return True + return False - def detect_inner_filesystem(self, password): + def detect_inner_filesystem(self, password :str) -> Optional[str]: log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO) from ..luks import luks2 @@ -227,7 +240,7 @@ class Partition: except SysCallError: return None - def has_content(self): + def has_content(self) -> bool: fs_type = get_filesystem_type(self.path) if not fs_type or "swap" in fs_type: return False @@ -248,7 +261,7 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args, **kwargs): + def encrypt(self, *args :str, **kwargs :str) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ @@ -257,7 +270,7 @@ class Partition: handle = luks2(self, None, None) return handle.encrypt(self, *args, **kwargs) - def format(self, filesystem=None, path=None, log_formatting=True, options=[]): + def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool: """ Format can be given an overriding path, for instance /dev/null to test the formatting functionality and in essence the support for the given filesystem. @@ -342,7 +355,7 @@ class Partition: return True - def find_parent_of(self, data, name, parent=None): + def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]: if data['name'] == name: return parent elif 'children' in data: @@ -350,7 +363,7 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent - def mount(self, target, fs=None, options=''): + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: if not self.mountpoint: log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: @@ -386,25 +399,24 @@ class Partition: self.mountpoint = target return True - def unmount(self): - try: - SysCommand(f"/usr/bin/umount {self.path}") - except SysCallError as err: - exit_code = err.exit_code - - # Without to much research, it seams that low error codes are errors. - # And above 8k is indicators such as "/dev/x not mounted.". - # So anything in between 0 and 8k are errors (?). - if 0 < exit_code < 8000: - raise err + return False + + def unmount(self) -> bool: + worker = SysCommand(f"/usr/bin/umount {self.path}") + + # Without to much research, it seams that low error codes are errors. + # And above 8k is indicators such as "/dev/x not mounted.". + # So anything in between 0 and 8k are errors (?). + if 0 < worker.exit_code < 8000: + raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) self.mountpoint = None return True - def umount(self): + def umount(self) -> bool: return self.unmount() - def filesystem_supported(self): + def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling partition.format() with a path set to '/dev/null' which returns two exceptions: @@ -420,7 +432,7 @@ class Partition: return True -def get_mount_fs_type(fs): +def get_mount_fs_type(fs :str) -> str: if fs == 'ntfs': return 'ntfs3' # Needed to use the Paragon R/W NTFS driver elif fs == 'fat32': diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 3d48c104..b0a8fe8a 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -1,8 +1,17 @@ +from __future__ import annotations import logging +from typing import Optional, Dict, Any, List, TYPE_CHECKING +# https://stackoverflow.com/a/39757388/929999 +if TYPE_CHECKING: + from .blockdevice import BlockDevice + from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to from ..output import log -def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_options=False): +def suggest_single_disk_layout(block_device :BlockDevice, + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) @@ -94,7 +103,10 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o return layout -def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_options=False): +def suggest_multi_disk_layout(block_devices :List[BlockDevice], + default_filesystem :Optional[str] = None, + advanced_options :bool = False) -> Dict[str, Any]: + if not default_filesystem: from ..user_interaction import ask_for_main_filesystem_format default_filesystem = ask_for_main_filesystem_format(advanced_options) diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index 464f0d73..fd1b7f33 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -1,4 +1,6 @@ -def valid_parted_position(pos :str): +from typing import List + +def valid_parted_position(pos :str) -> bool: if not len(pos): return False @@ -17,7 +19,7 @@ def valid_parted_position(pos :str): return False -def fs_types(): +def fs_types() -> List[str]: # https://www.gnu.org/software/parted/manual/html_node/mkpart.html # Above link doesn't agree with `man parted` /mkpart documentation: """ |