Systemsicherheit/Assignment 4 - Protokollsicherheit (Praxis)/proxy/tcpproxy.py

490 lines
19 KiB
Python
Raw Permalink Normal View History

2024-05-25 10:25:40 +02:00
#!/usr/bin/env python3
import argparse
import pkgutil
import os
import sys
import threading
import socket
import socks
import ssl
import time
import select
import errno
# TODO: implement verbose output
# some code snippets, as well as the original idea, from Black Hat Python
def is_valid_ip4(ip):
    """Return True if *ip* looks like a dotted-quad IPv4 address.

    This is a rudimentary syntactic check: exactly four dot-separated
    integer octets, each in the range 0-255, with a non-zero first octet.
    Anything that is not numeric (for example a host name that happens to
    have four labels, like ``www.example.co.uk``) yields False instead of
    raising, so callers can fall back to a DNS lookup.
    """
    octets = ip.split('.')
    if len(octets) != 4:
        return False
    try:
        values = [int(octet) for octet in octets]
    except ValueError:
        # at least one part is not a number, so this is not an IP address
        return False
    # compare the numeric value: comparing the string octet against the
    # integer 0 would always be unequal and the check would never trigger
    return values[0] != 0 and all(0 <= value <= 255 for value in values)
def parse_args():
    """Build the command line parser and return the parsed arguments.

    The options cover the target and listen endpoints, an optional upstream
    proxy, the module chains for both directions, logging/verbosity, module
    listing/help and the TLS interception settings.
    """
    parser = argparse.ArgumentParser(
        description=('Simple TCP proxy for data interception and '
                     'modification. Select modules to handle the '
                     'intercepted traffic.'))
    add = parser.add_argument

    # where to connect to and where to listen
    add('-ti', '--targetip', dest='target_ip',
        help='remote target IP or host name')
    add('-tp', '--targetport', dest='target_port', type=int,
        help='remote target port')
    add('-li', '--listenip', dest='listen_ip', default='0.0.0.0',
        help='IP address/host name to listen for incoming data')
    add('-lp', '--listenport', dest='listen_port', type=int, default=8080,
        help='port to listen on')

    # optional upstream proxy
    add('-pi', '--proxy-ip', dest='proxy_ip', default=None,
        help='IP address/host name of proxy')
    add('-pp', '--proxy-port', dest='proxy_port', type=int, default=1080,
        help='proxy port')
    add('-pt', '--proxy-type', dest='proxy_type', default='SOCKS5',
        choices=['SOCKS4', 'SOCKS5', 'HTTP'], type=str.upper,
        help='proxy type. Options are SOCKS5 (default), SOCKS4, HTTP')

    # module chains
    add('-om', '--outmodules', dest='out_modules',
        help=('comma-separated list of modules to modify data'
              ' before sending to remote target.'))
    add('-im', '--inmodules', dest='in_modules',
        help=('comma-separated list of modules to modify data'
              ' received from the remote target.'))

    # output, logging and module information
    add('-v', '--verbose', dest='verbose', action='store_true',
        default=False,
        help='More verbose output of status information')
    add('-n', '--no-chain', dest='no_chain_modules', action='store_true',
        default=False,
        help="Don't send output from one module to the next one")
    add('-l', '--log', dest='logfile', default=None,
        help='Log all data to a file before modules are run.')
    add('--list', dest='list', action='store_true',
        help='list available modules')
    add('-lo', '--list-options', dest='help_modules', default=None,
        help='Print help of selected module')

    # TLS interception
    add('-s', '--ssl', dest='use_ssl', action='store_true', default=False,
        help='detect SSL/TLS as well as STARTTLS')
    add('-sc', '--server-certificate', default='mitm.pem',
        help='server certificate in PEM format (default: %(default)s)')
    add('-sk', '--server-key', default='mitm.pem',
        help='server key in PEM format (default: %(default)s)')
    add('-cc', '--client-certificate', default=None,
        help='client certificate in PEM format in case client authentication is required by the target')
    add('-ck', '--client-key', default=None,
        help='client key in PEM format in case client authentication is required by the target')

    return parser.parse_args()
