#!/usr/bin/env python3 """eSheep - Linux/Wayland desktop pet using GTK3 + layer-shell + Wayfire IPC""" import gi gi.require_version('Gtk', '3.0') gi.require_version('GtkLayerShell', '0.1') from gi.repository import Gtk, Gdk, GLib from gi.repository import GtkLayerShell import cairo import xml.etree.ElementTree as ET import base64, io, re, os, sys, random, math, threading, time from dataclasses import dataclass, field from typing import List, Optional, Dict, Tuple from PIL import Image # ─────────────────────── constants ─────────────────────── NS = 'https://esheep.petrucci.ch/' # Floor/condition types (match XML 'only' attribute values) COND_NONE = 'none' # on screen floor (or no special condition) COND_WINDOW = 'window' # on top of a window COND_TASKBAR = 'taskbar' # on taskbar/panel COND_VERTICAL = 'vertical' # against a vertical wall COND_HORIZ_UP = 'horizontal+' # hit top of screen going upward # ─────────────────────── data classes ─────────────────────── @dataclass class NextRef: probability: int only: str # matches COND_* values, '' = always applies anim_id: int @dataclass class AnimDef: id: int name: str svx: float; svy: float # start velocity x/y (px per frame) evx: float; evy: float # end velocity x/y sint: int; eint: int # start/end interval (ms between frames) sop: float; eop: float # start/end opacity soy: float; eoy: float # start/end offsetY repeat: object # int or str expression; 0 = oneshot repeatfrom: int # frame index to loop back to frames: List[int] # sprite tile indices seq_nexts: List[NextRef] # transitions at sequence end border_nexts: List[NextRef] # transitions when hitting border gravity_nexts: List[NextRef] # transitions when gravity kicks in actions: List[str] # 'flip' etc @dataclass class SpawnDef: id: int probability: int x_expr: str y_expr: str next_anim: int # ─────────────────────── XML parsing ─────────────────────── def _ns(tag, ns): return f'{{{ns}}}{tag}' if ns else tag def _fval(el, tag, ns, default=0.0): if el is None: return default t = el.findtext(_ns(tag, ns)) try: return float(t) if t is not None else default except (ValueError, TypeError): return default def _parse_nexts(parent_el, ns) -> List[NextRef]: if parent_el is None: return [] result = [] for n in parent_el.findall(_ns('next', ns)): try: result.append(NextRef( probability=int(n.get('probability', '100')), only=n.get('only', ''), anim_id=int(n.text.strip()) if n.text else 1 )) except (ValueError, TypeError): pass return result def parse_xml(path: str): """Returns (sprite_pil_image, tile_w, tile_h, tiles_x, anim_defs, spawns)""" tree = ET.parse(path) root = tree.getroot() ns = NS if root.tag.startswith('{') else '' # ── image block ── img_el = root.find(_ns('image', ns)) tiles_x = int(img_el.findtext(_ns('tilesx', ns)) or 16) tiles_y = int(img_el.findtext(_ns('tilesy', ns)) or 11) png_b64 = img_el.findtext(_ns('png', ns)) or '' png_data = base64.b64decode(png_b64.strip()) sprite = Image.open(io.BytesIO(png_data)).convert('RGBA') tile_w = sprite.width // tiles_x tile_h = sprite.height // tiles_y # ── spawns ── spawns: List[SpawnDef] = [] spawns_el = root.find(_ns('spawns', ns)) for sp in (list(spawns_el) if spawns_el is not None else []): nxt = sp.find(_ns('next', ns)) spawns.append(SpawnDef( id=int(sp.get('id', 0)), probability=int(sp.get('probability', 100)), x_expr=(sp.findtext(_ns('x', ns)) or '0').strip(), y_expr=(sp.findtext(_ns('y', ns)) or '0').strip(), next_anim=int(nxt.text.strip()) if nxt is not None and nxt.text else 1, )) # ── animations ── anims: Dict[int, AnimDef] = {} anims_el = root.find(_ns('animations', ns)) for anim_el in (list(anims_el) if anims_el is not None else []): aid = int(anim_el.get('id', 0)) start = anim_el.find(_ns('start', ns)) end = anim_el.find(_ns('end', ns)) seq = anim_el.find(_ns('sequence', ns)) bord = anim_el.find(_ns('border', ns)) grav = anim_el.find(_ns('gravity', ns)) frames: List[int] = [] seq_nexts: List[NextRef] = [] actions: List[str] = [] repeat_raw = '0' repeatfrom = 0 if seq is not None: repeat_raw = seq.get('repeat', '0').strip() repeatfrom = int(seq.get('repeatfrom', '0')) for child in seq: tag = child.tag.split('}')[-1] if '}' in child.tag else child.tag if tag == 'frame' and child.text: try: frames.append(int(child.text.strip())) except ValueError: pass elif tag == 'next': try: seq_nexts.append(NextRef( probability=int(child.get('probability', '100')), only=child.get('only', ''), anim_id=int(child.text.strip()) if child.text else 1, )) except (ValueError, TypeError): pass elif tag == 'action' and child.text: actions.append(child.text.strip()) anims[aid] = AnimDef( id=aid, name=(anim_el.findtext(_ns('name', ns)) or '').strip(), svx=_fval(start, 'x', ns), svy=_fval(start, 'y', ns), evx=_fval(end, 'x', ns), evy=_fval(end, 'y', ns), sint=int(_fval(start, 'interval', ns, 200)), eint=int(_fval(end, 'interval', ns, 200)), sop=_fval(start, 'opacity', ns, 1.0), eop=_fval(end, 'opacity', ns, 1.0), soy=_fval(start, 'offsety', ns, 0.0), eoy=_fval(end, 'offsety', ns, 0.0), repeat=repeat_raw, repeatfrom=repeatfrom, frames=frames, seq_nexts=seq_nexts, border_nexts=_parse_nexts(bord, ns), gravity_nexts=_parse_nexts(grav, ns), actions=actions, ) return sprite, tile_w, tile_h, tiles_x, anims, spawns # ─────────────────────── expression evaluator ─────────────────────── def eval_expr(expr, ctx: dict): """Evaluate XML animation expressions: 'screenW/2-imageH', etc.""" if expr is None: return 0 expr = str(expr).strip() if not expr: return 0 # Replace C# Convert(x, System.Int32) with int(x) expr = re.sub(r'Convert\s*\(([^,]+),\s*System\.Int32\)', r'int(\1)', expr) safe = {'int': int, 'abs': abs, 'max': max, 'min': min, 'round': round} try: return eval(expr, {"__builtins__": safe}, ctx) except Exception: return 0 # ─────────────────────── Wayfire IPC ─────────────────────── class WayfireWindows: """Background thread polling Wayfire IPC for window geometry.""" def __init__(self): self._windows: List[Tuple[int,int,int,int]] = [] # (x, y, w, h) self._taskbar: Optional[Tuple[int,int,int,int]] = None self._lock = threading.Lock() self._available = False socket_path = os.environ.get('WAYFIRE_SOCKET', '') if socket_path and os.path.exists(socket_path): self._available = True t = threading.Thread(target=self._poll, daemon=True) t.start() print(f"Wayfire IPC connected: {socket_path}", flush=True) else: print("No WAYFIRE_SOCKET – window detection disabled", flush=True) def _poll(self): try: from wayfire import WayfireSocket sock = WayfireSocket() while True: try: views = sock.list_views() wins = [] taskbar = None for v in views: g = v.get('geometry', {}) x, y = g.get('x', 0), g.get('y', 0) w, h = g.get('width', 0), g.get('height', 0) role = v.get('role', '') # Only toplevel windows count as landing surfaces; # skip layer-shell panels/backgrounds (they can hide the sheep) if role == 'toplevel' and w > 10 and h > 10: wins.append((x, y, w, h)) with self._lock: self._windows = wins self._taskbar = taskbar except Exception as e: print(f"Wayfire IPC error: {e}", flush=True) time.sleep(0.4) except ImportError: print("wayfire Python package not found", flush=True) except Exception as e: print(f"Wayfire IPC fatal: {e}", flush=True) def get_windows(self) -> List[Tuple[int,int,int,int]]: with self._lock: return list(self._windows) def get_taskbar(self) -> Optional[Tuple[int,int,int,int]]: with self._lock: return self._taskbar # ─────────────────────── Sheep state machine ─────────────────────── class Sheep: def __init__(self, anim_defs, tile_w, tile_h, screen_w, screen_h, spawn: SpawnDef): self.anim_defs = anim_defs self.tile_w = tile_w self.tile_h = tile_h self.screen_w = screen_w self.screen_h = screen_h self.facing_right = False # default sprite orientation is left-facing self.opacity = 1.0 self.alive = True ctx = self._ctx(screen_w // 2, -tile_h) self.x = float(eval_expr(spawn.x_expr, ctx)) self.y = float(eval_expr(spawn.y_expr, ctx)) # Clamp to screen + some margin for off-screen spawns self.x = max(-tile_w, min(self.x, screen_w)) self.y = max(-tile_h * 2, min(self.y, screen_h)) # Animation state self.anim_id = spawn.next_anim self.frame_idx = 0 self.seq_repeat = 0 self.repeat_total = 1 self.is_oneshot = True self.tick_accum = 0.0 # Floor tracking self.floor_cond = COND_NONE self._floors: List[Tuple[float, str]] = [] # (floor_y, cond) self._init_anim(spawn.next_anim) # ── helpers ── def _ctx(self, img_x=None, img_y=None): return { 'screenW': self.screen_w, 'screenH': self.screen_h, 'areaH': self.screen_h, 'imageW': self.tile_w, 'imageH': self.tile_h, 'imageX': img_x if img_x is not None else self.x, 'imageY': img_y if img_y is not None else self.y, 'random': random.randint(0, 100), 'randS': random.randint(0, 100), } def _lerp(self, a, b, t): return a + (b - a) * max(0.0, min(1.0, t)) def _progress(self): if self.repeat_total <= 1 or self.is_oneshot: return 0.0 return self.seq_repeat / self.repeat_total def _cur_vel(self): a = self.anim_defs.get(self.anim_id) if not a: return 0.0, 0.0 t = self._progress() return self._lerp(a.svx, a.evx, t), self._lerp(a.svy, a.evy, t) def _cur_interval(self): a = self.anim_defs.get(self.anim_id) if not a: return 200 return max(10, int(self._lerp(a.sint, a.eint, self._progress()))) def _cur_opacity(self): a = self.anim_defs.get(self.anim_id) if not a: return 1.0 return self._lerp(a.sop, a.eop, self._progress()) # ── animation transitions ── def _init_anim(self, anim_id: int): a = self.anim_defs.get(anim_id) if not a: return self.anim_id = anim_id self.frame_idx = 0 self.seq_repeat = 0 self.tick_accum = 0.0 ctx = self._ctx() raw = a.repeat try: val = int(eval_expr(raw, ctx)) except Exception: val = 0 self.is_oneshot = (val == 0) self.repeat_total = max(1, val) if val > 0 else 1 # Apply any immediate actions (e.g. flip triggered by entering this animation) if 'flip' in a.actions: self.facing_right = not self.facing_right def _pick_next(self, nexts: List[NextRef], cond: str) -> Optional[int]: """Weighted random pick from nexts matching cond.""" # First try exact match pool = [n for n in nexts if n.only == cond or n.only == ''] if not pool: pool = nexts # fallback: ignore condition if not pool: return None total = sum(n.probability for n in pool) if total == 0: return pool[0].anim_id r = random.randint(0, total - 1) cum = 0 for n in pool: cum += n.probability if r < cum: return n.anim_id return pool[-1].anim_id def _transition(self, anim_id: Optional[int]): if anim_id is not None: self._init_anim(anim_id) # ── floor update (call from main loop) ── def update_floors(self, windows: List[Tuple[int,int,int,int]], taskbar: Optional[Tuple[int,int,int,int]], screen_w: int, screen_h: int): self.screen_w = screen_w self.screen_h = screen_h # Collect all candidate floors (surfaces the sheep could land on) all_floors = [(float(screen_h), COND_NONE)] # Taskbar (treat as window) if taskbar: tx, ty, tw, th = taskbar if tx < self.x + self.tile_w - 4 and tx + tw > self.x + 4: all_floors.append((float(ty), COND_TASKBAR)) # Window tops – only windows that horizontally overlap the sheep for (wx, wy, ww, wh) in windows: if wx < self.x + self.tile_w - 4 and wx + ww > self.x + 4: all_floors.append((float(wy), COND_WINDOW)) # Keep only floors at or below the sheep's current bottom position # (floors above the sheep are irrelevant for landing) sheep_bot = self.y + self.tile_h self._floors = sorted( [(fy, fc) for (fy, fc) in all_floors if fy >= sheep_bot - 8], key=lambda f: f[0] ) if not self._floors: self._floors = [(float(screen_h), COND_NONE)] # ── tick ── def tick(self, dt_ms: float): a = self.anim_defs.get(self.anim_id) if not a or not a.frames: self._transition(1) return interval = self._cur_interval() vx, vy = self._cur_vel() # vx is in "facing direction" units: # facing left (facing_right=False): vx<0 = move left → screen dx = vx # facing right (facing_right=True): vx<0 = move right → screen dx = -vx dir_mul = -1.0 if self.facing_right else 1.0 sheep_bot_prev = self.y + self.tile_h self.x += vx * dir_mul * dt_ms / interval self.y += vy * dt_ms / interval self.opacity = self._cur_opacity() sheep_bot = self.y + self.tile_h # ── floor / landing collision ── # Find the nearest floor at or below the sheep floor_hit_y: Optional[float] = None floor_hit_fc: str = COND_NONE for (fy, fc) in self._floors: if sheep_bot >= fy - 1: # sheep has reached or passed this floor floor_hit_y = fy floor_hit_fc = fc break # _floors is sorted ascending; first hit wins if floor_hit_y is not None: # Snap sheep to floor surface self.y = floor_hit_y - self.tile_h self.floor_cond = floor_hit_fc # If we just LANDED (were above before, falling now) → landing animation just_landed = (sheep_bot_prev < floor_hit_y - 1) and (vy > 0.3) if just_landed and a.border_nexts: nxt = self._pick_next(a.border_nexts, floor_hit_fc) self._transition(nxt) return else: # No floor under us self.floor_cond = COND_NONE # Gravity: walking/running animations trigger fall when airborne. # 'only="none"' in gravity_nexts means "no special floor" = free fall. if a.gravity_nexts: nxt = self._pick_next(a.gravity_nexts, COND_NONE) self._transition(nxt) return # ── left / right / top border check ── if a.border_nexts: hit, bcond = False, COND_NONE # left border (sheep moving left, facing left) if self.x <= 0 and not self.facing_right: self.x = 0.0 hit, bcond = True, COND_NONE # right border (sheep moving right, facing right) elif self.x + self.tile_w >= self.screen_w and self.facing_right: self.x = float(self.screen_w - self.tile_w) hit, bcond = True, COND_NONE # top border (going up) if self.y <= 0 and vy < 0: self.y = 0.0 hit, bcond = True, COND_HORIZ_UP if hit: nxt = self._pick_next(a.border_nexts, bcond) self._transition(nxt) return # ── frame advance ── self.tick_accum += dt_ms while self.tick_accum >= self._cur_interval(): self.tick_accum -= self._cur_interval() self.frame_idx += 1 if self.frame_idx >= len(a.frames): self.seq_repeat += 1 done = self.is_oneshot or (self.seq_repeat >= self.repeat_total) if done: nxt = self._pick_next(a.seq_nexts, self.floor_cond) self._transition(nxt) return else: self.frame_idx = a.repeatfrom def draw_info(self): """Return (frame_tile_idx, x, y, facing_right, opacity)""" a = self.anim_defs.get(self.anim_id) if not a or not a.frames: return 0, self.x, self.y, self.facing_right, self.opacity fi = max(0, min(self.frame_idx, len(a.frames) - 1)) return a.frames[fi], self.x, self.y, self.facing_right, self.opacity # ─────────────────────── UFO ─────────────────────── class UFO: """Classic UFO abduction sequence – drawn programmatically.""" STATE_APPROACH = 'approach' STATE_BEAM = 'beam' STATE_LIFT = 'lift' STATE_DEPART = 'depart' STATE_DONE = 'done' def __init__(self, sheep: Sheep, screen_w: int, screen_h: int): self.sheep = sheep self.screen_w = screen_w self.screen_h = screen_h self.w = 80; self.h = 40 # UFO body size self.x = float(screen_w) # start off-screen right self.y = float(screen_h * 0.12) self.state = self.STATE_APPROACH self.beam_alpha = 0.0 self.sheep_orig_y = sheep.y self._timer = 0.0 def tick(self, dt_ms: float): sheep = self.sheep if self.state == self.STATE_APPROACH: # Move left toward sheep target_x = sheep.x - self.w / 2 + sheep.tile_w / 2 self.x -= 4.0 * dt_ms / 50 if self.x <= target_x: self.x = target_x self.state = self.STATE_BEAM self._timer = 0.0 elif self.state == self.STATE_BEAM: self._timer += dt_ms self.beam_alpha = min(1.0, self._timer / 600.0) if self._timer > 900: self.state = self.STATE_LIFT self.sheep_orig_y = sheep.y elif self.state == self.STATE_LIFT: speed = 3.0 * dt_ms / 50 sheep.y -= speed sheep.x = self.x + self.w / 2 - sheep.tile_w / 2 if sheep.y + sheep.tile_h < self.y + self.h: self.state = self.STATE_DEPART self._timer = 0.0 elif self.state == self.STATE_DEPART: self.x += 5.0 * dt_ms / 50 sheep.x = self.x + self.w / 2 - sheep.tile_w / 2 sheep.y = self.y + self.h / 2 self._timer += dt_ms if self._timer > 400 and self.x > self.screen_w: # Drop sheep back sheep.x = max(50, min(self.screen_w - sheep.tile_w - 50, self.screen_w * 0.3 + random.randint(-100, 100))) sheep.y = -sheep.tile_h - 10 sheep._init_anim(5) # fall animation self.state = self.STATE_DONE @property def done(self): return self.state == self.STATE_DONE def draw(self, cr: cairo.Context): x, y = self.x, self.y w, h = self.w, self.h # Beam (trapezoid under UFO) if self.state in (self.STATE_BEAM, self.STATE_LIFT): sheep = self.sheep beam_top_x = x + w * 0.3 beam_top_w = w * 0.4 beam_bot_x = sheep.x - 5 beam_bot_w = sheep.tile_w + 10 beam_bot_y = sheep.y + sheep.tile_h beam_top_y = y + h cr.save() cr.set_source_rgba(0.8, 0.9, 1.0, 0.35 * self.beam_alpha) cr.move_to(beam_top_x, beam_top_y) cr.line_to(beam_top_x + beam_top_w, beam_top_y) cr.line_to(beam_bot_x + beam_bot_w, beam_bot_y) cr.line_to(beam_bot_x, beam_bot_y) cr.close_path() cr.fill() cr.restore() # Saucer body cr.save() cx, cy = x + w / 2, y + h * 0.6 # Disk cr.set_source_rgba(0.7, 0.7, 0.8, 0.95) cr.save() cr.translate(cx, cy) cr.scale(w / 2, h * 0.35) cr.arc(0, 0, 1, 0, 2 * math.pi) cr.restore() cr.fill() # Dome cr.set_source_rgba(0.5, 0.8, 1.0, 0.85) cr.save() cr.translate(cx, y + h * 0.4) cr.scale(w * 0.28, h * 0.45) cr.arc(0, 0, 1, math.pi, 2 * math.pi) cr.restore() cr.fill() # Lights for i, col in enumerate([(1,.2,.2,1), (.2,1,.2,1), (.2,.2,1,1), (1,1,.2,1)]): lx = x + w * 0.2 + i * w * 0.18 ly = cy cr.set_source_rgba(*col) cr.arc(lx, ly, 3, 0, 2 * math.pi) cr.fill() cr.restore() # ─────────────────────── Flower ─────────────────────── class Flower: """Procedural flower that appears at a sheep's feet during eat animation (26).""" COLORS = [(1.0, 0.9, 0.15), (1.0, 0.4, 0.65), (0.75, 0.5, 1.0), (0.4, 0.9, 0.4)] LIFESPAN_MS = 2400.0 def __init__(self, sheep: 'Sheep'): self.sheep = sheep self._color = random.choice(self.COLORS) self._age = 0.0 self.done = False def tick(self, dt_ms: float): if self.sheep.anim_id != 26: self.done = True return self._age += dt_ms if self._age >= self.LIFESPAN_MS: self.done = True def draw(self, cr: cairo.Context): if self.done: return sheep = self.sheep progress = self._age / self.LIFESPAN_MS # 0→1 n_petals = max(0, round(5 * (1.0 - progress))) # 5 petals → 0 as eaten alpha = 1.0 - max(0.0, (progress - 0.85) / 0.15) # Position: in front of the sheep at ground level fx = sheep.x + (sheep.tile_w * 0.15 if not sheep.facing_right else sheep.tile_w * 0.75) fy = sheep.y + sheep.tile_h - 2 cr.save() # Stem cr.set_source_rgba(0.2, 0.65, 0.15, alpha) cr.set_line_width(1.5) cr.move_to(fx, fy) cr.line_to(fx, fy - 10) cr.stroke() # Petals cr.set_source_rgba(*self._color, alpha) for i in range(n_petals): ang = i * (2 * math.pi / 5) - math.pi / 2 cr.arc(fx + math.cos(ang) * 4.5, fy - 10 + math.sin(ang) * 4.5, 3.0, 0, 2 * math.pi) cr.fill() # Centre if n_petals > 0: cr.set_source_rgba(1.0, 0.55, 0.1, alpha) cr.arc(fx, fy - 10, 2.5, 0, 2 * math.pi) cr.fill() cr.restore() # ─────────────────────── Cairo sprite drawing ─────────────────────── def draw_frame(cr: cairo.Context, surface: cairo.ImageSurface, tile_w: int, tile_h: int, tiles_x: int, frame_idx: int, dest_x: float, dest_y: float, flip: bool = False, alpha: float = 1.0): """Draw one tile from the sprite sheet at (dest_x, dest_y).""" if frame_idx < 0: return col = frame_idx % tiles_x row = frame_idx // tiles_x sx = col * tile_w sy = row * tile_h cr.save() if flip: cr.translate(dest_x + tile_w, dest_y) cr.scale(-1, 1) cr.rectangle(-tile_w, 0, tile_w, tile_h) cr.clip() cr.set_source_surface(surface, -tile_w - sx, -sy) else: cr.translate(dest_x, dest_y) cr.rectangle(0, 0, tile_w, tile_h) cr.clip() cr.set_source_surface(surface, -sx, -sy) cr.paint_with_alpha(alpha) cr.restore() # ─────────────────────── main overlay ─────────────────────── class SheepOverlay: UFO_INTERVAL_S = 90 # seconds between UFO abductions (approx) def __init__(self, xml_path: str, num_sheep: int = 1): print("Loading assets…", flush=True) self.sprite, self.tile_w, self.tile_h, self.tiles_x, \ self.anim_defs, self.spawns = parse_xml(xml_path) print(f"Sprite: {self.sprite.width}×{self.sprite.height}, " f"tiles: {self.tile_w}×{self.tile_h} ({self.tiles_x} per row), " f"animations: {len(self.anim_defs)}", flush=True) self.cairo_surface = self._pil_to_cairo(self.sprite) self.dark_surface = self._make_dark_surface(self.sprite) # Patch fall-hard (10) to occasionally boing on landing — 20% chance hard = self.anim_defs.get(10) if hard: hard.seq_nexts = [NextRef(probability=20, only='', anim_id=8), NextRef(probability=80, only='', anim_id=1)] self.wayfire = WayfireWindows() self.sheep_list: List[Sheep] = [] self.ufos: List[UFO] = [] self.num_sheep = num_sheep self.last_time = time.monotonic() self._ufo_countdown = self.UFO_INTERVAL_S + random.randint(0, 30) # Drag state self._drag_sheep: Optional[Sheep] = None self._drag_ox = 0.0 self._drag_oy = 0.0 # Extra features self._flowers: List[Flower] = [] self._black_sheep_set: set = set() self._black_sheep_countdown = random.uniform(180, 300) # seconds self._eat_countdown = random.uniform(30, 90) # seconds self._interact_cooldowns: Dict[Tuple[int,int], float] = {} # ms self._frozen_sheep: Dict[int, float] = {} # id→remaining ms self._build_window() @staticmethod def _pil_to_cairo(img: Image.Image) -> cairo.ImageSurface: """Convert PIL RGBA image → cairo ImageSurface (ARGB32, premultiplied alpha).""" import numpy as np surf = cairo.ImageSurface(cairo.FORMAT_ARGB32, img.width, img.height) # Cairo FORMAT_ARGB32 uses premultiplied alpha in BGRA memory order arr = np.array(img, dtype=np.uint32) r = arr[:, :, 0]; g = arr[:, :, 1] b = arr[:, :, 2]; a = arr[:, :, 3] # Premultiply: channel' = channel * alpha / 255 rp = (r * a) // 255 gp = (g * a) // 255 bp = (b * a) // 255 # Pack as ARGB32 native-endian (little-endian = BGRA bytes) packed = (a << 24) | (rp << 16) | (gp << 8) | bp surf.get_data()[:] = packed.astype(np.uint32).tobytes() surf.mark_dirty() return surf @staticmethod def _make_dark_surface(img: 'Image.Image') -> cairo.ImageSurface: """Near-black tinted sprite for the black sheep visitor.""" import numpy as np arr = np.array(img, dtype=np.float32) arr[:, :, :3] *= 0.18 # desaturate to near-black, preserve alpha from PIL import Image as _Image return SheepOverlay._pil_to_cairo(_Image.fromarray(arr.astype(np.uint8))) def _build_window(self): self.win = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) self.win.set_app_paintable(True) screen = self.win.get_screen() visual = screen.get_rgba_visual() if visual: self.win.set_visual(visual) # layer-shell: full-screen overlay if not GtkLayerShell.is_supported(): print("ERROR: Your Wayland compositor does not support wlr-layer-shell.", file=sys.stderr) print(" eSheep requires Wayfire, Sway, Hyprland, or another wlroots compositor.", file=sys.stderr) sys.exit(1) GtkLayerShell.init_for_window(self.win) GtkLayerShell.set_layer(self.win, GtkLayerShell.Layer.OVERLAY) GtkLayerShell.set_exclusive_zone(self.win, -1) for edge in (GtkLayerShell.Edge.LEFT, GtkLayerShell.Edge.RIGHT, GtkLayerShell.Edge.TOP, GtkLayerShell.Edge.BOTTOM): GtkLayerShell.set_anchor(self.win, edge, True) # Use KeyboardMode enum if available (gtk-layer-shell 0.7+), else bool try: GtkLayerShell.set_keyboard_mode(self.win, GtkLayerShell.KeyboardMode.NONE) except AttributeError: GtkLayerShell.set_keyboard_interactivity(self.win, False) self.da = Gtk.DrawingArea() self.da.connect('draw', self._on_draw) # Mouse events for click-to-drag – must be set on the DrawingArea # (it has its own GdkWindow that fills the overlay and receives events first) # and BEFORE show_all() so the GdkWindow is realized with these masks. self.da.add_events( Gdk.EventMask.BUTTON_PRESS_MASK | Gdk.EventMask.BUTTON_RELEASE_MASK | Gdk.EventMask.POINTER_MOTION_MASK ) self.da.connect('button-press-event', self._on_button_press) self.da.connect('button-release-event', self._on_button_release) self.da.connect('motion-notify-event', self._on_motion) self.win.add(self.da) self.win.connect('destroy', Gtk.main_quit) self.win.show_all() # Input region starts as pass-through; _update_input_region() refines it each tick self.win.input_shape_combine_region(cairo.Region()) # Screen size display = Gdk.Display.get_default() mon = display.get_primary_monitor() or display.get_monitor(0) geo = mon.get_geometry() self.screen_w = geo.width self.screen_h = geo.height print(f"Screen: {self.screen_w}×{self.screen_h}", flush=True) # Spawn initial sheep for _ in range(self.num_sheep): self._spawn_sheep() GLib.timeout_add(50, self._tick) # 20 fps game loop # ── click-to-drag ── def _update_input_region(self): """Set the input region on both the window and the DrawingArea's GdkWindow to cover only the sheep sprites. While dragging, use the full screen so we don't lose the pointer mid-drag.""" if self._drag_sheep is not None: # Full screen while dragging – catches pointer everywhere region = cairo.Region(cairo.RectangleInt( 0, 0, self.screen_w, self.screen_h)) else: region = cairo.Region() pad = 4 # extra px around sprite for easier click targeting for sheep in self.sheep_list: rx = int(sheep.x) - pad ry = int(sheep.y) - pad rw = self.tile_w + pad * 2 rh = self.tile_h + pad * 2 region.union(cairo.RectangleInt(rx, ry, rw, rh)) # Apply to both the GtkWindow and the DrawingArea's own GdkWindow self.win.input_shape_combine_region(region) gdk_win = self.da.get_window() if gdk_win is not None: gdk_win.input_shape_combine_region(region, 0, 0) def _sheep_at(self, mx: float, my: float) -> Optional[Sheep]: """Return the sheep whose bounding rect contains (mx, my), or None.""" for sheep in reversed(self.sheep_list): # topmost first if (sheep.x <= mx <= sheep.x + self.tile_w and sheep.y <= my <= sheep.y + self.tile_h): return sheep return None def _on_button_press(self, _widget, event): if event.button != 1: return False sheep = self._sheep_at(event.x, event.y) if sheep is None: return False # Don't grab sheep currently being abducted if any(u.sheep is sheep for u in self.ufos): return False self._drag_sheep = sheep self._drag_ox = event.x - sheep.x self._drag_oy = event.y - sheep.y self._update_input_region() print(f"Grabbed sheep at ({sheep.x:.0f}, {sheep.y:.0f})", flush=True) return True def _on_button_release(self, _widget, event): if event.button != 1 or self._drag_sheep is None: return False sheep = self._drag_sheep self._drag_sheep = None self._update_input_region() # Let the sheep fall from wherever it was dropped fall_id = None for aid, adef in self.anim_defs.items(): if 'fall' in adef.name.lower(): fall_id = aid break if fall_id is None: fall_id = 5 sheep._init_anim(fall_id) print(f"Released sheep at ({sheep.x:.0f}, {sheep.y:.0f}) → fall", flush=True) return True def _on_motion(self, _widget, event): if self._drag_sheep is None: return False sheep = self._drag_sheep sheep.x = event.x - self._drag_ox sheep.y = event.y - self._drag_oy # Clamp to screen sheep.x = max(0, min(sheep.x, self.screen_w - self.tile_w)) sheep.y = max(0, min(sheep.y, self.screen_h - self.tile_h)) return True # ── extra features ── def _spawn_black_sheep(self): """Spawn a dark-tinted sheep from a random edge doing the blacksheep sequence.""" from_right = random.random() < 0.5 anim_id = 28 if from_right else 31 # 28=approach-left, 31=approach-right sp = SpawnDef( id=99, probability=100, x_expr='screenW' if from_right else '0-imageW', y_expr='screenH-imageH', next_anim=anim_id, ) sheep = Sheep(self.anim_defs, self.tile_w, self.tile_h, self.screen_w, self.screen_h, sp) self.sheep_list.append(sheep) self._black_sheep_set.add(sheep) print(f"Black sheep enters from the {'right' if from_right else 'left'}!", flush=True) def _check_interactions(self, dt_ms: float): """Bump and greet walking sheep that get too close.""" busy = {self._drag_sheep} | {u.sheep for u in self.ufos} active = [s for s in self.sheep_list if s not in busy] for i, a in enumerate(active): for b in active[i + 1:]: key = (min(id(a), id(b)), max(id(a), id(b))) cd = self._interact_cooldowns.get(key, 0.0) if cd > 0: self._interact_cooldowns[key] = cd - dt_ms continue same_level = abs(a.y - b.y) < self.tile_h * 0.7 hdist = abs(a.x - b.x) overlapping = hdist < self.tile_w * 0.6 and same_level # One or both sheep is black → skip social interactions either_black = a in self._black_sheep_set or b in self._black_sheep_set if overlapping and not either_black: # Bump: boing and reverse a.facing_right = not a.facing_right b.facing_right = not b.facing_right a._init_anim(8) b._init_anim(8) self._interact_cooldowns[key] = 7000.0 print("Sheep bumped into each other!", flush=True) elif (hdist < self.tile_w * 2.2 and same_level and not either_black and id(a) not in self._frozen_sheep and id(b) not in self._frozen_sheep): # Approaching each other? (each facing toward the other) a_toward = (a.x < b.x) == a.facing_right b_toward = (b.x < a.x) == b.facing_right if a_toward and b_toward: # Greet: face each other and freeze briefly a.facing_right = a.x < b.x b.facing_right = b.x < a.x self._frozen_sheep[id(a)] = 2200.0 self._frozen_sheep[id(b)] = 2200.0 self._interact_cooldowns[key] = 20000.0 print("Sheep greeting!", flush=True) def _spawn_sheep(self): total = sum(s.probability for s in self.spawns) r = random.randint(0, total - 1) cum = 0 chosen = self.spawns[0] for s in self.spawns: cum += s.probability if r < cum: chosen = s break sheep = Sheep( self.anim_defs, self.tile_w, self.tile_h, self.screen_w, self.screen_h, chosen ) self.sheep_list.append(sheep) print(f"Spawned sheep at ({sheep.x:.0f}, {sheep.y:.0f}) " f"anim={self.anim_defs.get(sheep.anim_id, type('',(),{'name':'?'})()).name}", flush=True) def _tick(self) -> bool: now = time.monotonic() dt = min((now - self.last_time) * 1000.0, 100.0) self.last_time = now windows = self.wayfire.get_windows() taskbar = self.wayfire.get_taskbar() # Update all sheep (skip dragged; respect greeting freeze) for sheep in self.sheep_list: if sheep is self._drag_sheep: continue frozen_ms = self._frozen_sheep.get(id(sheep), 0.0) if frozen_ms > 0: self._frozen_sheep[id(sheep)] = frozen_ms - dt continue elif id(sheep) in self._frozen_sheep: del self._frozen_sheep[id(sheep)] if not any(u.sheep is sheep for u in self.ufos): sheep.update_floors(windows, taskbar, self.screen_w, self.screen_h) sheep.tick(dt) # Keep the input region hugging the sheep positions self._update_input_region() # ── flowers ── self._eat_countdown -= dt / 1000.0 if self._eat_countdown <= 0: self._eat_countdown = random.uniform(60, 120) candidates = [s for s in self.sheep_list if s is not self._drag_sheep and s not in self._black_sheep_set and s.floor_cond == COND_NONE and not any(u.sheep is s for u in self.ufos) and s.anim_id not in (5, 6, 13, 26)] if candidates: chosen = random.choice(candidates) chosen._init_anim(26) self._flowers.append(Flower(chosen)) print("Sheep is eating flowers!", flush=True) for f in self._flowers: f.tick(dt) self._flowers = [f for f in self._flowers if not f.done] # ── black sheep ── self._black_sheep_countdown -= dt / 1000.0 if self._black_sheep_countdown <= 0: self._spawn_black_sheep() self._black_sheep_countdown = random.uniform(180, 360) # Remove black sheep that have fully faded out (finished anim 34, opacity→0) gone = [s for s in self._black_sheep_set if s.anim_id == 34 and s.opacity < 0.05] for s in gone: self._black_sheep_set.discard(s) if s in self.sheep_list: self.sheep_list.remove(s) print("Black sheep has vanished.", flush=True) # ── bumps & greetings ── self._check_interactions(dt) # UFO timer self._ufo_countdown -= dt / 1000.0 if self._ufo_countdown <= 0 and self.sheep_list: candidates = [s for s in self.sheep_list if s not in self._black_sheep_set] if candidates: target = random.choice(candidates) if not any(u.sheep is target for u in self.ufos): self.ufos.append(UFO(target, self.screen_w, self.screen_h)) print("UFO abduction started!", flush=True) self._ufo_countdown = self.UFO_INTERVAL_S + random.randint(-20, 30) # Tick UFOs for ufo in self.ufos: ufo.tick(dt) self.ufos = [u for u in self.ufos if not u.done] self.da.queue_draw() return True # keep timer def _on_draw(self, _widget, cr: cairo.Context): # Clear to fully transparent cr.set_operator(cairo.OPERATOR_CLEAR) cr.paint() cr.set_operator(cairo.OPERATOR_OVER) # Draw UFO beams first (behind sheep) for ufo in self.ufos: ufo.draw(cr) # Draw flowers (behind sheep) for flower in self._flowers: flower.draw(cr) # Draw sheep (black sheep use darkened sprite) for sheep in self.sheep_list: frame, sx, sy, flip, alpha = sheep.draw_info() surface = (self.dark_surface if sheep in self._black_sheep_set else self.cairo_surface) draw_frame(cr, surface, self.tile_w, self.tile_h, self.tiles_x, frame, sx, sy, flip, alpha) # Draw UFO bodies (on top of sheep) for ufo in self.ufos: if ufo.state != ufo.STATE_BEAM: ufo.draw(cr) def run(self): Gtk.main() # ─────────────────────── entry point ─────────────────────── def main(): import argparse, signal, subprocess p = argparse.ArgumentParser(description='eSheep for Wayland') p.add_argument('--xml', default=os.path.join(os.path.dirname(__file__), 'animations.xml'), help='Path to animations.xml') p.add_argument('--count', type=int, default=1, help='Number of sheep') p.add_argument('--exit', action='store_true', help='Kill any running eSheep instances and quit') args = p.parse_args() if args.exit: my_pid = os.getpid() result = subprocess.run(['pgrep', '-f', 'esheep.py'], capture_output=True, text=True) pids = [int(p) for p in result.stdout.split() if p.strip() and int(p.strip()) != my_pid] if pids: for pid in pids: try: os.kill(pid, signal.SIGTERM) except ProcessLookupError: pass print(f"Stopped {len(pids)} eSheep process(es).") else: print("No running eSheep found.") sys.exit(0) if not os.path.exists(args.xml): print(f"animations.xml not found: {args.xml}", file=sys.stderr) sys.exit(1) SheepOverlay(args.xml, args.count).run() if __name__ == '__main__': main()