Browse Source

Init du dépot sur une base de scorbot

master
hadware 8 years ago
commit
0b2ed0e002
  1. 37
      cocobot.py
  2. 0
      tools/__init__.py
  3. 97
      tools/base.py
  4. 0
      tools/coco/__init__.py
  5. 58
      tools/coco/client.py
  6. 99
      tools/coco/requests.py
  7. 88
      tools/coco/tools.py
  8. 84
      tools/commons.py
  9. 2
      tools/constants.py
  10. 3
      tools/processors/__init__.py
  11. 36
      tools/processors/commons.py
  12. 29
      tools/processors/connections.py
  13. 33
      tools/processors/messages.py

37
cocobot.py

@ -0,0 +1,37 @@
#!/usr/bin/env python
import argparse
import asyncio
import logging
from tools.base import CoboBot
from tools.processors import MessageDispatcher, CommandsDispatcherProcessor, ConnectionDispatcher
from tools.processors.messages import RandomTrigger, PikaAttackCommand, \
RandomCommandTrigger
# setting up argument parser
parser = argparse.ArgumentParser(description='Le lou bot')
parser.add_argument('--cookie', type=str, help='usercookie to use')
parser.add_argument('--channel', type=str, help='channel to watch', default="")
parser.add_argument('--domain', type=str, help='domain to connect to', default="loult.family")
parser.add_argument('--port', type=int, help='port on which to connect the socket', default=80)
parser.add_argument('--method', type=str, help='http or https', default="https")
# setting up the various dispatchers
coco_commands = CommandsDispatcherProcessor([], "coco", default_response="de?")
root_messages_dispatcher = MessageDispatcher([coco_commands])
connections_dispatcher = ConnectionDispatcher([])
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
args = parser.parse_args()
cocobot = CoboBot(args.cookie, args.channel, args.domain, args.port, args.method,
root_messages_dispatcher, connections_dispatcher)
asyncio.get_event_loop().run_until_complete(pikabot.listen())

0
tools/__init__.py

97
tools/base.py

@ -0,0 +1,97 @@
import html
import json
import logging
import websockets
from tools.commons import AbstractResponse, Message, BotMessage, AttackCommand, Sound, UserList
from tools.processors import MessageDispatcher, ConnectionDispatcher
class CoboBot:
def __init__(self, cookie : str, channel : str, domain : str, port : int, method : str,
messages_dispatcher : MessageDispatcher,
connect_dispatcher : ConnectionDispatcher):
# setting up variables required by the server. The default is a Kabutops on the main lou server, I think
self.cookie = cookie
self.channel = "" if channel == "root" else channel
self.domain = domain
self.port = port
self.method = method
self.msg_dispatch = messages_dispatcher
self.cnt_dispatch = connect_dispatcher
self.user_list = None # type: UserList
async def _send_message(self, message):
if isinstance(message, dict):
await self.socket.send(json.dumps(message))
elif isinstance(message, bytes):
await self.socket.send(message)
async def _dispatch_response(self, response_obj : AbstractResponse):
if isinstance(response_obj, (Message, BotMessage, AttackCommand)):
await self._send_message(response_obj.to_dict())
elif isinstance(response_obj, Sound):
await self._send_message(response_obj.get_bytes())
async def _on_connect(self, msg_data):
# registering the user to the user list
self.user_list.add_user(msg_data["userid"], msg_data["params"])
logging.info("%s connected" % self.user_list.name(msg_data["userid"]))
message = self.cnt_dispatch.dispatch(msg_data["userid"], self.user_list)
await self._send_message(message)
async def _on_disconnect(self, msg_data):
# removing the user from the userlist
logging.info("%s disconnected" % self.user_list.name(msg_data["userid"]))
self.user_list.del_user(msg_data["userid"])
async def _on_message(self, msg_data):
msg_data["msg"] = html.unescape(msg_data["msg"]) # removing HTML shitty encoding
# logging the message to the DB
logging.info("%s says : \"%s\"" % (self.user_list.name(msg_data["userid"]), msg_data["msg"]))
response = None
# dispatching the message to the processors. If there's a response, send it to the chat
if not self.user_list.itsme(msg_data["userid"]):
response = self.msg_dispatch.dispatch(msg_data["msg"], msg_data["userid"], self.user_list)
if isinstance(response, list):
for response_obj in response:
await self._dispatch_response(response_obj)
elif isinstance(response, AbstractResponse):
await self._dispatch_response(response)
async def listen(self):
if self.method == "https":
socket_address = 'wss://%s/socket/%s' % (self.domain, self.channel)
else:
socket_address = 'ws://%s:%i/socket/%s' % (self.domain, self.port, self.channel)
logging.info("Listening to socket on %s" % socket_address)
async with websockets.connect(socket_address,
extra_headers={"cookie": "id=%s" % self.cookie}) as websocket:
self.socket = websocket
while True:
msg = await websocket.recv()
websocket.recv()
if type(msg) != bytes:
msg_data = json.loads(msg, encoding="utf-8")
msg_type = msg_data.get("type", "")
if msg_type == "userlist":
self.user_list = UserList(msg_data["users"])
logging.info(str(self.user_list))
elif msg_type == "msg":
await self._on_message(msg_data)
elif msg_type == "connect":
await self._on_connect(msg_data)
elif msg_type == "disconnect":
await self._on_disconnect(msg_data)
else:
logging.debug("Received sound file")

