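"""ZMQ pass-through harness that mutates selected srsRAN downlink PDUs.

Packets are pulled from one ZMQ socket, optionally run through radamsa,
and pushed on to a second socket.  Every original ("og") and every
mutated ("rd") packet is appended to a per-run log so that a crash can be
traced back to the exact bytes that were sent.

All endpoints are on 127.0.0.1 only; keep it that way so the control
channel (which switches mutation on and off) is not reachable from
another host.
"""
import os
import sys
from datetime import datetime

import keyboard
import pyradamsa
import zmq

rad = pyradamsa.Radamsa()

# One log file per run.  The directory is created up front: the original
# script only created the file and raised FileNotFoundError when
# srs-fuzz-logs/ itself did not exist.
today = datetime.today().strftime('%Y-%m-%d_%H-%M-%S')
filename = "srs-fuzz-logs/" + today + "srsran-zmq-fuzz-log.txt"
os.makedirs(os.path.dirname(filename), exist_ok=True)
fh = open(filename, 'a')  # 'a' creates the file when missing; no separate touch needed

# Global switches.  enableFuzz gates mutation in the forwarding loop.
enableFuzz = True
# NOTE(review): everyTen is toggled by the '<' key but nothing in the
# forwarding loop reads it yet, so flipping it has no effect.
everyTen = False

# Byte patterns that mark where the SDU handed to radamsa begins.  The
# first is labelled "DLInformationTransfer, Auth request" in the original;
# the second appears in the Security Mode Command example further down.
SDU_MARKERS = (b'\x08\x01\x20', b'\x08\x00\x81')

# A single zmq.Context serves every socket in the process; the original
# created three, which buys nothing and triples the I/O thread count.
context = zmq.Context()
ctrlSocket = context.socket(zmq.SUB)
ctrlSocket.connect("tcp://127.0.0.1:4999")
ctrlSocket.setsockopt(zmq.SUBSCRIBE, b'1')
pullSocket = context.socket(zmq.PULL)
pullSocket.connect("tcp://127.0.0.1:5550")
pushSocket = context.socket(zmq.PUSH)
pushSocket.bind("tcp://127.0.0.1:5557")


def ctrlPoll():
    """Apply one pending control message, if any, to the global fuzz switch.

    The control channel is the SUB socket on topic ``b'1'``; a message is
    ``b'1 enableFuzz'`` or ``b'1 disableFuzz'``.  A message that does not
    split into exactly two tokens is printed and ignored instead of
    killing the loop (the original unpacked the split unconditionally).
    """
    global enableFuzz
    if ctrlSocket.poll(timeout=1) != zmq.POLLIN:
        return
    raw = ctrlSocket.recv()
    parts = raw.split()
    if len(parts) != 2:
        print("Ignoring malformed control message:", raw)
        return
    topic, ctrl = parts
    print(topic, ctrl)
    if topic != b'1':
        return
    if ctrl == b'enableFuzz':
        enableFuzz = True
    elif ctrl == b'disableFuzz':
        enableFuzz = False
    print("Fuzzing state:", enableFuzz)


def toggleFuzz(e):
    """Keyboard callback: flip the global fuzz switch and report it.

    ``e`` is the key event passed by the ``keyboard`` hook; it is unused.
    """
    global enableFuzz
    enableFuzz = not enableFuzz
    now = datetime.now().isoformat()
    print(now, " Fuzzing state:", enableFuzz)


def toggleTen(e):
    """Keyboard callback: flip the (currently unused) everyTen switch.

    ``e`` is the key event passed by the ``keyboard`` hook; it is unused.
    """
    global everyTen
    everyTen = not everyTen
    now = datetime.now().isoformat()
    print(now, " Every ten state:", everyTen)


def fuzzMessage(message):
    """Return a mutated copy of *message*, or None if it should pass unchanged.

    The bytes before the first SDU marker are treated as the PDU header:
    its first byte is kept, its second byte is rewritten as the length of
    header-plus-mutated-SDU, and the rest of the header is kept (layout
    taken from the original harness, not from a spec).

    Returns None when no marker is present or when the new length does not
    fit the one-byte length field; the caller then forwards the original
    packet.  The original code raised OverflowError in that second case
    and the loop died, since radamsa may grow its input.
    """
    loc = -1
    for marker in SDU_MARKERS:
        loc = message.find(marker)
        if loc != -1:
            break
    if loc == -1:
        return None
    pdu = message[:loc]
    sdu = message[loc:]
    fuzzed = rad.fuzz(sdu)  # pass seed=... here for a reproducible run
    pduLength = len(fuzzed + pdu)
    if pduLength > 0xFF:
        print("Mutated PDU does not fit the 1-byte length field; forwarding original")
        return None
    return pdu[0:1] + pduLength.to_bytes(1, 'little') + pdu[2:] + fuzzed


def main():
    """Forward packets until Ctrl-C, mutating targets while enableFuzz is set."""
    print("SRSRAN Fuzzer")
    print("ZMQ Pull Socket at 5550")
    print("ZMQ Push Socket at 5557")
    print("Fuzzing state:", enableFuzz)
    print("Every Ten packet state:", everyTen)
    print("Press å to enable or disable fuzzing")
    print("Press < to enable or disable every packet fuzzing")
    # The original printed these instructions but never registered the
    # callbacks, so the keys did nothing.  Key names follow the banner.
    # NOTE(review): keyboard hooks generally need elevated privileges on
    # Linux; if running as root is not acceptable, drop these two lines and
    # drive the switch through the ZMQ control channel (ctrlPoll) instead.
    keyboard.on_press_key('å', toggleFuzz)
    keyboard.on_press_key('<', toggleTen)
    try:
        while True:
            ctrlPoll()
            message = pullSocket.recv()
            now = datetime.now().isoformat()
            fh.write(f"{now} | og | {message} \r\n")
            assembled = fuzzMessage(message) if enableFuzz else None
            if assembled is None:
                pushSocket.send(message)
                continue
            pushSocket.send(assembled)
            fh.write(f"{now} | rd | {assembled} \r\n")
            fh.flush()
            print(now, "Fuzzed packet sent")
    except KeyboardInterrupt:
        print("Closed")
    finally:
        # Close on any exit path, not only Ctrl-C, so the log is flushed.
        fh.close()


if __name__ == "__main__":
    main()


# The blocks below are inactive reference snippets kept from the original
# (module-level string expressions; they are never executed).  They record
# the header layouts the author used for other packet types.
##EXAMPLE FUZZING HARNESSES FOR SOME IMPORTANT PREAUTH PACKETS
#Fuzzing RRC ConnectionSetup package
"""if b'\x60\x12\x9b' in message:
    loc = message.find(b'\x60\x12\x9b')
    pdu = message[:loc]
    sdu = message[loc:]
    fuzzed = rad.fuzz(sdu)#, seed=1337)
    p1 = pdu[0:2]
    pduLength = len(pdu+fuzzed)
    p2 = pdu[3:]
    #RRC pdu mac header | full pdu length | rest of the pdu |
    assembled = p1+pduLength.to_bytes(1,'little')+p2+fuzzed[0:1]+len(fuzzed).to_bytes(1,'little')+fuzzed[2:]
    #assembled = p1+pduLength.to_bytes(1,'little')+p2+fuzzed#fuzzed[0:1]+len(fuzzed).to_bytes(1,'little')+fuzzed[2:]
    pushSocket.send(assembled)
    fh.write(f"{now} | rd | {assembled} \r\n")
    fh.flush()
    print(now, "Fuzzed packet sent")"""

#Fuzzing DLInformationTransfer, Security Mode Command
"""if b'\x08\x00\x81' in message:
    loc = message.find(b'\x08\x00\x81')
    pdu = message[:loc]
    sdu = message[loc:]
    fuzzed = rad.fuzz(sdu)#, seed=1337)
    p1 = pdu[0:1]
    pduLength = len(pdu+fuzzed)
    p2 = pdu[2:]
    #RRC pdu mac header | full pdu length | rest of the pdu |
    #assembled = p1+pduLength.to_bytes(1,'little')+p2+fuzzed[0:1]+len(fuzzed).to_bytes(1,'little')+fuzzed[2:]
    assembled = p1+pduLength.to_bytes(1,'little')+p2+fuzzed#fuzzed[0:1]+len(fuzzed).to_bytes(1,'little')+fuzzed[2:]
    pushSocket.send(assembled)
    fh.write(f"{now} | rd | {assembled} \r\n")
    fh.flush()
    print(now, "Fuzzed packet sent")"""

#Fuzzing DLInformationTransfer, Auth request package
"""if b'\x08\x01\x20' in message:
    loc = message.find(b'\x08\x01\x20')
    pdu = message[:loc]
    sdu = message[loc:]
    fuzzed = rad.fuzz(pdu)#, seed=1337)
    p1 = fuzzed[0:1]
    pduLength = len(fuzzed+sdu)
    p2 = fuzzed[2:]
    assembled = p1+pduLength.to_bytes(1,'little')+p2+sdu
    pushSocket.send(assembled)
    fh.write(f"{now} | rd | {assembled} \r\n")
    fh.flush()
    print(now, "Fuzzed packet sent")"""

#Validate
"""if b'\x60\x12\x9b' in message:
    loc = message.find(b'\x60\x12\x9b')
    assembled = message[:loc+1]+b'\x16\x9b.g'+message[loc+1+4:]
    pushSocket.send(assembled)
    fh.write(f"{now} | rd | {assembled} \r\n")
    fh.flush()
    print(now, "Fuzzed packet sent")
    #enableFuzz = False
    sys.exit()"""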