Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/mirrors.py
blob: 62a0b081d022b9cc0c7b366b975762edaecafc9a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import pathlib
import urllib.error
import urllib.request
from typing import Union, Iterable, Dict, Any, List
from dataclasses import dataclass

from .general import SysCommand
from .output import info, warn
from .exceptions import SysCallError
from .storage import storage


@dataclass
class CustomMirror:
	"""
	Describes one custom pacman repository entry.

	Instances are consumed by add_custom_mirrors(), which writes one
	`[name]` section per instance to /etc/pacman.conf.
	"""
	# Written as the value of the section's `Server = ` line.
	url: str
	# First token of the section's `SigLevel = ` line.
	signcheck: str
	# Second token of the section's `SigLevel = ` line (follows signcheck after a space).
	signoptions: str
	# Section header, written as `[name]`.
	name: str


def sort_mirrorlist(raw_data :bytes, sort_order: List[str] = ['https', 'http']) -> bytes:
	"""
	This function can sort /etc/pacman.d/mirrorlist according to the
	mirror's URL prefix. By default places HTTPS before HTTP but it also
	preserves the country/rank-order within each prefix.

	This assumes /etc/pacman.d/mirrorlist looks like the following:

	## Comment
	Server = url

	or

	## Comment
	#Server = url

	But the comments need to start with double-hashmarks to be distinguished
	from server url definitions (commented or uncommented).

	Comment and blank lines are attached to the first server entry that
	follows them. Comment/blank lines after the last server entry, and lines
	that are neither `##` comments, blank, nor server definitions, are dropped.
	Servers whose URL scheme is not listed in `sort_order` are emitted last,
	exactly once.

	:param raw_data: The raw content of a mirrorlist
	:type raw_data: bytes
	:param sort_order: URL schemes in the order they should appear in the output. The list is never modified.
	:type sort_order: List[str]
	:return: The re-ordered mirrorlist
	:rtype: bytes
	"""
	# Work on a private, de-duplicated copy: mutating `sort_order` in place
	# would leak 'Unknown' into the caller's list (and into the shared
	# default argument), which duplicated entries in the output.
	order = list(dict.fromkeys(sort_order))
	if 'Unknown' not in order:
		order.append('Unknown')

	categories: Dict[str, List[bytes]] = {key: [] for key in order}
	comments_and_whitespaces = b""

	for line in raw_data.split(b"\n"):
		if line[0:2] in (b'##', b''):
			comments_and_whitespaces += line + b'\n'
		elif line[:6].lower() == b'server' or line[:7].lower() == b'#server':
			opening, separator, url = line.partition(b'=')
			if not separator:
				# Not a `Server = url` definition after all; ignore it
				# instead of crashing on the missing '='.
				continue

			opening, url = opening.strip(), url.strip()
			category = url.split(b'://', 1)[0].decode('UTF-8', errors='replace')
			if category not in categories:
				category = 'Unknown'

			categories[category].append(comments_and_whitespaces)
			categories[category].append(opening + b' = ' + url + b'\n')

			comments_and_whitespaces = b""

	# `order` already ends with 'Unknown', so each bucket is emitted once.
	return b''.join(chunk for category in order for chunk in categories[category])


def filter_mirrors_by_region(regions :str,
	destination :str = '/etc/pacman.d/mirrorlist',
	sort_order :List[str] = ["https", "http"],
	*args :str,
	**kwargs :str
) -> Union[bool, str]:
	"""
	This function will change the active mirrors on the live medium by
	filtering which regions are active based on `regions`.

	The mirror list is downloaded from archlinux.org, every `#Server` entry
	is un-commented and, unless `sort_order` is empty, the result is passed
	through sort_mirrorlist().

	:param regions: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States.
	:type regions: str
	:param destination: File to write the new mirror list to. If empty, nothing is written and the list is returned instead.
	:type destination: str
	:param sort_order: URL schemes in preferred order; a false value skips sorting.
	:type sort_order: List[str]
	:return: True when the list was written to `destination`, otherwise the mirror list decoded as UTF-8 text.
	:rtype: Union[bool, str]
	"""
	# Surrounding whitespace (as in "SE, US") would end up verbatim in the URL.
	region_list = [f'country={region.strip()}' for region in regions.split(',')]
	query = '&'.join(region_list + [
		'protocol=https',
		'protocol=http',
		'ip_version=4',
		'ip_version=6',
		'use_mirror_status=on',
	])
	request = urllib.request.Request(
		f"https://archlinux.org/mirrorlist/?{query}",
		headers={'User-Agent': 'ArchInstall'}
	)

	# Close the HTTP response deterministically once the body has been read.
	with urllib.request.urlopen(request) as response:
		new_list = response.read().replace(b"#Server", b"Server")

	if sort_order:
		# Hand over a copy so the shared default list can never be modified.
		new_list = sort_mirrorlist(new_list, sort_order=list(sort_order))

	if destination:
		with open(destination, "wb") as mirrorlist:
			mirrorlist.write(new_list)

		return True
	else:
		return new_list.decode('UTF-8')


