First commit.

This commit is contained in:
Yaro Kasear 2025-07-08 20:21:30 -05:00
commit 7f6bdf248c
6 changed files with 365 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
**/__pycache__/
.env

0
README.md Normal file
View file

6
app/.env.default Normal file
View file

@ -0,0 +1,6 @@
DISCORD_TOKEN = token_here
DISCORD_CHANNEL_ID = XXXXYYYYYYZZZZZ
MUCK_HOST = localhost
MUCK_PORT = 7675
MUCK_USERNAME = Discord_Bot
MUCK_PASSWORD = super_secret

331
app/muck_discord_bot.py Normal file
View file

@ -0,0 +1,331 @@
import asyncio
import discord
import re
import logging
import os
from collections import deque
from dotenv import load_dotenv
load_dotenv()
DISCORD_TOKEN = os.getenv('DISCORD_TOKEN', '')
DISCORD_CHANNEL_ID = os.getenv('DISCORD_CHANNEL_ID', '')
MUCK_HOST = os.getenv('MUCK_HOST', 'localhost')
MUCK_PORT = int(os.getenv('MUCK_PORT', '2560'))
MUCK_USERNAME = os.getenv('MUCK_USERNAME', 'Discord_Bot')
MUCK_PASSWORD = os.getenv('MUCK_PASSWORD', '')
logging.basicConfig(level=logging.INFO)
class PresenceSyncManager:
def __init__(self):
self.current_player = None
self.player_description_buffer = []
self.collecting_player_desc = False
self.blank_line_count = 0
self.reader = None
self.writer = None
self.channel = None
self.logged_in = False
self.discord_client = None
self.recent_spoofs = deque(maxlen=10)
self.whospe_buffer = None
self.active_threads = {} # Maps player name -> discord.Thread object
self.last_player_descriptions = {}
self.previous_players = None
async def start(self, channel, discord_client):
self.channel = channel
self.discord_client = discord_client
self.reader, self.writer = await asyncio.open_connection(MUCK_HOST, MUCK_PORT)
await self._login()
asyncio.create_task(self._read_loop())
asyncio.create_task(self.periodic_sync())
async def periodic_sync(self):
while True:
self.writer.write(b"whospe\n")
await self.writer.drain()
await asyncio.sleep(30) # Adjust interval as needed
async def _login(self):
await asyncio.sleep(1.5)
self.writer.write(f"connect {MUCK_USERNAME} {MUCK_PASSWORD}\n".encode())
await self.writer.drain()
async def _read_loop(self):
buffer = []
while True:
try:
line = await self.reader.readline()
if not line:
continue
decoded = line.decode(errors='ignore').strip()
logging.info(f"MUCK: {decoded}")
if not self.logged_in:
buffer.append(decoded)
if "You have" in decoded and "mail" in decoded:
self.logged_in = True
print("✅ Login complete.")
asyncio.create_task(self._update_room_state())
await self.channel.send("Bot is online and synced with MUCK.")
else:
await self._handle_muck_line(decoded)
except Exception as e:
logging.exception("Error in read loop")
await asyncio.sleep(1)
async def _update_room_state(self):
self.writer.write(b"whospe\n")
if not hasattr(self, "room_description_posted"):
await asyncio.sleep(1) # Slight delay to avoid overlap with player desc capture
self.room_thread = await self.channel.create_thread(
name="📍 Room State",
type=discord.ChannelType.public_thread
)
self.writer.write(b"look\n")
await self.writer.drain()
self.room_description_posted = True
self.room_lines = []
async def _process_whospe_output(self, lines):
logging.info("[WHOSPE] Parsed output:")
current_players = set()
for line in lines:
logging.info(f"[WHOSPE] {line}")
match = re.match(r"^(?:IC|OOC) \| (\w+)", line)
if match:
name = match.group(1)
if name == MUCK_USERNAME:
continue
current_players.add(name)
thread = self.active_threads.get(name)
if not thread:
thread = await self.channel.create_thread(
name=f"🦁 {name}",
type=discord.ChannelType.public_thread
)
# Store thread before sending message to avoid duplicate creation
self.active_threads[name] = thread
await thread.send(f"📌 Created thread for **{name}** (spotted in the MUCK).")
self.current_player = name
self.collecting_player_desc = True
self.player_description_buffer = []
self.writer.write(f"look {name}\n".encode())
await self.writer.drain()
# Start a timeout to capture description if it doesn't finish naturally
if not hasattr(self, "player_desc_timeout_task") or self.player_desc_timeout_task.done():
self.player_desc_timeout_task = asyncio.create_task(self._timeout_player_description())
self.previous_players = self.previous_players or set()
departed_players = self.previous_players - current_players
self.previous_players = current_players
# Delete threads for players no longer present
for player in departed_players:
thread = self.active_threads.get(player)
if thread:
try:
await thread.delete()
del self.active_threads[player]
except Exception as e:
logging.exception(f"Failed to delete thread for {player}")
# Issue room look only after all player descriptions have been handled
if not self.collecting_player_desc and not hasattr(self, "room_description_posted"):
self.room_description_posted = True
self.writer.write(b"look\n")
await self.writer.drain()
async def issue_room_look_if_safe(self):
if not self.collecting_player_desc and not hasattr(self, "room_description_posted"):
self.room_description_posted = True
self.writer.write(b"look\n")
await self.writer.drain()
async def _handle_muck_line(self, line):
# Start whospe capture
if "WhoElse version" in line:
self.whospe_buffer = [line]
return
if self.whospe_buffer is not None:
self.whospe_buffer.append(line)
if line.strip().endswith("=(done)=-"):
await self._process_whospe_output(self.whospe_buffer)
# Removed room description collection after whospe as per instructions
self.whospe_buffer = None
return
if self.collecting_player_desc:
if re.match(r"^\w+$", line.strip()):
# Finish current and start new
if self.last_player_descriptions.get(self.current_player) == "\n".join(self.player_description_buffer).strip():
self.collecting_player_desc = False
self.player_description_buffer = []
self.current_player = None
self.blank_line_count = 0
return
await self._post_player_description(self.current_player, self.player_description_buffer)
logging.info(f"📤 Finished description capture for {self.current_player}")
self.current_player = line.strip()
self.player_description_buffer = []
self.blank_line_count = 0
logging.info(f"📥 Starting description capture for {self.current_player}")
if not hasattr(self, "player_desc_timeout_task") or self.player_desc_timeout_task.done():
self.player_desc_timeout_task = asyncio.create_task(self._timeout_player_description())
return
elif line.strip() == "":
self.blank_line_count += 1
self.player_description_buffer.append(line)
if self.blank_line_count >= 2:
if self.last_player_descriptions.get(self.current_player) == "\n".join(self.player_description_buffer).strip():
self.collecting_player_desc = False
self.player_description_buffer = []
self.current_player = None
self.blank_line_count = 0
return
await self._post_player_description(self.current_player, self.player_description_buffer)
logging.info(f"📤 Finished description capture for {self.current_player}")
self.collecting_player_desc = False
self.player_description_buffer = []
self.current_player = None
self.blank_line_count = 0
if not hasattr(self, "player_desc_timeout_task") or self.player_desc_timeout_task.done():
self.player_desc_timeout_task = asyncio.create_task(self._timeout_player_description())
return
else:
self.blank_line_count = 0
self.player_description_buffer.append(line)
if not hasattr(self, "player_desc_timeout_task") or self.player_desc_timeout_task.done():
self.player_desc_timeout_task = asyncio.create_task(self._timeout_player_description())
return
# Capture player description
match = re.match(r"^(\w+)$", line)
if match and not self.collecting_player_desc:
player = match.group(1)
if player in self.active_threads:
self.current_player = player
self.player_description_buffer = []
self.collecting_player_desc = True
self.blank_line_count = 0
logging.info(f"📥 Starting description capture for {player}")
if not hasattr(self, "player_desc_timeout_task") or self.player_desc_timeout_task.done():
self.player_desc_timeout_task = asyncio.create_task(self._timeout_player_description())
return
# Filter out echo
if "[DISCORD]" in line or MUCK_USERNAME in line:
return
if self.collecting_player_desc:
return
if not hasattr(self, "room_lines"):
self.room_lines = []
if self.current_player is None and not self.collecting_player_desc:
if line.startswith("_(") or line == "Discord Room" or "It's a" in line:
self.room_lines.append(line)
return
if self.room_lines and self.current_player is None and self.room_thread:
for room_line in self.room_lines:
await self.room_thread.send(f"[MUCK] {room_line}")
self.room_lines = []
if not self.collecting_player_desc:
# Skip repetitive or non-informative lines
if line.startswith("[IC ]") or line.startswith("[OOC]") or "You can see..." in line:
return
if line.strip() == "" or re.match(r"^\(.*\)$", line):
return
if hasattr(self, "room_description_posted") and self.room_description_posted:
# Removed old room description block per instructions
pass
if self.channel:
await self.channel.send(f"[MUCK] {line}")
async def _post_player_description(self, player, lines):
if not lines or all(l.strip() == "" for l in lines):
return
content = "\n".join(lines).strip()
if self.last_player_descriptions.get(player) == content:
return
self.last_player_descriptions[player] = content
thread = self.active_threads.get(player)
if thread:
await thread.send(f"📖 **{player}**'s Description:\n```\n{content}\n```")
async def _timeout_player_description(self):
logging.info("⏳ Timeout task started for player description.")
await asyncio.sleep(2)
if self.collecting_player_desc and self.player_description_buffer:
if self.last_player_descriptions.get(self.current_player) == "\n".join(self.player_description_buffer).strip():
self.collecting_player_desc = False
self.player_description_buffer = []
self.current_player = None
self.blank_line_count = 0
return
await self._post_player_description(self.current_player, self.player_description_buffer)
logging.info(f"📤 Finished description capture for {self.current_player} (timeout)")
self.collecting_player_desc = False
self.player_description_buffer = []
self.current_player = None
self.blank_line_count = 0
def format_spoof(self, message):
content = message.content.strip()
name = message.author.display_name
if content.startswith(":"):
pose = content[1:]
if pose.startswith(","):
return f"spoof [DISCORD] {name}{pose}"
else:
return f"spoof [DISCORD] {name} {pose}"
elif content.lower().startswith("!me "):
return f"spoof [DISCORD] {name} {content[4:].strip()}"
elif (content.startswith("_") and content.endswith("_") and
content.count("_") >= 2 and not content[1:-1].strip().startswith("_")):
return f"spoof [DISCORD] {name} {content[1:-1].strip()}"
return f"spoof [DISCORD] {name} says, \"{content}\""
async def send_to_muck(self, message):
spoof_text = self.format_spoof(message)
self.recent_spoofs.append(spoof_text.strip())
self.writer.write((spoof_text + "\n").encode())
await self.writer.drain()
intents = discord.Intents.default()
intents.messages = True
intents.message_content = True
client = discord.Client(intents=intents)
presence_sync = PresenceSyncManager()
@client.event
async def on_ready():
logging.info("discord.client logging in using static token")
print(f"Logged in as {client.user}")
channel = client.get_channel(DISCORD_CHANNEL_ID)
await presence_sync.start(channel, client)
@client.event
async def on_message(message):
if message.author == client.user:
return
if message.channel.id != DISCORD_CHANNEL_ID:
return
await presence_sync.send_to_muck(message)
client.run(DISCORD_TOKEN)

17
project.toml Normal file
View file

@ -0,0 +1,17 @@
[project]
name = "muck_discord_bot"
version = "0.0.1"
description = "A bot for connecting Discord to a MUCK."
requires-python = ">=3.9"
dependencies = [
"python-dotenv",
"discord.py",
"telnetlib3"
]
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[tools.setuptools]
packages = ["muck_discord_bot"]

9
setup.cfg Normal file
View file

@ -0,0 +1,9 @@
[metadata]
name = muck_discord_bot
version = 0.0.1
[options]
packages = find:
[options.packages.find]
where = .