0
tools/coco/__init__.py

58
tools/coco/client.py

@ -0,0 +1,58 @@
from typing import List
from .requests import LoginRequest, PostLoginRequest
import logging
class Interlocutor:
def __init__(self, nick: str, age:int, city: str, is_male: bool, conv_id: str):
self.nick = nick
self.age = age
self.is_male = is_male
self.city = city
self.id = conv_id
@classmethod
def from_string(cls, str):
# 47130922100412004Rastapopoulos
# 47 (age) 1 (sexe) 30922 (city id) 100412(conv id)
age = str[:2]
is_male = int(str[2:3]) in (1, 6)
city_id = str[3:8]
conv_id = str[8:14]
nick = str[17:]
return cls(nick, age, city_id, is_male, conv_id)
def __eq__(self, other):
return other.nick == self.nick
class CocoClient:
def __init__(self, nick: str, age: int, is_female: bool, zip_code: str):
self.nick = nick
self.age = age
self.is_female = is_female
self.zip_code = zip_code
self.interlocutors = [] # type: List[Interlocutor]
self.user_id = None # type:str
self.user_pass = None # type:str
def connect(self):
login_req = LoginRequest(self.nick, self.age, self.is_female, self.zip_code)
self.user_id, self.user_pass = login_req.retrieve()
logging.info("Logged in to coco as %s" % self.nick)
post_login_req = PostLoginRequest(self.user_id, self.user_pass)
post_login_req.retrieve()
logging.info("Post login successful")
def pulse(self):
pass
def send_msg(self):
pass
def switch_conv(self, nick: str=None):
pass

99
tools/coco/requests.py

