Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib')
-rw-r--r--archinstall/lib/disk/blockdevice.py32
-rw-r--r--archinstall/lib/disk/btrfs.py56
-rw-r--r--archinstall/lib/disk/dmcryptdev.py48
-rw-r--r--archinstall/lib/disk/helpers.py241
-rw-r--r--archinstall/lib/disk/mapperdev.py83
-rw-r--r--archinstall/lib/disk/partition.py67
-rw-r--r--archinstall/lib/general.py2
-rw-r--r--archinstall/lib/installer.py365
-rw-r--r--archinstall/lib/output.py5
-rw-r--r--archinstall/lib/user_interaction.py6
10 files changed, 653 insertions, 252 deletions
diff --git a/archinstall/lib/disk/blockdevice.py b/archinstall/lib/disk/blockdevice.py
index fac258ef..ff741f18 100644
--- a/archinstall/lib/disk/blockdevice.py
+++ b/archinstall/lib/disk/blockdevice.py
@@ -16,10 +16,10 @@ from ..storage import storage
class BlockDevice:
def __init__(self, path :str, info :Optional[Dict[str, Any]] = None):
if not info:
- from .helpers import all_disks
+ from .helpers import all_blockdevices
# If we don't give any information, we need to auto-fill it.
# Otherwise any subsequent usage will break.
- info = all_disks()[path].info
+ info = all_blockdevices(partitions=False)[path].info
self.path = path
self.info = info
@@ -78,16 +78,20 @@ class BlockDevice:
If it's a loop-back-device it returns the back-file,
For other types it return self.device
"""
- if self.info['type'] == 'loop':
- for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
- if not drive['name'] == self.path:
- continue
-
- return drive['back-file']
+ if self.info.get('type') == 'loop':
+ return self.info['back-file']
else:
return self.device
@property
+ def mountpoint(self) -> None:
+ """
+ A dummy function to enable transparent comparisons of mountpoints.
+ As blockdevices can't be mounted directly, this will always be None
+ """
+ return None
+
+ @property
def device(self) -> str:
"""
Returns the device file of the BlockDevice.
@@ -95,20 +99,20 @@ class BlockDevice:
If it's a ATA-drive it returns the /dev/X device
And if it's a crypto-device it returns the parent device
"""
- if "type" not in self.info:
+ if "DEVTYPE" not in self.info:
raise DiskError(f'Could not locate backplane info for "{self.path}"')
- if self.info['type'] in ['disk','loop']:
+ if self.info['DEVTYPE'] in ['disk','loop']:
return self.path
- elif self.info['type'][:4] == 'raid':
+ elif self.info['DEVTYPE'][:4] == 'raid':
# This should catch /dev/md## raid devices
return self.path
- elif self.info['type'] == 'crypt':
+ elif self.info['DEVTYPE'] == 'crypt':
if 'pkname' not in self.info:
raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
return f"/dev/{self.info['pkname']}"
else:
- log(f"Unknown blockdevice type for {self.path}: {self.info['type']}", level=logging.DEBUG)
+ log(f"Unknown blockdevice type for {self.path}: {self.info['DEVTYPE']}", level=logging.DEBUG)
# if not stat.S_ISBLK(os.stat(full_path).st_mode):
# raise DiskError(f'Selected disk "{full_path}" is not a block device.')
@@ -195,7 +199,7 @@ class BlockDevice:
_, start, end, size, *_ = free_space.strip('\r\n;').split(':')
yield (start, end, size)
except SysCallError as error:
- log(f"Could not get free space on {self.path}: {error}", level=logging.INFO)
+ log(f"Could not get free space on {self.path}: {error}", level=logging.DEBUG)
@property
def largest_free_space(self) -> List[str]:
diff --git a/archinstall/lib/disk/btrfs.py b/archinstall/lib/disk/btrfs.py
index ad8d0a52..f2da957f 100644
--- a/archinstall/lib/disk/btrfs.py
+++ b/archinstall/lib/disk/btrfs.py
@@ -2,7 +2,9 @@ from __future__ import annotations
import pathlib
import glob
import logging
-from typing import Union, Dict, TYPE_CHECKING
+import re
+from typing import Union, Dict, TYPE_CHECKING, Any, Iterator
+from dataclasses import dataclass
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
@@ -11,7 +13,49 @@ from .helpers import get_mount_info
from ..exceptions import DiskError
from ..general import SysCommand
from ..output import log
-
+from ..exceptions import SysCallError
+
+@dataclass
+class BtrfsSubvolume:
+ target :str
+ source :str
+ fstype :str
+ name :str
+ options :str
+ root :bool = False
+
+def get_subvolumes_from_findmnt(struct :Dict[str, Any], index=0) -> Iterator[BtrfsSubvolume]:
+ if '@' in struct['source']:
+ subvolume = re.findall(r'\[.*?\]', struct['source'])[0][1:-1]
+ struct['source'] = struct['source'].replace(f"[{subvolume}]", "")
+ yield BtrfsSubvolume(
+ target=struct['target'],
+ source=struct['source'],
+ fstype=struct['fstype'],
+ name=subvolume,
+ options=struct['options'],
+ root=index == 0
+ )
+ index += 1
+
+ for child in struct.get('children', []):
+ for item in get_subvolumes_from_findmnt(child, index=index):
+ yield item
+ index += 1
+
+def get_subvolume_info(path :pathlib.Path) -> Dict[str, Any]:
+ try:
+ output = SysCommand(f"btrfs subvol show {path}").decode()
+ except SysCallError as error:
+ print('Error:', error)
+
+ result = {}
+ for line in output.replace('\r\n', '\n').split('\n'):
+ if ':' in line:
+ key, val = line.replace('\t', '').split(':', 1)
+ result[key.strip().lower().replace(' ', '_')] = val.strip()
+
+ return result
def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.Path, str], force=False) -> bool:
"""
@@ -24,7 +68,7 @@ def mount_subvolume(installation :Installer, subvolume_location :Union[pathlib.P
This function is DEPRECATED. you can get the same result creating a partition dict like any other partition, and using the standard mount procedure.
Only change partition['device_instance'].path with the apropriate bind name: real_partition_path[/subvolume_name]
"""
- log("function btrfs.mount_subvolume DEPRECATED. See code for alternatives",fg="yellow",level=logging.WARNING)
+ log("[Deprecated] function btrfs.mount_subvolume is deprecated. See code for alternatives",fg="yellow",level=logging.WARNING)
installation_mountpoint = installation.target
if type(installation_mountpoint) == str:
installation_mountpoint = pathlib.Path(installation_mountpoint)
@@ -179,11 +223,7 @@ def manage_btrfs_subvolumes(installation :Installer,
# As the rest will query there the path of the "partition" to be mounted, we feed it with the bind name needed to mount subvolumes
# As we made a deepcopy we have a fresh instance of this object we can manipulate problemless
fake_partition['device_instance'].path = f"{partition['device_instance'].path}[/{name}]"
- # we reset this attribute, which holds where the partition is actually mounted. Remember, the physical partition is mounted at this moment and therefore has the value '/'.
- # If i don't reset it, process will abort as "already mounted' .
- # TODO It works for this purpose, but the fact that this bevahiour can happed, should make think twice
- fake_partition['device_instance'].mountpoint = None
- #
+
# Well, now that this "fake partition" is ready, we add it to the list of the ones which are to be mounted,
# as "normal" ones
mountpoints.append(fake_partition)
diff --git a/archinstall/lib/disk/dmcryptdev.py b/archinstall/lib/disk/dmcryptdev.py
new file mode 100644
index 00000000..63392ffb
--- /dev/null
+++ b/archinstall/lib/disk/dmcryptdev.py
@@ -0,0 +1,48 @@
+import pathlib
+import logging
+import json
+from dataclasses import dataclass
+from typing import Optional
+from ..exceptions import SysCallError
+from ..general import SysCommand
+from ..output import log
+from .mapperdev import MapperDev
+
+@dataclass
+class DMCryptDev:
+ dev_path :pathlib.Path
+
+ @property
+ def name(self):
+ with open(f"/sys/devices/virtual/block/{pathlib.Path(self.path).name}/dm/name", "r") as fh:
+ return fh.read().strip()
+
+ @property
+ def path(self):
+ return f"/dev/mapper/{self.dev_path}"
+
+ @property
+ def blockdev(self):
+ pass
+
+ @property
+ def MapperDev(self):
+ return MapperDev(mappername=self.name)
+
+ @property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.dev_path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.dev_path}: {error}", level=logging.WARNING, fg="yellow")
+ pass
+
+ return None
+
+ @property
+ def filesystem(self) -> Optional[str]:
+ return self.MapperDev.filesystem \ No newline at end of file
diff --git a/archinstall/lib/disk/helpers.py b/archinstall/lib/disk/helpers.py
index b04e2740..afaf9e5e 100644
--- a/archinstall/lib/disk/helpers.py
+++ b/archinstall/lib/disk/helpers.py
@@ -5,12 +5,15 @@ import os
import pathlib
import re
import time
+import glob
from typing import Union, List, Iterator, Dict, Optional, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
if TYPE_CHECKING:
from .partition import Partition
from .blockdevice import BlockDevice
+from .dmcryptdev import DMCryptDev
+from .mapperdev import MapperDev
from ..exceptions import SysCallError, DiskError
from ..general import SysCommand
from ..output import log
@@ -103,23 +106,167 @@ def device_state(name :str, *args :str, **kwargs :str) -> Optional[bool]:
return
return True
-# lsblk --json -l -n -o path
-def all_disks(*args :str, **kwargs :str) -> List[BlockDevice]:
- kwargs.setdefault("partitions", False)
- drives = {}
- lsblk = json.loads(SysCommand('lsblk --json -l -n -o path,size,type,mountpoint,label,pkname,model').decode('UTF_8'))
- for drive in lsblk['blockdevices']:
- if not kwargs['partitions'] and drive['type'] == 'part':
+def cleanup_bash_escapes(data :str) -> str:
+ return data.replace(r'\ ', ' ')
+
+def blkid(cmd :str) -> Dict[str, Any]:
+ if '-o' in cmd and '-o export' not in cmd:
+ raise ValueError(f"blkid() requires '-o export' to be used and can therefor not continue reliably.")
+ elif '-o' not in cmd:
+ cmd += ' -o export'
+
+ try:
+ raw_data = SysCommand(cmd).decode()
+ except SysCallError as error:
+ log(f"Could not get block device information using blkid() using command {cmd}", level=logging.DEBUG)
+ raise error
+
+ result = {}
+ # Process the raw result
+ devname = None
+ for line in raw_data.split('\r\n'):
+ if not len(line):
+ devname = None
+ continue
+
+ key, val = line.split('=', 1)
+ if key.lower() == 'devname':
+ devname = val
+ # Lowercase for backwards compatability with all_disks() previous use cases
+ result[devname] = {
+ "path": devname,
+ "PATH": devname
+ }
+ continue
+
+ result[devname][key] = cleanup_bash_escapes(val)
+
+ return result
+
+def get_loop_info(path :str) -> Dict[str, Any]:
+ for drive in json.loads(SysCommand(['losetup', '--json']).decode('UTF_8'))['loopdevices']:
+ if not drive['name'] == path:
continue
- drives[drive['path']] = BlockDevice(drive['path'], drive)
+ return {
+ path: {
+ **drive,
+ 'type' : 'loop',
+ 'TYPE' : 'loop',
+ 'DEVTYPE' : 'loop',
+ 'PATH' : drive['name'],
+ 'path' : drive['name']
+ }
+ }
+
+ return {}
+
+def enrich_blockdevice_information(information :Dict[str, Any]) -> Dict[str, Any]:
+ result = {}
+ for device_path, device_information in information.items():
+ dev_name = pathlib.Path(device_information['PATH']).name
+ if not device_information.get('TYPE') or not device_information.get('DEVTYPE'):
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ if (dmcrypt_name := pathlib.Path(f"/sys/class/block/{dev_name}/dm/name")).exists():
+ with dmcrypt_name.open('r') as fh:
+ device_information['DMCRYPT_NAME'] = fh.read().strip()
+
+ result[device_path] = device_information
+
+ return result
+
+def uevent(data :str) -> Dict[str, Any]:
+ information = {}
+
+ for line in data.replace('\r\n', '\n').split('\n'):
+ if len((line := line.strip())):
+ key, val = line.split('=', 1)
+ information[key] = val
+
+ return information
+
+def get_blockdevice_uevent(dev_name :str) -> Dict[str, Any]:
+ device_information = {}
+ with open(f"/sys/class/block/{dev_name}/uevent") as fh:
+ device_information.update(uevent(fh.read()))
+
+ return {
+ f"/dev/{dev_name}" : {
+ **device_information,
+ 'path' : f'/dev/{dev_name}',
+ 'PATH' : f'/dev/{dev_name}',
+ 'PTTYPE' : None
+ }
+ }
+
+def all_disks() -> List[BlockDevice]:
+ log(f"[Deprecated] archinstall.all_disks() is deprecated. Use archinstall.all_blockdevices() with the appropriate filters instead.", level=logging.WARNING, fg="yellow")
+ return all_blockdevices(partitions=False, mappers=False)
+
+def all_blockdevices(mappers=False, partitions=False, error=False) -> List[BlockDevice, Partition]:
+ """
+ Returns BlockDevice() and Partition() objects for all available devices.
+ """
+ from .partition import Partition
- return drives
+ instances = {}
+ # Due to lsblk being highly unreliable for this use case,
+ # we'll iterate the /sys/class definitions and find the information
+ # from there.
+ for block_device in glob.glob("/sys/class/block/*"):
+ device_path = f"/dev/{pathlib.Path(block_device).readlink().name}"
+ try:
+ information = blkid(f'blkid -p -o export {device_path}')
+
+ # TODO: No idea why F841 is raised here:
+ except SysCallError as error: # noqa: F841
+ if error.exit_code in (512, 2):
+ # Assume that it's a loop device, and try to get info on it
+ try:
+ information = get_loop_info(device_path)
+ if not information:
+ raise SysCallError("Could not get loop information", exit_code=1)
+
+ except SysCallError:
+ information = get_blockdevice_uevent(pathlib.Path(block_device).readlink().name)
+ else:
+ raise error
+
+ information = enrich_blockdevice_information(information)
+
+ for path, path_info in information.items():
+ if path_info.get('DMCRYPT_NAME'):
+ instances[path] = DMCryptDev(dev_path=path)
+ elif path_info.get('PARTUUID') or path_info.get('PART_ENTRY_NUMBER'):
+ if partitions:
+ instances[path] = Partition(path, BlockDevice(get_parent_of_partition(pathlib.Path(path))))
+ elif path_info.get('PTTYPE', False) is not False or path_info.get('TYPE') == 'loop':
+ instances[path] = BlockDevice(path, path_info)
+ elif path_info.get('TYPE') == 'squashfs':
+ # We can ignore squashfs devices (usually /dev/loop0 on Arch ISO)
+ continue
+ else:
+ log(f"Unknown device found by all_blockdevices(), ignoring: {information}", level=logging.WARNING, fg="yellow")
+
+ if mappers:
+ for block_device in glob.glob("/dev/mapper/*"):
+ if (pathobj := pathlib.Path(block_device)).is_symlink():
+ instances[f"/dev/mapper/{pathobj.name}"] = MapperDev(mappername=pathobj.name)
+
+ return instances
+
+
+def get_parent_of_partition(path :pathlib.Path) -> pathlib.Path:
+ partition_name = path.name
+ pci_device = (pathlib.Path("/sys/class/block") / partition_name).resolve()
+ return f"/dev/{pci_device.parent.name}"
def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :bool = False) -> Optional[BlockDevice]:
- collection = all_disks()
+ collection = all_blockdevices(partitions=False)
for drive in collection:
if size and convert_to_gigabytes(collection[drive]['size']) != size:
continue
@@ -129,6 +276,7 @@ def harddrive(size :Optional[float] = None, model :Optional[str] = None, fuzzy :
return collection[drive]
def split_bind_name(path :Union[pathlib.Path, str]) -> list:
+ # log(f"[Deprecated] Partition().subvolumes now contain the split bind name via it's subvolume.name instead.", level=logging.WARNING, fg="yellow")
# we check for the bind notation. if exist we'll only use the "true" device path
if '[' in str(path) : # is a bind path (btrfs subvolume path)
device_path, bind_path = str(path).split('[')
@@ -138,32 +286,43 @@ def split_bind_name(path :Union[pathlib.Path, str]) -> list:
bind_path = None
return device_path,bind_path
+def find_mountpoint(device_path :str) -> Dict[str, Any]:
+ try:
+ for filesystem in json.loads(SysCommand(f'/usr/bin/findmnt -R --json {device_path}').decode())['filesystems']:
+ yield filesystem
+ except SysCallError:
+ return {}
+
def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, return_real_path :bool = False) -> Dict[str, Any]:
- device_path,bind_path = split_bind_name(path)
+ device_path, bind_path = split_bind_name(path)
output = {}
for traversal in list(map(str, [str(device_path)] + list(pathlib.Path(str(device_path)).parents))):
try:
- log(f"Getting mount information for device path {traversal}", level=logging.INFO)
+ log(f"Getting mount information for device path {traversal}", level=logging.DEBUG)
if (output := SysCommand(f'/usr/bin/findmnt --json {traversal}').decode('UTF-8')):
break
- except SysCallError:
+
+ except SysCallError as error:
+ print('ERROR:', error)
pass
if not traverse:
break
if not output:
- raise DiskError(f"Could not get mount information for device path {path}")
+ raise DiskError(f"Could not get mount information for device path {device_path}")
output = json.loads(output)
+
# for btrfs partitions we redice the filesystem list to the one with the source equals to the parameter
# i.e. the subvolume filesystem we're searching for
if 'filesystems' in output and len(output['filesystems']) > 1 and bind_path is not None:
output['filesystems'] = [entry for entry in output['filesystems'] if entry['source'] == str(path)]
+
if 'filesystems' in output:
if len(output['filesystems']) > 1:
- raise DiskError(f"Path '{path}' contains multiple mountpoints: {output['filesystems']}")
+ raise DiskError(f"Path '{device_path}' contains multiple mountpoints: {output['filesystems']}")
if return_real_path:
return output['filesystems'][0], traversal
@@ -176,41 +335,53 @@ def get_mount_info(path :Union[pathlib.Path, str], traverse :bool = False, retur
return {}
+def get_all_targets(data :Dict[str, Any], filters :Dict[str, None] = {}) -> Dict[str, None]:
+ for info in data:
+ if info.get('target') not in filters:
+ filters[info.get('target')] = None
+
+ filters.update(get_all_targets(info.get('children', [])))
+
+ return filters
+
def get_partitions_in_use(mountpoint :str) -> List[Partition]:
from .partition import Partition
try:
output = SysCommand(f"/usr/bin/findmnt --json -R {mountpoint}").decode('UTF-8')
except SysCallError:
- return []
-
- mounts = []
+ return {}
if not output:
- return []
+ return {}
output = json.loads(output)
- for target in output.get('filesystems', []):
- # We need to create a BlockDevice() instead of 'None' here when creaiting Partition()
- # Otherwise subsequent calls to .size etc will fail due to BlockDevice being None.
+ # print(output)
- # So first, we create the partition without a BlockDevice and carefully only use it to get .real_device
- # Note: doing print(partition) here will break because the above mentioned issue.
- partition = Partition(target['source'], None, filesystem=target.get('fstype', None), mountpoint=target['target'])
- partition = Partition(target['source'], partition.real_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ mounts = {}
+
+ block_devices_available = all_blockdevices(mappers=True, partitions=True, error=True)
+
+ block_devices_mountpoints = {}
+ for blockdev in block_devices_available.values():
+ if not type(blockdev) in (Partition, MapperDev):
+ continue
- # Once we have the real device (for instance /dev/nvme0n1p5) we can find the parent block device using
- # (lsblk pkname lists both the partition and blockdevice, BD being the last entry)
- result = SysCommand(f'lsblk -no pkname {partition.real_device}').decode().rstrip('\r\n').split('\r\n')[-1]
- block_device = BlockDevice(f"/dev/{result}")
+ for blockdev_mountpoint in blockdev.mount_information:
+ block_devices_mountpoints[blockdev_mountpoint['target']] = blockdev
- # Once we figured the block device out, we can properly create the partition object
- partition = Partition(target['source'], block_device, filesystem=target.get('fstype', None), mountpoint=target['target'])
+ log(f'Filtering available mounts {block_devices_mountpoints} to those under {mountpoint}', level=logging.DEBUG)
- mounts.append(partition)
+ for mountpoint in list(get_all_targets(output['filesystems']).keys()):
+ if mountpoint in block_devices_mountpoints:
+ if mountpoint not in mounts:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
+ # If the already defined mountpoint is a DMCryptDev, and the newly found
+ # mountpoint is a MapperDev, it has precedence and replaces the old mountpoint definition.
+ elif type(mounts[mountpoint]) == DMCryptDev and type(block_devices_mountpoints[mountpoint]) == MapperDev:
+ mounts[mountpoint] = block_devices_mountpoints[mountpoint]
- for child in target.get('children', []):
- mounts.append(Partition(child['source'], block_device, filesystem=child.get('fstype', None), mountpoint=child['target']))
+ log(f"Available partitions: {mounts}", level=logging.DEBUG)
return mounts
diff --git a/archinstall/lib/disk/mapperdev.py b/archinstall/lib/disk/mapperdev.py
new file mode 100644
index 00000000..91ec6d25
--- /dev/null
+++ b/archinstall/lib/disk/mapperdev.py
@@ -0,0 +1,83 @@
+import glob
+import pathlib
+import logging
+import json
+from dataclasses import dataclass
+from typing import Optional, List, Dict, Any, Iterator, TYPE_CHECKING
+
+from ..exceptions import SysCallError
+from ..general import SysCommand
+from ..output import log
+
+if TYPE_CHECKING:
+ from .btrfs import BtrfsSubvolume
+
+@dataclass
+class MapperDev:
+ mappername :str
+
+ @property
+ def name(self):
+ return self.mappername
+
+ @property
+ def path(self):
+ return f"/dev/mapper/{self.mappername}"
+
+ @property
+ def partition(self):
+ from .helpers import uevent, get_parent_of_partition
+ from .partition import Partition
+ from .blockdevice import BlockDevice
+
+ for mapper in glob.glob('/dev/mapper/*'):
+ path_obj = pathlib.Path(mapper)
+ if path_obj.name == self.mappername and pathlib.Path(mapper).is_symlink():
+ dm_device = (pathlib.Path("/dev/mapper/") / path_obj.readlink()).resolve()
+
+ for slave in glob.glob(f"/sys/class/block/{dm_device.name}/slaves/*"):
+ partition_belonging_to_dmcrypt_device = pathlib.Path(slave).name
+
+ try:
+ uevent_data = SysCommand(f"blkid -o export /dev/{partition_belonging_to_dmcrypt_device}").decode()
+ except SysCallError as error:
+ log(f"Could not get information on device /dev/{partition_belonging_to_dmcrypt_device}: {error}", level=logging.ERROR, fg="red")
+
+ information = uevent(uevent_data)
+ block_device = BlockDevice(get_parent_of_partition('/dev/' / pathlib.Path(information['DEVNAME'])))
+
+ return Partition(information['DEVNAME'], block_device)
+
+ raise ValueError(f"Could not convert {self.mappername} to a real dm-crypt device")
+
+ @property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.WARNING, fg="yellow")
+ pass
+
+ return None
+
+ @property
+ def mount_information(self) -> List[Dict[str, Any]]:
+ from .helpers import find_mountpoint
+ return list(find_mountpoint(self.path))
+
+ @property
+ def filesystem(self) -> Optional[str]:
+ from .helpers import get_filesystem_type
+ return get_filesystem_type(self.path)
+
+ @property
+ def subvolumes(self) -> Iterator['BtrfsSubvolume']:
+ from .btrfs import get_subvolumes_from_findmnt
+
+ for mountpoint in self.mount_information:
+ for result in get_subvolumes_from_findmnt(mountpoint):
+ yield result \ No newline at end of file
diff --git a/archinstall/lib/disk/partition.py b/archinstall/lib/disk/partition.py
index 7bfde64c..708edd29 100644
--- a/archinstall/lib/disk/partition.py
+++ b/archinstall/lib/disk/partition.py
@@ -5,15 +5,15 @@ import logging
import json
import os
import hashlib
-from typing import Optional, Dict, Any, List, Union
+from typing import Optional, Dict, Any, List, Union, Iterator
from .blockdevice import BlockDevice
-from .helpers import get_mount_info, get_filesystem_type, convert_size_to_gb, split_bind_name
+from .helpers import find_mountpoint, get_filesystem_type, convert_size_to_gb, split_bind_name
from ..storage import storage
from ..exceptions import DiskError, SysCallError, UnknownFilesystemFormat
from ..output import log
from ..general import SysCommand
-
+from .btrfs import get_subvolumes_from_findmnt, BtrfsSubvolume
class Partition:
def __init__(self,
@@ -29,9 +29,11 @@ class Partition:
part_id = os.path.basename(path)
self.block_device = block_device
+ if type(self.block_device) is str:
+ raise ValueError(f"Partition()'s 'block_device' parameter has to be a archinstall.BlockDevice() instance!")
+
self.path = path
self.part_id = part_id
- self.mountpoint = mountpoint
self.target_mountpoint = mountpoint
self.filesystem = filesystem
self._encrypted = None
@@ -42,20 +44,12 @@ class Partition:
self.mount(mountpoint)
try:
- mount_information = get_mount_info(self.path)
+ self.mount_information = list(find_mountpoint(self.path))
except DiskError:
- mount_information = {}
-
- if mount_information.get('target', None):
- if self.mountpoint != mount_information.get('target', None) and mountpoint:
- raise DiskError(f"{self} was given a mountpoint but the actual mountpoint differs: {mount_information.get('target', None)}")
-
- if target := mount_information.get('target', None):
- self.mountpoint = target
+ self.mount_information = [{}]
if not self.filesystem and autodetect_filesystem:
- if fstype := mount_information.get('fstype', get_filesystem_type(path)):
- self.filesystem = fstype
+ self.filesystem = get_filesystem_type(path)
if self.filesystem == 'crypto_LUKS':
self.encrypted = True
@@ -98,6 +92,20 @@ class Partition:
}
@property
+ def mountpoint(self) -> Optional[str]:
+ try:
+ data = json.loads(SysCommand(f"findmnt --json -R {self.path}").decode())
+ for filesystem in data['filesystems']:
+ return filesystem.get('target')
+
+ except SysCallError as error:
+ # Not mounted anywhere most likely
+ log(f"Could not locate mount information for {self.path}: {error}", level=logging.DEBUG)
+ pass
+
+ return None
+
+ @property
def sector_size(self) -> Optional[int]:
output = json.loads(SysCommand(f"lsblk --json -o+LOG-SEC {self.device_path}").decode('UTF-8'))
@@ -135,14 +143,16 @@ class Partition:
for i in range(storage['DISK_RETRY_ATTEMPTS']):
self.partprobe()
- if (handle := SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}")).exit_code == 0:
- lsblk = json.loads(handle.decode('UTF-8'))
+ try:
+ lsblk = json.loads(SysCommand(f"lsblk --json -b -o+SIZE {self.device_path}").decode())
for device in lsblk['blockdevices']:
return convert_size_to_gb(device['size'])
- elif handle.exit_code == 8192:
- # Device is not a block device
- return None
+ except SysCallError as error:
+ if error.exit_code == 8192:
+ return None
+ else:
+ raise error
time.sleep(storage['DISK_TIMEOUTS'])
@@ -200,7 +210,14 @@ class Partition:
For instance when you want to get a __repr__ of the class.
"""
self.partprobe()
- return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
+ try:
+ return SysCommand(f'blkid -s PARTUUID -o value {self.device_path}').decode('UTF-8').strip()
+ except SysCallError as error:
+ if self.block_device.info.get('TYPE') == 'iso9660':
+ # Parent device is a Optical Disk (.iso dd'ed onto a device for instance)
+ return None
+
+ raise DiskError(f"Could not get PARTUUID of partition {self}: {error}")
@property
def encrypted(self) -> Union[bool, None]:
@@ -237,6 +254,12 @@ class Partition:
device_path, bind_name = split_bind_name(self.path)
return bind_name
+ @property
+ def subvolumes(self) -> Iterator[BtrfsSubvolume]:
+ for mountpoint in self.mount_information:
+ for result in get_subvolumes_from_findmnt(mountpoint):
+ yield result
+
def partprobe(self) -> bool:
if self.block_device and SysCommand(f'partprobe {self.block_device.device}').exit_code == 0:
time.sleep(1)
@@ -411,7 +434,6 @@ class Partition:
except SysCallError as err:
raise err
- self.mountpoint = target
return True
return False
@@ -425,7 +447,6 @@ class Partition:
if 0 < worker.exit_code < 8000:
raise SysCallError(f"Could not unmount {self.path} properly: {worker}", exit_code=worker.exit_code)
- self.mountpoint = None
return True
def umount(self) -> bool:
diff --git a/archinstall/lib/general.py b/archinstall/lib/general.py
index ad7b8ad4..174acb8a 100644
--- a/archinstall/lib/general.py
+++ b/archinstall/lib/general.py
@@ -265,7 +265,7 @@ class SysCommandWorker:
log(args[1], level=logging.DEBUG, fg='red')
if self.exit_code != 0:
- raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[:500]}", self.exit_code)
+ raise SysCallError(f"{self.cmd} exited with abnormal exit code [{self.exit_code}]: {self._trace_log[-500:]}", self.exit_code)
def is_alive(self) -> bool:
self.poll()
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index daac340b..1ead46c7 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -12,7 +12,7 @@ from .disk import get_partitions_in_use, Partition
from .general import SysCommand, generate_password
from .hardware import has_uefi, is_vm, cpu_vendor
from .locale_helpers import verify_keyboard_layout, verify_x11_keyboard_layout
-from .disk.helpers import get_mount_info, split_bind_name
+from .disk.helpers import get_mount_info
from .mirrors import use_mirrors
from .plugins import plugins
from .storage import storage
@@ -21,7 +21,7 @@ from .output import log
from .profiles import Profile
from .disk.btrfs import manage_btrfs_subvolumes
from .disk.partition import get_mount_fs_type
-from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError
+from .exceptions import DiskError, ServiceException, RequirementError, HardwareIncompatibilityError, SysCallError
# Any package that the Installer() is responsible for (optional and the default ones)
__packages__ = ["base", "base-devel", "linux-firmware", "linux", "linux-lts", "linux-zen", "linux-hardened"]
@@ -165,7 +165,7 @@ class Installer:
@property
def partitions(self) -> List[Partition]:
- return get_partitions_in_use(self.target)
+ return get_partitions_in_use(self.target).values()
def sync_log_to_install_medium(self) -> bool:
# Copy over the install log (if there is one) to the install medium if
@@ -259,12 +259,15 @@ class Installer:
for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']):
mountpoint = partition['mountpoint']
log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
+
if partition.get('filesystem',{}).get('mount_options',[]):
mount_options = ','.join(partition['filesystem']['mount_options'])
partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options)
else:
partition['device_instance'].mount(f"{self.target}{mountpoint}")
+
time.sleep(1)
+
try:
get_mount_info(f"{self.target}{mountpoint}", traverse=False)
except DiskError:
@@ -501,11 +504,17 @@ class Installer:
return True
def detect_encryption(self, partition :Partition) -> bool:
- part = Partition(partition.parent, None, autodetect_filesystem=True)
- if partition.encrypted:
+ from .disk.mapperdev import MapperDev
+ from .disk.dmcryptdev import DMCryptDev
+ from .disk.helpers import get_filesystem_type
+
+ if type(partition) is MapperDev:
+ # Returns MapperDev.partition
+ return partition.partition
+ elif type(partition) is DMCryptDev:
+ return partition.MapperDev.partition
+ elif get_filesystem_type(partition.path) == 'crypto_LUKS':
return partition
- elif partition.parent not in partition.path and part.filesystem == 'crypto_LUKS':
- return part
return False
@@ -627,6 +636,184 @@ class Installer:
else:
raise ValueError(f"Archinstall currently only supports setting up swap on zram")
+ def add_systemd_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
+ self.pacstrap('efibootmgr')
+
+ if not has_uefi():
+ raise HardwareIncompatibilityError
+ # TODO: Ideally we would want to check if another config
+ # points towards the same disk and/or partition.
+ # And in which case we should do some clean up.
+
+ # Install the boot loader
+ if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0:
+ # Fallback, try creating the boot loader without touching the EFI variables
+ SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
+
+ # Ensure that the /boot/loader directory exists before we try to create files in it
+ if not os.path.exists(f'{self.target}/boot/loader'):
+ os.makedirs(f'{self.target}/boot/loader')
+
+ # Modify or create a loader.conf
+ if os.path.isfile(f'{self.target}/boot/loader/loader.conf'):
+ with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader:
+ loader_data = loader.read().split('\n')
+ else:
+ loader_data = [
+ f"default {self.init_time}",
+ "timeout 5"
+ ]
+
+ with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
+ for line in loader_data:
+ if line[:8] == 'default ':
+ loader.write(f'default {self.init_time}_{self.kernels[0]}\n')
+ elif line[:8] == '#timeout' and 'timeout 5' not in loader_data:
+ # We add in the default timeout to support dual-boot
+ loader.write(f"{line[1:]}\n")
+ else:
+ loader.write(f"{line}\n")
+
+ # Ensure that the /boot/loader/entries directory exists before we try to create files in it
+ if not os.path.exists(f'{self.target}/boot/loader/entries'):
+ os.makedirs(f'{self.target}/boot/loader/entries')
+
+ for kernel in self.kernels:
+ # Setup the loader entry
+ with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}.conf', 'w') as entry:
+ entry.write('# Created by: archinstall\n')
+ entry.write(f'# Created on: {self.init_time}\n')
+ entry.write(f'title Arch Linux ({kernel})\n')
+ entry.write(f"linux /vmlinuz-{kernel}\n")
+ if not is_vm():
+ vendor = cpu_vendor()
+ if vendor == "AuthenticAMD":
+ entry.write("initrd /amd-ucode.img\n")
+ elif vendor == "GenuineIntel":
+ entry.write("initrd /intel-ucode.img\n")
+ else:
+ self.log("unknow cpu vendor, not adding ucode to systemd-boot config")
+ entry.write(f"initrd /initramfs-{kernel}.img\n")
+ # blkid doesn't trigger on loopback devices really well,
+ # so we'll use the old manual method until we get that sorted out.
+ root_fs_type = get_mount_fs_type(root_partition.filesystem)
+
+ if root_fs_type is not None:
+ options_entry = f'rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n'
+ else:
+ options_entry = f'rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n'
+
+ for subvolume in root_partition.subvolumes:
+ if subvolume.root is True:
+ options_entry = f"rootflags=subvol={subvolume.name} " + options_entry
+
+ # Zswap should be disabled when using zram.
+ #
+ # https://github.com/archlinux/archinstall/issues/881
+ if self.zram_enabled:
+ options_entry = "zswap.enabled=0 " + options_entry
+
+ if real_device := self.detect_encryption(root_partition):
+ # TODO: We need to detect if the encrypted device is a whole disk encryption,
+ # or simply a partition encryption. Right now we assume it's a partition (and we always have)
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
+ entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev {options_entry}')
+ else:
+ log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
+ entry.write(f'options root=PARTUUID={root_partition.uuid} {options_entry}')
+
+ self.helper_flags['bootloader'] = "systemd"
+
+ return True
+
+ def add_grub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
+ self.pacstrap('grub') # no need?
+
+ root_fs_type = get_mount_fs_type(root_partition.filesystem)
+
+ if real_device := self.detect_encryption(root_partition):
+ root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip()
+ _file = "/etc/default/grub"
+ add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'"
+ enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'"
+
+ log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO)
+ SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
+ SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}")
+ else:
+ _file = "/etc/default/grub"
+ add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'"
+ SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
+
+ log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO)
+ if has_uefi():
+ self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
+ try:
+ SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable')
+ except SysCallError as error:
+ raise DiskError(f"Could not install GRUB to {self.target}/boot: {error}")
+ else:
+ try:
+ SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc --recheck {boot_partition.parent}')
+ except SysCallError as error:
+ raise DiskError(f"Could not install GRUB to {boot_partition.path}: {error}")
+
+ try:
+ SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg')
+ except SysCallError as error:
+ raise DiskError(f"Could not configure GRUB: {error}")
+
+ self.helper_flags['bootloader'] = "grub"
+
+ return True
+
+ def add_efistub_bootloader(self, boot_partition :Partition, root_partition :Partition) -> bool:
+ self.pacstrap('efibootmgr')
+
+ if not has_uefi():
+ raise HardwareIncompatibilityError
+ # TODO: Ideally we would want to check if another config
+ # points towards the same disk and/or partition.
+ # And in which case we should do some clean up.
+
+ root_fs_type = get_mount_fs_type(root_partition.filesystem)
+
+ for kernel in self.kernels:
+ # Setup the firmware entry
+
+ label = f'Arch Linux ({kernel})'
+ loader = f"/vmlinuz-{kernel}"
+
+ kernel_parameters = []
+
+ if not is_vm():
+ vendor = cpu_vendor()
+ if vendor == "AuthenticAMD":
+ kernel_parameters.append("initrd=\\amd-ucode.img")
+ elif vendor == "GenuineIntel":
+ kernel_parameters.append("initrd=\\intel-ucode.img")
+ else:
+ self.log("unknow cpu vendor, not adding ucode to firmware boot entry")
+
+ kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img")
+
+ # blkid doesn't trigger on loopback devices really well,
+ # so we'll use the old manual method until we get that sorted out.
+ if real_device := self.detect_encryption(root_partition):
+ # TODO: We need to detect if the encrypted device is a whole disk encryption,
+ # or simply a partition encryption. Right now we assume it's a partition (and we always have)
+ log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
+ kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
+ else:
+ log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
+ kernel_parameters.append(f'root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
+
+ SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
+
+ self.helper_flags['bootloader'] = "efistub"
+
+ return True
+
def add_bootloader(self, bootloader :str = 'systemd-bootctl') -> bool:
"""
Adds a bootloader to the installation instance.
@@ -648,177 +835,23 @@ class Installer:
boot_partition = None
root_partition = None
- root_partition_fs = None
for partition in self.partitions:
- if partition.mountpoint == self.target + '/boot':
+ if partition.mountpoint == os.path.join(self.target, 'boot'):
boot_partition = partition
elif partition.mountpoint == self.target:
root_partition = partition
- root_partition_fs = partition.filesystem
- root_fs_type = get_mount_fs_type(root_partition_fs)
if boot_partition is None or root_partition is None:
- raise ValueError(f"Could not detect root (/) or boot (/boot) in {self.target} based on: {self.partitions}")
+ raise ValueError(f"Could not detect root ({root_partition}) or boot ({boot_partition}) in {self.target} based on: {self.partitions}")
self.log(f'Adding bootloader {bootloader} to {boot_partition if boot_partition else root_partition}', level=logging.INFO)
if bootloader == 'systemd-bootctl':
- self.pacstrap('efibootmgr')
-
- if not has_uefi():
- raise HardwareIncompatibilityError
- # TODO: Ideally we would want to check if another config
- # points towards the same disk and/or partition.
- # And in which case we should do some clean up.
-
- # Install the boot loader
- if SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --path=/boot install').exit_code != 0:
- # Fallback, try creating the boot loader without touching the EFI variables
- SysCommand(f'/usr/bin/arch-chroot {self.target} bootctl --no-variables --path=/boot install')
-
- # Ensure that the /boot/loader directory exists before we try to create files in it
- if not os.path.exists(f'{self.target}/boot/loader'):
- os.makedirs(f'{self.target}/boot/loader')
-
- # Modify or create a loader.conf
- if os.path.isfile(f'{self.target}/boot/loader/loader.conf'):
- with open(f'{self.target}/boot/loader/loader.conf', 'r') as loader:
- loader_data = loader.read().split('\n')
- else:
- loader_data = [
- f"default {self.init_time}",
- "timeout 5"
- ]
-
- with open(f'{self.target}/boot/loader/loader.conf', 'w') as loader:
- for line in loader_data:
- if line[:8] == 'default ':
- loader.write(f'default {self.init_time}_{self.kernels[0]}\n')
- elif line[:8] == '#timeout' and 'timeout 5' not in loader_data:
- # We add in the default timeout to support dual-boot
- loader.write(f"{line[1:]}\n")
- else:
- loader.write(f"{line}\n")
-
- # Ensure that the /boot/loader/entries directory exists before we try to create files in it
- if not os.path.exists(f'{self.target}/boot/loader/entries'):
- os.makedirs(f'{self.target}/boot/loader/entries')
-
- for kernel in self.kernels:
- # Setup the loader entry
- with open(f'{self.target}/boot/loader/entries/{self.init_time}_{kernel}.conf', 'w') as entry:
- entry.write('# Created by: archinstall\n')
- entry.write(f'# Created on: {self.init_time}\n')
- entry.write(f'title Arch Linux ({kernel})\n')
- entry.write(f"linux /vmlinuz-{kernel}\n")
- if not is_vm():
- vendor = cpu_vendor()
- if vendor == "AuthenticAMD":
- entry.write("initrd /amd-ucode.img\n")
- elif vendor == "GenuineIntel":
- entry.write("initrd /intel-ucode.img\n")
- else:
- self.log("unknow cpu vendor, not adding ucode to systemd-boot config")
- entry.write(f"initrd /initramfs-{kernel}.img\n")
- # blkid doesn't trigger on loopback devices really well,
- # so we'll use the old manual method until we get that sorted out.
- if root_fs_type is not None:
- options_entry = f'rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}\n'
- else:
- options_entry = f'rw intel_pstate=no_hwp {" ".join(self.KERNEL_PARAMS)}\n'
- base_path,bind_path = split_bind_name(str(root_partition.path))
- if bind_path is not None: # and root_fs_type == 'btrfs':
- options_entry = f"rootflags=subvol={bind_path} " + options_entry
-
- # Zswap should be disabled when using zram.
- #
- # https://github.com/archlinux/archinstall/issues/881
- if self.zram_enabled:
- options_entry = "zswap.enabled=0 " + options_entry
-
- if real_device := self.detect_encryption(root_partition):
- # TODO: We need to detect if the encrypted device is a whole disk encryption,
- # or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
- entry.write(f'options cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev {options_entry}')
- else:
- log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
- entry.write(f'options root=PARTUUID={root_partition.uuid} {options_entry}')
-
- self.helper_flags['bootloader'] = bootloader
-
+ self.add_systemd_bootloader(boot_partition, root_partition)
elif bootloader == "grub-install":
- self.pacstrap('grub') # no need?
-
- if real_device := self.detect_encryption(root_partition):
- root_uuid = SysCommand(f"blkid -s UUID -o value {real_device.path}").decode().rstrip()
- _file = "/etc/default/grub"
- add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"cryptdevice=UUID={root_uuid}:cryptlvm rootfstype={root_fs_type}\"/'"
- enable_CRYPTODISK = "sed -i 's/#GRUB_ENABLE_CRYPTODISK=y/GRUB_ENABLE_CRYPTODISK=y/'"
-
- log(f"Using UUID {root_uuid} of {real_device} as encrypted root identifier.", level=logging.INFO)
- SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
- SysCommand(f"/usr/bin/arch-chroot {self.target} {enable_CRYPTODISK} {_file}")
- else:
- _file = "/etc/default/grub"
- add_to_CMDLINE_LINUX = f"sed -i 's/GRUB_CMDLINE_LINUX=\"\"/GRUB_CMDLINE_LINUX=\"rootfstype={root_fs_type}\"/'"
- SysCommand(f"/usr/bin/arch-chroot {self.target} {add_to_CMDLINE_LINUX} {_file}")
-
- log(f"GRUB uses {boot_partition.path} as the boot partition.", level=logging.INFO)
- if has_uefi():
- self.pacstrap('efibootmgr') # TODO: Do we need? Yes, but remove from minimal_installation() instead?
- if not (handle := SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=x86_64-efi --efi-directory=/boot --bootloader-id=GRUB --removable')).exit_code == 0:
- raise DiskError(f"Could not install GRUB to {self.target}/boot: {handle}")
- else:
- if not (handle := SysCommand(f'/usr/bin/arch-chroot {self.target} grub-install --target=i386-pc --recheck {boot_partition.parent}')).exit_code == 0:
- raise DiskError(f"Could not install GRUB to {boot_partition.path}: {handle}")
-
- if not (handle := SysCommand(f'/usr/bin/arch-chroot {self.target} grub-mkconfig -o /boot/grub/grub.cfg')).exit_code == 0:
- raise DiskError(f"Could not configure GRUB: {handle}")
-
- self.helper_flags['bootloader'] = True
+ self.add_grub_bootloader(boot_partition, root_partition)
elif bootloader == 'efistub':
- self.pacstrap('efibootmgr')
-
- if not has_uefi():
- raise HardwareIncompatibilityError
- # TODO: Ideally we would want to check if another config
- # points towards the same disk and/or partition.
- # And in which case we should do some clean up.
-
- for kernel in self.kernels:
- # Setup the firmware entry
-
- label = f'Arch Linux ({kernel})'
- loader = f"/vmlinuz-{kernel}"
-
- kernel_parameters = []
-
- if not is_vm():
- vendor = cpu_vendor()
- if vendor == "AuthenticAMD":
- kernel_parameters.append("initrd=\\amd-ucode.img")
- elif vendor == "GenuineIntel":
- kernel_parameters.append("initrd=\\intel-ucode.img")
- else:
- self.log("unknow cpu vendor, not adding ucode to firmware boot entry")
-
- kernel_parameters.append(f"initrd=\\initramfs-{kernel}.img")
-
- # blkid doesn't trigger on loopback devices really well,
- # so we'll use the old manual method until we get that sorted out.
- if real_device := self.detect_encryption(root_partition):
- # TODO: We need to detect if the encrypted device is a whole disk encryption,
- # or simply a partition encryption. Right now we assume it's a partition (and we always have)
- log(f"Identifying root partition by PART-UUID on {real_device}: '{real_device.uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'cryptdevice=PARTUUID={real_device.uuid}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
- else:
- log(f"Identifying root partition by PART-UUID on {root_partition}, looking for '{root_partition.uuid}'.", level=logging.DEBUG)
- kernel_parameters.append(f'root=PARTUUID={root_partition.uuid} rw intel_pstate=no_hwp rootfstype={root_fs_type} {" ".join(self.KERNEL_PARAMS)}')
-
- SysCommand(f'efibootmgr --disk {boot_partition.path[:-1]} --part {boot_partition.path[-1]} --create --label "{label}" --loader {loader} --unicode \'{" ".join(kernel_parameters)}\' --verbose')
-
- self.helper_flags['bootloader'] = bootloader
+ self.add_efistub_bootloader(boot_partition, root_partition)
else:
raise RequirementError(f"Unknown (or not yet implemented) bootloader requested: {bootloader}")
diff --git a/archinstall/lib/output.py b/archinstall/lib/output.py
index ffe00370..fa537dd4 100644
--- a/archinstall/lib/output.py
+++ b/archinstall/lib/output.py
@@ -105,5 +105,6 @@ def log(*args :str, **kwargs :Union[str, int, Dict[str, Union[str, int]]]) -> No
# Finally, print the log unless we skipped it based on level.
# We use sys.stdout.write()+flush() instead of print() to try and
# fix issue #94
- sys.stdout.write(f"{string}\n")
- sys.stdout.flush()
+ if kwargs.get('level', logging.INFO) != logging.DEBUG or storage['arguments'].get('verbose', False):
+ sys.stdout.write(f"{string}\n")
+ sys.stdout.flush()
diff --git a/archinstall/lib/user_interaction.py b/archinstall/lib/user_interaction.py
index 7ed143f1..202b14a4 100644
--- a/archinstall/lib/user_interaction.py
+++ b/archinstall/lib/user_interaction.py
@@ -15,7 +15,7 @@ from typing import List, Any, Optional, Dict, Union, TYPE_CHECKING
if TYPE_CHECKING:
from .disk.partition import Partition
-from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_disks
+from .disk import BlockDevice, suggest_single_disk_layout, suggest_multi_disk_layout, valid_parted_position, all_blockdevices
from .exceptions import RequirementError, UserError, DiskError
from .hardware import AVAILABLE_GFX_DRIVERS, has_uefi, has_amd_graphics, has_intel_graphics, has_nvidia_graphics
from .locale_helpers import list_keyboard_languages, list_timezones, list_locales
@@ -285,7 +285,7 @@ def ask_ntp() -> bool:
def ask_hostname():
- hostname = input(_('Desired hostname for the installation: ').strip(' '))
+ hostname = input(_('Desired hostname for the installation: ')).strip(' ')
return hostname
@@ -889,7 +889,7 @@ def select_harddrives() -> Optional[str]:
:return: List of selected hard drives
:rtype: list
"""
- hard_drives = all_disks().values()
+ hard_drives = all_blockdevices(partitions=False).values()
options = {f'{option}': option for option in hard_drives}
selected_harddrive = Menu(