holy fuck shit finally works!!!
alpha build 0039
This commit is contained in:
162
pywebsrv.py
162
pywebsrv.py
@@ -56,19 +56,74 @@ except ImportError:
|
||||
# )
|
||||
pass
|
||||
|
||||
AMETHYST_BUILD_NUMBER = "0018"
|
||||
AMETHYST_BUILD_NUMBER = "0039"
|
||||
AMETHYST_REPO = "https://git.novacow.ch/Nova/PyWebServer/"
|
||||
|
||||
|
||||
class ConfigParser:
|
||||
def __init__(self, text):
|
||||
self.data = {"hosts": {}, "globals": {}}
|
||||
self._parse(text)
|
||||
|
||||
def _parse(self, text):
|
||||
lines = [
|
||||
line.strip()
|
||||
for line in text.splitlines()
|
||||
if line.strip() and not line.strip().startswith("#")
|
||||
]
|
||||
|
||||
current_block = None
|
||||
current_name = None
|
||||
|
||||
for line in lines:
|
||||
if line.startswith("host ") and line.endswith("{"):
|
||||
current_name = line.split()[1]
|
||||
self.data["hosts"][current_name] = {}
|
||||
current_block = ("host", current_name)
|
||||
continue
|
||||
|
||||
if line == "globals {":
|
||||
current_block = ("globals", None)
|
||||
continue
|
||||
|
||||
if line == "}":
|
||||
current_block = None
|
||||
current_name = None
|
||||
continue
|
||||
|
||||
if ":" in line and current_block:
|
||||
key, value = line.split(":", 1)
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
|
||||
if "," in value:
|
||||
value = [v.strip() for v in value.split(",")]
|
||||
|
||||
if current_block[0] == "host":
|
||||
self.data["hosts"][current_name][key] = value
|
||||
else:
|
||||
self.data["globals"][key] = value
|
||||
|
||||
def query_config(self, key, host=None):
|
||||
if host:
|
||||
return self.data["hosts"].get(host, {}).get(key)
|
||||
if key == "hosts":
|
||||
print(f"\n\n\nHosts!\nHosts: {self.data['hosts']}\n\n\n")
|
||||
return list(self.data["hosts"].keys())
|
||||
return self.data["globals"].get(key)
|
||||
|
||||
|
||||
class FileHandler:
|
||||
CONFIG_FILE = "pywebsrv.conf"
|
||||
new_conf = "new_conf.conf"
|
||||
|
||||
def __init__(self, base_dir=None):
|
||||
# this is a fucking clusterfuck.
|
||||
self.config_path = os.path.join(os.getcwd(), self.CONFIG_FILE)
|
||||
self.new_conf = os.path.join(os.getcwd(), self.new_conf)
|
||||
self.base_dir = self.read_config("directory")
|
||||
self.cached_conf = None
|
||||
with open(self.new_conf, "r") as f:
|
||||
self.cfg = ConfigParser(f.read())
|
||||
if not os.path.exists(self.config_path):
|
||||
print(
|
||||
"The pywebsrv.conf file needs to be in the same directory "
|
||||
@@ -76,6 +131,7 @@ class FileHandler:
|
||||
"https://git.novacow.ch/Nova/PyWebServer/raw/branch/main/pywebsrv.conf"
|
||||
)
|
||||
exit(1)
|
||||
# TODO: fix this please!!
|
||||
|
||||
def read_file(self, file_path, directory=None):
|
||||
if "../" in file_path or "%" in file_path:
|
||||
@@ -121,7 +177,7 @@ class FileHandler:
|
||||
"disable-autocertgen",
|
||||
"key-file",
|
||||
"cert-file",
|
||||
"block-ua"
|
||||
"block-ua",
|
||||
]
|
||||
if option not in valid_options:
|
||||
return None
|
||||
@@ -146,7 +202,7 @@ class FileHandler:
|
||||
if val.startswith("match(") and val.endswith(")"):
|
||||
idx = val.index("(")
|
||||
idx2 = val.index(")")
|
||||
ua_to_match = val[idx+1:idx2]
|
||||
ua_to_match = val[idx + 1 : idx2]
|
||||
host_to_match.append(ua_to_match)
|
||||
else:
|
||||
literal_blocks.append(val)
|
||||
@@ -169,50 +225,11 @@ class FileHandler:
|
||||
return value
|
||||
return None
|
||||
|
||||
def read_new_config(self, option, host=None):
|
||||
"""
|
||||
Reads the configuration file and returns a dict
|
||||
"""
|
||||
if self.cached_conf is None:
|
||||
with open(self.new_conf, "r", encoding="utf-8") as fh:
|
||||
text = fh.read()
|
||||
|
||||
blocks = re.findall(
|
||||
r'^(host\s+(\S+)|globals)\s*\{([^}]*)\}', text, re.MULTILINE
|
||||
)
|
||||
parsed = {}
|
||||
host_list = []
|
||||
print(f"Blocks: {blocks}")
|
||||
for tag, hostname, body in blocks:
|
||||
section = hostname if hostname else "globals"
|
||||
if hostname:
|
||||
host_list.append(hostname)
|
||||
kv = {}
|
||||
for line in body.splitlines():
|
||||
line = line.strip()
|
||||
if not line or ":" not in line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
key, rest = line.split(":", 1)
|
||||
key = key.strip()
|
||||
rest = rest.strip()
|
||||
|
||||
if "," in rest:
|
||||
kv[key] = [item.strip() for item in rest.split(",")]
|
||||
else:
|
||||
kv[key] = rest
|
||||
parsed[section] = kv
|
||||
parsed["globals"]["hosts"] = host_list
|
||||
self.cached_conf = parsed
|
||||
else:
|
||||
parsed = self.cached_conf
|
||||
if option == "host":
|
||||
try:
|
||||
return host_list
|
||||
except Exception:
|
||||
return parsed["globals"]["hosts"]
|
||||
section = parsed.get(host or "globals", {})
|
||||
return section.get(option)
|
||||
def read_new_config(self, key, host_name=None):
|
||||
print(
|
||||
f"\n\n\nQuery!\nkey: {key}\nhost_name: {host_name}\nret: {self.cfg.query_config(key, host_name)}"
|
||||
)
|
||||
return self.cfg.query_config(key, host_name)
|
||||
|
||||
def autocert(self):
|
||||
"""
|
||||
@@ -228,7 +245,7 @@ class FileHandler:
|
||||
class RequestParser:
|
||||
def __init__(self):
|
||||
self.file_handler = FileHandler()
|
||||
self.hosts = self.file_handler.read_new_config("host")
|
||||
self.hosts = self.file_handler.read_new_config("hosts")
|
||||
print(f"Hosts: {self.hosts}")
|
||||
|
||||
def parse_request_line(self, line):
|
||||
@@ -272,7 +289,7 @@ class RequestParser:
|
||||
Mfw im in an ugly code writing contest and my opponent is nova while writing a side project
|
||||
"""
|
||||
host = f"{host}"
|
||||
print(f"hosts: {self.hosts}, host: {host}, split: {host.rsplit(":", 1)[0]}")
|
||||
print(f"hosts: {self.hosts}, host: {host}, split: {host.rsplit(':', 1)[0]}")
|
||||
if ":" in host:
|
||||
host = host.rsplit(":", 1)[0]
|
||||
host = host.lstrip()
|
||||
@@ -281,11 +298,14 @@ class RequestParser:
|
||||
host == "localhost" or host == "127.0.0.1" or host == "[::1]"
|
||||
) and self.file_handler.read_new_config("allow-localhost"):
|
||||
return True
|
||||
if self.hosts is None:
|
||||
return True
|
||||
if host not in self.hosts:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
#
|
||||
# class ProxyServer:
|
||||
# def __init__(
|
||||
@@ -299,10 +319,10 @@ class WebServer:
|
||||
):
|
||||
self.http_port = int(http_port)
|
||||
self.https_port = int(https_port)
|
||||
self.cert_file = cert_file
|
||||
self.key_file = key_file
|
||||
self.file_handler = FileHandler()
|
||||
self.parser = RequestParser()
|
||||
self.cert_file = self.file_handler.read_new_config("cert") or cert_file
|
||||
self.key_file = self.file_handler.read_new_config("key") or key_file
|
||||
self.skip_ssl = False
|
||||
|
||||
# me when no certificate and key file
|
||||
@@ -387,6 +407,8 @@ class WebServer:
|
||||
if self.skip_ssl is True:
|
||||
print("WARN: You have enabled HTTPS without SSL!!")
|
||||
yn = input("Is this intended behaviour? [y/N] ")
|
||||
if yn.lower() == "n":
|
||||
exit(1)
|
||||
https_thread.start()
|
||||
# ipv6https_thread.start()
|
||||
if http is True:
|
||||
@@ -455,7 +477,9 @@ class WebServer:
|
||||
data = conn.recv(512)
|
||||
request = data.decode(errors="ignore")
|
||||
if not data:
|
||||
response = self.build_response(400, "Bad Request") # user did fucky-wucky
|
||||
response = self.build_response(
|
||||
400, "Bad Request"
|
||||
) # user did fucky-wucky
|
||||
elif len(data) > 8192:
|
||||
response = self.build_response(413, "Request too long")
|
||||
else:
|
||||
@@ -485,9 +509,7 @@ class WebServer:
|
||||
)
|
||||
break
|
||||
else:
|
||||
return self.build_response(
|
||||
400, self.no_host_req_response.encode()
|
||||
)
|
||||
return self.build_response(400, self.no_host_req_response.encode())
|
||||
|
||||
for line in data.splitlines():
|
||||
if "User-Agent" in line:
|
||||
@@ -499,9 +521,7 @@ class WebServer:
|
||||
)
|
||||
break
|
||||
else:
|
||||
return self.build_response(
|
||||
400, "You cannot connect without a User-Agent."
|
||||
)
|
||||
return self.build_response(400, "You cannot connect without a User-Agent.")
|
||||
|
||||
method, path, version = self.parser.parse_request_line(request_line)
|
||||
|
||||
@@ -515,12 +535,16 @@ class WebServer:
|
||||
self.parser = RequestParser()
|
||||
return self.build_response(302, "")
|
||||
|
||||
if not self.parser.is_method_allowed(
|
||||
method
|
||||
):
|
||||
if not self.parser.is_method_allowed(method):
|
||||
return self.build_response(405, self.http_405_html)
|
||||
|
||||
directory = self.file_handler.read_new_config("directory", host)
|
||||
if ":" in host:
|
||||
host2 = host.rsplit(":", 1)[0]
|
||||
|
||||
directory = (
|
||||
self.file_handler.read_new_config("directory", host2)
|
||||
or self.file_handler.base_dir
|
||||
)
|
||||
|
||||
file_content, mimetype = self.file_handler.read_file(path, directory)
|
||||
|
||||
@@ -541,7 +565,9 @@ class WebServer:
|
||||
mimetype = mimetype[0]
|
||||
if mimetype is None:
|
||||
# We have to assume it's binary.
|
||||
return self.build_binary_response(200, file_content, "application/octet-stream")
|
||||
return self.build_binary_response(
|
||||
200, file_content, "application/octet-stream"
|
||||
)
|
||||
if "text/" not in mimetype:
|
||||
return self.build_binary_response(200, file_content, mimetype)
|
||||
|
||||
@@ -555,7 +581,7 @@ class WebServer:
|
||||
403: "Forbidden",
|
||||
404: "Not Found",
|
||||
405: "Method Not Allowed",
|
||||
500: "Internal Server Error"
|
||||
500: "Internal Server Error",
|
||||
}
|
||||
status_message = messages.get(status_code)
|
||||
headers = (
|
||||
@@ -586,7 +612,7 @@ class WebServer:
|
||||
405: "Method Not Allowed",
|
||||
413: "Payload Too Large",
|
||||
500: "Internal Server Error",
|
||||
635: "Go Away"
|
||||
635: "Go Away",
|
||||
}
|
||||
status_message = messages.get(status_code)
|
||||
|
||||
@@ -607,7 +633,9 @@ class WebServer:
|
||||
# Why not 307, Moved Permanently? Because browsers will cache the
|
||||
# response and not send the reload command.
|
||||
host = self.file_handler.read_config("host")[0]
|
||||
port = self.file_handler.read_config("port-https") or self.file_handler.read_config("port")
|
||||
port = self.file_handler.read_config(
|
||||
"port-https"
|
||||
) or self.file_handler.read_config("port")
|
||||
if port != 80 and port != 443:
|
||||
if port == 8443:
|
||||
host = f"https://{host}:{port}/"
|
||||
|
||||
Reference in New Issue
Block a user