20 changed files with 299 additions and 605 deletions
@ -1,2 +1,5 @@
|
||||
bidict |
||||
websockets |
||||
websockets |
||||
mongoengine |
||||
dataclasses |
||||
python-dotenv |
||||
@ -0,0 +1,5 @@
|
||||
|
||||
class CommandServer: |
||||
|
||||
async def run(self): |
||||
pass |
||||
@ -0,0 +1,66 @@
|
||||
from datetime import datetime |
||||
from ipaddress import ip_address |
||||
|
||||
from mongoengine import Document, StringField, DateTimeField, IntField, ListField, ReferenceField, BooleanField |
||||
|
||||
|
||||
class IPField(IntField): |
||||
"""An IP field. |
||||
""" |
||||
|
||||
def validate(self, value): |
||||
try: |
||||
ip_address(value) |
||||
except ValueError as err: |
||||
self.error(str(err)) |
||||
|
||||
def to_mongo(self, value): |
||||
return int(ip_address(value)) |
||||
|
||||
def to_python(self, value): |
||||
return ip_address(value) |
||||
|
||||
def prepare_query_value(self, op, value): |
||||
return int(ip_address(value)) |
||||
|
||||
|
||||
class UserConnect(Document): |
||||
cookie = StringField(required=True) |
||||
pokemon = StringField(required=True) |
||||
adjective = StringField(required=True) |
||||
ip = IPField(required=True) |
||||
time = DateTimeField(required=True, default=datetime.now) |
||||
|
||||
|
||||
class ChatMessage(Document): |
||||
cookie = StringField(required=True) |
||||
ip = IPField() |
||||
msg = StringField() |
||||
time = DateTimeField(required=True, default=datetime.now) |
||||
|
||||
|
||||
class UserProfile(Document): |
||||
cookie = StringField(required=True, primary_key=True) |
||||
pokemon = StringField(required=True) |
||||
adjective = StringField(required=True) |
||||
tags = ListField(StringField) |
||||
whitelisted = BooleanField(default=False) |
||||
|
||||
|
||||
class IPProfile(Document): |
||||
ip = IPField(required=True) |
||||
tags = ListField(StringField) |
||||
whitelisted = BooleanField(default=False) |
||||
|
||||
|
||||
class FilterRule(Document): |
||||
rule = StringField() |
||||
ip = StringField() |
||||
cookie = StringField() |
||||
action = StringField(default="mute") |
||||
# used when the action is "tag" |
||||
tag = StringField() |
||||
|
||||
def match(self, s: str) -> bool: |
||||
pass |
||||
|
||||
@ -0,0 +1,20 @@
|
||||
from .commons import ConnectProcessor |
||||
from ..commons import UserList |
||||
|
||||
|
||||
class BannedIPsProcessor(ConnectProcessor): |
||||
pass |
||||
|
||||
|
||||
class MutedIPsProcessor(ConnectProcessor): |
||||
pass |
||||
|
||||
|
||||
class LockdownModeProcessor(ConnectProcessor): |
||||
"""Matches any user not whitelisted when activated""" |
||||
|
||||
def __init__(self): |
||||
activated = False |
||||
|
||||
def match(self, sender_id: str, users_list: UserList) -> bool: |
||||
pass |
||||
@ -0,0 +1,71 @@
|
||||
from typing import Type |
||||
|
||||
from tools.coco.client import CocoClient |
||||
from tools.commons import Message, BotMessage |
||||
from tools.constants import AUTHORIZED_USERIDS |
||||
from .commons import * |
||||
|
||||
|
||||
class DispatcherBotProcessor(MessageProcessor): |
||||
"""A processor that matches a context, then forwards the message to a list of sub-processors. |
||||
This enables the botprocessor-matching mechanism to behave kinda like a decision tree""" |
||||
|
||||
def __init__(self, processors_list: List[MessageProcessor]): |
||||
self.dispatcher = MessageDispatcher(processors_list) |
||||
|
||||
def process(self, text: str, sender_id: str, users_list: UserList): |
||||
return self.dispatcher.dispatch(text, sender_id, users_list) |
||||
|
||||
|
||||
class CommandsDispatcherProcessor(DispatcherBotProcessor): |
||||
"""Reacts to commands of the form '/botname command' or 'botname, command' """ |
||||
|
||||
def __init__(self, processors_list: List[MessageProcessor], trigger_word: str = None, default_response: str = None): |
||||
super().__init__(processors_list) |
||||
self.trigger = trigger_word |
||||
self.default_response = default_response if default_response is not None else "Commande non reconnue, pd" |
||||
|
||||
def match(self, text: str, sender_id: str, users_list: UserList): |
||||
trigger = self.trigger.upper() if self.trigger is not None else users_list.my_name.upper() |
||||
return text.upper().startswith(trigger + ",") \ |
||||
or text.upper().startswith("/" + trigger) |
||||
|
||||
def process(self, text: str, sender_id: str, users_list: UserList): |
||||
without_cmd = text[len(self.trigger) + 1:] |
||||
response = super().process(without_cmd, sender_id, users_list) |
||||
return Message(self.default_response) if response is None else response |
||||
|
||||
|
||||
def admin_command(klass: Type[MessageProcessor]): |
||||
pass |
||||
|
||||
|
||||
@admin_command |
||||
class LockDownCommandProcessor: |
||||
pass |
||||
|
||||
|
||||
@admin_command |
||||
class TaggerCommandProcessor: |
||||
"""Adds tagging rules based on a word filtering rule""" |
||||
pass |
||||
|
||||
|
||||
class TaggerProcessor: |
||||
"""Tags IP's based on word filtering rule""" |
||||
pass |
||||
|
||||
|
||||
class BotHelp(MessageProcessor): |
||||
"""Displays the help string for all processors in the list that have a helpt string""" |
||||
|
||||
def __init__(self, processors_list: List[BaseCocobotCommand]): |
||||
all_help_strs = [proc.HELP_STR |
||||
for proc in processors_list if proc.HELP_STR is not None] |
||||
self.help_str = ", ".join(all_help_strs) |
||||
|
||||
def match(self, text: str, sender_id: str, users_list: UserList): |
||||
return text.lower().startswith("help") |
||||
|
||||
def process(self, text: str, sender_id: str, users_list: UserList): |
||||
return BotMessage(self.help_str) |
||||
@ -0,0 +1,31 @@
|
||||
from setuptools import setup, find_packages |
||||
|
||||
with open("README.md") as readme: |
||||
long_description = readme.read() |
||||
|
||||
|
||||
with open("requirements.txt") as reqs: |
||||
requirements = reqs.read().split("\n") |
||||
|
||||
setup( |
||||
name='scormod', |
||||
version='0.1', |
||||
description="A moderation bot for the loult", |
||||
long_description=long_description, |
||||
long_description_content_type='text/markdown', |
||||
author='hadware', |
||||
license='MIT', |
||||
classifiers=[ |
||||
'Development Status :: 3 - Alpha', |
||||
'Intended Audience :: Developers', |
||||
'License :: OSI Approved :: AGPL License', |
||||
'Programming Language :: Python :: 3', |
||||
'Programming Language :: Python :: 3.6', |
||||
'Programming Language :: Python :: 3.7', |
||||
], |
||||
keywords='cookies', |
||||
packages=find_packages(), |
||||
install_requires=requirements, |
||||
include_package_data=True, |
||||
test_suite='nose.collector', |
||||
tests_require=['nose']) |
||||
@ -1,153 +0,0 @@
|
||||
import logging |
||||
import random |
||||
from typing import List, Dict, Tuple, Union, Set |
||||
from collections import defaultdict |
||||
|
||||
from .requests import LoginRequest, PostLoginRequest, PulseRequest, SendMsgRequest |
||||
from ..commons import BotMessage, Message, AbstractResponse |
||||
|
||||
|
||||
class Interlocutor: |
||||
|
||||
def __init__(self, nick: str, age: int, city: str, is_male: bool, conv_id: str): |
||||
self.nick = nick |
||||
self.age = age |
||||
self.is_male = is_male |
||||
self.city = city |
||||
self.id = conv_id |
||||
|
||||
@classmethod |
||||
def from_string(cls, str): |
||||
# 47130922100412004Rastapopoulos |
||||
# 47 (age) 1 (sexe) 30922 (city id) 100412(conv id) |
||||
age = int(str[:2]) |
||||
is_male = int(str[2:3]) in (1, 6) |
||||
city_id = str[3:8] |
||||
conv_id = str[8:14] |
||||
nick = str[17:] |
||||
return cls(nick, age, city_id, is_male, conv_id) |
||||
|
||||
def to_botmessage(self): |
||||
sex_indic = "un homme" if self.is_male else "une femme" |
||||
return BotMessage("Conversation avec %s, %s de %i ans" %(self.nick, sex_indic, self.age)) |
||||
|
||||
def __eq__(self, other): |
||||
return other.nick == self.nick |
||||
|
||||
def __hash__(self): |
||||
return hash(self.nick) |
||||
|
||||
|
||||
class CocoClient: |
||||
|
||||
def __init__(self): |
||||
self.interlocutors = set() # type: Set[Interlocutor] |
||||
self.current_interlocutor = None # type: Interlocutor |
||||
self.histories = defaultdict(list) # type:defaultdict[Interlocutor,List[Tuple]] |
||||
|
||||
self.user_id = None # type:str |
||||
self.user_pass = None # type:str |
||||
self.nick = None # type:str |
||||
self.is_connected = False |
||||
|
||||
def _format_history(self, interlocutor: Interlocutor): |
||||
if interlocutor in self.histories: |
||||
return [BotMessage("💬 %s: %s" % (nick, msg)) |
||||
for nick, msg in self.histories[interlocutor][-5:]] |
||||
else: |
||||
return [] |
||||
|
||||
def __process_and_format_received_msg(self, received_msgs): |
||||
out = [] |
||||
for user_code, msg in received_msgs: |
||||
user = Interlocutor.from_string(user_code) |
||||
self.interlocutors.add(user) |
||||
self.histories[user].append((user.nick, msg)) |
||||
logging.info("Msg from %s : %s" % (user.nick, msg)) |
||||
|
||||
if self.current_interlocutor is not None and user == self.current_interlocutor: |
||||
out.append(Message("💬 %s: %s" % (user.nick, msg))) |
||||
else: |
||||
out.append(BotMessage("💬 %s: %s" % (user.nick, msg))) |
||||
return out |
||||
|
||||
def disconnect(self): |
||||
self.interlocutors = set() |
||||
self.histories = defaultdict(list) |
||||
self.current_interlocutor = None |
||||
self.is_connected = False |
||||
self.nick = None |
||||
|
||||
def connect(self, nick: str, age: int, is_female: bool, zip_code: str): |
||||
self.disconnect() |
||||
|
||||
self.nick = nick |
||||
login_req = LoginRequest(nick, age, is_female, zip_code) |
||||
self.user_id, self.user_pass = login_req.retrieve() |
||||
logging.info("Logged in to coco as %s" % self.nick) |
||||
post_login_req = PostLoginRequest(self.user_id, self.user_pass) |
||||
try: |
||||
if post_login_req.retrieve(): |
||||
logging.info("Post login successful") |
||||
self.is_connected = True |
||||
else: |
||||
logging.info("Post login failed") |
||||
except ZeroDivisionError: |
||||
logging.info("Message cipher failed") |
||||
|
||||
def pulse(self) -> List[AbstractResponse]: |
||||
pulse_req = PulseRequest(self.user_id, self.user_pass) |
||||
received_msg = pulse_req.retrieve() |
||||
return self.__process_and_format_received_msg(received_msg) |
||||
|
||||
def send_msg(self, msg: str) -> List[AbstractResponse]: |
||||
if self.current_interlocutor is not None: |
||||
sendmsg_req = SendMsgRequest(self.user_id, self.user_pass, self.current_interlocutor.id, msg) |
||||
output = sendmsg_req.retrieve() |
||||
self.histories[self.current_interlocutor].append((self.nick, msg)) |
||||
out_msg = Message("💬 %s: %s" % (self.nick, msg)) |
||||
|
||||
out = [out_msg] |
||||
if output: |
||||
out += self.__process_and_format_received_msg(output) |
||||
return out |
||||
else: |
||||
return [BotMessage("Il faut sélectionner une conversation d'abord pd")] |
||||
|
||||
def broadcast_msg(self, msg: str) -> List[AbstractResponse]: |
||||
if self.interlocutors: |
||||
outputs = [Message("💬 %s à tous: %s" % (self.nick, msg))] |
||||
for interlocutor in self.interlocutors: |
||||
sendmsg_req = SendMsgRequest(self.user_id, self.user_pass, interlocutor.id, msg) |
||||
output = sendmsg_req.retrieve() |
||||
self.histories[interlocutor].append((self.nick, msg)) |
||||
if output: |
||||
outputs += self.__process_and_format_received_msg(output) |
||||
return outputs |
||||
else: |
||||
return [BotMessage("Aucune conversation active")] |
||||
|
||||
|
||||
def switch_conv(self, nick: str=None) -> Union[List[BotMessage], BotMessage]: |
||||
if not self.interlocutors: |
||||
return BotMessage("Pas de conversations en cours") |
||||
|
||||
new_interlocutor = None |
||||
if nick is not None: |
||||
for usr in self.interlocutors: |
||||
if usr.nick.upper() == nick.upper(): |
||||
new_interlocutor = usr |
||||
break |
||||
else: |
||||
new_interlocutor = random.choice(list(self.interlocutors)) |
||||
|
||||
if new_interlocutor is None: |
||||
return BotMessage("Impossible de trouver l'utilisateur") |
||||
else: |
||||
self.current_interlocutor = new_interlocutor |
||||
return [new_interlocutor.to_botmessage()] + \ |
||||
self._format_history(self.current_interlocutor) |
||||
|
||||
def list_convs(self): |
||||
return BotMessage("Conversations : " + ", ".join(["%s(%i)" % (usr.nick, usr.age) |
||||
for usr in self.interlocutors])) |
||||
@ -1,115 +0,0 @@
|
||||
import logging |
||||
from urllib.request import Request, urlopen |
||||
from random import randint, choice, random |
||||
from string import ascii_uppercase |
||||
from typing import Tuple, List |
||||
import re |
||||
|
||||
from .tools import get_city_id, coco_cipher, encode_msg, decode_msg |
||||
|
||||
|
||||
class BaseCocoRequest: |
||||
host = 'http://cloud.coco.fr/' |
||||
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0', |
||||
'Cookie': '_ga=GA1.2.795717920.1518381607', |
||||
'Host': 'cloud.coco.fr', |
||||
'Referer': 'http://www.coco.fr/chat/index.html'} |
||||
|
||||
def _get_url(self): |
||||
pass |
||||
|
||||
def _parse_response(self, response): |
||||
pass |
||||
|
||||
def retrieve(self): |
||||
req = Request(self._get_url(), headers=self.headers) |
||||
response = urlopen(req).read() |
||||
cleaned = response.decode("utf-8")[len("process1('#"):-len("');")] |
||||
return self._parse_response(cleaned) |
||||
|
||||
|
||||
class LoginRequest(BaseCocoRequest): |
||||
|
||||
def __init__(self, nick: str, age: int, is_female: bool, zip_code: str): |
||||
self.nick = nick |
||||
self.age = str(age) |
||||
self.sex = str('2' if is_female else '1') |
||||
self.city = get_city_id(zip_code) |
||||
|
||||
def _get_url(self): |
||||
identifier = str(randint(100000000, 990000000)) + '0' + ''.join(choice(ascii_uppercase) for i in range(20)) |
||||
return self.host + '40' + self.nick + '*' + self.age + self.sex + self.city + identifier |
||||
|
||||
def _parse_response(self, response): |
||||
credentials = response[2:14] |
||||
return credentials[:6], credentials[6:] # user_id and password |
||||
|
||||
|
||||
class LoggedInRequest(BaseCocoRequest): |
||||
|
||||
def __init__(self, user_id, password): |
||||
self.user_id = user_id |
||||
self.password = password |
||||
|
||||
@property |
||||
def token(self): |
||||
return self.user_id + self.password |
||||
|
||||
|
||||
class PostLoginRequest(LoggedInRequest): |
||||
|
||||
client_id_str = "3551366741*0*1aopiig*-940693579*192.168.0.14*0" |
||||
|
||||
def _get_url(self): |
||||
return self.host + '52' + self.token + coco_cipher(self.client_id_str, self.password) |
||||
|
||||
def _parse_response(self, response : str): |
||||
"""Checks if the post-login was successful""" |
||||
logging.debug(response) |
||||
return response.startswith("99556") |
||||
|
||||
|
||||
class PulseRequest(LoggedInRequest): |
||||
user_match = re.compile(r"[0-9]{17}[A-Za-z0-9]+") |
||||
|
||||
def _get_url(self): |
||||
return self.host + "95" + self.token + "?" + str(random()) |
||||
|
||||
def _parse_response(self, response): |
||||
"""Can either be a single message or several messages""" |
||||
if response == "Z": |
||||
return [] |
||||
|
||||
response = response[3:] # cutting the 669 |
||||
split = response.split("#") |
||||
it = iter(split) |
||||
output_msgs = [] # type: List[Tuple[str,str]] |
||||
while True: |
||||
try: |
||||
current = next(it) |
||||
if re.match(self.user_match, current): |
||||
msg = next(it) |
||||
decoded_msg = decode_msg(msg) |
||||
output_msgs.append((current, decoded_msg)) |
||||
except StopIteration: |
||||
break |
||||
return output_msgs |
||||
|
||||
|
||||
class SendMsgRequest(PulseRequest): |
||||
|
||||
def __init__(self, user_id, password, conv_id: str, msg: str): |
||||
super().__init__(user_id, password) |
||||
self.conv_id = conv_id |
||||
self.msg = msg |
||||
|
||||
def _get_url(self): |
||||
return self.host + "99" + self.token + self.conv_id + str(randint(0, 6)) + encode_msg(self.msg) |
||||
|
||||
def _parse_response(self, response: str): |
||||
"""Response to a send message request can either just be #97x or an |
||||
actual message (like in pulse request)""" |
||||
if response.startswith("97"): |
||||
return [] |
||||
else: |
||||
return super()._parse_response(response) |
||||
@ -1,93 +0,0 @@
|
||||
from urllib.request import urlopen |
||||
import re |
||||
import bidict |
||||
|
||||
doc = {0: 65, 1: 66, 2: 67, 3: 68, 4: 69, 5: 70, 6: 71, 7: 72, 8: 73, 9: 74, 10: 75, 11: 76, 12: 77, 13: 78, 14: 79, |
||||
15: 80, 16: 81, 17: 82, 18: 83, 19: 84, 20: 101, 21: 102, 22: 103, 23: 104, 24: 105, 25: 106, 26: 107, |
||||
27: 108, 28: 109, 29: 110, 30: 85, 31: 86, 32: 87, 33: 88, 34: 89, 35: 90, 36: 97, 37: 98, 38: 99, 39: 100, |
||||
40: 111, 41: 112, 42: 113, 43: 114, 44: 115, 45: 116, 46: 117, 47: 118, 48: 119, 49: 120, 50: 121, 51: 122, |
||||
52: 48, 53: 49, 54: 50, 55: 51, 56: 52, 57: 53, 58: 54, 59: 55, 60: 56, 61: 57, 62: 43, 63: 47, 64: 61} |
||||
|
||||
|
||||
def coco_cipher(str: str, key: str): |
||||
"""Implementation of coco's weird 'enxo' cipher. key has to be the user's password, |
||||
retrieved from a previous request""" |
||||
|
||||
def none_int(var): |
||||
return var if var is not None else 0 |
||||
|
||||
def safe_get_charcode(s: str, idx: int): |
||||
try: |
||||
return ord(s[idx]) |
||||
except IndexError: |
||||
return None |
||||
|
||||
output, chr1, chr2, chr3 = "", 0, 0, 0 |
||||
enc, revo = {}, {} |
||||
for j in range(65): |
||||
revo[doc[j]] = j |
||||
result = "" |
||||
for i, char_n in enumerate(str): |
||||
result += chr(ord(key[i % len(key)]) ^ ord(char_n)) |
||||
i = 0 |
||||
|
||||
while i < len(str): |
||||
chr1 = safe_get_charcode(result, i) |
||||
i += 1 |
||||
chr2 = safe_get_charcode(result, i) |
||||
i += 1 |
||||
chr3 = safe_get_charcode(result, i) |
||||
i += 1 |
||||
|
||||
enc[0] = none_int(chr1) >> 2 |
||||
enc[1] = ((none_int(chr1) & 3) << 4) | (none_int(chr2) >> 4) |
||||
enc[2] = ((none_int(chr2) & 15) << 2) | (none_int(chr3) >> 6) |
||||
enc[3] = none_int(chr3) & 63 |
||||
if chr2 is None: |
||||
enc[2] = 64 |
||||
enc[3] = 64 |
||||
elif chr3 is None: |
||||
enc[3] = 64 |
||||
|
||||
for j in range(4): |
||||
output += chr(doc[enc[j]]) |
||||
return output |
||||
|
||||
|
||||
def get_city_id(postal_code: str): |
||||
response = str(urlopen("http://www.coco.fr/cocoland/%s.js" % postal_code).read(), 'utf-8') |
||||
first_city_code = re.search(r'[0-9]+', response) |
||||
if first_city_code: |
||||
return first_city_code.group() |
||||
elif 'ERROR' in response: |
||||
raise ValueError('Invalid postal code') |
||||
else: |
||||
RuntimeError('Unexpected output') |
||||
|
||||
|
||||
smilies = [":)", ":(", ";)", ":d", ":-o", ":s", ":$", "*-)", "-)", "^o)", ":p", "(l)", "(v)", ":'(", "(h)", "(f)", |
||||
":@", "(y)", "(n)", "(k)", "gr$", "(a)", "(6)", "(yn)", "+o(", "na$", "oh$", "tr$", "(e)", "sh$", "fu$", |
||||
"nw$", "ba$", "ao$", "db$", "si$", "oo$", "co$", "bi$", "cc$", "ye$", "mo$", "aa$", "ci$", "uu$", "ff$", |
||||
"zz$", "gt$", "ah$", "mm$", "?$", "xx$"] |
||||
|
||||
special_chars = bidict.bidict({" ": "_", "$": "*7", "%": "*g", "'": "*8", "(": "(", ")": ")", "*": "*s", "=": "*h", |
||||
"?": "=", "@": "*m", "^": "*l", "_": "*0", "€": "*d", "à": "*a", "â": "*k", "ç": "*c", "è": "*e", |
||||
"é": "*r", "ê": "*b", "î": "*i", "ï": "*j", "ô": "*o", "ù": "*f", "û": "*u"}) |
||||
|
||||
|
||||
def encode_msg(msg : str): |
||||
"""Encoding the message to coco's weird url-compatible format""" |
||||
for i in range(len(smilies)): |
||||
msg = msg.replace(smilies[i], ';' + str(i).zfill(2)) |
||||
|
||||
for char, replacement in special_chars.items(): |
||||
msg = msg.replace(char, replacement) |
||||
msg = ''.join([c for c in msg if ord(c) < 128]) |
||||
return msg |
||||
|
||||
|
||||
def decode_msg(msg: str): |
||||
"""Decoding coco's weird message format""" |
||||
for coded, char in special_chars.inv.items(): |
||||
msg = msg.replace(coded, char) |
||||
return msg |
||||
@ -1,29 +0,0 @@
|
||||
import logging |
||||
from typing import List |
||||
|
||||
from tools.commons import UserList |
||||
|
||||
|
||||
class ConnectProcessor: |
||||
def match(self, sender_id: str, users_list: UserList) -> bool: |
||||
"""Returns true if this is the kind of user connection a processor should respond to""" |
||||
pass |
||||
|
||||
def process(self, sender_id: str, users_list: UserList) -> str: |
||||
"""Processes a message and returns an answer""" |
||||
pass |
||||
|
||||
|
||||
class ConnectionDispatcher: |
||||
|
||||
def __init__(self, processor_list: List[ConnectProcessor]): |
||||
self.processor_list = processor_list |
||||
|
||||
def dispatch(self, sender_id: str, users_list: UserList) -> str: |
||||
"""Tells its first botprocessor to match the message to process this message and returns its answer""" |
||||
for processor in self.processor_list: |
||||
if processor.match(sender_id, users_list): |
||||
logging.info("Matched %s" % processor.__class__.__name__) |
||||
return processor.process(sender_id, users_list) |
||||
|
||||
return None |
||||
@ -1,148 +0,0 @@
|
||||
from tools.coco.client import CocoClient |
||||
from tools.commons import Message, BotMessage |
||||
from tools.constants import AUTHORIZED_USERIDS |
||||
from .commons import * |
||||
|
||||
|
||||
class DispatcherBotProcessor(MessageProcessor): |
||||
"""A processor that matches a context, then forwards the message to a list of sub-processors. |
||||
This enables the botprocessor-matching mechanism to behave kinda like a decision tree""" |
||||
|
||||
def __init__(self, processors_list: List[MessageProcessor]): |
||||
self.dispatcher = MessageDispatcher(processors_list) |
||||
|
||||
def process(self, text: str, sender_id: str, users_list: UserList): |
||||
return self.dispatcher.dispatch(text, sender_id, users_list) |
||||
|
||||
|
||||
class CommandsDispatcherProcessor(DispatcherBotProcessor): |
||||
"""Reacts to commands of the form '/botname command' or 'botname, command' """ |
||||
|
||||
def __init__(self, processors_list: List[MessageProcessor], trigger_word: str = None, default_response: str = None): |
||||
super().__init__(processors_list) |
||||
self.trigger = trigger_word |
||||
self.default_response = default_response if default_response is not None else "Commande non reconnue, pd" |
||||
|
||||
def match(self, text: str, sender_id: str, users_list: UserList): |
||||
trigger = self.trigger.upper() if self.trigger is not None else users_list.my_name.upper() |
||||
return text.upper().startswith(trigger + ",") \ |
||||
or text.upper().startswith("/" + trigger) |
||||
|
||||
def process(self, text: str, sender_id : str, users_list: UserList): |
||||
without_cmd = text[len(self.trigger)+1:] |
||||
response = super().process(without_cmd, sender_id, users_list) |
||||
return Message(self.default_response) if response is None else response |
||||
|
||||
|
||||
class BaseCocobotCommand(MessageProcessor): |
||||
|
||||
HELP_STR = None |
||||
_cmd_suffix = "" |
||||
|
||||
def __init__(self, cococlient: CocoClient): |
||||
self.cococlient = cococlient |
||||
|
||||
def match(self, text : str, sender_id : str, users_list : UserList): |
||||
return text.lower().startswith(self._cmd_suffix) |
||||
|
||||
|
||||
class CocoConnectCommand(BaseCocobotCommand): |
||||
HELP_STR = "/coconnect pseudo age code_postal" |
||||
_cmd_suffix = "nnect" |
||||
|
||||
def process(self, text : str, sender_id : str, users_list : UserList): |
||||
text = text[len(self._cmd_suffix):].strip() |
||||
try: |
||||
nick, age, zip_code = text.split() |
||||
nick = ''.join([c for c in nick if ord(c) < 128]) # removing non-utf8 chars |
||||
except ValueError: |
||||
return Message("Pas le bon nombre d'arguments, pd") |
||||
|
||||
if not nick.isalnum(): |
||||
return Message("Le pseudo doit être alphanumérique, pd") |
||||
|
||||
if len(age) != 2 or not age.isnumeric(): |
||||
return Message("L'âge c'est avec DEUX chiffres (déso bulbi)") |
||||
|
||||
if int(age) < 15: |
||||
return Message("L'âge minimum c'est 15 ans (déso bubbi)") |
||||
|
||||
if len(zip_code) != 5 or not zip_code.isnumeric(): |
||||
return Message("Le code postal c'est 5 chiffres, pd") |
||||
|
||||
try: |
||||
self.cococlient.connect(nick, int(age), True, zip_code) |
||||
except ValueError: |
||||
return Message("Le code postal a pas l'air d'être bon") |
||||
|
||||
if self.cococlient.is_connected: |
||||
return BotMessage("Connecté en tant que %s, de %s ans" % (nick, age)) |
||||
else: |
||||
return BotMessage("La connection a chié, déswe") |
||||
|
||||
|
||||
class CocoMsgCommand(BaseCocobotCommand): |
||||
HELP_STR = "/cocospeak message" |
||||
_cmd_suffix = "speak" |
||||
|
||||
def process(self, text : str, sender_id : str, users_list : UserList): |
||||
text = text[len(self._cmd_suffix):].strip() |
||||
return self.cococlient.send_msg(text) |
||||
|
||||
|
||||
class CocoBroadcastCommand(BaseCocobotCommand): |
||||
HELP_STR = "/cocoall message" |
||||
_cmd_suffix = "all" |
||||
|
||||
def process(self, text : str, sender_id : str, users_list : UserList): |
||||
text = text[len(self._cmd_suffix):].strip() |
||||
return self.cococlient.broadcast_msg(text) |
||||
|
||||
|
||||
class CocoSwitchCommand(BaseCocobotCommand): |
||||
HELP_STR = "/cocoswitch [pseudo de l'interlocuteur]" |
||||
_cmd_suffix = "switch" |
||||
|
||||
def process(self, text : str, sender_id : str, users_list : UserList): |
||||
text = text[len(self._cmd_suffix):].strip() |
||||
if text: |
||||
return self.cococlient.switch_conv(text) |
||||
else: |
||||
return self.cococlient.switch_conv() |
||||
|
||||
|
||||
class CocoListCommand(BaseCocobotCommand): |
||||
HELP_STR = "/cocolist" |
||||
_cmd_suffix = "list" |
||||
|
||||
def process(self, text : str, sender_id : str, users_list : UserList): |
||||
return self.cococlient.list_convs() |
||||
|
||||
|
||||
class CocoQuitCommand(BaseCocobotCommand): |
||||
HELP_STR = "/cocoquit" |
||||
_cmd_suffix = "quit" |
||||
|
||||
def match(self, text : str, sender_id : str, users_list : UserList): |
||||
return super().match(text, sender_id, users_list) and sender_id in AUTHORIZED_USERIDS |
||||
|
||||
def process(self, text : str, sender_id : str, users_list : UserList): |
||||
self.cococlient.disconnect() |
||||
return BotMessage("Déconnecté!") |
||||
|
||||
|
||||
class BotHelp(MessageProcessor): |
||||
"""Displays the help string for all processors in the list that have a helpt string""" |
||||
|
||||
def __init__(self, processors_list: List[BaseCocobotCommand]): |
||||
all_help_strs = [proc.HELP_STR |
||||
for proc in processors_list if proc.HELP_STR is not None] |
||||
self.help_str = ", ".join(all_help_strs) |
||||
|
||||
def match(self, text : str, sender_id : str, users_list : UserList): |
||||
return text.lower().startswith("help") |
||||
|
||||
def process(self, text : str, sender_id : str, users_list : UserList): |
||||
return BotMessage(self.help_str) |
||||
|
||||
|
||||
Loading…
Reference in new issue