Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/disk/btrfs/btrfssubvolume.py
blob: bc7db612365f03e2af9457bfb5e9bbd6c1760cac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import pathlib
import datetime
import logging
import string
import random
import shutil
from dataclasses import dataclass
from typing import Optional, List# , TYPE_CHECKING
from functools import cached_property

# if TYPE_CHECKING:
# 	from ..blockdevice import BlockDevice

from ...exceptions import DiskError
from ...general import SysCommand
from ...output import log
from ...storage import storage

@dataclass
class BtrfsSubvolume:
	full_path :pathlib.Path
	name :str
	uuid :str
	parent_uuid :str
	creation_time :datetime.datetime
	subvolume_id :int
	generation :int
	gen_at_creation :int
	parent_id :int
	top_level_id :int
	send_transid :int
	send_time :datetime.datetime
	receive_transid :int
	received_uuid :Optional[str] = None
	flags :Optional[str] = None
	receive_time :Optional[datetime.datetime] = None
	snapshots :Optional[List] = None

	def __post_init__(self):
		self.full_path = pathlib.Path(self.full_path)

		# Convert "-" entries to `None`
		if self.parent_uuid == "-":
			self.parent_uuid = None
		if self.received_uuid == "-":
			self.received_uuid = None
		if self.flags == "-":
			self.flags = None
		if self.receive_time == "-":
			self.receive_time = None
		if self.snapshots == "":
			self.snapshots = []

		# Convert timestamps into datetime workable objects (and preserve timezone by using ISO formats)
		self.creation_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.creation_time))
		self.send_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.send_time))
		if self.receive_time:
			self.receive_time = datetime.datetime.fromisoformat(self.convert_to_ISO_format(self.receive_time))

	@property
	def parent_subvolume(self):
		from .btrfs_helpers import find_parent_subvolume

		return find_parent_subvolume(self.full_path)

	@property
	def root(self) -> bool:
		from .btrfs_helpers import subvolume_info_from_path

		# TODO: Make this function traverse storage['MOUNT_POINT'] and find the first
		# occurrence of a mountpoint that is a btrfs volume instead of lazy assume / is a subvolume.
		# It would also be nice if it could use findmnt(self.full_path) and traverse backwards
		# finding the last occurrence of a subvolume which 'self' belongs to.
		if volume := subvolume_info_from_path(storage['MOUNT_POINT']):
			return self.full_path == volume.full_path

		return False

	@cached_property
	def partition(self):
		from ..helpers import findmnt, get_parent_of_partition, all_blockdevices
		from ..partition import Partition
		from ..blockdevice import BlockDevice
		from ..mapperdev import MapperDev
		from .btrfspartition import BTRFSPartition
		from .btrfs_helpers import subvolume_info_from_path

		try:
			# If the subvolume is mounted, it's pretty trivial to lookup the partition (parent) device.
			if filesystem := findmnt(self.full_path).get('filesystems', []):
				if source := filesystem[0].get('source', None):
					# Strip away subvolume definitions from findmnt
					if '[' in source:
						source = source[:source.find('[')]

					if filesystem[0].get('fstype', '') == 'btrfs':
						return BTRFSPartition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
					elif filesystem[0].get('source', '').startswith('/dev/mapper'):
						return MapperDev(source)
					else:
						return Partition(source, BlockDevice(get_parent_of_partition(pathlib.Path(source))))
		except DiskError:
			# Subvolume has never been mounted, we have no reliable way of finding where it is.
			# But we have the UUID of the partition, and can begin looking for it by mounting
			# all blockdevices that we can reliably support.. This is taxing tho and won't cover all devices.

			log(f"Looking up {self}, this might take time.", fg="orange", level=logging.WARNING)
			for blockdevice, instance in all_blockdevices(mappers=True, partitions=True, error=True).items():
				if type(instance) in (Partition, MapperDev):
					we_mounted_it = False
					detection_mountpoint = instance.mountpoint
					if not detection_mountpoint:
						if type(instance) == Partition and instance.encrypted:
							# TODO: Perhaps support unlocking encrypted volumes?
							# This will cause a lot of potential user interactions tho.
							log(f"Ignoring {blockdevice} because it's encrypted.", fg="gray", level=logging.DEBUG)
							continue

						detection_mountpoint = pathlib.Path(f"/tmp/{''.join([random.choice(string.ascii_letters) for x in range(20)])}")
						detection_mountpoint.mkdir(parents=True, exist_ok=True)

						instance.mount(str(detection_mountpoint))
						we_mounted_it = True

					if (filesystem := findmnt(detection_mountpoint)) and (filesystem := filesystem.get('filesystems', [])):
						if subvolume := subvolume_info_from_path(filesystem[0]['target']):
							if subvolume.uuid == self.uuid:
								# The top level subvolume matched of ourselves,
								# which means the instance we're iterating has the subvol we're looking for.
								log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
								return instance

						def iterate_children(struct):
							for child in struct.get('children', []):
								if '[' in child.get('source', ''):
									yield subvolume_info_from_path(child['target'])

								for sub_child in iterate_children(child):
									yield sub_child

						for child in iterate_children(filesystem[0]):
							if child.uuid == self.uuid:
								# We found a child within the instance that has the subvol we're looking for.
								log(f"Found the subvolume on device {instance}", level=logging.DEBUG, fg="gray")
								return instance

					if we_mounted_it:
						instance.unmount()
						shutil.rmtree(detection_mountpoint)

	@cached_property
	def mount_options(self) -> Optional[List[str]]:
		from ..helpers import findmnt

		if filesystem := findmnt(self.full_path).get('filesystems', []):
			return filesystem[0].get('options').split(',')

	def convert_to_ISO_format(self, time_string):
		time_string_almost_done = time_string.replace(' ', 'T', 1).replace(' ', '')
		iso_string = f"{time_string_almost_done[:-2]}:{time_string_almost_done[-2:]}"
		return iso_string

	def mount(self, mountpoint :pathlib.Path, options=None, include_previously_known_options=True):
		from ..helpers import findmnt

		try:
			if mnt_info := findmnt(pathlib.Path(mountpoint), traverse=False):
				log(f"Unmounting {mountpoint} as it was already mounted using {mnt_info}")
				SysCommand(f"umount {mountpoint}")
		except DiskError:
			# No previously mounted device at the mountpoint
			pass

		if not options:
			options = []

		try:
			if include_previously_known_options and (cached_options := self.mount_options):
				options += cached_options
		except DiskError:
			pass

		if not any('subvol=' in x for x in options):
			options += f'subvol={self.name}'

		SysCommand(f"mount {self.partition.path} {mountpoint} -o {','.join(options)}")
		log(f"{self} has successfully been mounted to {mountpoint}", level=logging.INFO, fg="gray")

	def unmount(self, recurse :bool = True):
		SysCommand(f"umount {'-R' if recurse else ''} {self.full_path}")
		log(f"Successfully unmounted {self}", level=logging.INFO, fg="gray")