#!/usr/bin/env python3
"""Stream Deck + front-end for an srsRAN / Android LTE fuzzing rig.

The script renders the key and touch-strip images, forwards the "Fuzzer RUN"
key to a ZeroMQ PUB socket, starts the srsRAN helper programs in terminals and
then supervises the phone's cellular registration over adb: whenever the
registration state changes it archives logs, restarts the programs and
toggles flight mode on the phone.
"""
import os
import glob
import zmq
import threading
import io
import subprocess
import serial
import time
import sys
from datetime import datetime

from PIL import Image
from PIL import ImageFont
from PIL import ImageDraw
from PIL import ImageOps
from StreamDeck.DeviceManager import DeviceManager
from StreamDeck.Devices.StreamDeck import DialEventType, TouchscreenEventType

stopwhencrash = True
# Handles of the helper programs currently running; replaced on every restart
# and read by the dial callback when the operator asks to quit.
processes = []
resetCounter = 0

# Control channel towards the fuzzer: messages are published as "<topic> <text>".
ctrlContext = zmq.Context()
ctrlSocket = ctrlContext.socket(zmq.PUB)
ctrlSocket.bind("tcp://127.0.0.1:4999")

ASSETS_PATH = os.path.join(os.path.dirname(__file__), "Assets")
ICONS_PATH = os.path.join(ASSETS_PATH, "icons")
FONT_PATH = os.path.join(ASSETS_PATH, "Roboto-Medium.ttf")

ADB = '/home/platform-tools/adb'
LOG_DIR = '/home/srs-fuzz-logs'
PCAP_PATH = '/tmp/enb_mac.pcap'

# One entry per key: label, font size, text offset (w, h), optional icon and
# whether the key is drawn in its "pressed" look at start-up.
BUTTON_CONFIG = [
    {'id': 0, 'msg': "Fuzzer\nRUN", 'size': 25, 'w': 10, 'h': 0, 'icon': "bug.png", 'icon-pressed': "", 'pressed': True},
    {'id': 1, 'msg': "-", 'size': 25, 'w': 30, 'h': 0, 'icon': "", 'icon-pressed': "", 'pressed': False},
    {'id': 2, 'msg': "-", 'size': 25, 'w': 10, 'h': 0, 'icon': "", 'icon-pressed': "", 'pressed': False},
    {'id': 3, 'msg': "-", 'size': 25, 'w': 20, 'h': 0, 'icon': "", 'icon-pressed': "", 'pressed': False},
    {'id': 4, 'msg': "-", 'size': 25, 'w': 10, 'h': 0, 'icon': "", 'icon-pressed': "", 'pressed': False},
    {'id': 5, 'msg': "-", 'size': 25, 'w': 30, 'h': 0, 'icon': "", 'icon-pressed': "", 'pressed': False},
    {'id': 6, 'msg': "-", 'size': 25, 'w': 10, 'h': 0, 'icon': "", 'icon-pressed': "", 'pressed': False},
    {'id': 7, 'msg': "-", 'size': 25, 'w': 20, 'h': 0, 'icon': "", 'icon-pressed': "", 'pressed': False},
]


def get_text_dimensions(font, text_string):
    """Return ``(width, height)`` of *text_string* rendered with *font*.

    The height is the bottom of the rendered mask's bounding box plus the
    font descent.  Text that produces no bounding box (for example an empty
    string) yields ``(0, descent)`` instead of raising.
    """
    _ascent, descent = font.getmetrics()
    bbox = font.getmask(text_string).getbbox()
    if bbox is None:
        return (0, descent)
    return (bbox[2], bbox[3] + descent)


def _jpeg_bytes(img):
    """Encode a PIL image as JPEG and return the raw bytes."""
    buffer = io.BytesIO()
    img.save(buffer, format='JPEG')
    return buffer.getvalue()