def generate_module_list(modstring, incoming=False, verbose=False):
    """Import the requested proxy modules and return one instance of each.

    modstring is a comma-separated list such as
    ``mod1,mod2:key=val,mod3:key=val:key2=val2``. Every entry is imported
    from the ``proxymodules`` package and its ``Module`` class is
    instantiated with (incoming, verbose, options). incoming is True when
    the modules belong to the incoming chain (-im).

    If an ImportError occurs for an entry, a message is printed and the
    process exits with status 3.
    """
    instances = []
    for entry in modstring.split(','):
        name, options = parse_module_options(entry)
        qualified_name = 'proxymodules.' + name
        try:
            __import__(qualified_name)
            module = sys.modules[qualified_name]
            instances.append(module.Module(incoming, verbose, options))
        except ImportError:
            print('Module %s not found' % name)
            sys.exit(3)
    return instances
def parse_module_options(n):
    """Split a module specification into its name and an options dict.

    n has the form ``module_name:key1=val1:key2=val2 ...``. Returns a tuple
    ``(name, options)`` where options is None if no options were given,
    otherwise a dict mapping option names to their string values.

    Each option is split at its first ``=`` only, so a value may itself
    contain ``=`` characters (e.g. ``mod:expr=a=b``). An option without any
    ``=`` is reported and the process exits with status 23.
    """
    name, separator, raw_options = n.partition(':')
    if not separator:
        # no module options present
        return name, None

    options = {}
    for op in raw_options.split(':'):
        key, equals, value = op.partition('=')
        if not equals:
            print(op, ' is not valid!')
            sys.exit(23)
        options[key] = value
    return name, options
def list_modules():
    """Print name and description of every module found in ./proxymodules.

    The directory is looked up relative to the current working directory.
    Each discovered module is imported from the ``proxymodules`` package and
    its ``Module`` class is instantiated without arguments to read the
    ``name`` and ``description`` attributes.
    """
    search_dir = f'{os.getcwd()}{os.sep}proxymodules'
    for _finder, module_name, _is_pkg in pkgutil.iter_modules([search_dir]):
        qualified_name = 'proxymodules.' + module_name
        __import__(qualified_name)
        instance = sys.modules[qualified_name].Module()
        print(f'{instance.name} - {instance.description}')
def print_module_help(modlist):
    """Print the help text of every module named in modlist.

    modlist is a comma-separated list of module names. For each module the
    line ``name - description`` is printed, followed by the result of its
    ``help()`` method. If an attribute is missing, a fallback notice is
    printed instead.
    """
    for module in generate_module_list(modlist):
        try:
            summary = f'{module.name} - {module.description}'
            print(summary)
            print(module.help())
        except AttributeError:
            print('\tNo options or missing help() function.')
def update_module_hosts(modules, source, destination):
    """Store the connection endpoints on every module that wants them.

    source and destination are ('IP', port) tuples. Only modules that
    already have a ``source`` / ``destination`` attribute get the
    respective value assigned. This can only be done once the local and
    remote connections have been established. A modules value of None is
    accepted and ignored.
    """
    if modules is None:
        return
    endpoints = (('source', source), ('destination', destination))
    for module in modules:
        for attribute, endpoint in endpoints:
            if hasattr(module, attribute):
                setattr(module, attribute, endpoint)
def receive_from(s):
    """Read from socket s until a short read and return all bytes received.

    Data is read in chunks of up to 4096 bytes. A chunk smaller than that
    (including an empty one) is taken as the sign that no more data is
    waiting, and reading stops.
    """
    chunk_size = 4096
    chunks = []
    while True:
        chunk = s.recv(chunk_size)
        chunks.append(chunk)
        if len(chunk) < chunk_size:
            break
    return b"".join(chunks)
def handle_data(data, modules, dont_chain, incoming, verbose):
    """Run every active module on data and return the resulting data.

    By default the modules are chained: the output of one module becomes
    the input of the next one. If dont_chain is set, every module is
    executed on the original data and its return value is discarded, so the
    data is returned unchanged. Not every module will necessarily modify
    the data, though.

    incoming only selects the direction prefix of the verbose message that
    is emitted for each module.
    """
    prefix = "> > > > in: " if incoming else "< < < < out: "
    for module in modules:
        vprint(prefix + module.name, verbose)
        result = module.execute(data)
        if not dont_chain:
            data = result
    return data
