Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
authorAnton Hvornum <anton@hvornum.se>2022-01-06 22:01:15 +0100
committerGitHub <noreply@github.com>2022-01-06 22:01:15 +0100
commite32cf71ae7dacbf9674262705cb2e8e1a5a2d206 (patch)
treed33e6202d10813221a9935dcdc49145f6ffca83e /archinstall/lib/disk
parent015cd2a59fbdf3316ddfb7546b884157ad00c7fe (diff)
Added type annotations to all functions (#845)
* Added type annotations for 1/5 of the files. There's bound to be some issues with type miss-match, will sort that out later. * Added type hints for 4/5 of the code * Added type hints for 4.7/5 of the code * Added type hints for 5/5 of the code base * Split the linters into individual files This should help with more clearly show which runner is breaking since they don't share a single common name any longer. Also moved mypy settings into pyproject.toml * Fixed some of the last flake8 issues * Missing parameter * Fixed invalid lookahead types * __future__ had to be at the top * Fixed last flake8 issues
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/blockdevice.py64
-rw-r--r--archinstall/lib/disk/btrfs.py18
-rw-r--r--archinstall/lib/disk/filesystem.py44
-rw-r--r--archinstall/lib/disk/helpers.py47
-rw-r--r--archinstall/lib/disk/partition.py94
-rw-r--r--archinstall/lib/disk/user_guides.py16
-rw-r--r--archinstall/lib/disk/validators.py6
7 files changed, 174 insertions, 115 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index 2be31375..5288f92b 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -1,14 +1,20 @@
+from __future__ import annotations
import os
import json
import logging
import time
+from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .partition import Partition
+
from ..exceptions import DiskError
from ..output import log
from ..general import SysCommand
from ..storage import storage
class BlockDevice:
- def __init__(self, path, info=None):
+ def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
from .helpers import all_disks
# If we don't give any information, we need to auto-fill it.
@@ -24,32 +30,32 @@ class BlockDevice:
# It's actually partition-encryption, but for future-proofing this
# I'm placing the encryption password on a BlockDevice level.
- def __repr__(self, *args, **kwargs):
+ def __repr__(self, *args :str, **kwargs :str) -> str:
return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
- def __iter__(self):
+ def __iter__(self) -> Iterator[Partition]:
for partition in self.partitions:
yield self.partitions[partition]
- def __getitem__(self, key, *args, **kwargs):
+ def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any:
if key not in self.info:
raise KeyError(f'{self} does not contain information: "{key}"')
return self.info[key]
- def __len__(self):
+ def __len__(self) -> int:
return len(self.partitions)
- def __lt__(self, left_comparitor):
+ def __lt__(self, left_comparitor :'BlockDevice') -> bool:
return self.path < left_comparitor.path
- def json(self):
+ def json(self) -> str:
"""
json() has precedence over __dump__, so this is a way
to give less/partial information for user readability.
"""
return self.path
- def __dump__(self):
+ def __dump__(self) -> Dict[str, Dict[str, Any]]:
return {
self.path : {
'partuuid' : self.uuid,
@@ -59,14 +65,14 @@ class BlockDevice:
}
@property
- def partition_type(self):
+ def partition_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['pttype']
@property
- def device_or_backfile(self):
+ def device_or_backfile(self) -> str:
"""
Returns the actual device-endpoint of the BlockDevice.
If it's a loop-back-device it returns the back-file,
@@ -82,7 +88,7 @@ class BlockDevice:
return self.device
@property
- def device(self):
+ def device(self) -> str:
"""
Returns the device file of the BlockDevice.
If it's a loop-back-device it returns the /dev/X device,
@@ -108,7 +114,7 @@ class BlockDevice:
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@property
- def partitions(self):
+ def partitions(self) -> Dict[str, Partition]:
from .filesystem import Partition
self.partprobe()
@@ -133,17 +139,19 @@ class BlockDevice:
return {k: self.part_cache[k] for k in sorted(self.part_cache)}
@property
- def partition(self):
+ def partition(self) -> Partition:
all_partitions = self.partitions
return [all_partitions[k] for k in all_partitions]
@property
- def partition_table_type(self):
+ def partition_table_type(self) -> int:
+ # TODO: Don't hardcode :)
+ # Remove if we don't use this function anywhere
from .filesystem import GPT
return GPT
@property
- def uuid(self):
+ def uuid(self) -> str:
log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
Returns the disk UUID as returned by lsblk.
@@ -153,7 +161,7 @@ class BlockDevice:
return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
@property
- def size(self):
+ def size(self) -> float:
from .helpers import convert_size_to_gb
output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
@@ -162,21 +170,21 @@ class BlockDevice:
return convert_size_to_gb(device['size'])
@property
- def bus_type(self):
+ def bus_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['tran']
@property
- def spinning(self):
+ def spinning(self) -> bool:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['rota'] is True
@property
- def free_space(self):
+ def free_space(self) -> Tuple[str, str, str]:
# NOTE: parted -s will default to `cancel` on prompt, skipping any partition
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
@@ -187,7 +195,7 @@ class BlockDevice:
yield (start, end, size)
@property
- def largest_free_space(self):
+ def largest_free_space(self) -> List[str]:
info = []
for space_info in self.free_space:
if not info:
@@ -199,7 +207,7 @@ class BlockDevice:
return info
@property
- def first_free_sector(self):
+ def first_free_sector(self) -> str:
if info := self.largest_free_space:
start = info[0]
else:
@@ -207,29 +215,29 @@ class BlockDevice:
return start
@property
- def first_end_sector(self):
+ def first_end_sector(self) -> str:
if info := self.largest_free_space:
end = info[1]
else:
end = f"{self.size}GB"
return end
- def partprobe(self):
- SysCommand(['partprobe', self.path])
+ def partprobe(self) -> bool:
+ return SysCommand(['partprobe', self.path]).exit_code == 0
- def has_partitions(self):
+ def has_partitions(self) -> int:
return len(self.partitions)
- def has_mount_point(self, mountpoint):
+ def has_mount_point(self, mountpoint :str) -> bool:
for partition in self.partitions:
if self.partitions[partition].mountpoint == mountpoint:
return True
return False
- def flush_cache(self):
+ def flush_cache(self) -> None:
self.part_cache = {}
- def get_partition(self, uuid):
+ def get_partition(self, uuid :str) -> Partition:
count = 0
while count < 5:
for partition_uuid, partition in self.partitions.items():
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py
index fb9712f8..084e85d2 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs.py
@@ -1,7 +1,12 @@
+from __future__ import annotations
import pathlib
import glob
import logging
-from typing import Union
+from typing import Union, Dict, TYPE_CHECKING
+
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from ..installer import Installer
from .helpers import get_mount_info
from ..exceptions import DiskError
from ..general import SysCommand
@@ -9,7 +14,7 @@ from ..output import log
from .partition import Partition
-def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
+def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
This function uses mount to mount a subvolume on a given device, at a given location with a given subvolume name.
@@ -42,7 +47,7 @@ def mount_subvolume(installation, subvolume_location :Union[pathlib.Path, str],
return SysCommand(f"mount {mount_information['source']} {target} -o subvol=@{subvolume_location}").exit_code == 0
-def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str]) -> bool:
+def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool:
"""
This function uses btrfs to create a subvolume.
@@ -75,7 +80,12 @@ def create_subvolume(installation, subvolume_location :Union[pathlib.Path, str])
if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0:
raise DiskError(f"Could not create a subvolume at {target}: {cmd}")
-def manage_btrfs_subvolumes(installation, partition :dict, mountpoints :dict, subvolumes :dict, unlocked_device :dict = None):
+def manage_btrfs_subvolumes(installation :Installer,
+ partition :Dict[str, str],
+ mountpoints :Dict[str, str],
+ subvolumes :Dict[str, str],
+ unlocked_device :Dict[str, str] = None
+) -> None:
""" we do the magic with subvolumes in a centralized place
parameters:
* the installation object
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 51ef949b..e6e965f1 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -1,7 +1,13 @@
+from __future__ import annotations
import time
import logging
import json
import pathlib
+from typing import Optional, Dict, Any, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .blockdevice import BlockDevice
+
from .partition import Partition
from .validators import valid_fs_type
from ..exceptions import DiskError
@@ -16,24 +22,25 @@ class Filesystem:
# TODO:
# When instance of a HDD is selected, check all usages and gracefully unmount them
# as well as close any crypto handles.
- def __init__(self, blockdevice, mode):
+ def __init__(self, blockdevice :BlockDevice, mode :int):
self.blockdevice = blockdevice
self.mode = mode
- def __enter__(self, *args, **kwargs):
+ def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
return self
- def __repr__(self):
+ def __repr__(self) -> str:
return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"
- def __exit__(self, *args, **kwargs):
+ def __exit__(self, *args :str, **kwargs :str) -> bool:
# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
if len(args) >= 2 and args[1]:
raise args[1]
+
SysCommand('sync')
return True
- def partuuid_to_index(self, uuid):
+ def partuuid_to_index(self, uuid :str) -> Optional[int]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
time.sleep(5)
@@ -50,7 +57,7 @@ class Filesystem:
raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")
- def load_layout(self, layout :dict):
+ def load_layout(self, layout :Dict[str, Any]) -> None:
from ..luks import luks2
# If the layout tells us to wipe the drive, we do so
@@ -127,21 +134,21 @@ class Filesystem:
log(f"Marking partition {partition['device_instance']} as bootable.")
self.set(self.partuuid_to_index(partition['device_instance'].uuid), 'boot on')
- def find_partition(self, mountpoint):
+ def find_partition(self, mountpoint :str) -> Partition:
for partition in self.blockdevice:
if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
return partition
- def partprobe(self):
- SysCommand(f'bash -c "partprobe"')
+ def partprobe(self) -> bool:
+ return SysCommand(f'bash -c "partprobe"').exit_code == 0
- def raw_parted(self, string: str):
+ def raw_parted(self, string: str) -> SysCommand:
if (cmd_handle := SysCommand(f'/usr/bin/parted -s {string}')).exit_code != 0:
log(f"Parted ended with a bad exit code: {cmd_handle}", level=logging.ERROR, fg="red")
time.sleep(0.5)
return cmd_handle
- def parted(self, string: str):
+ def parted(self, string: str) -> bool:
"""
Performs a parted execution of the given string
@@ -149,16 +156,17 @@ class Filesystem:
:type string: str
"""
if (parted_handle := self.raw_parted(string)).exit_code == 0:
- self.partprobe()
- return True
+ if self.partprobe():
+ return True
+ return False
else:
raise DiskError(f"Parted failed to add a partition: {parted_handle}")
- def use_entire_disk(self, root_filesystem_type='ext4') -> Partition:
+ def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type, start, end, partition_format=None):
+ def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None) -> None:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
previous_partition_uuids = {partition.uuid for partition in self.blockdevice.partitions.values()}
@@ -197,14 +205,14 @@ class Filesystem:
log("Add partition is exiting due to excessive wait time",level=logging.INFO)
raise DiskError(f"New partition never showed up after adding new partition on {self}.")
- def set_name(self, partition: int, name: str):
+ def set_name(self, partition: int, name: str) -> bool:
return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0
- def set(self, partition: int, string: str):
+ def set(self, partition: int, string: str) -> bool:
log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}') == 0
- def parted_mklabel(self, device: str, disk_label: str):
+ def parted_mklabel(self, device: str, disk_label: str) -> bool:
log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
# Try to unmount devices before attempting to run mklabel
try:
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index ba29744f..e9f6bc10 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -1,10 +1,15 @@
+from __future__ import annotations
import json
import logging
import os
import pathlib
import re
import time
-from typing import Union
+from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .partition import Partition
+
from .blockdevice import BlockDevice
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
@@ -14,10 +19,10 @@ from ..storage import storage
ROOT_DIR_PATTERN = re.compile('^.*?/devices')
GIGA = 2 ** 30
-def convert_size_to_gb(size):
+def convert_size_to_gb(size :Union[int, float]) -> float:
return round(size / GIGA,1)
-def sort_block_devices_based_on_performance(block_devices):
+def sort_block_devices_based_on_performance(block_devices :List[BlockDevice]) -> Dict[BlockDevice, int]:
result = {device: 0 for device in block_devices}
for device, weight in result.items():
@@ -35,12 +40,12 @@ def sort_block_devices_based_on_performance(block_devices):
return result
-def filter_disks_below_size_in_gb(devices, gigabytes):
+def filter_disks_below_size_in_gb(devices :List[BlockDevice], gigabytes :int) -> Iterator[BlockDevice]:
for disk in devices:
if disk.size >= gigabytes:
yield disk
-def select_largest_device(devices, gigabytes, filter_out=None):
+def select_largest_device(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
if not filter_out:
filter_out = []
@@ -56,7 +61,7 @@ def select_largest_device(devices, gigabytes, filter_out=None):
return max(copy_devices, key=(lambda device : device.size))
-def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
+def select_disk_larger_than_or_close_to(devices :List[BlockDevice], gigabytes :int, filter_out :Optional[List[BlockDevice]] = None) -> BlockDevice:
if not filter_out:
filter_out = []
@@ -70,7 +75,7 @@ def select_disk_larger_than_or_close_to(devices, gigabytes, filter_out=None):
return min(copy_devices, key=(lambda device : abs(device.size - gigabytes)))
-def convert_to_gigabytes(string):
+def convert_to_gigabytes(string :str) -> float:
unit = string.strip()[-1]
size = float(string.strip()[:-1])
@@ -81,7 +86,7 @@ def convert_to_gigabytes(string):
return size
-def device_state(name, *args, **kwargs):
+def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
# Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
@@ -99,7 +104,7 @@ def device_state(name, *args, **kwargs):
return True
# lsblk --json -l -n -o path
-def all_disks(*args, **kwargs):
+def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
kwargs.setdefault("partitions", False)
drives = {}
@@ -113,7 +118,7 @@ def all_disks(*args, **kwargs):
return drives
-def harddrive(size=None, model=None, fuzzy=False):
+def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
collection = all_disks()
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
@@ -133,7 +138,7 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
-def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_path=False) -> dict:
+def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
device_path,bind_path = split_bind_name(path)
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
@@ -170,7 +175,7 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse=False, return_real_p
return {}
-def get_partitions_in_use(mountpoint) -> list:
+def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
@@ -193,7 +198,7 @@ def get_partitions_in_use(mountpoint) -> list:
return mounts
-def get_filesystem_type(path):
+def get_filesystem_type(path :str) -> Optional[str]:
device_name, bind_name = split_bind_name(path)
try:
return SysCommand(f"blkid -o value -s TYPE {device_name}").decode('UTF-8').strip()
@@ -201,10 +206,10 @@ def get_filesystem_type(path):
return None
-def disk_layouts():
+def disk_layouts() -> Optional[Dict[str, Any]]:
try:
if (handle := SysCommand("lsblk -f -o+TYPE,SIZE -J")).exit_code == 0:
- return json.loads(handle.decode('UTF-8'))
+ return {str(key): val for key, val in json.loads(handle.decode('UTF-8')).items()}
else:
log(f"Could not return disk layouts: {handle}", level=logging.WARNING, fg="yellow")
return None
@@ -216,20 +221,22 @@ def disk_layouts():
return None
-def encrypted_partitions(blockdevices :dict) -> bool:
+def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool:
for partition in blockdevices.values():
if partition.get('encrypted', False):
yield partition
-def find_partition_by_mountpoint(block_devices, relative_mountpoint :str):
+def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition:
for device in block_devices:
for partition in block_devices[device]['partitions']:
if partition.get('mountpoint', None) == relative_mountpoint:
return partition
-def partprobe():
- SysCommand(f'bash -c "partprobe"')
- time.sleep(5)
+def partprobe() -> bool:
+ if SysCommand(f'bash -c "partprobe"').exit_code == 0:
+ time.sleep(5) # TODO: Remove, we should be relying on blkid instead of lsblk
+ return True
+ return False
def convert_device_to_uuid(path :str) -> str:
device_name, bind_name = split_bind_name(path)
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index bb6f2d53..b8fa2b79 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -5,7 +5,8 @@ import logging
import json
import os
import hashlib
-from typing import Optional
+from typing import Optional, Dict, Any, List, Union
+
from .blockdevice import BlockDevice
from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
@@ -15,7 +16,15 @@ from ..general import SysCommand
class Partition:
- def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
+ def __init__(self,
+ path: str,
+ block_device: BlockDevice,
+ part_id :Optional[str] = None,
+ filesystem :Optional[str] = None,
+ mountpoint :Optional[str] = None,
+ encrypted :bool = False,
+ autodetect_filesystem :bool = True):
+
if not part_id:
part_id = os.path.basename(path)
@@ -50,14 +59,16 @@ class Partition:
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
- def __lt__(self, left_comparitor):
+ def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
else:
left_comparitor = str(left_comparitor)
- return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
- def __repr__(self, *args, **kwargs):
+ # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
+ return self.path < left_comparitor
+
+ def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
if self.mountpoint:
mount_repr = f", mounted={self.mountpoint}"
@@ -69,7 +80,7 @@ class Partition:
else:
return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
- def __dump__(self):
+ def __dump__(self) -> Dict[str, Any]:
return {
'type': 'primary',
'PARTUUID': self._safe_uuid,
@@ -86,14 +97,14 @@ class Partition:
}
@property
- def sector_size(self):
+ def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
for device in output['blockdevices']:
return device.get('log-sec', None)
@property
- def start(self):
+ def start(self) -> Optional[str]:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
@@ -101,7 +112,7 @@ class Partition:
return partition['start'] # * self.sector_size
@property
- def end(self):
+ def end(self) -> Optional[str]:
# TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
@@ -110,7 +121,7 @@ class Partition:
return partition['size'] # * self.sector_size
@property
- def size(self):
+ def size(self) -> Optional[float]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
@@ -123,7 +134,7 @@ class Partition:
time.sleep(storage['DISK_TIMEOUTS'])
@property
- def boot(self):
+ def boot(self) -> bool:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
# Get the bootable flag from the sfdisk output:
@@ -143,7 +154,7 @@ class Partition:
return False
@property
- def partition_type(self):
+ def partition_type(self) -> Optional[str]:
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
for device in lsblk['blockdevices']:
@@ -179,19 +190,19 @@ class Partition:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
@property
- def encrypted(self):
+ def encrypted(self) -> Union[bool, None]:
return self._encrypted
@encrypted.setter
- def encrypted(self, value: bool):
+ def encrypted(self, value: bool) -> None:
self._encrypted = value
@property
- def parent(self):
+ def parent(self) -> str:
return self.real_device
@property
- def real_device(self):
+ def real_device(self) -> str:
for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
return f"/dev/{parent}"
@@ -199,25 +210,27 @@ class Partition:
return self.path
@property
- def device_path(self):
+ def device_path(self) -> str:
""" for bind mounts returns the phisical path of the partition
"""
device_path, bind_name = split_bind_name(self.path)
return device_path
@property
- def bind_name(self):
+ def bind_name(self) -> str:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
device_path, bind_name = split_bind_name(self.path)
return bind_name
- def partprobe(self):
- SysCommand(f'bash -c "partprobe"')
- time.sleep(1)
+ def partprobe(self) -> bool:
+ if SysCommand(f'bash -c "partprobe"').exit_code == 0:
+ time.sleep(1)
+ return True
+ return False
- def detect_inner_filesystem(self, password):
+ def detect_inner_filesystem(self, password :str) -> Optional[str]:
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
from ..luks import luks2
@@ -227,7 +240,7 @@ class Partition:
except SysCallError:
return None
- def has_content(self):
+ def has_content(self) -> bool:
fs_type = get_filesystem_type(self.path)
if not fs_type or "swap" in fs_type:
return False
@@ -248,7 +261,7 @@ class Partition:
return True if files > 0 else False
- def encrypt(self, *args, **kwargs):
+ def encrypt(self, *args :str, **kwargs :str) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
@@ -257,7 +270,7 @@ class Partition:
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem=None, path=None, log_formatting=True, options=[]):
+ def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool:
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@@ -342,7 +355,7 @@ class Partition:
return True
- def find_parent_of(self, data, name, parent=None):
+ def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]:
if data['name'] == name:
return parent
elif 'children' in data:
@@ -350,7 +363,7 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
- def mount(self, target, fs=None, options=''):
+ def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
@@ -386,25 +399,24 @@ class Partition:
self.mountpoint = target
return True
- def unmount(self):
- try:
- SysCommand(f"/usr/bin/umount {self.path}")
- except SysCallError as err:
- exit_code = err.exit_code
-
- # Without to much research, it seams that low error codes are errors.
- # And above 8k is indicators such as "/dev/x not mounted.".
- # So anything in between 0 and 8k are errors (?).
- if 0 < exit_code < 8000:
- raise err
+ return False
+
+ def unmount(self) -> bool:
+ worker = SysCommand(f"/usr/bin/umount {self.path}")
+
+ # Without to much research, it seams that low error codes are errors.
+ # And above 8k is indicators such as "/dev/x not mounted.".
+ # So anything in between 0 and 8k are errors (?).
+ if 0 < worker.exit_code < 8000:
+ raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
self.mountpoint = None
return True
- def umount(self):
+ def umount(self) -> bool:
return self.unmount()
- def filesystem_supported(self):
+ def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
partition.format() with a path set to '/dev/null' which returns two exceptions:
@@ -420,7 +432,7 @@ class Partition:
return True
-def get_mount_fs_type(fs):
+def get_mount_fs_type(fs :str) -> str:
if fs == 'ntfs':
return 'ntfs3' # Needed to use the Paragon R/W NTFS driver
elif fs == 'fat32':
diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py
index 3d48c104..b0a8fe8a 100644
--- a/archinstall/lib/disk/user_guides.py
+++ b/archinstall/lib/disk/user_guides.py
@@ -1,8 +1,17 @@
+from __future__ import annotations
import logging
+from typing import Optional, Dict, Any, List, TYPE_CHECKING
+# https://stackoverflow.com/a/39757388/929999
+if TYPE_CHECKING:
+ from .blockdevice import BlockDevice
+
from .helpers import sort_block_devices_based_on_performance, select_largest_device, select_disk_larger_than_or_close_to
from ..output import log
-def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_options=False):
+def suggest_single_disk_layout(block_device :BlockDevice,
+ default_filesystem :Optional[str] = None,
+ advanced_options :bool = False) -> Dict[str, Any]:
+
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
default_filesystem = ask_for_main_filesystem_format(advanced_options)
@@ -94,7 +103,10 @@ def suggest_single_disk_layout(block_device, default_filesystem=None, advanced_o
return layout
-def suggest_multi_disk_layout(block_devices, default_filesystem=None, advanced_options=False):
+def suggest_multi_disk_layout(block_devices :List[BlockDevice],
+ default_filesystem :Optional[str] = None,
+ advanced_options :bool = False) -> Dict[str, Any]:
+
if not default_filesystem:
from ..user_interaction import ask_for_main_filesystem_format
default_filesystem = ask_for_main_filesystem_format(advanced_options)
diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py
index 464f0d73..fd1b7f33 100644
--- a/archinstall/lib/disk/validators.py
+++ b/archinstall/lib/disk/validators.py
@@ -1,4 +1,6 @@
-def valid_parted_position(pos :str):
+from typing import List
+
+def valid_parted_position(pos :str) -> bool:
if not len(pos):
return False
@@ -17,7 +19,7 @@ def valid_parted_position(pos :str):
return False
-def fs_types():
+def fs_types() -> List[str]:
# https://www.gnu.org/software/parted/manual/html_node/mkpart.html
# Above link doesn't agree with `man parted` /mkpart documentation:
"""