def _render_button(key, pressed):
    """Draw the 120x120 image of key *key*; green when *pressed*, else white."""
    cfg = BUTTON_CONFIG[key]
    colour = (0, 255, 0) if pressed else (255, 255, 255)
    font = ImageFont.truetype(FONT_PATH, cfg['size'])
    img = Image.new('RGB', (120, 120), color='black')
    draw = ImageDraw.Draw(img)
    text_options = {'font': font, 'fill': colour, 'align': 'center'}

    if cfg['icon'] != "":
        with Image.open(os.path.join(ICONS_PATH, cfg['icon'])) as source:
            if pressed:
                # Tint the icon green by colourising its greyscale version.
                icon = ImageOps.colorize(source.convert("L"), black="black", white="green")
            else:
                icon = source
            img.paste(icon, (0, 0), icon.convert("RGBA"))
        # Icon keys use a large line spacing so the two label lines sit above
        # and below the icon.
        text_options['spacing'] = 65

    draw.text((cfg['w'], cfg['h']), cfg['msg'], **text_options)
    return _jpeg_bytes(img)


def button(key):
    """Return the JPEG bytes for key *key* in its released (white) look."""
    return _render_button(key, pressed=False)


def button_pressed(key):
    """Return the JPEG bytes for key *key* in its pressed (green) look."""
    return _render_button(key, pressed=True)


def touchscreen_text(msg, w, f=25):
    """Return JPEG bytes of a black ``w`` x 100 strip with *msg* in white.

    *f* is the font size in points.
    """
    font = ImageFont.truetype(FONT_PATH, f)
    img = Image.new('RGB', (w, 100), color='black')
    ImageDraw.Draw(img).text((10, 5), msg, font=font, fill=(255, 255, 255))
    return _jpeg_bytes(img)


def _exit_touchscreen_image():
    """Return JPEG bytes of the 800x100 touch strip showing only the exit icon."""
    img = Image.new('RGB', (800, 100), 'black')
    with Image.open(os.path.join(ICONS_PATH, 'circle-with-cross.png')) as source:
        icon = source.resize((100, 100))
    img.paste(icon, (690, 0), icon)
    return _jpeg_bytes(img)


# Pre-rendered key images, indexed by key number.
BUTTONS = [button(i) for i in range(8)]
BUTTONS_PRESSED = [button_pressed(i) for i in range(8)]


def _log_stamp():
    """Return the current local time formatted for log file names."""
    return datetime.today().strftime('%Y-%m-%d_%H-%M-%S')


def _dump_logcat(stamp, suffix='-logcat-crashlogs.txt'):
    """Append the phone's complete logcat buffers to a file named after *stamp*."""
    os.system(ADB + ' logcat -b all -d >> ' + LOG_DIR + '/' + stamp + suffix)


def _clear_logcat():
    """Clear all logcat buffers on the phone."""
    os.system(ADB + ' logcat -b all -c')


def _archive_pcap(stamp):
    """Move the eNB MAC capture into the log directory, named after *stamp*."""
    os.system('mv ' + PCAP_PATH + ' ' + LOG_DIR + '/' + stamp + '-enb_mac.pcap')


def _latest_fuzz_log_size():
    """Return the size in bytes of the newest ``*-fuzz-log.txt``, or 0 if none exists."""
    logs = glob.glob(LOG_DIR + '/*-fuzz-log.txt')
    if not logs:
        return 0
    return os.stat(max(logs, key=os.path.getctime)).st_size


def _show_status(deck, text):
    """Write a timestamped status message to the left 600 px of the touch strip."""
    message = datetime.now().isoformat() + "\n" + text
    deck.set_touchscreen_image(touchscreen_text(message, 600), 0, 0, 600, 100)


def _show_counter(deck, count):
    """Show the flight-mode retry counter in the 90 px field starting at x=600."""
    deck.set_touchscreen_image(touchscreen_text(str(count), 90, 50), 600, 0, 90, 100)


# callback when buttons are pressed or released
def key_change_callback(deck, key, key_state):
    """Publish the fuzzer toggle for key 0 and update the key image."""
    print("Key: " + str(key) + " state: " + str(key_state))
    if key == 0:
        topic = 1
        message = "disableFuzz" if key_state else "enableFuzz"
        ctrlSocket.send_string("%d %s" % (topic, message))
    deck.set_key_image(key, BUTTONS_PRESSED[key] if key_state else BUTTONS[key])


