1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
|
import pathlib
import urllib.error
import urllib.request
from typing import Union, Iterable, Dict, Any, List
from dataclasses import dataclass
from .general import SysCommand
from .output import info, warn
from .exceptions import SysCallError
from .storage import storage
@dataclass
class CustomMirror:
url: str
signcheck: str
signoptions: str
name: str
def sort_mirrorlist(raw_data :bytes, sort_order: List[str] = ['https', 'http']) -> bytes:
"""
This function can sort /etc/pacman.d/mirrorlist according to the
mirror's URL prefix. By default places HTTPS before HTTP but it also
preserves the country/rank-order.
This assumes /etc/pacman.d/mirrorlist looks like the following:
## Comment
Server = url
or
## Comment
#Server = url
But the Comments need to start with double-hashmarks to be distringuished
from server url definitions (commented or uncommented).
"""
comments_and_whitespaces = b""
sort_order += ['Unknown']
categories: Dict[str, List] = {key: [] for key in sort_order}
for line in raw_data.split(b"\n"):
if line[0:2] in (b'##', b''):
comments_and_whitespaces += line + b'\n'
elif line[:6].lower() == b'server' or line[:7].lower() == b'#server':
opening, url = line.split(b'=', 1)
opening, url = opening.strip(), url.strip()
if (category := url.split(b'://',1)[0].decode('UTF-8')) in categories:
categories[category].append(comments_and_whitespaces)
categories[category].append(opening + b' = ' + url + b'\n')
else:
categories["Unknown"].append(comments_and_whitespaces)
categories["Unknown"].append(opening + b' = ' + url + b'\n')
comments_and_whitespaces = b""
new_raw_data = b''
for category in sort_order + ["Unknown"]:
for line in categories[category]:
new_raw_data += line
return new_raw_data
def filter_mirrors_by_region(regions :str,
destination :str = '/etc/pacman.d/mirrorlist',
sort_order :List[str] = ["https", "http"],
*args :str,
**kwargs :str
) -> Union[bool, bytes]:
"""
This function will change the active mirrors on the live medium by
filtering which regions are active based on `regions`.
:param regions: A series of country codes separated by `,`. For instance `SE,US` for sweden and United States.
:type regions: str
"""
region_list = [f'country={region}' for region in regions.split(',')]
response = urllib.request.urlopen(urllib.request.Request(f"https://archlinux.org/mirrorlist/?{'&'.join(region_list)}&protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on'", headers={'User-Agent': 'ArchInstall'}))
new_list = response.read().replace(b"#Server", b"Server")
if sort_order:
new_list = sort_mirrorlist(new_list, sort_order=sort_order)
if destination:
with open(destination, "wb") as mirrorlist:
mirrorlist.write(new_list)
return True
else:
return new_list.decode('UTF-8')
def add_custom_mirrors(mirrors: List[CustomMirror]) -> bool:
"""
This will append custom mirror definitions in pacman.conf
:param mirrors: A list of custom mirrors
:type mirrors: List[CustomMirror]
"""
with open('/etc/pacman.conf', 'a') as pacman:
for mirror in mirrors:
pacman.write(f"[{mirror.name}]\n")
pacman.write(f"SigLevel = {mirror.signcheck} {mirror.signoptions}\n")
pacman.write(f"Server = {mirror.url}\n")
return True
def insert_mirrors(mirrors :Dict[str, Any], *args :str, **kwargs :str) -> bool:
"""
This function will insert a given mirror-list at the top of `/etc/pacman.d/mirrorlist`.
It will not flush any other mirrors, just insert new ones.
:param mirrors: A dictionary of `{'url' : 'country', 'url2' : 'country'}`
:type mirrors: dict
"""
original_mirrorlist = ''
with open('/etc/pacman.d/mirrorlist', 'r') as original:
original_mirrorlist = original.read()
with open('/etc/pacman.d/mirrorlist', 'w') as new_mirrorlist:
for mirror, country in mirrors.items():
new_mirrorlist.write(f'## {country}\n')
new_mirrorlist.write(f'Server = {mirror}\n')
new_mirrorlist.write('\n')
new_mirrorlist.write(original_mirrorlist)
return True
def use_mirrors(
regions: Dict[str, Iterable[str]],
destination: str = '/etc/pacman.d/mirrorlist'
):
info(f'A new package mirror-list has been created: {destination}')
with open(destination, 'w') as mirrorlist:
for region, mirrors in regions.items():
for mirror in mirrors:
mirrorlist.write(f'## {region}\n')
mirrorlist.write(f'Server = {mirror}\n')
def re_rank_mirrors(
top: int = 10,
src: str = '/etc/pacman.d/mirrorlist',
dst: str = '/etc/pacman.d/mirrorlist',
) -> bool:
try:
cmd = SysCommand(f"/usr/bin/rankmirrors -n {top} {src}")
except SysCallError:
return False
with open(dst, 'w') as f:
f.write(str(cmd))
return True
def list_mirrors(sort_order :List[str] = ["https", "http"]) -> Dict[str, Any]:
regions: Dict[str, Dict[str, Any]] = {}
if storage['arguments']['offline']:
with pathlib.Path('/etc/pacman.d/mirrorlist').open('rb') as fh:
mirrorlist = fh.read()
else:
url = "https://archlinux.org/mirrorlist/?protocol=https&protocol=http&ip_version=4&ip_version=6&use_mirror_status=on"
try:
response = urllib.request.urlopen(url)
except urllib.error.URLError as err:
warn(f'Could not fetch an active mirror-list: {err}')
return regions
mirrorlist = response.read()
if sort_order:
mirrorlist = sort_mirrorlist(mirrorlist, sort_order=sort_order)
region = 'Unknown region'
for line in mirrorlist.split(b'\n'):
if len(line.strip()) == 0:
continue
clean_line = line.decode('UTF-8').strip('\n').strip('\r')
if clean_line[:3] == '## ':
region = clean_line[3:]
elif clean_line[:10] == '#Server = ':
regions.setdefault(region, {})
url = clean_line.lstrip('#Server = ')
regions[region][url] = True
elif clean_line.startswith('Server = '):
regions.setdefault(region, {})
url = clean_line.lstrip('Server = ')
regions[region][url] = True
return regions
|