def is_client_hello(sock):
    """Peek at the pending data of sock and report if it starts a TLS handshake.

    Up to 128 bytes are read with MSG_PEEK, so nothing is consumed from the
    socket. The data counts as a client hello when its first byte is 0x16
    and the following two bytes are one of the accepted version markers.
    """
    accepted_versions = (
        b"\x03\x00",
        b"\x03\x01",
        b"\x03\x02",
        b"\x03\x03",
        b"\x02\x00",
    )
    header = sock.recv(128, socket.MSG_PEEK)
    if len(header) < 3:
        return False
    if header[0] != 0x16:
        return False
    return header[1:3] in accepted_versions
def enable_ssl(args, remote_socket, local_socket):
    """Wrap both sides of the connection in TLS and return the new sockets.

    The local (client facing) socket is wrapped first, acting as a server
    with the certificate and key from args. The server name the client asks
    for is remembered through the SNI callback and then used as
    server_hostname when the remote socket is wrapped. The remote side is
    not verified (no host name check, CERT_NONE); a client certificate is
    loaded only if both args.client_certificate and args.client_key are set.

    Returns the list [remote_socket, local_socket] with the wrapped sockets.
    An ssl.SSLError on either side is reported and re-raised.
    """
    requested_name = None

    def remember_server_name(ssl_sock, server_name, ssl_ctx):
        # keep the SNI value sent by the client for the outgoing connection
        nonlocal requested_name
        requested_name = server_name

    # client facing side: we are the TLS server
    try:
        server_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        server_ctx.sni_callback = remember_server_name
        server_ctx.load_cert_chain(certfile=args.server_certificate,
                                   keyfile=args.server_key)
        local_socket = server_ctx.wrap_socket(local_socket, server_side=True)
    except ssl.SSLError as err:
        print("SSL handshake failed for listening socket", str(err))
        raise

    # target facing side: we are the TLS client
    try:
        client_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        client_ctx.check_hostname = False
        client_ctx.verify_mode = ssl.CERT_NONE
        if args.client_certificate and args.client_key:
            client_ctx.load_cert_chain(certfile=args.client_certificate,
                                       keyfile=args.client_key)
        remote_socket = client_ctx.wrap_socket(remote_socket,
                                               server_hostname=requested_name)
    except ssl.SSLError as err:
        print("SSL handshake failed for remote socket", str(err))
        raise

    return [remote_socket, local_socket]
def starttls(args, local_socket, read_sockets):
    """Decide whether the connection should be upgraded to TLS now.

    That is the case when TLS detection is enabled (args.use_ssl), the
    local socket is among the readable sockets, it is not wrapped in TLS
    yet, and the pending data looks like a client hello.
    """
    if not args.use_ssl:
        # hand back the falsy setting itself, like a short-circuiting `and`
        return args.use_ssl
    if local_socket not in read_sockets:
        return False
    if isinstance(local_socket, ssl.SSLSocket):
        return False
    return is_client_hello(local_socket)
def _close_sockets(*sockets):
    # Close every given socket; used on all exit paths of the proxy thread.
    for s in sockets:
        s.close()


