from machine import Pin, Timer, SPI from rp2 import PIO, StateMachine, asm_pio import time import framebuf import _thread SOM = 0x53 # OSDP start-of-message marker def crc16_itu_t(data, seed=0x1D0F): crc = seed for b in data: crc = ((crc >> 8) | (crc << 8)) & 0xFFFF crc ^= b crc ^= (crc & 0xFF) >> 4 crc ^= (crc << 12) & 0xFFFF crc ^= ((crc & 0xFF) << 5) & 0xFFFF return crc & 0xFFFF def checksum8(data): return (-sum(data)) & 0xFF # ---- OLED display support ------------------------------------------------- DC_PIN = 8 RST_PIN = 12 MOSI_PIN = 11 SCK_PIN = 10 CS_PIN = 9 class OLED_1inch3(framebuf.FrameBuffer): def __init__(self): self.width = 128 self.height = 64 self.rotate = 180 # only 0 or 180 supported self.cs = Pin(CS_PIN, Pin.OUT) self.rst = Pin(RST_PIN, Pin.OUT) self.cs(1) self.spi = SPI(1, 20_000_000, polarity=0, phase=0, sck=Pin(SCK_PIN), mosi=Pin(MOSI_PIN), miso=None) self.dc = Pin(DC_PIN, Pin.OUT) self.dc(1) self.buffer = bytearray(self.height * self.width // 8) super().__init__(self.buffer, self.width, self.height, framebuf.MONO_HMSB) self.init_display() self.white = 1 def write_cmd(self, cmd): self.cs(1) self.dc(0) self.cs(0) self.spi.write(bytearray([cmd])) self.cs(1) def write_data(self, buf): self.cs(1) self.dc(1) self.cs(0) self.spi.write(bytearray([buf])) self.cs(1) def init_display(self): self.rst(1) time.sleep_ms(1) self.rst(0) time.sleep_ms(10) self.rst(1) self.write_cmd(0xAE) self.write_cmd(0x00) self.write_cmd(0x10) self.write_cmd(0xB0) self.write_cmd(0xDC) self.write_cmd(0x00) self.write_cmd(0x81) self.write_cmd(0x6F) self.write_cmd(0x21) self.write_cmd(0xA1 if self.rotate == 180 else 0xA0) self.write_cmd(0xC0) self.write_cmd(0xA4) self.write_cmd(0xA6) self.write_cmd(0xA8) self.write_cmd(0x3F) self.write_cmd(0xD3) self.write_cmd(0x60) self.write_cmd(0xD5) self.write_cmd(0x41) self.write_cmd(0xD9) self.write_cmd(0x22) self.write_cmd(0xDB) self.write_cmd(0x35) self.write_cmd(0xAD) self.write_cmd(0x8A) self.write_cmd(0xAF) def show(self): self.write_cmd(0xB0) for page in range(64): column = page if self.rotate == 180 else 63 - page self.write_cmd(0x00 + (column & 0x0F)) self.write_cmd(0x10 + (column >> 4)) offset = page * 16 for num in range(16): self.write_data(self.buffer[offset + num]) class DisplayManager: def __init__(self): try: self.oled = OLED_1inch3() self.available = True self._boot_sequence() except Exception: self.available = False self.oled = None self.last_payload = None self.last_draw = 0 def _fmt_age(self, now_ms, ts): if ts is None: return "--" delta = time.ticks_diff(now_ms, ts) if delta < 0: delta += 1 << 30 if delta < 1000: return "<1s" if delta < 60000: return f"{delta // 1000}s" return f"{delta // 60000}m" def _truncate(self, text): return text[:16] def _render_lines(self, status, now_ms): lines = [] mode = status.get("mode", "unknown") lines.append(f"Mode:{mode}") osdp = status.get("osdp") if osdp: age = self._fmt_age(now_ms, osdp.get("ts")) count = osdp.get("count") baud = osdp.get("baud") if count is None: lines.append(self._truncate(f"OSDP:{baud}")) else: lines.append(self._truncate(f"OSDP:{baud} #{count}")) else: age = "--" lines.append("OSDP:--") secure = status.get("secure_seen") sc_line = "SCS:--" age_line = f"Age:{age}" if secure and osdp: mode_tag = osdp.get("sc_mode") if mode_tag == "handshake": sc_line = "SCS:HS" elif mode_tag == "mac": sc_line = "SCS:MAC" elif mode_tag == "mac_enc": sc_line = "SCS:MAC+ENC" else: sc_line = "SCS:ON" lines.append(self._truncate(sc_line)) lines.append(self._truncate(age_line)) wg = status.get("wiegand") if wg: fmt = wg.get("format", "--") lines.append(self._truncate(f"WG:{fmt}")) age_text = self._fmt_age(now_ms, wg.get("ts")) count = wg.get("count") if count is None: lines.append(self._truncate(f"Age:{age_text}")) else: lines.append(self._truncate(f"Age:{age_text} #{count}")) else: lines.append("WG:--") lines.append("") while len(lines) < 6: lines.append("") return lines[:6] def _boot_sequence(self): if not self.available: return self.oled.fill(0) self.oled.fill_rect(0, 0, self.oled.width, 6, self.oled.white) self.oled.fill_rect(0, self.oled.height - 6, self.oled.width, 6, self.oled.white) self.oled.fill(0x0000) self.oled.line(0,0,5,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(0,0,20,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(0,0,35,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(0,0,65,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(0,0,95,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(0,0,125,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(0,0,125,41,self.oled.white) self.oled.show() time.sleep(0.1) self.oled.line(0,0,125,21,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(0,0,125,3,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,125,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,110,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,95,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,65,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,35,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,1,64,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,1,44,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,1,24,self.oled.white) self.oled.show() time.sleep(0.01) self.oled.line(127,1,1,3,self.oled.white) self.oled.show() self.oled.fill_rect(0,22,128,20,0x0000) self.oled.show() self.oled.text("OSDPeek", 35, 28, self.oled.white) self.oled.show() time.sleep_ms(2000) self.oled.fill(0) self.oled.show() def update(self, status, now_ms): if not self.available: return osdp = status.get("osdp") wg = status.get("wiegand") payload = ( status.get("mode"), osdp.get("baud") if osdp else None, osdp.get("label") if osdp else None, osdp.get("info") if osdp else None, status.get("secure_seen"), wg.get("format") if wg else None, wg.get("ts") if wg else None, ) if (self.last_payload == payload and time.ticks_diff(now_ms, self.last_draw) < 500): return self.last_payload = payload self.last_draw = now_ms self.oled.fill(0) for idx, line in enumerate(self._render_lines(status, now_ms)): self.oled.text(self._truncate(line), 0, idx * 10, self.oled.white) self.oled.show() @asm_pio(in_shiftdir=PIO.SHIFT_RIGHT, autopush=True, push_thresh=8) def uart_rx_8n1(): wait(0, pin, 0) set(x, 7) nop()[31] nop()[14] label("bit") in_(pins, 1) jmp(x_dec, "bit")[31] wait(1, pin, 0) FIFO_JOIN_RX = getattr(PIO, "FIFO_JOIN_RX", getattr(PIO, "JOIN_RX", None)) def make_rx(sm_id, rx_gpio, baud): if isinstance(rx_gpio, Pin): pin = rx_gpio else: pin = Pin(rx_gpio, Pin.IN) prog = uart_rx_8n1 kwargs = { "freq": baud * 32, # 32 PIO cycles per UART bit for additional margin "in_base": pin, "jmp_pin": pin, } if FIFO_JOIN_RX is not None: try: kwargs["fifo_join"] = FIFO_JOIN_RX sm = StateMachine(sm_id, prog, **kwargs) except TypeError: kwargs.pop("fifo_join", None) sm = StateMachine(sm_id, prog, **kwargs) else: sm = StateMachine(sm_id, prog, **kwargs) sm.active(1) return sm class OSDPCollector: SOM = 0x53 def __init__(self, label, max_len=1024): self.label = label self._max_len = max_len self._buffer = bytearray() self._expected = 0 self._pending = [] def feed(self, byte): if not self._buffer and byte != self.SOM: return buf = self._buffer buf.append(byte & 0xFF) if len(buf) > self._max_len: cutoff = len(buf) // 2 self._buffer = bytearray(buf[cutoff:]) self._expected = 0 def process(self): self._consume_available() def pop_pending(self): frames, self._pending = self._pending, [] return frames def reset(self): self._buffer = bytearray() self._expected = 0 self._pending = [] def _consume_available(self): buf = self._buffer while buf: if buf[0] != self.SOM: self._resync() buf = self._buffer continue if not self._expected: if len(buf) < 4: return length = buf[2] | (buf[3] << 8) if length < 7 or length > self._max_len: self._resync(drop_first=True) buf = self._buffer continue self._expected = length if len(buf) < self._expected: return frame = bytes(buf[: self._expected]) self._pending.append(frame) buf = bytearray(buf[self._expected :]) self._buffer = buf self._expected = 0 def _resync(self, drop_first=False): buf = self._buffer start = 1 if drop_first else 0 idx = -1 for i in range(start, len(buf)): if buf[i] == self.SOM: idx = i break if idx <= 0: if idx == 0: return self._buffer = bytearray() self._expected = 0 else: self._buffer = bytearray(buf[idx:]) self._expected = 0 PIN_HIGH_CONFIRM = 4 # consecutive highs needed to select/switch lines PIN_IDLE_CONFIRM = 120 # consecutive highs before declaring disconnect (all high) WIEGAND_INTERBIT_US = 6000 # gap to treat a Wiegand frame as complete WIEGAND_RETAIN_MS = 1000 # keep showing Wiegand before falling back to idle MODE_IDLE = "Idle" MODE_NOT_CONNECTED = "NC" KNOWN_WIEGAND_LENGTHS = {26, 32, 34, 35, 37, 40, 44, 56, 64, 128} class LineWatcher: """Track candidate lines for polarity, idle, and disconnect conditions.""" def __init__(self, pin_map, high_confirm, idle_confirm): self._pin_map = pin_map self._pins = tuple(pin_map.keys()) self._high_confirm = high_confirm self._idle_confirm = idle_confirm self._high_counts = {pin: 0 for pin in self._pins} self._idle_count = 0 self._active = None self._last_states = {pin: 0 for pin in self._pins} def step(self): """Return (event, pin) with event in {"attach","switch","idle","nc",None}.""" states = {pin: self._pin_map[pin].value() for pin in self._pins} # Detect hard disconnect (all high) before looking for a candidate. if all(level == 1 for level in states.values()): if self._idle_count < self._idle_confirm: self._idle_count += 1 if self._idle_count >= self._idle_confirm: self._active = None self._reset_counters(keep_last=False) self._last_states = states return "nc", None else: self._idle_count = 0 # Update high counters (treat high as candidate, low clears counter) for pin, level in states.items(): if level: if self._high_counts[pin] < self._high_confirm: self._high_counts[pin] += 1 else: self._high_counts[pin] = 0 if self._active is None: candidates = [pin for pin, count in self._high_counts.items() if count >= self._high_confirm] if len(candidates) == 1: self._active = candidates[0] self._reset_counters(keep_last=True) self._last_states = states return "attach", self._active self._last_states = states return None, None candidates = [ pin for pin, count in self._high_counts.items() if pin != self._active and count >= self._high_confirm ] if len(candidates) == 1: self._active = candidates[0] self._reset_counters(keep_last=True) self._last_states = states return "switch", self._active if any(level == 1 for level in states.values()) and not all(level == 1 for level in states.values()): # Line is present but idle (one high, one low) if any(self._last_states[pin] != states[pin] for pin in self._pins): self._last_states = states return "idle", self._active self._last_states = states return None, None @property def last_states(self): return dict(self._last_states) def _reset_counters(self, keep_last): self._high_counts = {pin: 0 for pin in self._pins} self._idle_count = 0 if not keep_last: self._last_states = {pin: 0 for pin in self._pins} class PIOOSDPSniffer: def __init__(self, pins=(16,), bauds=(115200, 38400, 9600), sm_base_id=0): self._pins = tuple(pins) self._bauds = tuple(bauds) self._pin_inputs = { pin: Pin(pin, Pin.IN, Pin.PULL_UP) for pin in self._pins } self._watcher = LineWatcher(self._pin_inputs, PIN_HIGH_CONFIRM, PIN_IDLE_CONFIRM) self._sm_base_id = sm_base_id self._active_pin = None self.had_bytes = False self.attached = False self.state = "nc" self.just_attached = False self._selected_baud = None self._bundles = [] self._bundles_lock = _thread.allocate_lock() self._queue_lock = _thread.allocate_lock() self._frame_queue = [] self._had_bytes_flag = False self._thread_running = True _thread.start_new_thread(self._drain_worker, ()) def poll(self): frames = [] self.had_bytes = False self.just_attached = False event, pin = self._watcher.step() if event == "attach": self._start_session(pin, reset_counters=True) self.just_attached = True self.state = "active" elif event == "switch": self._start_session(pin, reset_counters=False) self.state = "active" elif event == "nc": self._stop_session() self.state = "nc" elif event == "idle": if self.attached: self.state = "idle" if not self.attached: return frames # Retrieve frames accumulated by the worker thread. self._queue_lock.acquire() try: if self._frame_queue: frames = self._frame_queue self._frame_queue = [] self.had_bytes = self._had_bytes_flag or bool(frames) self._had_bytes_flag = False finally: self._queue_lock.release() return frames def get_levels(self): return {pin: self._pin_inputs[pin].value() for pin in self._pins} def select_baud(self, baud): self._selected_baud = baud def clear_selection(self): self._selected_baud = None def stop(self): self._thread_running = False time.sleep_ms(50) self._stop_session() # Internal helpers ------------------------------------------------- def _start_session(self, pin, reset_counters): self._bundles_lock.acquire() try: if self._active_pin == pin and self._bundles: return for bundle in self._bundles: bundle["sm"].active(0) self._bundles = [] self._active_pin = pin pin_obj = self._pin_inputs[pin] bundles = [] for idx, baud in enumerate(self._bauds): sm = make_rx(self._sm_base_id + idx, pin_obj, baud) collector = OSDPCollector(f"GP{pin}@{baud}") bundles.append({ "baud": baud, "pin": pin, "label": f"GP{pin}", "sm": sm, "collector": collector, "rx": sm.rx_fifo, "get": sm.get, "channel": { "baud": baud, "pin": pin, "label": f"GP{pin}", }, }) self._bundles = bundles self.attached = True self._selected_baud = None finally: self._bundles_lock.release() if reset_counters: # signal to caller via just_attached flag (already set outside) pass def _stop_session(self): self._bundles_lock.acquire() try: for bundle in self._bundles: bundle["sm"].active(0) self._bundles = [] self._active_pin = None self.attached = False self._selected_baud = None finally: self._bundles_lock.release() def _drain_worker(self): import time while self._thread_running: self._bundles_lock.acquire() bundles = tuple(self._bundles) self._bundles_lock.release() if not bundles: #time.sleep_us(80) continue selected = self._selected_baud busy_any = False for bundle in bundles: if selected is not None and bundle["baud"] != selected: continue rx = bundle["rx"] get = bundle["get"] collector = bundle["collector"] busy = False while rx(): busy = True v = get() collector.feed((v >> 24) & 0xFF) if not busy: continue busy_any = True collector.process() frames = collector.pop_pending() if not frames: continue self._queue_lock.acquire() try: for frame in frames: self._frame_queue.append((bundle["channel"], frame)) self._had_bytes_flag = True finally: self._queue_lock.release() if not busy_any: #time.sleep_us(50) continue class OSDPAnalyzer: def __init__(self): self.frames = 0 self.last_info = "" self.secure_seen = False self.last_valid = False self.sec_state = "none" # none, handshake, mac, mac_enc self.sec_error = None self._nonce_cp = None self._nonce_pd = None def process_frame(self, frame): if len(frame) < 7 or frame[0] != SOM: return False total_len = frame[2] | (frame[3] << 8) if total_len != len(frame): return False ctrl = frame[4] use_crc = bool(ctrl & 0x04) valid = True if use_crc: if len(frame) < 7: return False payload = frame[:-2] rx_crc = frame[-2] | (frame[-1] << 8) calc_crc = crc16_itu_t(payload) if calc_crc != rx_crc: valid = False data = frame[5:-2] else: calc = (sum(frame) & 0xFF) if calc != 0: valid = False data = frame[5:-1] if len(frame) > 6 else b"" secure = bool(ctrl & 0x08) raw_without_tail = frame[:-2] if use_crc else frame[:-1] if secure and valid and len(data) >= 2: self._handle_secure_block(frame, ctrl, data, raw_without_tail) self.frames += 1 crc_flag = "CRC" if use_crc else "CHK" sec_flag = "" if self.sec_state == "handshake": sec_flag = "SC-HS" elif self.sec_state == "mac": sec_flag = "SC-MAC" elif self.sec_state == "mac_enc": sec_flag = "SC-MAC+ENC" validity = "OK" if valid else "BAD" parts = [f"addr={frame[1]}", f"len={total_len}"] if sec_flag and secure: parts.append(sec_flag) parts.append(f"{crc_flag}-{validity}") self.last_info = " ".join(parts) self.last_valid = valid return valid # --- Secure-channel helpers ------------------------------------------------- def _parse_scb(self, data): if len(data) < 2: return None, 0, b"" scb_type = data[0] scb_len = data[1] if scb_len + 2 > len(data): return scb_type, scb_len, data[2:] return scb_type, scb_len, data[2: 2 + scb_len] def _handle_secure_block(self, frame, ctrl, data, raw_without_tail): scb_type, scb_len, scb_payload = self._parse_scb(data) if scb_type is None: return self.secure_seen = True if scb_type in (0x11, 0x12): # CP_ RAND, PD_ RAND if scb_type == 0x11 and len(scb_payload) >= 8: self._nonce_cp = bytes(scb_payload[:8]) elif scb_type == 0x12 and len(scb_payload) >= 8: self._nonce_pd = bytes(scb_payload[:8]) self.sec_state = "handshake" self.sec_error = None return if scb_type == 0x13: # CP_CRYPT self.sec_state = "handshake" self.sec_error = None return if scb_type == 0x14: # R-MAC_I self.sec_state = "handshake" self.sec_error = None return if scb_type == 0x15: # Secure channel active, MAC present. self.sec_state = "mac" self._handle_secure_message(raw_without_tail, mac_len=4) return if scb_type == 0x16: # Secure channel active, MAC present. self.sec_state = "mac" self._handle_secure_message(raw_without_tail, mac_len=4) return if scb_type == 0x17: # Secure channel active, MAC + encryption. self.sec_state = "mac_enc" self._handle_secure_message(raw_without_tail, mac_len=4) return if scb_type == 0x18: # Secure channel active, MAC + encryption. self.sec_state = "mac_enc" self._handle_secure_message(raw_without_tail, mac_len=4) return # Unknown SCB type; keep secure flag but leave state. self.sec_error = f"Unknown SCB type {scb_type:02X}" def _handle_secure_message(self, raw_without_tail, mac_len): # For now we just acknowledge the secure mode; full MAC verification # would require deriving session keys from the negotiated SCBK. if len(raw_without_tail) <= mac_len: self.sec_error = "Secure payload too short" return self.sec_error = None class WiegandSniffer: def __init__(self, d0_pin=14, d1_pin=15): self.d0 = Pin(d0_pin, Pin.IN, Pin.PULL_UP) self.d1 = Pin(d1_pin, Pin.IN, Pin.PULL_UP) self.bits = [] self.last_edge_us = time.ticks_us() self.timer = Timer() self.frame_ready = False self.frame_bits = [] self.last_frame_us = None self.enabled = True self.d0.irq(trigger=Pin.IRQ_FALLING, handler=self._edge0) self.d1.irq(trigger=Pin.IRQ_FALLING, handler=self._edge1) self.timer.init(period=5, mode=Timer.PERIODIC, callback=self._tick) def _edge0(self, pin): if not self.enabled: return self.bits.append(0) self.last_edge_us = time.ticks_us() def _edge1(self, pin): if not self.enabled: return self.bits.append(1) self.last_edge_us = time.ticks_us() def _tick(self, _): if not self.enabled: return if self.bits and time.ticks_diff(time.ticks_us(), self.last_edge_us) > WIEGAND_INTERBIT_US: self.frame_bits = self.bits[:] self.bits.clear() self.frame_ready = True def poll(self): if self.frame_ready: self.frame_ready = False bit_count = len(self.frame_bits) # clamp to realistic Wiegand lengths; ignore obvious noise frames if bit_count < 24 or bit_count > 128: return None fmt = self._describe(bit_count) self.last_frame_us = time.ticks_us() return {"bits": bit_count, "format": fmt, "raw_bits": self.frame_bits[:]} return None def set_enabled(self, enabled=True): if self.enabled == enabled: return self.enabled = enabled self.bits.clear() self.frame_ready = False self.frame_bits = [] def _describe(self, bits): if bits in KNOWN_WIEGAND_LENGTHS: return f"{bits}-bit" return f"{bits}-bit (unkn)" def _bits_to_int(bits): value = 0 for bit in bits: value = (value << 1) | (bit & 1) return value def _invert_bits(bits): return [0 if b else 1 for b in bits] def _decode_wiegand_candidate(bits): length = len(bits) if length not in KNOWN_WIEGAND_LENGTHS: return None payload_value = _bits_to_int(bits) info = { "length": length, "raw_bits": tuple(bits), "raw_value": payload_value, "parity_ok": None, } if length == 26: data = bits[1:-1] even_ok = ((sum(data[:12]) + bits[0]) & 1) == 0 odd_ok = ((sum(data[12:]) + bits[-1]) & 1) == 1 if not (even_ok and odd_ok): return None facility = _bits_to_int(data[:8]) card = _bits_to_int(data[8:]) info.update({ "parity_ok": True, "facility": facility, "card": card, }) else: info["parity_ok"] = None return info def decode_wiegand_frame(bits): candidates = [ ("normal", bits), ("reversed", list(reversed(bits))), ("inverted", _invert_bits(bits)), ("rev_inverted", _invert_bits(list(reversed(bits)))), ] for orientation, candidate in candidates: decoded = _decode_wiegand_candidate(candidate) if decoded: decoded["orientation"] = orientation return decoded return None print("OSDPeek — Monitoring…") PIO_PINS = (16, 18) # RS-485 A/B candidates BAUD_CANDIDATES = (115200, 38400, 9600) sniffer = PIOOSDPSniffer(pins=PIO_PINS, bauds=BAUD_CANDIDATES) analyzer = OSDPAnalyzer() wiegand = WiegandSniffer(d0_pin=14, d1_pin=13) print(f"[OSDP] Monitoring bauds: {', '.join(str(b) for b in BAUD_CANDIDATES)}") display = DisplayManager() status = { "mode": MODE_NOT_CONNECTED, "osdp": None, "secure_seen": False, "wiegand": None, } display.update(status, time.ticks_ms()) mode = MODE_NOT_CONNECTED reported_secure = False detected_channels = set() last_osdp_ms = None OSDP_RETAIN_MS = 1000 osdp_frame_count = 0 wiegand_frame_count = 0 bus_type = "nc" display_dirty = True last_display_ms = 0 try: while True: now_ms = time.ticks_ms() frames = sniffer.poll() osdp_active = False prev_bus_type = bus_type def reset_session(clear_queue=True): status["osdp"] = None status["wiegand"] = None status["secure_seen"] = False detected_channels.clear() sniffer.clear_selection() if clear_queue: sniffer._queue_lock.acquire() try: sniffer._frame_queue = [] sniffer._had_bytes_flag = False finally: sniffer._queue_lock.release() analyzer.sec_state = "none" analyzer.secure_seen = False analyzer._nonce_cp = None analyzer._nonce_pd = None analyzer.sec_error = None return 0, 0, False if sniffer.just_attached and prev_bus_type == "nc": osdp_frame_count, wiegand_frame_count, reported_secure = reset_session(clear_queue=False) display_dirty = True osdp_levels = sniffer.get_levels() osdp_vals = tuple(osdp_levels.get(pin, 0) for pin in sorted(osdp_levels)) wg_vals = (wiegand.d0.value(), wiegand.d1.value()) def _all(vals, value): return all(v == value for v in vals) def _mixed(vals): return (0 in vals) and (1 in vals) new_bus_type = "unknown" if _all(osdp_vals, 1) and _all(wg_vals, 1): new_bus_type = "nc" elif _all(osdp_vals, 1) and _all(wg_vals, 0): new_bus_type = "wiegand" elif _mixed(osdp_vals) and _mixed(wg_vals): new_bus_type = "osdp" if new_bus_type == "unknown": new_bus_type = bus_type if new_bus_type != prev_bus_type: if new_bus_type == "nc": sniffer._stop_session() sniffer.state = "nc" display_dirty = True elif prev_bus_type == "nc" and new_bus_type in ("wiegand", "osdp"): osdp_frame_count, wiegand_frame_count, reported_secure = reset_session() display_dirty = True bus_type = new_bus_type mode = MODE_NOT_CONNECTED if bus_type == "nc" else MODE_IDLE if bus_type == "osdp": for channel, frame in frames: if not analyzer.process_frame(frame): continue hex_bytes = " ".join(f"{b:02X}" for b in frame) label = channel["label"] baud = channel["baud"] key = (label, baud) prefix = "Detected" if key not in detected_channels else "Frame" print(f"[OSDP] {prefix} @ {baud} bps ({label}) | {analyzer.last_info} | {hex_bytes}") detected_channels.add(key) if bus_type != "osdp": if bus_type == "nc": osdp_frame_count, wiegand_frame_count, reported_secure = reset_session() bus_type = "osdp" sniffer.select_baud(baud) mode = "OSDP" osdp_active = True osdp_frame_count += 1 last_osdp_ms = now_ms status["osdp"] = { "baud": baud, "info": analyzer.last_info, "ts": now_ms, "count": osdp_frame_count, "sc_mode": analyzer.sec_state, } status["secure_seen"] = analyzer.secure_seen display_dirty = True if analyzer.secure_seen and not reported_secure: print("[OSDP] Secure Channel observed") reported_secure = True frames = [] if mode == "OSDP" and not osdp_active and last_osdp_ms is not None: if time.ticks_diff(now_ms, last_osdp_ms) > OSDP_RETAIN_MS: mode = MODE_IDLE detected_channels.clear() display_dirty = True sniffer.clear_selection() wg = wiegand.poll() decoded = None if wg and bus_type in ("wiegand", "nc"): decoded = decode_wiegand_frame(wg["raw_bits"]) if not decoded: wg = None if wg and decoded: byte_len = (decoded["length"] + 7) // 8 hex_value = f"0x{decoded['raw_value']:0{byte_len * 2}X}" details = [] facility = decoded.get("facility") card = decoded.get("card") if facility is not None: details.append(f"fc={facility}") if card is not None: details.append(f"id={card}") orientation = decoded.get("orientation") if orientation in ("inverted", "rev_inverted"): details.append("lines?") parity_ok = decoded.get("parity_ok") if parity_ok is False: details.append("parity?") detail_str = f" | {' '.join(details)}" if details else "" fmt = wg["format"] print(f"[Wiegand] Detected | {fmt} | {hex_value}{detail_str}") if bus_type != "wiegand": if bus_type == "nc": osdp_frame_count, wiegand_frame_count, reported_secure = reset_session() bus_type = "wiegand" sniffer.clear_selection() mode = "Wiegand" wiegand_frame_count += 1 status["wiegand"] = { "format": fmt, "ts": now_ms, "hex": hex_value, "facility": facility, "card": card, "orientation": orientation, "count": wiegand_frame_count, } display_dirty = True if mode == "Wiegand": last_wg_us = wiegand.last_frame_us if last_wg_us is None or time.ticks_diff(time.ticks_us(), last_wg_us) > (WIEGAND_RETAIN_MS * 1000): mode = MODE_IDLE display_dirty = True if mode in ("OSDP", "Wiegand"): new_mode = mode else: if bus_type == "nc": new_mode = MODE_NOT_CONNECTED elif bus_type == "wiegand": new_mode = "Wiegand (I)" elif bus_type == "osdp": new_mode = "OSDP (I)" else: new_mode = MODE_IDLE if new_mode != status["mode"]: status["mode"] = new_mode if new_mode == MODE_NOT_CONNECTED: sniffer.clear_selection() display_dirty = True if display_dirty or time.ticks_diff(now_ms, last_display_ms) >= 500: display.update(status, now_ms) last_display_ms = now_ms display_dirty = False if frames: continue time.sleep_us(50) except KeyboardInterrupt: sniffer.stop() status["mode"] = "stopped" display.update(status, time.ticks_ms()) print("\nStopped by user.")