# callback when dials are pressed or released
def dial_change_callback(deck, dial, event, value):
    """Handle dial events; pushing dial 3 saves the logs and quits the program."""
    if event == DialEventType.PUSH:
        print(f"dial pushed: {dial} state: {value}")
        if dial == 3 and value:
            for process in processes:
                process.kill()
            stamp = _log_stamp()
            _dump_logcat(stamp)
            _archive_pcap(stamp)
            deck.reset()
            deck.close()
            # This callback runs on the deck's reader thread while the main
            # thread loops forever, so joining the other threads would block
            # indefinitely and sys.exit() would only end this thread.
            # Terminate the whole process instead, with the same message and
            # exit status that sys.exit("Byebye!") would produce.
            print("Byebye!", file=sys.stderr, flush=True)
            os._exit(1)
        else:
            # Redraw the touch strip exactly as it is drawn at start-up.
            deck.set_touchscreen_image(_exit_touchscreen_image(), 0, 0, 800, 100)
    elif event == DialEventType.TURN:
        print(f"dial {dial} turned: {value}")


# callback when lcd is touched
def touchscreen_event_callback(deck, evt_type, value):
    """Log short, long and drag touches on the touch strip."""
    if evt_type == TouchscreenEventType.SHORT:
        print("Short touch @ " + str(value['x']) + "," + str(value['y']))
    elif evt_type == TouchscreenEventType.LONG:
        print("Long touch @ " + str(value['x']) + "," + str(value['y']))
    elif evt_type == TouchscreenEventType.DRAG:
        print("Drag started @ " + str(value['x']) + "," + str(value['y'])
              + " ended @ " + str(value['x_out']) + "," + str(value['y_out']))


def printTimestamp():
    """Print the current local time in ISO 8601 format."""
    print(datetime.now().isoformat())


def check_network_status():
    """Return True when the phone's telephony registry reports IN_SERVICE."""
    command = ADB + ' shell dumpsys telephony.registry | grep "mServiceState"'
    with os.popen(command) as pipe:
        network_status = pipe.read()
    return 'IN_SERVICE' in network_status


def set_flight_mode(state):
    """Enable flight mode on the phone when *state* is ``'1'`` (or ``1``), else disable it."""
    action = 'enable' if state in ('1', 1) else 'disable'
    os.system(ADB + ' shell cmd connectivity airplane-mode {}'.format(action))


def _cycle_flight_mode(on_seconds, off_seconds):
    """Enable flight mode, wait, disable it and wait again to force a re-attach."""
    set_flight_mode('1')
    time.sleep(on_seconds)
    set_flight_mode('0')
    time.sleep(off_seconds)


def run_programs():
    """Start the ZMQ client, srsepc and srsenb in terminals and return their handles."""
    program_paths = ['xfce4-terminal -x sudo python3 /home/srsran-zmq-client.py',
                     'xfce4-terminal -x sudo srsepc /home/srsRAN_4G/srsepc/epc.conf',
                     'xfce4-terminal -x sudo /home/srsRAN_4G/build/srsenb/src/srsenb']
    # "exec" replaces the intermediate shell so kill() reaches the program itself.
    return [subprocess.Popen("exec " + program, shell=True) for program in program_paths]


def restart_programs(processes):
    """Kill *processes*, wait five seconds and start a fresh set of programs."""
    for process in processes:
        process.kill()
    time.sleep(5)
    return run_programs()


def _collect_logs_and_restart(deck, headline, settle_seconds):
    """Archive the current run and start a new one.

    Stops the helper programs, shows *headline* together with the size of the
    newest fuzz log, saves the pcap and logcat under a fresh timestamp,
    restarts the programs, clears logcat and toggles flight mode (5 s on,
    *settle_seconds* off).  The log size is read before the restart so it
    describes the run that just ended.
    """
    global processes
    for process in processes:
        process.kill()
    _show_status(deck, headline + "\nLatest full-log size " + str(_latest_fuzz_log_size()))
    stamp = _log_stamp()
    _archive_pcap(stamp)
    _dump_logcat(stamp)
    processes = restart_programs(processes)
    _clear_logcat()
    _cycle_flight_mode(5, settle_seconds)