def start_proxy_thread(local_socket, args, in_modules, out_modules):
    """Relay data between the local client and the remote target.

    This function is executed in a thread. It connects to the target
    (optionally through the configured proxy), then shuttles data in both
    directions, letting the out_modules work on data going to the target
    and the in_modules on data coming back. If TLS detection is enabled and
    the client starts a handshake, both sides are switched to TLS.

    local_socket is the accepted client connection, args the parsed command
    line arguments, in_modules/out_modules lists of module instances or
    None. Returns None. Socket errors other than refused/timed-out connects
    and ENOTCONN are re-raised after the sockets have been closed.
    """
    remote_socket = socks.socksocket()

    if args.proxy_ip:
        proxy_types = {'SOCKS5': socks.SOCKS5, 'SOCKS4': socks.SOCKS4, 'HTTP': socks.HTTP}
        remote_socket.set_proxy(proxy_types[args.proxy_type], args.proxy_ip, args.proxy_port)

    try:
        remote_socket.connect((args.target_ip, args.target_port))
        vprint('Connected to %s:%d' % remote_socket.getpeername(), args.verbose)
        log(args.logfile, 'Connected to %s:%d' % remote_socket.getpeername())
    except socket.error as serr:
        _close_sockets(remote_socket, local_socket)
        if serr.errno == errno.ECONNREFUSED:
            reason = 'Connection refused'
        elif serr.errno == errno.ETIMEDOUT:
            reason = 'Connection timed out'
        else:
            raise
        message = f'{time.strftime("%Y%m%d-%H%M%S")}, {args.target_ip}:{args.target_port}- {reason}'
        print(message)
        log(args.logfile, message)
        return None

    try:
        update_module_hosts(out_modules, local_socket.getpeername(), remote_socket.getpeername())
        update_module_hosts(in_modules, remote_socket.getpeername(), local_socket.getpeername())
    except socket.error as serr:
        _close_sockets(remote_socket, local_socket)
        if serr.errno == errno.ENOTCONN:
            # kind of a blind shot at fixing issue #15
            # the connection is not in a useful state anymore, so just
            # shut down the thread
            return None
        print(f"{time.strftime('%Y%m%d-%H%M%S')}: Socket exception in start_proxy_thread")
        raise

    try:
        # This loop ends when no more data is received on either the local
        # or the remote socket
        running = True
        while running:
            read_sockets, _, _ = select.select([remote_socket, local_socket], [], [])

            if starttls(args, local_socket, read_sockets):
                try:
                    remote_socket, local_socket = enable_ssl(args, remote_socket, local_socket)
                    vprint("SSL enabled", args.verbose)
                    log(args.logfile, "SSL enabled")
                except ssl.SSLError as e:
                    print("SSL handshake failed", str(e))
                    # the error text belongs to the message; passed as a
                    # third argument it would be taken for message_only
                    log(args.logfile, "SSL handshake failed " + str(e))
                    break
                # the plain sockets were replaced, wait on the wrapped ones
                read_sockets, _, _ = select.select([remote_socket, local_socket], [], [])

            for sock in read_sockets:
                try:
                    peer = sock.getpeername()
                except socket.error as serr:
                    if serr.errno == errno.ENOTCONN:
                        # kind of a blind shot at fixing issue #15
                        # the connection is not in a useful state anymore
                        running = False
                        break
                    print(f"{time.strftime('%Y%m%d-%H%M%S')}: Socket exception in start_proxy_thread")
                    raise

                data = receive_from(sock)
                log(args.logfile, 'Received %d bytes' % len(data))

                if sock == local_socket:
                    if len(data):
                        log(args.logfile, b'< < < out\n' + data)
                        if out_modules is not None:
                            data = handle_data(data, out_modules,
                                               args.no_chain_modules,
                                               False,  # incoming data?
                                               args.verbose)
                        # sendall: a plain send() may transmit only a part
                        remote_socket.sendall(data.encode() if isinstance(data, str) else data)
                    else:
                        vprint("Connection from local client %s:%d closed" % peer, args.verbose)
                        log(args.logfile, "Connection from local client %s:%d closed" % peer)
                        running = False
                        break
                elif sock == remote_socket:
                    if len(data):
                        log(args.logfile, b'> > > in\n' + data)
                        if in_modules is not None:
                            data = handle_data(data, in_modules,
                                               args.no_chain_modules,
                                               True,  # incoming data?
                                               args.verbose)
                        # modules may hand back str, same as on the way out
                        local_socket.sendall(data.encode() if isinstance(data, str) else data)
                    else:
                        vprint("Connection to remote server %s:%d closed" % peer, args.verbose)
                        log(args.logfile, "Connection to remote server %s:%d closed" % peer)
                        running = False
                        break
    finally:
        # whatever ended the relay (peer closed, handshake failure, socket
        # error), neither connection must be left open
        _close_sockets(remote_socket, local_socket)
