from typing import Any, Dict, Optional
import argparse

from bemani.sniff import Sniffer
from bemani.protocol import EAmuseProtocol, EAmuseException
from bemani.common import HTTP


def _decode_body(parser: EAmuseProtocol, http: Dict[str, Any]) -> Optional[object]:
    """
    Decode the body of a parsed HTTP message.

    Arguments:
        parser - The EAmuseProtocol instance used for decoding
        http - A parsed HTTP message as returned by HTTP.parse, read here
               through its 'data' and 'headers' keys

    Returns:
        The decoded packet, or None if the message carried no data or the
        decoder raised EAmuseException.
    """
    if http['data'] is None:
        return None

    try:
        return parser.decode(
            http['headers'].get('x-compress'),
            http['headers'].get('x-eamuse-info'),
            http['data'],
        )
    except EAmuseException:
        # Deliberate best-effort: an undecodable body is reported to the
        # user by the caller instead of aborting the sniffing loop.
        return None


def _print_transport_info(http: Dict[str, Any]) -> None:
    """
    Print the compression and encryption headers of a parsed HTTP message,
    substituting 'none' when a header is absent.
    """
    print(f"Compression is {http['headers'].get('x-compress', 'none')}")
    print(f"Encryption key is {http['headers'].get('x-eamuse-info', 'none')}")


def mainloop(address: Optional[str]=None, port: int=80, verbose: bool=False) -> None:
    """
    Main loop of BEMANIShark. Starts an instance of Sniffer and EAmuseProtocol
    and bangs them together using HTTP.parse: every captured stream is parsed
    as an HTTP request/response pair, each body is decoded and the result is
    printed. Will loop trying to decode packets forever.

    Arguments:
        address - A string representing an IP of interest
        port - An integer representing a port of interest
        verbose - When True, additionally print the HTTP method and URI of
                  requests and the compression/encryption headers of both
                  requests and responses
    """
    sniffer = Sniffer(address=address, port=port)
    parser = EAmuseProtocol()

    while True:
        packets = sniffer.recv_stream()

        inbound = HTTP.parse(packets['inbound'], request=True)
        outbound = HTTP.parse(packets['outbound'], response=True)

        if inbound is not None:
            # Decode before printing anything so an unexpected decoder error
            # does not leave a dangling header line on the console.
            in_req = _decode_body(parser, inbound)

            print(f"Inbound request (from {packets['source_address']}:{packets['source_port']} to {packets['destination_address']}:{packets['destination_port']}):")
            if verbose:
                print(f"HTTP {inbound['method']} request for URI {inbound['uri']}")
                _print_transport_info(inbound)
            if in_req is None:
                print("Inbound request was not parseable")
            else:
                print(in_req)

        if outbound is not None:
            out_req = _decode_body(parser, outbound)

            # The response travels in the opposite direction, so the source
            # and destination of the stream are swapped in the banner.
            print(f"Outbound response (from {packets['destination_address']}:{packets['destination_port']} to {packets['source_address']}:{packets['source_port']}):")
            if verbose:
                _print_transport_info(outbound)
            if out_req is None:
                print("Outbound response was not parseable")
            else:
                print(out_req)


def main() -> None:
    """
    Command-line entry point: parse the port, address and verbosity options
    and hand them to mainloop.
    """
    parser = argparse.ArgumentParser(description="A utility to sniff packets and decode them as eAmusement packets. Should probably be run as root.")
    parser.add_argument("-p", "--port", help="Port to sniff on. Defaults to 80", type=int, default=80)
    parser.add_argument("-a", "--address", help="Address to sniff on. Defaults to all addresses", type=str, default=None)
    parser.add_argument("-v", "--verbose", help="Show extra packet information", action='store_true')
    args = parser.parse_args()

    mainloop(address=args.address, port=args.port, verbose=args.verbose)


if __name__ == '__main__':
    main()