Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/blockdevice.py32
-rw-r--r--archinstall/lib/disk/btrfs.py56
-rw-r--r--archinstall/lib/disk/dmcryptdev.py48
-rw-r--r--archinstall/lib/disk/helpers.py241
-rw-r--r--archinstall/lib/disk/mapperdev.py83
-rw-r--r--archinstall/lib/disk/partition.py67
6 files changed, 447 insertions, 80 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index fac258ef..ff741f18 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -16,10 +16,10 @@ from ..storage import storage
class BlockDevice:
def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
- from .helpers import all_disks
+ from .helpers import all_blockdevices
# If we don't give any information, we need to auto-fill it.
# Otherwise any subsequent usage will break.
- info = all_disks()[path].info
+ info = all_blockdevices(partitions=False)[path].info
self.path = path
self.info = info
@@ -78,16 +78,20 @@ class BlockDevice:
If it's a loop-back-device it returns the back-file,
For other types it return self.device
"""
- if self.info['type'] == 'loop':
- for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == self.path:
- continue
-
- return drive['back-file']
+ if self.info.get('type') == 'loop':
+ return self.info['back-file']
else:
return self.device
@property
+ def mountpoint(self) -> None:
+ """
+ A dummy function to enable transparent comparisons of mountpoints.
+ As blockdevices can't be mounted directly, this will always be None
+ """
+ return None
+
+ @property
def device(self) -> str:
"""
Returns the device file of the BlockDevice.
@@ -95,20 +99,20 @@ class BlockDevice:
If it's a ATA-drive it returns the /dev/X device
And if it's a crypto-device it returns the parent device
"""
- if "type" not in self.info:
+ if "DEVTYPE" not in self.info:
raise DiskError(f'Could not locate backplane info for "{self.path}"')
- if self.info['type'] in ['disk','loop']:
+ if self.info['DEVTYPE'] in ['disk','loop']:
return self.path
- elif self.info['type'][:4] == 'raid':
+ elif self.info['DEVTYPE'][:4] == 'raid':
# This should catch /dev/md## raid devices
return self.path
- elif self.info['type'] == 'crypt':
+ elif self.info['DEVTYPE'] == 'crypt':
if 'pkname' not in self.info:
raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
else:
- log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
+ log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
# if not stat.S_ISBLK(os.stat(full_path).st_mode):
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@@ -195,7 +199,7 @@ class BlockDevice:
_, start, end, size, *_ = free_space.strip('\r\n;').split(':')
yield (start, end, size)
except SysCallError as error:
- log(f"Could not get free space on {self.path}: {error}", level=logging.INFO)
+ log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
@property
def largest_free_space(self) -> List[str]:
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py
index ad8d0a52..f2da957f 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs.py
@@ -2,7 +2,9 @@ from __future__ import annotations
import pathlib
import glob
import logging
-from typing import Union, Dict, TYPE_CHECKING
+import re
+from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+from dataclasses import dataclass
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -11,7 +13,49 @@ from .helpers import get_mount_info
from ..exceptions import DiskError
from ..general import SysCommand
from ..output import log
-
+from ..exceptions import SysCallError
+
+@dataclass
+class BtrfsSubvolume:
+ target :str
+ source :str
+ fstype :str
+ name :str
+ options :str
+ root :bool = False
+
+def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]:
+ if '@' in struct['source']:
+ subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1]
+ struct['source'] = struct['source'].replace(f"[{subvolume}]", "")
+ yield BtrfsSubvolume(
+ target=struct['target'],
+ source=struct['source'],
+ fstype=struct['fstype'],
+ name=subvolume,
+ options=struct['options'],
+ root=index == 0
+ )
+ index += 1
+
+ for child in struct.get('children', []):
+ for item in get_subvolumes_from_findmnt(child, index=index):
+ yield item
+ index += 1
+
+def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
+ try:
+ output = SysCommand(f"btrfs subvol show {path}").decode()
+ except SysCallError as error:
+ print('Error:', error)
+
+ result = {}
+ for line in output.replace('\r\n', '\n').split('\n'):
+ if ':' in line:
+ key, val = line.replace('\t', '').split(':', 1)
+ result[key.strip().lower().replace(' ', '_')] = val.strip()
+
+ return result
def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
@@ -24,7 +68,7 @@ def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.P
This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure.
Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name]
"""
- log("function btrfs.mount_subvolume DEPRECATED. See code for alternatives",fg="yellow",level=logging.WARNING)
+ log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING)
installation_mountpoint = installation.target
if type(installation_mountpoint) == str:
installation_mountpoint = pathlib.Path(installation_mountpoint)
@@ -179,11 +223,7 @@ def manage_btrfs_subvolumes(installation :Installer,
# As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
# As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
- # we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'.
- # If i don't reset it, process will abort as "already mounted' .
- # TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice
- fake_partition['device_instance'].mountpoint = None
- #
+
# Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
# as "normal" ones
mountpoints.append(fake_partition)
diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py
new file mode 100644
index 00000000..63392ffb
--- /dev/null
+++ b/archinstall/lib/disk/dmcryptdev.py
@@ -0,0 +1,48 @@
+import pathlib
+import logging
+import json
+from dataclasses import dataclass
+from typing import Optional
+from ..exceptions import SysCallError
+from ..general import SysCommand
+from ..output import log
+from .mapperdev import MapperDev
+
+@dataclass
+class DMCryptDev:
+ dev_path :pathlib.Path
+
+ @property
+ def name(self):
+ with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh:
+ return fh.read().strip()
+
+ @property
+ def path(self):
+ return f"/dev/mapper/{self.dev_path}"
+
+ @property
+ def blockdev(self):
+ pass
+
+ @property
+ def MapperDev(self):
+ return MapperDev(mappername=self.name)
+
+ @property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow")
+ pass
+
+ return None
+
+ @property
+ def filesystem(self) -> Optional[str]:
+ return self.MapperDev.filesystem \ No newline at end of file
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index b04e2740..afaf9e5e 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -5,12 +5,15 @@ import os
import pathlib
import re
import time
+import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .partition import Partition
from .blockdevice import BlockDevice
+from .dmcryptdev import DMCryptDev
+from .mapperdev import MapperDev
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
from ..output import log
@@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
return
return True
-# lsblk --json -l -n -o path
-def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
- kwargs.setdefault("partitions", False)
- drives = {}
- lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8'))
- for drive in lsblk['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part':
+def cleanup_bash_escapes(data :str) -> str:
+ return data.replace(r'\ ', ' ')
+
+def blkid(cmd :str) -> Dict[str, Any]:
+ if '-o' in cmd and '-o export' not in cmd:
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ elif '-o' not in cmd:
+ cmd += ' -o export'
+
+ try:
+ raw_data = SysCommand(cmd).decode()
+ except SysCallError as error:
+ log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG)
+ raise error
+
+ result = {}
+ # Process the raw result
+ devname = None
+ for line in raw_data.split('\r\n'):
+ if not len(line):
+ devname = None
+ continue
+
+ key, val = line.split('=', 1)
+ if key.lower() == 'devname':
+ devname = val
+ # Lowercase for backwards compatability with all_disks() previous use cases
+ result[devname] = {
+ "path": devname,
+ "PATH": devname
+ }
+ continue
+
+ result[devname][key] = cleanup_bash_escapes(val)
+
+ return result
+
+def get_loop_info(path :str) -> Dict[str, Any]:
+ for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
+ if not drive['name'] == path:
continue
- drives[drive['path']] = BlockDevice(drive['path'], drive)
+ return {
+ path: {
+ **drive,
+ 'type' : 'loop',
+ 'TYPE' : 'loop',
+ 'DEVTYPE' : 'loop',
+ 'PATH' : drive['name'],
+ 'path' : drive['name']
+ }
+ }
+
+ return {}
+
+def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]:
+ result = {}
+ for device_path, device_information in information.items():
+ dev_name = pathlib.Path(device_information['PATH']).name
+ if not device_information.get('TYPE') or not device_information.get('DEVTYPE'):
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists():
+ with dmcrypt_name.open('r') as fh:
+ device_information['DMCRYPT_NAME'] = fh.read().strip()
+
+ result[device_path] = device_information
+
+ return result
+
+def uevent(data :str) -> Dict[str, Any]:
+ information = {}
+
+ for line in data.replace('\r\n', '\n').split('\n'):
+ if len((line := line.strip())):
+ key, val = line.split('=', 1)
+ information[key] = val
+
+ return information
+
+def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]:
+ device_information = {}
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ return {
+ f"/dev/{dev_name}" : {
+ **device_information,
+ 'path' : f'/dev/{dev_name}',
+ 'PATH' : f'/dev/{dev_name}',
+ 'PTTYPE' : None
+ }
+ }
+
+def all_disks() -> List[BlockDevice]:
+ log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
+ return all_blockdevices(partitions=False, mappers=False)
+
+def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]:
+ """
+ Returns BlockDevice() and Partition() objects for all available devices.
+ """
+ from .partition import Partition
- return drives
+ instances = {}
+ # Due to lsblk being highly unreliable for this use case,
+ # we'll iterate the /sys/class definitions and find the information
+ # from there.
+ for block_device in glob.glob("/sys/class/block/*"):
+ device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ try:
+ information = blkid(f'blkid -p -o export {device_path}')
+
+ # TODO: No idea why F841 is raised here:
+ except SysCallError as error: # noqa: F841
+ if error.exit_code in (512, 2):
+ # Assume that it's a loop device, and try to get info on it
+ try:
+ information = get_loop_info(device_path)
+ if not information:
+ raise SysCallError("Could not get loop information", exit_code=1)
+
+ except SysCallError:
+ information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
+ else:
+ raise error
+
+ information = enrich_blockdevice_information(information)
+
+ for path, path_info in information.items():
+ if path_info.get('DMCRYPT_NAME'):
+ instances[path] = DMCryptDev(dev_path=path)
+ elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'):
+ if partitions:
+ instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path))))
+ elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
+ instances[path] = BlockDevice(path, path_info)
+ elif path_info.get('TYPE') == 'squashfs':
+ # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
+ continue
+ else:
+ log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow")
+
+ if mappers:
+ for block_device in glob.glob("/dev/mapper/*"):
+ if (pathobj := pathlib.Path(block_device)).is_symlink():
+ instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name)
+
+ return instances
+
+
+def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path:
+ partition_name = path.name
+ pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve()
+ return f"/dev/{pci_device.parent.name}"
def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
- collection = all_disks()
+ collection = all_blockdevices(partitions=False)
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
continue
@@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :
return collection[drive]
def split_bind_name(path :Union[pathlib.Path, str]) -> list:
+ # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow")
# we check for the bind notation. if exist we'll only use the "true" device path
if '[' in str(path) : # is a bind path (btrfs subvolume path)
device_path, bind_path = str(path).split('[')
@@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
+def find_mountpoint(device_path :str) -> Dict[str, Any]:
+ try:
+ for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']:
+ yield filesystem
+ except SysCallError:
+ return {}
+
def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
- device_path,bind_path = split_bind_name(path)
+ device_path, bind_path = split_bind_name(path)
output = {}
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
- log(f"Getting mount information for device path {traversal}", level=logging.INFO)
+ log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
break
- except SysCallError:
+
+ except SysCallError as error:
+ print('ERROR:', error)
pass
if not traverse:
break
if not output:
- raise DiskError(f"Could not get mount information for device path {path}")
+ raise DiskError(f"Could not get mount information for device path {device_path}")
output = json.loads(output)
+
# for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
# i.e. the subvolume filesystem we're searching for
if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
+
if 'filesystems' in output:
if len(output['filesystems']) > 1:
- raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
+ raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}")
if return_real_path:
return output['filesystems'][0], traversal
@@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur
return {}
+def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]:
+ for info in data:
+ if info.get('target') not in filters:
+ filters[info.get('target')] = None
+
+ filters.update(get_all_targets(info.get('children', [])))
+
+ return filters
+
def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
except SysCallError:
- return []
-
- mounts = []
+ return {}
if not output:
- return []
+ return {}
output = json.loads(output)
- for target in output.get('filesystems', []):
- # We need to create a BlockDevice() instead of 'None' here when creaiting Partition()
- # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None.
+ # print(output)
- # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device
- # Note: doing print(partition) here will break because the above mentioned issue.
- partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])
- partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ mounts = {}
+
+ block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True)
+
+ block_devices_mountpoints = {}
+ for blockdev in block_devices_available.values():
+ if not type(blockdev) in (Partition, MapperDev):
+ continue
- # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using
- # (lsblk pkname lists both the partition and blockdevice, BD being the last entry)
- result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1]
- block_device = BlockDevice(f"/dev/{result}")
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
- # Once we figured the block device out, we can properly create the partition object
- partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
- mounts.append(partition)
+ for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ if mountpoint in block_devices_mountpoints:
+ if mountpoint not in mounts:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
+ # If the already defined mountpoint is a DMCryptDev, and the newly found
+ # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition.
+ elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
- for child in target.get('children', []):
- mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target']))
+ log(f"Available partitions: {mounts}", level=logging.DEBUG)
return mounts
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
new file mode 100644
index 00000000..91ec6d25
--- /dev/null
+++ b/archinstall/lib/disk/mapperdev.py
@@ -0,0 +1,83 @@
+import glob
+import pathlib
+import logging
+import json
+from dataclasses import dataclass
+from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING
+
+from ..exceptions import SysCallError
+from ..general import SysCommand
+from ..output import log
+
+if TYPE_CHECKING:
+ from .btrfs import BtrfsSubvolume
+
+@dataclass
+class MapperDev:
+ mappername :str
+
+ @property
+ def name(self):
+ return self.mappername
+
+ @property
+ def path(self):
+ return f"/dev/mapper/{self.mappername}"
+
+ @property
+ def partition(self):
+ from .helpers import uevent, get_parent_of_partition
+ from .partition import Partition
+ from .blockdevice import BlockDevice
+
+ for mapper in glob.glob('/dev/mapper/*'):
+ path_obj = pathlib.Path(mapper)
+ if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink():
+ dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve()
+
+ for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
+ partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
+
+ try:
+ uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
+ except SysCallError as error:
+ log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
+
+ information = uevent(uevent_data)
+ block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
+
+ return Partition(information['DEVNAME'], block_device)
+
+ raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device")
+
+ @property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow")
+ pass
+
+ return None
+
+ @property
+ def mount_information(self) -> List[Dict[str, Any]]:
+ from .helpers import find_mountpoint
+ return list(find_mountpoint(self.path))
+
+ @property
+ def filesystem(self) -> Optional[str]:
+ from .helpers import get_filesystem_type
+ return get_filesystem_type(self.path)
+
+ @property
+ def subvolumes(self) -> Iterator['BtrfsSubvolume']:
+ from .btrfs import get_subvolumes_from_findmnt
+
+ for mountpoint in self.mount_information:
+ for result in get_subvolumes_from_findmnt(mountpoint):
+ yield result \ No newline at end of file
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 7bfde64c..708edd29 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -5,15 +5,15 @@ import logging
import json
import os
import hashlib
-from typing import Optional, Dict, Any, List, Union
+from typing import Optional, Dict, Any, List, Union, Iterator
from .blockdevice import BlockDevice
-from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
+from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
-
+from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume
class Partition:
def __init__(self,
@@ -29,9 +29,11 @@ class Partition:
part_id = os.path.basename(path)
self.block_device = block_device
+ if type(self.block_device) is str:
+ raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
+
self.path = path
self.part_id = part_id
- self.mountpoint = mountpoint
self.target_mountpoint = mountpoint
self.filesystem = filesystem
self._encrypted = None
@@ -42,20 +44,12 @@ class Partition:
self.mount(mountpoint)
try:
- mount_information = get_mount_info(self.path)
+ self.mount_information = list(find_mountpoint(self.path))
except DiskError:
- mount_information = {}
-
- if mount_information.get('target', None):
- if self.mountpoint != mount_information.get('target', None) and mountpoint:
- raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
-
- if target := mount_information.get('target', None):
- self.mountpoint = target
+ self.mount_information = [{}]
if not self.filesystem and autodetect_filesystem:
- if fstype := mount_information.get('fstype', get_filesystem_type(path)):
- self.filesystem = fstype
+ self.filesystem = get_filesystem_type(path)
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
@@ -98,6 +92,20 @@ class Partition:
}
@property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG)
+ pass
+
+ return None
+
+ @property
def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
@@ -135,14 +143,16 @@ class Partition:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
- if (handle := SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}")).exit_code == 0:
- lsblk = json.loads(handle.decode('UTF-8'))
+ try:
+ lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode())
for device in lsblk['blockdevices']:
return convert_size_to_gb(device['size'])
- elif handle.exit_code == 8192:
- # Device is not a block device
- return None
+ except SysCallError as error:
+ if error.exit_code == 8192:
+ return None
+ else:
+ raise error
time.sleep(storage['DISK_TIMEOUTS'])
@@ -200,7 +210,14 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
self.partprobe()
- return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
+ try:
+ return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
+ except SysCallError as error:
+ if self.block_device.info.get('TYPE') == 'iso9660':
+ # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
+ return None
+
+ raise DiskError(f"Could not get PARTUUID of partition {self}: {error}")
@property
def encrypted(self) -> Union[bool, None]:
@@ -237,6 +254,12 @@ class Partition:
device_path, bind_name = split_bind_name(self.path)
return bind_name
+ @property
+ def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ for mountpoint in self.mount_information:
+ for result in get_subvolumes_from_findmnt(mountpoint):
+ yield result
+
def partprobe(self) -> bool:
if self.block_device and SysCommand(f'partprobe {self.block_device.device}').exit_code == 0:
time.sleep(1)
@@ -411,7 +434,6 @@ class Partition:
except SysCallError as err:
raise err
- self.mountpoint = target
return True
return False
@@ -425,7 +447,6 @@ class Partition:
if 0 < worker.exit_code < 8000:
raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
- self.mountpoint = None
return True
def umount(self) -> bool: