1
0
mirror of synced 2024-11-15 02:17:36 +01:00
bemaniutils/bemani/utils/proxy.py

304 lines
11 KiB
Python
Raw Normal View History

import argparse
import requests
import socket
import yaml # type: ignore
from flask import Flask, Response, request
from typing import Any, Dict, Optional
import urllib.parse as urlparse
from bemani.protocol import EAmuseProtocol, Node
# Application configuration
# Flask application object that the GET and POST proxy routes are registered on.
app = Flask(__name__)
# Runtime settings shared by the request handlers. Filled in by load_config()
# or by the command-line entry point with the keys 'local_host', 'local_port',
# 'remote', 'verbose', 'timeout' and 'keepalive'.
config: Dict[str, Any] = {}
def modify_request(config: Dict[str, Any], req_body: Node) -> Optional[Node]:
    """Hook for rewriting a decoded client request before it is forwarded.

    No request rewriting is implemented; the hook exists only so that the
    request path is plumbed the same way as the response path.

    Args:
        config: Proxy settings (unused here).
        req_body: Decoded request tree (unused here).

    Returns:
        Always ``None``, meaning "no modification was made".
    """
    return None
def modify_response(config: Dict[str, Any], resp_body: Node) -> Optional[Node]:
    """Rewrite a decoded server response so the game keeps talking to this proxy.

    Only ``services`` packets are modified: every ``item`` entry except
    ``ntp`` has its ``url`` pointed at the proxy, so that follow-up game
    packets are sent here rather than straight to the remote server.

    Args:
        config: Proxy settings. Reads ``keepalive``, ``local_host`` and
            ``local_port``.
        resp_body: Root node of the decoded response.

    Returns:
        ``resp_body`` (mutated in place) when it carried a ``services``
        packet, otherwise ``None`` to signal that nothing was changed.
    """
    # Figure out if we need to modify anything in the response
    if resp_body.name != 'response':
        # Not what we expected, bail
        return None
    if not resp_body.children:
        # An empty response has no packet to inspect.
        return None

    # Now we get to the meat of packet detection
    body = resp_body.children[0]

    # Is this a services packet? We need to modify this to point the rest of the
    # game packets at this proxy :(
    if body.name != 'services':
        return None

    for child in body.children:
        if child.name != 'item':
            continue

        service = child.attribute('name')
        if service == 'ntp':
            # Don't override this
            continue

        if service == 'keepalive':
            # Completely rewrite to point at the proxy server, otherwise if we're proxying
            # to a local backend, we will end up giving out a local address and we will get
            # Network NG.
            address = socket.gethostbyname(config['keepalive'])
            child.set_attribute(
                'url',
                f'http://{address}/core/keepalive?pa={address}&ia={address}&ga={address}&ma={address}&t1=2&t2=10',
            )
            continue

        original_url = child.attribute('url')
        if original_url is None:
            # Nothing to rewrite on an item that carries no URL.
            continue

        url = urlparse.urlparse(original_url)
        if not url.netloc:
            # Without a host component there is nothing to swap out; leave
            # the value untouched instead of mangling it.
            continue

        # Omit the port when it is already the default for the URL's scheme.
        defaultport = {
            'http': 80,
            'https': 443,
        }.get(url.scheme, 0)
        if config['local_port'] != defaultport:
            new_netloc = f'{config["local_host"]}:{config["local_port"]}'
        else:
            new_netloc = f'{config["local_host"]}'

        # Swap only the netloc component. A plain string replace would also
        # rewrite any copy of the host name that appears in the path or query.
        child.set_attribute('url', url._replace(netloc=new_netloc).geturl())

    return resp_body
@app.route('/', defaults={'path': ''}, methods=['GET'])
@app.route('/<path:path>', methods=['GET'])
def receive_healthcheck(path: str) -> Response:
    """Relay a GET request to the default remote server.

    GET requests carry no PCBID to route on, so only the ``'*'`` entry of
    ``config['remote']`` is consulted.

    Args:
        path: Request path captured by the route, without the leading slash.

    Returns:
        The remote's body and status code, plus its ``Location`` header when
        one was sent; or a 500 response when no default remote is configured.
    """
    remotes = config['remote']
    if '*' not in remotes:
        return Response("No route for default PCBID", 500)

    default_remote = remotes['*']
    remote_host = default_remote['host']
    remote_port = default_remote['port']

    # Rebuild the path the client asked for, query string included.
    actual_path = f'/{path}'
    query = request.query_string
    if query:
        actual_path += f'?{query.decode("ascii")}'

    # Make request to foreign service, using the same parameters
    upstream = requests.get(
        f'http://{remote_host}:{remote_port}{actual_path}',
        timeout=config['timeout'],
        allow_redirects=False,
    )

    # Redirects are not followed above, so hand the target back to the caller.
    forwarded = {
        name: upstream.headers[name]
        for name in ('Location',)
        if name in upstream.headers
    }
    return Response(upstream.content, upstream.status_code, forwarded)
@app.route('/', defaults={'path': ''}, methods=['POST'])
@app.route('/<path:path>', methods=['POST'])
def receive_request(path: str) -> Response:
    """Proxy one game POST packet to the remote server chosen by its PCBID.

    The request body is decoded, optionally rewritten by ``modify_request``,
    forwarded to the remote, and the reply is decoded, optionally rewritten
    by ``modify_response`` and returned to the client.

    Args:
        path: Request path captured by the route, without the leading slash.

    Returns:
        The (possibly re-encoded) remote response with its ``X-Compress`` and
        ``X-Eamuse-Info`` headers copied over, or a 500 response when the
        packet cannot be decoded, no remote matches the PCBID, or the remote
        does not answer with status 200.
    """
    # First, parse the packet itself
    # client_proto decodes the client's request and encodes the reply sent
    # back to it; server_proto encodes towards and decodes from the remote.
    client_proto = EAmuseProtocol()
    server_proto = EAmuseProtocol()

    remote_address = request.headers.get('X-Remote-Address', None)
    request_compression = request.headers.get('X-Compress', None)
    request_encryption = request.headers.get('X-Eamuse-Info', None)
    request_client = request.headers.get('User-Agent', None)

    actual_path = f'/{path}'
    if request.query_string is not None and len(request.query_string) > 0:
        actual_path = actual_path + f'?{request.query_string.decode("ascii")}'

    if config['verbose']:
        print(f"HTTP request for URI {actual_path}")
        print(f"Compression is {request_compression}")
        print(f"Encryption key is {request_encryption}")

    req = client_proto.decode(
        request_compression,
        request_encryption,
        request.data,
    )

    if req is None:
        # Nothing to do here
        return Response("Unrecognized packet!", 500)

    if config['verbose']:
        print("Original request to server:")
        print(req)

    # Grab PCBID for directing to multiple servers
    pcbid = req.attribute('srcid')
    if pcbid in config['remote']:
        remote_host = config['remote'][pcbid]['host']
        remote_port = config['remote'][pcbid]['port']
    elif '*' in config['remote']:
        remote_host = config['remote']['*']['host']
        remote_port = config['remote']['*']['port']
    else:
        return Response(f"No route for PCBID {pcbid}", 500)

    modified_request = modify_request(config, req)
    if modified_request is None:
        # Return the original binary data instead of re-encoding it
        # to the exact same thing.
        req_binary = request.data
    else:
        if config['verbose']:
            print("Modified request to server:")
            print(modified_request)

        # Re-encode the modified packet
        req_binary = server_proto.encode(
            request_compression,
            request_encryption,
            modified_request,
            client_proto.last_text_encoding,
            client_proto.last_packet_encoding,
        )

    # Set up custom headers for remote request.
    headers = {
        # For lobby functionality, make sure the request receives
        # the original IP address
        'X-Remote-Address': remote_address or request.remote_addr,
        # Some remote servers can be somewhat buggy, so we make sure
        # to specify a range of encodings.
        'Accept-Encoding': 'identity, deflate, compress, gzip',
    }

    # Copy over required headers that are sent by game client.
    if request_compression is not None:
        headers['X-Compress'] = request_compression
    if request_encryption is not None:
        headers['X-Eamuse-Info'] = request_encryption
    if request_client is not None:
        headers['User-Agent'] = request_client

    # Make request to foreign service, using the same parameters
    prep_req = requests.Request(
        'POST',
        url=f'http://{remote_host}:{remote_port}{actual_path}',
        headers=headers,
        data=req_binary,
    ).prepare()

    # Close the session once the reply has been read so that its connection
    # pool is released instead of leaking one pool per proxied request. The
    # body is fully downloaded by send() (no streaming), so r.content stays
    # usable after the session is closed.
    with requests.Session() as sess:
        r = sess.send(prep_req, timeout=config['timeout'])  # type: ignore

    if r.status_code != 200:
        # Failed on remote side
        return Response("Failed to get response!", 500)

    # Decode response, for modification if necessary
    response_compression = r.headers.get('X-Compress', None)
    response_encryption = r.headers.get('X-Eamuse-Info', None)
    resp = server_proto.decode(
        response_compression,
        response_encryption,
        r.content,
    )

    if resp is None:
        # Nothing to do here
        return Response("Unrecognized packet!", 500)

    if config['verbose']:
        print("Original response from server:")
        print(resp)

    modified_response = modify_response(config, resp)
    if modified_response is None:
        # Return the original response data instead of re-encoding it
        # to the exact same thing.
        resp_binary = r.content
    else:
        if config['verbose']:
            print("Modified response from server:")
            print(modified_response)

        # Re-encode the modified packet
        resp_binary = client_proto.encode(
            response_compression,
            response_encryption,
            modified_response,
        )

    # Some old clients are case sensitive, so be careful to capitalize
    # these responses here.
    flask_resp = Response(resp_binary)
    if response_compression is not None:
        flask_resp.headers['X-Compress'] = response_compression
    if response_encryption is not None:
        flask_resp.headers['X-Eamuse-Info'] = response_encryption
    return flask_resp
def load_proxy_config(filename: str) -> None:
    """Add per-PCBID remote routes from a YAML file to ``config['remote']``.

    The file's ``pcbid`` mapping pairs each PCBID with the name of an entry
    in the file's ``remote`` mapping; that entry is stored under the PCBID.
    A file with no ``pcbid`` section (or an empty file) adds nothing.

    Args:
        filename: Path of the YAML configuration file.

    Raises:
        KeyError: If a PCBID names a remote that the file does not define,
            or if ``config['remote']`` has not been initialised yet.
    """
    # Use a context manager so the file handle is closed promptly.
    with open(filename) as handle:
        config_data = yaml.safe_load(handle)  # type: ignore

    # safe_load() yields None for an empty document.
    if not config_data or config_data.get('pcbid') is None:
        return

    for pcbid, remote_name in config_data['pcbid'].items():
        config['remote'][pcbid] = config_data['remote'][remote_name]
def load_config(filename: str) -> None:
    """Populate the global ``config`` from a YAML configuration file.

    Reads the required ``local.host`` / ``local.port`` values and the optional
    ``verbose`` (default ``False``), ``timeout`` (default ``30``) and
    ``keepalive`` (default ``'localhost'``) values. When the file has a
    ``default`` key, the remote it names becomes the ``'*'`` route; otherwise
    the route table starts empty. Per-PCBID routes are then loaded through
    ``load_proxy_config``.

    Args:
        filename: Path of the YAML configuration file.

    Raises:
        KeyError: If ``local`` settings are missing or ``default`` names a
            remote that the file does not define.
    """
    global config

    # Use a context manager so the file handle is closed promptly.
    with open(filename) as handle:
        config_data = yaml.safe_load(handle)  # type: ignore

    config.update({
        'local_host': config_data['local']['host'],
        'local_port': config_data['local']['port'],
        'verbose': config_data.get('verbose', False),
        'timeout': config_data.get('timeout', 30),
        'keepalive': config_data.get('keepalive', 'localhost'),
    })

    if 'default' in config_data:
        remote_config = config_data['remote'][config_data['default']]
        config.update({
            'remote': {
                '*': remote_config,
            },
        })
    else:
        config.update({
            'remote': {}
        })

    # The route table must exist before per-PCBID entries are added to it.
    if 'pcbid' in config_data and config_data['pcbid'] is not None:
        load_proxy_config(filename)
if __name__ == '__main__':
    # Command-line entry point: build the proxy configuration from the
    # arguments, optionally add per-PCBID routes from a file, then serve.
    parser = argparse.ArgumentParser(description="A utility to MITM non-SSL eAmusement connections.")
    parser.add_argument("-p", "--port", help="Port to listen on. Defaults to 9090", type=int, default=9090)
    parser.add_argument("-a", "--address", help="Address to listen on. Defaults to all addresses", type=str, default="0.0.0.0")
    parser.add_argument("-r", "--real-address", help="Real address we are listening on (for NAT and such)", type=str, default='127.0.0.1')
    parser.add_argument("-q", "--remote-port", help="Port to connect to.", type=int, required=True)
    parser.add_argument("-b", "--remote-address", help="Address to connect to.", type=str, required=True)
    parser.add_argument("-c", "--config", help="Configuration file for PCBID to remote server mapping.", type=str, default=None)
    parser.add_argument("-k", "--keepalive", help="Keepalive domain to advertise. Defaults to localhost", type=str, default='localhost')
    parser.add_argument("-v", "--verbose", help="Display verbose packet info.", action='store_true')
    parser.add_argument("-t", "--timeout", help="Timeout (in seconds) for proxy requests. Defaults to 30 seconds.", type=int, default=30)
    args = parser.parse_args()

    config.update({
        # Address and port advertised to games in rewritten service URLs.
        'local_host': args.real_address,
        'local_port': args.port,
        'remote': {
            # Default route, used when no PCBID-specific remote matches.
            '*': {
                'host': args.remote_address,
                'port': args.remote_port,
            },
        },
        'verbose': args.verbose,
        'timeout': args.timeout,
        'keepalive': args.keepalive,
    })

    # Fill in remote addresses for PCBIDs we should redirect to a non-default server
    if args.config is not None:
        load_proxy_config(args.config)

    # Bind to the address requested with --address; it defaults to 0.0.0.0,
    # so the behaviour without the flag is unchanged.
    # NOTE(review): debug=True turns on Flask's debug mode for a listener
    # that may be reachable from the network -- confirm this is intended.
    app.run(host=args.address, port=args.port, debug=True)