def _supervise(deck):
    """Watch the phone's registration forever and reset the rig as needed."""
    flight_mode_counter = 0
    while True:
        if not check_network_status():
            printTimestamp()
            print("Network is down. Enabling flight mode...")
            _show_status(deck, "Network is down. Enabling flight mode...")
            set_flight_mode('1')
            time.sleep(10)
            print("Disabling flight mode...")
            _show_status(deck, "Disabling flight mode...")
            set_flight_mode('0')
            print("Waiting for the network to come online...\n")
            _show_status(deck, "Waiting for the network to come online...")

            # Check the cellular network wakes up, if not re-enable flight mode.
            # NOTE(review): this loop has no exit and time_start is never
            # refreshed, exactly as in the original; confirm that is intended.
            time_start = time.time()
            while True:
                if check_network_status():
                    printTimestamp()
                    print("Network is up.\n")
                    _collect_logs_and_restart(deck, "Network is up. Reset!", 5)
                    flight_mode_counter = 0
                elif time.time() - time_start > 40 and not check_network_status():
                    _show_counter(deck, flight_mode_counter)
                    printTimestamp()
                    print("Network didn't wakeup. Enabling flight mode again...")
                    _show_status(deck, "Network didn't wakeup. Enabling flight mode again...")
                    set_flight_mode('1')
                    flight_mode_counter += 1
                    time.sleep(10)
                    print("Disabling flight mode...\n")
                    _show_status(deck, "Disabling flight mode...")
                    set_flight_mode('0')
                    if flight_mode_counter == 5:
                        print("Restarting programs...")
                        print("Collecting logs..")
                        # NOTE(review): the original indentation was lost; the
                        # restart is assumed to be guarded by stopwhencrash.
                        if stopwhencrash:
                            _collect_logs_and_restart(deck, "Flightmode Counter reached. Reset!", 5)
                            flight_mode_counter = 0
                        else:
                            _show_status(deck, "Flightmode Counter reached. Reset!\nLatest full-log size "
                                         + str(_latest_fuzz_log_size()))
                            _dump_logcat(_log_stamp())
                time.sleep(5)  # poll every 5 seconds while waiting for the network
        else:
            flight_mode_counter = 0
            _show_counter(deck, flight_mode_counter)
            time.sleep(30)  # re-check the status after 30 seconds
            if check_network_status():
                _collect_logs_and_restart(deck, "Network is up. Reset!", 10)
                flight_mode_counter = 0


def main():
    """Open the first Stream Deck +, start the rig and supervise it."""
    global processes
    streamdecks = DeviceManager().enumerate()
    print("Found {} Stream Deck(s).\n".format(len(streamdecks)))

    for index, deck in enumerate(streamdecks):
        # Only the Stream Deck + has the dials and touch strip used here.
        if deck.DECK_TYPE != 'Stream Deck +':
            print(deck.DECK_TYPE)
            print("Sorry, this example only works with Stream Deck +")
            continue

        deck.open()
        deck.reset()
        deck.set_key_callback(key_change_callback)
        deck.set_dial_callback(dial_change_callback)
        deck.set_touchscreen_callback(touchscreen_event_callback)
        print("Opened '{}' device (serial number: '{}')".format(deck.deck_type(), deck.get_serial_number()))

        # Set initial screen brightness to 80%.
        deck.set_brightness(80)
        for key in range(0, deck.KEY_COUNT):
            deck.set_key_image(key, BUTTONS_PRESSED[key] if BUTTON_CONFIG[key]['pressed'] else BUTTONS[key])

        # build an image for the touch lcd
        deck.set_touchscreen_image(_exit_touchscreen_image(), 0, 0, 800, 100)

        # Save whatever is in the Android logs, then clear them.
        _dump_logcat(_log_stamp(), '-fuzz-logcat-crashlogs.txt')
        _clear_logcat()

        processes = run_programs()
        _cycle_flight_mode(10, 20)
        print("All programs started")
        _show_status(deck, "Radios Started")

        # Never returns: the first matching deck owns the supervision loop.
        _supervise(deck)


if __name__ == "__main__":
    main()