amethyst 0018
This commit is contained in:
85
pywebsrv.py
85
pywebsrv.py
@@ -56,9 +56,10 @@ except ImportError:
|
||||
# )
|
||||
pass
|
||||
|
||||
AMETHYST_BUILD_NUMBER = "0001"
|
||||
AMETHYST_BUILD_NUMBER = "0018"
|
||||
AMETHYST_REPO = "https://git.novacow.ch/Nova/PyWebServer/"
|
||||
|
||||
|
||||
class FileHandler:
|
||||
CONFIG_FILE = "pywebsrv.conf"
|
||||
new_conf = "new_conf.conf"
|
||||
@@ -76,11 +77,14 @@ class FileHandler:
|
||||
)
|
||||
exit(1)
|
||||
|
||||
def read_file(self, file_path):
|
||||
if "../" in file_path:
|
||||
def read_file(self, file_path, directory=None):
|
||||
if "../" in file_path or "%" in file_path:
|
||||
return 403, None
|
||||
|
||||
full_path = os.path.join(self.base_dir, file_path.lstrip("/"))
|
||||
if directory is not None:
|
||||
full_path = os.path.join(directory, file_path.lstrip("/"))
|
||||
else:
|
||||
full_path = os.path.join(self.base_dir, file_path.lstrip("/"))
|
||||
if not os.path.isfile(full_path):
|
||||
return 404, None
|
||||
|
||||
@@ -92,8 +96,8 @@ class FileHandler:
|
||||
print(f"Error reading file {full_path}: {e}")
|
||||
return 500, None
|
||||
|
||||
def write_file(self, file_path, data):
|
||||
if "../" in file_path:
|
||||
def write_file(self, file_path, data, directory=None):
|
||||
if "../" in file_path or "%" in file_path:
|
||||
return 403
|
||||
full_path = os.path.join(self.base_dir, file_path.lstrip("/"))
|
||||
with open(full_path, "a") as f:
|
||||
@@ -193,7 +197,6 @@ class FileHandler:
|
||||
key = key.strip()
|
||||
rest = rest.strip()
|
||||
|
||||
# Split comma-separated values (e.g. GET,PUT)
|
||||
if "," in rest:
|
||||
kv[key] = [item.strip() for item in rest.split(",")]
|
||||
else:
|
||||
@@ -216,6 +219,8 @@ class FileHandler:
|
||||
Generate some self-signed certificates using AutoCertGen
|
||||
TODO: doesn't work, need to fix. probably add `./` to $PATH
|
||||
"""
|
||||
if not os.getcwd() in sys.path:
|
||||
sys.path.append(f"{os.getcwd()}")
|
||||
autocert = AutoCertGen()
|
||||
autocert.gen_cert()
|
||||
|
||||
@@ -267,14 +272,14 @@ class RequestParser:
|
||||
Mfw im in an ugly code writing contest and my opponent is nova while writing a side project
|
||||
"""
|
||||
host = f"{host}"
|
||||
print(f"hosts: {self.hosts}, host: {host}")
|
||||
print(f"hosts: {self.hosts}, host: {host}, split: {host.rsplit(":", 1)[0]}")
|
||||
if ":" in host:
|
||||
host = host.split(":", 1)[0]
|
||||
host = host.rsplit(":", 1)[0]
|
||||
host = host.lstrip()
|
||||
host = host.rstrip()
|
||||
if (
|
||||
host == "localhost" or host == "127.0.0.1"
|
||||
) and self.file_handler.read_config("allow-localhost"):
|
||||
host == "localhost" or host == "127.0.0.1" or host == "[::1]"
|
||||
) and self.file_handler.read_new_config("allow-localhost"):
|
||||
return True
|
||||
if host not in self.hosts:
|
||||
return False
|
||||
@@ -324,14 +329,22 @@ class WebServer:
|
||||
"This host cannot be reached without sending a `Host` header."
|
||||
)
|
||||
|
||||
# ipv6 when????/??//?????//?
|
||||
self.http_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.http_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.http_socket.bind(("0.0.0.0", self.http_port))
|
||||
# TODO: enable experimental ipv6 support in config
|
||||
|
||||
self.https_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.https_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.https_socket.bind(("0.0.0.0", self.https_port))
|
||||
# ipv6 when????/??//?????//?
|
||||
# self.http_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# self.http_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
# self.http_socket.bind(("0.0.0.0", self.http_port))
|
||||
#
|
||||
# self.https_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
# self.https_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
# self.https_socket.bind(("0.0.0.0", self.https_port))
|
||||
|
||||
self.http_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
||||
self.http_socket.bind(("::", self.http_port))
|
||||
|
||||
self.https_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
|
||||
self.https_socket.bind(("::", self.https_port))
|
||||
|
||||
if self.skip_ssl is False:
|
||||
# https gets the ssl treatment!! yaaaay :3
|
||||
@@ -367,17 +380,23 @@ class WebServer:
|
||||
|
||||
http_thread = threading.Thread(target=self.start_http, daemon=True)
|
||||
https_thread = threading.Thread(target=self.start_https, daemon=True)
|
||||
# ipv6http_thread = threading.Thread(target=self.start_http_ipv6, daemon=True)
|
||||
# ipv6https_thread = threading.Thread(target=self.start_https_ipv6, daemon=True)
|
||||
|
||||
if https is True:
|
||||
if self.skip_ssl is True:
|
||||
print("WARN: You have enabled HTTPS without SSL!!")
|
||||
yn = input("Is this intended behaviour? [y/N] ")
|
||||
https_thread.start()
|
||||
# ipv6https_thread.start()
|
||||
if http is True:
|
||||
# ipv6http_thread.start()
|
||||
http_thread.start()
|
||||
|
||||
http_thread.join()
|
||||
https_thread.join()
|
||||
# ipv6http_thread.join()
|
||||
# ipv6https_thread.join()
|
||||
|
||||
def start_http(self):
|
||||
self.http_socket.listen(5)
|
||||
@@ -391,6 +410,32 @@ class WebServer:
|
||||
except OSError:
|
||||
break
|
||||
|
||||
def start_http_ipv6(self):
|
||||
self.ipv6http_socket.listen(5)
|
||||
print(f"IPv6 HTTP server listening on port {self.http_port}...")
|
||||
while self.running:
|
||||
try:
|
||||
conn, addr = self.ipv6http_socket.accept()
|
||||
self.handle_connection(conn, addr)
|
||||
except Exception as e:
|
||||
print(f"HTTP error: {e}")
|
||||
except OSError:
|
||||
break
|
||||
|
||||
def start_https_ipv6(self):
|
||||
self.ipv6https_socket.listen(5)
|
||||
print(f"IPv6 HTTPS server listening on port {self.https_port}...")
|
||||
while self.running:
|
||||
try:
|
||||
conn, addr = self.ipv6https_socket.accept()
|
||||
self.handle_connection(conn, addr)
|
||||
except Exception as e:
|
||||
print(
|
||||
f"HTTPS error: {e}"
|
||||
) # be ready for ssl errors if you use a self-sign!!
|
||||
except OSError:
|
||||
break
|
||||
|
||||
def start_https(self):
|
||||
self.https_socket.listen(5)
|
||||
print(f"HTTPS server listening on port {self.https_port}...")
|
||||
@@ -475,7 +520,9 @@ class WebServer:
|
||||
):
|
||||
return self.build_response(405, self.http_405_html)
|
||||
|
||||
file_content, mimetype = self.file_handler.read_file(path)
|
||||
directory = self.file_handler.read_new_config("directory", host)
|
||||
|
||||
file_content, mimetype = self.file_handler.read_file(path, directory)
|
||||
|
||||
if file_content == 403:
|
||||
print("WARN: Directory traversal attack prevented.") # look ma, security!!
|
||||
|
||||
Reference in New Issue
Block a user