Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/partition.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk/partition.py')
-rw-r--r--archinstall/lib/disk/partition.py94
1 files changed, 53 insertions, 41 deletions
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index bb6f2d53..b8fa2b79 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -5,7 +5,8 @@ import logging
import json
import os
import hashlib
-from typing import Optional
+from typing import Optional, Dict, Any, List, Union
+
from .blockdevice import BlockDevice
from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
@@ -15,7 +16,15 @@ from ..general import SysCommand
class Partition:
- def __init__(self, path: str, block_device: BlockDevice, part_id=None, filesystem=None, mountpoint=None, encrypted=False, autodetect_filesystem=True):
+ def __init__(self,
+ path: str,
+ block_device: BlockDevice,
+ part_id :Optional[str] = None,
+ filesystem :Optional[str] = None,
+ mountpoint :Optional[str] = None,
+ encrypted :bool = False,
+ autodetect_filesystem :bool = True):
+
if not part_id:
part_id = os.path.basename(path)
@@ -50,14 +59,16 @@ class Partition:
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
- def __lt__(self, left_comparitor):
+ def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
else:
left_comparitor = str(left_comparitor)
- return self.path < left_comparitor # Not quite sure the order here is correct. But /dev/nvme0n1p1 comes before /dev/nvme0n1p5 so seems correct.
- def __repr__(self, *args, **kwargs):
+ # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
+ return self.path < left_comparitor
+
+ def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
if self.mountpoint:
mount_repr = f", mounted={self.mountpoint}"
@@ -69,7 +80,7 @@ class Partition:
else:
return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
- def __dump__(self):
+ def __dump__(self) -> Dict[str, Any]:
return {
'type': 'primary',
'PARTUUID': self._safe_uuid,
@@ -86,14 +97,14 @@ class Partition:
}
@property
- def sector_size(self):
+ def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
for device in output['blockdevices']:
return device.get('log-sec', None)
@property
- def start(self):
+ def start(self) -> Optional[str]:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
for partition in output.get('partitiontable', {}).get('partitions', []):
@@ -101,7 +112,7 @@ class Partition:
return partition['start'] # * self.sector_size
@property
- def end(self):
+ def end(self) -> Optional[str]:
# TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
@@ -110,7 +121,7 @@ class Partition:
return partition['size'] # * self.sector_size
@property
- def size(self):
+ def size(self) -> Optional[float]:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
@@ -123,7 +134,7 @@ class Partition:
time.sleep(storage['DISK_TIMEOUTS'])
@property
- def boot(self):
+ def boot(self) -> bool:
output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
# Get the bootable flag from the sfdisk output:
@@ -143,7 +154,7 @@ class Partition:
return False
@property
- def partition_type(self):
+ def partition_type(self) -> Optional[str]:
lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
for device in lsblk['blockdevices']:
@@ -179,19 +190,19 @@ class Partition:
return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
@property
- def encrypted(self):
+ def encrypted(self) -> Union[bool, None]:
return self._encrypted
@encrypted.setter
- def encrypted(self, value: bool):
+ def encrypted(self, value: bool) -> None:
self._encrypted = value
@property
- def parent(self):
+ def parent(self) -> str:
return self.real_device
@property
- def real_device(self):
+ def real_device(self) -> str:
for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
return f"/dev/{parent}"
@@ -199,25 +210,27 @@ class Partition:
return self.path
@property
- def device_path(self):
+ def device_path(self) -> str:
""" for bind mounts returns the phisical path of the partition
"""
device_path, bind_name = split_bind_name(self.path)
return device_path
@property
- def bind_name(self):
+ def bind_name(self) -> str:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
device_path, bind_name = split_bind_name(self.path)
return bind_name
- def partprobe(self):
- SysCommand(f'bash -c "partprobe"')
- time.sleep(1)
+ def partprobe(self) -> bool:
+ if SysCommand(f'bash -c "partprobe"').exit_code == 0:
+ time.sleep(1)
+ return True
+ return False
- def detect_inner_filesystem(self, password):
+ def detect_inner_filesystem(self, password :str) -> Optional[str]:
log(f'Trying to detect inner filesystem format on {self} (This might take a while)', level=logging.INFO)
from ..luks import luks2
@@ -227,7 +240,7 @@ class Partition:
except SysCallError:
return None
- def has_content(self):
+ def has_content(self) -> bool:
fs_type = get_filesystem_type(self.path)
if not fs_type or "swap" in fs_type:
return False
@@ -248,7 +261,7 @@ class Partition:
return True if files > 0 else False
- def encrypt(self, *args, **kwargs):
+ def encrypt(self, *args :str, **kwargs :str) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
@@ -257,7 +270,7 @@ class Partition:
handle = luks2(self, None, None)
return handle.encrypt(self, *args, **kwargs)
- def format(self, filesystem=None, path=None, log_formatting=True, options=[]):
+ def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = []) -> bool:
"""
Format can be given an overriding path, for instance /dev/null to test
the formatting functionality and in essence the support for the given filesystem.
@@ -342,7 +355,7 @@ class Partition:
return True
- def find_parent_of(self, data, name, parent=None):
+ def find_parent_of(self, data :Dict[str, Any], name :str, parent :Optional[str] = None) -> Optional[str]:
if data['name'] == name:
return parent
elif 'children' in data:
@@ -350,7 +363,7 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
- def mount(self, target, fs=None, options=''):
+ def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
if not self.mountpoint:
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
@@ -386,25 +399,24 @@ class Partition:
self.mountpoint = target
return True
- def unmount(self):
- try:
- SysCommand(f"/usr/bin/umount {self.path}")
- except SysCallError as err:
- exit_code = err.exit_code
-
- # Without to much research, it seams that low error codes are errors.
- # And above 8k is indicators such as "/dev/x not mounted.".
- # So anything in between 0 and 8k are errors (?).
- if 0 < exit_code < 8000:
- raise err
+ return False
+
+ def unmount(self) -> bool:
+ worker = SysCommand(f"/usr/bin/umount {self.path}")
+
+ # Without to much research, it seams that low error codes are errors.
+ # And above 8k is indicators such as "/dev/x not mounted.".
+ # So anything in between 0 and 8k are errors (?).
+ if 0 < worker.exit_code < 8000:
+ raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
self.mountpoint = None
return True
- def umount(self):
+ def umount(self) -> bool:
return self.unmount()
- def filesystem_supported(self):
+ def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
partition.format() with a path set to '/dev/null' which returns two exceptions:
@@ -420,7 +432,7 @@ class Partition:
return True
-def get_mount_fs_type(fs):
+def get_mount_fs_type(fs :str) -> str:
if fs == 'ntfs':
return 'ntfs3' # Needed to use the Paragon R/W NTFS driver
elif fs == 'fat32':