def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool:
	"""
	Append one repository section per custom mirror to /etc/pacman.conf.

	Each section consists of a `[name]` header, a `SigLevel` line built from
	the mirror's signcheck and signoptions, and a `Server` line with its url.

	:param mirrors: A list of custom mirrors
	:type mirrors: List[CustomMirror]
	:return: Always True once every section has been written
	:rtype: bool
	"""
	with open('/etc/pacman.conf', 'a') as config:
		for entry in mirrors:
			section = (
				f"[{entry.name}]\n",
				f"SigLevel = {entry.signcheck} {entry.signoptions}\n",
				f"Server = {entry.url}\n",
			)
			config.writelines(section)

	return True


def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool:
	"""
	Prepend the given mirrors to `/etc/pacman.d/mirrorlist`.

	The existing content of the file is kept and re-written, after a blank
	line, below the newly inserted entries.

	:param mirrors: A dictionary of `{'url' : 'country', 'url2' : 'country'}`
	:type mirrors: dict
	:return: Always True once the file has been rewritten
	:rtype: bool
	"""
	mirrorlist_path = '/etc/pacman.d/mirrorlist'

	# Read the current list first; opening with 'w' below truncates the file.
	with open(mirrorlist_path, 'r') as current:
		previous_content = current.read()

	with open(mirrorlist_path, 'w') as target:
		for url, country in mirrors.items():
			target.write(f'## {country}\nServer = {url}\n')
		target.write('\n' + previous_content)

	return True


def use_mirrors(
	regions: Dict[str, Iterable[str]],
	destination: str = '/etc/pacman.d/mirrorlist'
):
	"""
	Overwrite `destination` with the mirrors given in `regions`.

	Every mirror is written as a `## region` comment line followed by a
	`Server = mirror` line. The informational message is emitted before
	the file is opened.

	:param regions: A dictionary of `{'region': ['url', 'url2']}`
	:type regions: dict
	:param destination: The mirrorlist file to (over)write
	:type destination: str
	"""
	info(f'A new package mirror-list has been created: {destination}')

	with open(destination, 'w') as target:
		target.writelines(
			f'## {region}\nServer = {mirror}\n'
			for region, mirrors in regions.items()
			for mirror in mirrors
		)

def re_rank_mirrors(
	top: int = 10,
	src: str = '/etc/pacman.d/mirrorlist',
	dst: str = '/etc/pacman.d/mirrorlist',
) -> bool:
	"""
	Run `/usr/bin/rankmirrors -n <top> <src>` and store its output in `dst`.

	:param top: Value passed to rankmirrors' `-n` option
	:type top: int
	:param src: The mirrorlist handed to rankmirrors
	:type src: str
	:param dst: The file that receives the string form of the command result
	:type dst: str
	:return: False if the command raised SysCallError (`dst` is then left untouched), otherwise True
	:rtype: bool
	"""
	try:
		ranked = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}")
	except SysCallError:
		return False
	else:
		with open(dst, 'w') as output:
			output.write(str(ranked))
		return True


def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
	"""
	Build a mapping of regions to their mirror URLs.

	In offline mode (`storage['arguments']['offline']`) the local
	/etc/pacman.d/mirrorlist is parsed, otherwise the list is downloaded
	from archlinux.org. Unless `sort_order` is empty the raw list is first
	passed through sort_mirrorlist().

	A `## <text>` line sets the region for the server entries that follow;
	entries seen before any such line are filed under 'Unknown region'.
	Both `Server = ` and commented `#Server = ` entries are included.

	:param sort_order: URL schemes in preferred order; a false value skips sorting.
	:type sort_order: List[str]
	:return: `{'region': {'url': True, ...}, ...}`, or an empty dictionary if the online list could not be fetched.
	:rtype: Dict[str, Any]
	"""
	regions: Dict[str, Dict[str, Any]] = {}

	if storage['arguments']['offline']:
		with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh:
			mirrorlist = fh.read()
	else:
		url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"

		try:
			response = urllib.request.urlopen(url)
		except urllib.error.URLError as err:
			warn(f'Could not fetch an active mirror-list: {err}')
			return regions

		# Close the HTTP response deterministically once the body has been read.
		with response:
			mirrorlist = response.read()

	if sort_order:
		# Hand over a copy so the shared default list can never be modified.
		mirrorlist = sort_mirrorlist(mirrorlist, sort_order=list(sort_order))

	region = 'Unknown region'
	for line in mirrorlist.split(b'\n'):
		if len(line.strip()) == 0:
			continue

		clean_line = line.decode('UTF-8').strip('\n').strip('\r')

		if clean_line[:3] == '## ':
			region = clean_line[3:]
			continue

		for prefix in ('#Server = ', 'Server = '):
			if clean_line.startswith(prefix):
				# Slice the prefix off: str.lstrip() strips a *set of
				# characters* and would also eat leading URL characters
				# such as the 'r' in 'rsync://'.
				mirror_url = clean_line[len(prefix):].strip()
				regions.setdefault(region, {})[mirror_url] = True
				break

	return regions