summaryrefslogtreecommitdiffstats
path: root/esheep.py
diff options
context:
space:
mode:
Diffstat (limited to 'esheep.py')
-rw-r--r--esheep.py1131
1 files changed, 1131 insertions, 0 deletions
diff --git a/esheep.py b/esheep.py
new file mode 100644
index 0000000..5bf2328
--- /dev/null
+++ b/esheep.py
@@ -0,0 +1,1131 @@
+#!/usr/bin/env python3
+"""eSheep - Linux/Wayland desktop pet using GTK3 + layer-shell + Wayfire IPC"""
+
+import gi
+gi.require_version('Gtk', '3.0')
+gi.require_version('GtkLayerShell', '0.1')
+from gi.repository import Gtk, Gdk, GLib
+from gi.repository import GtkLayerShell
+
+import cairo
+import xml.etree.ElementTree as ET
+import base64, io, re, os, sys, random, math, threading, time
+from dataclasses import dataclass, field
+from typing import List, Optional, Dict, Tuple
+from PIL import Image
+
+# ─────────────────────── constants ───────────────────────
+
+NS = 'https://esheep.petrucci.ch/'
+
+# Floor/condition types (match XML 'only' attribute values)
+COND_NONE = 'none' # on screen floor (or no special condition)
+COND_WINDOW = 'window' # on top of a window
+COND_TASKBAR = 'taskbar' # on taskbar/panel
+COND_VERTICAL = 'vertical' # against a vertical wall
+COND_HORIZ_UP = 'horizontal+' # hit top of screen going upward
+
+# ─────────────────────── data classes ───────────────────────
+
+@dataclass
+class NextRef:
+ probability: int
+ only: str # matches COND_* values, '' = always applies
+ anim_id: int
+
+@dataclass
+class AnimDef:
+ id: int
+ name: str
+ svx: float; svy: float # start velocity x/y (px per frame)
+ evx: float; evy: float # end velocity x/y
+ sint: int; eint: int # start/end interval (ms between frames)
+ sop: float; eop: float # start/end opacity
+ soy: float; eoy: float # start/end offsetY
+ repeat: object # int or str expression; 0 = oneshot
+ repeatfrom: int # frame index to loop back to
+ frames: List[int] # sprite tile indices
+ seq_nexts: List[NextRef] # transitions at sequence end
+ border_nexts: List[NextRef] # transitions when hitting border
+ gravity_nexts: List[NextRef] # transitions when gravity kicks in
+ actions: List[str] # 'flip' etc
+
+@dataclass
+class SpawnDef:
+ id: int
+ probability: int
+ x_expr: str
+ y_expr: str
+ next_anim: int
+
+# ─────────────────────── XML parsing ───────────────────────
+
+def _ns(tag, ns):
+ return f'{{{ns}}}{tag}' if ns else tag
+
+def _fval(el, tag, ns, default=0.0):
+ if el is None:
+ return default
+ t = el.findtext(_ns(tag, ns))
+ try:
+ return float(t) if t is not None else default
+ except (ValueError, TypeError):
+ return default
+
+def _parse_nexts(parent_el, ns) -> List[NextRef]:
+ if parent_el is None:
+ return []
+ result = []
+ for n in parent_el.findall(_ns('next', ns)):
+ try:
+ result.append(NextRef(
+ probability=int(n.get('probability', '100')),
+ only=n.get('only', ''),
+ anim_id=int(n.text.strip()) if n.text else 1
+ ))
+ except (ValueError, TypeError):
+ pass
+ return result
+
+def parse_xml(path: str):
+ """Returns (sprite_pil_image, tile_w, tile_h, tiles_x, anim_defs, spawns)"""
+ tree = ET.parse(path)
+ root = tree.getroot()
+ ns = NS if root.tag.startswith('{') else ''
+
+ # ── image block ──
+ img_el = root.find(_ns('image', ns))
+ tiles_x = int(img_el.findtext(_ns('tilesx', ns)) or 16)
+ tiles_y = int(img_el.findtext(_ns('tilesy', ns)) or 11)
+ png_b64 = img_el.findtext(_ns('png', ns)) or ''
+ png_data = base64.b64decode(png_b64.strip())
+ sprite = Image.open(io.BytesIO(png_data)).convert('RGBA')
+ tile_w = sprite.width // tiles_x
+ tile_h = sprite.height // tiles_y
+
+ # ── spawns ──
+ spawns: List[SpawnDef] = []
+ spawns_el = root.find(_ns('spawns', ns))
+ for sp in (list(spawns_el) if spawns_el is not None else []):
+ nxt = sp.find(_ns('next', ns))
+ spawns.append(SpawnDef(
+ id=int(sp.get('id', 0)),
+ probability=int(sp.get('probability', 100)),
+ x_expr=(sp.findtext(_ns('x', ns)) or '0').strip(),
+ y_expr=(sp.findtext(_ns('y', ns)) or '0').strip(),
+ next_anim=int(nxt.text.strip()) if nxt is not None and nxt.text else 1,
+ ))
+
+ # ── animations ──
+ anims: Dict[int, AnimDef] = {}
+ anims_el = root.find(_ns('animations', ns))
+ for anim_el in (list(anims_el) if anims_el is not None else []):
+ aid = int(anim_el.get('id', 0))
+ start = anim_el.find(_ns('start', ns))
+ end = anim_el.find(_ns('end', ns))
+ seq = anim_el.find(_ns('sequence', ns))
+ bord = anim_el.find(_ns('border', ns))
+ grav = anim_el.find(_ns('gravity', ns))
+
+ frames: List[int] = []
+ seq_nexts: List[NextRef] = []
+ actions: List[str] = []
+ repeat_raw = '0'
+ repeatfrom = 0
+
+ if seq is not None:
+ repeat_raw = seq.get('repeat', '0').strip()
+ repeatfrom = int(seq.get('repeatfrom', '0'))
+ for child in seq:
+ tag = child.tag.split('}')[-1] if '}' in child.tag else child.tag
+ if tag == 'frame' and child.text:
+ try:
+ frames.append(int(child.text.strip()))
+ except ValueError:
+ pass
+ elif tag == 'next':
+ try:
+ seq_nexts.append(NextRef(
+ probability=int(child.get('probability', '100')),
+ only=child.get('only', ''),
+ anim_id=int(child.text.strip()) if child.text else 1,
+ ))
+ except (ValueError, TypeError):
+ pass
+ elif tag == 'action' and child.text:
+ actions.append(child.text.strip())
+
+ anims[aid] = AnimDef(
+ id=aid,
+ name=(anim_el.findtext(_ns('name', ns)) or '').strip(),
+ svx=_fval(start, 'x', ns), svy=_fval(start, 'y', ns),
+ evx=_fval(end, 'x', ns), evy=_fval(end, 'y', ns),
+ sint=int(_fval(start, 'interval', ns, 200)),
+ eint=int(_fval(end, 'interval', ns, 200)),
+ sop=_fval(start, 'opacity', ns, 1.0),
+ eop=_fval(end, 'opacity', ns, 1.0),
+ soy=_fval(start, 'offsety', ns, 0.0),
+ eoy=_fval(end, 'offsety', ns, 0.0),
+ repeat=repeat_raw,
+ repeatfrom=repeatfrom,
+ frames=frames,
+ seq_nexts=seq_nexts,
+ border_nexts=_parse_nexts(bord, ns),
+ gravity_nexts=_parse_nexts(grav, ns),
+ actions=actions,
+ )
+
+ return sprite, tile_w, tile_h, tiles_x, anims, spawns
+
+# ─────────────────────── expression evaluator ───────────────────────
+
+def eval_expr(expr, ctx: dict):
+ """Evaluate XML animation expressions: 'screenW/2-imageH', etc."""
+ if expr is None:
+ return 0
+ expr = str(expr).strip()
+ if not expr:
+ return 0
+ # Replace C# Convert(x, System.Int32) with int(x)
+ expr = re.sub(r'Convert\s*\(([^,]+),\s*System\.Int32\)', r'int(\1)', expr)
+ safe = {'int': int, 'abs': abs, 'max': max, 'min': min, 'round': round}
+ try:
+ return eval(expr, {"__builtins__": safe}, ctx)
+ except Exception:
+ return 0
+
+# ─────────────────────── Wayfire IPC ───────────────────────
+
+class WayfireWindows:
+ """Background thread polling Wayfire IPC for window geometry."""
+
+ def __init__(self):
+ self._windows: List[Tuple[int,int,int,int]] = [] # (x, y, w, h)
+ self._taskbar: Optional[Tuple[int,int,int,int]] = None
+ self._lock = threading.Lock()
+ self._available = False
+
+ socket_path = os.environ.get('WAYFIRE_SOCKET', '')
+ if socket_path and os.path.exists(socket_path):
+ self._available = True
+ t = threading.Thread(target=self._poll, daemon=True)
+ t.start()
+ print(f"Wayfire IPC connected: {socket_path}", flush=True)
+ else:
+ print("No WAYFIRE_SOCKET – window detection disabled", flush=True)
+
+ def _poll(self):
+ try:
+ from wayfire import WayfireSocket
+ sock = WayfireSocket()
+ while True:
+ try:
+ views = sock.list_views()
+ wins = []
+ taskbar = None
+ for v in views:
+ g = v.get('geometry', {})
+ x, y = g.get('x', 0), g.get('y', 0)
+ w, h = g.get('width', 0), g.get('height', 0)
+ role = v.get('role', '')
+ # Only toplevel windows count as landing surfaces;
+ # skip layer-shell panels/backgrounds (they can hide the sheep)
+ if role == 'toplevel' and w > 10 and h > 10:
+ wins.append((x, y, w, h))
+ with self._lock:
+ self._windows = wins
+ self._taskbar = taskbar
+ except Exception as e:
+ print(f"Wayfire IPC error: {e}", flush=True)
+ time.sleep(0.4)
+ except ImportError:
+ print("wayfire Python package not found", flush=True)
+ except Exception as e:
+ print(f"Wayfire IPC fatal: {e}", flush=True)
+
+ def get_windows(self) -> List[Tuple[int,int,int,int]]:
+ with self._lock:
+ return list(self._windows)
+
+ def get_taskbar(self) -> Optional[Tuple[int,int,int,int]]:
+ with self._lock:
+ return self._taskbar
+
+# ─────────────────────── Sheep state machine ───────────────────────
+
+class Sheep:
+ def __init__(self, anim_defs, tile_w, tile_h, screen_w, screen_h, spawn: SpawnDef):
+ self.anim_defs = anim_defs
+ self.tile_w = tile_w
+ self.tile_h = tile_h
+ self.screen_w = screen_w
+ self.screen_h = screen_h
+
+ self.facing_right = False # default sprite orientation is left-facing
+ self.opacity = 1.0
+ self.alive = True
+
+ ctx = self._ctx(screen_w // 2, -tile_h)
+ self.x = float(eval_expr(spawn.x_expr, ctx))
+ self.y = float(eval_expr(spawn.y_expr, ctx))
+ # Clamp to screen + some margin for off-screen spawns
+ self.x = max(-tile_w, min(self.x, screen_w))
+ self.y = max(-tile_h * 2, min(self.y, screen_h))
+
+ # Animation state
+ self.anim_id = spawn.next_anim
+ self.frame_idx = 0
+ self.seq_repeat = 0
+ self.repeat_total = 1
+ self.is_oneshot = True
+ self.tick_accum = 0.0
+
+ # Floor tracking
+ self.floor_cond = COND_NONE
+ self._floors: List[Tuple[float, str]] = [] # (floor_y, cond)
+
+ self._init_anim(spawn.next_anim)
+
+ # ── helpers ──
+
+ def _ctx(self, img_x=None, img_y=None):
+ return {
+ 'screenW': self.screen_w, 'screenH': self.screen_h,
+ 'areaH': self.screen_h,
+ 'imageW': self.tile_w, 'imageH': self.tile_h,
+ 'imageX': img_x if img_x is not None else self.x,
+ 'imageY': img_y if img_y is not None else self.y,
+ 'random': random.randint(0, 100),
+ 'randS': random.randint(0, 100),
+ }
+
+ def _lerp(self, a, b, t):
+ return a + (b - a) * max(0.0, min(1.0, t))
+
+ def _progress(self):
+ if self.repeat_total <= 1 or self.is_oneshot:
+ return 0.0
+ return self.seq_repeat / self.repeat_total
+
+ def _cur_vel(self):
+ a = self.anim_defs.get(self.anim_id)
+ if not a:
+ return 0.0, 0.0
+ t = self._progress()
+ return self._lerp(a.svx, a.evx, t), self._lerp(a.svy, a.evy, t)
+
+ def _cur_interval(self):
+ a = self.anim_defs.get(self.anim_id)
+ if not a:
+ return 200
+ return max(10, int(self._lerp(a.sint, a.eint, self._progress())))
+
+ def _cur_opacity(self):
+ a = self.anim_defs.get(self.anim_id)
+ if not a:
+ return 1.0
+ return self._lerp(a.sop, a.eop, self._progress())
+
+ # ── animation transitions ──
+
+ def _init_anim(self, anim_id: int):
+ a = self.anim_defs.get(anim_id)
+ if not a:
+ return
+ self.anim_id = anim_id
+ self.frame_idx = 0
+ self.seq_repeat = 0
+ self.tick_accum = 0.0
+
+ ctx = self._ctx()
+ raw = a.repeat
+ try:
+ val = int(eval_expr(raw, ctx))
+ except Exception:
+ val = 0
+ self.is_oneshot = (val == 0)
+ self.repeat_total = max(1, val) if val > 0 else 1
+
+ # Apply any immediate actions (e.g. flip triggered by entering this animation)
+ if 'flip' in a.actions:
+ self.facing_right = not self.facing_right
+
+ def _pick_next(self, nexts: List[NextRef], cond: str) -> Optional[int]:
+ """Weighted random pick from nexts matching cond."""
+ # First try exact match
+ pool = [n for n in nexts if n.only == cond or n.only == '']
+ if not pool:
+ pool = nexts # fallback: ignore condition
+ if not pool:
+ return None
+ total = sum(n.probability for n in pool)
+ if total == 0:
+ return pool[0].anim_id
+ r = random.randint(0, total - 1)
+ cum = 0
+ for n in pool:
+ cum += n.probability
+ if r < cum:
+ return n.anim_id
+ return pool[-1].anim_id
+
+ def _transition(self, anim_id: Optional[int]):
+ if anim_id is not None:
+ self._init_anim(anim_id)
+
+ # ── floor update (call from main loop) ──
+
+ def update_floors(self, windows: List[Tuple[int,int,int,int]],
+ taskbar: Optional[Tuple[int,int,int,int]],
+ screen_w: int, screen_h: int):
+ self.screen_w = screen_w
+ self.screen_h = screen_h
+
+ # Collect all candidate floors (surfaces the sheep could land on)
+ all_floors = [(float(screen_h), COND_NONE)]
+
+ # Taskbar (treat as window)
+ if taskbar:
+ tx, ty, tw, th = taskbar
+ if tx < self.x + self.tile_w - 4 and tx + tw > self.x + 4:
+ all_floors.append((float(ty), COND_TASKBAR))
+
+ # Window tops – only windows that horizontally overlap the sheep
+ for (wx, wy, ww, wh) in windows:
+ if wx < self.x + self.tile_w - 4 and wx + ww > self.x + 4:
+ all_floors.append((float(wy), COND_WINDOW))
+
+ # Keep only floors at or below the sheep's current bottom position
+ # (floors above the sheep are irrelevant for landing)
+ sheep_bot = self.y + self.tile_h
+ self._floors = sorted(
+ [(fy, fc) for (fy, fc) in all_floors if fy >= sheep_bot - 8],
+ key=lambda f: f[0]
+ )
+ if not self._floors:
+ self._floors = [(float(screen_h), COND_NONE)]
+
+ # ── tick ──
+
+ def tick(self, dt_ms: float):
+ a = self.anim_defs.get(self.anim_id)
+ if not a or not a.frames:
+ self._transition(1)
+ return
+
+ interval = self._cur_interval()
+ vx, vy = self._cur_vel()
+
+ # vx is in "facing direction" units:
+ # facing left (facing_right=False): vx<0 = move left → screen dx = vx
+ # facing right (facing_right=True): vx<0 = move right → screen dx = -vx
+ dir_mul = -1.0 if self.facing_right else 1.0
+ sheep_bot_prev = self.y + self.tile_h
+
+ self.x += vx * dir_mul * dt_ms / interval
+ self.y += vy * dt_ms / interval
+ self.opacity = self._cur_opacity()
+
+ sheep_bot = self.y + self.tile_h
+
+ # ── floor / landing collision ──
+ # Find the nearest floor at or below the sheep
+ floor_hit_y: Optional[float] = None
+ floor_hit_fc: str = COND_NONE
+ for (fy, fc) in self._floors:
+ if sheep_bot >= fy - 1: # sheep has reached or passed this floor
+ floor_hit_y = fy
+ floor_hit_fc = fc
+ break # _floors is sorted ascending; first hit wins
+
+ if floor_hit_y is not None:
+ # Snap sheep to floor surface
+ self.y = floor_hit_y - self.tile_h
+ self.floor_cond = floor_hit_fc
+
+ # If we just LANDED (were above before, falling now) → landing animation
+ just_landed = (sheep_bot_prev < floor_hit_y - 1) and (vy > 0.3)
+ if just_landed and a.border_nexts:
+ nxt = self._pick_next(a.border_nexts, floor_hit_fc)
+ self._transition(nxt)
+ return
+ else:
+ # No floor under us
+ self.floor_cond = COND_NONE
+ # Gravity: walking/running animations trigger fall when airborne.
+ # 'only="none"' in gravity_nexts means "no special floor" = free fall.
+ if a.gravity_nexts:
+ nxt = self._pick_next(a.gravity_nexts, COND_NONE)
+ self._transition(nxt)
+ return
+
+ # ── left / right / top border check ──
+ if a.border_nexts:
+ hit, bcond = False, COND_NONE
+ # left border (sheep moving left, facing left)
+ if self.x <= 0 and not self.facing_right:
+ self.x = 0.0
+ hit, bcond = True, COND_NONE
+ # right border (sheep moving right, facing right)
+ elif self.x + self.tile_w >= self.screen_w and self.facing_right:
+ self.x = float(self.screen_w - self.tile_w)
+ hit, bcond = True, COND_NONE
+ # top border (going up)
+ if self.y <= 0 and vy < 0:
+ self.y = 0.0
+ hit, bcond = True, COND_HORIZ_UP
+ if hit:
+ nxt = self._pick_next(a.border_nexts, bcond)
+ self._transition(nxt)
+ return
+
+ # ── frame advance ──
+ self.tick_accum += dt_ms
+ while self.tick_accum >= self._cur_interval():
+ self.tick_accum -= self._cur_interval()
+ self.frame_idx += 1
+
+ if self.frame_idx >= len(a.frames):
+ self.seq_repeat += 1
+
+ done = self.is_oneshot or (self.seq_repeat >= self.repeat_total)
+ if done:
+ nxt = self._pick_next(a.seq_nexts, self.floor_cond)
+ self._transition(nxt)
+ return
+ else:
+ self.frame_idx = a.repeatfrom
+
+ def draw_info(self):
+ """Return (frame_tile_idx, x, y, facing_right, opacity)"""
+ a = self.anim_defs.get(self.anim_id)
+ if not a or not a.frames:
+ return 0, self.x, self.y, self.facing_right, self.opacity
+ fi = max(0, min(self.frame_idx, len(a.frames) - 1))
+ return a.frames[fi], self.x, self.y, self.facing_right, self.opacity
+
+
+# ─────────────────────── UFO ───────────────────────
+
+class UFO:
+ """Classic UFO abduction sequence – drawn programmatically."""
+
+ STATE_APPROACH = 'approach'
+ STATE_BEAM = 'beam'
+ STATE_LIFT = 'lift'
+ STATE_DEPART = 'depart'
+ STATE_DONE = 'done'
+
+ def __init__(self, sheep: Sheep, screen_w: int, screen_h: int):
+ self.sheep = sheep
+ self.screen_w = screen_w
+ self.screen_h = screen_h
+ self.w = 80; self.h = 40 # UFO body size
+ self.x = float(screen_w) # start off-screen right
+ self.y = float(screen_h * 0.12)
+ self.state = self.STATE_APPROACH
+ self.beam_alpha = 0.0
+ self.sheep_orig_y = sheep.y
+ self._timer = 0.0
+
+ def tick(self, dt_ms: float):
+ sheep = self.sheep
+ if self.state == self.STATE_APPROACH:
+ # Move left toward sheep
+ target_x = sheep.x - self.w / 2 + sheep.tile_w / 2
+ self.x -= 4.0 * dt_ms / 50
+ if self.x <= target_x:
+ self.x = target_x
+ self.state = self.STATE_BEAM
+ self._timer = 0.0
+
+ elif self.state == self.STATE_BEAM:
+ self._timer += dt_ms
+ self.beam_alpha = min(1.0, self._timer / 600.0)
+ if self._timer > 900:
+ self.state = self.STATE_LIFT
+ self.sheep_orig_y = sheep.y
+
+ elif self.state == self.STATE_LIFT:
+ speed = 3.0 * dt_ms / 50
+ sheep.y -= speed
+ sheep.x = self.x + self.w / 2 - sheep.tile_w / 2
+ if sheep.y + sheep.tile_h < self.y + self.h:
+ self.state = self.STATE_DEPART
+ self._timer = 0.0
+
+ elif self.state == self.STATE_DEPART:
+ self.x += 5.0 * dt_ms / 50
+ sheep.x = self.x + self.w / 2 - sheep.tile_w / 2
+ sheep.y = self.y + self.h / 2
+ self._timer += dt_ms
+ if self._timer > 400 and self.x > self.screen_w:
+ # Drop sheep back
+ sheep.x = max(50, min(self.screen_w - sheep.tile_w - 50,
+ self.screen_w * 0.3 + random.randint(-100, 100)))
+ sheep.y = -sheep.tile_h - 10
+ sheep._init_anim(5) # fall animation
+ self.state = self.STATE_DONE
+
+ @property
+ def done(self):
+ return self.state == self.STATE_DONE
+
+ def draw(self, cr: cairo.Context):
+ x, y = self.x, self.y
+ w, h = self.w, self.h
+
+ # Beam (trapezoid under UFO)
+ if self.state in (self.STATE_BEAM, self.STATE_LIFT):
+ sheep = self.sheep
+ beam_top_x = x + w * 0.3
+ beam_top_w = w * 0.4
+ beam_bot_x = sheep.x - 5
+ beam_bot_w = sheep.tile_w + 10
+ beam_bot_y = sheep.y + sheep.tile_h
+ beam_top_y = y + h
+
+ cr.save()
+ cr.set_source_rgba(0.8, 0.9, 1.0, 0.35 * self.beam_alpha)
+ cr.move_to(beam_top_x, beam_top_y)
+ cr.line_to(beam_top_x + beam_top_w, beam_top_y)
+ cr.line_to(beam_bot_x + beam_bot_w, beam_bot_y)
+ cr.line_to(beam_bot_x, beam_bot_y)
+ cr.close_path()
+ cr.fill()
+ cr.restore()
+
+ # Saucer body
+ cr.save()
+ cx, cy = x + w / 2, y + h * 0.6
+ # Disk
+ cr.set_source_rgba(0.7, 0.7, 0.8, 0.95)
+ cr.save()
+ cr.translate(cx, cy)
+ cr.scale(w / 2, h * 0.35)
+ cr.arc(0, 0, 1, 0, 2 * math.pi)
+ cr.restore()
+ cr.fill()
+ # Dome
+ cr.set_source_rgba(0.5, 0.8, 1.0, 0.85)
+ cr.save()
+ cr.translate(cx, y + h * 0.4)
+ cr.scale(w * 0.28, h * 0.45)
+ cr.arc(0, 0, 1, math.pi, 2 * math.pi)
+ cr.restore()
+ cr.fill()
+ # Lights
+ for i, col in enumerate([(1,.2,.2,1), (.2,1,.2,1), (.2,.2,1,1), (1,1,.2,1)]):
+ lx = x + w * 0.2 + i * w * 0.18
+ ly = cy
+ cr.set_source_rgba(*col)
+ cr.arc(lx, ly, 3, 0, 2 * math.pi)
+ cr.fill()
+ cr.restore()
+
+
+# ─────────────────────── Flower ───────────────────────
+
+class Flower:
+ """Flower sprite (anim 27) shown at the sheep's side during eat animation (26).
+ Loops anim 27 for the full duration the sheep stays in anim 26."""
+
+ def __init__(self, sheep: 'Sheep', flower_adef):
+ self.sheep = sheep
+ self._adef = flower_adef # AnimDef for anim 27, may be None
+ self._frame_idx = 0
+ self._tick_accum = 0.0
+ self.done = False
+
+ def tick(self, dt_ms: float):
+ if self.sheep.anim_id != 26:
+ self.done = True
+ return
+ if not self._adef or not self._adef.frames:
+ return
+ interval = self._adef.sint
+ self._tick_accum += dt_ms
+ while self._tick_accum >= interval:
+ self._tick_accum -= interval
+ self._frame_idx += 1
+ if self._frame_idx >= len(self._adef.frames):
+ self._frame_idx = 0 # loop while sheep keeps eating
+
+ def draw(self, cr: cairo.Context,
+ surface: cairo.ImageSurface,
+ tile_w: int, tile_h: int, tiles_x: int):
+ if self.done or not self._adef or not self._adef.frames:
+ return
+ sheep = self.sheep
+ fi = self._adef.frames[min(self._frame_idx, len(self._adef.frames) - 1)]
+ # Place the flower sprite in front of the sheep at the same ground level
+ if not sheep.facing_right:
+ fx = sheep.x - tile_w * 0.5
+ else:
+ fx = sheep.x + tile_w * 0.75
+ fy = sheep.y
+ draw_frame(cr, surface, tile_w, tile_h, tiles_x, fi, fx, fy)
+
+
+# ─────────────────────── Cairo sprite drawing ───────────────────────
+
+def draw_frame(cr: cairo.Context, surface: cairo.ImageSurface,
+ tile_w: int, tile_h: int, tiles_x: int,
+ frame_idx: int, dest_x: float, dest_y: float,
+ flip: bool = False, alpha: float = 1.0):
+ """Draw one tile from the sprite sheet at (dest_x, dest_y)."""
+ if frame_idx < 0:
+ return
+ col = frame_idx % tiles_x
+ row = frame_idx // tiles_x
+ sx = col * tile_w
+ sy = row * tile_h
+
+ cr.save()
+ if flip:
+ cr.translate(dest_x + tile_w, dest_y)
+ cr.scale(-1, 1)
+ cr.rectangle(0, 0, tile_w, tile_h)
+ cr.clip()
+ cr.set_source_surface(surface, -sx, -sy)
+ else:
+ cr.translate(dest_x, dest_y)
+ cr.rectangle(0, 0, tile_w, tile_h)
+ cr.clip()
+ cr.set_source_surface(surface, -sx, -sy)
+
+ cr.paint_with_alpha(alpha)
+ cr.restore()
+
+
+# ─────────────────────── main overlay ───────────────────────
+
+class SheepOverlay:
+ UFO_INTERVAL_S = 90 # seconds between UFO abductions (approx)
+
+ def __init__(self, xml_path: str, num_sheep: int = 1):
+ print("Loading assets…", flush=True)
+ self.sprite, self.tile_w, self.tile_h, self.tiles_x, \
+ self.anim_defs, self.spawns = parse_xml(xml_path)
+ print(f"Sprite: {self.sprite.width}×{self.sprite.height}, "
+ f"tiles: {self.tile_w}×{self.tile_h} ({self.tiles_x} per row), "
+ f"animations: {len(self.anim_defs)}", flush=True)
+
+ self.cairo_surface = self._pil_to_cairo(self.sprite)
+
+ # Patch fall-soft (9) and fall-hard (10) to occasionally boing on landing
+ soft = self.anim_defs.get(9)
+ if soft:
+ soft.seq_nexts = [NextRef(probability=15, only='', anim_id=8),
+ NextRef(probability=85, only='', anim_id=1)]
+ hard = self.anim_defs.get(10)
+ if hard:
+ hard.seq_nexts = [NextRef(probability=20, only='', anim_id=8),
+ NextRef(probability=80, only='', anim_id=1)]
+ self.wayfire = WayfireWindows()
+ self.sheep_list: List[Sheep] = []
+ self.ufos: List[UFO] = []
+ self.num_sheep = num_sheep
+ self.last_time = time.monotonic()
+ self._ufo_countdown = self.UFO_INTERVAL_S + random.randint(0, 30)
+
+ # Drag state
+ self._drag_sheep: Optional[Sheep] = None
+ self._drag_ox = 0.0
+ self._drag_oy = 0.0
+
+ # Extra features
+ self._flowers: List[Flower] = []
+ self._eat_countdown = random.uniform(30, 90) # seconds
+ self._hop_countdown = random.uniform(45, 90) # seconds
+ self._interact_cooldowns: Dict[Tuple[int,int], float] = {} # ms
+ self._frozen_sheep: Dict[int, float] = {} # id→remaining ms
+
+ self._build_window()
+
+ @staticmethod
+ def _pil_to_cairo(img: Image.Image) -> cairo.ImageSurface:
+ """Convert PIL RGBA image → cairo ImageSurface (ARGB32, premultiplied alpha)."""
+ import numpy as np
+ surf = cairo.ImageSurface(cairo.FORMAT_ARGB32, img.width, img.height)
+ # Cairo FORMAT_ARGB32 uses premultiplied alpha in BGRA memory order
+ arr = np.array(img, dtype=np.uint32)
+ r = arr[:, :, 0]; g = arr[:, :, 1]
+ b = arr[:, :, 2]; a = arr[:, :, 3]
+ # Premultiply: channel' = channel * alpha / 255
+ rp = (r * a) // 255
+ gp = (g * a) // 255
+ bp = (b * a) // 255
+ # Pack as ARGB32 native-endian (little-endian = BGRA bytes)
+ packed = (a << 24) | (rp << 16) | (gp << 8) | bp
+ surf.get_data()[:] = packed.astype(np.uint32).tobytes()
+ surf.mark_dirty()
+ return surf
+
+ def _build_window(self):
+ self.win = Gtk.Window(type=Gtk.WindowType.TOPLEVEL)
+ self.win.set_app_paintable(True)
+ screen = self.win.get_screen()
+ visual = screen.get_rgba_visual()
+ if visual:
+ self.win.set_visual(visual)
+
+ # layer-shell: full-screen overlay
+ if not GtkLayerShell.is_supported():
+ print("ERROR: Your Wayland compositor does not support wlr-layer-shell.",
+ file=sys.stderr)
+ print(" eSheep requires Wayfire, Sway, Hyprland, or another wlroots compositor.",
+ file=sys.stderr)
+ sys.exit(1)
+ GtkLayerShell.init_for_window(self.win)
+ GtkLayerShell.set_layer(self.win, GtkLayerShell.Layer.OVERLAY)
+ GtkLayerShell.set_exclusive_zone(self.win, -1)
+ for edge in (GtkLayerShell.Edge.LEFT, GtkLayerShell.Edge.RIGHT,
+ GtkLayerShell.Edge.TOP, GtkLayerShell.Edge.BOTTOM):
+ GtkLayerShell.set_anchor(self.win, edge, True)
+ # Use KeyboardMode enum if available (gtk-layer-shell 0.7+), else bool
+ try:
+ GtkLayerShell.set_keyboard_mode(self.win, GtkLayerShell.KeyboardMode.NONE)
+ except AttributeError:
+ GtkLayerShell.set_keyboard_interactivity(self.win, False)
+
+ self.da = Gtk.DrawingArea()
+ self.da.connect('draw', self._on_draw)
+
+ # Mouse events for click-to-drag – must be set on the DrawingArea
+ # (it has its own GdkWindow that fills the overlay and receives events first)
+ # and BEFORE show_all() so the GdkWindow is realized with these masks.
+ self.da.add_events(
+ Gdk.EventMask.BUTTON_PRESS_MASK |
+ Gdk.EventMask.BUTTON_RELEASE_MASK |
+ Gdk.EventMask.POINTER_MOTION_MASK
+ )
+ self.da.connect('button-press-event', self._on_button_press)
+ self.da.connect('button-release-event', self._on_button_release)
+ self.da.connect('motion-notify-event', self._on_motion)
+
+ self.win.add(self.da)
+ self.win.connect('destroy', Gtk.main_quit)
+ self.win.show_all()
+
+ # Input region starts as pass-through; _update_input_region() refines it each tick
+ self.win.input_shape_combine_region(cairo.Region())
+
+ # Screen size
+ display = Gdk.Display.get_default()
+ mon = display.get_primary_monitor() or display.get_monitor(0)
+ geo = mon.get_geometry()
+ self.screen_w = geo.width
+ self.screen_h = geo.height
+ print(f"Screen: {self.screen_w}×{self.screen_h}", flush=True)
+
+ # Spawn initial sheep
+ for _ in range(self.num_sheep):
+ self._spawn_sheep()
+
+ GLib.timeout_add(50, self._tick) # 20 fps game loop
+
+ # ── click-to-drag ──
+
+ def _update_input_region(self):
+ """Set the input region on both the window and the DrawingArea's GdkWindow
+ to cover only the sheep sprites. While dragging, use the full screen so
+ we don't lose the pointer mid-drag."""
+ if self._drag_sheep is not None:
+ # Full screen while dragging – catches pointer everywhere
+ region = cairo.Region(cairo.RectangleInt(
+ 0, 0, self.screen_w, self.screen_h))
+ else:
+ region = cairo.Region()
+ pad = 4 # extra px around sprite for easier click targeting
+ for sheep in self.sheep_list:
+ rx = int(sheep.x) - pad
+ ry = int(sheep.y) - pad
+ rw = self.tile_w + pad * 2
+ rh = self.tile_h + pad * 2
+ region.union(cairo.RectangleInt(rx, ry, rw, rh))
+ # Apply to both the GtkWindow and the DrawingArea's own GdkWindow
+ self.win.input_shape_combine_region(region)
+ gdk_win = self.da.get_window()
+ if gdk_win is not None:
+ gdk_win.input_shape_combine_region(region, 0, 0)
+
+ def _sheep_at(self, mx: float, my: float) -> Optional[Sheep]:
+ """Return the sheep whose bounding rect contains (mx, my), or None."""
+ for sheep in reversed(self.sheep_list): # topmost first
+ if (sheep.x <= mx <= sheep.x + self.tile_w and
+ sheep.y <= my <= sheep.y + self.tile_h):
+ return sheep
+ return None
+
+ def _on_button_press(self, _widget, event):
+ if event.button != 1:
+ return False
+ sheep = self._sheep_at(event.x, event.y)
+ if sheep is None:
+ return False
+ # Don't grab sheep currently being abducted
+ if any(u.sheep is sheep for u in self.ufos):
+ return False
+ self._drag_sheep = sheep
+ self._drag_ox = event.x - sheep.x
+ self._drag_oy = event.y - sheep.y
+ self._update_input_region()
+ print(f"Grabbed sheep at ({sheep.x:.0f}, {sheep.y:.0f})", flush=True)
+ return True
+
+ def _on_button_release(self, _widget, event):
+ if event.button != 1 or self._drag_sheep is None:
+ return False
+ sheep = self._drag_sheep
+ self._drag_sheep = None
+ self._update_input_region()
+ # Let the sheep fall from wherever it was dropped
+ fall_id = None
+ for aid, adef in self.anim_defs.items():
+ if 'fall' in adef.name.lower():
+ fall_id = aid
+ break
+ if fall_id is None:
+ fall_id = 5
+ sheep._init_anim(fall_id)
+ print(f"Released sheep at ({sheep.x:.0f}, {sheep.y:.0f}) → fall", flush=True)
+ return True
+
+ def _on_motion(self, _widget, event):
+ if self._drag_sheep is None:
+ return False
+ sheep = self._drag_sheep
+ sheep.x = event.x - self._drag_ox
+ sheep.y = event.y - self._drag_oy
+ # Clamp to screen
+ sheep.x = max(0, min(sheep.x, self.screen_w - self.tile_w))
+ sheep.y = max(0, min(sheep.y, self.screen_h - self.tile_h))
+ return True
+
+ # ── extra features ──
+
+ def _check_interactions(self, dt_ms: float):
+ """Bump and greet walking sheep that get too close."""
+ busy = {self._drag_sheep} | {u.sheep for u in self.ufos}
+ active = [s for s in self.sheep_list if s not in busy]
+
+ for i, a in enumerate(active):
+ for b in active[i + 1:]:
+ key = (min(id(a), id(b)), max(id(a), id(b)))
+ cd = self._interact_cooldowns.get(key, 0.0)
+ if cd > 0:
+ self._interact_cooldowns[key] = cd - dt_ms
+ continue
+
+ same_level = abs(a.y - b.y) < self.tile_h * 0.7
+ hdist = abs(a.x - b.x)
+ overlapping = hdist < self.tile_w * 0.6 and same_level
+
+ if overlapping:
+ # Bump: boing and reverse
+ a.facing_right = not a.facing_right
+ b.facing_right = not b.facing_right
+ a._init_anim(8)
+ b._init_anim(8)
+ self._interact_cooldowns[key] = 7000.0
+ print("Sheep bumped into each other!", flush=True)
+
+ elif (hdist < self.tile_w * 2.2 and same_level
+ and id(a) not in self._frozen_sheep
+ and id(b) not in self._frozen_sheep):
+ # Approaching each other? (each facing toward the other)
+ a_toward = (a.x < b.x) == a.facing_right
+ b_toward = (b.x < a.x) == b.facing_right
+ if a_toward and b_toward:
+ # Greet: face each other and freeze briefly
+ a.facing_right = a.x < b.x
+ b.facing_right = b.x < a.x
+ self._frozen_sheep[id(a)] = 2200.0
+ self._frozen_sheep[id(b)] = 2200.0
+ self._interact_cooldowns[key] = 20000.0
+ print("Sheep greeting!", flush=True)
+
+ def _spawn_sheep(self):
+ total = sum(s.probability for s in self.spawns)
+ r = random.randint(0, total - 1)
+ cum = 0
+ chosen = self.spawns[0]
+ for s in self.spawns:
+ cum += s.probability
+ if r < cum:
+ chosen = s
+ break
+ sheep = Sheep(
+ self.anim_defs, self.tile_w, self.tile_h,
+ self.screen_w, self.screen_h, chosen
+ )
+ self.sheep_list.append(sheep)
+ print(f"Spawned sheep at ({sheep.x:.0f}, {sheep.y:.0f}) "
+ f"anim={self.anim_defs.get(sheep.anim_id, type('',(),{'name':'?'})()).name}",
+ flush=True)
+
+ def _tick(self) -> bool:
+ now = time.monotonic()
+ dt = min((now - self.last_time) * 1000.0, 100.0)
+ self.last_time = now
+
+ windows = self.wayfire.get_windows()
+ taskbar = self.wayfire.get_taskbar()
+
+ # Update all sheep (skip dragged; respect greeting freeze)
+ for sheep in self.sheep_list:
+ if sheep is self._drag_sheep:
+ continue
+ frozen_ms = self._frozen_sheep.get(id(sheep), 0.0)
+ if frozen_ms > 0:
+ self._frozen_sheep[id(sheep)] = frozen_ms - dt
+ continue
+ elif id(sheep) in self._frozen_sheep:
+ del self._frozen_sheep[id(sheep)]
+ if not any(u.sheep is sheep for u in self.ufos):
+ sheep.update_floors(windows, taskbar, self.screen_w, self.screen_h)
+ sheep.tick(dt)
+
+ # Keep the input region hugging the sheep positions
+ self._update_input_region()
+
+ # ── flowers ──
+ self._eat_countdown -= dt / 1000.0
+ if self._eat_countdown <= 0:
+ self._eat_countdown = random.uniform(60, 120)
+ candidates = [s for s in self.sheep_list
+ if s is not self._drag_sheep
+ and s.floor_cond == COND_NONE
+ and not any(u.sheep is s for u in self.ufos)
+ and s.anim_id not in (5, 6, 13, 26)]
+ if candidates:
+ chosen = random.choice(candidates)
+ chosen._init_anim(26)
+ self._flowers.append(Flower(chosen, self.anim_defs.get(27)))
+ print("Sheep is eating flowers!", flush=True)
+ for f in self._flowers:
+ f.tick(dt)
+ self._flowers = [f for f in self._flowers if not f.done]
+
+ # ── small hop (simulates walking off a low ledge → soft landing → occasional boing) ──
+ self._hop_countdown -= dt / 1000.0
+ if self._hop_countdown <= 0:
+ self._hop_countdown = random.uniform(45, 90)
+ hop_candidates = [s for s in self.sheep_list
+ if s is not self._drag_sheep
+ and s.floor_cond == COND_NONE
+ and s.anim_id in (1, 7, 35, 36, 49, 50)
+ and not any(u.sheep is s for u in self.ufos)]
+ if hop_candidates:
+ chosen = random.choice(hop_candidates)
+ chosen.y -= 25
+ chosen._init_anim(5)
+ print("Sheep small hop!", flush=True)
+
+ # ── bumps & greetings ──
+ self._check_interactions(dt)
+
+ # UFO timer
+ self._ufo_countdown -= dt / 1000.0
+ if self._ufo_countdown <= 0 and self.sheep_list:
+ target = random.choice(self.sheep_list)
+ if not any(u.sheep is target for u in self.ufos):
+ self.ufos.append(UFO(target, self.screen_w, self.screen_h))
+ print("UFO abduction started!", flush=True)
+ self._ufo_countdown = self.UFO_INTERVAL_S + random.randint(-20, 30)
+
+ # Tick UFOs
+ for ufo in self.ufos:
+ ufo.tick(dt)
+ self.ufos = [u for u in self.ufos if not u.done]
+
+ self.da.queue_draw()
+ return True # keep timer
+
+ def _on_draw(self, _widget, cr: cairo.Context):
+ # Clear to fully transparent
+ cr.set_operator(cairo.OPERATOR_CLEAR)
+ cr.paint()
+ cr.set_operator(cairo.OPERATOR_OVER)
+
+ # Draw UFO beams first (behind sheep)
+ for ufo in self.ufos:
+ ufo.draw(cr)
+
+ # Draw flowers (behind sheep)
+ for flower in self._flowers:
+ flower.draw(cr, self.cairo_surface, self.tile_w, self.tile_h, self.tiles_x)
+
+ # Draw sheep
+ for sheep in self.sheep_list:
+ frame, sx, sy, flip, alpha = sheep.draw_info()
+ draw_frame(cr, self.cairo_surface,
+ self.tile_w, self.tile_h, self.tiles_x,
+ frame, sx, sy, flip, alpha)
+
+ # Draw UFO bodies (on top of sheep)
+ for ufo in self.ufos:
+ if ufo.state != ufo.STATE_BEAM:
+ ufo.draw(cr)
+
+ def run(self):
+ Gtk.main()
+
+
+# ─────────────────────── entry point ───────────────────────
+
+def _daemonize():
+ """Re-exec as a detached subprocess (avoids fork-in-multithreaded-process issues)."""
+ import subprocess
+ argv = [a for a in sys.argv if a != '--daemon']
+ subprocess.Popen(
+ [sys.executable] + argv,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ start_new_session=True,
+ )
+ sys.exit(0)
+
+
+def main():
+ import argparse, signal, subprocess
+ p = argparse.ArgumentParser(description='eSheep for Wayland')
+ p.add_argument('--xml', default=os.path.join(os.path.dirname(__file__), 'animations.xml'),
+ help='Path to animations.xml')
+ p.add_argument('--count', type=int, default=1, help='Number of sheep')
+ p.add_argument('--daemon', action='store_true',
+ help='Detach from terminal and run in the background')
+ p.add_argument('--exit', action='store_true',
+ help='Kill any running eSheep instances and quit')
+ args = p.parse_args()
+
+ if args.exit:
+ my_pid = os.getpid()
+ result = subprocess.run(['pgrep', '-f', 'esheep.py'],
+ capture_output=True, text=True)
+ pids = [int(p) for p in result.stdout.split()
+ if p.strip() and int(p.strip()) != my_pid]
+ if pids:
+ for pid in pids:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+ print(f"Stopped {len(pids)} eSheep process(es).")
+ else:
+ print("No running eSheep found.")
+ sys.exit(0)
+
+ if not os.path.exists(args.xml):
+ print(f"animations.xml not found: {args.xml}", file=sys.stderr)
+ sys.exit(1)
+
+ if args.daemon:
+ _daemonize()
+
+ SheepOverlay(args.xml, args.count).run()
+
+if __name__ == '__main__':
+ main()