Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Hvornum <anton.feeds+github@gmail.com>2020-06-29 08:55:25 +0000
committerAnton Hvornum <anton.feeds+github@gmail.com>2020-06-29 08:55:25 +0000
commit5ae18b80fd118cc07108694feb81e5e02a9b18f3 (patch)
treeae19f5234e465492044052a8ef4bf6029e42c1d3
parent89ecdee763677fd455fc9ae722632928047a791d (diff)
Starting to rework the entire codebase to be context friendly. Annotations is next after one successful install.
-rw-r--r--.gitignore2
-rw-r--r--archinstall.py1510
-rw-r--r--exceptions.py4
-rw-r--r--helpers/__init__.py0
-rw-r--r--helpers/disk.py109
-rw-r--r--helpers/general.py206
-rw-r--r--helpers/user_interaction.py17
-rw-r--r--installer.py22
8 files changed, 376 insertions, 1494 deletions
diff --git a/.gitignore b/.gitignore
index 45939095..38e33532 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
**/**__pycache__
SAFETY_LOCK
+**/**old.*
+**/**.img
diff --git a/archinstall.py b/archinstall.py
index 086585fb..c8df36c2 100644
--- a/archinstall.py
+++ b/archinstall.py
@@ -1,1499 +1,21 @@
-#!/usr/bin/python3
-import traceback
-import os, re, struct, sys, json, pty, shlex
-import urllib.request, urllib.parse, ssl, signal
-import time
-from glob import glob
-from select import epoll, EPOLLIN, EPOLLHUP
-from socket import socket, inet_ntoa, AF_INET, AF_INET6, AF_PACKET
-from collections import OrderedDict as oDict
-from subprocess import Popen, STDOUT, PIPE, check_output
-from random import choice
-from string import ascii_uppercase, ascii_lowercase, digits
-from hashlib import sha512
-from threading import Thread, enumerate as tenum
+import os, stat
-if os.path.isfile('./SAFETY_LOCK'):
- SAFETY_LOCK = True
-else:
- SAFETY_LOCK = False
+from exceptions import *
+from helpers.disk import *
+from helpers.general import *
+from helpers.user_interaction import *
-profiles_path = 'https://raw.githubusercontent.com/Torxed/archinstall/master/deployments'
-rootdir_pattern = re.compile('^.*?/devices')
-harddrives = oDict()
-commandlog = []
-worker_history = oDict()
-instructions = oDict()
-args = {}
+class HardDrive():
+ def __init__(self, full_path:str, *args, **kwargs):
+ if not stat.S_ISBLK(os.stat(full_path).st_mode):
+ raise DiskError(f'Selected disk "{full_path}" is not a block device.')
-create_log = True
+class installer():
+ def __init__(self, partition, *, profile=None, hostname='ArchInstalled'):
+ self.profile = profile
+ self.hostname = hostname
-try:
- if 'log' in __builtins__.__dict__:
- create_log = False
-except:
- if 'log' in __builtins__:
- create_log = False
+ self.partition = partition
-if create_log:
- import logging
- from systemd.journal import JournalHandler
-
- # Custom adapter to pre-pend the 'origin' key.
- # TODO: Should probably use filters: https://docs.python.org/3/howto/logging-cookbook.html#using-filters-to-impart-contextual-information
- class CustomAdapter(logging.LoggerAdapter):
- def process(self, msg, kwargs):
- return '[{}] {}'.format(self.extra['origin'], msg), kwargs
-
- logger = logging.getLogger() # __name__
- journald_handler = JournalHandler()
- journald_handler.setFormatter(logging.Formatter('[{levelname}] {message}', style='{'))
- logger.addHandler(journald_handler)
- logger.setLevel(logging.DEBUG)
-
- class LOG_LEVELS:
- CRITICAL = 1
- ERROR = 2
- WARNING = 3
- INFO = 4
- DEBUG = 5
-
- LOG_LEVEL = 4
- def log(*msg, origin='UNKNOWN', level=5, **kwargs):
- if level <= LOG_LEVEL:
- msg = [item.decode('UTF-8', errors='backslashreplace') if type(item) == bytes else item for item in msg]
- msg = [str(item) if type(item) != str else item for item in msg]
- log_adapter = CustomAdapter(logger, {'origin': origin})
- if level <= 1:
- log_adapter.critical(' '.join(msg))
- elif level <= 2:
- log_adapter.error(' '.join(msg))
- elif level <= 3:
- log_adapter.warning(' '.join(msg))
- elif level <= 4:
- log_adapter.info(' '.join(msg))
- else:
- log_adapter.debug(' '.join(msg))
-
-## == Profiles Path can be set via --profiles-path=/path
-## This just sets the default path if the parameter is omitted.
-try:
- import psutil
-except:
- ## Time to monkey patch in all the stats and psutil fuctions if it isn't installed.
-
- class mem():
- def __init__(self, free, percent=-1):
- self.free = free
- self.percent = percent
-
- class disk():
- def __init__(self, size, free, percent):
- self.total = size
- self.used = 0
- self.free = free
- self.percent = percent
-
- class iostat():
- def __init__(self, interface, bytes_sent=0, bytes_recv=0):
- self.interface = interface
- self.bytes_recv = int(bytes_recv)
- self.bytes_sent = int(bytes_sent)
- def __repr__(self, *positionals, **kwargs):
- return f'iostat@{self.interface}[bytes_sent: {self.bytes_sent}, bytes_recv: {self.bytes_recv}]'
-
- class psutil():
- def cpu_percent(interval=0):
- ## This just counts the ammount of time the CPU has spent. Find a better way!
- with cmd("grep 'cpu ' /proc/stat | awk '{usage=($2+$4)*100/($2+$4+$5)} END {print usage}'") as output:
- for line in output:
- return float(line.strip().decode('UTF-8'))
-
- def virtual_memory():
- with cmd("grep 'MemFree: ' /proc/meminfo | awk '{free=($2)} END {print free}'") as output:
- for line in output:
- return mem(float(line.strip().decode('UTF-8')))
-
- def disk_usage(partition):
- disk_stats = os.statvfs(partition)
- free_size = disk_stats.f_bfree * disk_stats.f_bsize
- disk_size = disk_stats.f_blocks * disk_stats.f_bsize
- percent = (100/disk_size)*free_size
- return disk(disk_size, free_size, percent)
-
- def net_if_addrs():
- interfaces = {}
- for root, folders, files in os.walk('/sys/class/net/'):
- for name in folders:
- interfaces[name] = {}
- return interfaces
-
- def net_io_counters(pernic=True):
- data = {}
- for interface in psutil.net_if_addrs().keys():
- with cmd("grep '{interface}:' /proc/net/dev | awk '{{recv=$2}}{{send=$10}} END {{print send,recv}}'".format(interface=interface)) as output:
- for line in output:
- data[interface] = iostat(interface, *line.strip().decode('UTF-8').split(' ',1))
- return data
-
-
-## FIXME: dependency checks (fdisk, lsblk etc)
-def sig_handler(signal, frame):
- print('\nAborting further installation steps!')
- print(' Here\'s a summary of the commandline:')
- print(f' {sys.argv}')
-
- exit(0)
-signal.signal(signal.SIGINT, sig_handler)
-
-def gen_uid(entropy_length=256):
- return sha512(os.urandom(entropy_length)).hexdigest()
-
-def get_default_gateway_linux(*positionals, **kwargs):
- """Read the default gateway directly from /proc."""
- with open("/proc/net/route") as fh:
- for line in fh:
- fields = line.strip().split()
- if fields[1] != '00000000' or not int(fields[3], 16) & 2:
- continue
-
- return inet_ntoa(struct.pack("<L", int(fields[2], 16)))
-
-def get_local_MACs():
- macs = {}
- for nic, opts in psutil.net_if_addrs().items():
- for addr in opts:
- #if addr.family in (AF_INET, AF_INET6) and addr.address:
- if addr.family == AF_PACKET: # MAC
- macs[addr.address] = nic
- return macs
-
-def gen_yubikey_password():
- return None #TODO: Implement
-
-def pid_exists(pid):
- """Check whether pid exists in the current process table."""
- if pid < 0:
- return False
- try:
- os.kill(pid, 0)
- except (OSError, e):
- return e.errno == errno.EPERMRM
- else:
- return True
-
-def simple_command(cmd, opts=None, *positionals, **kwargs):
- if not opts: opts = {}
- if 'debug' in opts:
- print('[!] {}'.format(cmd))
- handle = Popen(cmd, shell='True', stdout=PIPE, stderr=STDOUT, stdin=PIPE)
- output = b''
- while handle.poll() is None:
- data = handle.stdout.read()
- if len(data):
- if 'debug' in opts:
- print(data.decode('UTF-8'), end='')
- # print(data.decode('UTF-8'), end='')
- output += data
- data = handle.stdout.read()
- if 'debug' in opts:
- print(data.decode('UTF-8'), end='')
- output += data
- handle.stdin.close()
- handle.stdout.close()
- return output
-
-class sys_command():#Thread):
- def __init__(self, cmd, callback=None, start_callback=None, *positionals, **kwargs):
- if not 'worker_id' in kwargs: kwargs['worker_id'] = gen_uid()
- if not 'emulate' in kwargs: kwargs['emulate'] = SAFETY_LOCK
- #Thread.__init__(self)
- if kwargs['emulate']:
- print(f"Starting command '{cmd}' in emulation mode.")
- self.cmd = shlex.split(cmd)
- self.args = args
- self.kwargs = kwargs
- if not 'worker' in self.kwargs: self.kwargs['worker'] = None
- self.callback = callback
- self.pid = None
- self.exit_code = None
- self.started = time.time()
- self.ended = None
- self.worker_id = kwargs['worker_id']
- self.trace_log = b''
- self.status = 'starting'
-
- user_catalogue = os.path.expanduser('~')
- self.cwd = f"{user_catalogue}/archinstall/cache/workers/{kwargs['worker_id']}/"
- self.exec_dir = f'{self.cwd}/{os.path.basename(self.cmd[0])}_workingdir'
-
- if not self.cmd[0][0] == '/':
- log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
- o = check_output(['/usr/bin/which', self.cmd[0]])
- log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
- self.cmd[0] = o.decode('UTF-8').strip()
-
- if not os.path.isdir(self.exec_dir):
- os.makedirs(self.exec_dir)
-
- if self.kwargs['emulate']:
- commandlog.append(cmd + ' # (emulated)')
- elif 'hide_from_log' in self.kwargs and self.kwargs['hide_from_log']:
- pass
- else:
- commandlog.append(cmd)
- if start_callback: start_callback(self, *positionals, **kwargs)
- #self.start()
- self.run()
-
- def __iter__(self, *positionals, **kwargs):
- for line in self.trace_log.split(b'\n'):
- yield line
-
- def __repr__(self, *positionals, **kwargs):
- return f"{self.cmd, self.trace_log}"
-
- def decode(self, fmt='UTF-8'):
- return self.trace_log.decode(fmt)
-
- def dump(self):
- return {
- 'status' : self.status,
- 'worker_id' : self.worker_id,
- 'worker_result' : self.trace_log.decode('UTF-8'),
- 'started' : self.started,
- 'ended' : self.ended,
- 'started_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
- 'ended_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
- 'exit_code' : self.exit_code
- }
-
- def run(self):
- #main = None
- #for t in tenum():
- # if t.name == 'MainThread':
- # main = t
- # break
-
- #if not main:
- # print('Main thread not existing')
- # return
-
- self.status = 'running'
- old_dir = os.getcwd()
- os.chdir(self.exec_dir)
- self.pid, child_fd = pty.fork()
- if not self.pid: # Child process
- # Replace child process with our main process
- if not self.kwargs['emulate']:
- try:
- os.execv(self.cmd[0], self.cmd)
- except FileNotFoundError:
- self.status = 'done'
- log(f"{self.cmd[0]} does not exist.", origin='spawn', level=2)
- self.exit_code = 1
- return False
-
- os.chdir(old_dir)
-
- poller = epoll()
- poller.register(child_fd, EPOLLIN | EPOLLHUP)
-
- if 'events' in self.kwargs and 'debug' in self.kwargs:
- print(f'[D] Using triggers for command: {self.cmd}')
- print(json.dumps(self.kwargs['events']))
-
- alive = True
- last_trigger_pos = 0
- while alive and not self.kwargs['emulate']:
- for fileno, event in poller.poll(0.1):
- try:
- output = os.read(child_fd, 8192).strip()
- self.trace_log += output
- except OSError:
- alive = False
- break
-
- if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
- print(self.cmd[0], 'gave:', output.decode('UTF-8'))
- log(self.cmd[0],'gave:', output.decode('UTF-8'), origin='spawn', level=4)
-
- if 'on_output' in self.kwargs:
- self.kwargs['on_output'](self.kwargs['worker'], output)
-
- lower = output.lower()
- broke = False
- if 'events' in self.kwargs:
- for trigger in list(self.kwargs['events']):
- if type(trigger) != bytes:
- original = trigger
- trigger = bytes(original, 'UTF-8')
- self.kwargs['events'][trigger] = self.kwargs['events'][original]
- del(self.kwargs['events'][original])
- if type(self.kwargs['events'][trigger]) != bytes:
- self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
-
- if trigger.lower() in self.trace_log[last_trigger_pos:].lower():
- trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
-
- if 'debug' in self.kwargs and self.kwargs['debug']:
- print(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}")
- log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", origin='spawn', level=5)
-
- last_trigger_pos = trigger_pos
- os.write(child_fd, self.kwargs['events'][trigger])
- del(self.kwargs['events'][trigger])
- broke = True
- break
-
- if broke:
- continue
-
- ## Adding a exit trigger:
- if len(self.kwargs['events']) == 0:
- if 'debug' in self.kwargs and self.kwargs['debug']:
- log(f"Waiting for last command {self.cmd[0]} to finish.", origin='spawn', level=4)
-
- if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
- if 'debug' in self.kwargs and self.kwargs['debug']:
- log(f"{self.cmd[0]} has finished.", origin='spawn', level=4)
- alive = False
- break
-
- self.status = 'done'
-
- if 'debug' in self.kwargs and self.kwargs['debug']:
- log(f"{self.cmd[0]} waiting for exit code.", origin='spawn', level=5)
-
- if not self.kwargs['emulate']:
- try:
- self.exit_code = os.waitpid(self.pid, 0)[1]
- except ChildProcessError:
- try:
- self.exit_code = os.waitpid(child_fd, 0)[1]
- except ChildProcessError:
- self.exit_code = 1
- else:
- self.exit_code = 0
-
- if 'ignore_errors' in self.kwargs:
- self.exit_code = 0
-
- if self.exit_code != 0:
- log(f"{self.cmd} did not exit gracefully, exit code {self.exit_code}.", origin='spawn', level=3)
- log(self.trace_log.decode('UTF-8'), origin='spawn', level=3)
- else:
- log(f"{self.cmd[0]} exit nicely.", origin='spawn', level=5)
-
- self.ended = time.time()
- with open(f'{self.cwd}/trace.log', 'wb') as fh:
- fh.write(self.trace_log)
-
- worker_history[self.worker_id] = self.dump()
-
- if 'dependency' in self.kwargs:
- pass # TODO: Not yet supported (steal it from archinstall_gui)
- """
- dependency = self.kwargs['dependency']
- if type(dependency) == str:
- # Dependency is a progress-string. Wait for it to show up.
- while main and main.isAlive() and dependency not in progress or progress[dependency] is None:
- time.sleep(0.25)
- dependency = progress[dependency]
-
- if type(dependency) == str:
- log(f"{self.func} waited for progress {dependency} which never showed up. Aborting.", level=2, origin='worker', function='run')
- self.ended = time.time()
- self.status = 'aborted'
- return None
-
- while main and main.isAlive() and dependency.ended is None:
- time.sleep(0.25)
-
- print(' *** Dependency released for:', self.func)
-
- if dependency.data is None or not main or not main.isAlive():
- log('Dependency:', dependency.func, 'did not exit clearly. There for,', self.func, 'can not execute.', level=2, origin='worker', function='run')
- self.ended = time.time()
- self.status = 'aborted'
- return None
- """
-
- if self.callback:
- pass # TODO: Not yet supported (steal it from archinstall_gui)
-
- #self.callback(self, *self.args, **self.kwargs)
-
-def get_drive_from_uuid(drive):
- if len(harddrives) <= 0: raise ValueError("No hard drives to iterate in order to find: {}".format(uuid))
-
- for drive in harddrives:
- #for partition in psutil.disk_partitions('/dev/{}'.format(name)):
- # pass #blkid -s PARTUUID -o value /dev/sda2
- o = simple_command(f'blkid -s PTUUID -o value /dev/{drive}')
- if len(o) and o == uuid:
- return drive
-
- return None
-
-def get_drive_from_part_uuid(partuuid, *positionals, **kwargs):
- if len(harddrives) <= 0: raise ValueError("No hard drives to iterate in order to find: {}".format(uuid))
-
- for drive in harddrives:
- for partition in get_partitions(f'/dev/{drive}', *positionals, **kwargs):
- o = simple_command(f'blkid -s PARTUUID -o value /dev/{drive}{partition}')
- if len(o) and o == partuuid:
- return drive
-
- return None
-
-def set_password(user, password, *positionals, **kwargs):
- if not SAFETY_LOCK:
- o = simple_command("/usr/bin/arch-chroot /mnt sh -c \"echo 'root:{pin}' | chpasswd\"".format(**args, pin=args['password']))
- return True
-
-def update_git(branch='master'):
- default_gw = get_default_gateway_linux()
- if(default_gw):
- print('[N] Checking for updates...')
- ## Not the most elegant way to make sure git conflicts doesn't occur (yea fml)
- if os.path.isfile('/root/archinstall/archinstall.py'):
- os.remove('/root/archinstall/archinstall.py')
- if os.path.isfile('/root/archinstall/README.md'):
- os.remove('/root/archinstall/README.md')
-
- output = simple_command('(cd /root/archinstall; git reset --hard origin/$(git branch | grep "*" | cut -d\' \' -f 2); git pull)')
-
- if b'error:' in output:
- print('[N] Could not update git source for some reason.')
- return
-
- # b'From github.com:Torxed/archinstall\n 339d687..80b97f3 master -> origin/master\nUpdating 339d687..80b97f3\nFast-forward\n README.md | 2 +-\n 1 file changed, 1 insertion(+), 1 deletion(-)\n'
- if output != b'Already up to date' or branch != 'master':
- #tmp = re.findall(b'[0-9]+ file changed', output)
- #print(tmp)
- #if len(tmp):
- # num_changes = int(tmp[0].split(b' ',1)[0])
- # if(num_changes):
-
- if branch != 'master':
- on_branch = simple_command('(cd /root/archinstall; git branch | grep "*" | cut -d\' \' -f 2)').decode('UTF-8').strip()
- if on_branch.lower() != branch.lower():
- print(f'[N] Changing branch from {on_branch} to {branch}')
- output = simple_command(f'(cd /root/archinstall; git checkout {branch}; git pull)')
- print('[N] Rebooting the new branch')
- if not 'rebooted' in args:
- os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rebooted','--rerun'])
- else:
- os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rerun',])
-
- if not 'rebooted' in args:
- ## Reboot the script (in same context)
- print('[N] Rebooting the script')
- os.execv('/usr/bin/python3', ['archinstall.py'] + sys.argv + ['--rebooted',])
- extit(1)
-
-def device_state(name, *positionals, **kwargs):
- # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
- if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
- with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
- if f.read(1) == '1':
- return
-
- path = rootdir_pattern.sub('', os.readlink('/sys/block/{}'.format(name)))
- hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire")
- for bus in hotplug_buses:
- if os.path.exists('/sys/bus/{}'.format(bus)):
- for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)):
- device_link = rootdir_pattern.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus)))
- if re.search(device_link, path):
- return
- return True
-
-def get_partitions(dev, *positionals, **kwargs):
- drive_name = os.path.basename(dev)
- parts = oDict()
- #o = b''.join(sys_command('/usr/bin/lsblk -o name -J -b {dev}'.format(dev=dev)))
- o = b''.join(sys_command(f'/usr/bin/lsblk -J {dev}', hide_from_log=True))
- if b'not a block device' in o:
- ## TODO: Replace o = sys_command() with code, o = sys_command()
- ## and make sys_command() return the exit-code, way safer than checking output strings :P
- return {}
-
- if not o[:1] == b'{':
- print('[E] Error in getting blk devices:', o)
- exit(1)
-
- r = json.loads(o.decode('UTF-8'))
- if len(r['blockdevices']) and 'children' in r['blockdevices'][0]:
- for part in r['blockdevices'][0]['children']:
- #size = os.statvfs(dev + part['name'][len(drive_name):])
- parts[part['name'][len(drive_name):]] = {
- #'size' : size.f_frsize * size.f_bavail,
- #'blocksize' : size.f_frsize * size.f_blocks
- 'size' : part['size']
- }
-
- return parts
-
-def get_disk_model(drive):
- with open(f'/sys/block/{os.path.basename(drive)}/device/model', 'rb') as fh:
- return fh.read().decode('UTF-8').strip()
-
-def get_disk_size(drive):
- dev_short_name = os.path.basename(drive)
- with open(f'/sys/block/{dev_short_name}/device/block/{dev_short_name}/size', 'rb') as fh:
- return ''.join(human_readable_size(fh.read().decode('UTF-8').strip()))
-
-def disk_info(drive, *positionals, **kwargs):
- lkwargs = {**kwargs}
- lkwargs['emulate'] = False # This is a emulate-safe function. Does not alter filesystem.
-
- info = json.loads(b''.join(sys_command(f'lsblk -J -o "NAME,SIZE,FSTYPE,LABEL" {drive}', *positionals, **lkwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices'][0]
- fileformats = []
- labels = []
- if 'children' in info: ## Might not be partitioned yet
- for child in info['children']:
- if child['fstype'] != None:
- fileformats.append(child['fstype'])
- if child['label'] != None:
- labels.append(child['label'])
- else:
- fileformats = ['*Empty Drive*']
- labels = ['(no partitions)']
- info['fileformats'] = fileformats
- info['labels'] = labels
- info['model'] = get_disk_model(drive)
-
- return info
-
-def cleanup_args(*positionals, **kwargs):
- for key in args:
- if args[key] == '<STDIN>':
- if not args['unattended']:
- if 'input_redirect' in kwargs:
- args[key] = kwargs['input_redirect'](key)
- else:
- args[key] = input(f'Enter a value for {key}: ')
- else:
- args[key] = random_string(32)
- elif args[key] == '<RND_STR>': args[key] = random_string(32)
- elif args[key] == '<YUBIKEY>':
- args[key] = gen_yubikey_password()
- if not args[key]:
- print('[E] Failed to setup a yubikey password, is it plugged in?')
- exit(1)
-
-def merge_in_includes(instructions, *positionals, **kwargs):
- if 'args' in instructions:
- ## == Recursively fetch instructions if "include" is found under {args: ...}
- while 'include' in instructions['args']:
- includes = instructions['args']['include']
- print('[!] Importing net-deploy target: {}'.format(includes))
- del(instructions['args']['include'])
- if type(includes) in (dict, list):
- for include in includes:
- instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True)
- else:
- instructions = merge_dicts(instructions, get_instructions(includes), *positionals, **kwargs, before=True)
-
- ## Update arguments if we found any
- for key, val in instructions['args'].items():
- args[key] = val
-
- if 'args' in instructions:
- ## TODO: Reuseable code, there's to many get_instructions, merge_dictgs and args updating going on.
- ## Update arguments if we found any
- for key, val in instructions['args'].items():
- args[key] = val
- if 'user_args' in kwargs:
- for key, val in kwargs['user_args'].items():
- args[key] = val
-
- return instructions
-
-
-def update_drive_list(*positionals, **kwargs):
- # https://github.com/karelzak/util-linux/blob/f920f73d83f8fd52e7a14ec0385f61fab448b491/disk-utils/fdisk-list.c#L52
- for path in glob('/sys/block/*/device'):
- name = re.sub('.*/(.*?)/device', '\g<1>', path)
- if device_state(name, *positionals, **kwargs):
- harddrives[f'/dev/{name}'] = disk_info(f'/dev/{name}', *positionals, **kwargs)
-
-def human_readable_size(bits, sizes=[{8 : 'b'}, {1024 : 'kb'}, {1024 : 'mb'}, {1024 : 'gb'}, {1024 : 'tb'}, {1024 : 'zb?'}]):
- # Not needed if using lsblk.
- end_human = None
- for pair in sizes:
- size, human = list(pair.items())[0]
-
- if bits / size > 1:
- bits = bits/size
- end_human = human
- else:
- break
- return bits, end_human
-
-def human_disk_info(drive):
- return {
- 'size' : harddrives[drive]['size'],
- 'fileformat' : harddrives[drive]['fileformats'],
- 'labels' : harddrives[drive]['labels']
- }
-
-def close_disks():
- o = simple_command('/usr/bin/umount -R /mnt/boot')
- o = simple_command('/usr/bin/umount -R /mnt')
- o = simple_command('/usr/bin/cryptsetup close /dev/mapper/luksdev')
-
-def format_disk(drive='drive', start='start', end='size', emulate=False, *positionals, **kwargs):
- drive = args[drive]
- start = args[start]
- end = args[end]
- if not drive:
- raise ValueError('Need to supply a drive path, for instance: /dev/sdx')
-
- if not SAFETY_LOCK:
- # dd if=/dev/random of=args['drive'] bs=4096 status=progress
- # https://github.com/dcantrell/pyparted would be nice, but isn't officially in the repo's #SadPanda
- #if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
- # return None
- if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
- return None
- if sys_command(f'/usr/bin/parted -s {drive} mkpart primary FAT32 1MiB {start}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
- return None
- if sys_command(f'/usr/bin/parted -s {drive} name 1 "EFI"', emulate=emulate, *positionals, **kwargs).exit_code != 0:
- return None
- if sys_command(f'/usr/bin/parted -s {drive} set 1 esp on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
- return None
- if sys_command(f'/usr/bin/parted -s {drive} set 1 boot on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
- return None
- if sys_command(f'/usr/bin/parted -s {drive} mkpart primary {start} {end}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
- return None
-
- # TODO: grab partitions after each parted/partition step instead of guessing which partiton is which later on.
- # Create one, grab partitions - dub that to "boot" or something. do the next partition, grab that and dub it "system".. or something..
- # This "assumption" has bit me in the ass so many times now I've stoped counting.. Jerker is right.. Don't do it like this :P
-
- return True
-
-def multisplit(s, splitters):
- s = [s,]
- for key in splitters:
- ns = []
- for obj in s:
- x = obj.split(key)
- for index, part in enumerate(x):
- if len(part):
- ns.append(part)
- if index < len(x)-1:
- ns.append(key)
- s = ns
- return s
-
-def grab_url_data(path):
- safe_path = path[:path.find(':')+1]+''.join([item if item in ('/', '?', '=', '&') else urllib.parse.quote(item) for item in multisplit(path[path.find(':')+1:], ('/', '?', '=', '&'))])
- ssl_context = ssl.create_default_context()
- ssl_context.check_hostname = False
- ssl_context.verify_mode=ssl.CERT_NONE
- response = urllib.request.urlopen(safe_path, context=ssl_context)
- return response.read()
-
-def get_application_instructions(target):
- instructions = {}
- try:
- instructions = grab_url_data('{}/applications/{}.json'.format(args['profiles-path'], target)).decode('UTF-8')
- print('[N] Found application instructions for: {}'.format(target))
- except urllib.error.HTTPError:
- print('[N] Could not find remote instructions. yrying local instructions under ./deployments/applications')
- local_path = './deployments/applications' if os.path.isfile('./archinstall.py') else './archinstall/deployments/applications' # Dangerous assumption
- if os.path.isfile(f'{local_path}/{target}.json'):
- with open(f'{local_path}/{target}.json', 'r') as fh:
- instructions = fh.read()
-
- print('[N] Found local application instructions for: {}'.format(target))
- else:
- print('[N] No instructions found for: {}'.format(target))
- return instructions
-
- try:
- instructions = json.loads(instructions, object_pairs_hook=oDict)
- except:
- print('[E] JSON syntax error in {}'.format('{}/applications/{}.json'.format(args['profiles-path'], target)))
- traceback.print_exc()
- exit(1)
-
- return instructions
-
-def get_local_instructions(target):
- instructions = oDict()
- local_path = './deployments' if os.path.isfile('./archinstall.py') else './archinstall/deployments' # Dangerous assumption
- if os.path.isfile(f'{local_path}/{target}.json'):
- with open(f'{local_path}/{target}.json', 'r') as fh:
- instructions = fh.read()
-
- print('[N] Found local instructions called: {}'.format(target))
- else:
- print('[N] No instructions found called: {}'.format(target))
- return instructions
-
-def get_instructions(target, *positionals, **kwargs):
- if not 'profiles-path' in kwargs: kwargs['profiles-path'] = args['profiles-path']
- instructions = oDict()
- if target[0-len('.json'):] == '.json': target = target[:0-len('.json')]
- log(f'Fetching instructions for {target}', level=4, origin='get_instructions')
- if get_default_gateway_linux():
- try:
- instructions = grab_url_data(f"{kwargs['profiles-path']}/{target}.json").decode('UTF-8')
- log(f'Found net-deploy instructions for {target}', level=4, origin='get_instructions')
- print('[N] Found net-deploy instructions called: {}'.format(target))
- except urllib.error.HTTPError:
- print('[N] Could not find remote instructions. Trying local instructions under ./deployments')
- log(f'Could not find remote instructions. Trying local instructions under ./deployments', level=4, origin='get_instructions')
- instructions = get_local_instructions(target, *positionals)
- else:
- instructions = get_local_instructions(target, *positionals)
-
- if type(instructions) not in (dict, oDict,):
- try:
- instructions = json.loads(instructions, object_pairs_hook=oDict)
- except:
- log(f'JSON syntax error in: {target}', level=4, origin='get_instructions')
- print('[E] JSON syntax error in {}'.format('{}/{}.json'.format(kwargs['profiles-path'], target)))
- traceback.print_exc()
- exit(1)
-
- log(f'Final instructions are: {instructions}', level=4, origin='get_instructions')
- return instructions
-
-def merge_dicts(d1, d2, before=True, overwrite=False):
- """ Merges d2 into d1 """
- if before:
- d1, d2 = d2.copy(), d1.copy()
- overwrite = True
-
- for key, val in d2.items():
- if key in d1:
- if type(d1[key]) in [dict, oDict] and type(d2[key]) in [dict, oDict]:
- d1[key] = merge_dicts(d1[key] if not before else d2[key], d2[key] if not before else d1[key], before=before, overwrite=overwrite)
- elif overwrite:
- d1[key] = val
- else:
- d1[key] = val
-
- return d1
-
-def random_string(l):
- return ''.join(choice(ascii_uppercase + ascii_lowercase + digits) for i in range(l))
-
-def phone_home(url):
- payload = json.dumps({"hostname": args['hostname'],
- "done" : time.time(),
- "profile": args['profile'],
- "drive": args['drive'],
- "base_status": base_return_code}).encode('utf8')
- request = urllib.request.Request(url, data=payload, headers={'content-type': 'application/json'})
- response = urllib.request.urlopen(request)
-
-def get_external_ip(*positionals, **kwargs):
- result = urllib.request.urlopen("https://hvornum.se/ip/?f=json").read().decode('UTF-8')
- return json.loads(result)['ip']
-
-def guess_country(ip, *positionals, **kwargs):
- # python-pygeoip
- # geoip-database
- result = None
- GEOIP_DB = '/usr/share/GeoIP/GeoIP.dat'
- if os.path.isfile(GEOIP_DB):
- try:
- import pygeoip
- except:
- ## TODO: Do a best-effort-guess based off the hostname given off the IP instead, if GoeIP doesn't exist.
- return result
-
- gi = pygeoip.GeoIP(GEOIP_DB)
- result = gi.country_code_by_addr(ip)
- else:
- log(f'Missing GeoIP database: {GEOIP_DB}', origin='guess_country', level=LOG_LEVELS.ERROR)
- return result
-
-def setup_args_defaults(args, *positionals, **kwargs):
- if not 'size' in args: args['size'] = '100%'
- if not 'mirrors' in args: args['mirrors'] = True
- if not 'start' in args: args['start'] = '513MiB'
- if not 'pwfile' in args: args['pwfile'] = '/tmp/diskpw'
- if not 'hostname' in args: args['hostname'] = 'Archinstall'
- if not 'packages' in args: args['packages'] = '' # extra packages other than default
- if not 'post' in args: args['post'] = 'reboot'
- if not 'password' in args: args['password'] = '0000' # Default disk passord, can be <STDIN> or a fixed string
- if not 'minimal' in args: args['minimal'] = False
- if not 'unattended' in args: args['unattended'] = False
- if not 'profile' in args: args['profile'] = None
- if not 'skip-encrypt' in args: args['skip-encrypt'] = False
- if not 'profiles-path' in args: args['profiles-path'] = profiles_path
- if not 'rerun' in args: args['rerun'] = None
- if not 'aur-keep' in args: args['aur-keep'] = False
- if not 'aur-support' in args: args['aur-support'] = True # Support adds yay (https://github.com/Jguer/yay) in installation steps.
- if not 'ignore-rerun' in args: args['ignore-rerun'] = False
- if not 'phone-home' in args: args['phone-home'] = False
-
- # Setup locales if we didn't get one.
- if not 'country' in args:
- country = None
- if get_default_gateway_linux():
- ip = get_external_ip()
- country = guess_country(ip)
- args['country'] = 'all' if not country else country
- if not 'localtime' in args: args['localtime'] = 'Europe/Stockholm' if args['country'] == 'SE' else 'GMT+0' # TODO: Arbitrary for now
-
- return args
-
-def load_automatic_instructions(*positionals, **kwargs):
- instructions = oDict()
- if get_default_gateway_linux(*positionals, **kwargs):
- locmac = get_local_MACs()
- if not len(locmac):
- print('[N] No network interfaces - No net deploy.')
- else:
- for mac in locmac:
- instructions = get_instructions(mac, *positionals, **kwargs)
-
- if 'args' in instructions:
- ## == Recursively fetch instructions if "include" is found under {args: ...}
- while 'include' in instructions['args']:
- includes = instructions['args']['include']
- print('[!] Importing net-deploy target: {}'.format(includes))
- del(instructions['args']['include'])
- if type(includes) in (dict, list):
- for include in includes:
- instructions = merge_dicts(instructions, get_instructions(include, *positionals, **kwargs), before=True)
- else:
- instructions = merge_dicts(instructions, get_instructions(includes, *positionals, **kwargs), before=True)
-
- ## Update arguments if we found any
- for key, val in instructions['args'].items():
- args[key] = val
- if 'user_args' in kwargs:
- for key, val in kwargs['user_args'].items():
- args[key] = val
- else:
- print('[N] No gateway - No net deploy')
-
- return instructions
-
-def cache_diskpw_on_disk():
- if not os.path.isfile(args['pwfile']):
- #PIN = '0000'
- with open(args['pwfile'], 'w') as pw:
- pw.write(args['password'])
-
-def refresh_partition_list(drive, *positionals, **kwargs):
- drive = args[drive]
- if not 'partitions' in args:
- args['partitions'] = oDict()
- for index, part_name in enumerate(sorted(get_partitions(drive, *positionals, **kwargs).keys())):
- args['partitions'][str(index+1)] = part_name
- return True
-
-def mkfs_fat32(drive, partition, *positionals, **kwargs):
- drive = args[drive]
- partition = args['partitions'][partition]
- o = b''.join(sys_command(f'/usr/bin/mkfs.vfat -F32 {drive}{partition}'))
- if (b'mkfs.fat' not in o and b'mkfs.vfat' not in o) or b'command not found' in o:
- return None
- return True
-
-def is_luksdev_mounted(*positionals, **kwargs):
- o = b''.join(sys_command('/usr/bin/file /dev/mapper/luksdev', hide_from_log=True)) # /dev/dm-0
- if b'cannot open' in o:
- return False
- return True
-
-def mount_luktsdev(drive, partition, keyfile, *positionals, **kwargs):
- drive = args[drive]
- partition = args['partitions'][partition]
- keyfile = args[keyfile]
- if not is_luksdev_mounted():
- o = b''.join(sys_command(f'/usr/bin/cryptsetup open {drive}{partition} luksdev --key-file {keyfile} --type luks2'.format(**args)))
- return is_luksdev_mounted()
-
-def encrypt_partition(drive, partition, keyfile='/tmp/diskpw', *positionals, **kwargs):
- drive = args[drive]
- partition = args['partitions'][partition]
- keyfile = args[keyfile]
- o = b''.join(sys_command(f'/usr/bin/cryptsetup -q -v --type luks2 --pbkdf argon2i --hash sha512 --key-size 512 --iter-time 10000 --key-file {keyfile} --use-urandom luksFormat {drive}{partition}'))
- if not b'Command successful.' in o:
- return False
- return True
-
-def mkfs_btrfs(drive='/dev/mapper/luksdev', *positionals, **kwargs):
- o = b''.join(sys_command(f'/usr/bin/mkfs.btrfs -f {drive}'))
- if not b'UUID' in o:
- return False
- return True
-
-def mount_luksdev(where='/dev/mapper/luksdev', to='/mnt', *positionals, **kwargs):
- check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt', *positionals, **kwargs).decode('UTF-8').strip()# /dev/dm-0
- if len(check_mounted):
- return False
-
- o = b''.join(sys_command('/usr/bin/mount /dev/mapper/luksdev /mnt', *positionals, **kwargs))
- return True
-
-def mount_part(drive, partition, mountpoint='/mnt', *positionals, **kwargs):
- os.makedirs(mountpoint, exist_ok=True)
- #o = b''.join(sys_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs)) # /dev/dm-0
-
- check_mounted = simple_command(f'/usr/bin/mount | /usr/bin/grep {mountpoint}', *positionals, **kwargs).decode('UTF-8').strip()
- if len(check_mounted):
- return False
-
- o = b''.join(sys_command(f'/usr/bin/mount {drive}{partition} {mountpoint}', *positionals, **kwargs))
- return True
-
-def mount_boot(drive, partition, mountpoint='/mnt/boot', *positionals, **kwargs):
- os.makedirs('/mnt/boot', exist_ok=True)
- #o = b''.join(sys_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs)) # /dev/dm-0
-
- check_mounted = simple_command('/usr/bin/mount | /usr/bin/grep /mnt/boot', *positionals, **kwargs).decode('UTF-8').strip()
- if len(check_mounted):
- return False
-
- o = b''.join(sys_command(f'/usr/bin/mount {drive}{partition} {mountpoint}', *positionals, **kwargs))
- return True
-
-def mount_mountpoints(drive, bootpartition, mountpoint='/mnt', *positionals, **kwargs):
- drive = args[drive]
- if args['skip-encrypt']:
- mount_part(drive, args['partitions']["2"], mountpoint, *positionals, **kwargs)
- else:
- mount_luksdev(*positionals, **kwargs)
- mount_boot(drive, args['partitions'][bootpartition], mountpoint=f'{mountpoint}/boot', *positionals, **kwargs)
- return True
-
-def re_rank_mirrors(top=10, *positionals, **kwargs):
- if (cmd := sys_command((f'/usr/bin/rankmirrors -n {top} /etc/pacman.d/mirrorlist > /etc/pacman.d/mirrorlist'))).exit_code == 0:
- return True
- log(f"Could not re-rank mirrors: {cmd.trace_log}", level=3, origin='re_rank_mirrors')
- return False
-
-def filter_mirrors_by_country_list(countries, top=None, *positionals, **kwargs):
- ## TODO: replace wget with urllib.request (no point in calling syscommand)
- country_list = []
- for country in countries.split(','):
- country_list.append(f'country={country}')
-
- if not SAFETY_LOCK:
- o = b''.join(sys_command((f"/usr/bin/wget 'https://www.archlinux.org/mirrorlist/?{'&'.join(country_list)}&protocol=https&ip_version=4&ip_version=6&use_mirror_status=on' -O /root/mirrorlist")))
- o = b''.join(sys_command(("/usr/bin/sed -i 's/#Server/Server/' /root/mirrorlist")))
- o = b''.join(sys_command(("/usr/bin/mv /root/mirrorlist /etc/pacman.d/")))
-
- if top:
- re_rank_mirrors(top, *positionals, **kwargs) or not os.path.isfile('/etc/pacman.d/mirrorlist')
-
- return True
-
-def add_custom_mirror(name, url, *positionals, **kwargs):
- if not SAFETY_LOCK:
- commandlog.append('# Adding custom mirror to /etc/pacman.conf')
- with open('/etc/pacman.conf', 'a') as mirrorlist:
- commandlog.append(f'# {name} @ {url}')
- mirrorlist.write('\n')
- mirrorlist.write(f'[{name}]\n')
- mirrorlist.write(f'Server = {url}\n')
- mirrorlist.write(f'SigLevel = Optional TrustAll\n')
- return True
-
-def add_specific_mirrors(mirrors, *positionals, **kwargs):
- if not SAFETY_LOCK:
- commandlog.append('# Adding mirrors to /etc/pacman.d/mirrorlist')
- with open('/etc/pacman.d/mirrorlist', 'a') as mirrorlist:
- mirrorlist.write('\n')
- for url in mirrors:
- commandlog.append(f'# {url}')
- mirrorlist.write(f'# {mirrors[url]}\n')
- mirrorlist.write(f'Server = {url}\n')
- return True
-
-def flush_all_mirrors(*positionals, **kwargs):
- if not SAFETY_LOCK:
- commandlog.append('# Flushed /etc/pacman.d/mirrorlist')
- with open('/etc/pacman.d/mirrorlist', 'w') as mirrorlist:
- mirrorlist.write('\n') # TODO: Not needed.
- return True
-
-def reboot(*positionals, **kwargs):
- simple_command('/usr/bin/sync', *positionals, **kwargs).decode('UTF-8').strip()
- simple_command('/usr/bin/reboot', *positionals, **kwargs).decode('UTF-8').strip()
-
-def strap_in_base(*positionals, **kwargs):
- if not SAFETY_LOCK:
- if args['aur-support']:
- args['packages'] += ' git'
- if (sync_mirrors := sys_command('/usr/bin/pacman -Syy', *positionals, **kwargs)).exit_code == 0:
- x = sys_command('/usr/bin/pacstrap /mnt base base-devel linux linux-firmware btrfs-progs efibootmgr nano wpa_supplicant dialog {packages}'.format(**args), *positionals, **kwargs)
- if x.exit_code == 0:
- return True
- else:
- log(f'Could not strap in base: {x.exit_code}', level=3, origin='strap_in_base')
- else:
- log(f'Could not sync mirrors: {sync_mirrors.exit_code}', level=3, origin='strap_in_base')
- return False
-
-def set_locale(fmt, *positionals, **kwargs):
- if not '.' in fmt:
- if fmt.lower() == 'se':
- fmt = 'en_SE.UTF-8 UTF-8'
- else:
- fmt = 'en_US.UTF-8 UTF-8'
-
- if not SAFETY_LOCK:
- o = b''.join(sys_command(f"/usr/bin/arch-chroot /mnt sh -c \"echo '{fmt}' > /etc/locale.gen\""))
- o = b''.join(sys_command(f"/usr/bin/arch-chroot /mnt sh -c \"echo 'LANG={fmt.split(' ')[0]}' > /etc/locale.conf\""))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt locale-gen'))
-
- return True
-
-def configure_base_system(*positionals, **kwargs):
- if not SAFETY_LOCK:
- ## TODO: Replace a lot of these syscalls with just python native operations.
- o = b''.join(sys_command('/usr/bin/genfstab -pU /mnt >> /mnt/etc/fstab'))
- if not os.path.isfile('/mnt/etc/fstab'):
- log(f'Could not locate fstab, strapping in packages most likely failed.', level=3, origin='configure_base_system')
- return False
-
- with open('/mnt/etc/fstab', 'a') as fstab:
- fstab.write('\ntmpfs /tmp tmpfs defaults,noatime,mode=1777 0 0\n') # Redundant \n at the start? who knoes?
-
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt rm -f /etc/localtime'))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt ln -s /usr/share/zoneinfo/{localtime} /etc/localtime'.format(**args)))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt hwclock --hctosys --localtime'))
- #o = sys_command('arch-chroot /mnt echo "{hostname}" > /etc/hostname'.format(**args))
- #o = sys_command("arch-chroot /mnt sed -i 's/#\(en_US\.UTF-8\)/\1/' /etc/locale.gen")
-
- o = b''.join(sys_command("/usr/bin/arch-chroot /mnt sh -c \"echo '{hostname}' > /etc/hostname\"".format(**args)))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt chmod 700 /root'))
-
- with open('/mnt/etc/mkinitcpio.conf', 'w') as mkinit:
- ## TODO: Don't replace it, in case some update in the future actually adds something.
- mkinit.write('MODULES=(btrfs)\n')
- mkinit.write('BINARIES=(/usr/bin/btrfs)\n')
- mkinit.write('FILES=()\n')
- mkinit.write('HOOKS=(base udev autodetect modconf block encrypt filesystems keyboard fsck)\n')
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt mkinitcpio -p linux'))
-
- return True
-
-def setup_bootloader(*positionals, **kwargs):
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt bootctl --no-variables --path=/boot install'))
-
- with open('/mnt/boot/loader/loader.conf', 'w') as loader:
- loader.write('default arch\n')
- loader.write('timeout 5\n')
-
- ## For some reason, blkid and /dev/disk/by-uuid are not getting along well.
- ## And blkid is wrong in terms of LUKS.
- #UUID = sys_command('blkid -s PARTUUID -o value {drive}{partition_2}'.format(**args)).decode('UTF-8').strip()
- with open('/mnt/boot/loader/entries/arch.conf', 'w') as entry:
- entry.write('title Arch Linux\n')
- entry.write('linux /vmlinuz-linux\n')
- entry.write('initrd /initramfs-linux.img\n')
- if args['skip-encrypt']:
- ## NOTE: We could use /dev/disk/by-partuuid but blkid does the same and a lot cleaner
- UUID = simple_command(f"blkid -s PARTUUID -o value /dev/{os.path.basename(args['drive'])}{args['partitions']['2']}").decode('UTF-8').strip()
- entry.write('options root=PARTUUID={UUID} rw intel_pstate=no_hwp\n'.format(UUID=UUID))
- else:
- UUID = simple_command(f"ls -l /dev/disk/by-uuid/ | grep {os.path.basename(args['drive'])}{args['partitions']['2']} | awk '{{print $9}}'").decode('UTF-8').strip()
- entry.write('options cryptdevice=UUID={UUID}:luksdev root=/dev/mapper/luksdev rw intel_pstate=no_hwp\n'.format(UUID=UUID))
-
- return True
-
-def add_AUR_support(*positionals, **kwargs):
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "useradd -m -G wheel aibuilder"'))
- o = b''.join(sys_command("/usr/bin/sed -i 's/# %wheel ALL=(ALL) NO/%wheel ALL=(ALL) NO/' /mnt/etc/sudoers"))
-
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder; git clone https://aur.archlinux.org/yay.git)\\""'))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "chown -R aibuilder.aibuilder /home/aibuilder/yay"'))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "su - aibuilder -c \\"(cd /home/aibuilder/yay; makepkg -si --noconfirm)\\" >/dev/null"'))
- ## Do not remove aibuilder just yet, can be used later for aur packages.
- #o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers'))
- #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"'))
- #o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"'))
- return True
-
-def run_post_install_steps(*positionals, **kwargs):
- log(f'Running post installation with input data {instructions}.', level=4, origin='run_post_install_steps')
- conf = {}
- if 'post' in instructions:
- conf = instructions['post']
- elif not 'args' in instructions and len(instructions):
- conf = instructions
-
- if 'git-branch' in conf:
- update_git(conf['git-branch'])
- del(conf['git-branch'])
-
- rerun = args['ignore-rerun']
- for title in conf:
- log(f'Running post installation step {title}', level=4, origin='run_post_install_steps')
- if args['rerun'] and args['rerun'] != title and not rerun:
- continue
- else:
- rerun = True
-
- print('[N] Network Deploy: {}'.format(title))
- if type(conf[title]) == str:
- print('[N] Loading {} configuration'.format(conf[title]))
- log(f'Loading {conf[title]} configuration', level=4, origin='run_post_install_steps')
- conf[title] = get_application_instructions(conf[title])
-
- for command in conf[title]:
- raw_command = command
- opts = conf[title][command] if type(conf[title][command]) in (dict, oDict) else {}
- if len(opts):
- if 'pass-args' in opts or 'format' in opts:
- command = command.format(**args)
- ## FIXME: Instead of deleting the two options
- ## in order to mute command output further down,
- ## check for a 'debug' flag per command and delete these two
- if 'pass-args' in opts:
- del(opts['pass-args'])
- elif 'format' in opts:
- del(opts['format'])
- elif ('debug' in opts and opts['debug']) or ('debug' in conf and conf['debug']):
- print('[-] Options: {}'.format(opts))
- if 'pass-args' in opts and opts['pass-args']:
- command = command.format(**args)
-
- if 'runas' in opts and f'su - {opts["runas"]} -c' not in command:
- command = command.replace('"', '\\"')
- command = f'su - {opts["runas"]} -c "{command}"'
-
- #print('[N] Command: {} ({})'.format(command, opts))
-
- ## https://superuser.com/questions/1242978/start-systemd-nspawn-and-execute-commands-inside
- ## !IMPORTANT
- ##
- ## arch-chroot mounts /run into the chroot environment, this breaks name resolves for some reason.
- ## Either skipping mounting /run and using traditional chroot is an option, but using
- ## `systemd-nspawn -D /mnt --machine temporary` might be a more flexible solution in case of file structure changes.
- if 'no-chroot' in opts and opts['no-chroot']:
- log(f'Executing {command} as simple command from live-cd.', level=4, origin='run_post_install_steps')
- o = simple_command(command, opts, *positionals, **kwargs)
- elif 'chroot' in opts and opts['chroot']:
- log(f'Executing {command} in chroot.', level=4, origin='run_post_install_steps')
- ## Run in a manually set up version of arch-chroot (arch-chroot will break namespaces).
- ## This is a bit risky in case the file systems changes over the years, but we'll probably be safe adding this as an option.
- ## **> Prefer if possible to use 'no-chroot' instead which "live boots" the OS and runs the command.
- o = simple_command("mount /dev/mapper/luksdev /mnt")
- o = simple_command("cd /mnt; cp /etc/resolv.conf etc")
- o = simple_command("cd /mnt; mount -t proc /proc proc")
- o = simple_command("cd /mnt; mount --make-rslave --rbind /sys sys")
- o = simple_command("cd /mnt; mount --make-rslave --rbind /dev dev")
- o = simple_command('chroot /mnt /bin/bash -c "{c}"'.format(c=command), opts=opts, *positionals, **kwargs)
- o = simple_command("cd /mnt; umount -R dev")
- o = simple_command("cd /mnt; umount -R sys")
- o = simple_command("cd /mnt; umount -R proc")
- else:
- if 'boot' in opts and opts['boot']:
- log(f'Executing {command} in boot mode.', level=4, origin='run_post_install_steps')
- ## So, if we're going to boot this maddafakker up, we'll need to
- ## be able to login. The quickest way is to just add automatic login.. so lessgo!
-
- ## Turns out.. that didn't work exactly as planned..
- ##
- # if not os.path.isdir('/mnt/etc/systemd/system/console-getty.service.d/'):
- # os.makedirs('/mnt/etc/systemd/system/console-getty.service.d/')
- # with open('/mnt/etc/systemd/system/console-getty.service.d/override.conf', 'w') as fh:
- # fh.write('[Service]\n')
- # fh.write('ExecStart=\n')
- # fh.write('ExecStart=-/usr/bin/agetty --autologin root -s %I 115200,38400,9600 vt102\n')
-
- ## So we'll add a bunch of triggers instead and let the sys_command manually react to them.
- ## "<hostname> login" followed by "Passwodd" in case it's been set in a previous step.. usually this shouldn't be nessecary
- ## since we set the password as the last step. And then the command itself which will be executed by looking for:
- ## [root@<hostname> ~]#
- defaults = {
- 'login:' : 'root\n',
- 'Password:' : args['password']+'\n',
- '[root@{args["hostname"]} ~]#' : command+'\n',
- }
- if not 'events' in opts: opts['events'] = {}
- events = {**defaults, **opts['events']}
- del(opts['events'])
- o = b''.join(sys_command('/usr/bin/systemd-nspawn -D /mnt -b --machine temporary', *positionals, **{'events' : events, **kwargs, **opts}))
-
- ## Not needed anymore: And cleanup after out selves.. Don't want to leave any residue..
- # os.remove('/mnt/etc/systemd/system/console-getty.service.d/override.conf')
- else:
- log(f'Executing {command} in with systemd-nspawn without boot.', level=4, origin='run_post_install_steps')
- o = b''.join(sys_command(f'/usr/bin/systemd-nspawn -D /mnt --machine temporary {command}', *positionals, **{**kwargs, **opts}))
- if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in o:
- log(f'{command} failed: {o.decode("UTF-8")}', level=4, origin='run_post_install_steps')
- print('[W] Post install command failed: {}'.format(o.decode('UTF-8')))
- #print(o)
-
- print('run_post_install_steps() is complete.')
- return True
-
-def create_user(username, password='', groups=[]):
- if username:
- o = (f'/usr/bin/arch-chroot /mnt useradd -m -G wheel {username}')
- if password:
- o = (f"/usr/bin/arch-chroot /mnt sh -c \"echo '{username}:{password}' | chpasswd\"")
- if groups:
- for group in groups:
- o = (f'/usr/bin/arch-chroot /mnt gpasswd -a {username} {group}')
- return True
-
-def prerequisit_check():
- if not os.path.isdir('/sys/firmware/efi'):
- return False, 'Archinstall only supports UEFI-booted machines.'
-
- return True
-
-if __name__ == '__main__':
-
- if not (prereq := prerequisit_check()) is True:
- print(f'[E] {prereq[1]}')
- exit(1)
-
- ## Setup some defaults
- # (in case no command-line parameters or netdeploy-params were given)
- args = setup_args_defaults(args)
- user_args = {}
- positionals = []
- for arg in sys.argv[1:]:
- if '--' == arg[:2]:
- if '=' in arg:
- key, val = [x.strip() for x in arg[2:].split('=')]
- else:
- key, val = arg[2:], True
- args[key] = val
- user_args[key] = val
- else:
- positionals.append(arg)
-
- update_git() # Breaks and restarts the script if an update was found.
- update_drive_list()
-
- ## == If we got networking,
- # Try fetching instructions for this box unless a specific profile was given, and execute them.
- if args['profile'] is None and not args['minimal']:
- instructions = load_automatic_instructions(user_args=user_args)
-
- elif args['profile'] and not args['minimal']:
- instructions = get_instructions(args['profile'])
- if len(instructions) <= 0:
- print('[E] No instructions by the name of {} was found.'.format(args['profile']))
- print(' Installation won\'t continue until a valid profile is given.')
- print(' (this is because --profile was given and a --default is not given)')
- exit(1)
-
- first = True
- while not args['minimal'] and not args['profile'] and len(instructions) <= 0:
- profile = input('What template do you want to install: ')
- instructions = get_instructions(profile)
- if first and len(instructions) <= 0:
- print('[E] No instructions by the name of {} was found.'.format(profile))
- print(' Installation won\'t continue until a valid profile is given.')
- print(' (this is because --default is not instructed and no --profile given)')
- first = False
-
- # TODO: Might not need to return anything here, passed by reference?
- instructions = merge_in_includes(instructions, user_args=user_args)
- cleanup_args()
-
- ## If no drive was found in args, select one.
- if not 'drive' in args:
- if len(harddrives):
- drives = sorted(list(harddrives.keys()))
- if len(drives) > 1 and 'force' not in args and not 'unattended' in args and ('minimal' in args and 'first-drive' not in args):
- for index, drive in enumerate(drives):
- print(f"{index}: {drive} ({harddrives[drive]['size'], harddrives[drive]['fstype'], harddrives[drive]['label']})")
- drive = input('Select one of the above disks (by number): ')
- if not drive.isdigit():
- raise KeyError("Multiple disks found, --drive=/dev/X not specified (or --force/--first-drive)")
- drives = [drives[int(drive)]] # Make sure only the selected drive is in the list of options
- args['drive'] = drives[0] # First drive found
- else:
- args['drive'] = None
-
- if args['drive'] and args['drive'][0] != '/':
- ## Remap the selected UUID to the device to be formatted.
- drive = get_drive_from_uuid(args['drive'])
- if not drive:
- print(f'[N] Could not map UUID "{args["drive"]}" to a device. Trying to match via PARTUUID instead!')
-
- drive = get_drive_from_part_uuid(args['drive'])
- if not drive:
- print(f'[E] Could not map UUID "{args["drive"]}" to a device. Aborting!')
- exit(1)
-
- args['drive'] = drive
-
- print(json.dumps(args, indent=4))
- if args['minimal'] and not 'force' in args and not 'unattended' in args:
- if(input('Are these settings OK? (No return beyond this point) N/y: ').lower() != 'y'):
- exit(1)
-
- cache_diskpw_on_disk()
- #else:
- # ## TODO: Convert to `rb` instead.
- # # We shouldn't discriminate \xfu from being a passwd phrase.
- # with open(args['pwfile'], 'r') as pw:
- # PIN = pw.read().strip()
-
- print()
- if not args['skip-encrypt']:
- print('[!] Disk & root PASSWORD is: {}'.format(args['password']))
- else:
- print('[!] root PASSWORD is: {}'.format(args['password']))
- print()
-
- if not args['rerun'] or args['ignore-rerun']:
- for i in range(5, 0, -1):
- print(f'Formatting {args["drive"]} in {i}...')
- time.sleep(1)
-
- close_disks()
- print(f'[N] Setting up {args["drive"]}.')
- if not format_disk('drive', start='start', end='size', debug=True):
- print(f'[E] Coult not format drive {args["drive"]}')
- exit(1)
-
- refresh_partition_list('drive')
- print(f'[N] Partitions: {len(args["partitions"])} (Boot: {list(args["partitions"].keys())[0]})')
-
- if len(args['partitions']) <= 0:
- print(f'[E] No partitions were created on {args["drive"]}', o)
- exit(1)
-
- if not args['rerun'] or args['ignore-rerun']:
- if not mkfs_fat32('drive', '1'):
- print(f'[E] Could not setup {args["drive"]}{args["partitions"]["1"]}')
- exit(1)
-
- if not args['skip-encrypt']:
- # "--cipher sha512" breaks the shit.
- # TODO: --use-random instead of --use-urandom
- print(f'[N] Adding encryption to {args["drive"]}{args["partitions"]["2"]}.')
- if not encrypt_partition('drive', '2', 'pwfile'):
- print('[E] Failed to setup disk encryption.', o)
- exit(1)
-
- if not args['skip-encrypt']:
- if not mount_luktsdev('drive', '2', 'pwfile'):
- print('[E] Could not open encrypted device.', o)
- exit(1)
-
- if not args['rerun'] or args['ignore-rerun']:
- print(f'[N] Creating btrfs filesystem inside {args["drive"]}{args["partitions"]["2"]}')
-
- on_part = '/dev/mapper/luksdev'
- if args['skip-encrypt']:
- on_part = f'{args["drive"]}{args["partitions"]["2"]}'
- if not mkfs_btrfs(on_part):
- print('[E] Could not setup btrfs filesystem.')
- exit(1)
-
- mount_mountpoints('drive', '1')
-
- if 'mirrors' in args and args['mirrors'] and 'country' in args and get_default_gateway_linux():
- print('[N] Reordering mirrors.')
- filter_mirrors_by_country_list(args['country'])
-
- pre_conf = {}
- if 'pre' in instructions:
- pre_conf = instructions['pre']
- elif 'prerequisits' in instructions:
- pre_conf = instructions['prerequisits']
-
- if 'git-branch' in pre_conf:
- update_git(pre_conf['git-branch'])
- del(pre_conf['git-branch'])
-
- rerun = args['ignore-rerun']
-
- ## Prerequisit steps needs to NOT be executed in arch-chroot.
- ## Mainly because there's no root structure to chroot into.
- ## But partly because some configurations need to be done against the live CD.
- ## (For instance, modifying mirrors are done on LiveCD and replicated intwards)
- for title in pre_conf:
- print('[N] Network prerequisit step: {}'.format(title))
- if args['rerun'] and args['rerun'] != title and not rerun:
- continue
- else:
- rerun = True
-
- for command in pre_conf[title]:
- raw_command = command
- opts = pre_conf[title][raw_command] if type(pre_conf[title][raw_command]) in (dict, oDict) else {}
- if len(opts):
- if 'pass-args' in opts or 'format' in opts:
- command = command.format(**args)
- ## FIXME: Instead of deleting the two options
- ## in order to mute command output further down,
- ## check for a 'debug' flag per command and delete these two
- if 'pass-args' in opts:
- del(opts['pass-args'])
- elif 'format' in opts:
- del(opts['format'])
- elif 'debug' in opts and opts['debug']:
- print('[N] Complete command-string: '.format(command))
- else:
- print('[-] Options: {}'.format(opts))
-
- #print('[N] Command: {} ({})'.format(raw_command, opts))
- o = b''.join(sys_command('{c}'.format(c=command), opts))
- if type(conf[title][raw_command]) == bytes and len(conf[title][raw_command]) and not conf[title][raw_command] in b''.join(o):
- print('[W] Prerequisit step failed: {}'.format(b''.join(o).decode('UTF-8')))
- #print(o)
-
- if not args['rerun'] or rerun:
- print('[N] Straping in packages.')
- base_return_code = strap_in_base() # TODO: check return here? we return based off pacstrap exit code.. Never tired it tho.
- else:
- base_return_code = None
-
- if not os.path.isdir('/mnt/etc'): # TODO: This might not be the most long term stable thing to rely on...
- print('[E] Failed to strap in packages', o)
- exit(1)
-
- if not args['rerun'] or rerun:
- print('[N] Configuring base system.')
- set_locale('en_US.UTF-8 UTF-8')
- configure_base_system()
- ## WORKAROUND: https://github.com/systemd/systemd/issues/13603#issuecomment-552246188
- print('[N] Setting up bootloader.')
- setup_bootloader()
-
- if args['aur-support']:
- print('[N] AUR support demanded, building "yay" before running POST steps.')
- add_AUR_support()
- print('[N] AUR support added. use "yay -Syy --noconfirm <package>" to deploy in POST.')
-
- ## == Passwords
- # o = sys_command('arch-chroot /mnt usermod --password {} root'.format(args['password']))
- # o = sys_command("arch-chroot /mnt sh -c 'echo {pin} | passwd --stdin root'".format(pin='"{pin}"'.format(**args, pin=args['password'])), echo=True)
- set_password(user='root', password=args['password'])
- time.sleep(5)
- if 'user' in args:
- create_user(args['user'], args['password'])#, groups=['wheel'])
-
- print('[N] Running post installation steps.')
- run_post_install_steps()
- time.sleep(2)
-
- if args['aur-support'] and not args['aur-keep']:
- o = b''.join(sys_command('/usr/bin/sed -i \'s/%wheel ALL=(ALL) NO/# %wheel ALL=(ALL) NO/\' /mnt/etc/sudoers'))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "userdel aibuilder"'))
- o = b''.join(sys_command('/usr/bin/arch-chroot /mnt sh -c "rm -rf /home/aibuilder"'))
-
- if args['phone-home']:
- phone_home(args['phone-home'])
-
- if args['post'] == 'reboot':
- o = simple_command('/usr/bin/umount -R /mnt')
- o = simple_command('/usr/bin/reboot now')
- else:
- print('Done. "umount -R /mnt; reboot" when you\'re done tinkering.')
+ def minimal_installation(self):
+ pass \ No newline at end of file
diff --git a/exceptions.py b/exceptions.py
new file mode 100644
index 00000000..24f3f273
--- /dev/null
+++ b/exceptions.py
@@ -0,0 +1,4 @@
+class RequirementError(BaseException):
+ pass
+class DiskError(BaseException):
+ pass \ No newline at end of file
diff --git a/helpers/__init__.py b/helpers/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/helpers/__init__.py
diff --git a/helpers/disk.py b/helpers/disk.py
new file mode 100644
index 00000000..ad72565d
--- /dev/null
+++ b/helpers/disk.py
@@ -0,0 +1,109 @@
+import glob, re, os, json
+from collections import OrderedDict
+from helpers.general import sys_command
+
+ROOT_DIR_PATTERN = re.compile('^.*?/devices')
+GPT = 0b00000001
+
+class BlockDevice():
+ def __init__(self, path, info):
+ self.path = path
+ self.info = info
+ if not 'backplane' in self.info:
+ self.info['backplane'] = self.find_backplane(self.info)
+
+ def find_backplane(self, info):
+ if not 'type' in info: raise DiskError(f'Could not locate backplane info for "{self.path}"')
+
+ if info['type'] == 'loop':
+ for drive in json.loads(b''.join(sys_command(f'losetup --json', hide_from_log=True)).decode('UTF_8'))['loopdevices']:
+ if not drive['name'] == self.path: continue
+
+ return drive['back-file']
+ elif info['type'] == 'disk':
+ return self.path
+ elif info['type'] == 'crypt':
+ if not 'pkname' in info: raise DiskError(f'A crypt device ({self.path}) without a parent kernel device name.')
+ return f"/dev/{info['pkname']}"
+
+ def __repr__(self, *args, **kwargs):
+ return f'BlockDevice(path={self.path})'
+
+ def __getitem__(self, key, *args, **kwargs):
+ if not key in self.info:
+ raise KeyError(f'{self} does not contain information: "{key}"')
+ return self.info[key]
+
+# def __enter__(self, *args, **kwargs):
+# return self
+#
+# def __exit__(self, *args, **kwargs):
+# print('Exit:', args, kwargs)
+# b''.join(sys_command(f'sync', *args, **kwargs, hide_from_log=True))
+
+class Formatter():
+ def __init__(self, blockdevice, mode=GPT):
+ self.blockdevice = blockdevice
+ self.mode = mode
+
+ def __enter__(self, *args, **kwargs):
+ print(f'Formatting {self.blockdevice} as {self.mode}:', args, kwargs)
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ print('Exit:', args, kwargs)
+ b''.join(sys_command(f'sync', *args, **kwargs, hide_from_log=True))
+
+ def format_disk(drive='drive', start='start', end='size', emulate=False, *positionals, **kwargs):
+ drive = args[drive]
+ start = args[start]
+ end = args[end]
+ if not drive:
+ raise ValueError('Need to supply a drive path, for instance: /dev/sdx')
+
+ if not SAFETY_LOCK:
+ # dd if=/dev/random of=args['drive'] bs=4096 status=progress
+ # https://github.com/dcantrell/pyparted would be nice, but isn't officially in the repo's #SadPanda
+ #if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ # return None
+ if sys_command(f'/usr/bin/parted -s {drive} mklabel gpt', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} mkpart primary FAT32 1MiB {start}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} name 1 "EFI"', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} set 1 esp on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} set 1 boot on', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+ if sys_command(f'/usr/bin/parted -s {drive} mkpart primary {start} {end}', emulate=emulate, *positionals, **kwargs).exit_code != 0:
+ return None
+
+
+def device_state(name, *args, **kwargs):
+ # Based out of: https://askubuntu.com/questions/528690/how-to-get-list-of-all-non-removable-disk-device-names-ssd-hdd-and-sata-ide-onl/528709#528709
+ if os.path.isfile('/sys/block/{}/device/block/{}/removable'.format(name, name)):
+ with open('/sys/block/{}/device/block/{}/removable'.format(name, name)) as f:
+ if f.read(1) == '1':
+ return
+
+ path = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/block/{}'.format(name)))
+ hotplug_buses = ("usb", "ieee1394", "mmc", "pcmcia", "firewire")
+ for bus in hotplug_buses:
+ if os.path.exists('/sys/bus/{}'.format(bus)):
+ for device_bus in os.listdir('/sys/bus/{}/devices'.format(bus)):
+ device_link = ROOT_DIR_PATTERN.sub('', os.readlink('/sys/bus/{}/devices/{}'.format(bus, device_bus)))
+ if re.search(device_link, path):
+ return
+ return True
+
+# lsblk --json -l -n -o path
+def all_disks(*args, **kwargs):
+ if not 'partitions' in kwargs: kwargs['partitions'] = False
+ drives = OrderedDict()
+ #for drive in json.loads(sys_command(f'losetup --json', *args, **lkwargs, hide_from_log=True)).decode('UTF_8')['loopdevices']:
+ for drive in json.loads(b''.join(sys_command(f'lsblk --json -l -n -o path,size,type,mountpoint,label,pkname', *args, **kwargs, hide_from_log=True)).decode('UTF_8'))['blockdevices']:
+ if not kwargs['partitions'] and drive['type'] == 'part': continue
+
+ drives[drive['path']] = BlockDevice(drive['path'], drive)
+ return drives \ No newline at end of file
diff --git a/helpers/general.py b/helpers/general.py
new file mode 100644
index 00000000..2b730c60
--- /dev/null
+++ b/helpers/general.py
@@ -0,0 +1,206 @@
+import os, json, hashlib, shlex
+import time, pty
+from subprocess import Popen, STDOUT, PIPE, check_output
+from select import epoll, EPOLLIN, EPOLLHUP
+
+def log(*args, **kwargs):
+ print(' '.join([str(x) for x in args]))
+
+def gen_uid(entropy_length=256):
+ return hashlib.sha512(os.urandom(entropy_length)).hexdigest()
+
+class sys_command():#Thread):
+ """
+ Stolen from archinstall_gui
+ """
+ def __init__(self, cmd, callback=None, start_callback=None, *args, **kwargs):
+ if not 'worker_id' in kwargs: kwargs['worker_id'] = gen_uid()
+ if not 'emulate' in kwargs: kwargs['emulate'] = False
+ #Thread.__init__(self)
+ if kwargs['emulate']:
+ log(f"Starting command '{cmd}' in emulation mode.")
+ self.cmd = shlex.split(cmd)
+ self.args = args
+ self.kwargs = kwargs
+ if not 'worker' in self.kwargs: self.kwargs['worker'] = None
+ self.callback = callback
+ self.pid = None
+ self.exit_code = None
+ self.started = time.time()
+ self.ended = None
+ self.worker_id = kwargs['worker_id']
+ self.trace_log = b''
+ self.status = 'starting'
+
+ user_catalogue = os.path.expanduser('~')
+ self.cwd = f"{user_catalogue}/archinstall/cache/workers/{kwargs['worker_id']}/"
+ self.exec_dir = f'{self.cwd}/{os.path.basename(self.cmd[0])}_workingdir'
+
+ if not self.cmd[0][0] == '/':
+ log('Worker command is not executed with absolute path, trying to find: {}'.format(self.cmd[0]), origin='spawn', level=5)
+ o = check_output(['/usr/bin/which', self.cmd[0]])
+ log('This is the binary {} for {}'.format(o.decode('UTF-8'), self.cmd[0]), origin='spawn', level=5)
+ self.cmd[0] = o.decode('UTF-8').strip()
+
+ if not os.path.isdir(self.exec_dir):
+ os.makedirs(self.exec_dir)
+
+ if self.kwargs['emulate']:
+ commandlog.append(cmd + ' # (emulated)')
+ elif 'hide_from_log' in self.kwargs and self.kwargs['hide_from_log']:
+ pass
+ else:
+ commandlog.append(cmd)
+ if start_callback: start_callback(self, *args, **kwargs)
+ #self.start()
+ self.run()
+
+ def __iter__(self, *args, **kwargs):
+ for line in self.trace_log.split(b'\n'):
+ yield line
+
+ def __repr__(self, *args, **kwargs):
+ return f"{self.cmd, self.trace_log}"
+
+ def decode(self, fmt='UTF-8'):
+ return self.trace_log.decode(fmt)
+
+ def dump(self):
+ return {
+ 'status' : self.status,
+ 'worker_id' : self.worker_id,
+ 'worker_result' : self.trace_log.decode('UTF-8'),
+ 'started' : self.started,
+ 'ended' : self.ended,
+ 'started_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.started)),
+ 'ended_pprint' : '{}-{}-{} {}:{}:{}'.format(*time.localtime(self.ended)) if self.ended else None,
+ 'exit_code' : self.exit_code
+ }
+
+ def run(self):
+ #main = None
+ #for t in tenum():
+ # if t.name == 'MainThread':
+ # main = t
+ # break
+
+ #if not main:
+ # log('Main thread not existing')
+ # return
+
+ self.status = 'running'
+ old_dir = os.getcwd()
+ os.chdir(self.exec_dir)
+ self.pid, child_fd = pty.fork()
+ if not self.pid: # Child process
+ # Replace child process with our main process
+ if not self.kwargs['emulate']:
+ try:
+ os.execv(self.cmd[0], self.cmd)
+ except FileNotFoundError:
+ self.status = 'done'
+ log(f"{self.cmd[0]} does not exist.", origin='spawn', level=2)
+ self.exit_code = 1
+ return False
+
+ os.chdir(old_dir)
+
+ poller = epoll()
+ poller.register(child_fd, EPOLLIN | EPOLLHUP)
+
+ if 'events' in self.kwargs and 'debug' in self.kwargs:
+ log(f'[D] Using triggers for command: {self.cmd}')
+ log(json.dumps(self.kwargs['events']))
+
+ alive = True
+ last_trigger_pos = 0
+ while alive and not self.kwargs['emulate']:
+ for fileno, event in poller.poll(0.1):
+ try:
+ output = os.read(child_fd, 8192).strip()
+ self.trace_log += output
+ except OSError:
+ alive = False
+ break
+
+ if 'debug' in self.kwargs and self.kwargs['debug'] and len(output):
+ log(self.cmd[0], 'gave:', output.decode('UTF-8'))
+ log(self.cmd[0],'gave:', output.decode('UTF-8'), origin='spawn', level=4)
+
+ if 'on_output' in self.kwargs:
+ self.kwargs['on_output'](self.kwargs['worker'], output)
+
+ lower = output.lower()
+ broke = False
+ if 'events' in self.kwargs:
+ for trigger in list(self.kwargs['events']):
+ if type(trigger) != bytes:
+ original = trigger
+ trigger = bytes(original, 'UTF-8')
+ self.kwargs['events'][trigger] = self.kwargs['events'][original]
+ del(self.kwargs['events'][original])
+ if type(self.kwargs['events'][trigger]) != bytes:
+ self.kwargs['events'][trigger] = bytes(self.kwargs['events'][trigger], 'UTF-8')
+
+ if trigger.lower() in self.trace_log[last_trigger_pos:].lower():
+ trigger_pos = self.trace_log[last_trigger_pos:].lower().find(trigger.lower())
+
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}")
+ log(f"Writing to subprocess {self.cmd[0]}: {self.kwargs['events'][trigger].decode('UTF-8')}", origin='spawn', level=5)
+
+ last_trigger_pos = trigger_pos
+ os.write(child_fd, self.kwargs['events'][trigger])
+ del(self.kwargs['events'][trigger])
+ broke = True
+ break
+
+ if broke:
+ continue
+
+ ## Adding a exit trigger:
+ if len(self.kwargs['events']) == 0:
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"Waiting for last command {self.cmd[0]} to finish.", origin='spawn', level=4)
+
+ if bytes(f']$'.lower(), 'UTF-8') in self.trace_log[0-len(f']$')-5:].lower():
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"{self.cmd[0]} has finished.", origin='spawn', level=4)
+ alive = False
+ break
+
+ self.status = 'done'
+
+ if 'debug' in self.kwargs and self.kwargs['debug']:
+ log(f"{self.cmd[0]} waiting for exit code.", origin='spawn', level=5)
+
+ if not self.kwargs['emulate']:
+ try:
+ self.exit_code = os.waitpid(self.pid, 0)[1]
+ except ChildProcessError:
+ try:
+ self.exit_code = os.waitpid(child_fd, 0)[1]
+ except ChildProcessError:
+ self.exit_code = 1
+ else:
+ self.exit_code = 0
+
+ if 'ignore_errors' in self.kwargs:
+ self.exit_code = 0
+
+ if self.exit_code != 0:
+ log(f"{self.cmd} did not exit gracefully, exit code {self.exit_code}.", origin='spawn', level=3)
+ log(self.trace_log.decode('UTF-8'), origin='spawn', level=3)
+ else:
+ log(f"{self.cmd[0]} exit nicely.", origin='spawn', level=5)
+
+ self.ended = time.time()
+ with open(f'{self.cwd}/trace.log', 'wb') as fh:
+ fh.write(self.trace_log)
+
+def prerequisit_check():
+ if not os.path.isdir('/sys/firmware/efi'):
+ raise RequirementError('Archinstall only supports machines in UEFI mode.')
+
+ return True
+
diff --git a/helpers/user_interaction.py b/helpers/user_interaction.py
new file mode 100644
index 00000000..1b7e85cf
--- /dev/null
+++ b/helpers/user_interaction.py
@@ -0,0 +1,17 @@
+from exceptions import *
+
+def select_disk(dict_o_disks):
+ drives = sorted(list(dict_o_disks.keys()))
+ if len(drives) > 1:
+ for index, drive in enumerate(drives):
+ print(f"{index}: {drive} ({dict_o_disks[drive]['size'], dict_o_disks[drive]['backplane'], dict_o_disks[drive]['label']})")
+ drive = input('Select one of the above disks (by number or full path): ')
+ if drive.isdigit():
+ drive = dict_o_disks[drives[int(drive)]]
+ elif drive in dict_o_disks:
+ drive = dict_o_disks[drive]
+ else:
+ raise DiskError(f'Selected drive does not exist: "{drive}"')
+ return drive
+
+ raise DiskError('select_disk() requires a non-empty dictionary of disks to select from.') \ No newline at end of file
diff --git a/installer.py b/installer.py
new file mode 100644
index 00000000..5c7d4467
--- /dev/null
+++ b/installer.py
@@ -0,0 +1,22 @@
+import archinstall, getpass
+
+selected_hdd = archinstall.select_disk(archinstall.all_disks())
+disk_password = getpass.getpass(prompt='Disk password (won\'t echo): ')
+
+with archinstall.Formatter(selected_hdd, archinstall.GPT) as formatter:
+ exit(1)
+ disk.encrypt('luks2', password=disk_password, key_size=512, hash_type='sha512', iter_time=10000, key_file='./pwfile')
+
+ root_partition = disk.partition['/']
+
+with archinstall.installer(root_partition, hostname='testmachine') as installation:
+ if installation.minimal_installation():
+ installation.add_bootloader()
+
+ installation.add_additional_packages(['nano', 'wget', 'git'])
+ installation.install_profile('desktop')
+
+ installation.user_create('anton', 'test')
+ installation.user_set_pw('root', 'toor')
+
+ installation.add_AUR_support() \ No newline at end of file