Send patches - preferably formatted by git format-patch - to patches at archlinux32 dot org.
summaryrefslogtreecommitdiff
path: root/archinstall/lib/plugins.py
blob: 0ff63610b18a627a76b1caef7172d460a58af19a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import hashlib
import importlib
import logging
import os
import sys
import pathlib
import urllib.parse
import urllib.request
from importlib import metadata
from typing import Optional, List
from types import ModuleType

from .output import log
from .storage import storage

plugins = {}

# 1: List archinstall.plugin definitions
# 2: Load the plugin entrypoint
# 3: Initiate the plugin and store it as .name in plugins
for plugin_definition in metadata.entry_points().select(group='archinstall.plugin'):
	plugin_entrypoint = plugin_definition.load()
	try:
		plugins[plugin_definition.name] = plugin_entrypoint()
	except Exception as err:
		log(err, level=logging.ERROR)
		log(f"The above error was detected when loading the plugin: {plugin_definition}", fg="red", level=logging.ERROR)


# The following functions and core are support structures for load_plugin()
def localize_path(profile_path :str) -> str:
	if (url := urllib.parse.urlparse(profile_path)).scheme and url.scheme in ('https', 'http'):
		converted_path = f"/tmp/{os.path.basename(profile_path).replace('.py', '')}_{hashlib.md5(os.urandom(12)).hexdigest()}.py"

		with open(converted_path, "w") as temp_file:
			temp_file.write(urllib.request.urlopen(url.geturl()).read().decode('utf-8'))

		return converted_path
	else:
		return profile_path


def import_via_path(path :str, namespace :Optional[str] = None) -> ModuleType:
	if not namespace:
		namespace = os.path.basename(path)

		if namespace == '__init__.py':
			path = pathlib.PurePath(path)
			namespace = path.parent.name

	try:
		spec = importlib.util.spec_from_file_location(namespace, path)
		imported = importlib.util.module_from_spec(spec)
		sys.modules[namespace] = imported
		spec.loader.exec_module(sys.modules[namespace])

		return namespace
	except Exception as err:
		log(err, level=logging.ERROR)
		log(f"The above error was detected when loading the plugin: {path}", fg="red", level=logging.ERROR)

		try:
			del(sys.modules[namespace]) # noqa: E275
		except:
			pass

def find_nth(haystack :List[str], needle :str, n :int) -> int:
	start = haystack.find(needle)
	while start >= 0 and n > 1:
		start = haystack.find(needle, start + len(needle))
		n -= 1
	return start

def load_plugin(path :str) -> ModuleType:
	parsed_url = urllib.parse.urlparse(path)
	log(f"Loading plugin {parsed_url}.", fg="gray", level=logging.INFO)

	# The Profile was not a direct match on a remote URL
	if not parsed_url.scheme:
		# Path was not found in any known examples, check if it's an absolute path
		if os.path.isfile(path):
			namespace = import_via_path(path)
	elif parsed_url.scheme in ('https', 'http'):
		namespace = import_via_path(localize_path(path))

	if namespace in sys.modules:
		# Version dependency via __archinstall__version__ variable (if present) in the plugin
		# Any errors in version inconsistency will be handled through normal error handling if not defined.
		if hasattr(sys.modules[namespace], '__archinstall__version__'):
			archinstall_major_and_minor_version = float(storage['__version__'][:find_nth(storage['__version__'], '.', 2)])

			if sys.modules[namespace].__archinstall__version__ < archinstall_major_and_minor_version:
				log(f"Plugin {sys.modules[namespace]} does not support the current Archinstall version.", fg="red", level=logging.ERROR)

		# Locate the plugin entry-point called Plugin()
		# This in accordance with the entry_points() from setup.cfg above
		if hasattr(sys.modules[namespace], 'Plugin'):
			try:
				plugins[namespace] = sys.modules[namespace].Plugin()
				log(f"Plugin {plugins[namespace]} has been loaded.", fg="gray", level=logging.INFO)
			except Exception as err:
				log(err, level=logging.ERROR)
				log(f"The above error was detected when initiating the plugin: {path}", fg="red", level=logging.ERROR)
		else:
			log(f"Plugin '{path}' is missing a valid entry-point or is corrupt.", fg="yellow", level=logging.WARNING)