#!/usr/bin/env python3 """ JARVIS Display ────────────── Renders the JARVIS UI overlay via X11. Development: python display.py Kiosk: DISPLAY=:0 python display.py --fullscreen Weather via Open-Meteo (free, no API key). News via BBC RSS (no API key). Stocks via Yahoo Finance public quote API (no API key). """ import os import json import math import threading import time as _time import argparse import subprocess import urllib.request import urllib.parse import xml.etree.ElementTree as ET from datetime import datetime _parser = argparse.ArgumentParser(description='JARVIS display', add_help=True) _parser.add_argument('--fullscreen', '-fs', action='store_true', help='Run fullscreen under X11/Wayland') _parser.add_argument('--width', type=int, default=1280, help='Screen width (default 1280)') _parser.add_argument('--height', type=int, default=800, help='Screen height (default 800)') _parser.add_argument('--mic', type=int, default=0, help='Microphone device index (default 0)') _args, _ = _parser.parse_known_args() import pygame # ── Locations to cycle through (weather) ───────────────────────────────────── _LOCATIONS = ['Poole', 'Portsmouth', 'Besancon', 'Paris', 'Gorinchem'] # ── Stock indices ───────────────────────────────────────────────────────────── _INDICES = [ ('^FTSE', 'FTSE 100'), ('^FCHI', 'CAC 40'), ('^GDAXI', 'DAX'), ('^STOXX50E', 'Euro Stoxx 50'), ('^GSPC', 'S&P 500'), ('^DJI', 'DJIA'), ('^IXIC', 'Nasdaq'), ('^N225', 'Nikkei 225'), ('^HSI', 'Hang Seng'), ('000001.SS', 'Shanghai'), ('^BSESN', 'Sensex'), ('^NSEI', 'Nifty 50'), ('^AXJO', 'ASX 200'), ('^VIX', 'VIX'), ] # ── Colour palette ─────────────────────────────────────────────────────────── BLACK = ( 0, 0, 0) WHITE = (255, 255, 255) GRAY = (160, 160, 160) DIM_GRAY = ( 80, 80, 80) WIRE_COL = ( 58, 58, 58) WIRE_EYE = ( 95, 95, 95) GREEN = ( 80, 200, 80) RED = (200, 80, 80) # ── WMO weather codes ───────────────────────────────────────────────────────── _WMO = { 0: 'Clear sky', 1: 'Mainly clear', 2: 'Partly cloudy', 3: 'Overcast', 45: 'Fog', 48: 'Freezing fog', 51: 'Light drizzle', 53: 'Drizzle', 55: 'Heavy drizzle', 61: 'Light rain', 63: 'Rain', 65: 'Heavy rain', 71: 'Light snow', 73: 'Snow', 75: 'Heavy snow', 77: 'Snow grains', 80: 'Light showers', 81: 'Showers', 82: 'Heavy showers', 85: 'Snow showers', 86: 'Heavy snow showers', 95: 'Thunderstorm', 96: 'Thunderstorm + hail', 99: 'Thunderstorm + hail', } # ── Shared HTTP helper ──────────────────────────────────────────────────────── _HEADERS = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) JARVIS/1.0'} def _get(url, timeout=15): req = urllib.request.Request(url, headers=_HEADERS) with urllib.request.urlopen(req, timeout=timeout) as r: return r.read() # ═══════════════════════════════════════════════════════════════════════════════ # NEWS # ═══════════════════════════════════════════════════════════════════════════════ _headlines: list[str] = [] _headlines_lock = threading.Lock() def _fetch_news() -> None: data = _get('https://feeds.bbci.co.uk/news/rss.xml') root = ET.fromstring(data) items = root.findall('.//item') titles = [it.findtext('title', '').strip() for it in items] titles = [t for t in titles if t][:20] with _headlines_lock: _headlines.clear() _headlines.extend(titles) def _news_worker() -> None: while True: try: _fetch_news() except Exception: pass _time.sleep(300) # refresh every 5 minutes # ═══════════════════════════════════════════════════════════════════════════════ # STOCKS # ═══════════════════════════════════════════════════════════════════════════════ # Each entry: {'name': str, 'price': str, 'pct': float, 'pct_str': str} _stocks: list[dict] = [] _stocks_lock = threading.Lock() def _fetch_one_stock(symbol: str) -> dict | None: url = (f'https://query2.finance.yahoo.com/v8/finance/chart/{urllib.parse.quote(symbol)}' f'?interval=1d&range=2d') data = json.loads(_get(url)) result = data.get('chart', {}).get('result') if not result: return None meta = result[0]['meta'] price = meta.get('regularMarketPrice', 0) prev = meta.get('chartPreviousClose') or meta.get('previousClose', price) pct = ((price - prev) / prev * 100) if prev else 0.0 sign = '+' if pct >= 0 else '' return { 'price': f'{price:,.2f}', 'pct': pct, 'pct_str': f'{sign}{pct:.2f}%', } def _fetch_stocks() -> None: fresh = [] for sym, name in _INDICES: try: q = _fetch_one_stock(sym) except Exception: q = None if q: q['name'] = name fresh.append(q) if fresh: with _stocks_lock: _stocks.clear() _stocks.extend(fresh) def _stocks_worker() -> None: while True: try: _fetch_stocks() except Exception: pass _time.sleep(300) # refresh every 5 minutes # ═══════════════════════════════════════════════════════════════════════════════ # WEATHER # ═══════════════════════════════════════════════════════════════════════════════ _weather_all: list[dict] = [] _weather_lock = threading.Lock() def _geocode(city: str) -> tuple[float, float, str]: url = ('https://geocoding-api.open-meteo.com/v1/search' f'?name={urllib.parse.quote(city)}&count=1&language=en&format=json') data = json.loads(_get(url)) r = data['results'][0] return r['latitude'], r['longitude'], r['name'] def _fetch_one(lat: float, lon: float) -> dict: url = ( 'https://api.open-meteo.com/v1/forecast' f'?latitude={lat:.4f}&longitude={lon:.4f}' '&daily=weather_code,temperature_2m_max,temperature_2m_min' '&hourly=temperature_2m,weather_code' '&timezone=auto&forecast_days=7' ) data = json.loads(_get(url)) daily = data['daily'] hourly = data['hourly'] week = [] for i, date_str in enumerate(daily['time']): d = datetime.strptime(date_str, '%Y-%m-%d') week.append({ 'day': 'Today' if i == 0 else d.strftime('%A'), 'high': round(daily['temperature_2m_max'][i]), 'low': round(daily['temperature_2m_min'][i]), 'desc': _WMO.get(daily['weather_code'][i], ''), }) now = datetime.now() hours = [] for i, ts in enumerate(hourly['time']): if len(hours) >= 5: break t = datetime.strptime(ts, '%Y-%m-%dT%H:%M') if t <= now: continue hours.append({ 'label': t.strftime('%-I%p').lower(), 'temp': round(hourly['temperature_2m'][i]), 'desc': _WMO.get(hourly['weather_code'][i], ''), }) return {'week': week, 'hours': hours} def _weather_worker(locations: list[tuple[float, float, str]]) -> None: while True: fresh = [] for lat, lon, name in locations: try: entry = _fetch_one(lat, lon) except Exception: entry = {'week': [], 'hours': []} entry['town'] = name fresh.append(entry) with _weather_lock: _weather_all.clear() _weather_all.extend(fresh) _time.sleep(1800) # ═══════════════════════════════════════════════════════════════════════════════ # WAKE WORD # ═══════════════════════════════════════════════════════════════════════════════ _OWW_MODEL = '/home/dfr84/Python/JARVIS/Jarvis.onnx' _OWW_THRESHOLD = 0.6 _OWW_DEBOUNCE = 2 # consecutive frames above threshold to trigger _OWW_CHUNK = 1280 _LISTEN_SECONDS = 4.0 _RESPONSE_HOLD = 3.0 # state: 'idle' | 'listening' | 'positive' | 'negative' _wake: dict = {'state': 'idle', 'detected_at': 0.0, 'idle_since': 0.0} _WAKE_COOLDOWN = 4.0 # seconds after response before listening again _FACE_GREEN = ( 40, 200, 80) _FACE_BLUE = ( 40, 80, 220) _FACE_RED = (200, 50, 50) def _lerp_color(c1: tuple, c2: tuple, t: float) -> tuple: return tuple(int(c1[i] + (c2[i] - c1[i]) * t) for i in range(3)) def _speak(text: str) -> None: subprocess.run(['espeak-ng', '-s', '150', text], capture_output=True) def _handle_command(raw: bytes, n_ch: int, native_hz: int) -> None: try: import speech_recognition as sr import numpy as np audio_np = np.frombuffer(raw, dtype=np.int16) if n_ch > 1: audio_np = audio_np.reshape(-1, n_ch)[:, 0] target_hz = 16000 if native_hz != target_hz: ratio = target_hz / native_hz new_len = int(len(audio_np) * ratio) indices = np.round(np.linspace(0, len(audio_np) - 1, new_len)).astype(int) audio_np = audio_np[indices] audio_data = sr.AudioData(audio_np.tobytes(), target_hz, 2) text = sr.Recognizer().recognize_google(audio_data).lower() if 'are you there' in text: _wake['state'] = 'positive' _speak('Yes, I am here') else: _wake['state'] = 'negative' _speak("Sorry, I didn't understand") except Exception: _wake['state'] = 'negative' _speak("Sorry, I didn't understand") _time.sleep(_RESPONSE_HOLD) _wake['idle_since'] = _time.time() _wake['state'] = 'idle' def _wake_worker() -> None: try: import pyaudio import numpy as np from openwakeword.model import Model except ImportError: return try: model = Model(wakeword_model_paths=[_OWW_MODEL]) audio = pyaudio.PyAudio() dev_info = audio.get_device_info_by_index(_args.mic) n_ch = int(dev_info['maxInputChannels']) native_hz = int(dev_info['defaultSampleRate']) target_hz = 16000 buf_frames = int(_OWW_CHUNK * native_hz / target_hz) stream = audio.open( format=pyaudio.paInt16, channels=n_ch, rate=native_hz, input=True, input_device_index=_args.mic, frames_per_buffer=buf_frames, ) cmd_frames: list[bytes] = [] hit_count = 0 while True: data = stream.read(buf_frames, exception_on_overflow=False) state = _wake['state'] if state == 'idle': audio_np = np.frombuffer(data, dtype=np.int16) if n_ch > 1: audio_np = audio_np.reshape(-1, n_ch)[:, 0] if native_hz != target_hz: ratio = target_hz / native_hz new_len = int(len(audio_np) * ratio) indices = np.round(np.linspace(0, len(audio_np) - 1, new_len)).astype(int) audio_np = audio_np[indices] if _time.time() - _wake['idle_since'] < _WAKE_COOLDOWN: hit_count = 0 continue triggered = any(s >= _OWW_THRESHOLD for s in model.predict(audio_np).values()) if triggered: hit_count += 1 else: hit_count = 0 if hit_count >= _OWW_DEBOUNCE: hit_count = 0 _wake['state'] = 'listening' _wake['detected_at'] = _time.time() cmd_frames.clear() elif state == 'listening': cmd_frames.append(data) if _time.time() - _wake['detected_at'] >= _LISTEN_SECONDS: _wake['state'] = 'processing' threading.Thread( target=_handle_command, args=(b''.join(cmd_frames), n_ch, native_hz), daemon=True, ).start() cmd_frames.clear() except Exception: pass # ═══════════════════════════════════════════════════════════════════════════════ # WIREFRAME FACE # ═══════════════════════════════════════════════════════════════════════════════ def _hw(t: float, rx: float) -> float: if t <= 0.0: return rx * math.sqrt(max(0.0, 1.0 - t * t)) return rx * math.sqrt(max(0.0, 1.0 - t * t)) * (1.0 - 0.30 * t) def draw_wireframe_face(surface, cx, cy, rx, ry, col_wire=WIRE_COL, col_eye=WIRE_EYE): N_H, N_V, PTS = 13, 11, 40 def sc(xf, t): w = _hw(t, rx) bow = int(0.06 * w * (1.0 - t*t) * (1.0 - xf*xf)) return (int(cx + xf*w), int(cy + t*ry) - bow) def draw(pts, col=col_wire): if len(pts) >= 2: pygame.draw.lines(surface, col, False, pts, 1) t_vals = [-1.0 + 2.0*i/(PTS-1) for i in range(PTS)] draw([sc(-1.0, t) for t in t_vals]) draw([sc( 1.0, t) for t in t_vals]) pygame.draw.lines(surface, col_wire, False, [sc(-1.0,-1.0), sc(1.0,-1.0)], 1) pygame.draw.lines(surface, col_wire, False, [sc(-0.6,1.0), sc(0.0,1.02), sc(0.6,1.0)], 1) for i in range(1, N_H): t = -1.0 + 2.0*i/N_H draw([sc(-1.0 + 2.0*j/(PTS-1), t) for j in range(PTS)]) for i in range(1, N_V): xf = -1.0 + 2.0*i/N_V draw([sc(xf, t) for t in t_vals]) eye_t, eye_xf = -0.38, 0.35 eye_w = _hw(eye_t, rx) e_rx = int(eye_w * 0.24) e_ry = int(ry * 0.09) for sign in (-1, +1): ecx = int(cx + sign * eye_xf * eye_w) ecy = int(cy + eye_t * ry) pts = [(int(ecx + e_rx*math.cos(2*math.pi*k/28)), int(ecy + e_ry*math.sin(2*math.pi*k/28))) for k in range(29)] pygame.draw.lines(surface, col_eye, True, pts, 1) inner = int(eye_w * 0.12) e_btm = int(cy + eye_t*ry) + e_ry nose_y = int(cy + 0.08*ry) nose_w = int(eye_w * 0.07) draw([(cx-inner, e_btm), (cx-nose_w, nose_y)]) draw([(cx+inner, e_btm), (cx+nose_w, nose_y)]) # ═══════════════════════════════════════════════════════════════════════════════ # CLOCK # ═══════════════════════════════════════════════════════════════════════════════ def draw_clock(surface, fonts, x, y): now = datetime.now() t_surf = fonts['large'].render(now.strftime('%-H:%M'), True, WHITE) s_surf = fonts['small'].render(now.strftime('%S'), True, DIM_GRAY) surface.blit(fonts['small'].render(now.strftime('%A, %-d %B %Y'), True, GRAY), (x, y)) ty = y + fonts['small'].get_height() + 4 surface.blit(t_surf, (x, ty)) surface.blit(s_surf, (x + t_surf.get_width() + 2, ty + t_surf.get_height() - s_surf.get_height() - 8)) # ═══════════════════════════════════════════════════════════════════════════════ # TICKER (shared scroll renderer) # ═══════════════════════════════════════════════════════════════════════════════ # Cache: key → (source_text, Surface) _ticker_cache: dict[str, tuple[str, pygame.Surface]] = {} def _build_news_surf(font) -> pygame.Surface | None: with _headlines_lock: items = list(_headlines) if not items: return None text = ' ◆ '.join(items) key = 'news' if key in _ticker_cache and _ticker_cache[key][0] == text: return _ticker_cache[key][1] surf = font.render(text, True, GRAY) _ticker_cache[key] = (text, surf) return surf def _build_stocks_surf(font) -> pygame.Surface | None: with _stocks_lock: items = list(_stocks) if not items: return font.render('Market data loading…', True, DIM_GRAY) sep = font.render(' | ', True, DIM_GRAY) # Build a key from current data to detect changes key = 'stocks' data_key = '|'.join(f"{s['name']}{s['price']}{s['pct_str']}" for s in items) if key in _ticker_cache and _ticker_cache[key][0] == data_key: return _ticker_cache[key][1] # Render each segment: name in GRAY, price+pct in green/red parts: list[pygame.Surface] = [] for i, s in enumerate(items): col = GREEN if s['pct'] > 0 else RED if s['pct'] < 0 else WHITE parts.append(font.render(s['name'] + ' ', True, GRAY)) parts.append(font.render(s['price'] + ' ' + s['pct_str'], True, col)) if i < len(items) - 1: parts.append(sep) total_w = sum(p.get_width() for p in parts) h = font.get_height() surf = pygame.Surface((total_w, h), pygame.SRCALPHA) x = 0 for p in parts: surf.blit(p, (x, 0)) x += p.get_width() _ticker_cache[key] = (data_key, surf) return surf def draw_ticker(surface, surf, x, y, w, speed=50.0): """Scroll surf right-to-left in the region (x, y, w, surf.height).""" if surf is None: return sw = surf.get_width() total = sw + 120 # gap between end of text and next repeat off = (_time.time() * speed) % total bx = int(x + w - off) old = surface.get_clip() surface.set_clip(pygame.Rect(x, y, w, surf.get_height())) surface.blit(surf, (bx, y)) surface.blit(surf, (bx + total, y)) # seamless second copy surface.set_clip(old) # ═══════════════════════════════════════════════════════════════════════════════ # WEATHER PANEL # ═══════════════════════════════════════════════════════════════════════════════ _WEATHER_COLS = 5 _WEATHER_COL_W = 84 def draw_weather(surface, fonts, x, y): with _weather_lock: all_data = list(_weather_all) th = fonts['tiny'].get_height() row = th + 2 town_h = fonts['medium'].get_height() def _trunc(s, n=13): return s if len(s) <= n else s[:n-1] + '…' def _col(items, start_y, label_fn, row2_fn, row3_fn): for i, item in enumerate(items[:_WEATHER_COLS]): cx = x + i * _WEATHER_COL_W surface.blit(fonts['tiny'].render(label_fn(item), True, GRAY), (cx, start_y)) surface.blit(fonts['tiny'].render(row2_fn(item), True, WHITE), (cx, start_y + row)) surface.blit(fonts['tiny'].render(row3_fn(item), True, DIM_GRAY), (cx, start_y + 2*row)) if not all_data: surface.blit(fonts['medium'].render('Weather loading…', True, DIM_GRAY), (x, y)) return idx = int(_time.time() / 10) % len(all_data) data = all_data[idx] week = data.get('week', []) hours = data.get('hours', []) town = data.get('town', '') surface.blit(fonts['medium'].render(town, True, GRAY), (x, y)) _col(week, y + town_h + 4, lambda d: 'Today' if d['day'] == 'Today' else d['day'][:3], lambda d: f"{d['high']}°/{d['low']}°", lambda d: _trunc(d['desc'])) if hours: _col(hours, y + town_h + 4 + 3*row + 8, lambda h: h['label'], lambda h: f"{h['temp']}°", lambda h: _trunc(h['desc'])) # ═══════════════════════════════════════════════════════════════════════════════ # FONT HELPER # ═══════════════════════════════════════════════════════════════════════════════ def _load_font(size, bold=False): for name in ('DejaVuSans', 'FreeSans', 'LiberationSans', 'Helvetica', 'Arial', None): try: f = pygame.font.SysFont(name, size, bold=bold) if f: return f except Exception: pass return pygame.font.Font(None, size) # ═══════════════════════════════════════════════════════════════════════════════ # MAIN # ═══════════════════════════════════════════════════════════════════════════════ def main() -> None: pygame.init() W, H = _args.width, _args.height if _args.fullscreen: screen = pygame.display.set_mode((W, H), pygame.FULLSCREEN | pygame.NOFRAME) else: screen = pygame.display.set_mode((W, H)) pygame.display.set_caption('JARVIS') pygame.mouse.set_visible(False) fonts = { 'large': _load_font(38), 'medium': _load_font(18), 'small': _load_font(13), 'tiny': _load_font(11), } # ── Layout ──────────────────────────────────────────────────────────────── face_rx = int(min(W, H) * 0.085) face_ry = int(min(W, H) * 0.100) face_cx = W // 2 face_cy = int(face_ry * 1.2) + 8 clock_block_h = fonts['small'].get_height() + 4 + fonts['large'].get_height() clock_y = face_cy - clock_block_h // 2 weather_x = W - _WEATHER_COLS * _WEATHER_COL_W - 15 weather_y = clock_y - fonts['medium'].get_height() - 4 # Equal blank space either side of the face face_gap = weather_x - (face_cx + face_rx) # gap on the right face_left = face_cx - face_rx - face_gap # mirrored left boundary # GREEN BOX — stocks: starts just after the time+seconds on the time row _time_row_w = fonts['large'].size('23:59')[0] + 2 + fonts['small'].size('59')[0] stocks_ticker_x = 20 + _time_row_w + 12 stocks_ticker_y = clock_y + fonts['small'].get_height() + 4 # same row as time stocks_ticker_w = face_left - stocks_ticker_x # RED BOX — news: full left zone, just below the clock block news_ticker_x = 20 news_ticker_w = face_left - news_ticker_x news_ticker_y = clock_y + clock_block_h + 8 # ── Background threads ──────────────────────────────────────────────────── threading.Thread(target=_news_worker, daemon=True).start() threading.Thread(target=_stocks_worker, daemon=True).start() threading.Thread(target=_wake_worker, daemon=True).start() try: resolved = [_geocode(city) for city in _LOCATIONS] threading.Thread(target=_weather_worker, args=(resolved,), daemon=True).start() except Exception: pass # ── Face image ──────────────────────────────────────────────────────────── _here = os.path.dirname(os.path.abspath(__file__)) png_path = os.path.join(_here, 'assets', 'face_wire.png') face_img = None if os.path.exists(png_path): raw = pygame.image.load(png_path).convert() iw, ih = raw.get_size() scale = min((face_rx * 2.2) / iw, (face_ry * 2.4) / ih) face_img = pygame.transform.smoothscale( raw, (max(1, int(iw*scale)), max(1, int(ih*scale)))) # Reusable surface for PNG tinting (fill + BLEND_MULT each frame) face_tint_surf = pygame.Surface(face_img.get_size()) if face_img else None pg_clock = pygame.time.Clock() while True: for event in pygame.event.get(): if event.type == pygame.QUIT: pygame.quit(); return if event.type == pygame.KEYDOWN and event.key in (pygame.K_ESCAPE, pygame.K_q): pygame.quit(); return # ── Wake-word face colour ───────────────────────────────────────────── _ws = _wake['state'] if _ws in ('listening', 'processing'): elapsed = _time.time() - _wake['detected_at'] t = (math.sin(elapsed * math.pi * 2.0) + 1.0) / 2.0 face_color = _lerp_color(_FACE_GREEN, _FACE_BLUE, t) elif _ws == 'positive': face_color = _FACE_GREEN elif _ws == 'negative': face_color = _FACE_RED else: face_color = None screen.fill(BLACK) # Face — tinted when wake active, normal otherwise if face_img: if face_color and face_tint_surf: face_tint_surf.fill(face_color) face_tint_surf.blit(face_img, (0, 0), special_flags=pygame.BLEND_MULT) screen.blit(face_tint_surf, face_tint_surf.get_rect(center=(face_cx, face_cy)), special_flags=pygame.BLEND_ADD) else: screen.blit(face_img, face_img.get_rect(center=(face_cx, face_cy)), special_flags=pygame.BLEND_ADD) else: cw = face_color or WIRE_COL ce = face_color or WIRE_EYE draw_wireframe_face(screen, face_cx, face_cy, face_rx, face_ry, col_wire=cw, col_eye=ce) # Clock (top-left) draw_clock(screen, fonts, 20, clock_y) # News ticker — RED BOX: below clock, full left-of-face width draw_ticker(screen, _build_news_surf(fonts['small']), news_ticker_x, news_ticker_y, news_ticker_w, speed=40.0) # Stocks ticker — GREEN BOX: right of clock digits, same row draw_ticker(screen, _build_stocks_surf(fonts['small']), stocks_ticker_x, stocks_ticker_y, stocks_ticker_w, speed=50.0) # Weather (top-right) draw_weather(screen, fonts, weather_x, weather_y) pygame.display.flip() pg_clock.tick(30) # bumped to 30 fps for smooth scrolling if __name__ == '__main__': main()