Click on any #NNNN task number in the briefing to load its full detail view. Uses SGR mouse protocol for accurate click position, strips ANSI to find task IDs in visible text. Tolerates clicks within 5 chars of the # symbol. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
817 lines
27 KiB
Python
Executable File
817 lines
27 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
Nodie Briefing — terminal display with interactive mode
|
||
Usage: briefing [7|30] [client]
|
||
briefing — 7-day daily briefing
|
||
briefing 30 — 30-day weekly review
|
||
briefing 7 OHS — 7-day, OHS only
|
||
"""
|
||
|
||
import json, sys, subprocess, re, os, io, tty, termios, struct, fcntl, signal
|
||
from contextlib import redirect_stdout
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
API_KEY = "b666180ef00fffd23b9a606a4bd8deddea274d2e69fab975583d3d966fbfafed"
|
||
BASE_URL = "https://teacup.nodie.co.za/api/v1"
|
||
|
||
# ANSI
|
||
BOLD = "\033[1m"
|
||
DIM = "\033[2m"
|
||
ITALIC = "\033[3m"
|
||
UNDER = "\033[4m"
|
||
RESET = "\033[0m"
|
||
WHITE = "\033[97m"
|
||
GREY = "\033[90m"
|
||
CYAN = "\033[36m"
|
||
GREEN = "\033[32m"
|
||
YELLOW = "\033[33m"
|
||
RED = "\033[31m"
|
||
MAGENTA = "\033[35m"
|
||
BG_YELLOW = "\033[43m"
|
||
BG_GREEN = "\033[42m"
|
||
BLACK = "\033[30m"
|
||
|
||
SAST = timezone(timedelta(hours=2))
|
||
|
||
# Priority colors
|
||
PRI_COLOR = {
|
||
'critical': RED,
|
||
'high': RED,
|
||
'medium': YELLOW,
|
||
'low': GREY,
|
||
None: GREY,
|
||
'': GREY,
|
||
}
|
||
|
||
# Status colors
|
||
STATUS_COLOR = {
|
||
'in_progress': GREEN,
|
||
'todo': CYAN,
|
||
'hold_to_discuss': MAGENTA,
|
||
'waitingfortesting': YELLOW,
|
||
'willdemo': YELLOW,
|
||
'done': DIM,
|
||
'completed': DIM,
|
||
}
|
||
|
||
|
||
def days_ago(created_at_str):
|
||
"""Return human-friendly age string."""
|
||
if not created_at_str:
|
||
return ""
|
||
try:
|
||
created = datetime.strptime(created_at_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=SAST)
|
||
now = datetime.now(SAST)
|
||
delta = now - created
|
||
if delta.days == 0:
|
||
return "today"
|
||
elif delta.days == 1:
|
||
return "1d ago"
|
||
elif delta.days < 7:
|
||
return f"{delta.days}d ago"
|
||
elif delta.days < 30:
|
||
weeks = delta.days // 7
|
||
return f"{weeks}w ago"
|
||
else:
|
||
months = delta.days // 30
|
||
return f"{months}mo ago"
|
||
except (ValueError, TypeError):
|
||
return ""
|
||
|
||
|
||
def is_new(created_at_str):
|
||
"""True if created today."""
|
||
if not created_at_str:
|
||
return False
|
||
try:
|
||
created = datetime.strptime(created_at_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=SAST)
|
||
now = datetime.now(SAST)
|
||
return created.date() == now.date()
|
||
except (ValueError, TypeError):
|
||
return False
|
||
|
||
|
||
def format_priority(pri):
|
||
color = PRI_COLOR.get(pri, GREY)
|
||
return f"{color}{pri or 'unset'}{RESET}"
|
||
|
||
|
||
def format_status(status):
|
||
color = STATUS_COLOR.get(status, GREY)
|
||
return f"{color}{status or 'unset'}{RESET}"
|
||
|
||
|
||
def new_badge():
|
||
return f" {BG_YELLOW}{BLACK} NEW {RESET}"
|
||
|
||
|
||
def print_task(task, indent=" "):
|
||
"""Print a single task line with metadata."""
|
||
nid = task['nodesid']
|
||
name = task['node_name']
|
||
status = task.get('status')
|
||
priority = task.get('priority')
|
||
created = task.get('created_at', '')
|
||
server = task.get('server')
|
||
assigned = task.get('on_behalf_of')
|
||
age = days_ago(created)
|
||
is_today = is_new(created)
|
||
|
||
# Task number + name
|
||
badge = new_badge() if is_today else ""
|
||
line = f"{indent}{YELLOW}#{nid}{RESET} {BOLD}{WHITE}{name}{RESET}{badge}"
|
||
print(line)
|
||
|
||
# Meta line
|
||
parts = [format_status(status), format_priority(priority)]
|
||
if age:
|
||
age_color = GREEN if is_today else GREY
|
||
parts.append(f"{age_color}{age}{RESET}")
|
||
if server:
|
||
parts.append(f"{CYAN}{server}{RESET}")
|
||
if assigned:
|
||
# Shorten email format
|
||
short = assigned.split(' (')[0] if ' (' in assigned else assigned
|
||
parts.append(f"{DIM}For: {short}{RESET}")
|
||
|
||
print(f"{indent} {' | '.join(parts)}")
|
||
|
||
# Children summary
|
||
cs = task.get('children_summary')
|
||
if cs and cs.get('total', 0) > 0:
|
||
child_parts = []
|
||
for key in ['in_progress', 'todo', 'waitingfortesting', 'done', 'completed']:
|
||
if cs.get(key, 0) > 0:
|
||
child_parts.append(f"{cs[key]} {key}")
|
||
if child_parts:
|
||
print(f"{indent} {GREY}Subtasks: {', '.join(child_parts)}{RESET}")
|
||
|
||
# Latest note preview
|
||
note = task.get('latest_note')
|
||
if note:
|
||
text = note.get('content', '')
|
||
if len(text) > 100:
|
||
text = text[:100] + "..."
|
||
print(f"{indent} {DIM}{ITALIC}{text}{RESET}")
|
||
|
||
|
||
def render_calendar(cal_data):
|
||
"""Render calendar section from raw data."""
|
||
if not cal_data:
|
||
return
|
||
|
||
print(f"\n{BOLD}{CYAN}{'─' * 60}{RESET}")
|
||
print(f"{BOLD}{CYAN}Calendar{RESET}")
|
||
print(f"{BOLD}{CYAN}{'─' * 60}{RESET}")
|
||
|
||
now = datetime.now(SAST)
|
||
|
||
for date_str, events in cal_data.items():
|
||
try:
|
||
dt = datetime.strptime(date_str, "%Y-%m-%d")
|
||
if dt.date() == now.date():
|
||
label = f"{dt.strftime('%a %d %b')} (today)"
|
||
elif dt.date() == (now + timedelta(days=1)).date():
|
||
label = f"{dt.strftime('%a %d %b')} (tomorrow)"
|
||
else:
|
||
label = dt.strftime('%a %d %b')
|
||
except ValueError:
|
||
label = date_str
|
||
|
||
print(f"\n {BOLD}{WHITE}{label}{RESET}")
|
||
|
||
for ev in events:
|
||
start = ev.get('start_datetime', '')
|
||
end = ev.get('end_datetime', '')
|
||
title = ev.get('title', '')
|
||
|
||
# Format times
|
||
try:
|
||
s = datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
|
||
e = datetime.strptime(end, "%Y-%m-%d %H:%M:%S")
|
||
time_str = f"{s.strftime('%H:%M')}–{e.strftime('%H:%M')}"
|
||
|
||
# Status marker
|
||
if s.replace(tzinfo=SAST) < now < e.replace(tzinfo=SAST):
|
||
marker = f" {BG_GREEN}{BLACK} NOW {RESET}"
|
||
elif e.replace(tzinfo=SAST) < now:
|
||
marker = f" {GREY}[DONE]{RESET}"
|
||
elif s.replace(tzinfo=SAST) > now and (s.replace(tzinfo=SAST) - now).seconds < 3600:
|
||
marker = f" {CYAN}[NEXT]{RESET}"
|
||
else:
|
||
marker = ""
|
||
except ValueError:
|
||
time_str = start
|
||
marker = ""
|
||
|
||
print(f" {DIM}{time_str}{RESET} {title}{marker}")
|
||
|
||
|
||
def render_tasks(tasks):
|
||
"""Render tasks grouped by category, with NEW section first."""
|
||
if not tasks:
|
||
return
|
||
|
||
# Separate new tasks
|
||
new_tasks = [t for t in tasks if t.get('category') == 'actionable' and is_new(t.get('created_at', ''))]
|
||
actionable = [t for t in tasks if t.get('category') == 'actionable' and not is_new(t.get('created_at', ''))]
|
||
waiting = [t for t in tasks if t.get('category') == 'waiting']
|
||
done = [t for t in tasks if t.get('category') == 'done']
|
||
|
||
# New today
|
||
if new_tasks:
|
||
print(f"\n{BOLD}{RED}{'─' * 60}{RESET}")
|
||
print(f"{BOLD}{RED}New Today ({len(new_tasks)}){RESET}")
|
||
print(f"{BOLD}{RED}{'─' * 60}{RESET}")
|
||
by_client = group_by_client(new_tasks)
|
||
for client, projects in by_client.items():
|
||
print(f"\n {BOLD}{WHITE}{client}{RESET}")
|
||
for project, ptasks in projects.items():
|
||
if project:
|
||
print(f" {BOLD}{GREY}{project}{RESET}")
|
||
for t in ptasks:
|
||
print_task(t, " ")
|
||
|
||
# Actionable
|
||
if actionable:
|
||
print(f"\n{BOLD}{CYAN}{'─' * 60}{RESET}")
|
||
print(f"{BOLD}{CYAN}Needs Work ({len(actionable)}){RESET}")
|
||
print(f"{BOLD}{CYAN}{'─' * 60}{RESET}")
|
||
by_client = group_by_client(actionable)
|
||
for client, projects in by_client.items():
|
||
print(f"\n {BOLD}{WHITE}{client}{RESET}")
|
||
for project, ptasks in projects.items():
|
||
if project:
|
||
print(f" {BOLD}{GREY}{project}{RESET}")
|
||
for t in ptasks:
|
||
print_task(t, " ")
|
||
|
||
# Waiting
|
||
if waiting:
|
||
print(f"\n{BOLD}{YELLOW}{'─' * 60}{RESET}")
|
||
print(f"{BOLD}{YELLOW}Waiting on Others ({len(waiting)}){RESET}")
|
||
print(f"{BOLD}{YELLOW}{'─' * 60}{RESET}")
|
||
by_client = group_by_client(waiting)
|
||
for client, projects in by_client.items():
|
||
print(f"\n {BOLD}{WHITE}{client}{RESET}")
|
||
for project, ptasks in projects.items():
|
||
if project:
|
||
print(f" {BOLD}{GREY}{project}{RESET}")
|
||
for t in ptasks:
|
||
print_task(t, " ")
|
||
|
||
# Done
|
||
if done:
|
||
print(f"\n{BOLD}{GREEN}{'─' * 60}{RESET}")
|
||
print(f"{BOLD}{GREEN}Recently Completed ({len(done)}){RESET}")
|
||
print(f"{BOLD}{GREEN}{'─' * 60}{RESET}")
|
||
by_client = group_by_client(done)
|
||
for client, projects in by_client.items():
|
||
print(f"\n {BOLD}{WHITE}{client}{RESET}")
|
||
for project, ptasks in projects.items():
|
||
if project:
|
||
print(f" {BOLD}{GREY}{project}{RESET}")
|
||
for t in ptasks:
|
||
print_task(t, " ")
|
||
|
||
|
||
def group_by_client(tasks):
|
||
"""Group tasks by client > project, preserving order."""
|
||
result = {}
|
||
for t in tasks:
|
||
client = t.get('client') or 'Unknown'
|
||
project = t.get('project') or ''
|
||
if client not in result:
|
||
result[client] = {}
|
||
if project not in result[client]:
|
||
result[client][project] = []
|
||
result[client][project].append(t)
|
||
return result
|
||
|
||
|
||
def render_commits(commits):
|
||
"""Render git commit summary."""
|
||
if not commits:
|
||
return
|
||
|
||
total = sum(len(c) for c in commits.values())
|
||
print(f"\n{BOLD}{CYAN}{'─' * 60}{RESET}")
|
||
print(f"{BOLD}{CYAN}Commits ({total}){RESET}")
|
||
print(f"{BOLD}{CYAN}{'─' * 60}{RESET}")
|
||
|
||
for repo, repo_commits in commits.items():
|
||
print(f"\n {BOLD}{WHITE}{repo}{RESET} {GREY}({len(repo_commits)}){RESET}")
|
||
for c in repo_commits[:5]: # Show max 5 per repo
|
||
sha = c.get('sha', '')[:7]
|
||
date = c.get('date', '')[:10]
|
||
msg = c.get('message', '').split('\n')[0]
|
||
if len(msg) > 70:
|
||
msg = msg[:70] + "..."
|
||
print(f" {GREY}{sha} {date}{RESET} {msg}")
|
||
if len(repo_commits) > 5:
|
||
print(f" {DIM}... +{len(repo_commits) - 5} more{RESET}")
|
||
|
||
|
||
def render_summary(summary, range_days):
|
||
"""Render the summary header."""
|
||
now = datetime.now(SAST)
|
||
print(f"\n{BOLD}{WHITE}{now.strftime('%a %d %b, %H:%M')}{RESET}")
|
||
|
||
a = summary.get('actionable', 0)
|
||
w = summary.get('waiting', 0)
|
||
b = summary.get('blocked', 0)
|
||
d = summary.get('done', 0)
|
||
m = summary.get('meetings_today', 0)
|
||
|
||
parts = []
|
||
if a: parts.append(f"{CYAN}{a} actionable{RESET}")
|
||
if w: parts.append(f"{YELLOW}{w} waiting{RESET}")
|
||
if b: parts.append(f"{RED}{b} blocked{RESET}")
|
||
if d: parts.append(f"{GREEN}{d} done{RESET}")
|
||
print(f" {' | '.join(parts)} — {range_days}-day window")
|
||
if m:
|
||
print(f" {MAGENTA}{m} meeting{'s' if m != 1 else ''} today{RESET}")
|
||
|
||
|
||
def fetch_task_detail(nid):
|
||
"""Fetch full task detail for interactive mode."""
|
||
result = subprocess.run(
|
||
["curl", "-s", "-H", f"Authorization: Bearer {API_KEY}",
|
||
f"{BASE_URL}/nodes/{nid}"],
|
||
capture_output=True, text=True
|
||
)
|
||
try:
|
||
data = json.loads(result.stdout)
|
||
if data.get('success'):
|
||
return data['data']
|
||
except (json.JSONDecodeError, KeyError):
|
||
pass
|
||
return None
|
||
|
||
|
||
def show_task_detail(nid):
|
||
"""Show full task detail in interactive mode."""
|
||
print(f"\n {DIM}Fetching #{nid}...{RESET}")
|
||
data = fetch_task_detail(nid)
|
||
if not data:
|
||
print(f" {RED}Could not fetch task #{nid}{RESET}")
|
||
return
|
||
|
||
node = data.get('node', {})
|
||
details = data.get('details', {})
|
||
bc = data.get('breadcrumb', {})
|
||
children = data.get('children', [])
|
||
|
||
# Header
|
||
print(f"\n {BOLD}{WHITE}#{node.get('nodesid')} — {node.get('node_name')}{RESET}")
|
||
print(f" {GREY}{'─' * 50}{RESET}")
|
||
|
||
# Breadcrumb
|
||
bc_parts = []
|
||
if bc.get('client'):
|
||
bc_parts.append(bc['client'].get('node_name', ''))
|
||
if bc.get('project'):
|
||
bc_parts.append(bc['project'].get('node_name', ''))
|
||
if bc.get('category'):
|
||
bc_parts.append(bc['category'].get('node_name', ''))
|
||
if bc_parts:
|
||
print(f" {DIM}{' > '.join(bc_parts)}{RESET}")
|
||
|
||
# Dates
|
||
created = node.get('created_at', '')
|
||
updated = node.get('updated_at', '')
|
||
created_by = node.get('created_by', '')
|
||
age = days_ago(created)
|
||
print(f" {DIM}Created: {created} ({age}){' by ' + created_by if created_by else ''}{RESET}")
|
||
if updated != created:
|
||
print(f" {DIM}Updated: {updated} ({days_ago(updated)}){RESET}")
|
||
|
||
# Details
|
||
if isinstance(details, dict):
|
||
status = details.get('status') or details.get('taskStatus') or details.get('subtaskStatus')
|
||
priority = details.get('priority') or details.get('taskPriority') or details.get('subtaskPriority')
|
||
server = details.get('server')
|
||
assigned = details.get('on_behalf_of')
|
||
completion = details.get('completion') or details.get('taskCompletion') or details.get('subtaskCompletion')
|
||
|
||
meta = []
|
||
if status: meta.append(f"Status: {format_status(status)}")
|
||
if priority: meta.append(f"Priority: {format_priority(priority)}")
|
||
if completion and completion != '0': meta.append(f"Completion: {completion}%")
|
||
if server: meta.append(f"Server: {CYAN}{server}{RESET}")
|
||
if assigned: meta.append(f"For: {assigned}")
|
||
for m in meta:
|
||
print(f" {m}")
|
||
|
||
# Description
|
||
desc = details.get('description')
|
||
if desc:
|
||
print(f"\n {WHITE}{desc}{RESET}")
|
||
|
||
# Note content (from ticket)
|
||
note = details.get('note_content')
|
||
if note:
|
||
# Strip HTML
|
||
clean = re.sub(r'<[^>]+>', ' ', note)
|
||
clean = re.sub(r'\s+', ' ', clean).strip()
|
||
if len(clean) > 500:
|
||
clean = clean[:500] + "..."
|
||
print(f"\n {DIM}{clean}{RESET}")
|
||
|
||
# Children
|
||
if children:
|
||
subtasks = [c for c in children if c.get('node_typesid') in (4, 5)]
|
||
notes = [c for c in children if c.get('node_typesid') == 6]
|
||
|
||
if subtasks:
|
||
print(f"\n {BOLD}Subtasks ({len(subtasks)}):{RESET}")
|
||
for st in subtasks:
|
||
st_details = st.get('details', {})
|
||
st_status = ''
|
||
if isinstance(st_details, dict):
|
||
st_status = st_details.get('status') or st_details.get('subtaskStatus') or ''
|
||
print(f" {YELLOW}#{st['nodesid']}{RESET} {st['node_name']} {GREY}[{st_status}]{RESET}")
|
||
|
||
if notes:
|
||
print(f"\n {BOLD}Notes ({len(notes)}):{RESET}")
|
||
for n in notes[-3:]: # Show last 3 notes
|
||
n_details = n.get('details', {})
|
||
content = ''
|
||
if isinstance(n_details, dict):
|
||
content = n_details.get('note_content') or n_details.get('description') or ''
|
||
content = re.sub(r'<[^>]+>', ' ', content)
|
||
content = re.sub(r'\s+', ' ', content).strip()
|
||
if len(content) > 200:
|
||
content = content[:200] + "..."
|
||
n_age = days_ago(n.get('created_at', ''))
|
||
print(f" {GREY}{n.get('created_at', '')[:10]} ({n_age}){RESET}")
|
||
print(f" {DIM}{content}{RESET}")
|
||
|
||
print(f"\n {DIM}URL: https://teacup.nodie.co.za/?sn={node.get('nodesid')}{RESET}")
|
||
|
||
|
||
def fetch_briefing_content(range_days, client):
|
||
"""Fetch briefing data from API and render to string. Returns (content, tasks)."""
|
||
url = f"{BASE_URL}/briefing?range={range_days}"
|
||
if client:
|
||
url += f"&client={client}"
|
||
|
||
result = subprocess.run(
|
||
["curl", "-s", "-H", f"Authorization: Bearer {API_KEY}", url],
|
||
capture_output=True, text=True
|
||
)
|
||
|
||
if not result.stdout.strip():
|
||
return f"{RED}Empty response from API{RESET}\n", []
|
||
|
||
try:
|
||
data = json.loads(result.stdout)
|
||
except json.JSONDecodeError:
|
||
return f"{RED}Invalid JSON response{RESET}\n", []
|
||
|
||
if not data.get("success"):
|
||
return f"{RED}API error: {data.get('message', 'Unknown')}{RESET}\n", []
|
||
|
||
raw = data["data"]
|
||
|
||
buf = io.StringIO()
|
||
with redirect_stdout(buf):
|
||
render_summary(raw.get('summary', {}), int(range_days))
|
||
render_calendar(raw.get('calendar', {}))
|
||
render_tasks(raw.get('tasks', []))
|
||
render_commits(raw.get('git_commits', {}))
|
||
|
||
return buf.getvalue(), raw.get('tasks', [])
|
||
|
||
|
||
def fetch_task_content(nid):
|
||
"""Fetch task detail and render to string."""
|
||
buf = io.StringIO()
|
||
with redirect_stdout(buf):
|
||
show_task_detail(nid)
|
||
return buf.getvalue()
|
||
|
||
|
||
# ── Terminal helpers ────────────────────────────────────────────────
|
||
|
||
def get_terminal_size():
|
||
"""Get terminal rows, cols using multiple methods."""
|
||
# Try ioctl on multiple fds
|
||
for fd_try in [sys.stdout, sys.stdin, sys.stderr]:
|
||
try:
|
||
result = struct.unpack('hh', fcntl.ioctl(fd_try, termios.TIOCGWINSZ, b'\0' * 4))
|
||
if result[0] > 0 and result[1] > 0:
|
||
return result[0], result[1]
|
||
except Exception:
|
||
pass
|
||
# Try /dev/tty directly
|
||
try:
|
||
with open('/dev/tty', 'r') as tty_fd:
|
||
result = struct.unpack('hh', fcntl.ioctl(tty_fd, termios.TIOCGWINSZ, b'\0' * 4))
|
||
if result[0] > 0 and result[1] > 0:
|
||
return result[0], result[1]
|
||
except Exception:
|
||
pass
|
||
# Try os/shutil
|
||
try:
|
||
sz = os.get_terminal_size()
|
||
return sz.lines, sz.columns
|
||
except Exception:
|
||
pass
|
||
try:
|
||
import shutil
|
||
sz = shutil.get_terminal_size()
|
||
return sz.lines, sz.columns
|
||
except Exception:
|
||
pass
|
||
return 40, 80
|
||
|
||
|
||
def read_key(fd):
|
||
"""Read a keypress from raw terminal. Returns key string or ('CLICK', row, col) tuple."""
|
||
ch = os.read(fd, 1)
|
||
if not ch:
|
||
return ''
|
||
if ch == b'\x1b':
|
||
# Read next bytes
|
||
b2 = os.read(fd, 1)
|
||
if b2 != b'[':
|
||
return 'ESC'
|
||
b3 = os.read(fd, 1)
|
||
|
||
# SGR mouse: \033[<btn;x;y;M or \033[<btn;x;y;m
|
||
if b3 == b'<':
|
||
# Read until M or m
|
||
buf = b''
|
||
while True:
|
||
c = os.read(fd, 1)
|
||
if c in (b'M', b'm'):
|
||
break
|
||
buf += c
|
||
parts = buf.decode().split(';')
|
||
if len(parts) == 3:
|
||
btn = int(parts[0])
|
||
col = int(parts[1])
|
||
row = int(parts[2])
|
||
# btn 64/65 = scroll up/down
|
||
if btn == 64: return 'SCROLLUP'
|
||
if btn == 65: return 'SCROLLDN'
|
||
# btn 0 = left click (on release: c == b'm')
|
||
if btn == 0 and c == b'M':
|
||
return ('CLICK', row, col)
|
||
return ''
|
||
|
||
# Arrow keys and other sequences
|
||
if b3 == b'A': return 'UP'
|
||
if b3 == b'B': return 'DOWN'
|
||
if b3 == b'5': os.read(fd, 1); return 'PGUP'
|
||
if b3 == b'6': os.read(fd, 1); return 'PGDN'
|
||
if b3 == b'H': return 'HOME'
|
||
if b3 == b'F': return 'END'
|
||
|
||
# Old-style X10 mouse: \033[M + 3 bytes
|
||
if b3 == b'M':
|
||
mouse = os.read(fd, 3)
|
||
btn = mouse[0] - 32
|
||
if btn == 64: return 'SCROLLUP'
|
||
if btn == 65: return 'SCROLLDN'
|
||
if btn == 0:
|
||
col = mouse[1] - 32
|
||
row = mouse[2] - 32
|
||
return ('CLICK', row, col)
|
||
return ''
|
||
return 'ESC'
|
||
return ch.decode('utf-8', errors='replace')
|
||
|
||
|
||
def tui_viewer(range_days, client):
|
||
"""Raw-terminal scrollable viewer with ANSI color support."""
|
||
fd = sys.stdin.fileno()
|
||
old_settings = termios.tcgetattr(fd)
|
||
|
||
# Switch to alternate screen buffer, hide cursor
|
||
sys.stdout.write('\033[?1049h\033[?25l')
|
||
# Enable mouse reporting (SGR mode for scroll)
|
||
sys.stdout.write('\033[?1000h\033[?1006h')
|
||
sys.stdout.flush()
|
||
|
||
scroll = 0
|
||
lines = []
|
||
tasks = []
|
||
briefing_lines = []
|
||
viewing_task = False
|
||
input_mode = False
|
||
input_buf = ""
|
||
status_msg = ""
|
||
needs_redraw = [False] # mutable for signal handler
|
||
|
||
# Handle terminal resize
|
||
def on_resize(signum, frame):
|
||
needs_redraw[0] = True
|
||
signal.signal(signal.SIGWINCH, on_resize)
|
||
|
||
TASK_ID_RE = re.compile(r'#(\d{3,5})')
|
||
|
||
def find_task_at_click(row, col):
|
||
"""Find a #NNNN task number on the clicked line, closest to click col."""
|
||
line_idx = scroll + row - 1 # row is 1-based
|
||
if line_idx < 0 or line_idx >= len(lines):
|
||
return None
|
||
# Strip ANSI to get visible text and map positions
|
||
raw = lines[line_idx]
|
||
clean = ANSI_ESC.sub('', raw)
|
||
# Find all #NNNN in the visible text
|
||
best = None
|
||
best_dist = 999
|
||
for m in TASK_ID_RE.finditer(clean):
|
||
start, end = m.start(), m.end()
|
||
# Distance from click to the match
|
||
if start <= col <= end:
|
||
return int(m.group(1))
|
||
dist = min(abs(col - start), abs(col - end))
|
||
if dist < best_dist and dist <= 5:
|
||
best_dist = dist
|
||
best = int(m.group(1))
|
||
return best
|
||
|
||
def split_lines(content):
|
||
return content.split('\n')
|
||
|
||
ANSI_ESC = re.compile(r'\x1b\[[0-9;]*m')
|
||
|
||
def visible_len(s):
|
||
"""Length of string excluding ANSI escape codes."""
|
||
return len(ANSI_ESC.sub('', s))
|
||
|
||
def render():
|
||
rows, cols = get_terminal_size()
|
||
view_height = rows - 1 # bottom row = status bar
|
||
|
||
# Move cursor to top-left
|
||
out = '\033[H'
|
||
|
||
for i in range(view_height):
|
||
line_idx = scroll + i
|
||
out += f'\033[{i+1};1H'
|
||
if line_idx < len(lines):
|
||
line = lines[line_idx]
|
||
# Pad line to fill terminal width
|
||
pad = cols - visible_len(line)
|
||
if pad > 0:
|
||
out += line + ' ' * pad
|
||
else:
|
||
out += line
|
||
else:
|
||
# Empty line — fill with spaces
|
||
out += ' ' * cols
|
||
out += '\033[0m'
|
||
|
||
# Status bar (bottom row, reverse video)
|
||
if input_mode:
|
||
bar = f" Task #: {input_buf}\u2588"
|
||
elif status_msg:
|
||
bar = f" {status_msg}"
|
||
else:
|
||
bar = " \u2191\u2193/jk scroll | r refresh | : task# | b back | q quit"
|
||
|
||
# Scroll percentage
|
||
if len(lines) > view_height:
|
||
pct = min(100, int((scroll + view_height) / max(1, len(lines)) * 100))
|
||
bar_right = f" {pct}% "
|
||
else:
|
||
bar_right = ""
|
||
|
||
bar_pad = cols - len(bar) - len(bar_right)
|
||
if bar_pad < 0:
|
||
bar_pad = 0
|
||
out += f'\033[{rows};1H\033[7m{bar}{" " * bar_pad}{bar_right}\033[0m'
|
||
|
||
sys.stdout.write(out)
|
||
sys.stdout.flush()
|
||
|
||
def load_briefing():
|
||
nonlocal lines, tasks, scroll, status_msg, briefing_lines
|
||
status_msg = "Loading..."
|
||
render()
|
||
content, tasks = fetch_briefing_content(range_days, client)
|
||
lines = split_lines(content)
|
||
briefing_lines = lines[:]
|
||
scroll = 0
|
||
status_msg = ""
|
||
|
||
def load_task(nid):
|
||
nonlocal lines, scroll, status_msg
|
||
status_msg = f"Loading #{nid}..."
|
||
render()
|
||
content = fetch_task_content(nid)
|
||
lines = split_lines(content)
|
||
scroll = 0
|
||
status_msg = ""
|
||
|
||
try:
|
||
tty.setraw(fd)
|
||
load_briefing()
|
||
|
||
while True:
|
||
if needs_redraw[0]:
|
||
needs_redraw[0] = False
|
||
render()
|
||
key = read_key(fd)
|
||
if not key:
|
||
continue
|
||
rows, cols = get_terminal_size()
|
||
view_height = rows - 1
|
||
max_scroll = max(0, len(lines) - view_height)
|
||
# Clamp scroll after resize
|
||
scroll = min(scroll, max_scroll)
|
||
|
||
if input_mode:
|
||
if key == 'ESC':
|
||
input_mode = False
|
||
input_buf = ""
|
||
elif key in ('\r', '\n'):
|
||
input_mode = False
|
||
task_num = input_buf.strip().lstrip('#')
|
||
input_buf = ""
|
||
if task_num.isdigit():
|
||
viewing_task = True
|
||
load_task(int(task_num))
|
||
elif key in ('\x7f', '\x08'): # Backspace
|
||
input_buf = input_buf[:-1]
|
||
elif len(key) == 1 and ord(key) >= 32:
|
||
input_buf += key
|
||
continue
|
||
|
||
# Mouse click — find task number
|
||
if isinstance(key, tuple) and key[0] == 'CLICK':
|
||
_, click_row, click_col = key
|
||
task_id = find_task_at_click(click_row, click_col)
|
||
if task_id:
|
||
viewing_task = True
|
||
load_task(task_id)
|
||
continue
|
||
|
||
if key == 'q':
|
||
break
|
||
elif key == 'r':
|
||
viewing_task = False
|
||
load_briefing()
|
||
elif key == ':':
|
||
input_mode = True
|
||
input_buf = ""
|
||
elif key == 'b':
|
||
if viewing_task:
|
||
viewing_task = False
|
||
lines = briefing_lines[:]
|
||
scroll = 0
|
||
elif key in ('UP', 'k'):
|
||
scroll = max(0, scroll - 1)
|
||
elif key in ('DOWN', 'j'):
|
||
scroll = min(max_scroll, scroll + 1)
|
||
elif key == 'PGUP':
|
||
scroll = max(0, scroll - view_height)
|
||
elif key == 'PGDN':
|
||
scroll = min(max_scroll, scroll + view_height)
|
||
elif key in ('HOME', 'g'):
|
||
scroll = 0
|
||
elif key in ('END', 'G'):
|
||
scroll = max_scroll
|
||
elif key == 'SCROLLUP':
|
||
scroll = max(0, scroll - 3)
|
||
elif key == 'SCROLLDN':
|
||
scroll = min(max_scroll, scroll + 3)
|
||
elif key == ' ':
|
||
scroll = min(max_scroll, scroll + view_height)
|
||
|
||
except (KeyboardInterrupt, EOFError):
|
||
pass
|
||
finally:
|
||
# Restore terminal
|
||
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
|
||
sys.stdout.write('\033[?1000l\033[?1006l') # disable mouse
|
||
sys.stdout.write('\033[?25h') # show cursor
|
||
sys.stdout.write('\033[?1049l') # restore main screen
|
||
sys.stdout.flush()
|
||
|
||
|
||
def main():
|
||
range_days = "7"
|
||
client = None
|
||
|
||
args = sys.argv[1:]
|
||
if args and args[0].isdigit():
|
||
range_days = args.pop(0)
|
||
if args:
|
||
client = args[0]
|
||
|
||
if sys.stdout.isatty():
|
||
tui_viewer(range_days, client)
|
||
else:
|
||
# Non-interactive: just print
|
||
content, _ = fetch_briefing_content(range_days, client)
|
||
print(content)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|