@ -0,0 +1,99 @@
from urllib.request import Request, urlopen
from random import randint, choice, random
from string import ascii_uppercase
from .tools import get_city_id, coco_cipher, encode_msg
class BaseCocoRequest:
host = 'http://cloud.coco.fr/'
headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:59.0) Gecko/20100101 Firefox/59.0',
'Cookie': '_ga=GA1.2.795717920.1518381607',
'Host': 'cloud.coco.fr',
'Referer': 'http://www.coco.fr/chat/index.html'}
def _get_url(self):
pass
def _parse_response(self, response):
pass
def retrieve(self):
req = Request(self._get_url(), headers=self.headers)
response = urlopen(req).read()
cleaned = response.decode("utf-8")[len("process1('#"):-len("');")]
return self._parse_response(cleaned)
class LoginRequest(BaseCocoRequest):
def __init__(self, nick: str, age: int, is_female: bool, zip_code: str):
self.nick = nick
self.age = str(age)
self.sex = str('2' if is_female else '1')
self.city = get_city_id(zip_code)
def _get_url(self):
identifier = str(randint(100000000, 990000000)) + '0' + ''.join(choice(ascii_uppercase) for i in range(20))
return self.host + '40' + self.nick + '*' + self.age + self.sex + self.city + identifier
def _parse_response(self, response):
credentials = response[2:14]
return credentials[:6], credentials[6:] # user_id and password
class LoggedInRequest(BaseCocoRequest):
def __init__(self, user_id, password):
self.user_id = user_id
self.password = password
@property
def token(self):
return self.user_id + self.password
class PostLoginRequest(LoggedInRequest):
client_id_str = "3551366741*0*1aopiig*-940693579*192.168.0.14*0"
def _get_url(self):
return self.host + '52' + self.token + coco_cipher(self.client_id_str, self.password)
def _parse_response(self, response):
"""Checks if the post-login was successful"""
# TODO
pass
class PulseRequest(LoggedInRequest):
# typical response :
# process1('#669276787#30130916276787003HotelDiscret#salut_toi#47130922100412004Rastapopoulos#Mes_hommages,_Mademoiselle...#47130922100412004Rastapopoulos#Jamais_un_mari_ne_sera_si_bien_veng*r_que_par_l*8amant_de_sa_femme.#40636427396758003leo913#cam.!7w2738702leo913#396758#');
# process1('#669276787#30130916276787003HotelDiscret#chaude=#292223#32130926292223003HDirect#Salut,_te_faire_payer_pour_un_plan_sexe_ca_te_plairais_=#');
# process1('#66945630927183748003WolfiSoDentelle#en_manque_de_sommeil_peut_etre_=_#');
# ^ le type a 45 balais donc il doit falloir couper après 669
# process1('#66929630926396791003Clouds#bonsoir,_comment_vas-tu_=_que_cherches_tu_=#');
# idem avec lui, il a 29 ans
def _get_url(self):
return self.host + "95" + self.token + "?" + str(random())
def _parse_response(self, response):
"""Can either be a single message or several messages"""
# TODO
pass
class SendMsgRequest(LoggedInRequest):
def __init__(self, user_id, password, conv_id: str, msg: str):
super().__init__(user_id, password)
self.conv_id = conv_id
self.msg = msg
def _get_url(self):
return self.host + "99" + self.token + self.conv_id + encode_msg(self.msg)
def _parse_response(self, response):
"""Response to a send message request can either just be #97x or an
actual message (like in pulse request)"""
# TODO
pass

88
tools/coco/tools.py

