Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/filesystem.py
blob: bdfa502a9beee146e4e5ada562e9245a26b38b4a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
from __future__ import annotations
import time
import logging
import json
import pathlib
from typing import Optional, Dict, Any, TYPE_CHECKING
# https://stackoverflow.com/a/39757388/929999
from ..models.disk_encryption import DiskEncryption

if TYPE_CHECKING:
	from .blockdevice import BlockDevice
	_: Any

from .partition import Partition
from .validators import valid_fs_type
from ..exceptions import DiskError, SysCallError
from ..general import SysCommand
from ..output import log
from ..storage import storage

# Partition-table modes compared against Filesystem.mode: when a disk label has
# to be created, GPT results in a "gpt" label and MBR in an "msdos" label.
GPT = 0b00000001
MBR = 0b00000010

# A sane default is 5MiB, that allows for plenty of buffer for GRUB on MBR
# but also 4MiB for memory cards for instance. And another 1MiB to avoid issues.
# (we've been pestered by disk issues since the start, so please let this be here for a few versions)
# Filesystem.load_layout() falls back to this value when a partition has no
# 'start' and no previously processed partition to continue from.
DEFAULT_PARTITION_START = '5MiB'

class Filesystem:
	# TODO:
	#   When instance of a HDD is selected, check all usages and gracefully unmount them
	#   as well as close any crypto handles.
	def __init__(self, blockdevice :BlockDevice, mode :int):
		self.blockdevice = blockdevice
		self.mode = mode

	def __enter__(self, *args :str, **kwargs :str) -> 'Filesystem':
		return self

	def __repr__(self) -> str:
		return f"Filesystem(blockdevice={self.blockdevice}, mode={self.mode})"

	def __exit__(self, *args :str, **kwargs :str) -> bool:
		# TODO: https://stackoverflow.com/questions/28157929/how-to-safely-handle-an-exception-inside-a-context-manager
		if len(args) >= 2 and args[1]:
			raise args[1]

		SysCommand('sync')
		return True

	def partuuid_to_index(self, uuid :str) -> Optional[int]:
		"""
		Translates a PARTUUID into the zero-based position of the matching
		child entry in the ``lsblk --json`` listing of the block device.

		The lookup is attempted ``storage['DISK_RETRY_ATTEMPTS']`` times. Every
		attempt runs partprobe and then sleeps ``storage['DISK_TIMEOUTS'] * attempt``
		seconds (at least 0.1s) before the device is listed.

		:param uuid: The PARTUUID to look for; compared case-insensitively.
		:type uuid: str

		:return: Zero-based position of the partition among the listed children.
		:rtype: Optional[int]

		:raises DiskError: When no attempt produced a matching PARTUUID.
		"""
		for attempt in range(storage['DISK_RETRY_ATTEMPTS']):
			self.partprobe()
			backoff = storage['DISK_TIMEOUTS'] * attempt
			time.sleep(max(0.1, backoff))

			# We'll use unreliable lsblk to grab children under the /dev/<device>
			raw_listing = SysCommand(f"lsblk --json {self.blockdevice.device}").decode('UTF-8')
			listing = json.loads(raw_listing)

			for disk in listing['blockdevices']:
				children = disk.get('children', [])
				for position, child in enumerate(children):
					# But we'll use blkid to reliably grab the PARTUUID for that child device (partition)
					blkid_command = f"blkid -s PARTUUID -o value /dev/{child.get('name')}"
					found_uuid = SysCommand(blkid_command).decode().strip()
					if found_uuid.lower() != uuid.lower():
						continue
					return position

		raise DiskError(f"Failed to convert PARTUUID {uuid} to a partition index number on blockdevice {self.blockdevice.device}")

	def load_layout(self, layout :Dict[str, Any]) -> None:
		"""
		Applies a disk layout to the block device: optionally re-labels the
		disk, then creates or re-uses every listed partition, formats it
		(through LUKS2 when the partition is selected for encryption) and
		finally sets the boot flag where requested.

		The resolved partition object is stored back into each partition
		dictionary under the 'device_instance' key.

		:param layout: Layout description. Keys read here are 'wipe' and
			'partitions'. For each partition the keys 'wipe', 'PARTUUID',
			'start', 'size', 'type', 'filesystem' ('format', 'format_options'),
			'options', 'mountpoint' and 'boot' are read.
		:type layout: Dict[str, Any]

		:raises KeyError: When the disk label could not be created.
		:raises DiskError: When a partition instance went missing before formatting.
		:raises ValueError: From the silent-mode check in the encryption branch (see the review note there).
		"""
		from ..luks import luks2
		from .btrfs import BTRFSPartition

		# If the layout tells us to wipe the drive, we do so
		if layout.get('wipe', False):
			if self.mode == GPT:
				if not self.parted_mklabel(self.blockdevice.device, "gpt"):
					raise KeyError(f"Could not create a GPT label on {self}")
			elif self.mode == MBR:
				if not self.parted_mklabel(self.blockdevice.device, "msdos"):
					raise KeyError(f"Could not create a MS-DOS label on {self}")

			self.blockdevice.flush_cache()
			time.sleep(3)

		prev_partition = None
		# We then iterate the partitions in order
		for partition in layout.get('partitions', []):
			# We don't want to re-add an existing partition (those containing a UUID already)
			if partition.get('wipe', False) and not partition.get('PARTUUID', None):
				# Start order of preference: explicit 'start', then the end of the
				# previously processed partition, then the module-wide default.
				start = partition.get('start')
				if not start:
					if prev_partition:
						start = f'{prev_partition["device_instance"].end_sectors}s'
					else:
						start = DEFAULT_PARTITION_START

				partition['device_instance'] = self.add_partition(
					partition.get('type', 'primary'),
					start=start,
					end=partition.get('size', '100%'),
					partition_format=partition.get('filesystem', {}).get('format', 'btrfs'),
					skip_mklabel=layout.get('wipe', False) is not False
				)

			elif (partition_uuid := partition.get('PARTUUID')):
				# We try to deal with both UUID and PARTUUID of a partition when it's being re-used.
				# We should re-name or separate this logic based on partition.get('PARTUUID') and partition.get('UUID')
				# but for now, lets just attempt to deal with both.
				try:
					partition['device_instance'] = self.blockdevice.get_partition(uuid=partition_uuid)
				except DiskError:
					partition['device_instance'] = self.blockdevice.get_partition(partuuid=partition_uuid)

				log(_("Re-using partition instance: {}").format(partition['device_instance']), level=logging.DEBUG, fg="gray")
			else:
				log(f"{self}.load_layout() doesn't know how to work without 'wipe' being set or UUID ({partition.get('PARTUUID')}) was given and found.", fg="yellow", level=logging.WARNING)
				continue

			if partition.get('filesystem', {}).get('format', False):
				# needed for backward compatibility with the introduction of the new "format_options"
				format_options = partition.get('options',[]) + partition.get('filesystem',{}).get('format_options',[])
				disk_encryption: DiskEncryption = storage['arguments'].get('disk_encryption')

				# .get() yields None when no encryption was configured, so the
				# object has to be checked before its partition list is consulted.
				if disk_encryption and partition in disk_encryption.partitions:
					if not partition['device_instance']:
						raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")

					if partition.get('mountpoint',None):
						loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['mountpoint']).name}loop"
					else:
						loopdev = f"{storage.get('ENC_IDENTIFIER', 'ai')}{pathlib.Path(partition['device_instance'].path).name}"

					partition['device_instance'].encrypt(password=disk_encryption.encryption_password)
					# Immediately unlock the encrypted device to format the inner volume
					with luks2(partition['device_instance'], loopdev, disk_encryption.encryption_password, auto_unmount=True) as unlocked_device:
						if not partition.get('wipe'):
							# NOTE(review): storage['arguments'] is used as a mapping a few lines
							# above, so comparing it with the string 'silent' looks unintended.
							# Left untouched on purpose - confirm the intended check upstream.
							if storage['arguments'] == 'silent':
								raise ValueError(f"Missing fs-type to format on newly created encrypted partition {partition['device_instance']}")
							else:
								if not partition.get('filesystem'):
									partition['filesystem'] = {}

								if not partition['filesystem'].get('format', False):
									while True:
										partition['filesystem']['format'] = input(f"Enter a valid fs-type for newly encrypted partition {partition['filesystem']['format']}: ").strip()
										if not partition['filesystem']['format'] or valid_fs_type(partition['filesystem']['format']) is False:
											log(_("You need to enter a valid fs-type in order to continue. See `man parted` for valid fs-type's."))
											continue
										break

						unlocked_device.format(partition['filesystem']['format'], options=format_options)

				elif partition.get('wipe', False):
					if not partition['device_instance']:
						raise DiskError(f"Internal error caused us to loose the partition. Please report this issue upstream!")

					partition['device_instance'].format(partition['filesystem']['format'], options=format_options)

					if partition['filesystem']['format'] == 'btrfs':
						# We upgrade the device instance to a BTRFSPartition if we format it as such.
						# This is so that we can gain access to more features than otherwise available in Partition()
						partition['device_instance'] = BTRFSPartition(
							partition['device_instance'].path,
							block_device=partition['device_instance'].block_device,
							encrypted=False,
							filesystem='btrfs',
							autodetect_filesystem=False
						)

			if partition.get('boot', False):
				log(f"Marking partition {partition['device_instance']} as bootable.")
				self.set(self.partuuid_to_index(partition['device_instance'].part_uuid), 'boot on')

			prev_partition = partition

	def find_partition(self, mountpoint :str) -> Partition:
		for partition in self.blockdevice:
			if partition.target_mountpoint == mountpoint or partition.mountpoint == mountpoint:
				return partition

	def partprobe(self) -> bool:
		"""
		Runs ``partprobe`` against the block device.

		:return: True when the command went through.
		:rtype: bool

		:raises DiskError: When the command failed with a SysCallError (which is logged first).
		"""
		try:
			SysCommand(f'partprobe {self.blockdevice.device}')
		except SysCallError as error:
			log(f"Could not execute partprobe: {error!r}", level=logging.ERROR, fg="red")
			raise DiskError(f"Could not run partprobe on {self.blockdevice.device}: {error!r}")
		else:
			return True

	def raw_parted(self, string: str) -> SysCommand:
		"""
		Executes ``/usr/bin/parted -s <string>`` without any post-processing.

		:param string: The arguments handed to parted.
		:type string: str

		:return: The command handle after a 0.5s pause. On a SysCallError the
			error is logged and the error object itself is returned instead,
			so callers inspect ``.exit_code`` in both cases.
		"""
		command = f'/usr/bin/parted -s {string}'

		try:
			handle = SysCommand(command)
		except SysCallError as error:
			log(f"Parted ended with a bad exit code: {error.exit_code} ({error})", level=logging.ERROR, fg="red")
			return error

		time.sleep(0.5)
		return handle

	def parted(self, string: str) -> bool:
		"""
		Performs a parted execution of the given string

		:param string: A raw string passed to /usr/bin/parted -s <string>
		:type string: str
		"""
		if (parted_handle := self.raw_parted(string)).exit_code == 0:
			return self.partprobe()
		else:
			raise DiskError(f"Parted failed to add a partition: {parted_handle}")

	def use_entire_disk(self, root_filesystem_type :str = 'ext4') -> Partition:
		# TODO: Implement this with declarative profiles instead.
		raise ValueError("Installation().use_entire_disk() has to be re-worked.")

	def add_partition(
		self,
		partition_type :str,
		start :str,
		end :str,
		partition_format :Optional[str] = None,
		skip_mklabel :bool = False
	) -> Partition:
		"""
		Creates a partition with ``parted mkpart`` and returns the matching
		partition object of the block device.

		The new partition is identified by comparing the PARTUUIDs known before
		the call with the ones visible afterwards. That lookup is retried up to
		``storage['DISK_RETRY_ATTEMPTS']`` times (default 3).

		:param partition_type: The parted partition type, for instance 'primary'.
		:type partition_type: str

		:param start: Start position as understood by parted.
		:type start: str

		:param end: End position as understood by parted.
		:type end: str

		:param partition_format: Optional fs-type handed to ``mkpart``.
		:type partition_format: Optional[str]

		:param skip_mklabel: When False and the device has no partitions, a disk label matching the mode is created first.
		:type skip_mklabel: bool

		:return: The newly created partition.
		:rtype: Partition

		:raises KeyError: When the disk label could not be created.
		:raises DiskError: When a primary partition is requested on an MBR disk
			that already holds more than three partitions, when parted fails,
			or when the new PARTUUID never shows up.
		"""
		def readable_partuuids() -> list:
			# Reading part_uuid can raise DiskError; such partitions are skipped
			# so that one unreadable entry does not abort the whole lookup.
			found = []
			for existing in self.blockdevice.partitions.values():
				try:
					found.append(existing.part_uuid)
				except DiskError:
					pass
			return found

		log(f'Adding partition to {self.blockdevice}, {start}->{end}', level=logging.INFO)

		if len(self.blockdevice.partitions) == 0 and skip_mklabel is False:
			# If it's a completely empty drive, and we're about to add partitions to it
			# we need to make sure there's a filesystem label.
			if self.mode == GPT:
				if not self.parted_mklabel(self.blockdevice.device, "gpt"):
					raise KeyError(f"Could not create a GPT label on {self}")
			elif self.mode == MBR:
				if not self.parted_mklabel(self.blockdevice.device, "msdos"):
					raise KeyError(f"Could not create a MS-DOS label on {self}")

			self.blockdevice.flush_cache()

		previous_partuuids = readable_partuuids()

		# TODO this check should probably run in the setup process rather than during the installation
		# The error used to be constructed without being raised. It is limited to
		# primary partitions, as other partition types are not bound by this limit.
		if self.mode == MBR and partition_type == 'primary' and len(self.blockdevice.partitions) > 3:
			raise DiskError("Too many partitions on disk, MBR disks can only have 4 primary partitions")

		if partition_format:
			parted_string = f'{self.blockdevice.device} mkpart {partition_type} {partition_format} {start} {end}'
		else:
			parted_string = f'{self.blockdevice.device} mkpart {partition_type} {start} {end}'

		log(f"Adding partition using the following parted command: {parted_string}", level=logging.DEBUG)

		if self.parted(parted_string):
			for _attempt in range(storage.get('DISK_RETRY_ATTEMPTS', 3)):
				self.blockdevice.flush_cache()

				# Only identifiers that are visible now but were not known before can
				# belong to the new partition. A plain difference (rather than a symmetric
				# one) keeps a vanished or unreadable old partition from being picked.
				new_partuuid_set = {part_id for part_id in readable_partuuids() if part_id} - set(previous_partuuids)

				if new_partuuid_set:
					new_partuuid = new_partuuid_set.pop()
					try:
						return self.blockdevice.get_partition(partuuid=new_partuuid)
					except Exception:
						log(f'Blockdevice: {self.blockdevice}', level=logging.ERROR, fg="red")
						log(f'Partitions: {self.blockdevice.partitions}', level=logging.ERROR, fg="red")
						log(f'Partition set: {new_partuuid_set}', level=logging.ERROR, fg="red")
						log(f'New PARTUUID: {[new_partuuid]}', level=logging.ERROR, fg="red")
						log(f'get_partition(): {self.blockdevice.get_partition}', level=logging.ERROR, fg="red")
						raise
				else:
					# Report the delay that is actually slept.
					delay = max(0.1, storage.get('DISK_TIMEOUTS', 1))
					log(f"Could not get UUID for partition. Waiting {delay}s before retrying.", level=logging.DEBUG)
					self.partprobe()
					time.sleep(delay)
		else:
			log("Parted did not return True during partition creation", level=logging.ERROR, fg="red")

		total_partitions = set(readable_partuuids())
		total_partitions.update(previous_partuuids)

		# TODO: This should never be able to happen
		log(f"Could not find the new PARTUUID after adding the partition.", level=logging.ERROR, fg="red")
		log(f"Previous partitions: {previous_partuuids}", level=logging.ERROR, fg="red")
		log(f"New partitions: {total_partitions}", level=logging.ERROR, fg="red")

		raise DiskError(f"Could not add partition using: {parted_string}")

	def set_name(self, partition: int, name: str) -> bool:
		return self.parted(f'{self.blockdevice.device} name {partition + 1} "{name}"') == 0

	def set(self, partition: int, string: str) -> bool:
		"""
		Applies a flag to a partition through ``parted set``.

		:param partition: Zero-based partition index; parted is given ``partition + 1``.
		:type partition: int

		:param string: The flag and state handed to parted, for instance 'boot on'.
		:type string: str

		:return: The result of parted(), which is True on success.
		:rtype: bool

		:raises DiskError: Propagated from parted() when the command fails.
		"""
		log(f"Setting {string} on (parted) partition index {partition+1}", level=logging.INFO)
		# parted() hands back a boolean (or raises); comparing that with 0
		# reported every successful call as a failure.
		return self.parted(f'{self.blockdevice.device} set {partition + 1} {string}')

	def parted_mklabel(self, device: str, disk_label: str) -> bool:
		"""
		Writes a new disk label to a device through ``parted mklabel``.

		An unmount of the device's partitions is attempted beforehand, and
		partprobe is run both before and after the label is written.

		:param device: Path of the device that receives the label.
		:type device: str

		:param disk_label: Label type handed to parted, for instance 'gpt' or 'msdos'.
		:type disk_label: str

		:return: True when parted exited with code 0.
		:rtype: bool

		:raises DiskError: Propagated from partprobe() when re-probing fails.
		"""
		log(f"Creating a new partition label on {device}", level=logging.INFO, fg="yellow")
		# Try to unmount devices before attempting to run mklabel
		# NOTE(review): the shell pattern '?' stands for exactly one character, so
		# partitions whose suffix is longer than that are not covered by it.
		try:
			SysCommand(f'bash -c "umount {device}?"')
		except Exception as error:
			# Best effort only: a failure here is not an error in itself, but
			# KeyboardInterrupt and SystemExit must not be swallowed.
			log(f"Could not unmount partitions of {device} before mklabel: {error!r}", level=logging.DEBUG)

		self.partprobe()
		worked = self.raw_parted(f'{device} mklabel {disk_label}').exit_code == 0
		self.partprobe()

		return worked