Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/luks.py
blob: d62c2d4b4d051de767d194967e1a2f7768c2e1c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import os
from .exceptions import *
from .general import *
from .disk import Partition
from .output import log, LOG_LEVELS
from .storage import storage

class luks2():
	def __init__(self, partition, mountpoint, password, *args, **kwargs):
		self.password = password
		self.partition = partition
		self.mountpoint = mountpoint
		self.args = args
		self.kwargs = kwargs
		self.filesystem = 'crypto_LUKS'

	def __enter__(self):
		key_file = self.encrypt(self.partition, self.password, *self.args, **self.kwargs)
		return self.unlock(self.partition, self.mountpoint, key_file)

	def __exit__(self, *args, **kwargs):
		# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
		if len(args) >= 2 and args[1]:
			raise args[1]
		return True

	def encrypt(self, partition, password, key_size=512, hash_type='sha512', iter_time=10000, key_file=None):
		# TODO: We should be able to integrate this into the main log some how.
		#       Perhaps post-mortem?
		log(f'Encrypting {partition} (This might take a while)', level=LOG_LEVELS.Info)

		if not key_file:
			key_file = f"/tmp/{os.path.basename(self.partition.path)}.disk_pw"  # TODO: Make disk-pw-file randomly unique?
		if type(password) != bytes: password = bytes(password, 'UTF-8')

		with open(key_file, 'wb') as fh:
			fh.write(password)

		o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash {hash_type} --key-size {key_size} --iter-time {iter_time} --key-file {os.path.abspath(key_file)} --use-urandom luksFormat {partition.path}'))
		if b'Command successful.' not in o:
			raise DiskError(f'Could not encrypt volume "{partition.path}": {o}')
	
		return key_file

	def unlock(self, partition, mountpoint, key_file):
		"""
		Mounts a lukts2 compatible partition to a certain mountpoint.
		Keyfile must be specified as there's no way to interact with the pw-prompt atm.

		:param mountpoint: The name without absolute path, for instance "luksdev" will point to /dev/mapper/luksdev
		:type mountpoint: str
		"""
		if '/' in mountpoint:
			os.path.basename(mountpoint)  # TODO: Raise exception instead?
		sys_command(f'/usr/bin/cryptsetup open {partition.path} {mountpoint} --key-file {os.path.abspath(key_file)} --type luks2')
		if os.path.islink(f'/dev/mapper/{mountpoint}'):
			return Partition(f'/dev/mapper/{mountpoint}', encrypted=True)

	def close(self, mountpoint):
		sys_command(f'cryptsetup close /dev/mapper/{mountpoint}')
		return os.path.islink(f'/dev/mapper/{mountpoint}') is False

	def format(self, path):
		if (handle := sys_command(f"/usr/bin/cryptsetup -q -v luksErase {path}")).exit_code != 0:
			raise DiskError(f'Could not format {path} with {self.filesystem} because: {b"".join(handle)}')