Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk
diff options
context:
space:
mode:
authorAnton Hvornum <anton@hvornum.se>2022-02-08 23:21:20 +0100
committerGitHub <noreply@github.com>2022-02-08 23:21:20 +0100
commitd3b6832345c9b5cad54951d4f976ebdbfe59a086 (patch)
treefd611b06cd1aa7709fb3b5128969a673e0fdca80 /archinstall/lib/disk
parentfeffa69042ec537eab6d78597c134693546a2b93 (diff)
Change usage `lsblk` to `blkid` where possible. And general improvements to disk handling. (#949)
* Renamed all_disks() to all_blockdevices() as it's technically a bit more correct. As it would return partitions as well. And it now distinguishes between BlockDevice() and Partition() when returning values. Also lsblk has been replaced with blkid and glob.glob() on /sys/class/block/ * Added handling of loop devices * Added device information enrichment * Removed old code * Updated the usage of blockdevice.info['type'] to 'DEVTYPE' as per returned by blkid instead of lsblk * Created a MapperDev() and DMCryptDev() 'disk' classes. This should help differentiate between BlockDevice(), Partition() and crypt-specific devices. Due to some new helper functions (mentioned later) BlockDevice().device_or_backfile has been simplified thanks to the information being pre-existing instead of having to call losetup. BlockDevice().mountpoint has been added as a dummy function. archinstall.enrich_blockdevice_information() has been added to enrich information extracted from blkid. archinstall.uevent() has been created to parse blkdid data when in -o export format, also eats up /sys/class/x/y/uevent data which is of the same format. all_blockdevices() now returns mapper devices found under /dev/mapper/ and properly returns the different types, not just BlockDevice() for everything. archinstall.get_parent_of_partition() has been added, which can translate /dev/sda1 to /dev/sda via strings by using /sys/class/block/ - note here tho that it has to be a blockdevice. Other parents won't return properly. archinstall.find_mountpoint() was created to be able to find mountpoints for a device, rather than the other way around which get_mount_info() handles. find_mountpoint() will convert /dev/sda1 -> /boot for instance, whereas get_mount_info('/boot') will convert /boot -> /dev/sda1. archinstall.get_partitions_in_use() will now properly traverse the entire structure, not just one level deep. This called for the creation of get_all_targets() which will take a dictionary structure and look for 'target': '/path' and return all '/path' values, recursively without being trapped in get_partitions_in_use() which has recursive targets. get_partitions_in_use() now returns a dictionary instead of a list, as it also returns the mountpoint and associated device. It will also prioritize MapperDev over DMCryptDev (as MapperDev is a child of DMCryptDev and can be resolved, other way around is 'trickier'). * Reverted Installer().partitions to return only the partitions. It's a slight speed defecit by not returning the mountpoints directly as {mntpoint: partition}, because the partition.mountpoint is a bit slow. But it avoids breaking legacy code :) * Fixed a few imports, and added MapperDev.filesystem * Fixed so that Installer().detect_encryption() uses the new separated logic of MapperDev/DMCryptDev/Partition * Fixing a translation issue on hostname * Added DMCryptDev.filesystem * Added back archinstall.all_disks() for legacy reasons. * Added a deprecation warning to archinstall.all_disks() * Added a enrichment check for dm-crypt specific devices, which in turn sets DMCRYPT_NAME on the device information, making it easier to detect and target DMCryptDev when needed. This should avoid issues with loopdevices being detected as DMCryptDev. Also some minor tweaks to variable names on unencrypted setups * Made debug output hidden without --verbose. Also added get_subvolume_info() which takes a path and returns any 'btrfs subvol show' information on the path * Partition().subvolumes has been added. Which generates an iterator over potential subvolumes based on mountpoint. Partition().mount_information is now a list of mount-destinations, as one partition should be allowed to be mounted to several places (more commonly used in btrfs subvolumes). BtrfsSubvolume() has been added as a 'device type', which should make it more easy to recognize a BtrfsSubvolume object right out of the gate. Only found from Partition().subvolumes currently. * Parameter fix for --verbose in log() * Made sure loopdev devices have a PATH key as well, to make dealing with them as blockdevice as seamless as possible. * Added backup information grabbing on uninitated devices * Tweaked backup option if losetup cannot find the blockdev * looks like losetup doesn't exit with a bad code even when it should. Raising ourselves when information is empty. * Fixed structure returned from get_blockdevice_uevent() * Made sure that fallback to blkid and loopdev info returns a PTTYPE (Partition Table Type) that is None, since we can't locate it yet (empty drive). But it's not False either, which is what logic after is looking for * Deprecated split_bind_name() and added more debugging * get_partitions_in_use() now only iterates over Partition() objects. And properly checks all mountpoints, not only the first. * Flipped SysCallError to show the last 500 bytes if data rather than the first, to catch the actual errors. * Removed anonymous parameters to all_blockdevices() * Added .mount_information to MapperDev * Added typing annotations * Partition().mountpoint is now a @property that will live-report the current mountpoint. * Removed the need for setting Partition().mountpoint = X when calling .mount() and .unmount(), as it's live-reported instead. * Added .subvolumes to MapperDev() * Added debug information * Muting F841 in helpers.py * Tweaked debug information in get_mount_info() * Minimized log output by moving Partition().mountpoint to DEBUG, as partitions are allowed to not be mounted * Simplified Installer().add_bootloader() to act as a router to the new split function, one for each bootloader type. This since flake8 complained about complexity and I agree that it became to wild.
Diffstat (limited to 'archinstall/lib/disk')
-rw-r--r--archinstall/lib/disk/blockdevice.py32
-rw-r--r--archinstall/lib/disk/btrfs.py56
-rw-r--r--archinstall/lib/disk/dmcryptdev.py48
-rw-r--r--archinstall/lib/disk/helpers.py241
-rw-r--r--archinstall/lib/disk/mapperdev.py83
-rw-r--r--archinstall/lib/disk/partition.py67
6 files changed, 447 insertions, 80 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index fac258ef..ff741f18 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -16,10 +16,10 @@ from ..storage import storage
class BlockDevice:
def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
- from .helpers import all_disks
+ from .helpers import all_blockdevices
# If we don't give any information, we need to auto-fill it.
# Otherwise any subsequent usage will break.
- info = all_disks()[path].info
+ info = all_blockdevices(partitions=False)[path].info
self.path = path
self.info = info
@@ -78,16 +78,20 @@ class BlockDevice:
If it's a loop-back-device it returns the back-file,
For other types it return self.device
"""
- if self.info['type'] == 'loop':
- for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == self.path:
- continue
-
- return drive['back-file']
+ if self.info.get('type') == 'loop':
+ return self.info['back-file']
else:
return self.device
@property
+ def mountpoint(self) -> None:
+ """
+ A dummy function to enable transparent comparisons of mountpoints.
+ As blockdevices can't be mounted directly, this will always be None
+ """
+ return None
+
+ @property
def device(self) -> str:
"""
Returns the device file of the BlockDevice.
@@ -95,20 +99,20 @@ class BlockDevice:
If it's a ATA-drive it returns the /dev/X device
And if it's a crypto-device it returns the parent device
"""
- if "type" not in self.info:
+ if "DEVTYPE" not in self.info:
raise DiskError(f'Could not locate backplane info for "{self.path}"')
- if self.info['type'] in ['disk','loop']:
+ if self.info['DEVTYPE'] in ['disk','loop']:
return self.path
- elif self.info['type'][:4] == 'raid':
+ elif self.info['DEVTYPE'][:4] == 'raid':
# This should catch /dev/md## raid devices
return self.path
- elif self.info['type'] == 'crypt':
+ elif self.info['DEVTYPE'] == 'crypt':
if 'pkname' not in self.info:
raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
else:
- log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
+ log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
# if not stat.S_ISBLK(os.stat(full_path).st_mode):
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@@ -195,7 +199,7 @@ class BlockDevice:
_, start, end, size, *_ = free_space.strip('\r\n;').split(':')
yield (start, end, size)
except SysCallError as error:
- log(f"Could not get free space on {self.path}: {error}", level=logging.INFO)
+ log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
@property
def largest_free_space(self) -> List[str]:
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py
index ad8d0a52..f2da957f 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs.py
@@ -2,7 +2,9 @@ from __future__ import annotations
import pathlib
import glob
import logging
-from typing import Union, Dict, TYPE_CHECKING
+import re
+from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+from dataclasses import dataclass
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -11,7 +13,49 @@ from .helpers import get_mount_info
from ..exceptions import DiskError
from ..general import SysCommand
from ..output import log
-
+from ..exceptions import SysCallError
+
+@dataclass
+class BtrfsSubvolume:
+ target :str
+ source :str
+ fstype :str
+ name :str
+ options :str
+ root :bool = False
+
+def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]:
+ if '@' in struct['source']:
+ subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1]
+ struct['source'] = struct['source'].replace(f"[{subvolume}]", "")
+ yield BtrfsSubvolume(
+ target=struct['target'],
+ source=struct['source'],
+ fstype=struct['fstype'],
+ name=subvolume,
+ options=struct['options'],
+ root=index == 0
+ )
+ index += 1
+
+ for child in struct.get('children', []):
+ for item in get_subvolumes_from_findmnt(child, index=index):
+ yield item
+ index += 1
+
+def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
+ try:
+ output = SysCommand(f"btrfs subvol show {path}").decode()
+ except SysCallError as error:
+ print('Error:', error)
+
+ result = {}
+ for line in output.replace('\r\n', '\n').split('\n'):
+ if ':' in line:
+ key, val = line.replace('\t', '').split(':', 1)
+ result[key.strip().lower().replace(' ', '_')] = val.strip()
+
+ return result
def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
@@ -24,7 +68,7 @@ def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.P
This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure.
Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name]
"""
- log("function btrfs.mount_subvolume DEPRECATED. See code for alternatives",fg="yellow",level=logging.WARNING)
+ log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING)
installation_mountpoint = installation.target
if type(installation_mountpoint) == str:
installation_mountpoint = pathlib.Path(installation_mountpoint)
@@ -179,11 +223,7 @@ def manage_btrfs_subvolumes(installation :Installer,
# As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
# As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
- # we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'.
- # If i don't reset it, process will abort as "already mounted' .
- # TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice
- fake_partition['device_instance'].mountpoint = None
- #
+
# Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
# as "normal" ones
mountpoints.append(fake_partition)
diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py
new file mode 100644
index 00000000..63392ffb
--- /dev/null
+++ b/archinstall/lib/disk/dmcryptdev.py
@@ -0,0 +1,48 @@
+import pathlib
+import logging
+import json
+from dataclasses import dataclass
+from typing import Optional
+from ..exceptions import SysCallError
+from ..general import SysCommand
+from ..output import log
+from .mapperdev import MapperDev
+
+@dataclass
+class DMCryptDev:
+ dev_path :pathlib.Path
+
+ @property
+ def name(self):
+ with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh:
+ return fh.read().strip()
+
+ @property
+ def path(self):
+ return f"/dev/mapper/{self.dev_path}"
+
+ @property
+ def blockdev(self):
+ pass
+
+ @property
+ def MapperDev(self):
+ return MapperDev(mappername=self.name)
+
+ @property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow")
+ pass
+
+ return None
+
+ @property
+ def filesystem(self) -> Optional[str]:
+ return self.MapperDev.filesystem \ No newline at end of file
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index b04e2740..afaf9e5e 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -5,12 +5,15 @@ import os
import pathlib
import re
import time
+import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .partition import Partition
from .blockdevice import BlockDevice
+from .dmcryptdev import DMCryptDev
+from .mapperdev import MapperDev
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
from ..output import log
@@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
return
return True
-# lsblk --json -l -n -o path
-def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
- kwargs.setdefault("partitions", False)
- drives = {}
- lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8'))
- for drive in lsblk['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part':
+def cleanup_bash_escapes(data :str) -> str:
+ return data.replace(r'\ ', ' ')
+
+def blkid(cmd :str) -> Dict[str, Any]:
+ if '-o' in cmd and '-o export' not in cmd:
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ elif '-o' not in cmd:
+ cmd += ' -o export'
+
+ try:
+ raw_data = SysCommand(cmd).decode()
+ except SysCallError as error:
+ log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG)
+ raise error
+
+ result = {}
+ # Process the raw result
+ devname = None
+ for line in raw_data.split('\r\n'):
+ if not len(line):
+ devname = None
+ continue
+
+ key, val = line.split('=', 1)
+ if key.lower() == 'devname':
+ devname = val
+ # Lowercase for backwards compatability with all_disks() previous use cases
+ result[devname] = {
+ "path": devname,
+ "PATH": devname
+ }
+ continue
+
+ result[devname][key] = cleanup_bash_escapes(val)
+
+ return result
+
+def get_loop_info(path :str) -> Dict[str, Any]:
+ for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
+ if not drive['name'] == path:
continue
- drives[drive['path']] = BlockDevice(drive['path'], drive)
+ return {
+ path: {
+ **drive,
+ 'type' : 'loop',
+ 'TYPE' : 'loop',
+ 'DEVTYPE' : 'loop',
+ 'PATH' : drive['name'],
+ 'path' : drive['name']
+ }
+ }
+
+ return {}
+
+def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]:
+ result = {}
+ for device_path, device_information in information.items():
+ dev_name = pathlib.Path(device_information['PATH']).name
+ if not device_information.get('TYPE') or not device_information.get('DEVTYPE'):
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists():
+ with dmcrypt_name.open('r') as fh:
+ device_information['DMCRYPT_NAME'] = fh.read().strip()
+
+ result[device_path] = device_information
+
+ return result
+
+def uevent(data :str) -> Dict[str, Any]:
+ information = {}
+
+ for line in data.replace('\r\n', '\n').split('\n'):
+ if len((line := line.strip())):
+ key, val = line.split('=', 1)
+ information[key] = val
+
+ return information
+
+def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]:
+ device_information = {}
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ return {
+ f"/dev/{dev_name}" : {
+ **device_information,
+ 'path' : f'/dev/{dev_name}',
+ 'PATH' : f'/dev/{dev_name}',
+ 'PTTYPE' : None
+ }
+ }
+
+def all_disks() -> List[BlockDevice]:
+ log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
+ return all_blockdevices(partitions=False, mappers=False)
+
+def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]:
+ """
+ Returns BlockDevice() and Partition() objects for all available devices.
+ """
+ from .partition import Partition
- return drives
+ instances = {}
+ # Due to lsblk being highly unreliable for this use case,
+ # we'll iterate the /sys/class definitions and find the information
+ # from there.
+ for block_device in glob.glob("/sys/class/block/*"):
+ device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ try:
+ information = blkid(f'blkid -p -o export {device_path}')
+
+ # TODO: No idea why F841 is raised here:
+ except SysCallError as error: # noqa: F841
+ if error.exit_code in (512, 2):
+ # Assume that it's a loop device, and try to get info on it
+ try:
+ information = get_loop_info(device_path)
+ if not information:
+ raise SysCallError("Could not get loop information", exit_code=1)
+
+ except SysCallError:
+ information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
+ else:
+ raise error
+
+ information = enrich_blockdevice_information(information)
+
+ for path, path_info in information.items():
+ if path_info.get('DMCRYPT_NAME'):
+ instances[path] = DMCryptDev(dev_path=path)
+ elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'):
+ if partitions:
+ instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path))))
+ elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
+ instances[path] = BlockDevice(path, path_info)
+ elif path_info.get('TYPE') == 'squashfs':
+ # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
+ continue
+ else:
+ log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow")
+
+ if mappers:
+ for block_device in glob.glob("/dev/mapper/*"):
+ if (pathobj := pathlib.Path(block_device)).is_symlink():
+ instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name)
+
+ return instances
+
+
+def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path:
+ partition_name = path.name
+ pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve()
+ return f"/dev/{pci_device.parent.name}"
def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
- collection = all_disks()
+ collection = all_blockdevices(partitions=False)
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
continue
@@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :
return collection[drive]
def split_bind_name(path :Union[pathlib.Path, str]) -> list:
+ # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow")
# we check for the bind notation. if exist we'll only use the "true" device path
if '[' in str(path) : # is a bind path (btrfs subvolume path)
device_path, bind_path = str(path).split('[')
@@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
+def find_mountpoint(device_path :str) -> Dict[str, Any]:
+ try:
+ for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']:
+ yield filesystem
+ except SysCallError:
+ return {}
+
def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
- device_path,bind_path = split_bind_name(path)
+ device_path, bind_path = split_bind_name(path)
output = {}
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
- log(f"Getting mount information for device path {traversal}", level=logging.INFO)
+ log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
break
- except SysCallError:
+
+ except SysCallError as error:
+ print('ERROR:', error)
pass
if not traverse:
break
if not output:
- raise DiskError(f"Could not get mount information for device path {path}")
+ raise DiskError(f"Could not get mount information for device path {device_path}")
output = json.loads(output)
+
# for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
# i.e. the subvolume filesystem we're searching for
if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
+
if 'filesystems' in output:
if len(output['filesystems']) > 1:
- raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
+ raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}")
if return_real_path:
return output['filesystems'][0], traversal
@@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur
return {}
+def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]:
+ for info in data:
+ if info.get('target') not in filters:
+ filters[info.get('target')] = None
+
+ filters.update(get_all_targets(info.get('children', [])))
+
+ return filters
+
def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
except SysCallError:
- return []
-
- mounts = []
+ return {}
if not output:
- return []
+ return {}
output = json.loads(output)
- for target in output.get('filesystems', []):
- # We need to create a BlockDevice() instead of 'None' here when creaiting Partition()
- # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None.
+ # print(output)
- # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device
- # Note: doing print(partition) here will break because the above mentioned issue.
- partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])
- partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ mounts = {}
+
+ block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True)
+
+ block_devices_mountpoints = {}
+ for blockdev in block_devices_available.values():
+ if not type(blockdev) in (Partition, MapperDev):
+ continue
- # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using
- # (lsblk pkname lists both the partition and blockdevice, BD being the last entry)
- result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1]
- block_device = BlockDevice(f"/dev/{result}")
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
- # Once we figured the block device out, we can properly create the partition object
- partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
- mounts.append(partition)
+ for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ if mountpoint in block_devices_mountpoints:
+ if mountpoint not in mounts:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
+ # If the already defined mountpoint is a DMCryptDev, and the newly found
+ # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition.
+ elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
- for child in target.get('children', []):
- mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target']))
+ log(f"Available partitions: {mounts}", level=logging.DEBUG)
return mounts
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
new file mode 100644
index 00000000..91ec6d25
--- /dev/null
+++ b/archinstall/lib/disk/mapperdev.py
@@ -0,0 +1,83 @@
+import glob
+import pathlib
+import logging
+import json
+from dataclasses import dataclass
+from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING
+
+from ..exceptions import SysCallError
+from ..general import SysCommand
+from ..output import log
+
+if TYPE_CHECKING:
+ from .btrfs import BtrfsSubvolume
+
+@dataclass
+class MapperDev:
+ mappername :str
+
+ @property
+ def name(self):
+ return self.mappername
+
+ @property
+ def path(self):
+ return f"/dev/mapper/{self.mappername}"
+
+ @property
+ def partition(self):
+ from .helpers import uevent, get_parent_of_partition
+ from .partition import Partition
+ from .blockdevice import BlockDevice
+
+ for mapper in glob.glob('/dev/mapper/*'):
+ path_obj = pathlib.Path(mapper)
+ if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink():
+ dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve()
+
+ for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
+ partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
+
+ try:
+ uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
+ except SysCallError as error:
+ log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
+
+ information = uevent(uevent_data)
+ block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
+
+ return Partition(information['DEVNAME'], block_device)
+
+ raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device")
+
+ @property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow")
+ pass
+
+ return None
+
+ @property
+ def mount_information(self) -> List[Dict[str, Any]]:
+ from .helpers import find_mountpoint
+ return list(find_mountpoint(self.path))
+
+ @property
+ def filesystem(self) -> Optional[str]:
+ from .helpers import get_filesystem_type
+ return get_filesystem_type(self.path)
+
+ @property
+ def subvolumes(self) -> Iterator['BtrfsSubvolume']:
+ from .btrfs import get_subvolumes_from_findmnt
+
+ for mountpoint in self.mount_information:
+ for result in get_subvolumes_from_findmnt(mountpoint):
+ yield result \ No newline at end of file
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 7bfde64c..708edd29 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -5,15 +5,15 @@ import logging
import json
import os
import hashlib
-from typing import Optional, Dict, Any, List, Union
+from typing import Optional, Dict, Any, List, Union, Iterator
from .blockdevice import BlockDevice
-from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
+from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
-
+from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume
class Partition:
def __init__(self,
@@ -29,9 +29,11 @@ class Partition:
part_id = os.path.basename(path)
self.block_device = block_device
+ if type(self.block_device) is str:
+ raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
+
self.path = path
self.part_id = part_id
- self.mountpoint = mountpoint
self.target_mountpoint = mountpoint
self.filesystem = filesystem
self._encrypted = None
@@ -42,20 +44,12 @@ class Partition:
self.mount(mountpoint)
try:
- mount_information = get_mount_info(self.path)
+ self.mount_information = list(find_mountpoint(self.path))
except DiskError:
- mount_information = {}
-
- if mount_information.get('target', None):
- if self.mountpoint != mount_information.get('target', None) and mountpoint:
- raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
-
- if target := mount_information.get('target', None):
- self.mountpoint = target
+ self.mount_information = [{}]
if not self.filesystem and autodetect_filesystem:
- if fstype := mount_information.get('fstype', get_filesystem_type(path)):
- self.filesystem = fstype
+ self.filesystem = get_filesystem_type(path)
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
@@ -98,6 +92,20 @@ class Partition:
}
@property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG)
+ pass
+
+ return None
+
+ @property
def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
@@ -135,14 +143,16 @@ class Partition:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
- if (handle := SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}")).exit_code == 0:
- lsblk = json.loads(handle.decode('UTF-8'))
+ try:
+ lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode())
for device in lsblk['blockdevices']:
return convert_size_to_gb(device['size'])
- elif handle.exit_code == 8192:
- # Device is not a block device
- return None
+ except SysCallError as error:
+ if error.exit_code == 8192:
+ return None
+ else:
+ raise error
time.sleep(storage['DISK_TIMEOUTS'])
@@ -200,7 +210,14 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
self.partprobe()
- return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
+ try:
+ return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
+ except SysCallError as error:
+ if self.block_device.info.get('TYPE') == 'iso9660':
+ # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
+ return None
+
+ raise DiskError(f"Could not get PARTUUID of partition {self}: {error}")
@property
def encrypted(self) -> Union[bool, None]:
@@ -237,6 +254,12 @@ class Partition:
device_path, bind_name = split_bind_name(self.path)
return bind_name
+ @property
+ def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ for mountpoint in self.mount_information:
+ for result in get_subvolumes_from_findmnt(mountpoint):
+ yield result
+
def partprobe(self) -> bool:
if self.block_device and SysCommand(f'partprobe {self.block_device.device}').exit_code == 0:
time.sleep(1)
@@ -411,7 +434,6 @@ class Partition:
except SysCallError as err:
raise err
- self.mountpoint = target
return True
return False
@@ -425,7 +447,6 @@ class Partition:
if 0 < worker.exit_code < 8000:
raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
- self.mountpoint = None
return True
def umount(self) -> bool: