11 Commits

Author SHA1 Message Date
Nova 96eba42c04 almost ready now pwease? uwu 2026-03-28 23:50:34 +01:00
Nova b48b682da7 html, branding change and getting ready for pre-rel 2026-03-24 22:50:26 +01:00
Nova 97896c87a7 idk anymore man 2026-03-13 22:13:59 +01:00
Nova 127612d408 smartass 2026-03-03 22:27:56 +01:00
Nova 3ff7a33695 something pog 2026-03-03 22:26:19 +01:00
Nova 04ec2ebec1 holy fuck shit finally works!!!
alpha build 0039
2026-03-02 21:38:28 +01:00
Nova 36c8c95efe amethyst 0018 2026-01-30 08:41:30 +01:00
Nova f5dafb689e Build 0001, first Amethyst version that starts! 2025-08-20 23:51:31 +02:00
Nova 4eada65040 First testing version of what will become 2.0
Partial new config functionality.
2025-08-20 15:39:14 +02:00
Nova 4d4a44fd06 v1.4, first parts of 2.0 are merged in 2025-08-20 13:58:58 +02:00
Nova a36141edd0 release for 1.3.0
minor update, got ua blocking working and match statements in config
2025-07-22 16:35:39 +02:00
7 changed files with 792 additions and 589 deletions
+27 -45
View File
@@ -1,51 +1,33 @@
# PyWebServer # Amethyst Web Server
## GitHub ## A word of warning!
The upstream of this project is on my own [Gitea instance](https://git.novacow.ch/Nova/PyWebServer/). Currently Amethyst is in very early alpha stage, a lot of things will be broken, names won't be correct,
Because of that I'll mostly reply to issues and PRs there, you can submit issues and PRs on GitHub, but it might take longer before I read it. promised features missing, but I'm very much working on it live!
Every save I do increments the build number by 1, I won't publish all of them, but most of them will be published.
Once a milestone is hit (e.g. a new feature fully implemented), I'll publish a release!
## Installing ## Currently working features:
Installing and running PyWebServer is very simple. * New configuration is ~95% done, most features work.
Assuming you're running Linux: * Fixed **A LOT** of unreported bugs from the old code.
```bash * More resilliency against errors.
git clone https://git.novacow.ch/Nova/PyWebServer.git * Improved security.
cd ./PyWebServer/ * Proxy almost working!
```
Windows users, make sure you have installed Git, from there:
```powershell
git clone https://git.novacow.ch/Nova/PyWebServer.git
Set-Location .\PyWebServer\
```
Then, open `pywebsrv.conf` in your favorite text editor and change the `directory` key to the full path where your files are stored.
After that, put your files in and run this:
Linux:
```bash
python3 /path/to/pywebsrv.py
```
Windows:
```powershell
# If you have installed Python via the Microsoft Store:
python3 \path\to\pywebsrv.py
# Via the python.org website:
py \path\to\pywebsrv.py
```
## SSL Support ## Project status:
PyWebServer supports SSL/TLS for authentication via HTTPS. In the config file, you should enable the HTTPS port. After that you need to create the certificate. Amethyst will stay in beta for a while, I want all features to work, but I will make pre-release versions that are mostly stable.
Currently PyWebServer looks for the `cert.pem` and the `key.pem` files in the root directory of the installation. They can be found as the `amethyst-prerel-0.a.b` releases. I won't guarantee 100% stability, but waay more than just some random build.
## HTTP support ## Install instructions:
Currently PyWebServer only supports HTTP/1.1, this is very unlikely to change, as most of the modern web today still uses HTTP/1.1. Install Python, execute `amethyst.py` and change the provided config.
For methods PyWebServer only supports `GET`, this is being reworked though, check issue [#3](https://git.novacow.ch/Nova/PyWebServer/issues/3) for progress.
## Files support ## Minimum requirements:
Unlike other small web servers, PyWebServer has full support for binary files being sent and received (once that logic is put in) over HTTP(S). Python 3.8+
And whatever PC that happens to run that.
I recommend Python 3.10 or above though, with a PC running:
* Windows 8.1+
* macOS 10.15+
* Linux 4.19+
* FreeBSD 13.2R+
* Some other somewhat recent OS.
## Support ## Currently W.I.P. Check back later!
PyWebServer will follow a standard support scheme.
### 1.x
For every 1.x version there will be support until 2 newer versions come out.
So that means that 1.0 will still be supported when 1.1 comes out, but no longer be supported when 1.2 comes out.
### 2.x
I am planning on releasing a 2.x version with will have a lot more advanced features, like nginx's server block emulation amongst other things.
When 2.0 will come out, the last version of 1.x will be supported for a while longer, but no new features will be added.
+16
View File
@@ -0,0 +1,16 @@
# WARNING: This is an alpha spec of NSCL 2.0!!
host * {
directory:./html
apimode:0
block-ua:match("Discordbot")
}
globals {
http:1
https:1
port:8080
https-port:8443
key:./key.pem
cert./cert.pem
}
+717
View File
@@ -0,0 +1,717 @@
"""
License:
PyWebServer
Copyright (C) 2025 Nova
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Contact:
E-mail: nova@novacow.ch
NOTE: Once 2.0 is released, PyWebServer will become the Amethyst Web Server
This is PyWebServer, an ultra minimalist webserver, meant to still have
a lot standard webserver features. A comprehensive list is below:
Features:
HTTP and HTTPS support.
Automatically generate certificates using AutoCertGen plugin.
Blocking per host.
Easy port configuration.
Easy to understand documentation and configuration.
Very small size, compared to something like Apache and NGINX.
No compromise(-ish) security:
Directory traversal attack prevention.
No fuss HTTPS setup.
Per-host blocking.
Ability for per-IP blocking.
Ability for per-UA blocking.
Simple to understand and mod codebase.
All GNU GPL-3-or-above license. (Do with it what you want.)
Library aswell as a standalone script:
You can easily get access to other parts of the script if you need it.
TODO: actually put normal comments in
TODO: INPROG: add typing to all code, new code will feature it by default.
"""
import os
import mimetypes
import threading
import ssl
import socket
# import re
import signal
import sys
try:
if not os.getcwd() in sys.path:
sys.path.append(f"{os.getcwd()}")
from .certgen import AutoCertGen
except ImportError:
# just do nothing, it's not working anyway.
print(
"WARN: You need the AutoCertGen plugin! Please install it from\n"
"https://git.novacow.ch/Nova/AutoCertGen/"
)
# pass
AMETHYST_BUILD_NUMBER = "b0.2.0-0072"
AMETHYST_REPO = "https://git.novacow.ch/Nova/PyWebServer/"
class ConfigParser:
def __init__(self, text):
self.data: dict = {"hosts": {}, "globals": {}}
self._parse(text)
def _parse(self, text):
lines: list = [
line.strip()
for line in text.splitlines()
if line.strip() and not line.strip().startswith("#")
]
current_block: tuple | None = None
current_name: str | None = None
for line in lines:
if line.startswith("host ") and line.endswith("{"):
current_name = line.split()[1]
self.data["hosts"][current_name] = {}
current_block = ("host", current_name)
continue
if line == "globals {":
current_block = ("globals", None)
continue
if line == "}":
current_block = None
current_name = None
continue
if ":" in line and current_block:
key, value = line.split(":", 1)
key: str = key.strip()
value: str = value.strip()
if "," in value:
value = [v.strip() for v in value.split(",")]
if current_block[0] == "host":
self.data["hosts"][current_name][key] = value
else:
self.data["globals"][key] = value
def query_config(self, key, host=None):
if host:
return self.data["hosts"].get(host, {}).get(key)
if key == "hosts":
print(f"\n\n\nHosts!\nHosts: {self.data['hosts']}\n\n\n")
return list(self.data["hosts"].keys())
return self.data["globals"].get(key)
class FileHandler:
def __init__(self, base_dir=None):
# this is a fucking clusterfuck.
self.config_file = "amethyst.conf"
self.config_path = os.path.join(os.getcwd(), self.config_file)
with open(self.config_path, "r") as f:
self.cfg = ConfigParser(f.read())
self.base_dir = self.read_config("directory")
if not os.path.exists(self.config_path):
# uuh???
print(
"The amethyst.conf file needs to be in the same directory "
"as amethyst.py! Get the default config file from:\n"
"https://git.novacow.ch/Nova/PyWebServer/raw/branch/2.0/amethyst.conf"
)
exit(1)
# TODO: fix this please!!
def read_file(self, file_path, directory=None):
if "../" in file_path or "%" in file_path:
return 403, None
if file_path == "api.py":
return 404, None
if directory is not None:
full_path = os.path.join(directory, file_path.lstrip("/"))
else:
full_path = os.path.join(self.base_dir, file_path.lstrip("/"))
if not os.path.isfile(full_path):
return 404, None
try:
mimetype = mimetypes.guess_type(full_path)
with open(full_path, "rb") as f:
return f.read(), mimetype
except Exception as e:
print(f"Error reading file {full_path}: {e}")
return 500, None
def write_file(self, file_path, data, directory=None):
if "../" in file_path or "%" in file_path:
return 403
full_path = os.path.join(self.base_dir, file_path.lstrip("/"))
with open(full_path, "a") as f:
f.write(data)
return 0
def read_config(self, key, host_name=None):
print(
f"\n\n\nQuery!\nkey: {key}\nhost_name: {host_name}\nret: {self.cfg.query_config(key, host_name)}"
)
return self.cfg.query_config(key, host_name)
def autocert(self):
"""
Generate some self-signed certificates using AutoCertGen
TODO: doesn't work, need to fix. probably add `./` to $PATH
"""
autocert = AutoCertGen()
autocert.gen_cert()
class RequestParser:
def __init__(self):
self.file_handler = FileHandler()
self.hosts = self.file_handler.read_config("hosts")
print(f"Hosts: {self.hosts}")
def parse_request_line(self, line, host):
"""Parses the HTTP request line."""
try:
method, path, version = line.split(" ")
except ValueError:
return None, None, None
if path.endswith("/") or ("." not in path):
if not path.endswith("/"):
path += "/"
index = self.file_handler.read_config("index", host) or "index.html"
path += f"{index}"
return method, path, version
def parse_match_blocks(self, to_parse: str | list):
if isinstance(to_parse, str):
to_parse = [to_parse]
match = []
literal = []
for block in to_parse:
if block.startswith('match("'):
adx = block[7:-2]
match.append(adx)
else:
literal.append(block)
return match, literal
def ua_is_allowed(self, ua, host=None):
"""Parses and matches UA to block"""
# return True
_list = self.file_handler.read_config("block-ua", host)
if _list is None:
return True
match, literal = self.parse_match_blocks(_list)
if ua in literal:
return False
for _ua in match:
if _ua.lower() in ua.lower():
return False
return True
def is_method_allowed(self, method, host=None):
"""
Checks if the HTTP method is allowed.
Reads allowed methods from a configuration file.
Falls back to allowing only 'GET' if the file does not exist.
Should (for now) only be GET as I haven't implemented the logic for PUT
"""
allowed_methods = self.file_handler.read_config("allowed-methods", host)
if allowed_methods is None:
allowed_methods = ["GET"]
return method in allowed_methods
def host_parser(self, host):
"""
Parses the host and makes sure it's allowed in
Mfw im in an ugly code writing contest and my opponent is nova while writing a side project
"""
host = f"{host}"
print(f"hosts: {self.hosts}, host: {host}, split: {host.rsplit(':', 1)[0]}")
if ":" in host:
host = host.rsplit(":", 1)[0]
host = host.lstrip()
host = host.rstrip()
if self.hosts is None:
return True
if host not in self.hosts:
if "*" in self.hosts:
return "catchall"
return False
else:
return True
class ProxyServer:
def __init__(self, fh):
self.file_handler: FileHandler = fh
def try_connection(
self, host: str, port: int, data: bytes, chost: str, force_tls: bool = None
):
if port in [443, 8443, 9443]:
do_tls = True
else:
if force_tls is True:
do_tls = True
else:
do_tls = False
print(f"\n\n\nchost: {chost}\n\n\n")
nhost = self.file_handler.read_config("proxy", chost)
print(f"\n\n\nnhost: {nhost}\n\n\n")
if ":" in nhost:
nport = int(nhost.split(":")[1])
nhost = nhost.split(":")[0]
else:
nport = port
print(f"{nhost}, {nport}, {data}")
data = self.reset_host(nhost, nport, data)
try:
return self.tcp_send(host, port, data, do_tls)
except Exception:
if do_tls is False:
print("Retrying with TLS...")
return self.try_connection(host, port, data, chost, True)
else:
raise
@staticmethod
def reset_host(host: str, port: int, data: bytes):
data = data.decode()
data = data.splitlines()
for line in data:
print(line)
if line.startswith("Host:"):
if port not in [80, 443]:
new_line = f"Host: {host}:{port}"
else:
new_line = f"Host: {host}"
idx = data.index(line)
data[idx] = new_line
print(f"\n\n\n{idx}\n\n\n")
if line.startswith("Connection:"):
idx = data.index(line)
new_line = "Connection: close"
data[idx] = new_line
data = "\r\n".join(data)
data = f"{data}\r\n\r\n"
print(data)
return data.encode()
# return data
@staticmethod
def create_tls_context():
# Create a context that by default verifies with system CAs
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
def tcp_send(self, host, port, data: bytes, do_tls: bool):
try:
with socket.create_connection((host, port), timeout=10) as raw_sock:
raw_sock.settimeout(10)
if do_tls:
ctx = self.create_tls_context()
server_hostname = host
with ctx.wrap_socket(
raw_sock, server_hostname=server_hostname
) as ssock:
ssock.sendall(data)
print("data reached")
return ssock.recv(512000)
else:
raw_sock.sendall(data)
return raw_sock.recv(512000)
except Exception:
raise
class WebServer:
def __init__(
self, http_port=8080, https_port=8443, cert_file="cert.pem", key_file="key.pem"
):
self.http_port = int(http_port)
self.https_port = int(https_port)
self.file_handler = FileHandler()
self.parser = RequestParser()
self.cert_file = self.file_handler.read_config("cert") or cert_file
self.key_file = self.file_handler.read_config("key") or key_file
self.skip_ssl = False
# me when no certificate and key file
if not os.path.exists(self.cert_file) or not os.path.exists(self.key_file):
if not os.path.exists(self.cert_file) and not os.path.exists(self.key_file):
pass
# maybe warn users we purge their key/cert files? xdd
elif not os.path.exists(self.cert_file):
os.remove(self.key_file)
elif not os.path.exists(self.key_file):
os.remove(self.cert_file)
print("WARN: No HTTPS certificate was found!")
if self.file_handler.read_config("disable-autocertgen") is True:
print("WARN: AutoCertGen is disabled, ignoring...")
self.skip_ssl = True
else:
choice = input("Do you wish to generate an HTTPS certificate? [y/N] ")
if choice.lower() == "y":
self.file_handler.autocert()
else:
self.skip_ssl = True
self.no_host_req_response = (
"This host cannot be reached without sending a `Host` header."
)
self.http_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.http_socket.bind(("::", self.http_port))
self.https_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.https_socket.bind(("::", self.https_port))
self.proxy_handler = ProxyServer(self.file_handler)
if self.skip_ssl is False:
# https gets the ssl treatment!! yaaaay :3
self.ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.ssl_context.load_cert_chain(
certfile=self.cert_file, keyfile=self.key_file
)
self.https_socket = self.ssl_context.wrap_socket(
self.https_socket, server_side=True
)
self.http_404_html = (
"<html><head><title>HTTP 404 - Amethyst</title></head>"
f"<body><center><h1>HTTP 404 - Not Found!</h1><p>Running Amethyst/build-{AMETHYST_BUILD_NUMBER}</p>"
"</center></body></html>"
)
self.http_403_html = (
"<html><head><title>HTTP 403 - Amethyst</title></head>"
f"<body><center><h1>HTTP 403 - Forbidden</h1><p>Running Amethyst/build-{AMETHYST_BUILD_NUMBER}</p>"
"</center></body></html>"
)
self.http_405_html = (
"<html><head><title>HTTP 405 - Amethyst</title></head>"
f"<body><center><h1>HTTP 405 - Method not allowed</h1><p>Running Amethyst/build-{AMETHYST_BUILD_NUMBER}</p>"
"</center></body></html>"
)
self.running = True
def start(self, http, https):
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
http_thread = threading.Thread(target=self.start_http, daemon=True)
https_thread = threading.Thread(target=self.start_https, daemon=True)
if https is True:
if self.skip_ssl is True:
print("WARN: You have enabled HTTPS without SSL!!")
yn = input("Is this intended behaviour? [y/N] ")
if yn.lower() == "n":
exit(1)
https_thread.start()
if http is True:
http_thread.start()
http_thread.join()
https_thread.join()
def start_http(self):
self.http_socket.listen(5)
print(f"HTTP server listening on port {self.http_port}...")
while self.running:
try:
conn, addr = self.http_socket.accept()
self.handle_connection(conn, addr)
except Exception as e:
print(f"HTTP error: {e}")
except OSError:
break
def start_https(self):
self.https_socket.listen(5)
print(f"HTTPS server listening on port {self.https_port}...")
while self.running:
try:
conn, addr = self.https_socket.accept()
self.handle_connection(conn, addr)
except Exception as e:
print(
f"HTTPS error: {e}"
) # be ready for ssl errors if you use a self-sign!!
except OSError:
break
def handle_connection(self, conn, addr):
try:
data = conn.recv(32768)
request = data.decode(errors="ignore")
if not data:
response = self.build_response(
400, "Bad Request"
) # user did fucky-wucky
elif len(data) > 8192:
response = self.build_response(413, "Request too long")
else:
response = self.handle_request(request, addr)
if isinstance(response, str):
response = response.encode()
print(len(response))
conn.sendall(response)
except Exception as e:
print(f"Error handling connection: {e}")
response = self.build_response(
500,
"Amethyst is currently unable to serve your request. Below is debug info.\r\n"
f"Error: {e}; Version: amethyst-b{AMETHYST_BUILD_NUMBER}\r\n"
"You cannot do anything at this time, the server owner has made a misconfiguration or there is a bug in the program",
)
conn.sendall(response)
finally:
conn.close()
def handle_request(self, data, addr):
print(f"data: {data}")
request_line = data.splitlines()[0]
# Extract host from headers, never works though
for line in data.splitlines():
if "Host" in line:
host = line.split(":", 1)[1].strip()
allowed = self.parser.host_parser(host)
if allowed == "catchall":
host = "*"
allowed = True
if not allowed:
return self.build_response(
403, "Connecting via this host is disallowed."
)
break
else:
return self.build_response(400, self.no_host_req_response.encode())
for line in data.splitlines():
if "User-Agent" in line:
ua = line.split(":", 1)[1].strip()
allowed = self.parser.ua_is_allowed(ua, host)
if not allowed:
return self.build_response(
403, "This UA has been blocked by the owner of this site."
)
break
else:
return self.build_response(400, "You cannot connect without a User-Agent.")
if ":" in host:
host = host.rsplit(":", 1)[0]
else:
host = host
method, path, version = self.parser.parse_request_line(request_line, host)
if not all([method, path, version]):
return self.build_response(400, "Bad Request")
if self.file_handler.read_config("proxy", host) is not None:
orig_host = host
value = self.file_handler.read_config("proxy", host)
if ":" in value:
host = value.split(":")[0]
port = int(value.split(":")[1])
else:
host = value
port = 443
return self.proxy_handler.try_connection(
host,
port,
data.encode(),
orig_host,
)
# Figure out a better way to reload config
if path == "/?pywebsrv_reload_conf=1":
print("Got reload command! Reloading configuration...")
self.file_handler = FileHandler()
self.parser = RequestParser()
return self.build_response(302, "", host=host)
if not self.parser.is_method_allowed(method):
return self.build_response(405, self.http_405_html)
directory = (
self.file_handler.read_config("directory", host)
or self.file_handler.base_dir
)
if self.file_handler.read_config("apimode", host) is True:
if not os.path.join(os.getcwd(), directory) in sys.path:
sys.path.append(f"{os.path.join(os.getcwd(), directory)}")
import api
apiclass = api.API()
return apiclass.on_request(data)
file_content, mimetype = self.file_handler.read_file(path, directory)
if file_content == 403:
print("WARN: Directory traversal attack prevented.") # look ma, security!!
return self.build_response(403, self.http_403_html)
if file_content == 404:
return self.build_response(404, self.http_404_html)
if file_content == 500:
return self.build_response(
500,
"Amethyst has encountered a fatal error and cannot serve "
"your request. Contact the owner with this error: FATAL_FILE_RO_ACCESS",
) # When there was an issue with reading we throw this.
mimetype = mimetype[0]
if mimetype is None:
# We have to assume it's binary.
return self.build_binary_response(
200, file_content, "application/octet-stream"
)
if "text/" not in mimetype:
return self.build_binary_response(200, file_content, mimetype)
return self.build_response(200, file_content)
@staticmethod
def build_binary_response(status_code, binary_data, content_type):
"""Handles binary files like MP3s."""
messages = {
200: "OK",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
500: "Internal Server Error",
}
status_message = messages.get(status_code)
headers = (
f"HTTP/1.1 {status_code} {status_message}\r\n"
f"Server: Amethyst/amethyst-build-{AMETHYST_BUILD_NUMBER}\r\n"
f"Content-Type: {content_type}\r\n"
f"Content-Length: {len(binary_data)}\r\n"
f"Connection: close\r\n\r\n"
# Connection close is done because it is way easier to implement.
# It's not like this program will see production use anyway.
)
return headers.encode() + binary_data
@staticmethod
def build_response(status_code, body, host=None):
"""
For textfiles we'll not have to guess MIME-types, though the other function
build_binary_response will be merged in here anyway.
"""
messages = {
200: "OK",
204: "No Content",
302: "Found",
304: "Not Modified", # TODO KEKL
400: "Bad Request",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
413: "Payload Too Large",
500: "Internal Server Error",
621: "fuck off! :3",
}
status_message = messages.get(status_code)
if isinstance(body, str):
body = body.encode()
# TODO: dont encode yet, and i encode. awesome comments here.
# Don't encode yet, if 302 status code we have to include location.
headers = (
f"HTTP/1.1 {status_code} {status_message}\r\n"
f"Server: Amethyst/build-{AMETHYST_BUILD_NUMBER}\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n\r\n"
).encode()
if status_code == 302:
# 302 currently only happens when the reload is triggered.
# Why not 307, Moved Permanently? Because browsers will cache the
# response and not send the reload command.
# if port == 443:
# host = f"https://{host}/"
# else:
# host = f"http://{host}/"
headers = (
f"HTTP/1.1 {status_code} {status_message}\r\n"
f"Location: {host}\r\n"
f"Server: Amethyst/build-{AMETHYST_BUILD_NUMBER}\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n\r\n"
).encode()
if status_code == 621:
headers = (
f"HTTP/1.1 {status_code} {status_message}\r\n"
"Server: Amethyst/build-0621\r\n"
"Content-Length: 30\r\n"
f"Connection: close\r\n\r\n"
)
body = "https://e621.net/posts/6155664"
print(f"{headers + body}")
return headers + body
def shutdown(self, signum, frame):
print("\nRecieved signal to exit!\nShutting down server...")
self.running = False
self.http_socket.close()
self.https_socket.close()
sys.exit(0)
def main():
print(
"WARNING!!\n"
f"This is Amethyst alpha build {AMETHYST_BUILD_NUMBER}\n"
"Since this is an alpha version of Amethyst, most features aren't working!\n"
"These builds are also very verbose and will spit out a lot on the terminal. "
"As you can imagine, this is for debugging purposes.\n"
"THERE IS ABSOLUTELY NO SUPPORT FOR THESE VERSIONS!\n"
"DO NOT USE THEM IN PRODUCTION SETTINGS!\n"
f"Please report any bugs on {AMETHYST_REPO}\n"
)
input("Press <Enter> to continue. ")
file_handler = FileHandler()
file_handler.base_dir = file_handler.read_config("directory")
http_port = file_handler.read_config("port") or 8080
https_port = file_handler.read_config("https-port") or 8443
http_enabled = bool(file_handler.read_config("http")) or True
print(http_enabled)
https_enabled = bool(file_handler.read_config("https")) or False
print(https_enabled)
server = WebServer(http_port=http_port, https_port=https_port)
server.start(http_enabled, https_enabled)
if __name__ == "__main__":
main()
+26
View File
@@ -0,0 +1,26 @@
"""
This is the Amethyst API mode Python interface whatevers.
Docs will follow.
"""
# Below go imports.
import sys
import os
if not os.getcwd() in sys.path:
sys.path.append(os.getcwd())
import pywebsrv
class API:
"""
class
"""
def __init__(self):
# DO NOT USE THIS CLASS FOR PROGRAM, ONLY ON_REQUEST PLEASE!!
# Below go definitions to get things working.
self.build_response = pywebsrv.WebServer.build_binary_response
def on_request(self, req):
return self.build_response(200, "This is a test", "text/html")
+6 -5
View File
@@ -4,10 +4,11 @@
<title>Test page</title> <title>Test page</title>
</head> </head>
<body> <body>
<h1>Hey there!</h1> <center>
<h2>You're seeing this page because you haven't set up PyWebServer yet!</h2> <h1>Hello from Amethyst!</h1>
<h2>This page confirms that PyWebServer can read and serve files from your PC.</h2> <h2>This page confirms Amethyst can read files from your PC or server and serve them to your browser!</h2>
<h2>To make this go away, please edit the file `pywebsrv.conf` and edit the `directory` key to your directory of choice!</h2> <p>This is a test page, if you aren't the server owner, they might not have finished setting up their site, be patient. If this doesn't go away after a while, tell them they've made an oopsie</p>
<p>Here you can simulate a 404 error: <a href="/uuh">Click me for a 404 error!</a></p> <p>This server runs Amethyst Pre-Rel 0.2.0-0072</p>
</center>
</body> </body>
</html> </html>
-33
View File
@@ -1,33 +0,0 @@
# Using NSCL 1.3
# Port defenition. What ports to use.
# port is the HTTP port, port-https is the HTTPS port
port:8080
port-https:8443
# Here you choose what directory PyWebServer looks in for files.
directory:<Enter directory here>
# Host defenition, what hosts you can connect via.
# You can use FQDNs, IP-addresses and localhost,
# Support for multiple hosts is coming.
host:localhost
# Enables HTTP support. (Only enables/disables the HTTP port.)
http:1
# Enables HTTPS support. (Only enables/disables the HTTPS port.)
https:1
# Allows the use of localhost to connect.
# The default is on, this is seperate of the host defenition.
allow-localhost:1
# If you're using the webserver in a library form,
# you can disable the AutoCertGen and never trigger it.
disable-autocertgen:0
# If you wish to block IP-addresses, this function is coming though.
# block-ip:0.0.0.0,1.1.1.1,2.2.2.2
# If you wish to block User-Agents, this function is coming though.
# block-ua:(NULL)
# TEST: experimental non-defined keys go here:
# keyfile key
key-file:/home/nova/PyWebServer/key.pem
# certfile keys
cert-file:/home/nova/PyWebServer/cert.pem
# allowed-methods, csv's
allowed-methods:GET
-506
View File
@@ -1,506 +0,0 @@
"""
License:
PyWebServer
Copyright (C) 2025 Nova
This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>.
Contact:
E-mail: nova@novacow.ch
This is PyWebServer, an ultra minimalist webserver, meant to still have
a lot standard webserver features. A comprehensive list is below:
Features:
HTTP and HTTPS support.
Automatically generate certificates using AutoCertGen plugin.
Blocking per host.
Easy port configuration.
Easy to understand documentation and configuration.
Very small size, compared to something like Apache and NGINX.
No compromise(-ish) security:
Directory traversal attack prevention.
No fuss HTTPS setup.
Per-host blocking.
Ability for per-IP blocking.
Ability for per-UA blocking.
Simple to understand and mod codebase.
All GNU GPL-3-or-above license. (Do with it what you want.)
Library aswell as a standalone script:
You can easily get access to other parts of the script if you need it.
TODO: actually put normal comments in
"""
import os
import mimetypes
import threading
import ssl
import socket
import signal
import sys
try:
from certgen import AutoCertGen
except ImportError:
print(
"WARN: You need the AutoCertGen plugin! Please install it from\n"
"https://git.novacow.ch/Nova/AutoCertGen/"
)
class FileHandler:
CONFIG_FILE = "pywebsrv.conf"
DEFAULT_CONFIG = (
"port:8080\nport-https:8443\nhttp:1"
"\nhttps:0\ndirectory:{cwd}\nhost:localhost"
"\nallow-localhost:1"
)
def __init__(self, base_dir=None):
self.config_path = os.path.join(os.getcwd(), self.CONFIG_FILE)
self.base_dir = self.read_config("directory")
def check_first_run(self):
if not os.path.isfile(self.config_path):
self.on_first_run()
return True
return False
def on_first_run(self):
with open(self.config_path, "w") as f:
f.write(self.DEFAULT_CONFIG.format(cwd=os.getcwd()))
def read_file(self, file_path):
if "../" in file_path:
return 403, None
full_path = os.path.join(self.base_dir, file_path.lstrip("/"))
if not os.path.isfile(full_path):
return 404, None
try:
mimetype = mimetypes.guess_type(full_path)
with open(full_path, "rb") as f:
return f.read(), mimetype
except Exception as e:
print(f"Error reading file {full_path}: {e}")
return 500, None
def write_file(self, file_path, data):
if "../" in file_path:
return 403
full_path = os.path.join(self.base_dir, file_path.lstrip("/"))
with open(full_path, "a") as f:
f.write(data)
return 0
def read_config(self, option):
"""
clean code, whats that????
TODO: docs
"""
option = option.lower()
valid_options = [
"port",
"directory",
"host",
"http",
"https",
"port-https",
"allow-localhost",
"disable-autocertgen",
"key-file",
"cert-file"
]
if option not in valid_options:
return None
with open(self.config_path, "r") as f:
for line in f:
if line.startswith("#"):
continue
try:
key, value = line.strip().split(":", 1)
except ValueError:
return None
key = key.lower()
if key == option:
if option == "host":
seperated_values = value.split(",", -1)
return [value.lower() for value in seperated_values]
if option == "port" or option == "port-https":
return int(value)
if (
option == "http"
or option == "https"
or option == "allow-localhost"
or option == "disable-autocertgen"
):
return bool(int(value))
if option == "directory":
if value == "<Enter directory here>":
return os.path.join(os.getcwd(), "html")
if value.endswith("/"):
value = value.rstrip("/")
return value
return value
return None
def autocert(self):
"""
Generate some self-signed certificates using AutoCertGen
"""
autocert = AutoCertGen()
autocert.gen_cert()
class RequestParser:
def __init__(self):
self.file_handler = FileHandler()
self.hosts = self.file_handler.read_config("host")
def parse_request_line(self, line):
"""Parses the HTTP request line."""
try:
method, path, version = line.split(" ")
except ValueError:
return None, None, None
if path.endswith("/"):
path += "index.html"
return method, path, version
def is_method_allowed(self, method):
"""
Checks if the HTTP method is allowed.
Reads allowed methods from a configuration file.
Falls back to allowing only 'GET' if the file does not exist.
Should (for now) only be GET as I haven't implemented the logic for PUT
"""
allowed_methods = ["GET"]
# While the logic for PUT, DELETE, etc. is not added, we shouldn't
# allow for it to attempt it.
# Prepatched for new update.
# allowed_methods = self.file_handler.read_config("allowed-methods")
return method in allowed_methods
def host_parser(self, host):
"""
Parses the host and makes sure it's allowed in
Mfw im in an ugly code writing contest and my opponent is nova while writing a side project
"""
host = f"{host}"
if ":" in host:
host = host.split(":", 1)[0]
host = host.lstrip()
host = host.rstrip()
if (
host == "localhost" or host == "127.0.0.1"
) and self.file_handler.read_config("allow-localhost"):
return True
if host not in self.hosts:
return False
else:
return True
class WebServer:
def __init__(
self, http_port=8080, https_port=8443, cert_file="cert.pem", key_file="key.pem"
):
self.http_port = http_port
self.https_port = https_port
self.cert_file = cert_file
self.key_file = key_file
self.file_handler = FileHandler()
self.parser = RequestParser()
self.skip_ssl = False
# me when no certificate and key file
if not os.path.exists(self.cert_file) or not os.path.exists(self.key_file):
if not os.path.exists(self.cert_file) and not os.path.exists(self.key_file):
pass
# maybe warn users we purge their key/cert files? xdd
elif not os.path.exists(self.cert_file):
os.remove(self.key_file)
elif not os.path.exists(self.key_file):
os.remove(self.cert_file)
print("WARN: No HTTPS certificate was found!")
if self.file_handler.read_config("disable-autocertgen") is True:
print("WARN: AutoCertGen is disabled, ignoring...")
self.skip_ssl = True
else:
choice = input("Do you wish to generate an HTTPS certificate? [y/N] ")
if choice.lower() == "y":
self.file_handler.autocert()
else:
self.skip_ssl = True
self.no_host_req_response = (
"This host cannot be reached without sending a `Host` header."
)
# ipv6 when????/??//?????//?
self.http_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.http_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.http_socket.bind(("0.0.0.0", self.http_port))
self.https_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.https_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.https_socket.bind(("0.0.0.0", self.https_port))
if self.skip_ssl is False:
# https gets the ssl treatment!! yaaaay :3
self.ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.ssl_context.load_cert_chain(
certfile=self.cert_file, keyfile=self.key_file
)
self.https_socket = self.ssl_context.wrap_socket(
self.https_socket, server_side=True
)
self.http_404_html = (
"<html><head><title>HTTP 404 - PyWebServer</title></head>"
"<body><center><h1>HTTP 404 - Not Found!</h1><p>Running PyWebServer/1.2.1</p>"
"</center></body></html>"
)
self.http_403_html = (
"<html><head><title>HTTP 403 - PyWebServer</title></head>"
"<body><center><h1>HTTP 403 - Forbidden</h1><p>Running PyWebServer/1.2.1</p>"
"</center></body></html>"
)
self.http_405_html = (
"<html><head><title>HTTP 405 - PyWebServer</title></head>"
"<body><center><h1>HTTP 405 - Method not allowed</h1><p>Running PyWebServer/1.2.1</p>"
"</center></body></html>"
)
self.running = True
def start(self, http, https):
signal.signal(signal.SIGINT, self.shutdown)
signal.signal(signal.SIGTERM, self.shutdown)
http_thread = threading.Thread(target=self.start_http, daemon=True)
https_thread = threading.Thread(target=self.start_https, daemon=True)
if https is True:
if self.skip_ssl is True:
print("WARN: You have enabled HTTPS without SSL!!")
yn = input("Is this intended behaviour? [y/N] ")
https_thread.start()
if http is True:
http_thread.start()
http_thread.join()
https_thread.join()
def start_http(self):
self.http_socket.listen(5)
print(f"HTTP server listening on port {self.http_port}...")
while self.running:
try:
conn, addr = self.http_socket.accept()
print(f"HTTP connection received from {addr}")
self.handle_connection(conn, addr)
except Exception as e:
print(f"HTTP error: {e}")
except OSError:
break
def start_https(self):
self.https_socket.listen(5)
print(f"HTTPS server listening on port {self.https_port}...")
while self.running:
try:
conn, addr = self.https_socket.accept()
print(f"HTTPS connection received from {addr}")
self.handle_connection(conn, addr)
except Exception as e:
print(
f"HTTPS error: {e}"
) # be ready for ssl errors if you use a self-sign!!
except OSError:
break
def handle_connection(self, conn, addr):
try:
data = conn.recv(512)
request = data.decode(errors="ignore")
response = self.handle_request(request, addr)
if isinstance(response, str):
response = response.encode()
conn.sendall(response)
except Exception as e:
print(f"Error handling connection: {e}")
finally:
conn.close()
def handle_request(self, data, addr):
print(f"len data: {len(data)}")
if not data:
return self.build_response(400, "Bad Request") # user did fucky-wucky
if len(data) > 8192:
return self.build_response(413, "Request too long")
request_line = data.splitlines()[0]
# Extract host from headers, never works though
for line in data.splitlines():
if "Host" in line:
host = line.split(":", 1)[1].strip()
allowed = self.parser.host_parser(host)
if not allowed:
return self.build_response(
403, "Connecting via this host is disallowed."
)
break
else:
return self.build_response(
400, self.no_host_req_response.encode()
)
method, path, version = self.parser.parse_request_line(request_line)
# Figure out a better way to reload config
if path == "/?pywebsrv_reload_conf=1":
print("Got reload command! Reloading configuration...")
self.file_handler.base_dir = self.file_handler.read_config("directory")
return self.build_response(302, "")
if not all([method, path, version]):
return self.build_response(400, "Bad Request")
if not self.parser.is_method_allowed(
method
):
return self.build_response(405, self.http_405_html)
file_content, mimetype = self.file_handler.read_file(path)
if file_content == 403:
print("WARN: Directory traversal attack prevented.") # look ma, security!!
return self.build_response(403, self.http_403_html)
if file_content == 404:
return self.build_response(404, self.http_404_html)
if file_content == 500:
return self.build_response(
500,
"PyWebServer has encountered a fatal error and cannot serve "
"your request. Contact the owner with this error: FATAL_FILE_RO_ACCESS",
) # When there was an issue with reading we throw this.
# A really crude implementation of binary files. Later in 2.0 I'll actually
# make this useful.
mimetype = mimetype[0]
if "text/" not in mimetype:
return self.build_binary_response(200, file_content, mimetype)
return self.build_response(200, file_content)
@staticmethod
def build_binary_response(status_code, binary_data, content_type):
"""Handles binary files like MP3s."""
messages = {
200: "OK",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
500: "Internal Server Error",
}
status_message = messages.get(status_code)
headers = (
f"HTTP/1.1 {status_code} {status_message}\r\n"
f"Server: PyWebServer/1.2.1\r\n"
f"Content-Type: {content_type}\r\n"
f"Content-Length: {len(binary_data)}\r\n"
f"Connection: close\r\n\r\n"
# Connection close is done because it is way easier to implement.
# It's not like this program will see production use anyway.
)
return headers.encode() + binary_data
def build_response(self, status_code, body):
"""
For textfiles we'll not have to guess MIME-types, though the other function
build_binary_response will be merged in here anyway.
"""
messages = {
200: "OK",
204: "No Content",
302: "Found",
304: "Not Modified", # TODO KEKL
400: "Bad Request",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
413: "Payload Too Large",
500: "Internal Server Error",
635: "Go Away",
}
status_message = messages.get(status_code)
if isinstance(body, str):
body = body.encode()
# TODO: dont encode yet, and i encode. awesome comments here.
# Don't encode yet, if 302 status code we have to include location.
headers = (
f"HTTP/1.1 {status_code} {status_message}\r\n"
f"Server: PyWebServer/1.2.1\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n\r\n"
).encode()
if status_code == 302:
# 302 currently only happens when the reload is triggered.
# Why not 307, Moved Permanently? Because browsers will cache the
# response and not send the reload command.
host = self.file_handler.read_config("host")[0]
port = self.file_handler.read_config("port-https") or self.file_handler.read_config("port")
if port != 80 and port != 443:
if port == 8443:
host = f"https://{host}:{port}/"
else:
host = f"http://{host}:{port}/"
else:
if port == 443:
host = f"https://{host}/"
else:
host = f"http://{host}/"
headers = (
f"HTTP/1.1 {status_code} {status_message}\r\n"
f"Location: {host}\r\n"
f"Server: PyWebServer/1.2.1\r\n"
f"Content-Length: {len(body)}\r\n"
f"Connection: close\r\n\r\n"
).encode()
return headers + body
def shutdown(self, signum, frame):
print("\nRecieved signal to exit!\nShutting down server...")
self.running = False
self.http_socket.close()
self.https_socket.close()
sys.exit(0)
def main():
file_handler = FileHandler()
file_handler.check_first_run()
file_handler.base_dir = file_handler.read_config("directory")
http_port = file_handler.read_config("port") or 8080
https_port = file_handler.read_config("port-https") or 8443
http_enabled = file_handler.read_config("http") or True
https_enabled = file_handler.read_config("https") or False
server = WebServer(http_port=http_port, https_port=https_port)
server.start(http_enabled, https_enabled)
if __name__ == "__main__":
main()