Files
PyJail/main.py
2025-01-09 07:37:24 +01:00

256 lines
10 KiB
Python

"""
This is the PyJail, a jailing tool for running Python apps in a sandboxed environment.
Version: edge0007-base0.2.1
"""
import os
import time
import runpy
class PyJail:
"""
The jail manager, handles all system calls and such.
"""
def __init__(self, debug=False):
self.rootpath = ""
self.rootpath = self.fs()
self._debug = debug
with open(self.fs("/proc/klog"), "w") as f:
# Always use jailmgr.msg() from this point onwards.
f.write(f"[{time.time}] [jailmgr.__init__()] [INFO] START LOG")
f.close()
with open(self.fs("/proc/kproc"), "w") as f:
f.write("proc: jailmgr(1)")
self._program_counter = 2
f.close()
self._resolve_symlinks = False if os.path.isdir(self.fs("/bin")) else True
def run_program(self, path_to_bin):
"""
Runs a specified program.
"""
path_to_bin = self.fs(path_to_bin)
if path_to_bin == 3 or path_to_bin == 2:
self.msg(
"jailmgr.run_program()",
"An error has occurred launching the program.",
True,
"WARNING",
)
else:
with open(self.fs("/proc/kproc"), "a+") as f:
f.write(f"proc: {path_to_bin}({self._program_counter})")
self._program_counter += 1
runpy.run_path(path_to_bin)
def msg(
self, caller: str, message: str, emit: bool = False, log_level: str = "INFO"
):
"""
The custom message parser, can parse messages and alert apps of said messages.
Replaces print statements.
Args:
caller: The program that called the logger
message: Is the message to parse.
emit: If the message needs to be passed to apps.
log_level: The loglevel, either DEBUG, INFO, WARNING, ERROR, CRITICAL
"""
emit_full = False
if self._debug is True:
emit_full = True
accepted_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if log_level.upper() not in accepted_log_levels:
self.msg(
"jailmgr.msg()", f"Not accepted loglevel!! {log_level}", False, "ERROR"
)
return 1
if log_level == "DEBUG" and self._debug is False:
emit = False
emit_full = False
msg = f"[{time.time}] [{caller}] [{log_level}] {message}"
with open(self.fs("/proc/klog"), "a+") as f:
f.write(msg)
if emit is True and log_level.upper() == "CRITICAL":
print(msg)
elif emit is True:
print(message)
elif emit_full is True:
print(msg)
return 0
def fs(self, check_path=None, resolve_symlinks=True):
"""
Keeps track of the jailed filesystem and makes sure any calls to any
file get done in the jailed filesystem
"""
if check_path is not None:
if os.path.exists(f"{self.rootpath}{check_path}"):
if self._resolve_symlinks is True or resolve_symlinks is True:
# This exists to ease /bin and other symlinks that are always present.
# It allows the system to drastically speed up resolving symlinks.
symlinkable_dirs = ["/bin", "/sbin", "/lib", "/lib64"]
for directory in symlinkable_dirs:
if check_path.startswith(directory):
check_path_usr_merge = f"/usr{check_path}"
return self.rootpath + check_path_usr_merge
# Well, if it doesn't start with any of them, we need to check each and every directory.
check_path_split = check_path.split("/", -1)
prepend_path = ""
for i, path in enumerate(check_path_split):
prepend_path = (
f"{prepend_path}/{path}"
if not path.endswith("/")
else f"{prepend_path}/{path}/"
)
check_path_split[i] = f"{prepend_path}"
check_path = ""
for i, path in enumerate(check_path_split):
if os.path.isdir(path) and i != (len(check_path_split) - 1):
# Directory is not a symlink, we can just ignore and move on.
pass
elif os.path.isdir(path) and i == (len(check_path_split) - 1):
# The last thing to access was a directory. We can safely return the full path now.
return path
elif not os.path.isdir(path) and i != (
len(check_path_split) - 1
):
# This was not the last thing we needed to access, so we assume it's a symlink.
# One problem is that we can't be sure, so we make sure it is a symlink.
with open(self.fs(path, False)) as f:
is_symlink = f.read()
f.close()
if is_symlink.startswith("symlnk"):
# This is a symlink!
# Symlinks always contain the full literal path that they need to access, so we can
# take that and do the same trick to split it and add the next things to it.
is_symlink_split = is_symlink.split(" ", 1)
symlink_dest = is_symlink_split[1]
symlink_dest = f"{symlink_dest}/{path}"
return symlink_dest
else:
# This is either not a symlink or an improperly configured one.
self.msg(
"jailmgr.fs()",
"reached non-symlink file not at end of list",
False,
"ERROR",
)
self.msg(
"jailmgr.fs()",
"What was assumed to be a directory isn't a"
" directory nor a symlink! This might be because of a "
"typo or misconfigured symlink. The directory in question: "
f"{path}",
True,
"WARNING",
)
return 2
return self.rootpath + check_path
if check_path.startswith("."):
check_path = check_path.lstrip(".")
return os.getcwd() + check_path if "vfs" in os.getcwd() else 2
elif self.rootpath in check_path:
self.msg(
"jailmgr.fs()",
"Cannot parse rootpath, expected vfspath",
log_level="ERROR",
)
return 3
else:
# Path is not in the jailed fs, so we say it doesn't exist.
self.msg(
"jailmgr.fs()",
"File/directory doesn't exist in vfspath",
log_level="ERROR",
)
return 2
else:
rootpath = os.getcwd() + "/vfs"
self.msg("jailmgr.fs()", message=rootpath, log_level="INFO")
return rootpath
def kver(self):
"""
Returns the kernel version
"""
return "edge0007-base0.2.1"
def netsock(self, ip, port, mode, msg):
"""
An easy interface to network sockets, built right into the jailmanager
Args:
ip: The IP of the server to access.
port: The port to access the server on
mode: Either UDP, TCP or PKG (HTTP)
msg: The message to send the server
Returns:
Whatever the server returns.
"""
if mode == "PKG":
import requests
else:
import socket
if mode == "TCP":
try:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except Exception:
self.msg(
"jailmgr.netsock()", "Socket import failed!", False, "CRITICAL"
)
self.msg(
"jailmgr.netsock()", "An unexpected error occurred!", True, "ERROR"
)
return None
# Connect to the server
client_socket.connect((ip, port))
self.msg(
"jailmgr.netsock()",
f"Connected to server at {ip}:{port}",
False,
"INFO",
)
# Send the message to the server
client_socket.send(msg.encode())
# Receive the response from the server
response = client_socket.recv(1024).decode()
client_socket.close() # Close the connection
self.msg("jailmgr.netsock", f"Received from server: {response}")
return response
elif mode == "PKG":
# raise NotImplementedError("TODO: PKG will be implemented later!")
file_io = requests.get(ip)
if file_io.startswith("PYPAK PMD"):
with open(self.fs(f"/usr/netsock/cache/{msg}.pmd"), "a+") as f:
f.write(file_io)
f.close()
else:
with open(self.fs(f"/usr/netsock/cache/{msg}.py"), "a+") as f:
f.write(file_io)
f.close()
else:
# Create a UDP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Send the message to the server
client_socket.sendto(msg.encode(), (ip, port))
# Receive the response from the server
response, _ = client_socket.recvfrom(1024)
self.msg(f"{self}", f"Received from server: {response.decode()}")
# Close the socket
client_socket.close()
# raise NotImplementedError("TODO: Netsock will be implemented once 0.3.0 comes around!")