Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
authorDaniel Girtler <blackrabbit256@gmail.com>2022-07-26 18:46:50 +1000
committerGitHub <noreply@github.com>2022-07-26 10:46:50 +0200
commit9194f6d85965f435f8d0ae44ba20e73cc761eb44 (patch)
tree4e3b3a1ee3fbe65ad9dbc1ff83df7178404c63be /archinstall/lib/disk
parent5c3c1312a49e1c110d4c5825fbb8242868544900 (diff)
Cleanup partition (#1333)
* Cleanup partition * Update * Remove unused method * Update partitioning * Update * Update * Fix mypy Co-authored-by: Daniel Girtler <girtler.daniel@gmail.com>
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/blockdevice.py10
-rw-r--r--archinstall/lib/disk/btrfs/btrfspartition.py19
-rw-r--r--archinstall/lib/disk/filesystem.py26
-rw-r--r--archinstall/lib/disk/helpers.py10
-rw-r--r--archinstall/lib/disk/partition.py429
5 files changed, 224 insertions, 270 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index 4e207bf4..736bacbc 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -88,9 +88,6 @@ class BlockDevice:
raise KeyError(f'{self.info} does not contain information: "{key}"')
- def __len__(self) -> int:
- return len(self.partitions)
-
def __lt__(self, left_comparitor :'BlockDevice') -> bool:
return self._path < left_comparitor.path
@@ -121,6 +118,8 @@ class BlockDevice:
def _load_partitions(self):
from .partition import Partition
+ self._partitions.clear()
+
lsblk_info = self._call_lsblk(self._path)
device = lsblk_info['blockdevices'][0]
self._partitions.clear()
@@ -233,8 +232,6 @@ class BlockDevice:
@property
def partitions(self) -> Dict[str, 'Partition']:
- self._partprobe()
- self._load_partitions()
return OrderedDict(sorted(self._partitions.items()))
@property
@@ -282,7 +279,7 @@ class BlockDevice:
try:
if uuid and partition.uuid and partition.uuid.lower() == uuid.lower():
return partition
- elif partuuid and partition.part_uuid.lower() == partuuid.lower():
+ elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower():
return partition
except DiskError as error:
# Most likely a blockdevice that doesn't support or use UUID's
@@ -291,6 +288,7 @@ class BlockDevice:
pass
log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG)
+ self.flush_cache()
time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO)
diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py
index a05f1527..d04c9b98 100644
--- a/archinstall/lib/disk/btrfs/btrfspartition.py
+++ b/archinstall/lib/disk/btrfs/btrfspartition.py
@@ -17,22 +17,11 @@ if TYPE_CHECKING:
from ...installer import Installer
from .btrfssubvolumeinfo import BtrfsSubvolumeInfo
+
class BTRFSPartition(Partition):
def __init__(self, *args, **kwargs):
Partition.__init__(self, *args, **kwargs)
- def __repr__(self, *args :str, **kwargs :str) -> str:
- mount_repr = ''
- if self.mountpoint:
- mount_repr = f", mounted={self.mountpoint}"
- elif self.target_mountpoint:
- mount_repr = f", rel_mountpoint={self.target_mountpoint}"
-
- if self._encrypted:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
- else:
- return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
-
@property
def subvolumes(self):
for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []):
@@ -40,11 +29,11 @@ class BTRFSPartition(Partition):
yield subvolume_info_from_path(filesystem['target'])
def iterate_children(struct):
- for child in struct.get('children', []):
+ for c in struct.get('children', []):
if '[' in child.get('source', ''):
- yield subvolume_info_from_path(child['target'])
+ yield subvolume_info_from_path(c['target'])
- for sub_child in iterate_children(child):
+ for sub_child in iterate_children(c):
yield sub_child
for child in iterate_children(filesystem):
diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py
index 1c7a801b..90656308 100644
--- a/archinstall/lib/disk/filesystem.py
+++ b/archinstall/lib/disk/filesystem.py
@@ -210,7 +210,14 @@ class Filesystem:
# TODO: Implement this with declarative profiles instead.
raise ValueError("Installation().use_entire_disk() has to be re-worked.")
- def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition:
+ def add_partition(
+ self,
+ partition_type :str,
+ start :str,
+ end :str,
+ partition_format :Optional[str] = None,
+ skip_mklabel :bool = False
+ ) -> Partition:
log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)
if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
@@ -232,6 +239,7 @@ class Filesystem:
except DiskError:
pass
+ # TODO this check should probably run in the setup process rather than during the installation
if self.mode == MBR:
if len(self.blockdevice.partitions) > 3:
DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions")
@@ -246,14 +254,9 @@ class Filesystem:
if self.parted(parted_string):
for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
self.partprobe()
+ self.blockdevice.flush_cache()
- new_partition_uuids = []
- for partition in self.blockdevice.partitions.values():
- try:
- new_partition_uuids.append(partition.part_uuid)
- except DiskError:
- pass
-
+ new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()]
new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids))
if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()):
@@ -263,17 +266,20 @@ class Filesystem:
log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
- log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
+ log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
raise err
else:
log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG)
time.sleep(storage.get('DISK_TIMEOUTS', 1) * count)
+ total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()])
+ total_partitions.update(previous_partuuids)
+
# TODO: This should never be able to happen
log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
- log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red")
+ log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")
raise DiskError(f"Could not add partition using: {parted_string}")
def set_name(self, partition: int, name: str) -> bool:
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index 660594ed..c8ac564e 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -370,7 +370,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict
return filters
-def get_partitions_in_use(mountpoint :str) -> List[Partition]:
+def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]:
from .partition import Partition
try:
@@ -393,8 +393,12 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]:
if not type(blockdev) in (Partition, MapperDev):
continue
- for blockdev_mountpoint in blockdev.mount_information:
- block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
+ if isinstance(blockdev, Partition):
+ for blockdev_mountpoint in blockdev.mountpoints:
+ block_devices_mountpoints[blockdev_mountpoint] = blockdev
+ else:
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 17c24d57..4028f114 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -1,14 +1,16 @@
import glob
-import pathlib
import time
import logging
import json
import os
import hashlib
+import typing
+from dataclasses import dataclass
+from pathlib import Path
from typing import Optional, Dict, Any, List, Union, Iterator
from .blockdevice import BlockDevice
-from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name
+from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
@@ -16,6 +18,26 @@ from ..general import SysCommand
from .btrfs.btrfs_helpers import subvolume_info_from_path
from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo
+
+@dataclass
+class PartitionInfo:
+ pttype: str
+ partuuid: str
+ uuid: str
+ start: Optional[int]
+ end: Optional[int]
+ bootable: bool
+ size: float
+ sector_size: int
+ filesystem_type: str
+ mountpoints: List[Path]
+
+ def get_first_mountpoint(self) -> Optional[Path]:
+ if len(self.mountpoints) > 0:
+ return self.mountpoints[0]
+ return None
+
+
class Partition:
def __init__(
self,
@@ -25,38 +47,37 @@ class Partition:
filesystem :Optional[str] = None,
mountpoint :Optional[str] = None,
encrypted :bool = False,
- autodetect_filesystem :bool = True
+ autodetect_filesystem :bool = True,
):
-
if not part_id:
part_id = os.path.basename(path)
- self.block_device = block_device
- if type(self.block_device) is str:
+ if type(block_device) is str:
raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
- self.path = path
- self.part_id = part_id
- self.target_mountpoint = mountpoint
- self.filesystem = filesystem
+ self.block_device = block_device
+ self._path = path
+ self._part_id = part_id
+ self._target_mountpoint = mountpoint
self._encrypted = None
- self.encrypted = encrypted
- self.allow_formatting = False
+ self._encrypted = encrypted
+ self._wipe = False
+ self._type = 'primary'
if mountpoint:
self.mount(mountpoint)
- try:
- self.mount_information = list(find_mountpoint(self.path))
- except DiskError:
- self.mount_information = [{}]
+ self._partition_info = self._fetch_information()
- if not self.filesystem and autodetect_filesystem:
- self.filesystem = get_filesystem_type(path)
+ if not autodetect_filesystem and filesystem:
+ self._partition_info.filesystem_type = filesystem
- if self.filesystem == 'crypto_LUKS':
- self.encrypted = True
+ if self._partition_info.filesystem_type == 'crypto_LUKS':
+ self._encrypted = True
+ # I hate doint this but I'm currently unsure where this
+ # is acutally used to be able to fix the typing issues properly
+ @typing.no_type_check
def __lt__(self, left_comparitor :BlockDevice) -> bool:
if type(left_comparitor) == Partition:
left_comparitor = left_comparitor.path
@@ -64,254 +85,191 @@ class Partition:
left_comparitor = str(left_comparitor)
# The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5
- return self.path < left_comparitor
+ return self._path < left_comparitor
def __repr__(self, *args :str, **kwargs :str) -> str:
mount_repr = ''
- if self.mountpoint:
- mount_repr = f", mounted={self.mountpoint}"
- elif self.target_mountpoint:
- mount_repr = f", rel_mountpoint={self.target_mountpoint}"
+ if mountpoint := self._partition_info.get_first_mountpoint():
+ mount_repr = f", mounted={mountpoint}"
+ elif self._target_mountpoint:
+ mount_repr = f", rel_mountpoint={self._target_mountpoint}"
+
+ classname = self.__class__.__name__
if self._encrypted:
- return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})'
+ return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})'
else:
- return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})'
+ return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})'
def as_json(self) -> Dict[str, Any]:
"""
this is used for the table representation of the partition (see FormattedOutput)
"""
partition_info = {
- 'type': 'primary',
- 'PARTUUID': self._safe_uuid,
- 'wipe': self.allow_formatting,
+ 'type': self._type,
+ 'PARTUUID': self.part_uuid,
+ 'wipe': self._wipe,
'boot': self.boot,
'ESP': self.boot,
- 'mountpoint': self.target_mountpoint,
+ 'mountpoint': self._target_mountpoint,
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
- 'filesystem': self.filesystem_type
+ 'filesystem': self._partition_info.filesystem_type
}
return partition_info
def __dump__(self) -> Dict[str, Any]:
# TODO remove this in favour of as_json
-
- log(get_filesystem_type(self.path))
-
return {
- 'type': 'primary',
- 'PARTUUID': self._safe_uuid,
- 'wipe': self.allow_formatting,
+ 'type': self._type,
+ 'PARTUUID': self.part_uuid,
+ 'wipe': self._wipe,
'boot': self.boot,
'ESP': self.boot,
- 'mountpoint': self.target_mountpoint,
+ 'mountpoint': self._target_mountpoint,
'encrypted': self._encrypted,
'start': self.start,
'size': self.end,
'filesystem': {
- 'format': self.filesystem_type
+ 'format': self._partition_info.filesystem_type
}
}
- @property
- def filesystem_type(self) -> Optional[str]:
- return get_filesystem_type(self.path)
+ def _call_lsblk(self) -> Dict[str, Any]:
+ self.partprobe()
+ output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8')
- @property
- def mountpoint(self) -> Optional[str]:
- try:
- data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
- for filesystem in data['filesystems']:
- return pathlib.Path(filesystem.get('target'))
+ if output:
+ lsblk_info = json.loads(output)
+ return lsblk_info
- except SysCallError as error:
- # Not mounted anywhere most likely
- log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey")
- pass
-
- return None
+ raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk')
- @property
- def sector_size(self) -> Optional[int]:
- output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
+ def _call_sfdisk(self) -> Dict[str, Any]:
+ output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')
- for device in output['blockdevices']:
- return device.get('log-sec', None)
+ if output:
+ sfdisk_info = json.loads(output)
+ partitions = sfdisk_info.get('partitiontable', {}).get('partitions', [])
+ node = list(filter(lambda x: x['node'] == self._path, partitions))
- @property
- def start(self) -> Optional[str]:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ if len(node) > 0:
+ return node[0]
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['start'] # * self.sector_size
+ return {}
- @property
- def end(self) -> Optional[str]:
- # TODO: actually this is size in sectors unit
- # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it.
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk')
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['size'] # * self.sector_size
+ def _fetch_information(self) -> PartitionInfo:
+ lsblk_info = self._call_lsblk()
+ sfdisk_info = self._call_sfdisk()
+ device = lsblk_info['blockdevices'][0]
- @property
- def end_sectors(self) -> Optional[str]:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
+ mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint]
+ bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- return partition['start'] + partition['size']
+ return PartitionInfo(
+ pttype=device['pttype'],
+ partuuid=device['partuuid'],
+ uuid=device['uuid'],
+ sector_size=device['log-sec'],
+ size=convert_size_to_gb(device['size']),
+ start=sfdisk_info.get('start', None),
+ end=sfdisk_info.get('size', None),
+ bootable=bootable,
+ filesystem_type=device['fstype'],
+ mountpoints=mountpoints
+ )
@property
- def size(self) -> Optional[float]:
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- self.partprobe()
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
-
- try:
- lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode())
-
- for device in lsblk['blockdevices']:
- return convert_size_to_gb(device['size'])
- except SysCallError as error:
- if error.exit_code == 8192:
- return None
- else:
- raise error
+ def target_mountpoint(self) -> Optional[str]:
+ return self._target_mountpoint
@property
- def boot(self) -> bool:
- output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8'))
-
- for partition in output.get('partitiontable', {}).get('partitions', []):
- if partition['node'] == self.path:
- # first condition is for MBR disks, second for GPT disks
- return partition.get('bootable', False) or partition.get('type','') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B'
-
- return False
+ def path(self) -> str:
+ return self._path
@property
- def partition_type(self) -> Optional[str]:
- lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8'))
-
- for device in lsblk['blockdevices']:
- return device['pttype']
+ def filesystem(self) -> str:
+ return self._partition_info.filesystem_type
@property
- def part_uuid(self) -> str:
- """
- Returns the PARTUUID as returned by lsblk.
- This is more reliable than relying on /dev/disk/by-partuuid as
- it doesn't seam to be able to detect md raid partitions.
- For bind mounts all the subvolumes share the same uuid
- """
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- if not self.partprobe():
- raise DiskError(f"Could not perform partprobe on {self.device_path}")
-
- time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i))
-
- partuuid = self._safe_part_uuid
- if partuuid:
- return partuuid
-
- raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
+ def mountpoint(self) -> Optional[Path]:
+ if len(self.mountpoints) > 0:
+ return self.mountpoints[0]
+ return None
@property
- def uuid(self) -> Optional[str]:
- """
- Returns the UUID as returned by lsblk for the **partition**.
- This is more reliable than relying on /dev/disk/by-uuid as
- it doesn't seam to be able to detect md raid partitions.
- For bind mounts all the subvolumes share the same uuid
- """
- for i in range(storage['DISK_RETRY_ATTEMPTS']):
- if not self.partprobe():
- raise DiskError(f"Could not perform partprobe on {self.device_path}")
+ def mountpoints(self) -> List[Path]:
+ return self._partition_info.mountpoints
- time.sleep(storage.get('DISK_TIMEOUTS', 1) * i)
-
- partuuid = self._safe_uuid
- if partuuid:
- return partuuid
-
- raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'")
+ @property
+ def sector_size(self) -> int:
+ return self._partition_info.sector_size
@property
- def _safe_uuid(self) -> Optional[str]:
- """
- A near copy of self.uuid but without any delays.
- This function should only be used where uuid is not crucial.
- For instance when you want to get a __repr__ of the class.
- """
- if not self.partprobe():
- if self.block_device.partition_type == 'iso9660':
- return None
+ def start(self) -> Optional[int]:
+ return self._partition_info.start
- log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
+ @property
+ def end(self) -> Optional[int]:
+ return self._partition_info.end
- try:
- return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip()
- except SysCallError as error:
- if self.block_device.partition_type == 'iso9660':
- # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
- return None
+ @property
+ def end_sectors(self) -> Optional[int]:
+ start = self._partition_info.start
+ end = self._partition_info.end
+ if start and end:
+ return start + end
+ return None
- log(f"Could not get PARTUUID of partition using 'blkid -s UUID -o value {self.device_path}': {error}")
+ @property
+ def size(self) -> Optional[float]:
+ return self._partition_info.size
@property
- def _safe_part_uuid(self) -> Optional[str]:
- """
- A near copy of self.uuid but without any delays.
- This function should only be used where uuid is not crucial.
- For instance when you want to get a __repr__ of the class.
- """
- if not self.partprobe():
- if self.block_device.partition_type == 'iso9660':
- return None
+ def boot(self) -> bool:
+ return self._partition_info.bootable
- log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG)
+ @property
+ def partition_type(self) -> Optional[str]:
+ return self._partition_info.pttype
- try:
- return self.block_device.uuid
- except SysCallError as error:
- if self.block_device.partition_type == 'iso9660':
- # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
- return None
+ @property
+ def part_uuid(self) -> str:
+ return self._partition_info.partuuid
- log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}")
+ @property
+ def uuid(self) -> Optional[str]:
+ return self._partition_info.uuid
@property
def encrypted(self) -> Union[bool, None]:
return self._encrypted
- @encrypted.setter
- def encrypted(self, value: bool) -> None:
- self._encrypted = value
-
@property
def parent(self) -> str:
return self.real_device
@property
def real_device(self) -> str:
- for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']:
- if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
- return f"/dev/{parent}"
- # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}')
- return self.path
+ output = SysCommand('lsblk -J').decode('UTF-8')
+
+ if output:
+ for blockdevice in json.loads(output)['blockdevices']:
+ if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)):
+ return f"/dev/{parent}"
+ return self._path
+
+ raise DiskError('Unable to get disk information for command "lsblk -J"')
@property
def device_path(self) -> str:
- """ for bind mounts returns the phisical path of the partition
+ """ for bind mounts returns the physical path of the partition
"""
- device_path, bind_name = split_bind_name(self.path)
+ device_path, bind_name = split_bind_name(self._path)
return device_path
@property
@@ -319,7 +277,7 @@ class Partition:
""" for bind mounts returns the bind name (subvolume path).
Returns none if this property does not exist
"""
- device_path, bind_name = split_bind_name(self.path)
+ device_path, bind_name = split_bind_name(self._path)
return bind_name
@property
@@ -330,29 +288,29 @@ class Partition:
for child in information.get('children', []):
if target := child.get('target'):
if child.get('fstype') == 'btrfs':
- if subvolume := subvolume_info_from_path(pathlib.Path(target)):
+ if subvolume := subvolume_info_from_path(Path(target)):
yield subvolume
if child.get('children'):
for subchild in iterate_children_recursively(child):
yield subchild
- for mountpoint in self.mount_information:
- if result := findmnt(pathlib.Path(mountpoint['target'])):
- for filesystem in result.get('filesystems', []):
- if mountpoint.get('fstype') == 'btrfs':
- if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])):
+ if self._partition_info.filesystem_type == 'btrfs':
+ for mountpoint in self._partition_info.mountpoints:
+ if result := findmnt(mountpoint):
+ for filesystem in result.get('filesystems', []):
+ if subvolume := subvolume_info_from_path(mountpoint):
yield subvolume
- for child in iterate_children_recursively(filesystem):
- yield child
+ for child in iterate_children_recursively(filesystem):
+ yield child
def partprobe(self) -> bool:
try:
if self.block_device:
return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code
except SysCallError as error:
- log(f"Unreliable results might be given for {self.path} due to partprobe error: {error}", level=logging.DEBUG)
+ log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG)
return False
@@ -364,19 +322,20 @@ class Partition:
with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device:
return unlocked_device.filesystem
except SysCallError:
- return None
+ pass
+ return None
def has_content(self) -> bool:
- fs_type = get_filesystem_type(self.path)
+ fs_type = self._partition_info.filesystem_type
if not fs_type or "swap" in fs_type:
return False
temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest()
- temporary_path = pathlib.Path(temporary_mountpoint)
+ temporary_path = Path(temporary_mountpoint)
temporary_path.mkdir(parents=True, exist_ok=True)
- if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0:
- raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}')
+ if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0:
+ raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}')
files = len(glob.glob(f"{temporary_mountpoint}/*"))
iterations = 0
@@ -387,14 +346,14 @@ class Partition:
return True if files > 0 else False
- def encrypt(self, *args :str, **kwargs :str) -> str:
+ def encrypt(self, password: Optional[str] = None) -> str:
"""
A wrapper function for luks2() instances and the .encrypt() method of that instance.
"""
from ..luks import luks2
handle = luks2(self, None, None)
- return handle.encrypt(self, *args, **kwargs)
+ return handle.encrypt(self, password=password)
def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool:
"""
@@ -402,17 +361,17 @@ class Partition:
the formatting functionality and in essence the support for the given filesystem.
"""
if filesystem is None:
- filesystem = self.filesystem
+ filesystem = self._partition_info.filesystem_type
if path is None:
- path = self.path
+ path = self._path
# This converts from fat32 -> vfat to unify filesystem names
filesystem = get_mount_fs_type(filesystem)
# To avoid "unable to open /dev/x: No such file or directory"
start_wait = time.time()
- while pathlib.Path(path).exists() is False and time.time() - start_wait < 10:
+ while Path(path).exists() is False and time.time() - start_wait < 10:
time.sleep(0.025)
if log_formatting:
@@ -422,57 +381,57 @@ class Partition:
if filesystem == 'btrfs':
options = ['-f'] + options
- if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')):
+ mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')
+ if mkfs and 'UUID:' not in mkfs:
raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}')
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'vfat':
options = ['-F32'] + options
log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")
if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext4':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ext2':
options = ['-F'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0:
- raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}')
- self.filesystem = 'ext2'
-
+ raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
+ self._partition_info.filesystem_type = 'ext2'
elif filesystem == 'xfs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'f2fs':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'ntfs3':
options = ['-f'] + options
if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0:
raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}")
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
elif filesystem == 'crypto_LUKS':
# from ..luks import luks2
# encrypted_partition = luks2(self, None, None)
# encrypted_partition.format(path)
- self.filesystem = filesystem
+ self._partition_info.filesystem_type = filesystem
else:
raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.")
@@ -485,9 +444,9 @@ class Partition:
return self.format(filesystem, path, log_formatting, options, retry=False)
if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS':
- self.encrypted = True
+ self._encrypted = True
else:
- self.encrypted = False
+ self._encrypted = False
return True
@@ -499,18 +458,18 @@ class Partition:
if parent := self.find_parent_of(child, name, parent=data['name']):
return parent
+ return None
+
def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool:
- if not self.mountpoint:
+ if not self._partition_info.get_first_mountpoint():
log(f'Mounting {self} to {target}', level=logging.INFO)
if not fs:
- if not self.filesystem:
- raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.')
- fs = self.filesystem
+ fs = self._partition_info.filesystem_type
fs_type = get_mount_fs_type(fs)
- pathlib.Path(target).mkdir(parents=True, exist_ok=True)
+ Path(target).mkdir(parents=True, exist_ok=True)
if self.bind_name:
device_path = self.device_path
@@ -520,7 +479,7 @@ class Partition:
else:
options = f"subvol={self.bind_name}"
else:
- device_path = self.path
+ device_path = self._path
try:
if options:
mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}")
@@ -529,7 +488,7 @@ class Partition:
# TODO: Should be redundant to check for exit_code
if mnt_handle.exit_code != 0:
- raise DiskError(f"Could not mount {self.path} to {target} using options {options}")
+ raise DiskError(f"Could not mount {self._path} to {target} using options {options}")
except SysCallError as err:
raise err
@@ -538,19 +497,17 @@ class Partition:
return False
def unmount(self) -> bool:
- worker = SysCommand(f"/usr/bin/umount {self.path}")
+ worker = SysCommand(f"/usr/bin/umount {self._path}")
+ exit_code = worker.exit_code
# Without to much research, it seams that low error codes are errors.
# And above 8k is indicators such as "/dev/x not mounted.".
# So anything in between 0 and 8k are errors (?).
- if 0 < worker.exit_code < 8000:
- raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
+ if exit_code and 0 < exit_code < 8000:
+ raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code)
return True
- def umount(self) -> bool:
- return self.unmount()
-
def filesystem_supported(self) -> bool:
"""
The support for a filesystem (this partition) is tested by calling
@@ -559,7 +516,7 @@ class Partition:
2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type
"""
try:
- self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True)
+ self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False)
except (SysCallError, DiskError):
pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code
except UnknownFilesystemFormat as err: