Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/blockdevice.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/blockdevice.py')
-rw-r--r--archinstall/lib/disk/blockdevice.py104
1 files changed, 77 insertions, 27 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index 206c3b7e..c7b69205 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -3,6 +3,7 @@ import os
import json
import logging
import time
+from functools import cached_property
from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -32,7 +33,29 @@ class BlockDevice:
# I'm placing the encryption password on a BlockDevice level.
def __repr__(self, *args :str, **kwargs :str) -> str:
- return f"BlockDevice({self.device_or_backfile}, size={self.size}GB, free_space={'+'.join(part[2] for part in self.free_space)}, bus_type={self.bus_type})"
+ return self._str_repr
+
+ @cached_property
+ def _str_repr(self) -> str:
+ return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})"
+
+ @cached_property
+ def display_info(self) -> str:
+ columns = {
+ str(_('Device')): self.device_or_backfile,
+ str(_('Size')): f'{self._safe_size}GB',
+ str(_('Free space')): f'{self._safe_free_space}',
+ str(_('Bus-type')): f'{self.bus_type}'
+ }
+
+ padding = max([len(k) for k in columns.keys()])
+
+ pretty = ''
+ for k, v in columns.items():
+ k = k.ljust(padding, ' ')
+ pretty += f'{k} = {v}\n'
+
+ return pretty.rstrip()
def __iter__(self) -> Iterator[Partition]:
for partition in self.partitions:
@@ -74,7 +97,7 @@ class BlockDevice:
for device in output['blockdevices']:
return device['pttype']
- @property
+ @cached_property
def device_or_backfile(self) -> str:
"""
Returns the actual device-endpoint of the BlockDevice.
@@ -157,7 +180,7 @@ class BlockDevice:
from .filesystem import GPT
return GPT
- @property
+ @cached_property
def uuid(self) -> str:
log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow')
"""
@@ -167,7 +190,19 @@ class BlockDevice:
"""
return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8')
- @property
+ @cached_property
+ def _safe_size(self) -> float:
+ from .helpers import convert_size_to_gb
+
+ try:
+ output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8'))
+ except SysCallError:
+ return -1.0
+
+ for device in output['blockdevices']:
+ return convert_size_to_gb(device['size'])
+
+ @cached_property
def size(self) -> float:
from .helpers import convert_size_to_gb
@@ -176,22 +211,29 @@ class BlockDevice:
for device in output['blockdevices']:
return convert_size_to_gb(device['size'])
- @property
+ @cached_property
def bus_type(self) -> str:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['tran']
- @property
+ @cached_property
def spinning(self) -> bool:
output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8'))
for device in output['blockdevices']:
return device['rota'] is True
- @property
- def free_space(self) -> Tuple[str, str, str]:
+ @cached_property
+ def _safe_free_space(self) -> Tuple[str, ...]:
+ try:
+ return '+'.join(part[2] for part in self.free_space)
+ except SysCallError:
+ return '?'
+
+ @cached_property
+ def free_space(self) -> Tuple[str, ...]:
# NOTE: parted -s will default to `cancel` on prompt, skipping any partition
# that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso,
# so the free will ignore the ESP partition and just give the "free" space.
@@ -204,7 +246,7 @@ class BlockDevice:
except SysCallError as error:
log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
- @property
+ @cached_property
def largest_free_space(self) -> List[str]:
info = []
for space_info in self.free_space:
@@ -216,7 +258,7 @@ class BlockDevice:
info = space_info
return info
- @property
+ @cached_property
def first_free_sector(self) -> str:
if info := self.largest_free_space:
start = info[0]
@@ -224,7 +266,7 @@ class BlockDevice:
start = '512MB'
return start
- @property
+ @cached_property
def first_end_sector(self) -> str:
if info := self.largest_free_space:
end = info[1]
@@ -247,19 +289,27 @@ class BlockDevice:
def flush_cache(self) -> None:
self.part_cache = {}
- def get_partition(self, uuid :str) -> Partition:
- count = 0
- while count < 5:
- for partition_uuid, partition in self.partitions.items():
- if partition.uuid.lower() == uuid.lower():
- return partition
- else:
- log(f"uuid {uuid} not found. Waiting for {count +1} time",level=logging.DEBUG)
- time.sleep(float(storage['arguments'].get('disk-sleep', 0.2)))
- count += 1
- else:
- log(f"Could not find {uuid} in disk after 5 retries",level=logging.INFO)
- print(f"Cache: {self.part_cache}")
- print(f"Partitions: {self.partitions.items()}")
- print(f"UUID: {[uuid]}")
- raise DiskError(f"New partition {uuid} never showed up after adding new partition on {self}")
+ def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition:
+ if not uuid and not partuuid:
+ raise ValueError(f"BlockDevice.get_partition() requires either a UUID or a PARTUUID for lookups.")
+
+ for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)):
+ for partition_index, partition in self.partitions.items():
+ try:
+ if uuid and partition.uuid.lower() == uuid.lower():
+ return partition
+ elif partuuid and partition.part_uuid.lower() == partuuid.lower():
+ return partition
+ except DiskError as error:
+ # Most likely a blockdevice that doesn't support or use UUID's
+ # (like Microsoft recovery partition)
+ log(f"Could not get UUID/PARTUUID of {partition}: {error}", level=logging.DEBUG, fg="gray")
+ pass
+
+ log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
+ time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
+
+ log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)
+ log(f"Cache: {self.part_cache}")
+ log(f"Partitions: {self.partitions.items()}")
+ raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.")