@ -0,0 +1,88 @@
from urllib.request import urlopen
import re
doc = {0: 65, 1: 66, 2: 67, 3: 68, 4: 69, 5: 70, 6: 71, 7: 72, 8: 73, 9: 74, 10: 75, 11: 76, 12: 77, 13: 78, 14: 79,
15: 80, 16: 81, 17: 82, 18: 83, 19: 84, 20: 101, 21: 102, 22: 103, 23: 104, 24: 105, 25: 106, 26: 107,
27: 108, 28: 109, 29: 110, 30: 85, 31: 86, 32: 87, 33: 88, 34: 89, 35: 90, 36: 97, 37: 98, 38: 99, 39: 100,
40: 111, 41: 112, 42: 113, 43: 114, 44: 115, 45: 116, 46: 117, 47: 118, 48: 119, 49: 120, 50: 121, 51: 122,
52: 48, 53: 49, 54: 50, 55: 51, 56: 52, 57: 53, 58: 54, 59: 55, 60: 56, 61: 57, 62: 43, 63: 47, 64: 61}
def coco_cipher(str: str, key: str):
"""Implementation of coco's weird 'enxo' cipher. key has to be the user's password,
retrieved from a previous request"""
def none_int(var):
return var if var is not None else 0
def safe_get_charcode(s: str, idx: int):
try:
return ord(s[idx])
except IndexError:
return None
output, chr1, chr2, chr3 = "", 0, 0, 0
enc, revo = {}, {}
for j in range(65):
revo[doc[j]] = j
result = ""
for i, char_n in enumerate(str):
result += chr(ord(key[i % len(key)]) ^ ord(char_n))
i = 0
while i < len(str):
chr1 = safe_get_charcode(result, i)
i += 1
chr2 = safe_get_charcode(result, i)
i += 1
chr3 = safe_get_charcode(result, i)
i += 1
enc[0] = none_int(chr1) >> 2
enc[1] = ((none_int(chr1) & 3) << 4) | (none_int(chr2) >> 4)
enc[2] = ((none_int(chr2) & 15) << 2) | (none_int(chr3) >> 6)
enc[3] = none_int(chr3) & 63
if chr2 is None:
enc[2] = 64
enc[3] = 64
elif chr3 is None:
enc[3] = 64
for j in range(4):
output += chr(doc[enc[j]])
return output
def get_city_id(postal_code: str):
response = str(urlopen("http://www.coco.fr/cocoland/%s.js" % postal_code).read(), 'utf-8')
first_city_code = re.search(r'[0-9]+', response)
if first_city_code:
return first_city_code.group()
elif 'ERROR' in response:
raise ValueError('Invalid postal code')
else:
RuntimeError('Unexpected output')
smilies = [":)", ":(", ";)", ":d", ":-o", ":s", ":$", "*-)", "-)", "^o)", ":p", "(l)", "(v)", ":'(", "(h)", "(f)",
":@", "(y)", "(n)", "(k)", "gr$", "(a)", "(6)", "(yn)", "+o(", "na$", "oh$", "tr$", "(e)", "sh$", "fu$",
"nw$", "ba$", "ao$", "db$", "si$", "oo$", "co$", "bi$", "cc$", "ye$", "mo$", "aa$", "ci$", "uu$", "ff$",
"zz$", "gt$", "ah$", "mm$", "?$", "xx$"]
special_chars = {" ": "~", "!": "!", "\"": "*8", "$": "*7", "%": "*g", "'": "*8", "(": "(", ")": ")", "*": "*s", "=": "*h",
"?": "=", "@": "*m", "^": "*l", "_": "*0", "": "*d", "à": "*a", "â": "*k", "ç": "*c", "è": "*e",
"é": "*r", "ê": "*b", "î": "*i", "ï": "*j", "ô": "*o", "ù": "*f", "û": "*u"}
def encode_msg(msg : str):
"""Encoding the message to coco's weird url-compatible format"""
for i in range(len(smilies)):
msg = msg.replace(smilies[i], ';' + str(i).zfill(2))
for char, replacement in special_chars.items():
msg = msg.replace(char, replacement)
return msg
def decode_msg(msg: str):
pass

84
tools/commons.py

@ -0,0 +1,84 @@
class UserList:
"""Wrapper around the 'currently connected users' dictionary"""
def __init__(self, userlist_data):
self.users = {user_data["userid"]: user_data for user_data in userlist_data}
for id, user in self.users.items():
if "you" in user["params"] and user["params"]["you"]:
self.my_id = id
def del_user(self, user_id):
del self.users[user_id]
def add_user(self, user_id, params):
self.users[user_id] = {"params": params}
def __getitem__(self, item : str):
return self.users[item]["params"]
def name(self, user_id):
return self.users[user_id]["params"]["name"]
def get_all_names(self):
return [user["params"]["name"] for user in self.users.values()]
def itsme(self, user_id):
return self.my_id == user_id
@property
def my_name(self):
return self.name(self.my_id)
def __str__(self):
return "Connected users :\n" \
"%s" % "\n".join(["\t - %s" % self.name(user_id) for user_id in self.users])
class AbstractResponse:
pass
class Message(AbstractResponse):
def __init__(self, msg: str):
self.msg = msg
def to_dict(self):
return {"lang": "fr", "msg": self.msg, "type": "msg"}
class BotMessage(AbstractResponse):
def __init__(self, msg: str):
self.msg = msg
def to_dict(self):
return {"type": "bot", "msg": self.msg}
class Sound(AbstractResponse):
def __init__(self, sound_filepath: str):
self.filepath = sound_filepath
def get_bytes(self):
with open(self.filepath, "rb") as soundfile:
return soundfile.read()
class AttackCommand(AbstractResponse):
def __init__(self, target_name: str, offset=1):
self.target = target_name
self.offset = offset
def to_dict(self):
return {"target": self.target, "order": self.offset, "type": "attack"}
class Sleep(AbstractResponse):
def __init__(self, duration : int):
self.duration = duration

