index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | archinstall/lib/disk/blockdevice.py | 32 | ||||
-rw-r--r-- | archinstall/lib/disk/btrfs.py | 56 | ||||
-rw-r--r-- | archinstall/lib/disk/dmcryptdev.py | 48 | ||||
-rw-r--r-- | archinstall/lib/disk/helpers.py | 241 | ||||
-rw-r--r-- | archinstall/lib/disk/mapperdev.py | 83 | ||||
-rw-r--r-- | archinstall/lib/disk/partition.py | 67 | ||||
-rw-r--r-- | archinstall/lib/general.py | 2 | ||||
-rw-r--r-- | archinstall/lib/installer.py | 365 | ||||
-rw-r--r-- | archinstall/lib/output.py | 5 | ||||
-rw-r--r-- | archinstall/lib/user_interaction.py | 6 | ||||
-rw-r--r-- | docs/archinstall/general.rst | 2 | ||||
-rw-r--r-- | examples/minimal.py | 2 | ||||
-rw-r--r-- | profiles/52-54-00-12-34-56.py | 2 |
@@ -67,7 +67,7 @@ However, assuming you're building your own ISO and want to create an automated i import archinstall, getpass # Select a harddrive and a disk password -harddrive = archinstall.select_disk(archinstall.all_disks()) +harddrive = archinstall.select_disk(archinstall.all_blockdevices(partitions=False)) disk_password = getpass.getpass(prompt='Disk password (won\'t echo): ') # We disable safety precautions in the library that protects the partitions diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index fac258ef..ff741f18 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -16,10 +16,10 @@ from ..storage import storage class BlockDevice: def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: - from .helpers import all_disks + from .helpers import all_blockdevices # If we don't give any information, we need to auto-fill it. # Otherwise any subsequent usage will break. - info = all_disks()[path].info + info = all_blockdevices(partitions=False)[path].info self.path = path self.info = info @@ -78,16 +78,20 @@ class BlockDevice: If it's a loop-back-device it returns the back-file, For other types it return self.device """ - if self.info['type'] == 'loop': - for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: - if not drive['name'] == self.path: - continue - - return drive['back-file'] + if self.info.get('type') == 'loop': + return self.info['back-file'] else: return self.device @property + def mountpoint(self) -> None: + """ + A dummy function to enable transparent comparisons of mountpoints. + As blockdevices can't be mounted directly, this will always be None + """ + return None + + @property def device(self) -> str: """ Returns the device file of the BlockDevice. @@ -95,20 +99,20 @@ class BlockDevice: If it's a ATA-drive it returns the /dev/X device And if it's a crypto-device it returns the parent device """ - if "type" not in self.info: + if "DEVTYPE" not in self.info: raise DiskError(f'Could not locate backplane info for "{self.path}"') - if self.info['type'] in ['disk','loop']: + if self.info['DEVTYPE'] in ['disk','loop']: return self.path - elif self.info['type'][:4] == 'raid': + elif self.info['DEVTYPE'][:4] == 'raid': # This should catch /dev/md## raid devices return self.path - elif self.info['type'] == 'crypt': + elif self.info['DEVTYPE'] == 'crypt': if 'pkname' not in self.info: raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.') return f"/dev/{self.info['pkname']}" else: - log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG) + log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG) # if not stat.S_ISBLK(os.stat(full_path).st_mode): # raise DiskError(f'Selected disk "{full_path}" is not a block device.') @@ -195,7 +199,7 @@ class BlockDevice: _, start, end, size, *_ = free_space.strip('\r\n;').split(':') yield (start, end, size) except SysCallError as error: - log(f"Could not get free space on {self.path}: {error}", level=logging.INFO) + log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG) @property def largest_free_space(self) -> List[str]: diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py index ad8d0a52..f2da957f 100644 --- a/archinstall/lib/disk/btrfs.py +++ b/archinstall/lib/disk/btrfs.py @@ -2,7 +2,9 @@ from __future__ import annotations import pathlib import glob import logging -from typing import Union, Dict, TYPE_CHECKING +import re +from typing import Union, Dict, TYPE_CHECKING, Any, Iterator +from dataclasses import dataclass # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: @@ -11,7 +13,49 @@ from .helpers import get_mount_info from ..exceptions import DiskError from ..general import SysCommand from ..output import log - +from ..exceptions import SysCallError + +@dataclass +class BtrfsSubvolume: + target :str + source :str + fstype :str + name :str + options :str + root :bool = False + +def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]: + if '@' in struct['source']: + subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1] + struct['source'] = struct['source'].replace(f"[{subvolume}]", "") + yield BtrfsSubvolume( + target=struct['target'], + source=struct['source'], + fstype=struct['fstype'], + name=subvolume, + options=struct['options'], + root=index == 0 + ) + index += 1 + + for child in struct.get('children', []): + for item in get_subvolumes_from_findmnt(child, index=index): + yield item + index += 1 + +def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: + try: + output = SysCommand(f"btrfs subvol show {path}").decode() + except SysCallError as error: + print('Error:', error) + + result = {} + for line in output.replace('\r\n', '\n').split('\n'): + if ':' in line: + key, val = line.replace('\t', '').split(':', 1) + result[key.strip().lower().replace(' ', '_')] = val.strip() + + return result def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool: """ @@ -24,7 +68,7 @@ def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.P This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure. Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name] """ - log("function btrfs.mount_subvolume DEPRECATED. See code for alternatives",fg="yellow",level=logging.WARNING) + log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING) installation_mountpoint = installation.target if type(installation_mountpoint) == str: installation_mountpoint = pathlib.Path(installation_mountpoint) @@ -179,11 +223,7 @@ def manage_btrfs_subvolumes(installation :Installer, # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" - # we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'. - # If i don't reset it, process will abort as "already mounted' . - # TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice - fake_partition['device_instance'].mountpoint = None - # + # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, # as "normal" ones mountpoints.append(fake_partition) diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py new file mode 100644 index 00000000..63392ffb --- /dev/null +++ b/archinstall/lib/disk/dmcryptdev.py @@ -0,0 +1,48 @@ +import pathlib +import logging +import json +from dataclasses import dataclass +from typing import Optional +from ..exceptions import SysCallError +from ..general import SysCommand +from ..output import log +from .mapperdev import MapperDev + +@dataclass +class DMCryptDev: + dev_path :pathlib.Path + + @property + def name(self): + with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh: + return fh.read().strip() + + @property + def path(self): + return f"/dev/mapper/{self.dev_path}" + + @property + def blockdev(self): + pass + + @property + def MapperDev(self): + return MapperDev(mappername=self.name) + + @property + def mountpoint(self) -> Optional[str]: + try: + data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode()) + for filesystem in data['filesystems']: + return filesystem.get('target') + + except SysCallError as error: + # Not mounted anywhere most likely + log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow") + pass + + return None + + @property + def filesystem(self) -> Optional[str]: + return self.MapperDev.filesystem
\ No newline at end of file diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index b04e2740..afaf9e5e 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -5,12 +5,15 @@ import os import pathlib import re import time +import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: from .partition import Partition from .blockdevice import BlockDevice +from .dmcryptdev import DMCryptDev +from .mapperdev import MapperDev from ..exceptions import SysCallError, DiskError from ..general import SysCommand from ..output import log @@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]: return return True -# lsblk --json -l -n -o path -def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]: - kwargs.setdefault("partitions", False) - drives = {} - lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8')) - for drive in lsblk['blockdevices']: - if not kwargs['partitions'] and drive['type'] == 'part': +def cleanup_bash_escapes(data :str) -> str: + return data.replace(r'\ ', ' ') + +def blkid(cmd :str) -> Dict[str, Any]: + if '-o' in cmd and '-o export' not in cmd: + raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.") + elif '-o' not in cmd: + cmd += ' -o export' + + try: + raw_data = SysCommand(cmd).decode() + except SysCallError as error: + log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG) + raise error + + result = {} + # Process the raw result + devname = None + for line in raw_data.split('\r\n'): + if not len(line): + devname = None + continue + + key, val = line.split('=', 1) + if key.lower() == 'devname': + devname = val + # Lowercase for backwards compatability with all_disks() previous use cases + result[devname] = { + "path": devname, + "PATH": devname + } + continue + + result[devname][key] = cleanup_bash_escapes(val) + + return result + +def get_loop_info(path :str) -> Dict[str, Any]: + for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']: + if not drive['name'] == path: continue - drives[drive['path']] = BlockDevice(drive['path'], drive) + return { + path: { + **drive, + 'type' : 'loop', + 'TYPE' : 'loop', + 'DEVTYPE' : 'loop', + 'PATH' : drive['name'], + 'path' : drive['name'] + } + } + + return {} + +def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]: + result = {} + for device_path, device_information in information.items(): + dev_name = pathlib.Path(device_information['PATH']).name + if not device_information.get('TYPE') or not device_information.get('DEVTYPE'): + with open(f"/sys/class/block/{dev_name}/uevent") as fh: + device_information.update(uevent(fh.read())) + + if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists(): + with dmcrypt_name.open('r') as fh: + device_information['DMCRYPT_NAME'] = fh.read().strip() + + result[device_path] = device_information + + return result + +def uevent(data :str) -> Dict[str, Any]: + information = {} + + for line in data.replace('\r\n', '\n').split('\n'): + if len((line := line.strip())): + key, val = line.split('=', 1) + information[key] = val + + return information + +def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]: + device_information = {} + with open(f"/sys/class/block/{dev_name}/uevent") as fh: + device_information.update(uevent(fh.read())) + + return { + f"/dev/{dev_name}" : { + **device_information, + 'path' : f'/dev/{dev_name}', + 'PATH' : f'/dev/{dev_name}', + 'PTTYPE' : None + } + } + +def all_disks() -> List[BlockDevice]: + log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow") + return all_blockdevices(partitions=False, mappers=False) + +def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]: + """ + Returns BlockDevice() and Partition() objects for all available devices. + """ + from .partition import Partition - return drives + instances = {} + # Due to lsblk being highly unreliable for this use case, + # we'll iterate the /sys/class definitions and find the information + # from there. + for block_device in glob.glob("/sys/class/block/*"): + device_path = f"/dev/{pathlib.Path(block_device).readlink().name}" + try: + information = blkid(f'blkid -p -o export {device_path}') + + # TODO: No idea why F841 is raised here: + except SysCallError as error: # noqa: F841 + if error.exit_code in (512, 2): + # Assume that it's a loop device, and try to get info on it + try: + information = get_loop_info(device_path) + if not information: + raise SysCallError("Could not get loop information", exit_code=1) + + except SysCallError: + information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) + else: + raise error + + information = enrich_blockdevice_information(information) + + for path, path_info in information.items(): + if path_info.get('DMCRYPT_NAME'): + instances[path] = DMCryptDev(dev_path=path) + elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'): + if partitions: + instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path)))) + elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': + instances[path] = BlockDevice(path, path_info) + elif path_info.get('TYPE') == 'squashfs': + # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) + continue + else: + log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow") + + if mappers: + for block_device in glob.glob("/dev/mapper/*"): + if (pathobj := pathlib.Path(block_device)).is_symlink(): + instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name) + + return instances + + +def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path: + partition_name = path.name + pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve() + return f"/dev/{pci_device.parent.name}" def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]: - collection = all_disks() + collection = all_blockdevices(partitions=False) for drive in collection: if size and convert_to_gigabytes(collection[drive]['size']) != size: continue @@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy : return collection[drive] def split_bind_name(path :Union[pathlib.Path, str]) -> list: + # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow") # we check for the bind notation. if exist we'll only use the "true" device path if '[' in str(path) : # is a bind path (btrfs subvolume path) device_path, bind_path = str(path).split('[') @@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list: bind_path = None return device_path,bind_path +def find_mountpoint(device_path :str) -> Dict[str, Any]: + try: + for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']: + yield filesystem + except SysCallError: + return {} + def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]: - device_path,bind_path = split_bind_name(path) + device_path, bind_path = split_bind_name(path) output = {} for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))): try: - log(f"Getting mount information for device path {traversal}", level=logging.INFO) + log(f"Getting mount information for device path {traversal}", level=logging.DEBUG) if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')): break - except SysCallError: + + except SysCallError as error: + print('ERROR:', error) pass if not traverse: break if not output: - raise DiskError(f"Could not get mount information for device path {path}") + raise DiskError(f"Could not get mount information for device path {device_path}") output = json.loads(output) + # for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter # i.e. the subvolume filesystem we're searching for if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None: output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)] + if 'filesystems' in output: if len(output['filesystems']) > 1: - raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}") + raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}") if return_real_path: return output['filesystems'][0], traversal @@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur return {} +def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]: + for info in data: + if info.get('target') not in filters: + filters[info.get('target')] = None + + filters.update(get_all_targets(info.get('children', []))) + + return filters + def get_partitions_in_use(mountpoint :str) -> List[Partition]: from .partition import Partition try: output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8') except SysCallError: - return [] - - mounts = [] + return {} if not output: - return [] + return {} output = json.loads(output) - for target in output.get('filesystems', []): - # We need to create a BlockDevice() instead of 'None' here when creaiting Partition() - # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None. + # print(output) - # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device - # Note: doing print(partition) here will break because the above mentioned issue. - partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target']) - partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target']) + mounts = {} + + block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True) + + block_devices_mountpoints = {} + for blockdev in block_devices_available.values(): + if not type(blockdev) in (Partition, MapperDev): + continue - # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using - # (lsblk pkname lists both the partition and blockdevice, BD being the last entry) - result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1] - block_device = BlockDevice(f"/dev/{result}") + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev - # Once we figured the block device out, we can properly create the partition object - partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target']) + log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) - mounts.append(partition) + for mountpoint in list(get_all_targets(output['filesystems']).keys()): + if mountpoint in block_devices_mountpoints: + if mountpoint not in mounts: + mounts[mountpoint] = block_devices_mountpoints[mountpoint] + # If the already defined mountpoint is a DMCryptDev, and the newly found + # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition. + elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev: + mounts[mountpoint] = block_devices_mountpoints[mountpoint] - for child in target.get('children', []): - mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target'])) + log(f"Available partitions: {mounts}", level=logging.DEBUG) return mounts diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py new file mode 100644 index 00000000..91ec6d25 --- /dev/null +++ b/archinstall/lib/disk/mapperdev.py @@ -0,0 +1,83 @@ +import glob +import pathlib +import logging +import json +from dataclasses import dataclass +from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING + +from ..exceptions import SysCallError +from ..general import SysCommand +from ..output import log + +if TYPE_CHECKING: + from .btrfs import BtrfsSubvolume + +@dataclass +class MapperDev: + mappername :str + + @property + def name(self): + return self.mappername + + @property + def path(self): + return f"/dev/mapper/{self.mappername}" + + @property + def partition(self): + from .helpers import uevent, get_parent_of_partition + from .partition import Partition + from .blockdevice import BlockDevice + + for mapper in glob.glob('/dev/mapper/*'): + path_obj = pathlib.Path(mapper) + if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink(): + dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve() + + for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"): + partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name + + try: + uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode() + except SysCallError as error: + log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red") + + information = uevent(uevent_data) + block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME']))) + + return Partition(information['DEVNAME'], block_device) + + raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device") + + @property + def mountpoint(self) -> Optional[str]: + try: + data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) + for filesystem in data['filesystems']: + return filesystem.get('target') + + except SysCallError as error: + # Not mounted anywhere most likely + log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow") + pass + + return None + + @property + def mount_information(self) -> List[Dict[str, Any]]: + from .helpers import find_mountpoint + return list(find_mountpoint(self.path)) + + @property + def filesystem(self) -> Optional[str]: + from .helpers import get_filesystem_type + return get_filesystem_type(self.path) + + @property + def subvolumes(self) -> Iterator['BtrfsSubvolume']: + from .btrfs import get_subvolumes_from_findmnt + + for mountpoint in self.mount_information: + for result in get_subvolumes_from_findmnt(mountpoint): + yield result
\ No newline at end of file diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 7bfde64c..708edd29 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -5,15 +5,15 @@ import logging import json import os import hashlib -from typing import Optional, Dict, Any, List, Union +from typing import Optional, Dict, Any, List, Union, Iterator from .blockdevice import BlockDevice -from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name +from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat from ..output import log from ..general import SysCommand - +from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume class Partition: def __init__(self, @@ -29,9 +29,11 @@ class Partition: part_id = os.path.basename(path) self.block_device = block_device + if type(self.block_device) is str: + raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") + self.path = path self.part_id = part_id - self.mountpoint = mountpoint self.target_mountpoint = mountpoint self.filesystem = filesystem self._encrypted = None @@ -42,20 +44,12 @@ class Partition: self.mount(mountpoint) try: - mount_information = get_mount_info(self.path) + self.mount_information = list(find_mountpoint(self.path)) except DiskError: - mount_information = {} - - if mount_information.get('target', None): - if self.mountpoint != mount_information.get('target', None) and mountpoint: - raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}") - - if target := mount_information.get('target', None): - self.mountpoint = target + self.mount_information = [{}] if not self.filesystem and autodetect_filesystem: - if fstype := mount_information.get('fstype', get_filesystem_type(path)): - self.filesystem = fstype + self.filesystem = get_filesystem_type(path) if self.filesystem == 'crypto_LUKS': self.encrypted = True @@ -98,6 +92,20 @@ class Partition: } @property + def mountpoint(self) -> Optional[str]: + try: + data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) + for filesystem in data['filesystems']: + return filesystem.get('target') + + except SysCallError as error: + # Not mounted anywhere most likely + log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG) + pass + + return None + + @property def sector_size(self) -> Optional[int]: output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) @@ -135,14 +143,16 @@ class Partition: for i in range(storage['DISK_RETRY_ATTEMPTS']): self.partprobe() - if (handle := SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}")).exit_code == 0: - lsblk = json.loads(handle.decode('UTF-8')) + try: + lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode()) for device in lsblk['blockdevices']: return convert_size_to_gb(device['size']) - elif handle.exit_code == 8192: - # Device is not a block device - return None + except SysCallError as error: + if error.exit_code == 8192: + return None + else: + raise error time.sleep(storage['DISK_TIMEOUTS']) @@ -200,7 +210,14 @@ class Partition: For instance when you want to get a __repr__ of the class. """ self.partprobe() - return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() + try: + return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() + except SysCallError as error: + if self.block_device.info.get('TYPE') == 'iso9660': + # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) + return None + + raise DiskError(f"Could not get PARTUUID of partition {self}: {error}") @property def encrypted(self) -> Union[bool, None]: @@ -237,6 +254,12 @@ class Partition: device_path, bind_name = split_bind_name(self.path) return bind_name + @property + def subvolumes(self) -> Iterator[BtrfsSubvolume]: + for mountpoint in self.mount_information: + for result in get_subvolumes_from_findmnt(mountpoint): + yield result + def partprobe(self) -> bool: if self.block_device and SysCommand(f'partprobe {self.block_device.device}').exit_code == 0: time.sleep(1) @@ -411,7 +434,6 @@ class Partition: except SysCallError as err: raise err - self.mountpoint = target return True return False @@ -425,7 +447,6 @@ class Partition: if 0 < worker.exit_code < 8000: raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) - self.mountpoint = None return True def umount(self) -> bool: diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index ad7b8ad4..174acb8a 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -265,7 +265,7 @@ class SysCommandWorker: log(args[1], level=logging.DEBUG, fg='red') if self.exit_code != 0: - raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[:500]}", self.exit_code) + raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code) def is_alive(self) -> bool: self.poll() diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index daac340b..1ead46c7 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -12,7 +12,7 @@ from .disk import get_partitions_in_use, Partition from .general import SysCommand, generate_password from .hardware import has_uefi, is_vm, cpu_vendor from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout -from .disk.helpers import get_mount_info, split_bind_name +from .disk.helpers import get_mount_info from .mirrors import use_mirrors from .plugins import plugins from .storage import storage @@ -21,7 +21,7 @@ from .output import log from .profiles import Profile from .disk.btrfs import manage_btrfs_subvolumes from .disk.partition import get_mount_fs_type -from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError +from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError # Any package that the Installer() is responsible for (optional and the default ones) __packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"] @@ -165,7 +165,7 @@ class Installer: @property def partitions(self) -> List[Partition]: - return get_partitions_in_use(self.target) + return get_partitions_in_use(self.target).values() def sync_log_to_install_medium(self) -> bool: # Copy over the install log (if there is one) to the install medium if @@ -259,12 +259,15 @@ class Installer: for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']): mountpoint = partition['mountpoint'] log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO) + if partition.get('filesystem',{}).get('mount_options',[]): mount_options = ','.join(partition['filesystem']['mount_options']) partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options) else: partition['device_instance'].mount(f"{self.target}{mountpoint}") + time.sleep(1) + try: get_mount_info(f"{self.target}{mountpoint}", traverse=False) except DiskError: @@ -501,11 +504,17 @@ class Installer: return True def detect_encryption(self, partition :Partition) -> bool: - part = Partition(partition.parent, None, autodetect_filesystem=True) - if partition.encrypted: + from .disk.mapperdev import MapperDev + from .disk.dmcryptdev import DMCryptDev + from .disk.helpers import get_filesystem_type + + if type(partition) is MapperDev: + # Returns MapperDev.partition + return partition.partition + elif type(partition) is DMCryptDev: + return partition.MapperDev.partition + elif get_filesystem_type(partition.path) == 'crypto_LUKS': return partition - elif partition.parent not in partition.path and part.filesystem == 'crypto_LUKS': - return part return False @@ -627,6 +636,184 @@ class Installer: else: raise ValueError(f"Archinstall currently only supports setting up swap on zram") + def add_systemd_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: + self.pacstrap('efibootmgr') + + if not has_uefi(): + raise HardwareIncompatibilityError + # TODO: Ideally we would want to check if another config + # points towards the same disk and/or partition. + # And in which case we should do some clean up. + + # Install the boot loader + if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0: + # Fallback, try creating the boot loader without touching the EFI variables + SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install') + + # Ensure that the /boot/loader directory exists before we try to create files in it + if not os.path.exists(f'{self.target}/boot/loader'): + os.makedirs(f'{self.target}/boot/loader') + + # Modify or create a loader.conf + if os.path.isfile(f'{self.target}/boot/loader/loader.conf'): + with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader: + loader_data = loader.read().split('\n') + else: + loader_data = [ + f"default {self.init_time}", + "timeout 5" + ] + + with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader: + for line in loader_data: + if line[:8] == 'default ': + loader.write(f'default {self.init_time}_{self.kernels[0]}\n') + elif line[:8] == '#timeout' and 'timeout 5' not in loader_data: + # We add in the default timeout to support dual-boot + loader.write(f"{line[1:]}\n") + else: + loader.write(f"{line}\n") + + # Ensure that the /boot/loader/entries directory exists before we try to create files in it + if not os.path.exists(f'{self.target}/boot/loader/entries'): + os.makedirs(f'{self.target}/boot/loader/entries') + + for kernel in self.kernels: + # Setup the loader entry + with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}.conf', 'w') as entry: + entry.write('# Created by: archinstall\n') + entry.write(f'# Created on: {self.init_time}\n') + entry.write(f'title Arch Linux ({kernel})\n') + entry.write(f"linux /vmlinuz-{kernel}\n") + if not is_vm(): + vendor = cpu_vendor() + if vendor == "AuthenticAMD": + entry.write("initrd /amd-ucode.img\n") + elif vendor == "GenuineIntel": + entry.write("initrd /intel-ucode.img\n") + else: + self.log("unknow cpu vendor, not adding ucode to systemd-boot config") + entry.write(f"initrd /initramfs-{kernel}.img\n") + # blkid doesn't trigger on loopback devices really well, + # so we'll use the old manual method until we get that sorted out. + root_fs_type = get_mount_fs_type(root_partition.filesystem) + + if root_fs_type is not None: + options_entry = f'rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n' + else: + options_entry = f'rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n' + + for subvolume in root_partition.subvolumes: + if subvolume.root is True: + options_entry = f"rootflags=subvol={subvolume.name} " + options_entry + + # Zswap should be disabled when using zram. + # + # https://github.com/archlinux/archinstall/issues/881 + if self.zram_enabled: + options_entry = "zswap.enabled=0 " + options_entry + + if real_device := self.detect_encryption(root_partition): + # TODO: We need to detect if the encrypted device is a whole disk encryption, + # or simply a partition encryption. Right now we assume it's a partition (and we always have) + log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) + entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev {options_entry}') + else: + log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG) + entry.write(f'options root=PARTUUID={root_partition.uuid} {options_entry}') + + self.helper_flags['bootloader'] = "systemd" + + return True + + def add_grub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: + self.pacstrap('grub') # no need? + + root_fs_type = get_mount_fs_type(root_partition.filesystem) + + if real_device := self.detect_encryption(root_partition): + root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip() + _file = "/etc/default/grub" + add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'" + enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'" + + log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO) + SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") + SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}") + else: + _file = "/etc/default/grub" + add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'" + SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") + + log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO) + if has_uefi(): + self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? + try: + SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable') + except SysCallError as error: + raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}") + else: + try: + SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc --recheck {boot_partition.parent}') + except SysCallError as error: + raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}") + + try: + SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg') + except SysCallError as error: + raise DiskError(f"Could not configure GRUB: {error}") + + self.helper_flags['bootloader'] = "grub" + + return True + + def add_efistub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool: + self.pacstrap('efibootmgr') + + if not has_uefi(): + raise HardwareIncompatibilityError + # TODO: Ideally we would want to check if another config + # points towards the same disk and/or partition. + # And in which case we should do some clean up. + + root_fs_type = get_mount_fs_type(root_partition.filesystem) + + for kernel in self.kernels: + # Setup the firmware entry + + label = f'Arch Linux ({kernel})' + loader = f"/vmlinuz-{kernel}" + + kernel_parameters = [] + + if not is_vm(): + vendor = cpu_vendor() + if vendor == "AuthenticAMD": + kernel_parameters.append("initrd=\\amd-ucode.img") + elif vendor == "GenuineIntel": + kernel_parameters.append("initrd=\\intel-ucode.img") + else: + self.log("unknow cpu vendor, not adding ucode to firmware boot entry") + + kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img") + + # blkid doesn't trigger on loopback devices really well, + # so we'll use the old manual method until we get that sorted out. + if real_device := self.detect_encryption(root_partition): + # TODO: We need to detect if the encrypted device is a whole disk encryption, + # or simply a partition encryption. Right now we assume it's a partition (and we always have) + log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) + kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') + else: + log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG) + kernel_parameters.append(f'root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') + + SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') + + self.helper_flags['bootloader'] = "efistub" + + return True + def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool: """ Adds a bootloader to the installation instance. @@ -648,177 +835,23 @@ class Installer: boot_partition = None root_partition = None - root_partition_fs = None for partition in self.partitions: - if partition.mountpoint == self.target + '/boot': + if partition.mountpoint == os.path.join(self.target, 'boot'): boot_partition = partition elif partition.mountpoint == self.target: root_partition = partition - root_partition_fs = partition.filesystem - root_fs_type = get_mount_fs_type(root_partition_fs) if boot_partition is None or root_partition is None: - raise ValueError(f"Could not detect root (/) or boot (/boot) in {self.target} based on: {self.partitions}") + raise ValueError(f"Could not detect root ({root_partition}) or boot ({boot_partition}) in {self.target} based on: {self.partitions}") self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO) if bootloader == 'systemd-bootctl': - self.pacstrap('efibootmgr') - - if not has_uefi(): - raise HardwareIncompatibilityError - # TODO: Ideally we would want to check if another config - # points towards the same disk and/or partition. - # And in which case we should do some clean up. - - # Install the boot loader - if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0: - # Fallback, try creating the boot loader without touching the EFI variables - SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install') - - # Ensure that the /boot/loader directory exists before we try to create files in it - if not os.path.exists(f'{self.target}/boot/loader'): - os.makedirs(f'{self.target}/boot/loader') - - # Modify or create a loader.conf - if os.path.isfile(f'{self.target}/boot/loader/loader.conf'): - with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader: - loader_data = loader.read().split('\n') - else: - loader_data = [ - f"default {self.init_time}", - "timeout 5" - ] - - with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader: - for line in loader_data: - if line[:8] == 'default ': - loader.write(f'default {self.init_time}_{self.kernels[0]}\n') - elif line[:8] == '#timeout' and 'timeout 5' not in loader_data: - # We add in the default timeout to support dual-boot - loader.write(f"{line[1:]}\n") - else: - loader.write(f"{line}\n") - - # Ensure that the /boot/loader/entries directory exists before we try to create files in it - if not os.path.exists(f'{self.target}/boot/loader/entries'): - os.makedirs(f'{self.target}/boot/loader/entries') - - for kernel in self.kernels: - # Setup the loader entry - with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}.conf', 'w') as entry: - entry.write('# Created by: archinstall\n') - entry.write(f'# Created on: {self.init_time}\n') - entry.write(f'title Arch Linux ({kernel})\n') - entry.write(f"linux /vmlinuz-{kernel}\n") - if not is_vm(): - vendor = cpu_vendor() - if vendor == "AuthenticAMD": - entry.write("initrd /amd-ucode.img\n") - elif vendor == "GenuineIntel": - entry.write("initrd /intel-ucode.img\n") - else: - self.log("unknow cpu vendor, not adding ucode to systemd-boot config") - entry.write(f"initrd /initramfs-{kernel}.img\n") - # blkid doesn't trigger on loopback devices really well, - # so we'll use the old manual method until we get that sorted out. - if root_fs_type is not None: - options_entry = f'rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n' - else: - options_entry = f'rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n' - base_path,bind_path = split_bind_name(str(root_partition.path)) - if bind_path is not None: # and root_fs_type == 'btrfs': - options_entry = f"rootflags=subvol={bind_path} " + options_entry - - # Zswap should be disabled when using zram. - # - # https://github.com/archlinux/archinstall/issues/881 - if self.zram_enabled: - options_entry = "zswap.enabled=0 " + options_entry - - if real_device := self.detect_encryption(root_partition): - # TODO: We need to detect if the encrypted device is a whole disk encryption, - # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) - entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev {options_entry}') - else: - log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG) - entry.write(f'options root=PARTUUID={root_partition.uuid} {options_entry}') - - self.helper_flags['bootloader'] = bootloader - + self.add_systemd_bootloader(boot_partition, root_partition) elif bootloader == "grub-install": - self.pacstrap('grub') # no need? - - if real_device := self.detect_encryption(root_partition): - root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip() - _file = "/etc/default/grub" - add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'" - enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'" - - log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO) - SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") - SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}") - else: - _file = "/etc/default/grub" - add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'" - SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}") - - log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO) - if has_uefi(): - self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead? - if not (handle := SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable')).exit_code == 0: - raise DiskError(f"Could not install GRUB to {self.target}/boot: {handle}") - else: - if not (handle := SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc --recheck {boot_partition.parent}')).exit_code == 0: - raise DiskError(f"Could not install GRUB to {boot_partition.path}: {handle}") - - if not (handle := SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg')).exit_code == 0: - raise DiskError(f"Could not configure GRUB: {handle}") - - self.helper_flags['bootloader'] = True + self.add_grub_bootloader(boot_partition, root_partition) elif bootloader == 'efistub': - self.pacstrap('efibootmgr') - - if not has_uefi(): - raise HardwareIncompatibilityError - # TODO: Ideally we would want to check if another config - # points towards the same disk and/or partition. - # And in which case we should do some clean up. - - for kernel in self.kernels: - # Setup the firmware entry - - label = f'Arch Linux ({kernel})' - loader = f"/vmlinuz-{kernel}" - - kernel_parameters = [] - - if not is_vm(): - vendor = cpu_vendor() - if vendor == "AuthenticAMD": - kernel_parameters.append("initrd=\\amd-ucode.img") - elif vendor == "GenuineIntel": - kernel_parameters.append("initrd=\\intel-ucode.img") - else: - self.log("unknow cpu vendor, not adding ucode to firmware boot entry") - - kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img") - - # blkid doesn't trigger on loopback devices really well, - # so we'll use the old manual method until we get that sorted out. - if real_device := self.detect_encryption(root_partition): - # TODO: We need to detect if the encrypted device is a whole disk encryption, - # or simply a partition encryption. Right now we assume it's a partition (and we always have) - log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') - else: - log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG) - kernel_parameters.append(f'root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}') - - SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose') - - self.helper_flags['bootloader'] = bootloader + self.add_efistub_bootloader(boot_partition, root_partition) else: raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}") diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py index ffe00370..fa537dd4 100644 --- a/archinstall/lib/output.py +++ b/archinstall/lib/output.py @@ -105,5 +105,6 @@ def log(*args :str, **kwargs :Union[str, int, Dict[str, Union[str, int]]]) -> No # Finally, print the log unless we skipped it based on level. # We use sys.stdout.write()+flush() instead of print() to try and # fix issue #94 - sys.stdout.write(f"{string}\n") - sys.stdout.flush() + if kwargs.get('level', logging.INFO) != logging.DEBUG or storage['arguments'].get('verbose', False): + sys.stdout.write(f"{string}\n") + sys.stdout.flush() diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py index 7ed143f1..202b14a4 100644 --- a/archinstall/lib/user_interaction.py +++ b/archinstall/lib/user_interaction.py @@ -15,7 +15,7 @@ from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING if TYPE_CHECKING: from .disk.partition import Partition -from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_disks +from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_blockdevices from .exceptions import RequirementError, UserError, DiskError from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics from .locale_helpers import list_keyboard_languages, list_timezones, list_locales @@ -285,7 +285,7 @@ def ask_ntp() -> bool: def ask_hostname(): - hostname = input(_('Desired hostname for the installation: ').strip(' ')) + hostname = input(_('Desired hostname for the installation: ')).strip(' ') return hostname @@ -889,7 +889,7 @@ def select_harddrives() -> Optional[str]: :return: List of selected hard drives :rtype: list """ - hard_drives = all_disks().values() + hard_drives = all_blockdevices(partitions=False).values() options = {f'{option}': option for option in hard_drives} selected_harddrive = Menu( diff --git a/docs/archinstall/general.rst b/docs/archinstall/general.rst index 53847139..970d40f2 100644 --- a/docs/archinstall/general.rst +++ b/docs/archinstall/general.rst @@ -58,7 +58,7 @@ Disk related .. autofunction:: archinstall.device_state -.. autofunction:: archinstall.all_disks +.. autofunction:: archinstall.all_blockdevices Luks (Disk encryption) ====================== diff --git a/examples/minimal.py b/examples/minimal.py index 2aff1305..d80c4045 100644 --- a/examples/minimal.py +++ b/examples/minimal.py @@ -9,7 +9,7 @@ if archinstall.arguments.get('help', None): archinstall.log(" - Optional filesystem type via --filesystem=<fs type>") archinstall.log(" - Optional systemd network via --network") -archinstall.arguments['harddrive'] = archinstall.select_disk(archinstall.all_disks()) +archinstall.arguments['harddrive'] = archinstall.select_disk(archinstall.all_blockdevices()) def install_on(mountpoint): diff --git a/profiles/52-54-00-12-34-56.py b/profiles/52-54-00-12-34-56.py index 68e15720..0a1626d9 100644 --- a/profiles/52-54-00-12-34-56.py +++ b/profiles/52-54-00-12-34-56.py @@ -15,7 +15,7 @@ archinstall.sys_command(f'umount -R /mnt', suppress_errors=True) archinstall.sys_command(f'cryptsetup close /dev/mapper/luksloop', suppress_errors=True) # Select a harddrive and a disk password -harddrive = archinstall.all_disks()['/dev/sda'] +harddrive = archinstall.all_blockdevices()['/dev/sda'] disk_password = '1234' with archinstall.Filesystem(harddrive) as fs: |