diff options
| author | Ken D'Ambrosio <ken.dambrosio@constantcontact.com> | 2026-06-03 21:28:05 -0400 |
|---|---|---|
| committer | Ken D'Ambrosio <ken.dambrosio@constantcontact.com> | 2026-06-03 21:28:05 -0400 |
| commit | 860bb203df525648d1d143421d3edce505923907 (patch) | |
| tree | c11ba4eb66ac8f53b6152c52a0f5f530ac2854df /esheep.py | |
Initial commit: esheep Wayland desktop pet
GTK3/cairo layer-shell desktop sheep for wlroots compositors (Wayfire,
Sway, Hyprland). Parses original eSheep animations.xml, runs the full
state machine, UFO abductions, Wayfire IPC window detection, and
click-to-drag support.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Diffstat (limited to 'esheep.py')
| -rw-r--r-- | esheep.py | 944 |
1 files changed, 944 insertions, 0 deletions
diff --git a/esheep.py b/esheep.py new file mode 100644 index 0000000..590627c --- /dev/null +++ b/esheep.py @@ -0,0 +1,944 @@ +#!/usr/bin/env python3 +"""eSheep - Linux/Wayland desktop pet using GTK3 + layer-shell + Wayfire IPC""" + +import gi +gi.require_version('Gtk', '3.0') +gi.require_version('GtkLayerShell', '0.1') +from gi.repository import Gtk, Gdk, GLib +from gi.repository import GtkLayerShell + +import cairo +import xml.etree.ElementTree as ET +import base64, io, re, os, sys, random, math, threading, time +from dataclasses import dataclass, field +from typing import List, Optional, Dict, Tuple +from PIL import Image + +# ─────────────────────── constants ─────────────────────── + +NS = 'https://esheep.petrucci.ch/' + +# Floor/condition types (match XML 'only' attribute values) +COND_NONE = 'none' # on screen floor (or no special condition) +COND_WINDOW = 'window' # on top of a window +COND_TASKBAR = 'taskbar' # on taskbar/panel +COND_VERTICAL = 'vertical' # against a vertical wall +COND_HORIZ_UP = 'horizontal+' # hit top of screen going upward + +# ─────────────────────── data classes ─────────────────────── + +@dataclass +class NextRef: + probability: int + only: str # matches COND_* values, '' = always applies + anim_id: int + +@dataclass +class AnimDef: + id: int + name: str + svx: float; svy: float # start velocity x/y (px per frame) + evx: float; evy: float # end velocity x/y + sint: int; eint: int # start/end interval (ms between frames) + sop: float; eop: float # start/end opacity + soy: float; eoy: float # start/end offsetY + repeat: object # int or str expression; 0 = oneshot + repeatfrom: int # frame index to loop back to + frames: List[int] # sprite tile indices + seq_nexts: List[NextRef] # transitions at sequence end + border_nexts: List[NextRef] # transitions when hitting border + gravity_nexts: List[NextRef] # transitions when gravity kicks in + actions: List[str] # 'flip' etc + +@dataclass +class SpawnDef: + id: int + probability: int + x_expr: str + y_expr: str + next_anim: int + +# ─────────────────────── XML parsing ─────────────────────── + +def _ns(tag, ns): + return f'{{{ns}}}{tag}' if ns else tag + +def _fval(el, tag, ns, default=0.0): + if el is None: + return default + t = el.findtext(_ns(tag, ns)) + try: + return float(t) if t is not None else default + except (ValueError, TypeError): + return default + +def _parse_nexts(parent_el, ns) -> List[NextRef]: + if parent_el is None: + return [] + result = [] + for n in parent_el.findall(_ns('next', ns)): + try: + result.append(NextRef( + probability=int(n.get('probability', '100')), + only=n.get('only', ''), + anim_id=int(n.text.strip()) if n.text else 1 + )) + except (ValueError, TypeError): + pass + return result + +def parse_xml(path: str): + """Returns (sprite_pil_image, tile_w, tile_h, tiles_x, anim_defs, spawns)""" + tree = ET.parse(path) + root = tree.getroot() + ns = NS if root.tag.startswith('{') else '' + + # ── image block ── + img_el = root.find(_ns('image', ns)) + tiles_x = int(img_el.findtext(_ns('tilesx', ns)) or 16) + tiles_y = int(img_el.findtext(_ns('tilesy', ns)) or 11) + png_b64 = img_el.findtext(_ns('png', ns)) or '' + png_data = base64.b64decode(png_b64.strip()) + sprite = Image.open(io.BytesIO(png_data)).convert('RGBA') + tile_w = sprite.width // tiles_x + tile_h = sprite.height // tiles_y + + # ── spawns ── + spawns: List[SpawnDef] = [] + spawns_el = root.find(_ns('spawns', ns)) + for sp in (list(spawns_el) if spawns_el is not None else []): + nxt = sp.find(_ns('next', ns)) + spawns.append(SpawnDef( + id=int(sp.get('id', 0)), + probability=int(sp.get('probability', 100)), + x_expr=(sp.findtext(_ns('x', ns)) or '0').strip(), + y_expr=(sp.findtext(_ns('y', ns)) or '0').strip(), + next_anim=int(nxt.text.strip()) if nxt is not None and nxt.text else 1, + )) + + # ── animations ── + anims: Dict[int, AnimDef] = {} + anims_el = root.find(_ns('animations', ns)) + for anim_el in (list(anims_el) if anims_el is not None else []): + aid = int(anim_el.get('id', 0)) + start = anim_el.find(_ns('start', ns)) + end = anim_el.find(_ns('end', ns)) + seq = anim_el.find(_ns('sequence', ns)) + bord = anim_el.find(_ns('border', ns)) + grav = anim_el.find(_ns('gravity', ns)) + + frames: List[int] = [] + seq_nexts: List[NextRef] = [] + actions: List[str] = [] + repeat_raw = '0' + repeatfrom = 0 + + if seq is not None: + repeat_raw = seq.get('repeat', '0').strip() + repeatfrom = int(seq.get('repeatfrom', '0')) + for child in seq: + tag = child.tag.split('}')[-1] if '}' in child.tag else child.tag + if tag == 'frame' and child.text: + try: + frames.append(int(child.text.strip())) + except ValueError: + pass + elif tag == 'next': + try: + seq_nexts.append(NextRef( + probability=int(child.get('probability', '100')), + only=child.get('only', ''), + anim_id=int(child.text.strip()) if child.text else 1, + )) + except (ValueError, TypeError): + pass + elif tag == 'action' and child.text: + actions.append(child.text.strip()) + + anims[aid] = AnimDef( + id=aid, + name=(anim_el.findtext(_ns('name', ns)) or '').strip(), + svx=_fval(start, 'x', ns), svy=_fval(start, 'y', ns), + evx=_fval(end, 'x', ns), evy=_fval(end, 'y', ns), + sint=int(_fval(start, 'interval', ns, 200)), + eint=int(_fval(end, 'interval', ns, 200)), + sop=_fval(start, 'opacity', ns, 1.0), + eop=_fval(end, 'opacity', ns, 1.0), + soy=_fval(start, 'offsety', ns, 0.0), + eoy=_fval(end, 'offsety', ns, 0.0), + repeat=repeat_raw, + repeatfrom=repeatfrom, + frames=frames, + seq_nexts=seq_nexts, + border_nexts=_parse_nexts(bord, ns), + gravity_nexts=_parse_nexts(grav, ns), + actions=actions, + ) + + return sprite, tile_w, tile_h, tiles_x, anims, spawns + +# ─────────────────────── expression evaluator ─────────────────────── + +def eval_expr(expr, ctx: dict): + """Evaluate XML animation expressions: 'screenW/2-imageH', etc.""" + if expr is None: + return 0 + expr = str(expr).strip() + if not expr: + return 0 + # Replace C# Convert(x, System.Int32) with int(x) + expr = re.sub(r'Convert\s*\(([^,]+),\s*System\.Int32\)', r'int(\1)', expr) + safe = {'int': int, 'abs': abs, 'max': max, 'min': min, 'round': round} + try: + return eval(expr, {"__builtins__": safe}, ctx) + except Exception: + return 0 + +# ─────────────────────── Wayfire IPC ─────────────────────── + +class WayfireWindows: + """Background thread polling Wayfire IPC for window geometry.""" + + def __init__(self): + self._windows: List[Tuple[int,int,int,int]] = [] # (x, y, w, h) + self._taskbar: Optional[Tuple[int,int,int,int]] = None + self._lock = threading.Lock() + self._available = False + + socket_path = os.environ.get('WAYFIRE_SOCKET', '') + if socket_path and os.path.exists(socket_path): + self._available = True + t = threading.Thread(target=self._poll, daemon=True) + t.start() + print(f"Wayfire IPC connected: {socket_path}", flush=True) + else: + print("No WAYFIRE_SOCKET – window detection disabled", flush=True) + + def _poll(self): + try: + from wayfire import WayfireSocket + sock = WayfireSocket() + while True: + try: + views = sock.list_views() + wins = [] + taskbar = None + for v in views: + g = v.get('geometry', {}) + x, y = g.get('x', 0), g.get('y', 0) + w, h = g.get('width', 0), g.get('height', 0) + role = v.get('role', '') + # Only toplevel windows count as landing surfaces; + # skip layer-shell panels/backgrounds (they can hide the sheep) + if role == 'toplevel' and w > 10 and h > 10: + wins.append((x, y, w, h)) + with self._lock: + self._windows = wins + self._taskbar = taskbar + except Exception as e: + print(f"Wayfire IPC error: {e}", flush=True) + time.sleep(0.4) + except ImportError: + print("wayfire Python package not found", flush=True) + except Exception as e: + print(f"Wayfire IPC fatal: {e}", flush=True) + + def get_windows(self) -> List[Tuple[int,int,int,int]]: + with self._lock: + return list(self._windows) + + def get_taskbar(self) -> Optional[Tuple[int,int,int,int]]: + with self._lock: + return self._taskbar + +# ─────────────────────── Sheep state machine ─────────────────────── + +class Sheep: + def __init__(self, anim_defs, tile_w, tile_h, screen_w, screen_h, spawn: SpawnDef): + self.anim_defs = anim_defs + self.tile_w = tile_w + self.tile_h = tile_h + self.screen_w = screen_w + self.screen_h = screen_h + + self.facing_right = False # default sprite orientation is left-facing + self.opacity = 1.0 + self.alive = True + + ctx = self._ctx(screen_w // 2, -tile_h) + self.x = float(eval_expr(spawn.x_expr, ctx)) + self.y = float(eval_expr(spawn.y_expr, ctx)) + # Clamp to screen + some margin for off-screen spawns + self.x = max(-tile_w, min(self.x, screen_w)) + self.y = max(-tile_h * 2, min(self.y, screen_h)) + + # Animation state + self.anim_id = spawn.next_anim + self.frame_idx = 0 + self.seq_repeat = 0 + self.repeat_total = 1 + self.is_oneshot = True + self.tick_accum = 0.0 + + # Floor tracking + self.floor_cond = COND_NONE + self._floors: List[Tuple[float, str]] = [] # (floor_y, cond) + + self._init_anim(spawn.next_anim) + + # ── helpers ── + + def _ctx(self, img_x=None, img_y=None): + return { + 'screenW': self.screen_w, 'screenH': self.screen_h, + 'areaH': self.screen_h, + 'imageW': self.tile_w, 'imageH': self.tile_h, + 'imageX': img_x if img_x is not None else self.x, + 'imageY': img_y if img_y is not None else self.y, + 'random': random.randint(0, 100), + 'randS': random.randint(0, 100), + } + + def _lerp(self, a, b, t): + return a + (b - a) * max(0.0, min(1.0, t)) + + def _progress(self): + if self.repeat_total <= 1 or self.is_oneshot: + return 0.0 + return self.seq_repeat / self.repeat_total + + def _cur_vel(self): + a = self.anim_defs.get(self.anim_id) + if not a: + return 0.0, 0.0 + t = self._progress() + return self._lerp(a.svx, a.evx, t), self._lerp(a.svy, a.evy, t) + + def _cur_interval(self): + a = self.anim_defs.get(self.anim_id) + if not a: + return 200 + return max(10, int(self._lerp(a.sint, a.eint, self._progress()))) + + def _cur_opacity(self): + a = self.anim_defs.get(self.anim_id) + if not a: + return 1.0 + return self._lerp(a.sop, a.eop, self._progress()) + + # ── animation transitions ── + + def _init_anim(self, anim_id: int): + a = self.anim_defs.get(anim_id) + if not a: + return + self.anim_id = anim_id + self.frame_idx = 0 + self.seq_repeat = 0 + self.tick_accum = 0.0 + + ctx = self._ctx() + raw = a.repeat + try: + val = int(eval_expr(raw, ctx)) + except Exception: + val = 0 + self.is_oneshot = (val == 0) + self.repeat_total = max(1, val) if val > 0 else 1 + + # Apply any immediate actions (e.g. flip triggered by entering this animation) + if 'flip' in a.actions: + self.facing_right = not self.facing_right + + def _pick_next(self, nexts: List[NextRef], cond: str) -> Optional[int]: + """Weighted random pick from nexts matching cond.""" + # First try exact match + pool = [n for n in nexts if n.only == cond or n.only == ''] + if not pool: + pool = nexts # fallback: ignore condition + if not pool: + return None + total = sum(n.probability for n in pool) + if total == 0: + return pool[0].anim_id + r = random.randint(0, total - 1) + cum = 0 + for n in pool: + cum += n.probability + if r < cum: + return n.anim_id + return pool[-1].anim_id + + def _transition(self, anim_id: Optional[int]): + if anim_id is not None: + self._init_anim(anim_id) + + # ── floor update (call from main loop) ── + + def update_floors(self, windows: List[Tuple[int,int,int,int]], + taskbar: Optional[Tuple[int,int,int,int]], + screen_w: int, screen_h: int): + self.screen_w = screen_w + self.screen_h = screen_h + + # Collect all candidate floors (surfaces the sheep could land on) + all_floors = [(float(screen_h), COND_NONE)] + + # Taskbar (treat as window) + if taskbar: + tx, ty, tw, th = taskbar + if tx < self.x + self.tile_w - 4 and tx + tw > self.x + 4: + all_floors.append((float(ty), COND_TASKBAR)) + + # Window tops – only windows that horizontally overlap the sheep + for (wx, wy, ww, wh) in windows: + if wx < self.x + self.tile_w - 4 and wx + ww > self.x + 4: + all_floors.append((float(wy), COND_WINDOW)) + + # Keep only floors at or below the sheep's current bottom position + # (floors above the sheep are irrelevant for landing) + sheep_bot = self.y + self.tile_h + self._floors = sorted( + [(fy, fc) for (fy, fc) in all_floors if fy >= sheep_bot - 8], + key=lambda f: f[0] + ) + if not self._floors: + self._floors = [(float(screen_h), COND_NONE)] + + # ── tick ── + + def tick(self, dt_ms: float): + a = self.anim_defs.get(self.anim_id) + if not a or not a.frames: + self._transition(1) + return + + interval = self._cur_interval() + vx, vy = self._cur_vel() + + # vx is in "facing direction" units: + # facing left (facing_right=False): vx<0 = move left → screen dx = vx + # facing right (facing_right=True): vx<0 = move right → screen dx = -vx + dir_mul = -1.0 if self.facing_right else 1.0 + sheep_bot_prev = self.y + self.tile_h + + self.x += vx * dir_mul * dt_ms / interval + self.y += vy * dt_ms / interval + self.opacity = self._cur_opacity() + + sheep_bot = self.y + self.tile_h + + # ── floor / landing collision ── + # Find the nearest floor at or below the sheep + floor_hit_y: Optional[float] = None + floor_hit_fc: str = COND_NONE + for (fy, fc) in self._floors: + if sheep_bot >= fy - 1: # sheep has reached or passed this floor + floor_hit_y = fy + floor_hit_fc = fc + break # _floors is sorted ascending; first hit wins + + if floor_hit_y is not None: + # Snap sheep to floor surface + self.y = floor_hit_y - self.tile_h + self.floor_cond = floor_hit_fc + + # If we just LANDED (were above before, falling now) → landing animation + just_landed = (sheep_bot_prev < floor_hit_y - 1) and (vy > 0.3) + if just_landed and a.border_nexts: + nxt = self._pick_next(a.border_nexts, floor_hit_fc) + self._transition(nxt) + return + else: + # No floor under us + self.floor_cond = COND_NONE + # Gravity: walking/running animations trigger fall when airborne. + # 'only="none"' in gravity_nexts means "no special floor" = free fall. + if a.gravity_nexts: + nxt = self._pick_next(a.gravity_nexts, COND_NONE) + self._transition(nxt) + return + + # ── left / right / top border check ── + if a.border_nexts: + hit, bcond = False, COND_NONE + # left border (sheep moving left, facing left) + if self.x <= 0 and not self.facing_right: + self.x = 0.0 + hit, bcond = True, COND_NONE + # right border (sheep moving right, facing right) + elif self.x + self.tile_w >= self.screen_w and self.facing_right: + self.x = float(self.screen_w - self.tile_w) + hit, bcond = True, COND_NONE + # top border (going up) + if self.y <= 0 and vy < 0: + self.y = 0.0 + hit, bcond = True, COND_HORIZ_UP + if hit: + nxt = self._pick_next(a.border_nexts, bcond) + self._transition(nxt) + return + + # ── frame advance ── + self.tick_accum += dt_ms + while self.tick_accum >= self._cur_interval(): + self.tick_accum -= self._cur_interval() + self.frame_idx += 1 + + if self.frame_idx >= len(a.frames): + self.seq_repeat += 1 + + done = self.is_oneshot or (self.seq_repeat >= self.repeat_total) + if done: + nxt = self._pick_next(a.seq_nexts, self.floor_cond) + self._transition(nxt) + return + else: + self.frame_idx = a.repeatfrom + + def draw_info(self): + """Return (frame_tile_idx, x, y, facing_right, opacity)""" + a = self.anim_defs.get(self.anim_id) + if not a or not a.frames: + return 0, self.x, self.y, self.facing_right, self.opacity + fi = max(0, min(self.frame_idx, len(a.frames) - 1)) + return a.frames[fi], self.x, self.y, self.facing_right, self.opacity + + +# ─────────────────────── UFO ─────────────────────── + +class UFO: + """Classic UFO abduction sequence – drawn programmatically.""" + + STATE_APPROACH = 'approach' + STATE_BEAM = 'beam' + STATE_LIFT = 'lift' + STATE_DEPART = 'depart' + STATE_DONE = 'done' + + def __init__(self, sheep: Sheep, screen_w: int, screen_h: int): + self.sheep = sheep + self.screen_w = screen_w + self.screen_h = screen_h + self.w = 80; self.h = 40 # UFO body size + self.x = float(screen_w) # start off-screen right + self.y = float(screen_h * 0.12) + self.state = self.STATE_APPROACH + self.beam_alpha = 0.0 + self.sheep_orig_y = sheep.y + self._timer = 0.0 + + def tick(self, dt_ms: float): + sheep = self.sheep + if self.state == self.STATE_APPROACH: + # Move left toward sheep + target_x = sheep.x - self.w / 2 + sheep.tile_w / 2 + self.x -= 4.0 * dt_ms / 50 + if self.x <= target_x: + self.x = target_x + self.state = self.STATE_BEAM + self._timer = 0.0 + + elif self.state == self.STATE_BEAM: + self._timer += dt_ms + self.beam_alpha = min(1.0, self._timer / 600.0) + if self._timer > 900: + self.state = self.STATE_LIFT + self.sheep_orig_y = sheep.y + + elif self.state == self.STATE_LIFT: + speed = 3.0 * dt_ms / 50 + sheep.y -= speed + sheep.x = self.x + self.w / 2 - sheep.tile_w / 2 + if sheep.y + sheep.tile_h < self.y + self.h: + self.state = self.STATE_DEPART + self._timer = 0.0 + + elif self.state == self.STATE_DEPART: + self.x += 5.0 * dt_ms / 50 + sheep.x = self.x + self.w / 2 - sheep.tile_w / 2 + sheep.y = self.y + self.h / 2 + self._timer += dt_ms + if self._timer > 400 and self.x > self.screen_w: + # Drop sheep back + sheep.x = max(50, min(self.screen_w - sheep.tile_w - 50, + self.screen_w * 0.3 + random.randint(-100, 100))) + sheep.y = -sheep.tile_h - 10 + sheep._init_anim(5) # fall animation + self.state = self.STATE_DONE + + @property + def done(self): + return self.state == self.STATE_DONE + + def draw(self, cr: cairo.Context): + x, y = self.x, self.y + w, h = self.w, self.h + + # Beam (trapezoid under UFO) + if self.state in (self.STATE_BEAM, self.STATE_LIFT): + sheep = self.sheep + beam_top_x = x + w * 0.3 + beam_top_w = w * 0.4 + beam_bot_x = sheep.x - 5 + beam_bot_w = sheep.tile_w + 10 + beam_bot_y = sheep.y + sheep.tile_h + beam_top_y = y + h + + cr.save() + cr.set_source_rgba(0.8, 0.9, 1.0, 0.35 * self.beam_alpha) + cr.move_to(beam_top_x, beam_top_y) + cr.line_to(beam_top_x + beam_top_w, beam_top_y) + cr.line_to(beam_bot_x + beam_bot_w, beam_bot_y) + cr.line_to(beam_bot_x, beam_bot_y) + cr.close_path() + cr.fill() + cr.restore() + + # Saucer body + cr.save() + cx, cy = x + w / 2, y + h * 0.6 + # Disk + cr.set_source_rgba(0.7, 0.7, 0.8, 0.95) + cr.save() + cr.translate(cx, cy) + cr.scale(w / 2, h * 0.35) + cr.arc(0, 0, 1, 0, 2 * math.pi) + cr.restore() + cr.fill() + # Dome + cr.set_source_rgba(0.5, 0.8, 1.0, 0.85) + cr.save() + cr.translate(cx, y + h * 0.4) + cr.scale(w * 0.28, h * 0.45) + cr.arc(0, 0, 1, math.pi, 2 * math.pi) + cr.restore() + cr.fill() + # Lights + for i, col in enumerate([(1,.2,.2,1), (.2,1,.2,1), (.2,.2,1,1), (1,1,.2,1)]): + lx = x + w * 0.2 + i * w * 0.18 + ly = cy + cr.set_source_rgba(*col) + cr.arc(lx, ly, 3, 0, 2 * math.pi) + cr.fill() + cr.restore() + + +# ─────────────────────── Cairo sprite drawing ─────────────────────── + +def draw_frame(cr: cairo.Context, surface: cairo.ImageSurface, + tile_w: int, tile_h: int, tiles_x: int, + frame_idx: int, dest_x: float, dest_y: float, + flip: bool = False, alpha: float = 1.0): + """Draw one tile from the sprite sheet at (dest_x, dest_y).""" + if frame_idx < 0: + return + col = frame_idx % tiles_x + row = frame_idx // tiles_x + sx = col * tile_w + sy = row * tile_h + + cr.save() + if flip: + cr.translate(dest_x + tile_w, dest_y) + cr.scale(-1, 1) + cr.rectangle(-tile_w, 0, tile_w, tile_h) + cr.clip() + cr.set_source_surface(surface, -tile_w - sx, -sy) + else: + cr.translate(dest_x, dest_y) + cr.rectangle(0, 0, tile_w, tile_h) + cr.clip() + cr.set_source_surface(surface, -sx, -sy) + + cr.paint_with_alpha(alpha) + cr.restore() + + +# ─────────────────────── main overlay ─────────────────────── + +class SheepOverlay: + UFO_INTERVAL_S = 90 # seconds between UFO abductions (approx) + + def __init__(self, xml_path: str, num_sheep: int = 1): + print("Loading assets…", flush=True) + self.sprite, self.tile_w, self.tile_h, self.tiles_x, \ + self.anim_defs, self.spawns = parse_xml(xml_path) + print(f"Sprite: {self.sprite.width}×{self.sprite.height}, " + f"tiles: {self.tile_w}×{self.tile_h} ({self.tiles_x} per row), " + f"animations: {len(self.anim_defs)}", flush=True) + + self.cairo_surface = self._pil_to_cairo(self.sprite) + self.wayfire = WayfireWindows() + self.sheep_list: List[Sheep] = [] + self.ufos: List[UFO] = [] + self.num_sheep = num_sheep + self.last_time = time.monotonic() + self._ufo_countdown = self.UFO_INTERVAL_S + random.randint(0, 30) + + # Drag state + self._drag_sheep: Optional[Sheep] = None + self._drag_ox = 0.0 # mouse_x - sheep.x at press time + self._drag_oy = 0.0 + + self._build_window() + + @staticmethod + def _pil_to_cairo(img: Image.Image) -> cairo.ImageSurface: + """Convert PIL RGBA image → cairo ImageSurface (ARGB32, premultiplied alpha).""" + import numpy as np + surf = cairo.ImageSurface(cairo.FORMAT_ARGB32, img.width, img.height) + # Cairo FORMAT_ARGB32 uses premultiplied alpha in BGRA memory order + arr = np.array(img, dtype=np.uint32) + r = arr[:, :, 0]; g = arr[:, :, 1] + b = arr[:, :, 2]; a = arr[:, :, 3] + # Premultiply: channel' = channel * alpha / 255 + rp = (r * a) // 255 + gp = (g * a) // 255 + bp = (b * a) // 255 + # Pack as ARGB32 native-endian (little-endian = BGRA bytes) + packed = (a << 24) | (rp << 16) | (gp << 8) | bp + surf.get_data()[:] = packed.astype(np.uint32).tobytes() + surf.mark_dirty() + return surf + + def _build_window(self): + self.win = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) + self.win.set_app_paintable(True) + screen = self.win.get_screen() + visual = screen.get_rgba_visual() + if visual: + self.win.set_visual(visual) + + # layer-shell: full-screen overlay + if not GtkLayerShell.is_supported(): + print("ERROR: Your Wayland compositor does not support wlr-layer-shell.", + file=sys.stderr) + print(" eSheep requires Wayfire, Sway, Hyprland, or another wlroots compositor.", + file=sys.stderr) + sys.exit(1) + GtkLayerShell.init_for_window(self.win) + GtkLayerShell.set_layer(self.win, GtkLayerShell.Layer.OVERLAY) + GtkLayerShell.set_exclusive_zone(self.win, -1) + for edge in (GtkLayerShell.Edge.LEFT, GtkLayerShell.Edge.RIGHT, + GtkLayerShell.Edge.TOP, GtkLayerShell.Edge.BOTTOM): + GtkLayerShell.set_anchor(self.win, edge, True) + # Use KeyboardMode enum if available (gtk-layer-shell 0.7+), else bool + try: + GtkLayerShell.set_keyboard_mode(self.win, GtkLayerShell.KeyboardMode.NONE) + except AttributeError: + GtkLayerShell.set_keyboard_interactivity(self.win, False) + + self.da = Gtk.DrawingArea() + self.da.connect('draw', self._on_draw) + + # Mouse events for click-to-drag – must be set on the DrawingArea + # (it has its own GdkWindow that fills the overlay and receives events first) + # and BEFORE show_all() so the GdkWindow is realized with these masks. + self.da.add_events( + Gdk.EventMask.BUTTON_PRESS_MASK | + Gdk.EventMask.BUTTON_RELEASE_MASK | + Gdk.EventMask.POINTER_MOTION_MASK + ) + self.da.connect('button-press-event', self._on_button_press) + self.da.connect('button-release-event', self._on_button_release) + self.da.connect('motion-notify-event', self._on_motion) + + self.win.add(self.da) + self.win.connect('destroy', Gtk.main_quit) + self.win.show_all() + + # Input region starts as pass-through; _update_input_region() refines it each tick + self.win.input_shape_combine_region(cairo.Region()) + + # Screen size + display = Gdk.Display.get_default() + mon = display.get_primary_monitor() or display.get_monitor(0) + geo = mon.get_geometry() + self.screen_w = geo.width + self.screen_h = geo.height + print(f"Screen: {self.screen_w}×{self.screen_h}", flush=True) + + # Spawn initial sheep + for _ in range(self.num_sheep): + self._spawn_sheep() + + GLib.timeout_add(50, self._tick) # 20 fps game loop + + # ── click-to-drag ── + + def _update_input_region(self): + """Set the input region on both the window and the DrawingArea's GdkWindow + to cover only the sheep sprites. While dragging, use the full screen so + we don't lose the pointer mid-drag.""" + if self._drag_sheep is not None: + # Full screen while dragging – catches pointer everywhere + region = cairo.Region(cairo.RectangleInt( + 0, 0, self.screen_w, self.screen_h)) + else: + region = cairo.Region() + pad = 4 # extra px around sprite for easier click targeting + for sheep in self.sheep_list: + rx = int(sheep.x) - pad + ry = int(sheep.y) - pad + rw = self.tile_w + pad * 2 + rh = self.tile_h + pad * 2 + region.union(cairo.RectangleInt(rx, ry, rw, rh)) + # Apply to both the GtkWindow and the DrawingArea's own GdkWindow + self.win.input_shape_combine_region(region) + gdk_win = self.da.get_window() + if gdk_win is not None: + gdk_win.input_shape_combine_region(region, 0, 0) + + def _sheep_at(self, mx: float, my: float) -> Optional[Sheep]: + """Return the sheep whose bounding rect contains (mx, my), or None.""" + for sheep in reversed(self.sheep_list): # topmost first + if (sheep.x <= mx <= sheep.x + self.tile_w and + sheep.y <= my <= sheep.y + self.tile_h): + return sheep + return None + + def _on_button_press(self, _widget, event): + if event.button != 1: + return False + sheep = self._sheep_at(event.x, event.y) + if sheep is None: + return False + # Don't grab sheep currently being abducted + if any(u.sheep is sheep for u in self.ufos): + return False + self._drag_sheep = sheep + self._drag_ox = event.x - sheep.x + self._drag_oy = event.y - sheep.y + self._update_input_region() + print(f"Grabbed sheep at ({sheep.x:.0f}, {sheep.y:.0f})", flush=True) + return True + + def _on_button_release(self, _widget, event): + if event.button != 1 or self._drag_sheep is None: + return False + sheep = self._drag_sheep + self._drag_sheep = None + self._update_input_region() + # Let the sheep fall from wherever it was dropped + fall_id = None + for aid, adef in self.anim_defs.items(): + if 'fall' in adef.name.lower(): + fall_id = aid + break + if fall_id is None: + fall_id = 5 + sheep._init_anim(fall_id) + print(f"Released sheep at ({sheep.x:.0f}, {sheep.y:.0f}) → fall", flush=True) + return True + + def _on_motion(self, _widget, event): + if self._drag_sheep is None: + return False + sheep = self._drag_sheep + sheep.x = event.x - self._drag_ox + sheep.y = event.y - self._drag_oy + # Clamp to screen + sheep.x = max(0, min(sheep.x, self.screen_w - self.tile_w)) + sheep.y = max(0, min(sheep.y, self.screen_h - self.tile_h)) + return True + + def _spawn_sheep(self): + total = sum(s.probability for s in self.spawns) + r = random.randint(0, total - 1) + cum = 0 + chosen = self.spawns[0] + for s in self.spawns: + cum += s.probability + if r < cum: + chosen = s + break + sheep = Sheep( + self.anim_defs, self.tile_w, self.tile_h, + self.screen_w, self.screen_h, chosen + ) + self.sheep_list.append(sheep) + print(f"Spawned sheep at ({sheep.x:.0f}, {sheep.y:.0f}) " + f"anim={self.anim_defs.get(sheep.anim_id, type('',(),{'name':'?'})()).name}", + flush=True) + + def _tick(self) -> bool: + now = time.monotonic() + dt = min((now - self.last_time) * 1000.0, 100.0) + self.last_time = now + + windows = self.wayfire.get_windows() + taskbar = self.wayfire.get_taskbar() + + # Update all sheep (skip everything for the one being dragged) + for sheep in self.sheep_list: + if sheep is self._drag_sheep: + continue + if not any(u.sheep is sheep for u in self.ufos): + sheep.update_floors(windows, taskbar, self.screen_w, self.screen_h) + sheep.tick(dt) + + # Keep the input region hugging the sheep positions + self._update_input_region() + + # UFO timer + self._ufo_countdown -= dt / 1000.0 + if self._ufo_countdown <= 0 and self.sheep_list: + target = random.choice(self.sheep_list) + if not any(u.sheep is target for u in self.ufos): + self.ufos.append(UFO(target, self.screen_w, self.screen_h)) + print("UFO abduction started!", flush=True) + self._ufo_countdown = self.UFO_INTERVAL_S + random.randint(-20, 30) + + # Tick UFOs + for ufo in self.ufos: + ufo.tick(dt) + self.ufos = [u for u in self.ufos if not u.done] + + self.da.queue_draw() + return True # keep timer + + def _on_draw(self, _widget, cr: cairo.Context): + # Clear to fully transparent + cr.set_operator(cairo.OPERATOR_CLEAR) + cr.paint() + cr.set_operator(cairo.OPERATOR_OVER) + + # Draw UFO beams first (behind sheep) + for ufo in self.ufos: + ufo.draw(cr) + + # Draw sheep + for sheep in self.sheep_list: + frame, sx, sy, flip, alpha = sheep.draw_info() + draw_frame(cr, self.cairo_surface, + self.tile_w, self.tile_h, self.tiles_x, + frame, sx, sy, flip, alpha) + + # Draw UFO bodies (on top of sheep) + for ufo in self.ufos: + if ufo.state != ufo.STATE_BEAM: + ufo.draw(cr) + + def run(self): + Gtk.main() + + +# ─────────────────────── entry point ─────────────────────── + +def main(): + import argparse + p = argparse.ArgumentParser(description='eSheep for Wayland') + p.add_argument('--xml', default=os.path.join(os.path.dirname(__file__), 'animations.xml'), + help='Path to animations.xml') + p.add_argument('--count', type=int, default=1, help='Number of sheep') + args = p.parse_args() + + if not os.path.exists(args.xml): + print(f"animations.xml not found: {args.xml}", file=sys.stderr) + sys.exit(1) + + SheepOverlay(args.xml, args.count).run() + +if __name__ == '__main__': + main() |