2
tools/constants.py

@ -0,0 +1,2 @@
DEFAULT_COOKIE = "nwoiw"
DEFAULT_CHANNEL = "zizi"

3
tools/processors/__init__.py

@ -0,0 +1,3 @@
from .messages import *
from .connections import *
from .commons import MessageDispatcher

36
tools/processors/commons.py

@ -0,0 +1,36 @@
import logging
from typing import List
from tools.commons import UserList
class MessageProcessor:
"""Parent class for all processors. A processor is basically a bot response triggered by a condition.
The match method returns a boolean telling if the trigger condition is verified, and the process
method returns a string of the output message. All bot processors are to be grouped in a list then
given to a Dispatcher"""
def match(self, text : str, sender_id : str, users_list : UserList) -> bool:
"""Returns true if this is the kind of text a processor should respond to"""
pass
def process(self, text : str, sender_id : str, users_list : UserList) -> str:
"""Processes a message and returns an answer"""
pass
class MessageDispatcher:
"""Dispatches the current input message to the first botprocessor that matches the context given
to the dispatch method."""
def __init__(self, processor_list : List[MessageProcessor]):
self.processor_list = processor_list
def dispatch(self,text : str, sender_id : str, users_list : UserList) -> str:
"""Tells its first botprocessor to match the message to process this message and returns its answer"""
for processor in self.processor_list:
if processor.match(text, sender_id, users_list):
logging.info("Matched %s" % processor.__class__.__name__)
return processor.process(text, sender_id, users_list)
return None

29
tools/processors/connections.py

@ -0,0 +1,29 @@
import logging
from typing import List
from tools.commons import UserList
class ConnectProcessor:
def match(self, sender_id: str, users_list: UserList) -> bool:
"""Returns true if this is the kind of user connection a processor should respond to"""
pass
def process(self, sender_id: str, users_list: UserList) -> str:
"""Processes a message and returns an answer"""
pass
class ConnectionDispatcher:
def __init__(self, processor_list: List[ConnectProcessor]):
self.processor_list = processor_list
def dispatch(self, sender_id: str, users_list: UserList) -> str:
"""Tells its first botprocessor to match the message to process this message and returns its answer"""
for processor in self.processor_list:
if processor.match(sender_id, users_list):
logging.info("Matched %s" % processor.__class__.__name__)
return processor.process(sender_id, users_list)
return None

33
tools/processors/messages.py

@ -0,0 +1,33 @@
from tools.commons import Message
from .commons import *
class DispatcherBotProcessor(MessageProcessor):
"""A processor that matches a context, then forwards the message to a list of sub-processors.
This enables the botprocessor-matching mechanism to behave kinda like a decision tree"""
def __init__(self, processors_list : List[MessageProcessor]):
self.dispatcher = MessageDispatcher(processors_list)
def process(self, text : str, sender_id : str, users_list : UserList):
return self.dispatcher.dispatch(text, sender_id, users_list)
class CommandsDispatcherProcessor(DispatcherBotProcessor):
"""Reacts to commands of the form '/botname command' or 'botname, command' """
def __init__(self, processors_list: List[MessageProcessor], trigger_word: str = None, default_response :str = None):
super().__init__(processors_list)
self.trigger = trigger_word
self.default_response = default_response if default_response is not None else "Commande non reconnue, pd"
def match(self, text : str, sender_id : str, users_list : UserList):
trigger = self.trigger.upper() if self.trigger is not None else users_list.my_name.upper()
return text.upper().startswith(trigger + ",") \
or text.upper().startswith("/" + trigger)
def process(self, text : str, sender_id : str, users_list : UserList):
without_cmd = text[len(users_list.my_name)+1:]
response = super().process(without_cmd, sender_id, users_list)
return Message(self.default_response) if response is None else response
Loading…
Cancel
Save