def log(handle, message, message_only=False):
    """Write message to the log file handle, if there is one.

    handle is a binary file object or None; with None nothing happens.
    message may be bytes or str; text is encoded as UTF-8 (identical to
    the ASCII encoding for pure ASCII text), so a message containing other
    characters no longer raises.

    If message_only is True, only the message is written. Otherwise the
    message is prefixed with a timestamp line and followed by a separator
    line to make the log file easier to read.
    """
    if handle is None:
        # logging is disabled, don't bother converting the message
        return

    if isinstance(message, (bytearray, memoryview)):
        message = bytes(message)
    elif not isinstance(message, bytes):
        message = str(message).encode('utf-8', errors='replace')

    if message_only:
        handle.write(message)
        return

    timestamp = bytes("%s %s\n" % (time.strftime('%Y%m%d-%H%M%S'), str(time.time())), 'ascii')
    separator = b'\n' + b'-' * 20 + b'\n'
    handle.write(timestamp + message + separator)
def vprint(msg, is_verbose):
    """Print msg, but only if is_verbose is true."""
    if not is_verbose:
        return
    print(msg)
def main():
    """Entry point: validate the arguments and run the accept loop.

    After argument validation this either lists modules / prints module
    help and exits, or resolves the listen and target addresses, loads the
    module chains, binds the listening socket and starts one proxy thread
    per accepted connection until Ctrl+C is pressed.

    Exit codes used here: 0 normal exit, 1 invalid listen address,
    2 invalid target address, 4 log file cannot be opened, 5 bind failed,
    6 missing target IP, 7 missing target port, 8 incomplete client
    certificate/key pair.
    """
    def resolve_or_exit(host, exit_code):
        # look up host via DNS; complain and exit with exit_code on failure
        try:
            return socket.gethostbyname(host)
        except socket.gaierror:
            print('%s is not a valid IP address or host name' % host)
            sys.exit(exit_code)

    args = parse_args()

    # a target is only needed when we are actually going to proxy
    if args.list is False and args.help_modules is None:
        if not args.target_ip:
            print('Target IP is required: -ti')
            sys.exit(6)
        if not args.target_port:
            print('Target port is required: -tp')
            sys.exit(7)

    # exactly one of client key / client certificate given is an error
    if (args.client_key is None) != (args.client_certificate is None):
        print("You must either specify both the client certificate and client key or leave both empty")
        sys.exit(8)

    if args.logfile is not None:
        try:
            args.logfile = open(args.logfile, 'ab', 0)  # unbuffered
        except Exception as ex:
            print('Error opening logfile')
            print(ex)
            sys.exit(4)

    if args.list:
        list_modules()
        sys.exit(0)

    if args.help_modules is not None:
        print_module_help(args.help_modules)
        sys.exit(0)

    # host names are replaced by their resolved IP address
    if args.listen_ip != '0.0.0.0' and not is_valid_ip4(args.listen_ip):
        args.listen_ip = resolve_or_exit(args.listen_ip, 1)

    if not is_valid_ip4(args.target_ip):
        args.target_ip = resolve_or_exit(args.target_ip, 2)

    in_modules = None
    if args.in_modules is not None:
        in_modules = generate_module_list(args.in_modules, incoming=True, verbose=args.verbose)

    out_modules = None
    if args.out_modules is not None:
        out_modules = generate_module_list(args.out_modules, incoming=False, verbose=args.verbose)

    # this is the socket we will listen on for incoming connections
    proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    try:
        proxy_socket.bind((args.listen_ip, args.listen_port))
    except socket.error as e:
        print(e.strerror)
        sys.exit(5)

    proxy_socket.listen(100)
    log(args.logfile, str(args))

    # endless loop until ctrl+c
    try:
        while True:
            in_socket, in_addrinfo = proxy_socket.accept()
            connection_msg = 'Connection from %s:%d' % in_addrinfo
            vprint(connection_msg, args.verbose)
            log(args.logfile, connection_msg)
            proxy_thread = threading.Thread(
                target=start_proxy_thread,
                args=(in_socket, args, in_modules, out_modules))
            log(args.logfile, "Starting proxy thread " + proxy_thread.name)
            proxy_thread.start()
    except KeyboardInterrupt:
        log(args.logfile, 'Ctrl+C detected, exiting...')
        print('\nCtrl+C detected, exiting...')
        sys.exit(0)
# only start the proxy when executed as a script, not when imported
if __name__ == "__main__":
    main()