Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/installer.py
diff options
context:
space:
mode:
Diffstat (limited to 'archinstall/lib/installer.py')
-rw-r--r--archinstall/lib/installer.py148
1 files changed, 85 insertions, 63 deletions
diff --git a/archinstall/lib/installer.py b/archinstall/lib/installer.py
index 4f46e458..b1570e16 100644
--- a/archinstall/lib/installer.py
+++ b/archinstall/lib/installer.py
@@ -180,80 +180,101 @@ class Installer:
return True
- def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
- from .luks import luks2
-
- mountpoints = {}
- for blockdevice in layouts:
- for partition in layouts[blockdevice]['partitions']:
- if (subvolumes := partition.get('btrfs', {}).get('subvolumes', {})):
- if partition.get('encrypted',False):
- if partition.get('mountpoint',None):
- ppath = partition['mountpoint']
- else:
- ppath = partition['device_instance'].path
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(ppath).name}loop"
- # Immediately unlock the encrypted device to format the inner volume
- with luks2(partition['device_instance'], loopdev, partition['!password'], auto_unmount=False) as unlocked_device:
- unlocked_device.mount(f"{self.target}/")
- try:
- manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes,unlocked_device)
- except Exception as e:
- # every exception unmounts the physical volume. Otherwise we let the system in an unstable state
- unlocked_device.unmount()
- raise e
- unlocked_device.unmount()
- # TODO generate key
- else:
- self.mount(partition['device_instance'],"/")
- try:
- manage_btrfs_subvolumes(self,partition,mountpoints,subvolumes)
- except Exception as e:
- # every exception unmounts the physical volume. Otherwise we let the system in an unstable state
- partition['device_instance'].unmount()
- raise e
- partition['device_instance'].unmount()
- else:
- mountpoints[partition['mountpoint']] = partition
- for mountpoint in sorted([mnt_dest for mnt_dest in mountpoints.keys() if mnt_dest is not None]):
- partition = mountpoints[mountpoint]
- if partition.get('encrypted', False) and not partition.get('subvolume',None):
- loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
- if not (password := partition.get('!password', None)):
- raise RequirementError(f"Missing mountpoint {mountpoint} encryption password in layout: {partition}")
-
- with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device:
- if partition.get('generate-encryption-key-file'):
- if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
- cryptkey_dir.mkdir(parents=True)
-
- # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key
- # if we name the device to "xyzloop".
- encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key"
- with open(f"{self.target}{encryption_key_path}", "w") as keyfile:
- keyfile.write(generate_password(length=512))
+ def _create_keyfile(self,luks_handle , partition :dict, password :str):
+ """ roiutine to create keyfiles, so it can be moved elsewere
+ """
+ if partition.get('generate-encryption-key-file'):
+ if not (cryptkey_dir := pathlib.Path(f"{self.target}/etc/cryptsetup-keys.d")).exists():
+ cryptkey_dir.mkdir(parents=True)
+ # Once we store the key as ../xyzloop.key systemd-cryptsetup can automatically load this key
+ # if we name the device to "xyzloop".
+ if partition.get('mountpoint',None):
+ encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['mountpoint']).name}loop.key"
+ else:
+ encryption_key_path = f"/etc/cryptsetup-keys.d/{pathlib.Path(partition['device_instance'].path).name}.key"
+ with open(f"{self.target}{encryption_key_path}", "w") as keyfile:
+ keyfile.write(generate_password(length=512))
- os.chmod(f"{self.target}{encryption_key_path}", 0o400)
+ os.chmod(f"{self.target}{encryption_key_path}", 0o400)
- luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password)
- luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"])
+ luks_handle.add_key(pathlib.Path(f"{self.target}{encryption_key_path}"), password=password)
+ luks_handle.crypttab(self, encryption_key_path, options=["luks", "key-slot=1"])
- log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {unlocked_device}", level=logging.INFO)
- unlocked_device.mount(f"{self.target}{mountpoint}")
+ def _has_root(self, partition :dict) -> bool:
+ """
+ Determine if an encrypted partition contains root in it
+ """
+ if partition.get("mountpoint") is None:
+ if (sub_list := partition.get("btrfs",{}).get('subvolumes',{})):
+ for mountpoint in [sub_list[subvolume] if isinstance(sub_list[subvolume],str) else sub_list[subvolume].get("mountpoint") for subvolume in sub_list]:
+ if mountpoint == '/':
+ return True
+ return False
+ else:
+ return False
+ elif partition.get("mountpoint") == '/':
+ return True
+ else:
+ return False
+ def mount_ordered_layout(self, layouts: Dict[str, Any]) -> None:
+ from .luks import luks2
+ # set the partitions as a list not part of a tree (which we don't need anymore (i think)
+ list_part = []
+ list_luks_handles = []
+ for blockdevice in layouts:
+ list_part.extend(layouts[blockdevice]['partitions'])
+
+ # we manage the encrypted partititons
+ for partition in [entry for entry in list_part if entry.get('encrypted',False)]:
+ # open the luks device and all associate stuff
+ if not (password := partition.get('!password', None)):
+ raise RequirementError(f"Missing partition {partition['device_instance'].path} encryption password in layout: {partition}")
+ # i change a bit the naming conventions for the loop device
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
else:
- log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
- if partition.get('options',[]):
- mount_options = ','.join(partition['options'])
- partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options)
- else:
- partition['device_instance'].mount(f"{self.target}{mountpoint}")
+ loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"
+ # note that we DON'T auto_unmount (i.e. close the encrypted device so it can be used
+ with (luks_handle := luks2(partition['device_instance'], loopdev, password, auto_unmount=False)) as unlocked_device:
+ if partition.get('generate-encryption-key-file',False) and not self._has_root(partition):
+ list_luks_handles.append([luks_handle,partition,password])
+ # this way all the requesrs will be to the dm_crypt device and not to the physical partition
+ partition['device_instance'] = unlocked_device
+
+ # we manage the btrfs partitions
+ for partition in [entry for entry in list_part if entry.get('btrfs', {}).get('subvolumes', {})]:
+ self.mount(partition['device_instance'],"/")
+ try:
+ new_mountpoints = manage_btrfs_subvolumes(self,partition)
+ except Exception as e:
+ # every exception unmounts the physical volume. Otherwise we let the system in an unstable state
+ partition['device_instance'].unmount()
+ raise e
+ partition['device_instance'].unmount()
+ if new_mountpoints:
+ list_part.extend(new_mountpoints)
+
+ # we mount. We need to sort by mountpoint to get a good working order
+ for partition in sorted([entry for entry in list_part if entry.get('mountpoint',False)],key=lambda part: part['mountpoint']):
+ mountpoint = partition['mountpoint']
+ log(f"Mounting {mountpoint} to {self.target}{mountpoint} using {partition['device_instance']}", level=logging.INFO)
+ if partition.get('filesystem',{}).get('mount_options',[]):
+ mount_options = ','.join(partition['filesystem']['mount_options'])
+ partition['device_instance'].mount(f"{self.target}{mountpoint}",options=mount_options)
+ else:
+ partition['device_instance'].mount(f"{self.target}{mountpoint}")
time.sleep(1)
try:
get_mount_info(f"{self.target}{mountpoint}", traverse=False)
except DiskError:
raise DiskError(f"Target {self.target}{mountpoint} never got mounted properly (unable to get mount information using findmnt).")
+ # once everything is mounted, we generate the key files in the correct place
+ for handle in list_luks_handles:
+ ppath = handle[1]['device_instance'].path
+ log(f"creating key-file for {ppath}",level=logging.INFO)
+ self._create_keyfile(handle[0],handle[1],handle[2])
+
def mount(self, partition :Partition, mountpoint :str, create_mountpoint :bool = True) -> None:
if create_mountpoint and not os.path.isdir(f'{self.target}{mountpoint}'):
os.makedirs(f'{self.target}{mountpoint}')
@@ -692,6 +713,7 @@ class Installer:
base_path,bind_path = split_bind_name(str(root_partition.path))
if bind_path is not None: # and root_fs_type == 'btrfs':
options_entry = f"rootflags=subvol={bind_path} " + options_entry
+
if real_device := self.detect_encryption(root_partition):
# TODO: We need to detect if the encrypted device is a whole disk encryption,
# or simply a partition encryption. Right now we assume it's a partition (and we always have)