index : archinstall32 | |
Archlinux32 installer | gitolite user |
summaryrefslogtreecommitdiff |
diff --git a/archinstall/lib/configuration.py b/archinstall/lib/configuration.py index 510f7103..2a43174d 100644 --- a/archinstall/lib/configuration.py +++ b/archinstall/lib/configuration.py @@ -1,4 +1,6 @@ +import os import json +import stat import logging import pathlib from typing import Optional, Dict @@ -106,23 +108,33 @@ class ConfigurationOutput: def save_user_config(self, dest_path :pathlib.Path = None): if self._is_valid_path(dest_path): - with open(dest_path / self._user_config_file, 'w') as config_file: + target = dest_path / self._user_config_file + + with open(target, 'w') as config_file: config_file.write(self.user_config_to_json()) + os.chmod(str(dest_path / self._user_config_file), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) + def save_user_creds(self, dest_path :pathlib.Path = None): if self._is_valid_path(dest_path): if user_creds := self.user_credentials_to_json(): target = dest_path / self._user_creds_file + with open(target, 'w') as config_file: config_file.write(user_creds) + os.chmod(str(target), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) + def save_disk_layout(self, dest_path :pathlib.Path = None): if self._is_valid_path(dest_path): if disk_layout := self.disk_layout_to_json(): target = dest_path / self._disk_layout_file + with target.open('w') as config_file: config_file.write(disk_layout) + os.chmod(str(target), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) + def save(self, dest_path :pathlib.Path = None): if not dest_path: dest_path = self._default_save_path diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py index c7b69205..736bacbc 100644 --- a/archinstall/lib/disk/blockdevice.py +++ b/archinstall/lib/disk/blockdevice.py @@ -1,13 +1,11 @@ from __future__ import annotations -import os import json import logging import time -from functools import cached_property -from typing import Optional, Dict, Any, Iterator, Tuple, List, TYPE_CHECKING -# https://stackoverflow.com/a/39757388/929999 -if TYPE_CHECKING: - from .partition import Partition + +from collections import OrderedDict +from dataclasses import dataclass +from typing import Optional, Dict, Any, Iterator, List, TYPE_CHECKING from ..exceptions import DiskError, SysCallError from ..output import log @@ -15,18 +13,44 @@ from ..general import SysCommand from ..storage import storage +if TYPE_CHECKING: + from .partition import Partition + _: Any + + +@dataclass +class BlockSizeInfo: + start: str + end: str + size: str + + +@dataclass +class BlockInfo: + pttype: str + ptuuid: str + size: int + tran: Optional[str] + rota: bool + free_space: Optional[List[BlockSizeInfo]] + + class BlockDevice: def __init__(self, path :str, info :Optional[Dict[str, Any]] = None): if not info: from .helpers import all_blockdevices # If we don't give any information, we need to auto-fill it. # Otherwise any subsequent usage will break. - info = all_blockdevices(partitions=False)[path].info + self.info = all_blockdevices(partitions=False)[path].info + else: + self.info = info - self.path = path - self.info = info + self._path = path self.keep_partitions = True - self.part_cache = {} + self._block_info = self._fetch_information() + self._partitions: Dict[str, 'Partition'] = {} + + self._load_partitions() # TODO: Currently disk encryption is a BIT misleading. # It's actually partition-encryption, but for future-proofing this @@ -35,70 +59,113 @@ class BlockDevice: def __repr__(self, *args :str, **kwargs :str) -> str: return self._str_repr - @cached_property + @property + def path(self) -> str: + return self._path + + @property def _str_repr(self) -> str: - return f"BlockDevice({self.device_or_backfile}, size={self._safe_size}GB, free_space={self._safe_free_space}, bus_type={self.bus_type})" - - @cached_property - def display_info(self) -> str: - columns = { - str(_('Device')): self.device_or_backfile, - str(_('Size')): f'{self._safe_size}GB', - str(_('Free space')): f'{self._safe_free_space}', + return f"BlockDevice({self._device_or_backfile}, size={self.size}GB, free_space={self._safe_free_space()}, bus_type={self.bus_type})" + + def as_json(self) -> Dict[str, Any]: + return { + str(_('Device')): self._device_or_backfile, + str(_('Size')): f'{self.size}GB', + str(_('Free space')): f'{self._safe_free_space()}', str(_('Bus-type')): f'{self.bus_type}' } - padding = max([len(k) for k in columns.keys()]) - - pretty = '' - for k, v in columns.items(): - k = k.ljust(padding, ' ') - pretty += f'{k} = {v}\n' - - return pretty.rstrip() - - def __iter__(self) -> Iterator[Partition]: + def __iter__(self) -> Iterator['Partition']: for partition in self.partitions: yield self.partitions[partition] def __getitem__(self, key :str, *args :str, **kwargs :str) -> Any: if hasattr(self, key): return getattr(self, key) - elif key not in self.info: - raise KeyError(f'{self} does not contain information: "{key}"') - return self.info[key] - def __len__(self) -> int: - return len(self.partitions) + if self.info and key in self.info: + return self.info[key] + + raise KeyError(f'{self.info} does not contain information: "{key}"') def __lt__(self, left_comparitor :'BlockDevice') -> bool: - return self.path < left_comparitor.path + return self._path < left_comparitor.path def json(self) -> str: """ json() has precedence over __dump__, so this is a way to give less/partial information for user readability. """ - return self.path + return self._path def __dump__(self) -> Dict[str, Dict[str, Any]]: return { - self.path : { - 'partuuid' : self.uuid, - 'wipe' : self.info.get('wipe', None), - 'partitions' : [part.__dump__() for part in self.partitions.values()] + self._path: { + 'partuuid': self.uuid, + 'wipe': self.info.get('wipe', None), + 'partitions': [part.__dump__() for part in self.partitions.values()] } } - @property - def partition_type(self) -> str: - output = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.path}").decode('UTF-8')) + def _call_lsblk(self, path: str) -> Dict[str, Any]: + output = SysCommand(f'lsblk --json -b -o+SIZE,PTTYPE,ROTA,TRAN,PTUUID {self._path}').decode('UTF-8') + if output: + lsblk_info = json.loads(output) + return lsblk_info + + raise DiskError(f'Failed to read disk "{self.path}" with lsblk') + + def _load_partitions(self): + from .partition import Partition + + self._partitions.clear() + + lsblk_info = self._call_lsblk(self._path) + device = lsblk_info['blockdevices'][0] + self._partitions.clear() + + if children := device.get('children', None): + root = f'/dev/{device["name"]}' + for child in children: + part_id = child['name'].removeprefix(device['name']) + self._partitions[part_id] = Partition(root + part_id, block_device=self, part_id=part_id) + + def _get_free_space(self) -> Optional[List[BlockSizeInfo]]: + # NOTE: parted -s will default to `cancel` on prompt, skipping any partition + # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, + # so the free will ignore the ESP partition and just give the "free" space. + # Doesn't harm us, but worth noting in case something weird happens. + try: + output = SysCommand(f"parted -s --machine {self._path} print free").decode('utf-8') + if output: + free_lines = [line for line in output.split('\n') if 'free' in line] + sizes = [] + for free_space in free_lines: + _, start, end, size, *_ = free_space.strip('\r\n;').split(':') + sizes.append(BlockSizeInfo(start, end, size)) + + return sizes + except SysCallError as error: + log(f"Could not get free space on {self._path}: {error}", level=logging.DEBUG) + + return None + + def _fetch_information(self) -> BlockInfo: + lsblk_info = self._call_lsblk(self._path) + device = lsblk_info['blockdevices'][0] + free_space = self._get_free_space() - for device in output['blockdevices']: - return device['pttype'] + return BlockInfo( + pttype=device['pttype'], + ptuuid=device['ptuuid'], + size=device['size'], + tran=device['tran'], + rota=device['rota'], + free_space=free_space + ) - @cached_property - def device_or_backfile(self) -> str: + @property + def _device_or_backfile(self) -> Optional[str]: """ Returns the actual device-endpoint of the BlockDevice. If it's a loop-back-device it returns the back-file, @@ -118,7 +185,7 @@ class BlockDevice: return None @property - def device(self) -> str: + def device(self) -> Optional[str]: """ Returns the device file of the BlockDevice. If it's a loop-back-device it returns the /dev/X device, @@ -126,168 +193,82 @@ class BlockDevice: And if it's a crypto-device it returns the parent device """ if "DEVTYPE" not in self.info: - raise DiskError(f'Could not locate backplane info for "{self.path}"') + raise DiskError(f'Could not locate backplane info for "{self._path}"') if self.info['DEVTYPE'] in ['disk','loop']: - return self.path + return self._path elif self.info['DEVTYPE'][:4] == 'raid': # This should catch /dev/md## raid devices - return self.path + return self._path elif self.info['DEVTYPE'] == 'crypt': if 'pkname' not in self.info: - raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.') + raise DiskError(f'A crypt device ({self._path}) without a parent kernel device name.') return f"/dev/{self.info['pkname']}" else: - log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG) - - # if not stat.S_ISBLK(os.stat(full_path).st_mode): - # raise DiskError(f'Selected disk "{full_path}" is not a block device.') - - @property - def partitions(self) -> Dict[str, Partition]: - from .filesystem import Partition - - self.partprobe() - result = SysCommand(['/usr/bin/lsblk', '-J', self.path]) - - if b'not a block device' in result: - raise DiskError(f'Can not read partitions off something that isn\'t a block device: {self.path}') + log(f"Unknown blockdevice type for {self._path}: {self.info['DEVTYPE']}", level=logging.DEBUG) - if not result[:1] == b'{': - raise DiskError('Error getting JSON output from:', f'/usr/bin/lsblk -J {self.path}') - - r = json.loads(result.decode('UTF-8')) - if len(r['blockdevices']) and 'children' in r['blockdevices'][0]: - root_path = f"/dev/{r['blockdevices'][0]['name']}" - for part in r['blockdevices'][0]['children']: - part_id = part['name'][len(os.path.basename(self.path)):] - if part_id not in self.part_cache: - # TODO: Force over-write even if in cache? - if part_id not in self.part_cache or self.part_cache[part_id].size != part['size']: - self.part_cache[part_id] = Partition(root_path + part_id, block_device=self, part_id=part_id) - - return {k: self.part_cache[k] for k in sorted(self.part_cache)} + return None @property - def partition(self) -> Partition: - all_partitions = self.partitions - return [all_partitions[k] for k in all_partitions] + def partition_type(self) -> str: + return self._block_info.pttype @property - def partition_table_type(self) -> int: - # TODO: Don't hardcode :) - # Remove if we don't use this function anywhere - from .filesystem import GPT - return GPT - - @cached_property def uuid(self) -> str: - log('BlockDevice().uuid is untested!', level=logging.WARNING, fg='yellow') - """ - Returns the disk UUID as returned by lsblk. - This is more reliable than relying on /dev/disk/by-partuuid as - it doesn't seam to be able to detect md raid partitions. - """ - return SysCommand(f'blkid -s PTUUID -o value {self.path}').decode('UTF-8') - - @cached_property - def _safe_size(self) -> float: - from .helpers import convert_size_to_gb - - try: - output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) - except SysCallError: - return -1.0 - - for device in output['blockdevices']: - return convert_size_to_gb(device['size']) + return self._block_info.ptuuid - @cached_property + @property def size(self) -> float: from .helpers import convert_size_to_gb + return convert_size_to_gb(self._block_info.size) - output = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.path}").decode('UTF-8')) - - for device in output['blockdevices']: - return convert_size_to_gb(device['size']) - - @cached_property - def bus_type(self) -> str: - output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) - - for device in output['blockdevices']: - return device['tran'] + @property + def bus_type(self) -> Optional[str]: + return self._block_info.tran - @cached_property + @property def spinning(self) -> bool: - output = json.loads(SysCommand(f"lsblk --json -o+ROTA,TRAN {self.path}").decode('UTF-8')) + return self._block_info.rota - for device in output['blockdevices']: - return device['rota'] is True + @property + def partitions(self) -> Dict[str, 'Partition']: + return OrderedDict(sorted(self._partitions.items())) - @cached_property - def _safe_free_space(self) -> Tuple[str, ...]: - try: - return '+'.join(part[2] for part in self.free_space) - except SysCallError: - return '?' + @property + def partition(self) -> List['Partition']: + return list(self.partitions.values()) - @cached_property - def free_space(self) -> Tuple[str, ...]: - # NOTE: parted -s will default to `cancel` on prompt, skipping any partition - # that is "outside" the disk. in /dev/sr0 this is usually the case with Archiso, - # so the free will ignore the ESP partition and just give the "free" space. - # Doesn't harm us, but worth noting in case something weird happens. - try: - for line in SysCommand(f"parted -s --machine {self.path} print free"): - if 'free' in (free_space := line.decode('UTF-8')): - _, start, end, size, *_ = free_space.strip('\r\n;').split(':') - yield (start, end, size) - except SysCallError as error: - log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG) - - @cached_property - def largest_free_space(self) -> List[str]: - info = [] - for space_info in self.free_space: - if not info: - info = space_info - else: - # [-1] = size - if space_info[-1] > info[-1]: - info = space_info - return info - - @cached_property + @property def first_free_sector(self) -> str: - if info := self.largest_free_space: - start = info[0] + if block_size := self._largest_free_space(): + return block_size.start else: - start = '512MB' - return start + return '512MB' - @cached_property + @property def first_end_sector(self) -> str: - if info := self.largest_free_space: - end = info[1] + if block_size := self._largest_free_space(): + return block_size.end else: - end = f"{self.size}GB" - return end - - def partprobe(self) -> bool: - return SysCommand(['partprobe', self.path]).exit_code == 0 + return f"{self.size}GB" + + def _safe_free_space(self) -> str: + if self._block_info.free_space: + sizes = [free_space.size for free_space in self._block_info.free_space] + return '+'.join(sizes) + return '?' + + def _largest_free_space(self) -> Optional[BlockSizeInfo]: + if self._block_info.free_space: + sorted_sizes = sorted(self._block_info.free_space, key=lambda x: x.size, reverse=True) + return sorted_sizes[0] + return None - def has_partitions(self) -> int: - return len(self.partitions) - - def has_mount_point(self, mountpoint :str) -> bool: - for partition in self.partitions: - if self.partitions[partition].mountpoint == mountpoint: - return True - return False + def _partprobe(self) -> bool: + return SysCommand(['partprobe', self._path]).exit_code == 0 def flush_cache(self) -> None: - self.part_cache = {} + self._load_partitions() def get_partition(self, uuid :Optional[str] = None, partuuid :Optional[str] = None) -> Partition: if not uuid and not partuuid: @@ -296,9 +277,9 @@ class BlockDevice: for count in range(storage.get('DISK_RETRY_ATTEMPTS', 5)): for partition_index, partition in self.partitions.items(): try: - if uuid and partition.uuid.lower() == uuid.lower(): + if uuid and partition.uuid and partition.uuid.lower() == uuid.lower(): return partition - elif partuuid and partition.part_uuid.lower() == partuuid.lower(): + elif partuuid and partition.part_uuid and partition.part_uuid.lower() == partuuid.lower(): return partition except DiskError as error: # Most likely a blockdevice that doesn't support or use UUID's @@ -307,9 +288,10 @@ class BlockDevice: pass log(f"uuid {uuid} or {partuuid} not found. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s for next attempt",level=logging.DEBUG) + self.flush_cache() time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) - + log(f"Could not find {uuid}/{partuuid} in disk after 5 retries", level=logging.INFO) - log(f"Cache: {self.part_cache}") + log(f"Cache: {self._partitions}") log(f"Partitions: {self.partitions.items()}") raise DiskError(f"Partition {uuid}/{partuuid} was never found on {self} despite several attempts.") diff --git a/archinstall/lib/disk/btrfs/__init__.py b/archinstall/lib/disk/btrfs/__init__.py index 84b9c0f6..a26e0160 100644 --- a/archinstall/lib/disk/btrfs/__init__.py +++ b/archinstall/lib/disk/btrfs/__init__.py @@ -2,8 +2,7 @@ from __future__ import annotations import pathlib import glob import logging -import re -from typing import Union, Dict, TYPE_CHECKING, Any, Iterator +from typing import Union, Dict, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 if TYPE_CHECKING: @@ -15,30 +14,15 @@ from .btrfs_helpers import ( setup_subvolumes as setup_subvolumes, mount_subvolume as mount_subvolume ) -from .btrfssubvolume import BtrfsSubvolume as BtrfsSubvolume +from .btrfssubvolumeinfo import BtrfsSubvolumeInfo as BtrfsSubvolume from .btrfspartition import BTRFSPartition as BTRFSPartition -from ..helpers import get_mount_info from ...exceptions import DiskError, Deprecated from ...general import SysCommand from ...output import log -from ...exceptions import SysCallError -def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]: - try: - output = SysCommand(f"btrfs subvol show {path}").decode() - except SysCallError as error: - print('Error:', error) - result = {} - for line in output.replace('\r\n', '\n').split('\n'): - if ':' in line: - key, val = line.replace('\t', '').split(':', 1) - result[key.strip().lower().replace(' ', '_')] = val.strip() - - return result - -def create_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: +def create_subvolume(installation: Installer, subvolume_location :Union[pathlib.Path, str]) -> bool: """ This function uses btrfs to create a subvolume. @@ -70,113 +54,3 @@ def create_subvolume(installation :Installer, subvolume_location :Union[pathlib. log(f"Creating a subvolume on {target}", level=logging.INFO) if (cmd := SysCommand(f"btrfs subvolume create {target}")).exit_code != 0: raise DiskError(f"Could not create a subvolume at {target}: {cmd}") - -def _has_option(option :str,options :list) -> bool: - """ auxiliary routine to check if an option is present in a list. - we check if the string appears in one of the options, 'cause it can appear in severl forms (option, option=val,...) - """ - if not options: - return False - - for item in options: - if option in item: - return True - - return False - -def manage_btrfs_subvolumes(installation :Installer, - partition :Dict[str, str],) -> list: - - raise Deprecated("Use setup_subvolumes() instead.") - - from copy import deepcopy - """ we do the magic with subvolumes in a centralized place - parameters: - * the installation object - * the partition dictionary entry which represents the physical partition - returns - * mountpoinst, the list which contains all the "new" partititon to be mounted - - We expect the partition has been mounted as / , and it to be unmounted after the processing - Then we create all the subvolumes inside btrfs as demand - We clone then, both the partition dictionary and the object inside it and adapt it to the subvolume needs - Then we return a list of "new" partitions to be processed as "normal" partitions - # TODO For encrypted devices we need some special processing prior to it - """ - # We process each of the pairs <subvolume name: mount point | None | mount info dict> - # th mount info dict has an entry for the path of the mountpoint (named 'mountpoint') and 'options' which is a list - # of mount options (or similar used by brtfs) - mountpoints = [] - subvolumes = partition['btrfs']['subvolumes'] - for name, right_hand in subvolumes.items(): - try: - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load - every subvolume is created from the top of the hierarchy- and simplifies its further use - if name.startswith('/'): - name = name[1:] - # renormalize the right hand. - location = None - subvol_options = [] - # no contents, so it is not to be mounted - if not right_hand: - location = None - # just a string. per backward compatibility the mount point - elif isinstance(right_hand,str): - location = right_hand - # a dict. two elements 'mountpoint' (obvious) and and a mount options list ¿? - elif isinstance(right_hand,dict): - location = right_hand.get('mountpoint',None) - subvol_options = right_hand.get('options',[]) - # we create the subvolume - create_subvolume(installation,name) - # Make the nodatacow processing now - # It will be the main cause of creation of subvolumes which are not to be mounted - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in subvol_options: - if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del subvol_options[subvol_options.index('nodatacow')] - # Make the compress processing now - # it is not an options which can be established by subvolume (but for whole file systems), and can be - # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - # in this way only zstd compression is activaded - # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in subvol_options: - if not _has_option('compress',partition.get('filesystem',{}).get('mount_options',[])): - if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: - raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so compress doesn't propagate to the mount options - del subvol_options[subvol_options.index('compress')] - # END compress processing. - # we do not mount if THE basic partition will be mounted or if we exclude explicitly this subvolume - if not partition['mountpoint'] and location is not None: - # we begin to create a fake partition entry. First we copy the original -the one that corresponds to - # the primary partition. We make a deepcopy to avoid altering the original content in any case - fake_partition = deepcopy(partition) - # we start to modify entries in the "fake partition" to match the needs of the subvolumes - # to avoid any chance of entering in a loop (not expected) we delete the list of subvolumes in the copy - del fake_partition['btrfs'] - fake_partition['encrypted'] = False - fake_partition['generate-encryption-key-file'] = False - # Mount destination. As of now the right hand part - fake_partition['mountpoint'] = location - # we load the name in an attribute called subvolume, but i think it is not needed anymore, 'cause the mount logic uses a different path. - fake_partition['subvolume'] = name - # here we add the special mount options for the subvolume, if any. - # if the original partition['options'] is not a list might give trouble - if fake_partition.get('filesystem',{}).get('mount_options',[]): - fake_partition['filesystem']['mount_options'].extend(subvol_options) - else: - fake_partition['filesystem']['mount_options'] = subvol_options - # Here comes the most exotic part. The dictionary attribute 'device_instance' contains an instance of Partition. This instance will be queried along the mount process at the installer. - # As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes - # As we made a deepcopy we have a fresh instance of this object we can manipulate problemless - fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]" - - # Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted, - # as "normal" ones - mountpoints.append(fake_partition) - except Exception as e: - raise e - return mountpoints diff --git a/archinstall/lib/disk/btrfs/btrfs_helpers.py b/archinstall/lib/disk/btrfs/btrfs_helpers.py index d577d82b..f6d2734a 100644 --- a/archinstall/lib/disk/btrfs/btrfs_helpers.py +++ b/archinstall/lib/disk/btrfs/btrfs_helpers.py @@ -1,72 +1,73 @@ -import pathlib import logging -from typing import Optional +import re +from pathlib import Path +from typing import Optional, Dict, Any, TYPE_CHECKING +from ...models.subvolume import Subvolume from ...exceptions import SysCallError, DiskError from ...general import SysCommand from ...output import log +from ...plugins import plugins from ..helpers import get_mount_info -from .btrfssubvolume import BtrfsSubvolume +from .btrfssubvolumeinfo import BtrfsSubvolumeInfo +if TYPE_CHECKING: + from .btrfspartition import BTRFSPartition + from ...installer import Installer -def mount_subvolume(installation, device, name, subvolume_information): - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = name.lstrip('/') - # renormalize the right hand. - mountpoint = subvolume_information.get('mountpoint', None) - if not mountpoint: - return None +class fstab_btrfs_compression_plugin(): + def __init__(self, partition_dict): + self.partition_dict = partition_dict + + def on_genfstab(self, installation): + with open(f"{installation.target}/etc/fstab", 'r') as fh: + fstab = fh.read() + + # Replace the {installation}/etc/fstab with entries + # using the compress=zstd where the mountpoint has compression set. + with open(f"{installation.target}/etc/fstab", 'w') as fh: + for line in fstab.split('\n'): + # So first we grab the mount options by using subvol=.*? as a locator. + # And we also grab the mountpoint for the entry, for instance /var/log + if (subvoldef := re.findall(',.*?subvol=.*?[\t ]', line)) and (mountpoint := re.findall('[\t ]/.*?[\t ]', line)): + for subvolume in self.partition_dict.get('btrfs', {}).get('subvolumes', []): + # We then locate the correct subvolume and check if it's compressed + if subvolume.compress and subvolume.mountpoint == mountpoint[0].strip(): + # We then sneak in the compress=zstd option if it doesn't already exist: + # We skip entries where compression is already defined + if ',compress=zstd,' not in line: + line = line.replace(subvoldef[0], f",compress=zstd{subvoldef[0]}") + break + + fh.write(f"{line}\n") - if type(mountpoint) == str: - mountpoint = pathlib.Path(mountpoint) + return True - installation_target = installation.target - if type(installation_target) == str: - installation_target = pathlib.Path(installation_target) + +def mount_subvolume(installation: 'Installer', device: 'BTRFSPartition', subvolume: Subvolume): + # we normalize the subvolume name (getting rid of slash at the start if exists. + # In our implementation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = subvolume.name.lstrip('/') + mountpoint = Path(subvolume.mountpoint) + installation_target = Path(installation.target) mountpoint = installation_target / mountpoint.relative_to(mountpoint.anchor) mountpoint.mkdir(parents=True, exist_ok=True) - - mount_options = subvolume_information.get('options', []) - if not any('subvol=' in x for x in mount_options): - mount_options += [f'subvol={name}'] + mount_options = subvolume.options + [f'subvol={name}'] log(f"Mounting subvolume {name} on {device} to {mountpoint}", level=logging.INFO, fg="gray") SysCommand(f"mount {device.path} {mountpoint} -o {','.join(mount_options)}") -def setup_subvolumes(installation, partition_dict): - """ - Taken from: ..user_guides.py - - partition['btrfs'] = { - "subvolumes" : { - "@": "/", - "@home": "/home", - "@log": "/var/log", - "@pkg": "/var/cache/pacman/pkg", - "@.snapshots": "/.snapshots" - } - } - """ +def setup_subvolumes(installation: 'Installer', partition_dict: Dict[str, Any]): log(f"Setting up subvolumes: {partition_dict['btrfs']['subvolumes']}", level=logging.INFO, fg="gray") - for name, right_hand in partition_dict['btrfs']['subvolumes'].items(): - # we normalize the subvolume name (getting rid of slash at the start if exists. In our implemenation has no semantic load. - # Every subvolume is created from the top of the hierarchy- and simplifies its further use - name = name.lstrip('/') - # renormalize the right hand. - # mountpoint = None - subvol_options = [] - - match right_hand: - # case str(): # backwards-compatability - # mountpoint = right_hand - case dict(): - # mountpoint = right_hand.get('mountpoint', None) - subvol_options = right_hand.get('options', []) + for subvolume in partition_dict['btrfs']['subvolumes']: + # we normalize the subvolume name (getting rid of slash at the start if exists. In our implementation has no semantic load. + # Every subvolume is created from the top of the hierarchy- and simplifies its further use + name = subvolume.name.lstrip('/') # We create the subvolume using the BTRFSPartition instance. # That way we ensure not only easy access, but also accurate mount locations etc. @@ -76,27 +77,28 @@ def setup_subvolumes(installation, partition_dict): # It will be the main cause of creation of subvolumes which are not to be mounted # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new - if 'nodatacow' in subvol_options: + if subvolume.nodatacow: if (cmd := SysCommand(f"chattr +C {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set nodatacow attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so nodatacow doesn't propagate to the mount options - del subvol_options[subvol_options.index('nodatacow')] + # Make the compress processing now # it is not an options which can be established by subvolume (but for whole file systems), and can be # set up via a simple attribute change in a directory (if empty). And here the directories are brand new # in this way only zstd compression is activaded # TODO WARNING it is not clear if it should be a standard feature, so it might need to be deactivated - if 'compress' in subvol_options: + if subvolume.compress: if not any(['compress' in filesystem_option for filesystem_option in partition_dict.get('filesystem', {}).get('mount_options', [])]): if (cmd := SysCommand(f"chattr +c {installation.target}/{name}")).exit_code != 0: raise DiskError(f"Could not set compress attribute at {installation.target}/{name}: {cmd}") - # entry is deleted so compress doesn't propagate to the mount options - del subvol_options[subvol_options.index('compress')] -def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: + if 'fstab_btrfs_compression_plugin' not in plugins: + plugins['fstab_btrfs_compression_plugin'] = fstab_btrfs_compression_plugin(partition_dict) + + +def subvolume_info_from_path(path: Path) -> Optional[BtrfsSubvolumeInfo]: try: - subvolume_name = None + subvolume_name = '' result = {} for index, line in enumerate(SysCommand(f"btrfs subvolume show {path}")): if index == 0: @@ -110,14 +112,14 @@ def subvolume_info_from_path(path :pathlib.Path) -> Optional[BtrfsSubvolume]: # allows for hooking in a pre-processor to do this we have to do it here: result[key.lower().replace(' ', '_').replace('(s)', 's')] = value.strip() - return BtrfsSubvolume(**{'full_path' : path, 'name' : subvolume_name, **result}) - + return BtrfsSubvolumeInfo(**{'full_path' : path, 'name' : subvolume_name, **result}) # type: ignore except SysCallError as error: log(f"Could not retrieve subvolume information from {path}: {error}", level=logging.WARNING, fg="orange") return None -def find_parent_subvolume(path :pathlib.Path, filters=[]): + +def find_parent_subvolume(path: Path, filters=[]) -> Optional[BtrfsSubvolumeInfo]: # A root path cannot have a parent if str(path) == '/': return None @@ -127,6 +129,8 @@ def find_parent_subvolume(path :pathlib.Path, filters=[]): if found_mount['target'] == '/': return None - return find_parent_subvolume(path.parent, traverse=True, filters=[*filters, found_mount['target']]) + return find_parent_subvolume(path.parent, filters=[*filters, found_mount['target']]) - return subvolume
\ No newline at end of file + return subvolume + + return None diff --git a/archinstall/lib/disk/btrfs/btrfspartition.py b/archinstall/lib/disk/btrfs/btrfspartition.py index 5020133d..d04c9b98 100644 --- a/archinstall/lib/disk/btrfs/btrfspartition.py +++ b/archinstall/lib/disk/btrfs/btrfspartition.py @@ -15,24 +15,13 @@ from .btrfs_helpers import ( if TYPE_CHECKING: from ...installer import Installer - from .btrfssubvolume import BtrfsSubvolume + from .btrfssubvolumeinfo import BtrfsSubvolumeInfo + class BTRFSPartition(Partition): def __init__(self, *args, **kwargs): Partition.__init__(self, *args, **kwargs) - def __repr__(self, *args :str, **kwargs :str) -> str: - mount_repr = '' - if self.mountpoint: - mount_repr = f", mounted={self.mountpoint}" - elif self.target_mountpoint: - mount_repr = f", rel_mountpoint={self.target_mountpoint}" - - if self._encrypted: - return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' - else: - return f'BTRFSPartition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' - @property def subvolumes(self): for filesystem in findmnt(pathlib.Path(self.path), recurse=True).get('filesystems', []): @@ -40,17 +29,17 @@ class BTRFSPartition(Partition): yield subvolume_info_from_path(filesystem['target']) def iterate_children(struct): - for child in struct.get('children', []): + for c in struct.get('children', []): if '[' in child.get('source', ''): - yield subvolume_info_from_path(child['target']) + yield subvolume_info_from_path(c['target']) - for sub_child in iterate_children(child): + for sub_child in iterate_children(c): yield sub_child for child in iterate_children(filesystem): yield child - def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolume': + def create_subvolume(self, subvolume :pathlib.Path, installation :Optional['Installer'] = None) -> 'BtrfsSubvolumeInfo': """ Subvolumes have to be created within a mountpoint. This means we need to get the current installation target. @@ -62,13 +51,13 @@ class BTRFSPartition(Partition): if not installation: installation = storage.get('installation_session') - # Determain if the path given, is an absolute path or a releative path. + # Determain if the path given, is an absolute path or a relative path. # We do this by checking if the path contains a known mountpoint. if str(subvolume)[0] == '/': if filesystems := findmnt(subvolume, traverse=True).get('filesystems'): if (target := filesystems[0].get('target')) and target != '/' and str(subvolume).startswith(target): # Path starts with a known mountpoint which isn't / - # Which means it's an absolut path to a mounted location. + # Which means it's an absolute path to a mounted location. pass else: # Since it's not an absolute position with a known start. @@ -108,9 +97,13 @@ class BTRFSPartition(Partition): if glob.glob(str(subvolume / '*')): raise DiskError(f"Cannot create subvolume at {subvolume} because it contains data (non-empty folder target is not supported by BTRFS)") - elif subvolinfo := subvolume_info_from_path(subvolume): - raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") + # Ideally we would like to check if the destination is already a subvolume. + # But then we would need the mount-point at this stage as well. + # So we'll comment out this check: + # elif subvolinfo := subvolume_info_from_path(subvolume): + # raise DiskError(f"Destination {subvolume} is already a subvolume: {subvolinfo}") + # And deal with it here: SysCommand(f"btrfs subvolume create {subvolume}") - return subvolume_info_from_path(subvolume)
\ No newline at end of file + return subvolume_info_from_path(subvolume) diff --git a/archinstall/lib/disk/btrfs/btrfssubvolume.py b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py index a96e2a94..5f5bdea6 100644 --- a/archinstall/lib/disk/btrfs/btrfssubvolume.py +++ b/archinstall/lib/disk/btrfs/btrfssubvolumeinfo.py @@ -16,8 +16,9 @@ from ...general import SysCommand from ...output import log from ...storage import storage + @dataclass -class BtrfsSubvolume: +class BtrfsSubvolumeInfo: full_path :pathlib.Path name :str uuid :str @@ -68,9 +69,9 @@ class BtrfsSubvolume: from .btrfs_helpers import subvolume_info_from_path # TODO: Make this function traverse storage['MOUNT_POINT'] and find the first - # occurance of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. + # occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume. # It would also be nice if it could use findmnt(self.full_path) and traverse backwards - # finding the last occurance of a subvolume which 'self' belongs to. + # finding the last occurrence of a subvolume which 'self' belongs to. if volume := subvolume_info_from_path(storage['MOUNT_POINT']): return self.full_path == volume.full_path @@ -188,4 +189,4 @@ class BtrfsSubvolume: def unmount(self, recurse :bool = True): SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}") - log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")
\ No newline at end of file + log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray") diff --git a/archinstall/lib/disk/filesystem.py b/archinstall/lib/disk/filesystem.py index f94b4b47..5d5952a0 100644 --- a/archinstall/lib/disk/filesystem.py +++ b/archinstall/lib/disk/filesystem.py @@ -74,7 +74,7 @@ class Filesystem: raise KeyError(f"Could not create a GPT label on {self}") elif self.mode == MBR: if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MSDOS label on {self}") + raise KeyError(f"Could not create a MS-DOS label on {self}") self.blockdevice.flush_cache() time.sleep(3) @@ -100,7 +100,7 @@ class Filesystem: partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid) except DiskError: partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid) - + log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray") else: log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING) @@ -210,7 +210,14 @@ class Filesystem: # TODO: Implement this with declarative profiles instead. raise ValueError("Installation().use_entire_disk() has to be re-worked.") - def add_partition(self, partition_type :str, start :str, end :str, partition_format :Optional[str] = None, skip_mklabel :bool = False) -> Partition: + def add_partition( + self, + partition_type :str, + start :str, + end :str, + partition_format :Optional[str] = None, + skip_mklabel :bool = False + ) -> Partition: log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO) if len(self.blockdevice.partitions) == 0 and skip_mklabel is False: @@ -221,7 +228,7 @@ class Filesystem: raise KeyError(f"Could not create a GPT label on {self}") elif self.mode == MBR: if not self.parted_mklabel(self.blockdevice.device, "msdos"): - raise KeyError(f"Could not create a MSDOS label on {self}") + raise KeyError(f"Could not create a MS-DOS label on {self}") self.blockdevice.flush_cache() @@ -232,6 +239,7 @@ class Filesystem: except DiskError: pass + # TODO this check should probably run in the setup process rather than during the installation if self.mode == MBR: if len(self.blockdevice.partitions) > 3: DiskError("Too many partitions on disk, MBR disks can only have 3 primary partitions") @@ -245,15 +253,9 @@ class Filesystem: if self.parted(parted_string): for count in range(storage.get('DISK_RETRY_ATTEMPTS', 3)): - self.partprobe() - - new_partition_uuids = [] - for partition in self.blockdevice.partitions.values(): - try: - new_partition_uuids.append(partition.part_uuid) - except DiskError: - pass + self.blockdevice.flush_cache() + new_partition_uuids = [partition.part_uuid for partition in self.blockdevice.partitions.values()] new_partuuid_set = (set(previous_partuuids) ^ set(new_partition_uuids)) if len(new_partuuid_set) and (new_partuuid := new_partuuid_set.pop()): @@ -263,17 +265,23 @@ class Filesystem: log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red") log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red") log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red") - log(f'New UUID: {[new_partuuid]}', level=logging.ERROR, fg="red") + log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red") log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red") raise err else: log(f"Could not get UUID for partition. Waiting {storage.get('DISK_TIMEOUTS', 1) * count}s before retrying.",level=logging.DEBUG) - time.sleep(storage.get('DISK_TIMEOUTS', 1) * count) + self.partprobe() + time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) + else: + print("Parted did not return True during partition creation") + + total_partitions = set([partition.part_uuid for partition in self.blockdevice.partitions.values()]) + total_partitions.update(previous_partuuids) # TODO: This should never be able to happen log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red") log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red") - log(f"New partitions: {(previous_partuuids ^ {partition.part_uuid for partition in self.blockdevice.partitions.values()})}", level=logging.ERROR, fg="red") + log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red") raise DiskError(f"Could not add partition using: {parted_string}") def set_name(self, partition: int, name: str) -> bool: diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py index 99856aad..f19125f4 100644 --- a/archinstall/lib/disk/helpers.py +++ b/archinstall/lib/disk/helpers.py @@ -8,6 +8,8 @@ import time import glob from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 +from ..models.subvolume import Subvolume + if TYPE_CHECKING: from .partition import Partition @@ -112,7 +114,7 @@ def cleanup_bash_escapes(data :str) -> str: def blkid(cmd :str) -> Dict[str, Any]: if '-o' in cmd and '-o export' not in cmd: - raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.") + raise ValueError(f"blkid() requires '-o export' to be used and can therefore not continue reliably.") elif '-o' not in cmd: cmd += ' -o export' @@ -133,7 +135,7 @@ def blkid(cmd :str) -> Dict[str, Any]: key, val = line.split('=', 1) if key.lower() == 'devname': devname = val - # Lowercase for backwards compatability with all_disks() previous use cases + # Lowercase for backwards compatibility with all_disks() previous use cases result[devname] = { "path": devname, "PATH": devname @@ -218,7 +220,12 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, # we'll iterate the /sys/class definitions and find the information # from there. for block_device in glob.glob("/sys/class/block/*"): - device_path = f"/dev/{pathlib.Path(block_device).readlink().name}" + device_path = pathlib.Path(f"/dev/{pathlib.Path(block_device).readlink().name}") + + if device_path.exists() is False: + log(f"Unknown device found by '/sys/class/block/*', ignoring: {device_path}", level=logging.WARNING, fg="yellow") + continue + try: information = blkid(f'blkid -p -o export {device_path}') except SysCallError as ex: @@ -227,12 +234,17 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, try: information = get_loop_info(device_path) if not information: + print("Exit code for blkid -p -o export was:", ex.exit_code) raise SysCallError("Could not get loop information", exit_code=1) except SysCallError: + print("Not a loop device, trying uevent rules.") information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name) else: + # We could not reliably get any information, perhaps the disk is clean of information? + print("Raising ex because:", ex.exit_code) raise ex + # return instances information = enrich_blockdevice_information(information) @@ -244,7 +256,7 @@ def all_blockdevices(mappers=False, partitions=False, error=False) -> Dict[str, instances[path] = Partition(path, block_device=BlockDevice(get_parent_of_partition(pathlib.Path(path)))) elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop': instances[path] = BlockDevice(path, path_info) - elif path_info.get('TYPE') == 'squashfs': + elif path_info.get('TYPE') in ('squashfs', 'erofs'): # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO) continue else: @@ -368,7 +380,7 @@ def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict return filters -def get_partitions_in_use(mountpoint :str) -> List[Partition]: +def get_partitions_in_use(mountpoint :str) -> Dict[str, Any]: from .partition import Partition try: @@ -391,12 +403,20 @@ def get_partitions_in_use(mountpoint :str) -> List[Partition]: if not type(blockdev) in (Partition, MapperDev): continue - for blockdev_mountpoint in blockdev.mount_information: - block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev + if isinstance(blockdev, Partition): + for blockdev_mountpoint in blockdev.mountpoints: + block_devices_mountpoints[blockdev_mountpoint] = blockdev + else: + for blockdev_mountpoint in blockdev.mount_information: + block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG) for mountpoint in list(get_all_targets(output['filesystems']).keys()): + # Since all_blockdevices() returns PosixPath objects, we need to convert + # findmnt paths to pathlib.Path() first: + mountpoint = pathlib.Path(mountpoint) + if mountpoint in block_devices_mountpoints: if mountpoint not in mounts: mounts[mountpoint] = block_devices_mountpoints[mountpoint] @@ -433,9 +453,10 @@ def disk_layouts() -> Optional[Dict[str, Any]]: def encrypted_partitions(blockdevices :Dict[str, Any]) -> bool: - for partition in blockdevices.values(): - if partition.get('encrypted', False): - yield partition + for blockdevice in blockdevices.values(): + for partition in blockdevice.get('partitions', []): + if partition.get('encrypted', False): + yield partition def find_partition_by_mountpoint(block_devices :List[BlockDevice], relative_mountpoint :str) -> Partition: for device in block_devices: @@ -468,13 +489,14 @@ def convert_device_to_uuid(path :str) -> str: raise DiskError(f"Could not retrieve the UUID of {path} within a timely manner.") + def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, strict: bool = True) -> bool: """ Determine if a certain partition is mounted (or has a mountpoint) as specific target (path) Coded for clarity rather than performance Input parms: :parm partition the partition we check - :type Either a Partition object or a dict with the contents of a partition definiton in the disk_layouts schema + :type Either a Partition object or a dict with the contents of a partition definition in the disk_layouts schema :parm target (a string representing a mount path we want to check for. :type str @@ -484,10 +506,12 @@ def has_mountpoint(partition: Union[dict,Partition,MapperDev], target: str, stri """ # we create the mountpoint list if isinstance(partition,dict): - subvols = partition.get('btrfs',{}).get('subvolumes',{}) - mountpoints = [partition.get('mountpoint'),] + [subvols[subvol] if isinstance(subvols[subvol],str) or not subvols[subvol] else subvols[subvol].get('mountpoint') for subvol in subvols] + subvolumes: List[Subvolume] = partition.get('btrfs',{}).get('subvolumes', []) + mountpoints = [partition.get('mountpoint')] + mountpoints += [volume.mountpoint for volume in subvolumes] else: mountpoints = [partition.mountpoint,] + [subvol.target for subvol in partition.subvolumes] + # we check if strict or target == '/': if target in mountpoints: diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py index 913dbc13..71ef2a79 100644 --- a/archinstall/lib/disk/mapperdev.py +++ b/archinstall/lib/disk/mapperdev.py @@ -10,7 +10,7 @@ from ..general import SysCommand from ..output import log if TYPE_CHECKING: - from .btrfs import BtrfsSubvolume + from .btrfs import BtrfsSubvolumeInfo @dataclass class MapperDev: @@ -37,12 +37,12 @@ class MapperDev: for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"): partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name - + try: uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode() except SysCallError as error: log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red") - + information = uevent(uevent_data) block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME']))) @@ -65,9 +65,13 @@ class MapperDev: return None @property + def mountpoints(self) -> List[Dict[str, Any]]: + return [obj['target'] for obj in self.mount_information] + + @property def mount_information(self) -> List[Dict[str, Any]]: from .helpers import find_mountpoint - return list(find_mountpoint(self.path)) + return [{**obj, 'target' : pathlib.Path(obj.get('target', '/dev/null'))} for obj in find_mountpoint(self.path)] @property def filesystem(self) -> Optional[str]: @@ -75,10 +79,10 @@ class MapperDev: return get_filesystem_type(self.path) @property - def subvolumes(self) -> Iterator['BtrfsSubvolume']: + def subvolumes(self) -> Iterator['BtrfsSubvolumeInfo']: from .btrfs import subvolume_info_from_path for mountpoint in self.mount_information: if target := mountpoint.get('target'): if subvolume := subvolume_info_from_path(pathlib.Path(target)): - yield subvolume
\ No newline at end of file + yield subvolume diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py index 73c88597..56a7d436 100644 --- a/archinstall/lib/disk/partition.py +++ b/archinstall/lib/disk/partition.py @@ -1,60 +1,83 @@ import glob -import pathlib import time import logging import json import os import hashlib +import typing +from dataclasses import dataclass +from pathlib import Path from typing import Optional, Dict, Any, List, Union, Iterator from .blockdevice import BlockDevice -from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name +from .helpers import get_filesystem_type, convert_size_to_gb, split_bind_name from ..storage import storage from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat from ..output import log from ..general import SysCommand from .btrfs.btrfs_helpers import subvolume_info_from_path -from .btrfs.btrfssubvolume import BtrfsSubvolume +from .btrfs.btrfssubvolumeinfo import BtrfsSubvolumeInfo + + +@dataclass +class PartitionInfo: + pttype: str + partuuid: str + uuid: str + start: Optional[int] + end: Optional[int] + bootable: bool + size: float + sector_size: int + filesystem_type: str + mountpoints: List[Path] + + def get_first_mountpoint(self) -> Optional[Path]: + if len(self.mountpoints) > 0: + return self.mountpoints[0] + return None + class Partition: - def __init__(self, + def __init__( + self, path: str, block_device: BlockDevice, part_id :Optional[str] = None, filesystem :Optional[str] = None, mountpoint :Optional[str] = None, encrypted :bool = False, - autodetect_filesystem :bool = True): - + autodetect_filesystem :bool = True, + ): if not part_id: part_id = os.path.basename(path) - self.block_device = block_device - if type(self.block_device) is str: + if type(block_device) is str: raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!") - self.path = path - self.part_id = part_id - self.target_mountpoint = mountpoint - self.filesystem = filesystem + self.block_device = block_device + self._path = path + self._part_id = part_id + self._target_mountpoint = mountpoint self._encrypted = None - self.encrypted = encrypted - self.allow_formatting = False + self._encrypted = encrypted + self._wipe = False + self._type = 'primary' if mountpoint: self.mount(mountpoint) - try: - self.mount_information = list(find_mountpoint(self.path)) - except DiskError: - self.mount_information = [{}] + self._partition_info = self._fetch_information() - if not self.filesystem and autodetect_filesystem: - self.filesystem = get_filesystem_type(path) + if not autodetect_filesystem and filesystem: + self._partition_info.filesystem_type = filesystem - if self.filesystem == 'crypto_LUKS': - self.encrypted = True + if self._partition_info.filesystem_type == 'crypto_LUKS': + self._encrypted = True + # I hate doint this but I'm currently unsure where this + # is acutally used to be able to fix the typing issues properly + @typing.no_type_check def __lt__(self, left_comparitor :BlockDevice) -> bool: if type(left_comparitor) == Partition: left_comparitor = left_comparitor.path @@ -62,147 +85,175 @@ class Partition: left_comparitor = str(left_comparitor) # The goal is to check if /dev/nvme0n1p1 comes before /dev/nvme0n1p5 - return self.path < left_comparitor + return self._path < left_comparitor def __repr__(self, *args :str, **kwargs :str) -> str: mount_repr = '' - if self.mountpoint: - mount_repr = f", mounted={self.mountpoint}" - elif self.target_mountpoint: - mount_repr = f", rel_mountpoint={self.target_mountpoint}" + if mountpoint := self._partition_info.get_first_mountpoint(): + mount_repr = f", mounted={mountpoint}" + elif self._target_mountpoint: + mount_repr = f", rel_mountpoint={self._target_mountpoint}" + + classname = self.__class__.__name__ if self._encrypted: - return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, parent={self.real_device}, fs={self.filesystem}{mount_repr})' + return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, parent={self.real_device}, fs={self._partition_info.filesystem_type}{mount_repr})' else: - return f'Partition(path={self.path}, size={self.size}, PARTUUID={self._safe_uuid}, fs={self.filesystem}{mount_repr})' + return f'{classname}(path={self._path}, size={self.size}, PARTUUID={self.part_uuid}, fs={self._partition_info.filesystem_type}{mount_repr})' + + def as_json(self) -> Dict[str, Any]: + """ + this is used for the table representation of the partition (see FormattedOutput) + """ + partition_info = { + 'type': self._type, + 'PARTUUID': self.part_uuid, + 'wipe': self._wipe, + 'boot': self.boot, + 'ESP': self.boot, + 'mountpoint': self._target_mountpoint, + 'encrypted': self._encrypted, + 'start': self.start, + 'size': self.end, + 'filesystem': self._partition_info.filesystem_type + } + + return partition_info def __dump__(self) -> Dict[str, Any]: + # TODO remove this in favour of as_json return { - 'type': 'primary', - 'PARTUUID': self._safe_uuid, - 'wipe': self.allow_formatting, + 'type': self._type, + 'PARTUUID': self.part_uuid, + 'wipe': self._wipe, 'boot': self.boot, 'ESP': self.boot, - 'mountpoint': self.target_mountpoint, + 'mountpoint': self._target_mountpoint, 'encrypted': self._encrypted, 'start': self.start, 'size': self.end, 'filesystem': { - 'format': get_filesystem_type(self.path) + 'format': self._partition_info.filesystem_type } } - @property - def mountpoint(self) -> Optional[str]: - try: - data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode()) - for filesystem in data['filesystems']: - return pathlib.Path(filesystem.get('target')) + def _call_lsblk(self) -> Dict[str, Any]: + self.partprobe() + # This sleep might be overkill, but lsblk is known to + # work against a chaotic cache that can change during call + # causing no information to be returned (blkid is better) + # time.sleep(1) + # TODO: Maybe incorporate a re-try system here based on time.sleep(max(0.1, storage.get('DISK_TIMEOUTS', 1))) + + try: + output = SysCommand(f"lsblk --json -b -o+LOG-SEC,SIZE,PTTYPE,PARTUUID,UUID,FSTYPE {self.device_path}").decode('UTF-8') except SysCallError as error: - # Not mounted anywhere most likely - log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG, fg="grey") - pass + # It appears as if lsblk can return exit codes like 8192 to indicate something. + # But it does return output so we'll try to catch it. + output = error.worker.decode('UTF-8') - return None + if output: + lsblk_info = json.loads(output) + return lsblk_info - @property - def sector_size(self) -> Optional[int]: - output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8')) + raise DiskError(f'Failed to read disk "{self.device_path}" with lsblk') - for device in output['blockdevices']: - return device.get('log-sec', None) + def _call_sfdisk(self) -> Dict[str, Any]: + output = SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8') - @property - def start(self) -> Optional[str]: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + if output: + sfdisk_info = json.loads(output) + partitions = sfdisk_info.get('partitiontable', {}).get('partitions', []) + node = list(filter(lambda x: x['node'] == self._path, partitions)) - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['start'] # * self.sector_size + if len(node) > 0: + return node[0] - @property - def end(self) -> Optional[str]: - # TODO: actually this is size in sectors unit - # TODO: Verify that the logic holds up, that 'size' is the size without 'start' added to it. - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + return {} + + raise DiskError(f'Failed to read disk "{self.block_device.path}" with sfdisk') + + def _fetch_information(self) -> PartitionInfo: + lsblk_info = self._call_lsblk() + sfdisk_info = self._call_sfdisk() + + if not (device := lsblk_info.get('blockdevices', [None])[0]): + raise DiskError(f'Failed to retrieve information for "{self.device_path}" with lsblk') - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['size'] # * self.sector_size + mountpoints = [Path(mountpoint) for mountpoint in device['mountpoints'] if mountpoint] + bootable = sfdisk_info.get('bootable', False) or sfdisk_info.get('type', '') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' + + return PartitionInfo( + pttype=device['pttype'], + partuuid=device['partuuid'], + uuid=device['uuid'], + sector_size=device['log-sec'], + size=convert_size_to_gb(device['size']), + start=sfdisk_info.get('start', None), + end=sfdisk_info.get('size', None), + bootable=bootable, + filesystem_type=device['fstype'], + mountpoints=mountpoints + ) @property - def end_sectors(self) -> Optional[str]: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) + def target_mountpoint(self) -> Optional[str]: + return self._target_mountpoint - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - return partition['start'] + partition['size'] + @property + def path(self) -> str: + return self._path @property - def size(self) -> Optional[float]: - for i in range(storage['DISK_RETRY_ATTEMPTS']): - self.partprobe() - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) + def filesystem(self) -> str: + return self._partition_info.filesystem_type - try: - lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode()) + @property + def mountpoint(self) -> Optional[Path]: + if len(self.mountpoints) > 0: + return self.mountpoints[0] + return None - for device in lsblk['blockdevices']: - return convert_size_to_gb(device['size']) - except SysCallError as error: - if error.exit_code == 8192: - return None - else: - raise error + @property + def mountpoints(self) -> List[Path]: + return self._partition_info.mountpoints @property - def boot(self) -> bool: - output = json.loads(SysCommand(f"sfdisk --json {self.block_device.path}").decode('UTF-8')) - - # Get the bootable flag from the sfdisk output: - # { - # "partitiontable": { - # "device":"/dev/loop0", - # "partitions": [ - # {"node":"/dev/loop0p1", "start":2048, "size":10483712, "type":"83", "bootable":true} - # ] - # } - # } - - for partition in output.get('partitiontable', {}).get('partitions', []): - if partition['node'] == self.path: - # first condition is for MBR disks, second for GPT disks - return partition.get('bootable', False) or partition.get('type','') == 'C12A7328-F81F-11D2-BA4B-00A0C93EC93B' + def sector_size(self) -> int: + return self._partition_info.sector_size - return False + @property + def start(self) -> Optional[int]: + return self._partition_info.start @property - def partition_type(self) -> Optional[str]: - lsblk = json.loads(SysCommand(f"lsblk --json -o+PTTYPE {self.device_path}").decode('UTF-8')) + def end(self) -> Optional[int]: + return self._partition_info.end - for device in lsblk['blockdevices']: - return device['pttype'] + @property + def end_sectors(self) -> Optional[int]: + start = self._partition_info.start + end = self._partition_info.end + if start and end: + return start + end + return None @property - def part_uuid(self) -> Optional[str]: - """ - Returns the PARTUUID as returned by lsblk. - This is more reliable than relying on /dev/disk/by-partuuid as - it doesn't seam to be able to detect md raid partitions. - For bind mounts all the subvolumes share the same uuid - """ - for i in range(storage['DISK_RETRY_ATTEMPTS']): - if not self.partprobe(): - raise DiskError(f"Could not perform partprobe on {self.device_path}") - - time.sleep(max(0.1, storage['DISK_TIMEOUTS'] * i)) + def size(self) -> Optional[float]: + return self._partition_info.size - partuuid = self._safe_part_uuid - if partuuid: - return partuuid + @property + def boot(self) -> bool: + return self._partition_info.bootable - raise DiskError(f"Could not get PARTUUID for {self.path} using 'blkid -s PARTUUID -o value {self.path}'") + @property + def partition_type(self) -> Optional[str]: + return self._partition_info.pttype + + @property + def part_uuid(self) -> str: + return self._partition_info.partuuid @property def uuid(self) -> Optional[str]: @@ -232,7 +283,7 @@ class Partition: For instance when you want to get a __repr__ of the class. """ if not self.partprobe(): - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': return None log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) @@ -240,7 +291,7 @@ class Partition: try: return SysCommand(f'blkid -s UUID -o value {self.device_path}').decode('UTF-8').strip() except SysCallError as error: - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) return None @@ -254,7 +305,7 @@ class Partition: For instance when you want to get a __repr__ of the class. """ if not self.partprobe(): - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': return None log(f"Could not reliably refresh PARTUUID of partition {self.device_path} due to partprobe error.", level=logging.DEBUG) @@ -262,37 +313,39 @@ class Partition: try: return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip() except SysCallError as error: - if self.block_device.info.get('TYPE') == 'iso9660': + if self.block_device.partition_type == 'iso9660': # Parent device is a Optical Disk (.iso dd'ed onto a device for instance) return None log(f"Could not get PARTUUID of partition using 'blkid -s PARTUUID -o value {self.device_path}': {error}") + return self._partition_info.uuid + @property def encrypted(self) -> Union[bool, None]: return self._encrypted - @encrypted.setter - def encrypted(self, value: bool) -> None: - self._encrypted = value - @property def parent(self) -> str: return self.real_device @property def real_device(self) -> str: - for blockdevice in json.loads(SysCommand('lsblk -J').decode('UTF-8'))['blockdevices']: - if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): - return f"/dev/{parent}" - # raise DiskError(f'Could not find appropriate parent for encrypted partition {self}') - return self.path + output = SysCommand('lsblk -J').decode('UTF-8') + + if output: + for blockdevice in json.loads(output)['blockdevices']: + if parent := self.find_parent_of(blockdevice, os.path.basename(self.device_path)): + return f"/dev/{parent}" + return self._path + + raise DiskError('Unable to get disk information for command "lsblk -J"') @property def device_path(self) -> str: - """ for bind mounts returns the phisical path of the partition + """ for bind mounts returns the physical path of the partition """ - device_path, bind_name = split_bind_name(self.path) + device_path, bind_name = split_bind_name(self._path) return device_path @property @@ -300,38 +353,40 @@ class Partition: """ for bind mounts returns the bind name (subvolume path). Returns none if this property does not exist """ - device_path, bind_name = split_bind_name(self.path) + device_path, bind_name = split_bind_name(self._path) return bind_name @property - def subvolumes(self) -> Iterator[BtrfsSubvolume]: + def subvolumes(self) -> Iterator[BtrfsSubvolumeInfo]: from .helpers import findmnt - + def iterate_children_recursively(information): for child in information.get('children', []): if target := child.get('target'): - if subvolume := subvolume_info_from_path(pathlib.Path(target)): - yield subvolume + if child.get('fstype') == 'btrfs': + if subvolume := subvolume_info_from_path(Path(target)): + yield subvolume if child.get('children'): for subchild in iterate_children_recursively(child): yield subchild - for mountpoint in self.mount_information: - if result := findmnt(pathlib.Path(mountpoint['target'])): - for filesystem in result.get('filesystems', []): - if subvolume := subvolume_info_from_path(pathlib.Path(mountpoint['target'])): - yield subvolume + if self._partition_info.filesystem_type == 'btrfs': + for mountpoint in self._partition_info.mountpoints: + if result := findmnt(mountpoint): + for filesystem in result.get('filesystems', []): + if subvolume := subvolume_info_from_path(mountpoint): + yield subvolume - for child in iterate_children_recursively(filesystem): - yield child + for child in iterate_children_recursively(filesystem): + yield child def partprobe(self) -> bool: try: if self.block_device: return 0 == SysCommand(f'partprobe {self.block_device.device}').exit_code except SysCallError as error: - log(f"Unreliable results might be given for {self.path} due to partprobe error: {error}", level=logging.DEBUG) + log(f"Unreliable results might be given for {self._path} due to partprobe error: {error}", level=logging.DEBUG) return False @@ -343,19 +398,20 @@ class Partition: with luks2(self, storage.get('ENC_IDENTIFIER', 'ai') + 'loop', password, auto_unmount=True) as unlocked_device: return unlocked_device.filesystem except SysCallError: - return None + pass + return None def has_content(self) -> bool: - fs_type = get_filesystem_type(self.path) + fs_type = self._partition_info.filesystem_type if not fs_type or "swap" in fs_type: return False temporary_mountpoint = '/tmp/' + hashlib.md5(bytes(f"{time.time()}", 'UTF-8') + os.urandom(12)).hexdigest() - temporary_path = pathlib.Path(temporary_mountpoint) + temporary_path = Path(temporary_mountpoint) temporary_path.mkdir(parents=True, exist_ok=True) - if (handle := SysCommand(f'/usr/bin/mount {self.path} {temporary_mountpoint}')).exit_code != 0: - raise DiskError(f'Could not mount and check for content on {self.path} because: {b"".join(handle)}') + if (handle := SysCommand(f'/usr/bin/mount {self._path} {temporary_mountpoint}')).exit_code != 0: + raise DiskError(f'Could not mount and check for content on {self._path} because: {handle}') files = len(glob.glob(f"{temporary_mountpoint}/*")) iterations = 0 @@ -366,14 +422,14 @@ class Partition: return True if files > 0 else False - def encrypt(self, *args :str, **kwargs :str) -> str: + def encrypt(self, password: Optional[str] = None) -> str: """ A wrapper function for luks2() instances and the .encrypt() method of that instance. """ from ..luks import luks2 handle = luks2(self, None, None) - return handle.encrypt(self, *args, **kwargs) + return handle.encrypt(self, password=password) def format(self, filesystem :Optional[str] = None, path :Optional[str] = None, log_formatting :bool = True, options :List[str] = [], retry :bool = True) -> bool: """ @@ -381,17 +437,17 @@ class Partition: the formatting functionality and in essence the support for the given filesystem. """ if filesystem is None: - filesystem = self.filesystem + filesystem = self._partition_info.filesystem_type if path is None: - path = self.path + path = self._path # This converts from fat32 -> vfat to unify filesystem names filesystem = get_mount_fs_type(filesystem) # To avoid "unable to open /dev/x: No such file or directory" start_wait = time.time() - while pathlib.Path(path).exists() is False and time.time() - start_wait < 10: + while Path(path).exists() is False and time.time() - start_wait < 10: time.sleep(0.025) if log_formatting: @@ -401,57 +457,57 @@ class Partition: if filesystem == 'btrfs': options = ['-f'] + options - if 'UUID:' not in (mkfs := SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8')): + mkfs = SysCommand(f"/usr/bin/mkfs.btrfs {' '.join(options)} {path}").decode('UTF-8') + if mkfs and 'UUID:' not in mkfs: raise DiskError(f'Could not format {path} with {filesystem} because: {mkfs}') - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'vfat': options = ['-F32'] + options - + log(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}") if (handle := SysCommand(f"/usr/bin/mkfs.vfat {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ext4': options = ['-F'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ext4 {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ext2': options = ['-F'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ext2 {' '.join(options)} {path}")).exit_code != 0: - raise DiskError(f'Could not format {path} with {filesystem} because: {b"".join(handle)}') - self.filesystem = 'ext2' - + raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") + self._partition_info.filesystem_type = 'ext2' elif filesystem == 'xfs': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.xfs {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'f2fs': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.f2fs {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'ntfs3': options = ['-f'] + options if (handle := SysCommand(f"/usr/bin/mkfs.ntfs -Q {' '.join(options)} {path}")).exit_code != 0: raise DiskError(f"Could not format {path} with {filesystem} because: {handle.decode('UTF-8')}") - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem elif filesystem == 'crypto_LUKS': # from ..luks import luks2 # encrypted_partition = luks2(self, None, None) # encrypted_partition.format(path) - self.filesystem = filesystem + self._partition_info.filesystem_type = filesystem else: raise UnknownFilesystemFormat(f"Fileformat '{filesystem}' is not yet implemented.") @@ -460,13 +516,13 @@ class Partition: if retry is True: log(f"Retrying in {storage.get('DISK_TIMEOUTS', 1)} seconds.", level=logging.WARNING, fg="orange") time.sleep(storage.get('DISK_TIMEOUTS', 1)) - + return self.format(filesystem, path, log_formatting, options, retry=False) if get_filesystem_type(path) == 'crypto_LUKS' or get_filesystem_type(self.real_device) == 'crypto_LUKS': - self.encrypted = True + self._encrypted = True else: - self.encrypted = False + self._encrypted = False return True @@ -478,18 +534,18 @@ class Partition: if parent := self.find_parent_of(child, name, parent=data['name']): return parent + return None + def mount(self, target :str, fs :Optional[str] = None, options :str = '') -> bool: - if not self.mountpoint: + if not self._partition_info.get_first_mountpoint(): log(f'Mounting {self} to {target}', level=logging.INFO) if not fs: - if not self.filesystem: - raise DiskError(f'Need to format (or define) the filesystem on {self} before mounting.') - fs = self.filesystem + fs = self._partition_info.filesystem_type fs_type = get_mount_fs_type(fs) - pathlib.Path(target).mkdir(parents=True, exist_ok=True) + Path(target).mkdir(parents=True, exist_ok=True) if self.bind_name: device_path = self.device_path @@ -499,7 +555,7 @@ class Partition: else: options = f"subvol={self.bind_name}" else: - device_path = self.path + device_path = self._path try: if options: mnt_handle = SysCommand(f"/usr/bin/mount -t {fs_type} -o {options} {device_path} {target}") @@ -508,7 +564,7 @@ class Partition: # TODO: Should be redundant to check for exit_code if mnt_handle.exit_code != 0: - raise DiskError(f"Could not mount {self.path} to {target} using options {options}") + raise DiskError(f"Could not mount {self._path} to {target} using options {options}") except SysCallError as err: raise err @@ -517,19 +573,17 @@ class Partition: return False def unmount(self) -> bool: - worker = SysCommand(f"/usr/bin/umount {self.path}") + worker = SysCommand(f"/usr/bin/umount {self._path}") + exit_code = worker.exit_code # Without to much research, it seams that low error codes are errors. # And above 8k is indicators such as "/dev/x not mounted.". # So anything in between 0 and 8k are errors (?). - if 0 < worker.exit_code < 8000: - raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code) + if exit_code and 0 < exit_code < 8000: + raise SysCallError(f"Could not unmount {self._path} properly: {worker}", exit_code=exit_code) return True - def umount(self) -> bool: - return self.unmount() - def filesystem_supported(self) -> bool: """ The support for a filesystem (this partition) is tested by calling @@ -538,7 +592,7 @@ class Partition: 2. UnknownFilesystemFormat that indicates that we don't support the given filesystem type """ try: - self.format(self.filesystem, '/dev/null', log_formatting=False, allow_formatting=True) + self.format(self._partition_info.filesystem_type, '/dev/null', log_formatting=False) except (SysCallError, DiskError): pass # We supported it, but /dev/null is not formattable as expected so the mkfs call exited with an error code except UnknownFilesystemFormat as err: diff --git a/archinstall/lib/disk/user_guides.py b/archinstall/lib/disk/user_guides.py index 5fa6bfdc..5809c073 100644 --- a/archinstall/lib/disk/user_guides.py +++ b/archinstall/lib/disk/user_guides.py @@ -3,6 +3,8 @@ import logging from typing import Optional, Dict, Any, List, TYPE_CHECKING # https://stackoverflow.com/a/39757388/929999 +from ..models.subvolume import Subvolume + if TYPE_CHECKING: from .blockdevice import BlockDevice _: Any @@ -107,17 +109,14 @@ def suggest_single_disk_layout(block_device :BlockDevice, # https://unix.stackexchange.com/questions/246976/btrfs-subvolume-uuid-clash # https://github.com/classy-giraffe/easy-arch/blob/main/easy-arch.sh layout[block_device.path]['partitions'][1]['btrfs'] = { - "subvolumes" : { - "@":"/", - "@home": "/home", - "@log": "/var/log", - "@pkg": "/var/cache/pacman/pkg", - "@.snapshots": "/.snapshots" - } + 'subvolumes': [ + Subvolume('@', '/'), + Subvolume('@home', '/home'), + Subvolume('@log', '/var/log'), + Subvolume('@pkg', '/var/cache/pacman/pkg'), + Subvolume('@.snapshots', '/.snapshots') + ] } - # else: - # pass # ... implement a guided setup - elif using_home_partition: # If we don't want to use subvolumes, # But we want to be able to re-use data between re-installs.. diff --git a/archinstall/lib/disk/validators.py b/archinstall/lib/disk/validators.py index fd1b7f33..54808886 100644 --- a/archinstall/lib/disk/validators.py +++ b/archinstall/lib/disk/validators.py @@ -7,13 +7,11 @@ def valid_parted_position(pos :str) -> bool: if pos.isdigit(): return True - if pos[-1] == '%' and pos[:-1].isdigit(): + if pos.lower().endswith('b') and pos[:-1].isdigit(): return True - if pos[-3:].lower() in ['mib', 'kib', 'b', 'tib'] and pos[:-3].replace(".", "", 1).isdigit(): - return True - - if pos[-2:].lower() in ['kb', 'mb', 'gb', 'tb'] and pos[:-2].replace(".", "", 1).isdigit(): + if any(pos.lower().endswith(size) and pos[:-len(size)].replace(".", "", 1).isdigit() + for size in ['%', 'kb', 'mb', 'gb', 'tb', 'kib', 'mib', 'gib', 'tib']): return True return False diff --git a/archinstall/lib/exceptions.py b/archinstall/lib/exceptions.py index a16faa3f..a66e4e04 100644 --- a/archinstall/lib/exceptions.py +++ b/archinstall/lib/exceptions.py @@ -1,4 +1,7 @@ -from typing import Optional +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from .general import SysCommandWorker class RequirementError(BaseException): pass @@ -17,10 +20,11 @@ class ProfileError(BaseException): class SysCallError(BaseException): - def __init__(self, message :str, exit_code :Optional[int] = None) -> None: + def __init__(self, message :str, exit_code :Optional[int] = None, worker :Optional['SysCommandWorker'] = None) -> None: super(SysCallError, self).__init__(message) self.message = message self.exit_code = exit_code + self.worker = worker class PermissionError(BaseException): diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py index b99e4a45..d76b7036 100644 --- a/archinstall/lib/general.py +++ b/archinstall/lib/general.py @@ -6,12 +6,14 @@ import os import secrets import shlex import subprocess +import stat import string import sys import time import re import urllib.parse import urllib.request +import urllib.error import pathlib from datetime import datetime, date from typing import Callable, Optional, Dict, Any, List, Union, Iterator, TYPE_CHECKING @@ -37,7 +39,7 @@ else: def unregister(self, fileno :int, *args :List[Any], **kwargs :Dict[str, Any]) -> None: try: - del(self.monitoring[fileno]) + del(self.monitoring[fileno]) # noqa: E275 except: pass @@ -207,7 +209,7 @@ class SysCommandWorker: self.cmd = cmd self.callbacks = callbacks self.peak_output = peak_output - # define the standard locale for command outputs. For now the C ascii one. Can be overriden + # define the standard locale for command outputs. For now the C ascii one. Can be overridden self.environment_vars = {**storage.get('CMD_LOCALE',{}),**environment_vars} self.logfile = logfile self.working_directory = working_directory @@ -270,7 +272,7 @@ class SysCommandWorker: log(args[1], level=logging.DEBUG, fg='red') if self.exit_code != 0: - raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code) + raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code, worker=self) def is_alive(self) -> bool: self.poll() @@ -312,9 +314,18 @@ class SysCommandWorker: except UnicodeDecodeError: return False - with open(f"{storage['LOG_PATH']}/cmd_output.txt", "a") as peak_output_log: + peak_logfile = pathlib.Path(f"{storage['LOG_PATH']}/cmd_output.txt") + + change_perm = False + if peak_logfile.exists() is False: + change_perm = True + + with peak_logfile.open("a") as peak_output_log: peak_output_log.write(output) + if change_perm: + os.chmod(str(peak_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) + sys.stdout.write(str(output)) sys.stdout.flush() @@ -354,16 +365,24 @@ class SysCommandWorker: # Note: If for any reason, we get a Python exception between here # and until os.close(), the traceback will get locked inside # stdout of the child_fd object. `os.read(self.child_fd, 8192)` is the - # only way to get the traceback without loosing it. + # only way to get the traceback without losing it. self.pid, self.child_fd = pty.fork() # https://stackoverflow.com/questions/4022600/python-pty-fork-how-does-it-work if not self.pid: + history_logfile = pathlib.Path(f"{storage['LOG_PATH']}/cmd_history.txt") try: + change_perm = False + if history_logfile.exists() is False: + change_perm = True + try: - with open(f"{storage['LOG_PATH']}/cmd_history.txt", "a") as cmd_log: + with history_logfile.open("a") as cmd_log: cmd_log.write(f"{self.cmd}\n") + + if change_perm: + os.chmod(str(history_logfile), stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP) except PermissionError: pass @@ -443,7 +462,7 @@ class SysCommand: def __repr__(self, *args :List[Any], **kwargs :Dict[str, Any]) -> str: if self.session: - return self.session._trace_log.decode('UTF-8') + return self.session._trace_log.decode('UTF-8', errors='backslashreplace') return '' def __json__(self) -> Dict[str, Union[str, bool, List[str], Dict[str, Any], Optional[bool], Optional[Dict[str, Any]]]]: @@ -547,9 +566,13 @@ def json_stream_to_structure(configuration_identifier : str, stream :str, target parsed_url = urllib.parse.urlparse(stream) - if parsed_url.scheme: # The stream is in fact a URL that should be grabed - with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: - target.update(json.loads(response.read())) + if parsed_url.scheme: # The stream is in fact a URL that should be grabbed + try: + with urllib.request.urlopen(urllib.request.Request(stream, headers={'User-Agent': 'ArchInstall'})) as response: + target.update(json.loads(response.read())) + except urllib.error.HTTPError as error: + log(f"Could not load {configuration_identifier} via {parsed_url} due to: {error}", level=logging.ERROR, fg="red") + return False else: if pathlib.Path(stream).exists(): try: diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py index b2cd6306..1270959e 100644 --- a/archinstall/lib/installer.py +++ b/archinstall/lib/installer.py @@ -24,6 +24,7 @@ from .disk.partition import get_mount_fs_type from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError from .hsm import fido2_enroll from .models.users import User +from .models.subvolume import Subvolume if TYPE_CHECKING: _: Any @@ -158,8 +159,6 @@ class Installer: print(_(" Please submit this issue (and file) to https://github.com/archlinux/archinstall/issues")) raise args[1] - self.genfstab() - if not (missing_steps := self.post_install_check()): self.log('Installation completed without any errors. You may now reboot.', fg='green', level=logging.INFO) self.sync_log_to_install_medium() @@ -195,7 +194,7 @@ class Installer: return True def _create_keyfile(self,luks_handle , partition :dict, password :str): - """ roiutine to create keyfiles, so it can be moved elsewere + """ roiutine to create keyfiles, so it can be moved elsewhere """ if partition.get('generate-encryption-key-file'): if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists(): @@ -220,7 +219,7 @@ class Installer: """ if partition.get("mountpoint") is None: if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})): - for mountpoint in [sub_list[subvolume] if isinstance(sub_list[subvolume],str) else sub_list[subvolume].get("mountpoint") for subvolume in sub_list if sub_list[subvolume]]: + for mountpoint in [sub_list[subvolume].get("mountpoint") if isinstance(subvolume, dict) else subvolume.mountpoint for subvolume in sub_list]: if mountpoint == '/': return True return False @@ -247,16 +246,17 @@ class Installer: # we manage the encrypted partititons for partition in [entry for entry in list_part if entry.get('encrypted', False)]: # open the luks device and all associate stuff - if not (password := partition.get('!password', None)): - raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}") - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop" - else: - loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" + if not (password := partition.get('!password', None)) and storage['arguments'].get('!encryption-password'): + password = storage['arguments'].get('!encryption-password') + elif not password: + raise RequirementError(f"Missing partition encryption password in layout: {partition}") + + loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}" # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device: - if partition.get('generate-encryption-key-file',False) and not self._has_root(partition): - list_luks_handles.append([luks_handle,partition,password]) + if partition.get('generate-encryption-key-file', False) and not self._has_root(partition): + list_luks_handles.append([luks_handle, partition, password]) # this way all the requesrs will be to the dm_crypt device and not to the physical partition partition['device_instance'] = unlocked_device @@ -265,47 +265,25 @@ class Installer: hsm_device_path = storage['arguments']['HSM'] fido2_enroll(hsm_device_path, partition['device_instance'], password) - # we manage the btrfs partitions - if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]): - for partition in btrfs_subvolumes: - if mount_options := ','.join(partition.get('filesystem',{}).get('mount_options',[])): - self.mount(partition['device_instance'], "/", options=mount_options) - else: - self.mount(partition['device_instance'], "/") - - setup_subvolumes( - installation=self, - partition_dict=partition - ) + btrfs_subvolumes = [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', [])] - partition['device_instance'].unmount() + for partition in btrfs_subvolumes: + device_instance = partition['device_instance'] + mount_options = partition.get('filesystem', {}).get('mount_options', []) + self.mount(device_instance, "/", options=','.join(mount_options)) + setup_subvolumes(installation=self, partition_dict=partition) + device_instance.unmount() # We then handle any special cases, such as btrfs - if any(btrfs_subvolumes := [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]): - for partition_information in btrfs_subvolumes: - for name, mountpoint in sorted(partition_information['btrfs']['subvolumes'].items(), key=lambda item: item[1]): - btrfs_subvolume_information = {} - - match mountpoint: - case str(): # backwards-compatability - btrfs_subvolume_information['mountpoint'] = mountpoint - btrfs_subvolume_information['options'] = [] - case dict(): - btrfs_subvolume_information['mountpoint'] = mountpoint.get('mountpoint', None) - btrfs_subvolume_information['options'] = mountpoint.get('options', []) - case _: - continue - - if mountpoint_parsed := btrfs_subvolume_information.get('mountpoint'): - # We cache the mount call for later - mount_queue[mountpoint_parsed] = lambda device=partition_information['device_instance'], \ - name=name, \ - subvolume_information=btrfs_subvolume_information: mount_subvolume( - installation=self, - device=device, - name=name, - subvolume_information=subvolume_information - ) + for partition in btrfs_subvolumes: + subvolumes: List[Subvolume] = partition['btrfs']['subvolumes'] + for subvolume in sorted(subvolumes, key=lambda item: item.mountpoint): + # We cache the mount call for later + mount_queue[subvolume.mountpoint] = lambda sub_vol=subvolume, device=partition['device_instance']: mount_subvolume( + installation=self, + device=device, + subvolume=sub_vol + ) # We mount ordinary partitions, and we sort them by the mountpoint for partition in sorted([entry for entry in list_part if entry.get('mountpoint', False)], key=lambda part: part['mountpoint']): @@ -348,7 +326,7 @@ class Installer: def enable_multilib_repository(self): # Set up a regular expression pattern of a commented line containing 'multilib' within [] - pattern = re.compile("^#\\[.*multilib.*\\]$") + pattern = re.compile(r"^#\s*\[multilib\]$") # This is used to track if the previous line is a match, so we end up uncommenting the line after the block. matched = False @@ -413,7 +391,7 @@ class Installer: try: run_pacman('-Syy', default_cmd='/usr/bin/pacman') except SysCallError as error: - self.log(f'Could not sync a new package databse: {error}', level=logging.ERROR, fg="red") + self.log(f'Could not sync a new package database: {error}', level=logging.ERROR, fg="red") if storage['arguments'].get('silent', False) is False: if input('Would you like to re-try this download? (Y/n): ').lower().strip() in ('', 'y'): @@ -454,7 +432,8 @@ class Installer: for plugin in plugins.values(): if hasattr(plugin, 'on_genfstab'): - plugin.on_genfstab(self) + if plugin.on_genfstab(self) is True: + break return True @@ -466,10 +445,27 @@ class Installer: if not len(locale): return True + modifier = '' + + # This is a temporary patch to fix #1200 + if '.' in locale: + locale, potential_encoding = locale.split('.', 1) + + # Override encoding if encoding is set to the default parameter + # and the "found" encoding differs. + if encoding == 'UTF-8' and encoding != potential_encoding: + encoding = potential_encoding + + # Make sure we extract the modifier, that way we can put it in if needed. + if '@' in locale: + locale, modifier = locale.split('@', 1) + modifier = f"@{modifier}" + # - End patch + with open(f'{self.target}/etc/locale.gen', 'a') as fh: - fh.write(f'{locale}.{encoding} {encoding}\n') + fh.write(f'{locale}.{encoding}{modifier} {encoding}\n') with open(f'{self.target}/etc/locale.conf', 'w') as fh: - fh.write(f'LANG={locale}.{encoding}\n') + fh.write(f'LANG={locale}.{encoding}{modifier}\n') return True if SysCommand(f'/usr/bin/arch-chroot {self.target} locale-gen').exit_code == 0 else False @@ -654,7 +650,7 @@ class Installer: mkinit.write(f"BINARIES=({' '.join(self.BINARIES)})\n") mkinit.write(f"FILES=({' '.join(self.FILES)})\n") - if not storage['arguments']['HSM']: + if not storage['arguments'].get('HSM'): # For now, if we don't use HSM we revert to the old # way of setting up encryption hooks for mkinitcpio. # This is purely for stability reasons, we're going away from this. @@ -696,7 +692,7 @@ class Installer: self.HOOKS.remove('fsck') if self.detect_encryption(partition): - if storage['arguments']['HSM']: + if storage['arguments'].get('HSM'): # Required bby mkinitcpio to add support for fido2-device options self.pacstrap('libfido2') @@ -728,7 +724,7 @@ class Installer: self.log("The multilib flag is set. This system will be installed with the multilib repository enabled.") self.enable_multilib_repository() else: - self.log("The testing flag is not set. This system will be installed without testing repositories enabled.") + self.log("The multilib flag is not set. This system will be installed without multilib repositories enabled.") if testing: self.log("The testing flag is set. This system will be installed with testing repositories enabled.") @@ -760,7 +756,7 @@ class Installer: # TODO: Use python functions for this SysCommand(f'/usr/bin/arch-chroot {self.target} chmod 700 /root') - if storage['arguments']['HSM']: + if storage['arguments'].get('HSM'): # TODO: # A bit of a hack, but we need to get vconsole.conf in there # before running `mkinitcpio` because it expects it in HSM mode. @@ -872,7 +868,7 @@ class Installer: options_entry = f'rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n' for subvolume in root_partition.subvolumes: - if subvolume.root is True: + if subvolume.root is True and subvolume.name != '<FS_TREE>': options_entry = f"rootflags=subvol={subvolume.name} " + options_entry # Zswap should be disabled when using zram. @@ -888,7 +884,7 @@ class Installer: kernel_options = f"options" - if storage['arguments']['HSM']: + if storage['arguments'].get('HSM'): # Note: lsblk UUID must be used, not PARTUUID for sd-encrypt to work kernel_options += f" rd.luks.name={real_device.uuid}=luksdev" # Note: tpm2-device and fido2-device don't play along very well: @@ -1022,10 +1018,9 @@ class Installer: boot_partition = None root_partition = None for partition in self.partitions: - print(partition, [partition.mountpoint], [self.target]) - if partition.mountpoint == self.target / 'boot': + if self.target / 'boot' in partition.mountpoints: boot_partition = partition - elif partition.mountpoint == self.target: + elif self.target in partition.mountpoints: root_partition = partition if boot_partition is None or root_partition is None: @@ -1154,7 +1149,8 @@ class Installer: return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c \"chsh -s {shell} {user}\"").exit_code == 0 def chown(self, owner :str, path :str, options :List[str] = []) -> bool: - return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {path}").exit_code == 0 + cleaned_path = path.replace('\'', '\\\'') + return SysCommand(f"/usr/bin/arch-chroot {self.target} sh -c 'chown {' '.join(options)} {owner} {cleaned_path}'").exit_code == 0 def create_file(self, filename :str, owner :Optional[str] = None) -> InstallationFile: return InstallationFile(self, filename, owner) diff --git a/archinstall/lib/locale_helpers.py b/archinstall/lib/locale_helpers.py index b48c3bc4..5580fa91 100644 --- a/archinstall/lib/locale_helpers.py +++ b/archinstall/lib/locale_helpers.py @@ -20,7 +20,7 @@ def list_locales() -> List[str]: entries.reverse() for entry in entries: - text = entry[1:].strip() + text = entry.replace('#', '').strip() if text == '': break locales.append(text) diff --git a/archinstall/lib/luks.py b/archinstall/lib/luks.py index ac480b11..7e4534d8 100644 --- a/archinstall/lib/luks.py +++ b/archinstall/lib/luks.py @@ -22,9 +22,9 @@ from .disk.btrfs import BTRFSPartition class luks2: def __init__(self, - partition :Partition, - mountpoint :str, - password :str, + partition: Partition, + mountpoint: Optional[str], + password: Optional[str], key_file :Optional[str] = None, auto_unmount :bool = False, *args :str, diff --git a/archinstall/lib/menu/global_menu.py b/archinstall/lib/menu/global_menu.py index 5cb27cab..d1bec189 100644 --- a/archinstall/lib/menu/global_menu.py +++ b/archinstall/lib/menu/global_menu.py @@ -3,38 +3,37 @@ from __future__ import annotations from typing import Any, List, Optional, Union, Dict, TYPE_CHECKING import archinstall - -from ..menu import Menu -from ..menu.selection_menu import Selector, GeneralMenu +from ..disk import encrypted_partitions from ..general import SysCommand, secret from ..hardware import has_uefi +from ..menu import Menu +from ..menu.selection_menu import Selector, GeneralMenu from ..models import NetworkConfiguration -from ..storage import storage +from ..models.users import User +from ..output import FormattedOutput from ..profiles import is_desktop_profile, Profile -from ..disk import encrypted_partitions - -from ..user_interaction import get_password, ask_for_a_timezone, save_config -from ..user_interaction import ask_ntp -from ..user_interaction import ask_for_swap +from ..storage import storage +from ..user_interaction import add_number_of_parrallel_downloads +from ..user_interaction import ask_additional_packages_to_install +from ..user_interaction import ask_for_additional_users +from ..user_interaction import ask_for_audio_selection from ..user_interaction import ask_for_bootloader +from ..user_interaction import ask_for_swap from ..user_interaction import ask_hostname -from ..user_interaction import ask_for_audio_selection -from ..user_interaction import ask_additional_packages_to_install +from ..user_interaction import ask_ntp from ..user_interaction import ask_to_configure_network -from ..user_interaction import ask_for_additional_users -from ..user_interaction import select_language -from ..user_interaction import select_mirror_regions -from ..user_interaction import select_locale_lang -from ..user_interaction import select_locale_enc +from ..user_interaction import get_password, ask_for_a_timezone, save_config +from ..user_interaction import select_additional_repositories from ..user_interaction import select_disk_layout -from ..user_interaction import select_kernel from ..user_interaction import select_encrypted_partitions from ..user_interaction import select_harddrives +from ..user_interaction import select_kernel +from ..user_interaction import select_language +from ..user_interaction import select_locale_enc +from ..user_interaction import select_locale_lang +from ..user_interaction import select_mirror_regions from ..user_interaction import select_profile -from ..user_interaction import select_additional_repositories -from ..models.users import User from ..user_interaction.partitioning_conf import current_partition_layout -from ..output import FormattedOutput if TYPE_CHECKING: _: Any @@ -42,6 +41,7 @@ if TYPE_CHECKING: class GlobalMenu(GeneralMenu): def __init__(self,data_store): + self._disk_check = True super().__init__(data_store=data_store, auto_cursor=True, preview_size=0.3) def _setup_selection_menu_options(self): @@ -50,7 +50,8 @@ class GlobalMenu(GeneralMenu): Selector( _('Archinstall language'), lambda x: self._select_archinstall_language(x), - default='English') + display_func=lambda x: x.display_name, + default=self.translation_handler.get_language_by_abbr('en')) self._menu_options['keyboard-layout'] = \ Selector( _('Keyboard layout'), @@ -143,6 +144,15 @@ class GlobalMenu(GeneralMenu): display_func=lambda x: x if x else 'None', default=None ) + + self._menu_options['parallel downloads'] = \ + Selector( + _('Parallel Downloads'), + add_number_of_parrallel_downloads, + display_func=lambda x: x if x else '0', + default=0 + ) + self._menu_options['kernels'] = \ Selector( _('Kernels'), @@ -163,7 +173,8 @@ class GlobalMenu(GeneralMenu): Selector( _('Network configuration'), ask_to_configure_network, - display_func=lambda x: self._prev_network_configuration(x), + display_func=lambda x: self._display_network_conf(x), + preview_func=self._prev_network_config, default={}) self._menu_options['timezone'] = \ Selector( @@ -204,14 +215,21 @@ class GlobalMenu(GeneralMenu): # Then we need to identify which partitions to encrypt. This will default to / (root). if len(list(encrypted_partitions(storage['arguments'].get('disk_layouts', [])))) == 0: for blockdevice in storage['arguments']['disk_layouts']: - for partition_index in select_encrypted_partitions( - title="Select which partitions to encrypt:", - partitions=storage['arguments']['disk_layouts'][blockdevice]['partitions'] - ): - - partition = storage['arguments']['disk_layouts'][blockdevice]['partitions'][partition_index] - partition['encrypted'] = True - partition['!password'] = storage['arguments']['!encryption-password'] + if storage['arguments']['disk_layouts'][blockdevice].get('partitions'): + for partition_index in select_encrypted_partitions( + title=_('Select which partitions to encrypt:'), + partitions=storage['arguments']['disk_layouts'][blockdevice]['partitions'], + filter_=(lambda p: p['mountpoint'] != '/boot') + ): + + partition = storage['arguments']['disk_layouts'][blockdevice]['partitions'][partition_index] + partition['encrypted'] = True + partition['!password'] = storage['arguments']['!encryption-password'] + + # We make sure generate-encryption-key-file is set on additional partitions + # other than the root partition. Otherwise they won't unlock properly #1279 + if partition['mountpoint'] != '/': + partition['generate-encryption-key-file'] = True def _install_text(self): missing = len(self._missing_configs()) @@ -219,21 +237,28 @@ class GlobalMenu(GeneralMenu): return _('Install ({} config(s) missing)').format(missing) return _('Install') - def _prev_network_configuration(self, cur_value: Union[NetworkConfiguration, List[NetworkConfiguration]]) -> str: + def _display_network_conf(self, cur_value: Union[NetworkConfiguration, List[NetworkConfiguration]]) -> str: if not cur_value: return _('Not configured, unavailable unless setup manually') else: if isinstance(cur_value, list): - ifaces = [x.iface for x in cur_value] - return f'Configured ifaces: {ifaces}' + return str(_('Configured {} interfaces')).format(len(cur_value)) else: return str(cur_value) + def _prev_network_config(self) -> Optional[str]: + selector = self._menu_options['nic'] + if selector.has_selection(): + ifaces = selector.current_selection + if isinstance(ifaces, list): + return FormattedOutput.as_table(ifaces) + return None + def _prev_harddrives(self) -> Optional[str]: selector = self._menu_options['harddrives'] if selector.has_selection(): drives = selector.current_selection - return '\n\n'.join([d.display_info for d in drives]) + return FormattedOutput.as_table(drives) return None def _prev_disk_layouts(self) -> Optional[str]: @@ -288,11 +313,12 @@ class GlobalMenu(GeneralMenu): missing += ['Hostname'] if not check('!root-password') and not has_superuser(): missing += [str(_('Either root-password or at least 1 user with sudo privileges must be specified'))] - if not check('harddrives'): - missing += ['Hard drives'] - if check('harddrives'): - if not self._menu_options['harddrives'].is_empty() and not check('disk_layouts'): - missing += ['Disk layout'] + if self._disk_check: + if not check('harddrives'): + missing += [str(_('Drive(s)'))] + if check('harddrives'): + if not self._menu_options['harddrives'].is_empty() and not check('disk_layouts'): + missing += [str(_('Disk layout'))] return missing @@ -318,22 +344,26 @@ class GlobalMenu(GeneralMenu): def _select_harddrives(self, old_harddrives : list) -> List: harddrives = select_harddrives(old_harddrives) - if len(harddrives) == 0: - prompt = _( - "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n" - "WARNING: Archinstall won't check the suitability of this setup\n" - "Do you wish to continue?" - ).format(storage['MOUNT_POINT']) + if harddrives is not None: + if len(harddrives) == 0: + prompt = _( + "You decided to skip harddrive selection\nand will use whatever drive-setup is mounted at {} (experimental)\n" + "WARNING: Archinstall won't check the suitability of this setup\n" + "Do you wish to continue?" + ).format(storage['MOUNT_POINT']) - choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run() + choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes(), skip=False).run() - if choice.value == Menu.no(): - return self._select_harddrives(old_harddrives) + if choice.value == Menu.no(): + self._disk_check = True + return self._select_harddrives(old_harddrives) + else: + self._disk_check = False - # in case the harddrives got changed we have to reset the disk layout as well - if old_harddrives != harddrives: - self._menu_options['disk_layouts'].set_current_selection(None) - storage['arguments']['disk_layouts'] = {} + # in case the harddrives got changed we have to reset the disk layout as well + if old_harddrives != harddrives: + self._menu_options['disk_layouts'].set_current_selection(None) + storage['arguments']['disk_layouts'] = {} return harddrives diff --git a/archinstall/lib/menu/list_manager.py b/archinstall/lib/menu/list_manager.py index cb567093..ae3a6eb5 100644 --- a/archinstall/lib/menu/list_manager.py +++ b/archinstall/lib/menu/list_manager.py @@ -1,94 +1,7 @@ -#!/usr/bin/python -""" -# Purpose -ListManager is a widget based on `menu` which allows the handling of repetitive operations in a list. -Imagine you have a list and want to add/copy/edit/delete their elements. With this widget you will be shown the list -``` -Vamos alla - -Use ESC to skip - - -> uno : 1 -dos : 2 -tres : 3 -cuatro : 4 -==> -Confirm and exit -Cancel -(Press "/" to search) -``` -Once you select one of the elements of the list, you will be promted with the action to be done to the selected element -``` - -uno : 1 -dos : 2 -> tres : 3 -cuatro : 4 -==> -Confirm and exit -Cancel -(Press "/" to search) - -Select an action for < {'tres': 3} > - - -Add -Copy -Edit -Delete -> Cancel -``` -You execute the action for this element (which might or not involve user interaction) and return to the list main page -till you call one of the options `confirm and exit` which returns the modified list or `cancel` which returns the original list unchanged. -If the list is empty one action can be defined as default (usually Add). We call it **null_action** -YOu can also define a **default_action** which will appear below the separator, not tied to any element of the list. Barring explicit definition, default_action will be the null_action -``` -==> -Add -Confirm and exit -Cancel -(Press "/" to search) -``` -The default implementation can handle simple lists and a key:value dictionary. The default actions are the shown above. -A sample of basic usage is included at the end of the source. - -More sophisticaded uses can be achieved by -* changing the action list and the null_action during intialization -``` - opciones = ListManager('Vamos alla',opciones,[str(_('Add')),str(_('Delete'))],_('Add')).run() -``` -* And using following methods to overwrite/define user actions and other details: -* * `reformat`. To change the appearance of the list elements -* * `action_list`. To modify the content of the action list once an element is defined. F.i. to avoid Delete to appear for certain elements, or to add/modify action based in the value of the element. -* * `exec_action` which contains the actual code to be executed when an action is selected - -The contents in the base class of this methods serve for a very basic usage, and are to be taken as samples. Thus the best use of this class would be to subclass in your code - -``` - class ObjectList(archinstall.ListManager): - def __init__(prompt,list): - self.ObjectAction = [... list of actions ...] - self.ObjectNullAction = one ObjectAction - super().__init__(prompt,list,ObjectActions,ObjectNullAction) - def reformat(self): - ... beautfy the output of the list - def action_list(self): - ... if you need some changes to the action list based on self.target - def exec_action(self): - if self.action == self.ObjectAction[0]: - performFirstAction(self.target, ...) - - ... - resultList = ObjectList(prompt,originallist).run() -``` - -""" import copy from os import system -from typing import Union, Any, TYPE_CHECKING, Dict, Optional +from typing import Any, TYPE_CHECKING, Dict, Optional, Tuple, List -from .text_input import TextInput from .menu import Menu if TYPE_CHECKING: @@ -98,199 +11,132 @@ if TYPE_CHECKING: class ListManager: def __init__( self, - prompt :str, - base_list :Union[list,dict] , - base_actions :list = None, - null_action :str = None, - default_action :Union[str,list] = None, - header :Union[str,list] = None): + prompt: str, + entries: List[Any], + base_actions: List[str], + sub_menu_actions: List[str] + ): """ - param :prompt Text which will appear at the header + :param prompt: Text which will appear at the header type param: string | DeferredTranslation - param :base:_list list/dict of option to be shown / mainpulated - type param: list | dict - - param base_actions an alternate list of actions to the items of the object + :param entries: list/dict of option to be shown / manipulated type param: list - param: null_action action which will be taken (if any) when base_list is empty - type param: string - - param: default_action action which will be presented at the bottom of the list. Shouldn't need a target. If not present, null_action is set there. - Both Null and Default actions can be defined outside the base_actions list, as long as they are launched in exec_action - type param: string or list + :param base_actions: list of actions that is displayed in the main list manager, + usually global actions such as 'Add...' + type param: list - param: header one or more header lines for the list - type param: string or list + :param sub_menu_actions: list of actions available for a chosen entry + type param: list """ + self._original_data = copy.deepcopy(entries) + self._data = copy.deepcopy(entries) explainer = str(_('\n Choose an object from the list, and select one of the available actions for it to execute')) self._prompt = prompt + explainer if prompt else explainer - self._null_action = str(null_action) if null_action else None + self._separator = '' + self._confirm_action = str(_('Confirm and exit')) + self._cancel_action = str(_('Cancel')) - if not default_action: - self._default_action = [self._null_action] - elif isinstance(default_action,(list,tuple)): - self._default_action = default_action - else: - self._default_action = [str(default_action),] + self._terminate_actions = [self._confirm_action, self._cancel_action] + self._base_actions = base_actions + self._sub_menu_actions = sub_menu_actions - self.header = header if header else None - self.cancel_action = str(_('Cancel')) - self.confirm_action = str(_('Confirm and exit')) - self.separator = '' - self.bottom_list = [self.confirm_action,self.cancel_action] - self.bottom_item = [self.cancel_action] - self.base_actions = base_actions if base_actions else [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))] - self._original_data = copy.deepcopy(base_list) - self._data = copy.deepcopy(base_list) # as refs, changes are immediate - # default values for the null case - self.target: Optional[Any] = None - self.action = self._null_action + self._last_choice = None - if len(self._data) == 0 and self._null_action: - self._data = self.exec_action(self._data) + @property + def last_choice(self): + return self._last_choice def run(self): while True: # this will return a dictionary with the key as the menu entry to be displayed # and the value is the original value from the self._data container data_formatted = self.reformat(self._data) - options = list(data_formatted.keys()) - options.append(self.separator) - - if self._default_action: - options += self._default_action - - options += self.bottom_list + options, header = self._prepare_selection(data_formatted) system('clear') - target = Menu( + choice = Menu( self._prompt, options, sort=False, clear_screen=False, clear_menu_on_exit=False, - header=self.header, + header=header, skip_empty_entries=True, - skip=False + skip=False, + show_search_hint=False ).run() - if not target.value or target.value in self.bottom_list: - self.action = target + if choice.value in self._base_actions: + self._data = self.handle_action(choice.value, None, self._data) + elif choice.value in self._terminate_actions: break + else: # an entry of the existing selection was choosen + selected_entry = data_formatted[choice.value] + self._run_actions_on_entry(selected_entry) - if target.value and target.value in self._default_action: - self.action = target.value - self.target = None - self._data = self.exec_action(self._data) - continue - - if isinstance(self._data,dict): - data_key = data_formatted[target.value] - key = self._data[data_key] - self.target = {data_key: key} - elif isinstance(self._data, list): - self.target = [d for d in self._data if d == data_formatted[target.value]][0] - else: - self.target = self._data[data_formatted[target.value]] - - # Possible enhacement. If run_actions returns false a message line indicating the failure - self.run_actions(target.value) - - if target.value == self.cancel_action: # TODO dubious + self._last_choice = choice + if choice.value == self._cancel_action: return self._original_data # return the original list else: return self._data - def run_actions(self,prompt_data=None): - options = self.action_list() + self.bottom_item - prompt = _("Select an action for < {} >").format(prompt_data if prompt_data else self.target) + def _prepare_selection(self, data_formatted: Dict[str, Any]) -> Tuple[List[str], str]: + # header rows are mapped to None so make sure + # to exclude those from the selectable data + options: List[str] = [key for key, val in data_formatted.items() if val is not None] + header = '' + + if len(options) > 0: + table_header = [key for key, val in data_formatted.items() if val is None] + header = '\n'.join(table_header) + + if len(options) > 0: + options.append(self._separator) + + options += self._base_actions + options += self._terminate_actions + + return options, header + + def _run_actions_on_entry(self, entry: Any): + options = self.filter_options(entry,self._sub_menu_actions) + [self._cancel_action] + display_value = self.selected_action_display(entry) + + prompt = _("Select an action for '{}'").format(display_value) + choice = Menu( prompt, options, sort=False, clear_screen=False, clear_menu_on_exit=False, - preset_values=self.bottom_item, show_search_hint=False ).run() - self.action = choice.value + if choice.value and choice.value != self._cancel_action: + self._data = self.handle_action(choice.value, entry, self._data) - if self.action and self.action != self.cancel_action: - self._data = self.exec_action(self._data) + def selected_action_display(self, selection: Any) -> str: + # this will return the value to be displayed in the + # "Select an action for '{}'" string + raise NotImplementedError('Please implement me in the child class') - """ - The following methods are expected to be overwritten by the user if the needs of the list are beyond the simple case - """ + def reformat(self, data: List[Any]) -> Dict[str, Any]: + # this should return a dictionary of display string to actual data entry + # mapping; if the value for a given display string is None it will be used + # in the header value (useful when displaying tables) + raise NotImplementedError('Please implement me in the child class') - def reformat(self, data: Any) -> Dict[str, Any]: - """ - method to get the data in a format suitable to be shown - It is executed once for run loop and processes the whole self._data structure - """ - if isinstance(data,dict): - return {f'{k}: {v}': k for k, v in data.items()} - else: - return {str(k): k for k in data} - - def action_list(self): - """ - can define alternate action list or customize the list for each item. - Executed after any item is selected, contained in self.target - """ - return self.base_actions - - def exec_action(self, data: Any): - """ - what's executed one an item (self.target) and one action (self.action) is selected. - Should be overwritten by the user - The result is expected to update self._data in this routine, else it is ignored - The basic code is useful for simple lists and dictionaries (key:value pairs, both strings) - """ - # TODO guarantee unicity - if isinstance(self._data,list): - if self.action == str(_('Add')): - self.target = TextInput(_('Add: '),None).run() - self._data.append(self.target) - if self.action == str(_('Copy')): - while True: - target = TextInput(_('Copy to: '),self.target).run() - if target != self.target: - self._data.append(self.target) - break - elif self.action == str(_('Edit')): - tgt = self.target - idx = self._data.index(self.target) - result = TextInput(_('Edit: '),tgt).run() - self._data[idx] = result - elif self.action == str(_('Delete')): - del self._data[self._data.index(self.target)] - elif isinstance(self._data,dict): - # allows overwrites - if self.target: - origkey,origval = list(self.target.items())[0] - else: - origkey = None - origval = None - if self.action == str(_('Add')): - key = TextInput(_('Key: '),None).run() - value = TextInput(_('Value: '),None).run() - self._data[key] = value - if self.action == str(_('Copy')): - while True: - key = TextInput(_('Copy to new key:'),origkey).run() - if key != origkey: - self._data[key] = origval - break - elif self.action == str(_('Edit')): - value = TextInput(_('Edit {}: ').format(origkey), origval).run() - self._data[origkey] = value - elif self.action == str(_('Delete')): - del self._data[origkey] + def handle_action(self, action: Any, entry: Optional[Any], data: List[Any]) -> List[Any]: + # this function is called when a base action or + # a specific action for an entry is triggered + raise NotImplementedError('Please implement me in the child class') - return self._data + def filter_options(self, selection :Any, options :List[str]) -> List[str]: + # filter which actions to show for an specific selection + return options diff --git a/archinstall/lib/menu/menu.py b/archinstall/lib/menu/menu.py index c34814eb..112bc0ae 100644 --- a/archinstall/lib/menu/menu.py +++ b/archinstall/lib/menu/menu.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from enum import Enum, auto -from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional +from os import system +from typing import Dict, List, Union, Any, TYPE_CHECKING, Optional, Callable from archinstall.lib.menu.simple_menu import TerminalMenu @@ -51,13 +52,17 @@ class Menu(TerminalMenu): sort :bool = True, preset_values :Union[str, List[str]] = None, cursor_index : Optional[int] = None, - preview_command=None, - preview_size=0.75, - preview_title='Info', + preview_command: Optional[Callable] = None, + preview_size: float = 0.75, + preview_title: str = 'Info', header :Union[List[str],str] = None, - explode_on_interrupt :bool = False, - explode_warning :str = '', - **kwargs + raise_error_on_interrupt :bool = False, + raise_error_warning_msg :str = '', + clear_screen: bool = True, + show_search_hint: bool = True, + cycle_cursor: bool = True, + clear_menu_on_exit: bool = True, + skip_empty_entries: bool = False ): """ Creates a new menu @@ -99,10 +104,10 @@ class Menu(TerminalMenu): param header: one or more header lines for the menu type param: string or list - param explode_on_interrupt: This will explicitly handle a ctrl+c instead and return that specific state + param raise_error_on_interrupt: This will explicitly handle a ctrl+c instead and return that specific state type param: bool - param explode_warning: If explode_on_interrupt is True and this is non-empty, there will be a warning with a user confirmation displayed + param raise_error_warning_msg: If raise_error_on_interrupt is True and this is non-empty, there will be a warning with a user confirmation displayed type param: str :param kwargs : any SimpleTerminal parameter @@ -115,7 +120,7 @@ class Menu(TerminalMenu): # We check that the options are iterable. If not we abort. Else we copy them to lists # it options is a dictionary we use the values as entries of the list # if options is a string object, each character becomes an entry - # if options is a list, we implictily build a copy to mantain immutability + # if options is a list, we implictily build a copy to maintain immutability if not isinstance(p_options,Iterable): log(f"Objects of type {type(p_options)} is not iterable, and are not supported at Menu",fg="red") log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>",level=logging.WARNING) @@ -145,27 +150,30 @@ class Menu(TerminalMenu): self._skip = skip self._default_option = default_option self._multi = multi - self._explode_on_interrupt = explode_on_interrupt - self._explode_warning = explode_warning + self._raise_error_on_interrupt = raise_error_on_interrupt + self._raise_error_warning_msg = raise_error_warning_msg + self._preview_command = preview_command menu_title = f'\n{title}\n\n' if header: if not isinstance(header,(list,tuple)): header = [header] - header = '\n'.join(header) - menu_title += f'\n{header}\n' + menu_title += '\n'.join(header) action_info = '' if skip: - action_info += str(_("Use ESC to skip")) + action_info += str(_('ESC to skip')) - if self._explode_on_interrupt: - if len(action_info) > 0: - action_info += '\n' - action_info += str(_('Use CTRL+C to reset current selection\n\n')) + if self._raise_error_on_interrupt: + action_info += ', ' if len(action_info) > 0 else '' + action_info += str(_('CTRL+C to reset')) - menu_title += action_info + if multi: + action_info += ', ' if len(action_info) > 0 else '' + action_info += str(_('TAB to select')) + + menu_title += action_info + '\n' if default_option: # if a default value was specified we move that one @@ -178,10 +186,6 @@ class Menu(TerminalMenu): cursor = "> " main_menu_cursor_style = ("fg_cyan", "bold") main_menu_style = ("bg_blue", "fg_gray") - # defaults that can be changed up the stack - kwargs['clear_screen'] = kwargs.get('clear_screen',True) - kwargs['show_search_hint'] = kwargs.get('show_search_hint',True) - kwargs['cycle_cursor'] = kwargs.get('cycle_cursor',True) super().__init__( menu_entries=self._menu_options, @@ -195,12 +199,16 @@ class Menu(TerminalMenu): # show_search_hint=True, preselected_entries=self.preset_values, cursor_index=self.cursor_index, - preview_command=preview_command, + preview_command=lambda x: self._preview_wrapper(preview_command, x), preview_size=preview_size, preview_title=preview_title, - explode_on_interrupt=self._explode_on_interrupt, + raise_error_on_interrupt=self._raise_error_on_interrupt, multi_select_select_on_accept=False, - **kwargs, + clear_screen=clear_screen, + show_search_hint=show_search_hint, + cycle_cursor=cycle_cursor, + clear_menu_on_exit=clear_menu_on_exit, + skip_empty_entries=skip_empty_entries ) def _show(self) -> MenuSelection: @@ -228,16 +236,24 @@ class Menu(TerminalMenu): else: return MenuSelection(type_=MenuSelectionType.Esc) + def _preview_wrapper(self, preview_command: Optional[Callable], current_selection: str) -> Optional[str]: + if preview_command: + if self._default_option is not None and f'{self._default_option} {self._default_str}' == current_selection: + current_selection = self._default_option + return preview_command(current_selection) + return None + def run(self) -> MenuSelection: ret = self._show() if ret.type_ == MenuSelectionType.Ctrl_c: - if self._explode_on_interrupt and len(self._explode_warning) > 0: - response = Menu(self._explode_warning, Menu.yes_no(), skip=False).run() + if self._raise_error_on_interrupt and len(self._raise_error_warning_msg) > 0: + response = Menu(self._raise_error_warning_msg, Menu.yes_no(), skip=False).run() if response.value == Menu.no(): return self.run() if ret.type_ is not MenuSelectionType.Selection and not self._skip: + system('clear') return self.run() return ret diff --git a/archinstall/lib/menu/selection_menu.py b/archinstall/lib/menu/selection_menu.py index 57e290f1..8a08812c 100644 --- a/archinstall/lib/menu/selection_menu.py +++ b/archinstall/lib/menu/selection_menu.py @@ -8,22 +8,15 @@ from typing import Callable, Any, List, Iterator, Tuple, Optional, Dict, TYPE_CH from .menu import Menu, MenuSelectionType from ..locale_helpers import set_keyboard_language from ..output import log -from ..translation import Translation +from ..translationhandler import TranslationHandler, Language from ..hsm.fido import get_fido2_devices +from ..user_interaction.general_conf import select_archinstall_language + if TYPE_CHECKING: _: Any -def select_archinstall_language(preset_value: str) -> Optional[Any]: - """ - copied from user_interaction/general_conf.py as a temporary measure - """ - languages = Translation.get_available_lang() - language = Menu(_('Archinstall language'), languages, preset_values=preset_value).run() - return language.value - - class Selector: def __init__( self, @@ -190,13 +183,18 @@ class GeneralMenu: """ self._enabled_order :List[str] = [] - self._translation = Translation.load_nationalization() + self._translation_handler = TranslationHandler() self.is_context_mgr = False self._data_store = data_store if data_store is not None else {} self.auto_cursor = auto_cursor self._menu_options: Dict[str, Selector] = {} self._setup_selection_menu_options() self.preview_size = preview_size + self._last_choice = None + + @property + def last_choice(self): + return self._last_choice def __enter__(self, *args :Any, **kwargs :Any) -> GeneralMenu: self.is_context_mgr = True @@ -217,9 +215,13 @@ class GeneralMenu: self.exit_callback() + @property + def translation_handler(self) -> TranslationHandler: + return self._translation_handler + def _setup_selection_menu_options(self): """ Define the menu options. - Menu options can be defined here in a subclass or done per progam calling self.set_option() + Menu options can be defined here in a subclass or done per program calling self.set_option() """ return @@ -227,7 +229,7 @@ class GeneralMenu: """ will be called before each action in the menu """ return - def post_callback(self, selector_name :str, value :Any): + def post_callback(self, selection_name: str = None, value: Any = None): """ will be called after each action in the menu """ return True @@ -327,12 +329,16 @@ class GeneralMenu: break cursor_pos += 1 - value = value.strip() + value = value.strip() - # if this calls returns false, we exit the menu - # we allow for an callback for special processing on realeasing control - if not self._process_selection(value): - break + # if this calls returns false, we exit the menu + # we allow for an callback for special processing on realeasing control + if not self._process_selection(value): + break + + # we get the last action key + actions = {str(v.description):k for k,v in self._menu_options.items()} + self._last_choice = actions[selection.value.strip()] if not self.is_context_mgr: self.__exit__() @@ -347,7 +353,7 @@ class GeneralMenu: return self.exec_option(config_name, selector) def exec_option(self, config_name :str, p_selector :Selector = None) -> bool: - """ processes the exection of a given menu entry + """ processes the execution of a given menu entry - pre process callback - selection function - post process callback @@ -461,13 +467,10 @@ class GeneralMenu: mandatory_waiting += 1 return mandatory_fields, mandatory_waiting - def _select_archinstall_language(self, preset_value: str) -> str: - language = select_archinstall_language(preset_value) - if language is not None: - self._translation.activate(language) - return language - - return preset_value + def _select_archinstall_language(self, preset_value: Language) -> Language: + language = select_archinstall_language(self.translation_handler.translated_languages, preset_value) + self._translation_handler.activate(language) + return language def _select_hsm(self, preset :Optional[pathlib.Path] = None) -> Optional[pathlib.Path]: title = _('Select which partitions to mark for formatting:') diff --git a/archinstall/lib/menu/simple_menu.py b/archinstall/lib/menu/simple_menu.py index 947259eb..1980e2ce 100644 --- a/archinstall/lib/menu/simple_menu.py +++ b/archinstall/lib/menu/simple_menu.py @@ -65,7 +65,7 @@ __author__ = "Ingo Meyer" __email__ = "i.meyer@fz-juelich.de" __copyright__ = "Copyright © 2021 Forschungszentrum Jülich GmbH. All rights reserved." __license__ = "MIT" -__version_info__ = (1, 4, 1) +__version_info__ = (1, 5, 0) __version__ = ".".join(map(str, __version_info__)) @@ -86,6 +86,7 @@ DEFAULT_MULTI_SELECT_SELECT_ON_ACCEPT = True DEFAULT_PREVIEW_BORDER = True DEFAULT_PREVIEW_SIZE = 0.25 DEFAULT_PREVIEW_TITLE = "preview" +DEFAULT_QUIT_KEYS = ("escape", "q") DEFAULT_SEARCH_CASE_SENSITIVE = False DEFAULT_SEARCH_HIGHLIGHT_STYLE = ("fg_black", "bg_yellow", "bold") DEFAULT_SEARCH_KEY = "/" @@ -581,6 +582,8 @@ class TerminalMenu: preview_command: Optional[Union[str, Callable[[str], str]]] = None, preview_size: float = DEFAULT_PREVIEW_SIZE, preview_title: str = DEFAULT_PREVIEW_TITLE, + quit_keys: Iterable[str] = DEFAULT_QUIT_KEYS, + raise_error_on_interrupt: bool = False, search_case_sensitive: bool = DEFAULT_SEARCH_CASE_SENSITIVE, search_highlight_style: Optional[Iterable[str]] = DEFAULT_SEARCH_HIGHLIGHT_STYLE, search_key: Optional[str] = DEFAULT_SEARCH_KEY, @@ -596,8 +599,7 @@ class TerminalMenu: status_bar: Optional[Union[str, Iterable[str], Callable[[str], str]]] = None, status_bar_below_preview: bool = DEFAULT_STATUS_BAR_BELOW_PREVIEW, status_bar_style: Optional[Iterable[str]] = DEFAULT_STATUS_BAR_STYLE, - title: Optional[Union[str, Iterable[str]]] = None, - explode_on_interrupt: bool = False + title: Optional[Union[str, Iterable[str]]] = None ): def extract_shortcuts_menu_entries_and_preview_arguments( entries: Iterable[str], @@ -716,10 +718,11 @@ class TerminalMenu: self._preview_command = preview_command self._preview_size = preview_size self._preview_title = preview_title + self._quit_keys = tuple(quit_keys) + self._raise_error_on_interrupt = raise_error_on_interrupt self._search_case_sensitive = search_case_sensitive self._search_highlight_style = tuple(search_highlight_style) if search_highlight_style is not None else () self._search_key = search_key - self._explode_on_interrupt = explode_on_interrupt self._shortcut_brackets_highlight_style = ( tuple(shortcut_brackets_highlight_style) if shortcut_brackets_highlight_style is not None else () ) @@ -787,6 +790,7 @@ class TerminalMenu: # backspace can be queried from the terminal database but is unreliable, query the terminal directly instead self._init_backspace_control_character() self._add_missing_control_characters_for_keys(self._accept_keys) + self._add_missing_control_characters_for_keys(self._quit_keys) self._init_terminal_codes() @staticmethod @@ -1477,7 +1481,7 @@ class TerminalMenu: "menu_down": set(("down", "ctrl-j", "j")), "accept": set(self._accept_keys), "multi_select": set(self._multi_select_keys), - "quit": set(("escape", "q")), + "quit": set(self._quit_keys), "search_start": set((self._search_key,)), "backspace": set(("backspace",)), } # type: Dict[str, Set[Optional[str]]] @@ -1541,7 +1545,7 @@ class TerminalMenu: # `search_start` key self._search.search_text += next_key except KeyboardInterrupt as e: - if self._explode_on_interrupt: + if self._raise_error_on_interrupt: raise e menu_was_interrupted = True finally: @@ -1846,12 +1850,6 @@ def get_argumentparser() -> argparse.ArgumentParser: ) parser.add_argument("-t", "--title", action="store", dest="title", help="menu title") parser.add_argument( - "--explode-on-interrupt", - action="store_true", - dest="explode_on_interrupt", - help="Instead of quitting the menu, this will raise the KeyboardInterrupt Exception", - ) - parser.add_argument( "-V", "--version", action="store_true", dest="print_version", help="print the version number and exit" ) parser.add_argument("entries", action="store", nargs="*", help="the menu entries to show") @@ -1981,7 +1979,6 @@ def main() -> None: status_bar_below_preview=args.status_bar_below_preview, status_bar_style=args.status_bar_style, title=args.title, - explode_on_interrupt=args.explode_on_interrupt, ) except (InvalidParameterCombinationError, InvalidStyleError, UnknownMenuEntryError) as e: print(str(e), file=sys.stderr) diff --git a/archinstall/lib/mirrors.py b/archinstall/lib/mirrors.py index e4917f5e..f78a8b18 100644 --- a/archinstall/lib/mirrors.py +++ b/archinstall/lib/mirrors.py @@ -1,10 +1,12 @@ import logging +import pathlib import urllib.error import urllib.request from typing import Union, Mapping, Iterable, Dict, Any, List from .general import SysCommand from .output import log +from .storage import storage def sort_mirrorlist(raw_data :bytes, sort_order=["https", "http"]) -> bytes: """ @@ -144,16 +146,22 @@ def re_rank_mirrors( def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: - url = "https://archlinux32.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on" regions = {} - try: - response = urllib.request.urlopen(url) - except urllib.error.URLError as err: - log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange") - return regions + if storage['arguments']['offline']: + with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh: + mirrorlist = fh.read() + else: + url = "https://archlinux32.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on" + + try: + response = urllib.request.urlopen(url) + except urllib.error.URLError as err: + log(f'Could not fetch an active mirror-list: {err}', level=logging.WARNING, fg="orange") + return regions + + mirrorlist = response.read() - mirrorlist = response.read() if sort_order: mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order) @@ -170,5 +178,10 @@ def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]: url = line.lstrip('#Server = ') regions[region][url] = True + elif line.startswith('Server = '): + regions.setdefault(region, {}) + + url = line.lstrip('Server = ') + regions[region][url] = True return regions diff --git a/archinstall/lib/models/network_configuration.py b/archinstall/lib/models/network_configuration.py index 4f135da5..e026e97b 100644 --- a/archinstall/lib/models/network_configuration.py +++ b/archinstall/lib/models/network_configuration.py @@ -39,8 +39,22 @@ class NetworkConfiguration: else: return 'Unknown type' - # for json serialization when calling json.dumps(...) on this class - def json(self): + def as_json(self) -> Dict: + exclude_fields = ['type'] + data = {} + for k, v in self.__dict__.items(): + if k not in exclude_fields: + if isinstance(v, list) and len(v) == 0: + v = '' + elif v is None: + v = '' + + data[k] = v + + return data + + def json(self) -> Dict: + # for json serialization when calling json.dumps(...) on this class return self.__dict__ def is_iso(self) -> bool: @@ -111,19 +125,10 @@ class NetworkConfigurationHandler: else: # not recognized return None - def _parse_manual_config(self, config: Dict[str, Any]) -> Union[None, List[NetworkConfiguration]]: - manual_configs: List = config.get('config', []) - - if not manual_configs: - return None - - if not isinstance(manual_configs, list): - log(_('Manual configuration setting must be a list')) - exit(1) - + def _parse_manual_config(self, configs: List[Dict[str, Any]]) -> Optional[List[NetworkConfiguration]]: configurations = [] - for manual_config in manual_configs: + for manual_config in configs: iface = manual_config.get('iface', None) if iface is None: @@ -135,7 +140,7 @@ class NetworkConfigurationHandler: NetworkConfiguration(NicType.MANUAL, iface=iface) ) else: - ip = config.get('ip', '') + ip = manual_config.get('ip', '') if not ip: log(_('Manual nic configuration with no auto DHCP requires an IP address'), fg='red') exit(1) @@ -145,32 +150,34 @@ class NetworkConfigurationHandler: NicType.MANUAL, iface=iface, ip=ip, - gateway=config.get('gateway', ''), - dns=config.get('dns', []), + gateway=manual_config.get('gateway', ''), + dns=manual_config.get('dns', []), dhcp=False ) ) return configurations - def parse_arguments(self, config: Any): - nic_type = config.get('type', None) - - if not nic_type: - # old style definitions - network_config = self._backwards_compability_config(config) - if network_config: - return network_config - return None - + def _parse_nic_type(self, nic_type: str) -> NicType: try: - type_ = NicType(nic_type) + return NicType(nic_type) except ValueError: options = [e.value for e in NicType] log(_('Unknown nic type: {}. Possible values are {}').format(nic_type, options), fg='red') exit(1) - if type_ != NicType.MANUAL: - self._configuration = NetworkConfiguration(type_) - else: # manual configuration settings + def parse_arguments(self, config: Any): + if isinstance(config, list): # new data format self._configuration = self._parse_manual_config(config) + elif nic_type := config.get('type', None): # new data format + type_ = self._parse_nic_type(nic_type) + + if type_ != NicType.MANUAL: + self._configuration = NetworkConfiguration(type_) + else: # manual configuration settings + self._configuration = self._parse_manual_config([config]) + else: # old style definitions + network_config = self._backwards_compability_config(config) + if network_config: + return network_config + return None diff --git a/archinstall/lib/models/password_strength.py b/archinstall/lib/models/password_strength.py new file mode 100644 index 00000000..61986bf0 --- /dev/null +++ b/archinstall/lib/models/password_strength.py @@ -0,0 +1,85 @@ +from enum import Enum + + +class PasswordStrength(Enum): + VERY_WEAK = 'very weak' + WEAK = 'weak' + MODERATE = 'moderate' + STRONG = 'strong' + + @property + def value(self): + match self: + case PasswordStrength.VERY_WEAK: return str(_('very weak')) + case PasswordStrength.WEAK: return str(_('weak')) + case PasswordStrength.MODERATE: return str(_('moderate')) + case PasswordStrength.STRONG: return str(_('strong')) + + def color(self): + match self: + case PasswordStrength.VERY_WEAK: return 'red' + case PasswordStrength.WEAK: return 'red' + case PasswordStrength.MODERATE: return 'yellow' + case PasswordStrength.STRONG: return 'green' + + @classmethod + def strength(cls, password: str) -> 'PasswordStrength': + digit = any(character.isdigit() for character in password) + upper = any(character.isupper() for character in password) + lower = any(character.islower() for character in password) + symbol = any(not character.isalnum() for character in password) + return cls._check_password_strength(digit, upper, lower, symbol, len(password)) + + @classmethod + def _check_password_strength( + cls, + digit: bool, + upper: bool, + lower: bool, + symbol: bool, + length: int + ) -> 'PasswordStrength': + # suggested evaluation + # https://github.com/archlinux/archinstall/issues/1304#issuecomment-1146768163 + if digit and upper and lower and symbol: + match length: + case num if 13 <= num: + return PasswordStrength.STRONG + case num if 11 <= num <= 12: + return PasswordStrength.MODERATE + case num if 7 <= num <= 10: + return PasswordStrength.WEAK + case num if num <= 6: + return PasswordStrength.VERY_WEAK + elif digit and upper and lower: + match length: + case num if 14 <= num: + return PasswordStrength.STRONG + case num if 11 <= num <= 13: + return PasswordStrength.MODERATE + case num if 7 <= num <= 10: + return PasswordStrength.WEAK + case num if num <= 6: + return PasswordStrength.VERY_WEAK + elif upper and lower: + match length: + case num if 15 <= num: + return PasswordStrength.STRONG + case num if 12 <= num <= 14: + return PasswordStrength.MODERATE + case num if 7 <= num <= 11: + return PasswordStrength.WEAK + case num if num <= 6: + return PasswordStrength.VERY_WEAK + elif lower or upper: + match length: + case num if 18 <= num: + return PasswordStrength.STRONG + case num if 14 <= num <= 17: + return PasswordStrength.MODERATE + case num if 9 <= num <= 13: + return PasswordStrength.WEAK + case num if num <= 8: + return PasswordStrength.VERY_WEAK + + return PasswordStrength.VERY_WEAK diff --git a/archinstall/lib/models/subvolume.py b/archinstall/lib/models/subvolume.py new file mode 100644 index 00000000..34a09227 --- /dev/null +++ b/archinstall/lib/models/subvolume.py @@ -0,0 +1,68 @@ +from dataclasses import dataclass +from typing import List, Any, Dict + + +@dataclass +class Subvolume: + name: str + mountpoint: str + compress: bool = False + nodatacow: bool = False + + def display(self) -> str: + options_str = ','.join(self.options) + return f'{_("Subvolume")}: {self.name:15} {_("Mountpoint")}: {self.mountpoint:20} {_("Options")}: {options_str}' + + @property + def options(self) -> List[str]: + options = [ + 'compress' if self.compress else '', + 'nodatacow' if self.nodatacow else '' + ] + return [o for o in options if len(o)] + + def json(self) -> Dict[str, Any]: + return { + 'name': self.name, + 'mountpoint': self.mountpoint, + 'compress': self.compress, + 'nodatacow': self.nodatacow + } + + @classmethod + def _parse(cls, config_subvolumes: List[Dict[str, Any]]) -> List['Subvolume']: + subvolumes = [] + for entry in config_subvolumes: + if not entry.get('name', None) or not entry.get('mountpoint', None): + continue + + subvolumes.append( + Subvolume( + entry['name'], + entry['mountpoint'], + entry.get('compress', False), + entry.get('nodatacow', False) + ) + ) + + return subvolumes + + @classmethod + def _parse_backwards_compatible(cls, config_subvolumes) -> List['Subvolume']: + subvolumes = [] + for name, mountpoint in config_subvolumes.items(): + if not name or not mountpoint: + continue + + subvolumes.append(Subvolume(name, mountpoint)) + + return subvolumes + + @classmethod + def parse_arguments(cls, config_subvolumes: Any) -> List['Subvolume']: + if isinstance(config_subvolumes, list): + return cls._parse(config_subvolumes) + elif isinstance(config_subvolumes, dict): + return cls._parse_backwards_compatible(config_subvolumes) + + raise ValueError('Unknown disk layout btrfs subvolume format') diff --git a/archinstall/lib/models/users.py b/archinstall/lib/models/users.py index 6052b73a..a8feb9ef 100644 --- a/archinstall/lib/models/users.py +++ b/archinstall/lib/models/users.py @@ -1,6 +1,8 @@ from dataclasses import dataclass from typing import Dict, List, Union, Any, TYPE_CHECKING +from .password_strength import PasswordStrength + if TYPE_CHECKING: _: Any @@ -25,8 +27,11 @@ class User: } def display(self) -> str: - password = '*' * len(self.password) - return f'{_("Username")}: {self.username:16} {_("Password")}: {password:16} sudo: {str(self.sudo)}' + password = '*' * (len(self.password) if self.password else 0) + if password: + strength = PasswordStrength.strength(self.password) + password += f' ({strength.value})' + return f'{_("Username")}: {self.username:16} {_("Password")}: {password:20} sudo: {str(self.sudo)}' @classmethod def _parse(cls, config_users: List[Dict[str, Any]]) -> List['User']: @@ -64,13 +69,13 @@ class User: ) -> List['User']: users = [] - # backwards compability + # backwards compatibility if isinstance(config_users, dict): users += cls._parse_backwards_compatible(config_users, False) else: users += cls._parse(config_users) - # backwards compability + # backwards compatibility if isinstance(config_superusers, dict): users += cls._parse_backwards_compatible(config_superusers, True) diff --git a/archinstall/lib/networking.py b/archinstall/lib/networking.py index 3e883a7b..96e8f3a1 100644 --- a/archinstall/lib/networking.py +++ b/archinstall/lib/networking.py @@ -4,7 +4,7 @@ import socket import struct from typing import Union, Dict, Any, List -from .exceptions import HardwareIncompatibilityError +from .exceptions import HardwareIncompatibilityError, SysCallError from .general import SysCommand from .output import log from .pacman import run_pacman @@ -33,14 +33,17 @@ def list_interfaces(skip_loopback :bool = True) -> Dict[str, str]: def check_mirror_reachable() -> bool: log("Testing connectivity to the Arch Linux mirrors ...", level=logging.INFO) - if run_pacman("-Sy").exit_code == 0: - return True - - elif os.geteuid() != 0: - log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red") + try: + if run_pacman("-Sy").exit_code == 0: + return True + elif os.geteuid() != 0: + log("check_mirror_reachable() uses 'pacman -Sy' which requires root.", level=logging.ERROR, fg="red") + except SysCallError as err: + log(err, level=logging.DEBUG) return False + def update_keyring() -> bool: log("Updating archlinux-keyring ...", level=logging.INFO) if run_pacman("-Sy --noconfirm archlinux-keyring").exit_code == 0: diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py index 29b73bc4..709a7382 100644 --- a/archinstall/lib/output.py +++ b/archinstall/lib/output.py @@ -2,43 +2,83 @@ import logging import os import sys from pathlib import Path -from typing import Dict, Union, List, Any +from typing import Dict, Union, List, Any, Callable from .storage import storage +from dataclasses import asdict, is_dataclass class FormattedOutput: @classmethod - def values(cls, o: Any) -> Dict[str, Any]: - if hasattr(o, 'json'): + def values(cls, o: Any, class_formatter: str = None, filter_list: List[str] = None) -> Dict[str, Any]: + """ the original values returned a dataclass as dict thru the call to some specific methods + this version allows thru the parameter class_formatter to call a dynamicly selected formatting method. + Can transmit a filter list to the class_formatter, + """ + if class_formatter: + # if invoked per reference it has to be a standard function or a classmethod. + # A method of an instance does not make sense + if callable(class_formatter): + return class_formatter(o, filter_list) + # if is invoked by name we restrict it to a method of the class. No need to mess more + elif hasattr(o, class_formatter) and callable(getattr(o, class_formatter)): + func = getattr(o, class_formatter) + return func(filter_list) + # kept as to make it backward compatible + elif hasattr(o, 'as_json'): + return o.as_json() + elif hasattr(o, 'json'): return o.json() + elif is_dataclass(o): + return asdict(o) else: return o.__dict__ @classmethod - def as_table(cls, obj: List[Any]) -> str: + def as_table(cls, obj: List[Any], class_formatter: Union[str, Callable] = None, filter_list: List[str] = None) -> str: + """ variant of as_table (subtly different code) which has two additional parameters + filter which is a list of fields which will be shon + class_formatter a special method to format the outgoing data + + A general comment, the format selected for the output (a string where every data record is separated by newline) + is for compatibility with a print statement + As_table_filter can be a drop in replacement for as_table + """ + raw_data = [cls.values(o, class_formatter, filter_list) for o in obj] + # determine the maximum column size column_width: Dict[str, int] = {} - for o in obj: - for k, v in cls.values(o).items(): - column_width.setdefault(k, 0) - column_width[k] = max([column_width[k], len(str(v)), len(k)]) - + for o in raw_data: + for k, v in o.items(): + if not filter_list or k in filter_list: + column_width.setdefault(k, 0) + column_width[k] = max([column_width[k], len(str(v)), len(k)]) + + if not filter_list: + filter_list = (column_width.keys()) + # create the header lines output = '' - for key, width in column_width.items(): + key_list = [] + for key in filter_list: + width = column_width[key] key = key.replace('!', '') - output += key.ljust(width) + ' | ' - - output = output[:-3] + '\n' + key_list.append(key.ljust(width)) + output += ' | '.join(key_list) + '\n' output += '-' * len(output) + '\n' - for o in obj: - for k, v in cls.values(o).items(): - if '!' in k: - v = '*' * len(str(v)) - output += str(v).ljust(column_width[k]) + ' | ' - output = output[:-3] - output += '\n' + # create the data lines + for record in raw_data: + obj_data = [] + for key in filter_list: + width = column_width.get(key, len(key)) + value = record.get(key, '') + if '!' in key: + value = '*' * width + if isinstance(value,(int, float)) or (isinstance(value, str) and value.isnumeric()): + obj_data.append(str(value).rjust(width)) + else: + obj_data.append(str(value).ljust(width)) + output += ' | '.join(obj_data) + '\n' return output diff --git a/archinstall/lib/plugins.py b/archinstall/lib/plugins.py index 99e3811c..0ff63610 100644 --- a/archinstall/lib/plugins.py +++ b/archinstall/lib/plugins.py @@ -60,7 +60,7 @@ def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType: log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR) try: - del(sys.modules[namespace]) + del(sys.modules[namespace]) # noqa: E275 except: pass @@ -73,6 +73,7 @@ def find_nth(haystack :List[str], needle :str, n :int) -> int: def load_plugin(path :str) -> ModuleType: parsed_url = urllib.parse.urlparse(path) + log(f"Loading plugin {parsed_url}.", fg="gray", level=logging.INFO) # The Profile was not a direct match on a remote URL if not parsed_url.scheme: @@ -96,6 +97,7 @@ def load_plugin(path :str) -> ModuleType: if hasattr(sys.modules[namespace], 'Plugin'): try: plugins[namespace] = sys.modules[namespace].Plugin() + log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO) except Exception as err: log(err, level=logging.ERROR) log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR) diff --git a/archinstall/lib/storage.py b/archinstall/lib/storage.py index dd7ddc88..8c358161 100644 --- a/archinstall/lib/storage.py +++ b/archinstall/lib/storage.py @@ -17,13 +17,13 @@ storage: Dict[str, Any] = { # os.path.abspath(f'{os.path.dirname(__file__)}/../examples') ], 'UPSTREAM_URL': 'https://raw.githubusercontent.com/archlinux/archinstall/master/profiles', - 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabing. + 'PROFILE_DB': None, # Used in cases when listing profiles is desired, not mandatory for direct profile grabbing. 'LOG_PATH': '/var/log/archinstall', 'LOG_FILE': 'install.log', 'MOUNT_POINT': '/mnt/archinstall', 'ENC_IDENTIFIER': 'ainst', 'DISK_TIMEOUTS' : 1, # seconds 'DISK_RETRY_ATTEMPTS' : 5, # RETRY_ATTEMPTS * DISK_TIMEOUTS is used in disk operations - 'CMD_LOCALE':{'LC_ALL':'C'}, # default locale for execution commands. Can be overriden with set_cmd_locale() + 'CMD_LOCALE':{'LC_ALL':'C'}, # default locale for execution commands. Can be overridden with set_cmd_locale() 'CMD_LOCALE_DEFAULT':{'LC_ALL':'C'}, # should be the same as the former. Not be used except in reset_cmd_locale() } diff --git a/archinstall/lib/systemd.py b/archinstall/lib/systemd.py index 3d2f0385..f459f94b 100644 --- a/archinstall/lib/systemd.py +++ b/archinstall/lib/systemd.py @@ -88,7 +88,7 @@ class Boot: if len(args) >= 2 and args[1]: log(args[1], level=logging.ERROR, fg='red') - log(f"The error above occured in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red") + log(f"The error above occurred in a temporary boot-up of the installation {self.instance}", level=logging.ERROR, fg="red") shutdown = None shutdown_exit_code = -1 diff --git a/archinstall/lib/translation.py b/archinstall/lib/translation.py deleted file mode 100644 index 1a0e94e4..00000000 --- a/archinstall/lib/translation.py +++ /dev/null @@ -1,112 +0,0 @@ -from __future__ import annotations - -import json -import os -import gettext - -from pathlib import Path -from typing import List, Dict, Any, TYPE_CHECKING, Tuple -from .exceptions import TranslationError - -if TYPE_CHECKING: - _: Any - - -class LanguageDefinitions: - def __init__(self): - self._mappings = self._get_language_mappings() - - def _get_language_mappings(self) -> List[Dict[str, str]]: - locales_dir = Translation.get_locales_dir() - languages = Path.joinpath(locales_dir, 'languages.json') - - with open(languages, 'r') as fp: - return json.load(fp) - - def get_language(self, abbr: str) -> str: - for entry in self._mappings: - if entry['abbr'] == abbr: - return entry['lang'] - - raise ValueError(f'No language with abbrevation "{abbr}" found') - - -class DeferredTranslation: - def __init__(self, message: str): - self.message = message - - def __len__(self) -> int: - return len(self.message) - - def __str__(self) -> str: - translate = _ - if translate is DeferredTranslation: - return self.message - return translate(self.message) - - def __lt__(self, other) -> bool: - return self.message < other - - def __gt__(self, other) -> bool: - return self.message > other - - def __add__(self, other) -> DeferredTranslation: - if isinstance(other, str): - other = DeferredTranslation(other) - - concat = self.message + other.message - return DeferredTranslation(concat) - - def format(self, *args) -> str: - return self.message.format(*args) - - @classmethod - def install(cls): - import builtins - builtins._ = cls - - -class Translation: - def __init__(self, locales_dir): - self._languages = {} - - for names in self._get_translation_lang(): - try: - self._languages[names[0]] = gettext.translation('base', localedir=locales_dir, languages=names) - except FileNotFoundError as error: - raise TranslationError(f"Could not locate language file for '{names}': {error}") - - def activate(self, name): - if language := self._languages.get(name, None): - language.install() - else: - raise ValueError(f'Language not supported: {name}') - - @classmethod - def load_nationalization(cls) -> Translation: - locales_dir = cls.get_locales_dir() - return Translation(locales_dir) - - @classmethod - def get_locales_dir(cls) -> Path: - cur_path = Path(__file__).parent.parent - locales_dir = Path.joinpath(cur_path, 'locales') - return locales_dir - - @classmethod - def _defined_languages(cls) -> List[str]: - locales_dir = cls.get_locales_dir() - filenames = os.listdir(locales_dir) - return list(filter(lambda x: len(x) == 2, filenames)) - - @classmethod - def _get_translation_lang(cls) -> List[Tuple[str, str]]: - def_languages = cls._defined_languages() - languages = LanguageDefinitions() - return [(languages.get_language(lang), lang) for lang in def_languages] - - @classmethod - def get_available_lang(cls) -> List[str]: - def_languages = cls._defined_languages() - languages = LanguageDefinitions() - return [languages.get_language(lang) for lang in def_languages] diff --git a/archinstall/lib/translationhandler.py b/archinstall/lib/translationhandler.py new file mode 100644 index 00000000..ef33b8ec --- /dev/null +++ b/archinstall/lib/translationhandler.py @@ -0,0 +1,230 @@ +from __future__ import annotations + +import json +import logging +import os +import gettext +from dataclasses import dataclass + +from pathlib import Path +from typing import List, Dict, Any, TYPE_CHECKING, Optional +from .exceptions import TranslationError + +if TYPE_CHECKING: + _: Any + + +@dataclass +class Language: + abbr: str + name_en: str + translation: gettext.NullTranslations + translation_percent: int + translated_lang: Optional[str] + external_dep: Optional[str] + + @property + def display_name(self) -> str: + if not self.external_dep and self.translated_lang: + name = self.translated_lang + else: + name = self.name_en + + return f'{name} ({self.translation_percent}%)' + + def is_match(self, lang_or_translated_lang: str) -> bool: + if self.name_en == lang_or_translated_lang: + return True + elif self.translated_lang == lang_or_translated_lang: + return True + return False + + def json(self) -> str: + return self.name_en + + +class TranslationHandler: + def __init__(self): + self._base_pot = 'base.pot' + self._languages = 'languages.json' + + # check if a custom font was provided, otherwise we'll + # use one that can display latin, greek, cyrillic characters + if self.is_custom_font_enabled(): + self._set_font(self.custom_font_path().name) + else: + self._set_font('LatGrkCyr-8x16') + + self._total_messages = self._get_total_active_messages() + self._translated_languages = self._get_translations() + + @classmethod + def custom_font_path(cls) -> Path: + return Path('/usr/share/kbd/consolefonts/archinstall_font.psfu.gz') + + @classmethod + def is_custom_font_enabled(cls) -> bool: + return cls.custom_font_path().exists() + + @property + def translated_languages(self) -> List[Language]: + return self._translated_languages + + def _get_translations(self) -> List[Language]: + """ + Load all translated languages and return a list of such + """ + mappings = self._load_language_mappings() + defined_languages = self._provided_translations() + + languages = [] + + for short_form in defined_languages: + mapping_entry: Dict[str, Any] = next(filter(lambda x: x['abbr'] == short_form, mappings)) + abbr = mapping_entry['abbr'] + lang = mapping_entry['lang'] + translated_lang = mapping_entry.get('translated_lang', None) + external_dep = mapping_entry.get('external_dep', False) + + try: + # get a translation for a specific language + translation = gettext.translation('base', localedir=self._get_locales_dir(), languages=(abbr, lang)) + + # calculate the percentage of total translated text to total number of messages + if abbr == 'en': + percent = 100 + else: + num_translations = self._get_catalog_size(translation) + percent = int((num_translations / self._total_messages) * 100) + # prevent cases where the .pot file is out of date and the percentage is above 100 + percent = min(100, percent) + + language = Language(abbr, lang, translation, percent, translated_lang, external_dep) + languages.append(language) + except FileNotFoundError as error: + raise TranslationError(f"Could not locate language file for '{lang}': {error}") + + return languages + + def _set_font(self, font: str): + """ + Set the provided font as the new terminal font + """ + from archinstall import SysCommand, log + try: + log(f'Setting font: {font}', level=logging.DEBUG) + SysCommand(f'setfont {font}') + except Exception: + log(f'Unable to set font {font}', level=logging.ERROR) + + def _load_language_mappings(self) -> List[Dict[str, Any]]: + """ + Load the mapping table of all known languages + """ + locales_dir = self._get_locales_dir() + languages = Path.joinpath(locales_dir, self._languages) + + with open(languages, 'r') as fp: + return json.load(fp) + + def _get_catalog_size(self, translation: gettext.NullTranslations) -> int: + """ + Get the number of translated messages for a translations + """ + # this is a very naughty way of retrieving the data but + # there's no alternative method exposed unfortunately + catalog = translation._catalog # type: ignore + messages = {k: v for k, v in catalog.items() if k and v} + return len(messages) + + def _get_total_active_messages(self) -> int: + """ + Get total messages that could be translated + """ + locales = self._get_locales_dir() + with open(f'{locales}/{self._base_pot}', 'r') as fp: + lines = fp.readlines() + msgid_lines = [line for line in lines if 'msgid' in line] + + return len(msgid_lines) - 1 # don't count the first line which contains the metadata + + def get_language_by_name(self, name: str) -> Language: + """ + Get a language object by it's name, e.g. English + """ + try: + return next(filter(lambda x: x.name_en == name, self._translated_languages)) + except Exception: + raise ValueError(f'No language with name found: {name}') + + def get_language_by_abbr(self, abbr: str) -> Language: + """ + Get a language object by its abbrevation, e.g. en + """ + try: + return next(filter(lambda x: x.abbr == abbr, self._translated_languages)) + except Exception: + raise ValueError(f'No language with abbreviation "{abbr}" found') + + def activate(self, language: Language): + """ + Set the provided language as the current translation + """ + language.translation.install() + + def _get_locales_dir(self) -> Path: + """ + Get the locales directory path + """ + cur_path = Path(__file__).parent.parent + locales_dir = Path.joinpath(cur_path, 'locales') + return locales_dir + + def _provided_translations(self) -> List[str]: + """ + Get a list of all known languages + """ + locales_dir = self._get_locales_dir() + filenames = os.listdir(locales_dir) + + translation_files = [] + for filename in filenames: + if len(filename) == 2 or filename == 'pt_BR': + translation_files.append(filename) + + return translation_files + + +class DeferredTranslation: + def __init__(self, message: str): + self.message = message + + def __len__(self) -> int: + return len(self.message) + + def __str__(self) -> str: + translate = _ + if translate is DeferredTranslation: + return self.message + return translate(self.message) + + def __lt__(self, other) -> bool: + return self.message < other + + def __gt__(self, other) -> bool: + return self.message > other + + def __add__(self, other) -> DeferredTranslation: + if isinstance(other, str): + other = DeferredTranslation(other) + + concat = self.message + other.message + return DeferredTranslation(concat) + + def format(self, *args) -> str: + return self.message.format(*args) + + @classmethod + def install(cls): + import builtins + builtins._ = cls diff --git a/archinstall/lib/user_interaction/__init__.py b/archinstall/lib/user_interaction/__init__.py index 8aba4b4d..a1ca2652 100644 --- a/archinstall/lib/user_interaction/__init__.py +++ b/archinstall/lib/user_interaction/__init__.py @@ -7,6 +7,6 @@ from .network_conf import ask_to_configure_network from .partitioning_conf import select_partition, select_encrypted_partitions from .general_conf import (ask_ntp, ask_for_a_timezone, ask_for_audio_selection, select_language, select_mirror_regions, select_profile, select_archinstall_language, ask_additional_packages_to_install, - select_additional_repositories, ask_hostname) + select_additional_repositories, ask_hostname, add_number_of_parrallel_downloads) from .disk_conf import ask_for_main_filesystem_format, select_individual_blockdevice_usage, select_disk_layout, select_disk from .utils import get_password, do_countdown diff --git a/archinstall/lib/user_interaction/backwards_compatible_conf.py b/archinstall/lib/user_interaction/backwards_compatible_conf.py index d91690eb..296572d2 100644 --- a/archinstall/lib/user_interaction/backwards_compatible_conf.py +++ b/archinstall/lib/user_interaction/backwards_compatible_conf.py @@ -40,7 +40,7 @@ def generic_select( # We check that the options are iterable. If not we abort. Else we copy them to lists # it options is a dictionary we use the values as entries of the list # if options is a string object, each character becomes an entry - # if options is a list, we implictily build a copy to mantain immutability + # if options is a list, we implictily build a copy to maintain immutability if not isinstance(p_options, Iterable): log(f"Objects of type {type(p_options)} is not iterable, and are not supported at generic_select", fg="red") log(f"invalid parameter at Menu() call was at <{sys._getframe(1).f_code.co_name}>", level=logging.WARNING) diff --git a/archinstall/lib/user_interaction/disk_conf.py b/archinstall/lib/user_interaction/disk_conf.py index 371d052f..b5ed6967 100644 --- a/archinstall/lib/user_interaction/disk_conf.py +++ b/archinstall/lib/user_interaction/disk_conf.py @@ -45,8 +45,8 @@ def select_disk_layout(preset: Optional[Dict[str, Any]], block_devices: list, ad choice = Menu( _('Select what you wish to do with the selected block devices'), modes, - explode_on_interrupt=True, - explode_warning=warning + raise_error_on_interrupt=True, + raise_error_warning_msg=warning ).run() match choice.type_: diff --git a/archinstall/lib/user_interaction/general_conf.py b/archinstall/lib/user_interaction/general_conf.py index d4dc60db..6365014d 100644 --- a/archinstall/lib/user_interaction/general_conf.py +++ b/archinstall/lib/user_interaction/general_conf.py @@ -1,10 +1,9 @@ from __future__ import annotations import logging +import pathlib from typing import List, Any, Optional, Dict, TYPE_CHECKING -import archinstall - from ..menu.menu import MenuSelectionType from ..menu.text_input import TextInput @@ -14,9 +13,11 @@ from ..output import log from ..profiles import Profile, list_profiles from ..mirrors import list_mirrors -from ..translation import Translation +from ..translationhandler import Language, TranslationHandler from ..packages.packages import validate_package_list +from ..storage import storage + if TYPE_CHECKING: _: Any @@ -109,7 +110,7 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: list(mirrors.keys()), preset_values=preselected, multi=True, - explode_on_interrupt=True + raise_error_on_interrupt=True ).run() match selected_mirror.type_: @@ -118,10 +119,40 @@ def select_mirror_regions(preset_values: Dict[str, Any] = {}) -> Dict[str, Any]: case _: return {selected: mirrors[selected] for selected in selected_mirror.value} -def select_archinstall_language(default='English'): - languages = Translation.get_available_lang() - language = Menu(_('Archinstall language'), languages, default_option=default).run() - return language +def select_archinstall_language(languages: List[Language], preset_value: Language) -> Language: + # these are the displayed language names which can either be + # the english name of a language or, if present, the + # name of the language in its own language + options = {lang.display_name: lang for lang in languages} + + def dependency_preview(current_selection: str) -> Optional[str]: + current_lang = options[current_selection] + + if current_lang.external_dep and not TranslationHandler.is_custom_font_enabled(): + font_file = TranslationHandler.custom_font_path() + text = str(_('To be able to use this translation, please install a font manually that supports the language.')) + '\n' + text += str(_('The font should be stored as {}')).format(font_file) + return text + return None + + choice = Menu( + _('Archinstall language'), + list(options.keys()), + default_option=preset_value.display_name, + preview_command=lambda x: dependency_preview(x), + preview_size=0.5 + ).run() + + match choice.type_: + case MenuSelectionType.Esc: + return preset_value + case MenuSelectionType.Selection: + language: Language = options[choice.value] + # we have to make sure that the proper AUR dependency is + # present to be able to use this language + if not language.external_dep or TranslationHandler.is_custom_font_enabled(): + return language + return select_archinstall_language(languages, preset_value) def select_profile(preset) -> Optional[Profile]: @@ -147,19 +178,19 @@ def select_profile(preset) -> Optional[Profile]: selection = Menu( title=title, p_options=list(options.keys()), - explode_on_interrupt=True, - explode_warning=warning + raise_error_on_interrupt=True, + raise_error_warning_msg=warning ).run() match selection.type_: case MenuSelectionType.Selection: return options[selection.value] if selection.value is not None else None case MenuSelectionType.Ctrl_c: - archinstall.storage['profile_minimal'] = False - archinstall.storage['_selected_servers'] = [] - archinstall.storage['_desktop_profile'] = None - archinstall.arguments['desktop-environment'] = None - archinstall.arguments['gfx_driver_packages'] = None + storage['profile_minimal'] = False + storage['_selected_servers'] = [] + storage['_desktop_profile'] = None + storage['arguments']['desktop-environment'] = None + storage['arguments']['gfx_driver_packages'] = None return None case MenuSelectionType.Esc: return None @@ -172,27 +203,61 @@ def ask_additional_packages_to_install(pre_set_packages: List[str] = []) -> List def read_packages(already_defined: list = []) -> list: display = ' '.join(already_defined) - input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run() - return input_packages.split(' ') if input_packages else [] + input_packages = TextInput(_('Write additional packages to install (space separated, leave blank to skip): '), display).run().strip() + return input_packages.split() if input_packages else [] pre_set_packages = pre_set_packages if pre_set_packages else [] packages = read_packages(pre_set_packages) - while True: - if len(packages): - # Verify packages that were given - print(_("Verifying that additional packages exist (this might take a few seconds)")) - valid, invalid = validate_package_list(packages) + if not storage['arguments']['offline'] and not storage['arguments']['no_pkg_lookups']: + while True: + if len(packages): + # Verify packages that were given + print(_("Verifying that additional packages exist (this might take a few seconds)")) + valid, invalid = validate_package_list(packages) - if invalid: - log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red') - packages = read_packages(valid) - continue - break + if invalid: + log(f"Some packages could not be found in the repository: {invalid}", level=logging.WARNING, fg='red') + packages = read_packages(valid) + continue + break return packages +def add_number_of_parrallel_downloads(input_number :Optional[int] = None) -> Optional[int]: + max_downloads = 5 + print(_(f"This option enables the number of parallel downloads that can occur during installation")) + print(_(f"Enter the number of parallel downloads to be enabled.\n (Enter a value between 1 to {max_downloads})\nNote:")) + print(_(f" - Maximum value : {max_downloads} ( Allows {max_downloads} parallel downloads, allows {max_downloads+1} downloads at a time )")) + print(_(f" - Minimum value : 1 ( Allows 1 parallel download, allows 2 downloads at a time )")) + print(_(f" - Disable/Default : 0 ( Disables parallel downloading, allows only 1 download at a time )")) + + while True: + try: + input_number = int(TextInput(_("[Default value: 0] > ")).run().strip() or 0) + if input_number <= 0: + input_number = 0 + elif input_number > max_downloads: + input_number = max_downloads + break + except: + print(_(f"Invalid input! Try again with a valid input [1 to {max_downloads}, or 0 to disable]")) + + pacman_conf_path = pathlib.Path("/etc/pacman.conf") + with pacman_conf_path.open() as f: + pacman_conf = f.read().split("\n") + + with pacman_conf_path.open("w") as fwrite: + for line in pacman_conf: + if "ParallelDownloads" in line: + fwrite.write(f"ParallelDownloads = {input_number+1}\n") if not input_number == 0 else fwrite.write("#ParallelDownloads = 0\n") + else: + fwrite.write(f"{line}\n") + + return input_number + + def select_additional_repositories(preset: List[str]) -> List[str]: """ Allows the user to select additional repositories (multilib, and testing) if desired. @@ -209,7 +274,7 @@ def select_additional_repositories(preset: List[str]) -> List[str]: sort=False, multi=True, preset_values=preset, - explode_on_interrupt=True + raise_error_on_interrupt=True ).run() match choice.type_: diff --git a/archinstall/lib/user_interaction/manage_users_conf.py b/archinstall/lib/user_interaction/manage_users_conf.py index 567a2964..84ce3556 100644 --- a/archinstall/lib/user_interaction/manage_users_conf.py +++ b/archinstall/lib/user_interaction/manage_users_conf.py @@ -7,6 +7,7 @@ from .utils import get_password from ..menu import Menu from ..menu.list_manager import ListManager from ..models.users import User +from ..output import FormattedOutput if TYPE_CHECKING: _: Any @@ -18,56 +19,51 @@ class UserList(ListManager): """ def __init__(self, prompt: str, lusers: List[User]): - """ - param: prompt - type: str - param: lusers dict with the users already defined for the system - type: Dict - param: sudo. boolean to determine if we handle superusers or users. If None handles both types - """ self._actions = [ str(_('Add a user')), str(_('Change password')), str(_('Promote/Demote user')), str(_('Delete User')) ] - super().__init__(prompt, lusers, self._actions, self._actions[0]) + super().__init__(prompt, lusers, [self._actions[0]], self._actions[1:]) def reformat(self, data: List[User]) -> Dict[str, User]: - return {e.display(): e for e in data} + table = FormattedOutput.as_table(data) + rows = table.split('\n') - def action_list(self): - active_user = self.target if self.target else None + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data = {f' {rows[0]}': None, f' {rows[1]}': None} - if active_user is None: - return [self._actions[0]] - else: - return self._actions[1:] + for row, user in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = user - def exec_action(self, data: List[User]) -> List[User]: - if self.target: - active_user = self.target - else: - active_user = None + return display_data - if self.action == self._actions[0]: # add + def selected_action_display(self, user: User) -> str: + return user.username + + def handle_action(self, action: str, entry: Optional[User], data: List[User]) -> List[User]: + if action == self._actions[0]: # add new_user = self._add_user() if new_user is not None: # in case a user with the same username as an existing user # was created we'll replace the existing one data = [d for d in data if d.username != new_user.username] data += [new_user] - elif self.action == self._actions[1]: # change password - prompt = str(_('Password for user "{}": ').format(active_user.username)) + elif action == self._actions[1]: # change password + prompt = str(_('Password for user "{}": ').format(entry.username)) new_password = get_password(prompt=prompt) if new_password: - user = next(filter(lambda x: x == active_user, data), 1) + user = next(filter(lambda x: x == entry, data)) user.password = new_password - elif self.action == self._actions[2]: # promote/demote - user = next(filter(lambda x: x == active_user, data), 1) + elif action == self._actions[2]: # promote/demote + user = next(filter(lambda x: x == entry, data)) user.sudo = False if user.sudo else True - elif self.action == self._actions[3]: # delete - data = [d for d in data if d != active_user] + elif action == self._actions[3]: # delete + data = [d for d in data if d != entry] return data @@ -77,8 +73,7 @@ class UserList(ListManager): return False def _add_user(self) -> Optional[User]: - print(_('\nDefine a new user\n')) - prompt = str(_('Enter username (leave blank to skip): ')) + prompt = '\n\n' + str(_('Enter username (leave blank to skip): ')) while True: username = input(prompt).strip(' ') @@ -94,7 +89,9 @@ class UserList(ListManager): choice = Menu( str(_('Should "{}" be a superuser (sudo)?')).format(username), Menu.yes_no(), skip=False, - default_option=Menu.no() + default_option=Menu.no(), + clear_screen=False, + show_search_hint=False ).run() sudo = True if choice.value == Menu.yes() else False @@ -102,6 +99,5 @@ class UserList(ListManager): def ask_for_additional_users(prompt: str = '', defined_users: List[User] = []) -> List[User]: - prompt = prompt if prompt else _('Enter username (leave blank to skip): ') users = UserList(prompt, defined_users).run() return users diff --git a/archinstall/lib/user_interaction/network_conf.py b/archinstall/lib/user_interaction/network_conf.py index 5154d8b1..557e8ed8 100644 --- a/archinstall/lib/user_interaction/network_conf.py +++ b/archinstall/lib/user_interaction/network_conf.py @@ -2,7 +2,7 @@ from __future__ import annotations import ipaddress import logging -from typing import Any, Optional, TYPE_CHECKING, List, Union +from typing import Any, Optional, TYPE_CHECKING, List, Union, Dict from ..menu.menu import MenuSelectionType from ..menu.text_input import TextInput @@ -10,7 +10,7 @@ from ..models.network_configuration import NetworkConfiguration, NicType from ..networking import list_interfaces from ..menu import Menu -from ..output import log +from ..output import log, FormattedOutput from ..menu.list_manager import ListManager if TYPE_CHECKING: @@ -19,55 +19,55 @@ if TYPE_CHECKING: class ManualNetworkConfig(ListManager): """ - subclass of ListManager for the managing of network configuration accounts + subclass of ListManager for the managing of network configurations """ - def __init__(self, prompt: str, ifaces: Union[None, NetworkConfiguration, List[NetworkConfiguration]]): - """ - param: prompt - type: str - param: ifaces already defined previously - type: Dict - """ + def __init__(self, prompt: str, ifaces: List[NetworkConfiguration]): + self._actions = [ + str(_('Add interface')), + str(_('Edit interface')), + str(_('Delete interface')) + ] - if ifaces is not None and isinstance(ifaces, list): - display_values = {iface.iface: iface for iface in ifaces} - else: - display_values = {} + super().__init__(prompt, ifaces, [self._actions[0]], self._actions[1:]) + + def reformat(self, data: List[NetworkConfiguration]) -> Dict[str, Optional[NetworkConfiguration]]: + table = FormattedOutput.as_table(data) + rows = table.split('\n') - self._action_add = str(_('Add interface')) - self._action_edit = str(_('Edit interface')) - self._action_delete = str(_('Delete interface')) + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[NetworkConfiguration]] = {f' {rows[0]}': None, f' {rows[1]}': None} - self._iface_actions = [self._action_edit, self._action_delete] + for row, iface in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = iface - super().__init__(prompt, display_values, self._iface_actions, self._action_add) + return display_data - def run_manual(self) -> List[NetworkConfiguration]: - ifaces = super().run() - if ifaces is not None: - return list(ifaces.values()) - return [] + def selected_action_display(self, iface: NetworkConfiguration) -> str: + return iface.iface if iface.iface else '' - def exec_action(self, data: Any): - if self.action == self._action_add: - iface_name = self._select_iface(data.keys()) + def handle_action(self, action: str, entry: Optional[NetworkConfiguration], data: List[NetworkConfiguration]): + if action == self._actions[0]: # add + iface_name = self._select_iface(data) if iface_name: iface = NetworkConfiguration(NicType.MANUAL, iface=iface_name) - data[iface_name] = self._edit_iface(iface) - elif self.target: - iface_name = list(self.target.keys())[0] - iface = data[iface_name] - - if self.action == self._action_edit: - data[iface_name] = self._edit_iface(iface) - elif self.action == self._action_delete: - del data[iface_name] + iface = self._edit_iface(iface) + data += [iface] + elif entry: + if action == self._actions[1]: # edit interface + data = [d for d in data if d.iface != entry.iface] + data.append(self._edit_iface(entry)) + elif action == self._actions[2]: # delete + data = [d for d in data if d != entry] return data - def _select_iface(self, existing_ifaces: List[str]) -> Optional[Any]: + def _select_iface(self, data: List[NetworkConfiguration]) -> Optional[Any]: all_ifaces = list_interfaces().values() + existing_ifaces = [d.iface for d in data] available = set(all_ifaces) - set(existing_ifaces) choice = Menu(str(_('Select interface to add')), list(available), skip=True).run() @@ -76,7 +76,7 @@ class ManualNetworkConfig(ListManager): return choice.value - def _edit_iface(self, edit_iface :NetworkConfiguration): + def _edit_iface(self, edit_iface: NetworkConfiguration): iface_name = edit_iface.iface modes = ['DHCP (auto detect)', 'IP (static)'] default_mode = 'DHCP (auto detect)' @@ -99,11 +99,13 @@ class ManualNetworkConfig(ListManager): gateway = None while 1: - gateway_input = TextInput(_('Enter your gateway (router) IP address or leave blank for none: '), - edit_iface.gateway).run().strip() + gateway = TextInput( + _('Enter your gateway (router) IP address or leave blank for none: '), + edit_iface.gateway + ).run().strip() try: - if len(gateway_input) > 0: - ipaddress.ip_address(gateway_input) + if len(gateway) > 0: + ipaddress.ip_address(gateway) break except ValueError: log("You need to enter a valid gateway (router) IP address.", level=logging.WARNING, fg='red') @@ -124,7 +126,9 @@ class ManualNetworkConfig(ListManager): return NetworkConfiguration(NicType.MANUAL, iface=iface_name) -def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[NetworkConfiguration]]) -> Optional[Union[List[NetworkConfiguration], NetworkConfiguration]]: +def ask_to_configure_network( + preset: Union[NetworkConfiguration, List[NetworkConfiguration]] +) -> Optional[NetworkConfiguration | List[NetworkConfiguration]]: """ Configure the network on the newly installed system """ @@ -150,8 +154,8 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw list(network_options.values()), cursor_index=cursor_idx, sort=False, - explode_on_interrupt=True, - explode_warning=warning + raise_error_on_interrupt=True, + raise_error_warning_msg=warning ).run() match choice.type_: @@ -165,7 +169,7 @@ def ask_to_configure_network(preset: Union[None, NetworkConfiguration, List[Netw elif choice.value == network_options['network_manager']: return NetworkConfiguration(NicType.NM) elif choice.value == network_options['manual']: - manual = ManualNetworkConfig('Configure interfaces', preset) - return manual.run_manual() + preset_ifaces = preset if isinstance(preset, list) else [] + return ManualNetworkConfig('Configure interfaces', preset_ifaces).run() return preset diff --git a/archinstall/lib/user_interaction/partitioning_conf.py b/archinstall/lib/user_interaction/partitioning_conf.py index bfff5705..f2e6b881 100644 --- a/archinstall/lib/user_interaction/partitioning_conf.py +++ b/archinstall/lib/user_interaction/partitioning_conf.py @@ -5,7 +5,7 @@ from typing import List, Any, Dict, Union, TYPE_CHECKING, Callable, Optional from ..menu import Menu from ..menu.menu import MenuSelectionType -from ..output import log +from ..output import log, FormattedOutput from ..disk.validators import fs_types @@ -28,16 +28,31 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool = pad_right = spaces - pad_left return f'{pad_right * " "}{name}{pad_left * " "}|' + def flatten_data(data: Dict[str, Any]) -> Dict[str, Any]: + flattened = {} + for k, v in data.items(): + if k == 'filesystem': + flat = flatten_data(v) + flattened.update(flat) + elif k == 'btrfs': + # we're going to create a separate table for the btrfs subvolumes + pass + else: + flattened[k] = v + return flattened + + display_data: List[Dict[str, Any]] = [flatten_data(entry) for entry in partitions] + column_names = {} # this will add an initial index to the table for each partition if with_idx: - column_names['index'] = max([len(str(len(partitions))), len('index')]) + column_names['index'] = max([len(str(len(display_data))), len('index')]) # determine all attribute names and the max length - # of the value among all partitions to know the width + # of the value among all display_data to know the width # of the table cells - for p in partitions: + for p in display_data: for attribute, value in p.items(): if attribute in column_names.keys(): column_names[attribute] = max([column_names[attribute], len(str(value)), len(attribute)]) @@ -50,7 +65,7 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool = current_layout = f'{current_layout[:-1]}\n{"-" * len(current_layout)}\n' - for idx, p in enumerate(partitions): + for idx, p in enumerate(display_data): row = '' for name, max_len in column_names.items(): if name == 'index': @@ -62,6 +77,13 @@ def current_partition_layout(partitions: List[Dict[str, Any]], with_idx: bool = current_layout += f'{row[:-1]}\n' + # we'll create a separate table for the btrfs subvolumes + btrfs_subvolumes = [partition['btrfs']['subvolumes'] for partition in partitions if partition.get('btrfs', None)] + if len(btrfs_subvolumes) > 0: + for subvolumes in btrfs_subvolumes: + output = FormattedOutput.as_table(subvolumes) + current_layout += f'\n{output}' + if with_title: title = str(_('Current partition layout')) return f'\n\n{title}:\n\n{current_layout}' @@ -118,23 +140,10 @@ def get_default_partition_layout( return suggest_multi_disk_layout(block_devices, advanced_options=advanced_options) -def select_individual_blockdevice_usage(block_devices: list) -> Dict[str, Any]: - result = {} - - for device in block_devices: - layout = manage_new_and_existing_partitions(device) - result[device.path] = layout - - return result - - def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, Any]: # noqa: max-complexity: 50 block_device_struct = {"partitions": [partition.__dump__() for partition in block_device.partitions.values()]} original_layout = copy.deepcopy(block_device_struct) - # Test code: [part.__dump__() for part in block_device.partitions.values()] - # TODO: Squeeze in BTRFS subvolumes here - new_partition = str(_('Create a new partition')) suggest_partition_layout = str(_('Suggest partition layout')) delete_partition = str(_('Delete a partition')) @@ -187,6 +196,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, return original_layout elif task == save_and_exit: break + if task == new_partition: from ..disk import valid_parted_position @@ -200,8 +210,9 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, if fs_choice.type_ == MenuSelectionType.Esc: continue - prompt = _('Enter the start sector (percentage or block number, default: {}): ').format( - block_device.first_free_sector) + prompt = str(_('Enter the start sector (percentage or block number, default: {}): ')).format( + block_device.first_free_sector + ) start = input(prompt).strip() if not start.strip(): @@ -210,8 +221,9 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, else: end_suggested = '100%' - prompt = _('Enter the end sector of the partition (percentage or block number, ex: {}): ').format( - end_suggested) + prompt = str(_('Enter the end sector of the partition (percentage or block number, ex: {}): ')).format( + end_suggested + ) end = input(prompt).strip() if not end.strip(): @@ -224,7 +236,7 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, continue block_device_struct["partitions"].append({ - "type": "primary", # Strictly only allowed under MSDOS, but GPT accepts it so it's "safe" to inject + "type": "primary", # Strictly only allowed under MS-DOS, but GPT accepts it so it's "safe" to inject "start": start, "size": end, "mountpoint": None, @@ -351,18 +363,16 @@ def manage_new_and_existing_partitions(block_device: 'BlockDevice') -> Dict[str, if partition is not None: if not block_device_struct["partitions"][partition].get('btrfs', {}): block_device_struct["partitions"][partition]['btrfs'] = {} - if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', {}): - block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = {} + if not block_device_struct["partitions"][partition]['btrfs'].get('subvolumes', []): + block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = [] prev = block_device_struct["partitions"][partition]['btrfs']['subvolumes'] - result = SubvolumeList(_("Manage btrfs subvolumes for current partition"),prev).run() - if result: - block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result - else: - del block_device_struct["partitions"][partition]['btrfs'] + result = SubvolumeList(_("Manage btrfs subvolumes for current partition"), prev).run() + block_device_struct["partitions"][partition]['btrfs']['subvolumes'] = result return block_device_struct + def select_encrypted_partitions( title :str, partitions :List[Partition], @@ -374,11 +384,9 @@ def select_encrypted_partitions( if len(partition_indexes) == 0: return None - title = _('Select which partitions to mark for formatting:') - # show current partition layout: if len(partitions): - title += current_partition_layout(partitions) + '\n' + title += current_partition_layout(partitions, with_idx=True) + '\n' choice = Menu(title, partition_indexes, multi=multiple).run() diff --git a/archinstall/lib/user_interaction/subvolume_config.py b/archinstall/lib/user_interaction/subvolume_config.py index af783639..94150dee 100644 --- a/archinstall/lib/user_interaction/subvolume_config.py +++ b/archinstall/lib/user_interaction/subvolume_config.py @@ -1,146 +1,98 @@ -from typing import Dict, List +from typing import Dict, List, Optional, Any, TYPE_CHECKING from ..menu.list_manager import ListManager from ..menu.menu import MenuSelectionType -from ..menu.selection_menu import Selector, GeneralMenu from ..menu.text_input import TextInput from ..menu import Menu +from ..models.subvolume import Subvolume +from ... import FormattedOutput + +if TYPE_CHECKING: + _: Any -""" -UI classes -""" class SubvolumeList(ListManager): - def __init__(self,prompt,list): - self.ObjectNullAction = None # str(_('Add')) - self.ObjectDefaultAction = str(_('Add')) - super().__init__(prompt,list,None,self.ObjectNullAction,self.ObjectDefaultAction) - - def reformat(self, data: Dict) -> Dict: - def presentation(key :str, value :Dict): - text = _(" Subvolume :{:16}").format(key) - if isinstance(value,str): - text += _(" mounted at {:16}").format(value) - else: - if value.get('mountpoint'): - text += _(" mounted at {:16}").format(value['mountpoint']) - else: - text += (' ' * 28) - - if value.get('options',[]): - text += _(" with option {}").format(', '.join(value['options'])) - return text - - formatted = {presentation(k, v): k for k, v in data.items()} - return {k: v for k, v in sorted(formatted.items(), key=lambda e: e[0])} - - def action_list(self): - return super().action_list() - - def exec_action(self, data: Dict): - if self.target: - origkey, origval = list(self.target.items())[0] - else: - origkey = None - - if self.action == str(_('Delete')): - del data[origkey] - else: - if self.action == str(_('Add')): - self.target = {} - print(_('\n Fill the desired values for a new subvolume \n')) - with SubvolumeMenu(self.target,self.action) as add_menu: - for elem in ['name','mountpoint','options']: - add_menu.exec_option(elem) - else: - SubvolumeMenu(self.target,self.action).run() - - data.update(self.target) + def __init__(self, prompt: str, subvolumes: List[Subvolume]): + self._actions = [ + str(_('Add subvolume')), + str(_('Edit subvolume')), + str(_('Delete subvolume')) + ] + super().__init__(prompt, subvolumes, [self._actions[0]], self._actions[1:]) - return data + def reformat(self, data: List[Subvolume]) -> Dict[str, Optional[Subvolume]]: + table = FormattedOutput.as_table(data) + rows = table.split('\n') + + # these are the header rows of the table and do not map to any User obviously + # we're adding 2 spaces as prefix because the menu selector '> ' will be put before + # the selectable rows so the header has to be aligned + display_data: Dict[str, Optional[Subvolume]] = {f' {rows[0]}': None, f' {rows[1]}': None} + + for row, subvol in zip(rows[2:], data): + row = row.replace('|', '\\|') + display_data[row] = subvol + + return display_data + def selected_action_display(self, subvolume: Subvolume) -> str: + return subvolume.name + + def _prompt_options(self, editing: Optional[Subvolume] = None) -> List[str]: + preset_options = [] + if editing: + preset_options = editing.options -class SubvolumeMenu(GeneralMenu): - def __init__(self,parameters,action=None): - self.data = parameters - self.action = action - self.ds = {} - self.ds['name'] = None - self.ds['mountpoint'] = None - self.ds['options'] = None - if self.data: - origkey,origval = list(self.data.items())[0] - self.ds['name'] = origkey - if isinstance(origval,str): - self.ds['mountpoint'] = origval - else: - self.ds['mountpoint'] = self.data[origkey].get('mountpoint') - self.ds['options'] = self.data[origkey].get('options') - - super().__init__(data_store=self.ds) - - def _setup_selection_menu_options(self): - # [str(_('Add')),str(_('Copy')),str(_('Edit')),str(_('Delete'))] - self._menu_options['name'] = Selector(str(_('Subvolume name ')), - self._select_subvolume_name if not self.action or self.action in (str(_('Add')),str(_('Copy'))) else None, - mandatory=True, - enabled=True) - self._menu_options['mountpoint'] = Selector(str(_('Subvolume mountpoint')), - self._select_subvolume_mount_point if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None, - enabled=True) - self._menu_options['options'] = Selector(str(_('Subvolume options')), - self._select_subvolume_options if not self.action or self.action in (str(_('Add')),str(_('Edit'))) else None, - enabled=True) - self._menu_options['save'] = Selector(str(_('Save')), - exec_func=lambda n,v:True, - enabled=True) - self._menu_options['cancel'] = Selector(str(_('Cancel')), - # func = lambda pre:True, - exec_func=lambda n,v:self.fast_exit(n), - enabled=True) - self.cancel_action = 'cancel' - self.save_action = 'save' - self.bottom_list = [self.save_action,self.cancel_action] - - def fast_exit(self,accion): - if self.option(accion).get_selection(): - for item in self.list_options(): - if self.option(item).is_mandatory(): - self.option(item).set_mandatory(False) - return True - - def exit_callback(self): - # we exit without moving data - if self.option(self.cancel_action).get_selection(): - return - if not self.ds['name']: - return - else: - key = self.ds['name'] - value = {} - if self.ds['mountpoint']: - value['mountpoint'] = self.ds['mountpoint'] - if self.ds['options']: - value['options'] = self.ds['options'] - self.data.update({key : value}) - - def _select_subvolume_name(self,value): - return TextInput(str(_("Subvolume name :")),value).run() - - def _select_subvolume_mount_point(self,value): - return TextInput(str(_("Select a mount point :")),value).run() - - def _select_subvolume_options(self,value) -> List[str]: - # def __init__(self, title, p_options, skip=True, multi=False, default_option=None, sort=True): choice = Menu( str(_("Select the desired subvolume options ")), ['nodatacow','compress'], skip=True, - preset_values=value, + preset_values=preset_options, multi=True ).run() if choice.type_ == MenuSelectionType.Selection: - return choice.value + return choice.value # type: ignore return [] + + def _add_subvolume(self, editing: Optional[Subvolume] = None) -> Optional[Subvolume]: + name = TextInput(f'\n\n{_("Subvolume name")}: ', editing.name if editing else '').run() + + if not name: + return None + + mountpoint = TextInput(f'\n{_("Subvolume mountpoint")}: ', editing.mountpoint if editing else '').run() + + if not mountpoint: + return None + + options = self._prompt_options(editing) + + subvolume = Subvolume(name, mountpoint) + subvolume.compress = 'compress' in options + subvolume.nodatacow = 'nodatacow' in options + + return subvolume + + def handle_action(self, action: str, entry: Optional[Subvolume], data: List[Subvolume]) -> List[Subvolume]: + if action == self._actions[0]: # add + new_subvolume = self._add_subvolume() + + if new_subvolume is not None: + # in case a user with the same username as an existing user + # was created we'll replace the existing one + data = [d for d in data if d.name != new_subvolume.name] + data += [new_subvolume] + elif entry is not None: + if action == self._actions[1]: # edit subvolume + new_subvolume = self._add_subvolume(entry) + + if new_subvolume is not None: + # we'll remove the original subvolume and add the modified version + data = [d for d in data if d.name != entry.name and d.name != new_subvolume.name] + data += [new_subvolume] + elif action == self._actions[2]: # delete + data = [d for d in data if d != entry] + + return data diff --git a/archinstall/lib/user_interaction/system_conf.py b/archinstall/lib/user_interaction/system_conf.py index 78daa6a5..94bbac30 100644 --- a/archinstall/lib/user_interaction/system_conf.py +++ b/archinstall/lib/user_interaction/system_conf.py @@ -32,8 +32,8 @@ def select_kernel(preset: List[str] = None) -> List[str]: sort=True, multi=True, preset_values=preset, - explode_on_interrupt=True, - explode_warning=warning + raise_error_on_interrupt=True, + raise_error_warning_msg=warning ).run() match choice.type_: @@ -67,8 +67,8 @@ def select_harddrives(preset: List[str] = []) -> List[str]: list(options.keys()), preset_values=list(preset_disks.keys()), multi=True, - explode_on_interrupt=True, - explode_warning=warning + raise_error_on_interrupt=True, + raise_error_warning_msg=warning ).run() match selected_harddrive.type_: diff --git a/archinstall/lib/user_interaction/utils.py b/archinstall/lib/user_interaction/utils.py index fa079bc2..7ee6fc07 100644 --- a/archinstall/lib/user_interaction/utils.py +++ b/archinstall/lib/user_interaction/utils.py @@ -7,6 +7,7 @@ import time from typing import Any, Optional, TYPE_CHECKING from ..menu import Menu +from ..models.password_strength import PasswordStrength from ..output import log if TYPE_CHECKING: @@ -16,42 +17,23 @@ if TYPE_CHECKING: SIG_TRIGGER = None -def check_password_strong(passwd: str) -> bool: - symbol_count = 0 - if any(character.isdigit() for character in passwd): - symbol_count += 10 - if any(character.isupper() for character in passwd): - symbol_count += 26 - if any(character.islower() for character in passwd): - symbol_count += 26 - if any(not character.isalnum() for character in passwd): - symbol_count += 40 - - if symbol_count**len(passwd) < 10e20: - prompt = str(_("The password you are using seems to be weak, are you sure you want to use it?")) - choice = Menu(prompt, Menu.yes_no(), default_option=Menu.yes()).run() - return choice.value == Menu.yes() - - return True - - def get_password(prompt: str = '') -> Optional[str]: if not prompt: prompt = _("Enter a password: ") - while passwd := getpass.getpass(prompt): - if len(passwd.strip()) <= 0: + while password := getpass.getpass(prompt): + if len(password.strip()) <= 0: break - if not check_password_strong(passwd): - continue + strength = PasswordStrength.strength(password) + log(f'Password strength: {strength.value}', fg=strength.color()) passwd_verification = getpass.getpass(prompt=_('And one more time for verification: ')) - if passwd != passwd_verification: + if password != passwd_verification: log(' * Passwords did not match * ', fg='red') continue - return passwd + return password return None |