cacassetout #1

Manually merged
fomys merged 15 commits from cacassetout into master 2020-04-24 23:41:36 +02:00
52 changed files with 197 additions and 1987 deletions
Showing only changes of commit a590a4cba1 - Show all commits

View File

@ -13,7 +13,7 @@
import os
import sys
sys.path.insert(0, os.path.abspath('../'))
sys.path.insert(0, os.path.abspath('../../'))
# -- Project information -----------------------------------------------------

View File

@ -1,2 +1,4 @@
cd doc
# Build html doc
pipenv run make html
cd ../

View File

@ -4,10 +4,12 @@
set -e
cd "${0%/*}/.."
cd "${0%/*}/../src"
echo "Running tests"
# Run test and ignore warnings
pipenv run pytest -p no:warnings
cd "../"

View File

@ -0,0 +1,147 @@
from __future__ import annotations
import importlib
import inspect
import logging
import os
import sys
import discord
import toml
from packaging.specifiers import SpecifierSet
from config import Config, config_types
from config.config_types import factory
from errors import IncompatibleModuleError
__version__ = "0.1.0"
MINIMAL_INFOS = ["version", "bot_version"]
class BotBase(discord.Client):
log = None
def __init__(self, data_folder: str = "data", modules_folder: str = "modules", *args, **kwargs):
super().__init__(*args, **kwargs)
# Create folders
os.makedirs(modules_folder, exist_ok=True)
os.makedirs(data_folder, exist_ok=True)
# Add module folder to search path
# TODO: Vérifier que ca ne casse rien
sys.path.insert(0, modules_folder)
# Setup logging
self.log = logging.getLogger('bot_base')
# Content: {"module_name": {"module": imported module, "class": initialized class}}
self.modules = {}
# Setup config
self.config = Config(path=os.path.join(data_folder, "config.toml"))
self.config.register("modules", factory(config_types.List, factory(config_types.Str)))
self.config.register("prefix", factory(config_types.Str))
self.config.register("admin_roles", factory(config_types.List, factory(config_types.discord_types.Role, self)))
self.config.register("admin_users", factory(config_types.List, factory(config_types.discord_types.User, self)))
self.config.register("main_guild", factory(config_types.discord_types.Guild, self))
self.config.register("locale", factory(config_types.Str))
self.config.register("data_folder", factory(config_types.Str))
self.config.register("modules_folder", factory(config_types.Str))
self.config.set({
"modules": [],
"prefix": "%",
"admin_roles": [],
"admin_users": [],
"main_guild": None,
"locale": "fr_FR.UTF8",
"data_folder": data_folder,
"modules_folder": modules_folder,
})
self.config.load()
self.load_module("test_module")
async def on_ready(self):
self.info("Bot ready.")
def load_module(self, module):
# Check if module exists
if not os.path.isdir(os.path.join(self.config["modules_folder"], module)):
raise ModuleNotFoundError(f"Module {module} not found in modules folder ({self.config['modules_folder']}.)")
if not os.path.isfile(os.path.join(self.config["modules_folder"], module, "infos.toml")):
raise IncompatibleModuleError(f"Module {module} is incompatible: no infos.toml found.")
# Check infos.toml integrity
with open(os.path.join(self.config["modules_folder"], module, "infos.toml")) as f:
infos = toml.load(f)
for key in MINIMAL_INFOS:
if key not in infos.keys():
raise IncompatibleModuleError(f"Missing information for module {module}: missing {key}.")
# Check bot_version
bot_version_specifier = SpecifierSet(infos["bot_version"])
if __version__ not in bot_version_specifier:
raise IncompatibleModuleError(f"Module {module} is not compatible with your current bot version "
f"(need {infos['bot_version']} and you have {__version__}).")
# Check if module have __main_class__
imported = importlib.import_module(module)
try:
main_class = imported.__main_class__
except AttributeError:
raise IncompatibleModuleError(f"Module {module} does not provide __main_class__.")
# Check if __main_class__ is a class
if not inspect.isclass(main_class):
raise IncompatibleModuleError(f"Module {module} contains __main_class__ but it is not a type.")
try:
main_class = main_class(self)
except TypeError:
# Module don't need client reference
main_class = main_class()
# Check if __main_class__ have __dispatch__ attribute
try:
dispatch = main_class.__dispatch__
except AttributeError:
raise IncompatibleModuleError(f"Module {module} mainclass ({main_class}) does not provide __dispatch__"
f" attribute)")
# Check if __dispatch__ is function
if not inspect.isfunction(dispatch):
raise IncompatibleModuleError(f"Module {module} mainclass ({main_class}) provides __dispatch__, but it is "
f"not a function ({dispatch}).")
# Check if __dispatch__ can have variable positional and keyword aguments (to avoid future error on each event)
sig = inspect.signature(dispatch)
args_present, kwargs_present = False, False
for p in sig.parameters.values():
if p.kind == p.VAR_POSITIONAL:
args_present = True
elif p.kind == p.VAR_KEYWORD:
kwargs_present = True
if not args_present:
raise IncompatibleModuleError(
f"Module {module} mainclass ({main_class}) provide __dispatch__ function, but "
f"this function doesn't accept variable positionnal arguments.")
if not kwargs_present:
raise IncompatibleModuleError(
f"Module {module} mainclass ({main_class}) provide __dispatch__ function, but "
f"this function doesn't accept variable keywords arguments.")
# Module is compatible!
# Add module to loaded modules
self.modules.update({
module: {
"imported": imported,
"initialized_class": main_class,
"dispatch": dispatch,
}
})
# Logging
def info(self, *args, **kwargs):
if self.log:
self.log.info(*args, **kwargs)
self.dispatch("on_log_info", *args, **kwargs)
def error(self, *args, **kwargs):
if self.log:
self.log.error(*args, **kwargs)
self.dispatch("on_log_error", *args, **kwargs)
def warning(self, *args, **kwargs):
if self.log:
self.log.warning(*args, **kwargs)
self.dispatch("on_log_warning", *args, **kwargs)

View File

@ -1,3 +1,4 @@
from config.base import Config
from . import config_types
from .base import Config
__all__ = ["Config"]
__all__ = ["Config", "config_types"]

View File

@ -1,16 +1,16 @@
from typing import Type
import config.config_types.discord_types
from config.config_types.base_type import BaseType
from config.config_types.bool import Bool
from config.config_types.color import Color
from config.config_types.dict import Dict
from config.config_types.float import Float
from config.config_types.int import Int
from config.config_types.list import List
from config.config_types.str import Str
from . import discord_types
from .base_type import BaseType
from .bool import Bool
from .color import Color
from .dict import Dict
from .float import Float
from .int import Int
from .list import List
from .str import Str
__all__ = ['factory', "BaseType", 'Dict', 'Float', 'Int', 'List', 'Str', 'discord_types', 'Bool', 'Color']
__all__ = ['factory', 'Dict', 'Float', 'Int', 'List', 'Str', 'discord_types', 'Bool', 'Color']
class Meta(type):
@ -32,7 +32,7 @@ def factory(type: Type[BaseType], *args, **kwargs):
>>> factory(Int, min=0, max=10)
<config_types.Int with parameters () {'min': 0, 'max': 10}>
:param type: Type to create
:param Type[BaseType] type: Type to create
:return: New type
"""

View File

@ -1,6 +1,6 @@
import typing
from config.config_types.base_type import BaseType
from .base_type import BaseType
class Bool(BaseType):

View File

@ -1,6 +1,6 @@
import typing
from config.config_types.base_type import BaseType
from .base_type import BaseType
class Color(BaseType):

View File

@ -1,6 +1,6 @@
import typing
from config.config_types.base_type import BaseType
from .base_type import BaseType
class Dict(BaseType):

View File

@ -1,6 +1,6 @@
from config.config_types.discord_types.channel import Channel
from config.config_types.discord_types.guild import Guild
from config.config_types.discord_types.user import User
from config.config_types.discord_types.role import Role
from config.config_types.discord_types.user import User
__all__ = ['Channel', "Guild", "User", "Role"]

View File

@ -6,22 +6,23 @@ import discord
from config.config_types.base_type import BaseType
LBI = typing.TypeVar('LBI')
if typing.TYPE_CHECKING:
from bot_base import BotBase
class Guild(BaseType):
#: :class:`LBI`: Client instance for checking
client: LBI
#: :class:`BotBase`: Client instance for checking
client: BotBase
#: :class:`typing.Optional` [:class:`int`]: Current guild id
value: typing.Optional[int]
#: :class:`typing.Optional` [:class:`discord.Guild`]: Current guild instance
guild_instance: typing.Optional[discord.Guild]
def __init__(self, client: LBI) -> None:
def __init__(self, client: BotBase) -> None:
"""
Base Guild type for config.
:param LBI client: Client instance
:param BotBase client: Client instance
:Basic usage:
@ -55,7 +56,7 @@ class Guild(BaseType):
if isinstance(value, discord.Guild):
id = value.id
if not self.client.is_ready():
self.client.warn("No check for guild `value` because client is not initialized!")
self.client.warning("No check for guild `value` because client is not initialized!")
return True
if self.client.get_guild(id):
return True

View File

@ -1,6 +1,6 @@
import typing
from config.config_types.base_type import BaseType
from .base_type import BaseType
class Float(BaseType):

View File

@ -1,6 +1,6 @@
import typing
from config.config_types.base_type import BaseType
from .base_type import BaseType
class Int(BaseType):

View File

@ -1,6 +1,6 @@
import typing
from config.config_types.base_type import BaseType
from .base_type import BaseType
class List(BaseType):

View File

@ -1,4 +1,4 @@
from config.config_types.base_type import BaseType
from .base_type import BaseType
class Str(BaseType):

View File

@ -1,28 +1,14 @@
class LBIException(Exception):
"""
Base exception class for LBI
All other exceptions are subclasses
"""
class BotBaseException(Exception):
pass
class ModuleException(LBIException):
"""
Base exception class for all module errors
"""
class ModuleException(BotBaseException):
pass
class ModuleNotInstalled(ModuleException):
"""
Raised when a module is not found in module directory
"""
class ModuleNotFound(ModuleException):
pass
class IncompatibleModule(ModuleException):
"""
Raised when a module is not compatible with bot version
"""
class IncompatibleModuleError(ModuleException):
pass

View File

@ -1,173 +1,13 @@
#!/usr/bin/python3
from __future__ import annotations
import asyncio
import importlib
import json
import locale
import logging
import logging.config
import os
import sys
import traceback
from typing import Dict
import discord
import humanize
from packaging.version import Version
from config import Config, config_types
from config.config_types import factory
from errors import IncompatibleModule
from modules.base import base_supported_type
__version__ = "0.1.0"
from bot_base.bot_base import BotBase
class Module:
name: str
def __init__(self, name: str):
"""
Init module
:param name: Module name
:type name: str
"""
self.name = name
MODULES.update({self.name: self})
@property
def type(self) -> str:
"""
Return module type. It can be python or lua
:return: Module type
:rtype: str
"""
if not os.path.exists(os.path.join("modules", self.name, "version.json")):
return ""
with open(os.path.join("modules", self.name, "version.json")) as file:
versions = json.load(file)
return versions["type"]
@property
def exists(self) -> bool:
"""
Check if module exists
:return: True if module is present in modules folders
:rtype: bool
"""
if not os.path.isdir(os.path.join("modules", self.name)):
return False
return True
@property
def complete(self) -> bool:
"""
Check if module is complete
:return: True if module is compatible
:rtype: Boolean
"""
# Check if version.json exists
if not os.path.exists(os.path.join("modules", self.name, "version.json")):
return False
with open(os.path.join("modules", self.name, "version.json")) as file:
versions = json.load(file)
if "version" not in versions.keys():
return False
if "dependencies" not in versions.keys():
return False
if "bot_version" not in versions.keys():
return False
if "type" not in versions.keys():
return False
if versions["type"] not in base_supported_type:
return False
return True
@property
def version(self) -> Version:
"""
Returns module version
:return: current module version
:rtype: Version
"""
with open(os.path.join("modules", self.name, "version.json")) as file:
versions = json.load(file)
return Version(versions["version"])
@property
def bot_version(self) -> dict:
"""
returns the min and max version of the bot that is compatible with the module
:return: Min and max version for bot
:rtype: dict
:raises IncompatibleModule: If bot_version is not properly formated (there must be min and max keys)
"""
with open(os.path.join("modules", self.name, "version.json")) as file:
versions = json.load(file)
try:
return {"min": Version(versions["bot_version"]["min"]),
"max": Version(versions["bot_version"]["max"])}
except KeyError:
raise IncompatibleModule(f"Module {self.name} is not compatible with bot (version.json does not "
f"contain bot_version.max or bot_version.min item)")
@property
def dependencies(self) -> dict:
"""
return list of dependencies version
:raise IncompatibleModule: If bot_version is not properly formated (there must be min and max keys for each dependencies)
:return: list of dependencies version
:rtype: dict
"""
with open(os.path.join("modules", self.name, "version.json")) as file:
versions = json.load(file)
try:
deps = {}
for name, dep in versions["dependencies"].items():
dep_ver = {"min": Version(dep["min"]),
"max": Version(dep["max"])}
deps.update({name: dep_ver})
return deps
except KeyError:
raise IncompatibleModule(f"Module {self.name} is not compatible with bot (version.json does not "
f"contain dependencies.modulename.max or dependencies.modulename.min item)")
@property
def compatible(self) -> bool:
"""
Check if module is compatible with current installation
:return: True if all dependencies are okays
:rtype: bool
"""
# Check bot version
bot_ver = Version(__version__)
if bot_ver < self.bot_version["min"]:
return False
if bot_ver > self.bot_version["max"]:
return False
for name, dep in self.dependencies.items():
if name not in MODULES.keys():
Module(name)
if MODULES[name].version < dep["min"]:
return False
if MODULES[name].version > dep["max"]:
return False
return True
MODULES: Dict[str, Module] = {}
def setup_logging(default_path='config/log_config.json', default_level=logging.INFO, env_key='LBI_LOG_CONFIG'):
def setup_logging(default_path='data/log_config.json', default_level=logging.INFO, env_key='LBI_LOG_CONFIG'):
"""Setup logging configuration
"""
path = default_path
@ -182,318 +22,14 @@ def setup_logging(default_path='config/log_config.json', default_level=logging.I
logging.basicConfig(level=default_level)
def modules_edit(func):
def wrapper(self, *args, **kwargs):
if self.reloading:
return func(self, *args, **kwargs)
else:
self.reloading = True
a = func(self, *args, **kwargs)
self.reloading = False
return a
return wrapper
def event(func):
def wrapper(self, *args, **kwargs):
if self.reloading:
return lambda: None
else:
return func(self, *args, **kwargs)
return wrapper
"""def async_event(func):
async def wrapper(self, *args, **kwargs):
if self.reloading:
return lambda: None
else:
return func(self, *args, **kwargs)
return wrapper"""
setup_logging()
log_discord = logging.getLogger('discord_types')
log_LBI = logging.getLogger('LBI')
log_communication = logging.getLogger('communication')
def load_modules_info():
for mod in os.listdir("modules"):
Module(mod)
class LBI(discord.Client):
by_id: ClientById
base_path = "data"
debug = log_LBI.debug
info = log_LBI.info
warning = log_LBI.warning
warn = warning
error = log_LBI.error
critical = log_LBI.critical
def __init__(self, config: Config = None, *args, **kwargs):
super().__init__(*args, **kwargs)
if config is None:
config = Config(path="data/config.toml", client=self)
self.reloading = False
self.by_id = ClientById(self)
self.ready = False
# Content: {"module_name": {"module": imported module, "class": initialized class}}
self.modules = {}
self.config = config
self.config.register("modules", factory(config_types.List, factory(config_types.Str)))
self.config.register("prefix", factory(config_types.Str))
self.config.register("admin_roles", factory(config_types.List, factory(config_types.discord_types.Role, self)))
self.config.register("admin_users", factory(config_types.List, factory(config_types.discord_types.User, self)))
self.config.register("main_guild", factory(config_types.discord_types.Guild, self))
self.config.register("locale", factory(config_types.Str))
self.config.set({
"modules": ["modules", "errors"],
"prefix": "%",
"admin_roles": [],
"admin_users": [],
"main_guild": None,
"locale": "fr_FR.UTF8",
})
locale.setlocale(locale.LC_TIME, self.config['locale'])
humanize.i18n.activate(self.config['locale'])
self.load_modules()
@modules_edit
def load_modules(self):
self.info("Starts to load modules...")
e = {}
for module in self.config["modules"]:
e.update({module: self.load_module(module)})
self.info("Finished to load all modules.")
return e
@modules_edit
def load_module(self, module):
"""
Status codes:
- 0: Module loaded
- 1: Module not in modules folder
- 2: Module incomplete
- 3: Module incompatible
:param module: Module name
:return: Status code
"""
# Check module compatibility
load_modules_info()
if not MODULES.get(module):
return 1
if not MODULES[module].exists:
return 1
if not MODULES[module].complete:
return 2
if not MODULES[module].compatible:
return 3
deps = MODULES[module].dependencies
for dep in deps.keys():
if dep not in self.modules.keys():
if dep != "base":
self.load_module(dep)
if MODULES[module].type == "python":
try:
self.info("Start loading module {module}...".format(module=module))
imported = importlib.import_module('modules.' + module)
importlib.reload(imported)
initialized_class = imported.MainClass(self)
self.modules.update({module: {"imported": imported, "initialized_class": initialized_class}})
self.info("Module {module} successfully imported.".format(module=module))
initialized_class.dispatch("load")
if module not in self.config["modules"]:
self.config["modules"].append(module)
self.config.save()
except AttributeError as e:
self.error("Module {module} doesn't have MainClass.".format(module=module))
raise e
return 0
elif MODULES[module].type == "lua":
self.info(f"Start loading module {module}...")
imported = importlib.import_module('modules.base.BaseLua')
importlib.reload(imported)
initialized_class = imported.BaseClassLua(self, path=f"modules/{module}/main")
self.modules.update({module: {"imported": imported, "initialized_class": initialized_class}})
self.info(f"Module {module} successfully imported.")
initialized_class.dispatch("load")
if module not in self.config["modules"]:
self.config["modules"].append(module)
self.config.save()
return 0
@modules_edit
def unload_module(self, module):
self.info("Start unload module {module}...".format(module=module))
try:
if module in self.config["modules"]:
self.config["modules"].remove(module)
self.config.save()
self.unload_all()
self.load_modules()
except KeyError as e:
self.error("Module {module} not loaded.").format(module=module)
return e
@modules_edit
def reload(self):
del self.modules
self.load_modules()
@modules_edit
def unload_all(self):
del self.modules
self.modules = {}
@event
def dispatch(self, event, *args, **kwargs):
# Dispatch to handle wait_* commands
super().dispatch(event, *args, **kwargs)
# Dispatch to modules
for module in self.modules.values():
module["initialized_class"].dispatch(event, *args, **kwargs)
async def on_error(self, event_method, *args, **kwargs):
"""Function called when error happend"""
# This event is special because it is call directly
self.error(traceback.format_exc())
for module in self.modules.values():
await module["initialized_class"].on_error(event_method, *args, **kwargs)
class ClientById:
client: LBI
def __init__(self, client_):
self.client = client_
async def fetch_message(self, id_, *args, **kwargs):
"""Find a message by id
:param id_: Id of message to find
:type id_: int
:raises discord_types.NotFound: This exception is raised when a message is not found (or not accessible by bot)
:rtype: discord.Message
:return: discord_types.Message instance if message is found.
"""
msg = None
for channel in self.client.get_all_channels():
try:
return await channel.fetch_message(id_, *args, **kwargs)
except discord.NotFound:
continue
if msg is None:
raise discord.NotFound(None, "Message not found")
async def edit_message(self, id, *args, **kwargs):
"""
Edit message by id
:param id: Id of the message to edit
:type id: int"""
message = await self.fetch_message(id)
return await message.edit(**kwargs)
async def remove_reaction(self, id_message, *args, **kwargs):
"""Remove reaction from message by id
:param id_message: Id of message
:type id_message: int"""
message = await self.fetch_message(id_message)
return await message.remove_reaction(*args, **kwargs)
async def send_message(self, id_, *args, **kwargs):
"""Send message by channel id
:param id_: Id of channel where to send message
:type id_: int"""
channel = self.client.get_channel(id_)
return channel.send(*args, **kwargs)
def get_role(self, id_=None, name=None, check=None, guilds=None):
"""Get role by id or with custom check"""
if guilds is None:
guilds = self.client.guilds
if id_ is not None:
for guild in guilds:
role = discord.utils.get(guild.roles, id=id_)
if role:
return role
if name is not None:
for guild in guilds:
role = discord.utils.get(guild.roles, name=name)
if role:
return role
if check is not None:
role = None
for guild in guilds:
for role_ in guild.roles:
if check(role_):
role = role_
break
if role is not None:
break
return role
return None
class Communication(asyncio.Protocol):
debug = log_communication.debug
info = log_communication.info
warning = log_communication.warning
error = log_communication.error
critical = log_communication.critical
name = "Communication"
def __init__(self, client):
self.client = client
self.transport = None
def connection_made(self, transport):
print('%s: connection made' % self.name)
self.transport = transport
def data_received(self, data):
print('%s: data received: %r' % (self.name, data))
def eof_received(self):
pass
def connection_lost(self, exc):
print('%s: connection lost: %s' % (self.name, exc))
if __name__ == "__main__":
client1 = LBI(max_messages=500000)
communication = Communication(client1)
client = BotBase(max_messages=500000)
async def start_bot():
await client1.start(os.environ.get("DISCORD_TOKEN"))
print(os.path.join("/tmp", os.path.dirname(os.path.realpath(__file__))) + ".sock")
await client.start(os.environ.get("DISCORD_TOKEN"))
loop = asyncio.get_event_loop()
t = loop.create_unix_server(Communication,
path=os.path.join("/tmp", os.path.dirname(os.path.realpath(__file__)) + ".sock"))
if not sys.platform == "win32":
loop.run_until_complete(t)
loop.create_task(start_bot())
loop.run_forever()

View File

@ -1,56 +0,0 @@
import datetime
import discord
import utils.emojis
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "Avalon"
help = {
"description": "Maître du jeu Avalon.",
"commands": {
"`{prefix}{command} join`": "",
"`{prefix}{command} quit`": "",
"`{prefix}{command} players list`": "",
"`{prefix}{command} players kick (<user_id>/<@mention>)`": "",
"`{prefix}{command} roles setup`": "",
"`{prefix}{command} roles list`": "",
}
}
help_active = True
command_text = "perdu"
color = 0xff6ba6
def __init__(self, client):
super().__init__(client)
self.config.set({"spectate_channel": 0,
"illustrations":{"merlin":"",
"perceval":"",
"gentil":"",
"assassin":"",
"mordred":"",
"morgane":"",
"oberon":"",
"mechant":""},
"couleurs":{"merlin":"",
"perceval":0,
"gentil":0,
"assassin":0,
"mordred":0,
"morgane":0,
"oberon":0,
"mechant":0,
"test":15},
"test":{"merlin":"",
"perceval":0,
"gentil":0,
"assassin":0,
"mordred":0,
"morgane":0,
"oberon":0,
"mechant":0,
"test":15}
})

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,3 +0,0 @@
from .BasePython import BaseClassPython
from .BaseLua import BaseClassLua
base_supported_type = ["python", "lua"]

View File

@ -1,276 +0,0 @@
"""Base class for module, never use directly !!!"""
import asyncio
import os
from typing import List, Union, Optional
import discord
from config import Config
from config import config_types
from config.config_types import factory
from storage import Objects
from utils import emojis
class BaseClass:
"""Base class for all modules, Override it to make submodules"""
name = ""
help = {
"description": "",
"commands": {
}
}
def __init__(self, client):
"""Initialize module class
Initialize module class, always call it to set self.client when you override it.
:param client: client instance
:type client: LBI"""
self.client = client
self.objects = Objects(path=os.path.join("data", self.name.lower()))
self.config = Config(path=os.path.join("data", self.name.lower(), "config.toml"))
self.config.register("help_active", factory(config_types.Bool))
self.config.register("color", factory(config_types.Color))
self.config.register("auth_everyone", factory(config_types.Bool))
self.config.register("authorized_roles",
factory(config_types.List, factory(config.config_types.discord_types.Role, client)))
self.config.register("authorized_users",
factory(config_types.List, factory(config.config_types.discord_types.User, client)))
self.config.register("command_text", factory(config_types.Str))
self.config.set({"help_active": True,
"color": 0x000000,
"auth_everyone": False,
"authorized_roles": [],
"authorized_users": [],
"command_text": self.name.lower()})
self.config.load()
async def send_help(self, channel):
embed = discord.Embed(
title="[{nom}] - Aide".format(nom=self.name),
description="*" + self.help["description"].format(prefix=self.client.config['prefix']) + "*",
color=self.config["color"]
)
for command, description in self.help["commands"].items():
embed.add_field(
name=command.format(prefix=self.client.config['prefix'], command=self.config["command_text"]),
value="-> " + description.format(prefix=self.client.config['prefix'],
command=self.config["command_text"]),
inline=False)
await channel.send(embed=embed)
def auth(self, user: discord.User, role_list: List[int] = None, user_list: List[int] = None,
guild: int = None):
"""
Return True if user is an owner of the bot or in authorized_users or he have a role in authorized_roles.
:param user: User to check
:param user_list: List of authorized users, if not specified use self.authorized_users
:param role_list: list of authorized roles, if not specified use self.authorized_roles
:param guild: Specific guild to search role
:type user_list: List[Int]
:type role_list: List[Int]
:type guild: Int
:type user: discord.User
"""
if self.config["auth_everyone"]:
return True
if user_list is None:
user_list = self.config["authorized_users"] + self.client.config['admin_users']
if user.id in user_list:
return True
if role_list is None:
role_list = self.config["authorized_roles"] + self.client.config['admin_roles']
if guild is None:
guilds = self.client.guilds
else:
guilds = [guild]
for guild in guilds:
if guild.get_member(user.id):
for role_id in role_list:
if role_id in [r.id for r in guild.get_member(user.id).roles]:
return True
return False
async def parse_command(self, message):
"""Parse a command_text from received message and execute function
Parse message like `{prefix}{command_text} subcommand` and call class method `com_{subcommand}`.
:param message: message to parse
:type message: discord.Message"""
command = self.client.config["prefix"] + (self.config["command_text"] if self.config["command_text"] else "")
if message.content.startswith(command):
content = message.content.split(" ", 1)[1 if " " in message.content else 0]
sub_command, args, kwargs = self._parse_command_content(content)
sub_command = "com_" + sub_command
if self.auth(message.author):
if sub_command in dir(self):
await self.__getattribute__(sub_command)(message, args, kwargs)
else:
await self.command(message, args, kwargs)
else:
await self.unauthorized(message)
@staticmethod
def _parse_command_content(content):
"""Parse string
Parse string like `subcommand argument "argument with spaces" -o -shortwaytopassoncharacteroption --longoption
-o "option with argument"`. You can override this function to change parsing.
:param content: content to parse
:type content: str
:return: parsed arguments: [subcommand, [arg1, arg2, ...], [(option1, arg1), (option2, arg2), ...]]
:rtype: tuple[str, list, list]"""
if not len(content.split()):
return "", [], []
# Sub_command
sub_command = content.split()[0]
args_ = [sub_command]
kwargs = []
if len(content.split()) > 1:
# Remove subcommand
content = content.lstrip(sub_command)
# Take the other part of command_text
content = content.lstrip().replace("\"", "\"\"")
# Splitting around quotes
quotes = [element.split("\" ") for element in content.split(" \"")]
# Split all sub chains but raw chains and flat the resulting list
args = [item.split() if item[0] != "\"" else [item, ] for sublist in quotes for item in sublist]
# Second plating
args = [item for sublist in args for item in sublist]
# args_ are arguments, kwargs are options with arguments
i = 0
while i < len(args):
if args[i].startswith("\""):
args_.append(args[i][1:-1])
elif args[i].startswith("--"):
if i + 1 >= len(args):
kwargs.append((args[i].lstrip("-"), None))
break
if args[i + 1][0] != "-":
kwargs.append((args[i].lstrip("-"), args[i + 1].strip("\"")))
i += 1
else:
kwargs.append((args[i].lstrip("-"), None))
elif args[i].startswith("-"):
if len(args[i]) == 2:
if i + 1 >= len(args):
break
if args[i + 1][0] != "-":
kwargs.append((args[i].lstrip("-"), args[i + 1].strip("\"")))
i += 1
else:
kwargs.append((args[i].lstrip("-"), None))
else:
kwargs.extend([(arg, None) for arg in args[i][1:]])
else:
args_.append(args[i])
i += 1
return sub_command, args_, kwargs
async def on_message(self, message: discord.Message):
"""Override this function to deactivate command_text parsing"""
if message.author.bot:
return
await self.parse_command(message)
async def command(self, message, args, kwargs):
"""Override this function to handle all messages starting with `{prefix}{command_text}`
Function which is executed for all command_text doesn't match with a `com_{subcommand}` function"""
await self.send_help(message.channel)
async def com_help(self, message, args, kwargs):
await self.send_help(message.channel)
async def unauthorized(self, message):
await message.channel.send("Vous n'êtes pas autorisé à effectuer cette commande")
def dispatch(self, event, *args, **kwargs):
# Method to call
method = 'on_' + event
try:
# Try to get coro, if not exists pass without raise an error
coro = getattr(self, method)
except AttributeError:
pass
else:
# Run event
asyncio.ensure_future(self.client._run_event(coro, method, *args, **kwargs), loop=self.client.loop)
async def on_error(self, event_method, *args, **kwargs):
pass
async def choice(self, message: discord.Message, choices: List[Union[discord.Emoji, discord.PartialEmoji, str]],
validation: bool = False,
validation_emote: Union[discord.Emoji, discord.PartialEmoji, str] = emojis.WHITE_CHECK_MARK,
minimal_choices: int = 1,
maximal_choices: Optional[int] = None,
timeout: Optional[float] = None,
user: Optional[discord.User] = None,
unique: bool = False):
final_choices: List[Union[discord.Emoji, discord.PartialEmoji, str]] = []
validation_step = False
for emoji in choices:
await message.add_reaction(emoji)
def check_add(reaction, u):
nonlocal validation_step, final_choices
if (not user.bot) and (user is None or u.id == user.id):
if validation_step and reaction.emoji == validation_emote:
return True
if reaction in choices:
if not unique or reaction.emoji not in final_choices:
final_choices.append(reaction.emoji)
if maximal_choices is not None and len(final_choices) > maximal_choices:
validation_step = False
asyncio.ensure_future(message.remove_reaction(validation_emote, self.client.user))
try:
asyncio.get_running_loop().run_until_complete(message.clear_reaction(validation_emote))
except discord.errors.Forbidden:
pass
return False
if len(final_choices) >= minimal_choices:
if validation:
asyncio.get_running_loop().run_until_complete(message.add_reaction(validation_emote))
validation_step = True
return False
else:
return True
return False
def check_remove(reaction: discord.Reaction, u):
nonlocal validation_step, final_choices
if (not user.bot) and (user is None or u.id == user.id):
if reaction.emoji in choices:
if not unique or reaction.count != 0:
final_choices.remove(reaction.emoji)
if len(final_choices) < minimal_choices:
if validation_step:
asyncio.ensure_future(message.remove_reaction(validation_emote, self.client.user))
try:
asyncio.get_running_loop().run_until_complete(message.clear_reaction(validation_emote))
except discord.errors.Forbidden:
pass
validation_step = False
return False
if (maximal_choices is None or len(final_choices) <= maximal_choices) and len(
final_choices) >= minimal_choices:
if validation:
asyncio.get_running_loop().run_until_complete(message.add_reaction(validation_emote))
validation_step = True
return False
else:
return True
return False
done, pending = await asyncio.wait([
self.client.wait_for('reaction_add', timeout=timeout, check=check_add),
self.client.wait_for('reaction_remove', timeout=timeout, check=check_remove)],
return_when=asyncio.FIRST_COMPLETED)
return final_choices

View File

@ -1,73 +0,0 @@
"""Base class for module, never use directly !!!"""
import asyncio
import discord
import lupa
from modules import BaseClass
class BaseClassLua(BaseClass):
"""Base class for all modules, Override it to make submodules"""
name = ""
help = {
"description": "",
"commands": {
}
}
help_active = False
color = 0x000000
command_text = None
authorized_users = []
authorized_roles = []
command_text = ""
def __init__(self, client, path):
"""Initialize module class
Initialize module class, always call it to set self.client when you override it.
:param client: client instance
:type client: NikolaTesla"""
super().__init__(client)
# Get lua globals
self.lua = lupa.LuaRuntime(unpack_returned_tuples=True)
self.luaMethods = self.lua.require(path)
def call(self, method, *args, **kwargs):
# Try to run lua method then python one
if self.luaMethods[method] is not None:
async def coro(*args, **kwargs):
self.luaMethods[method](self, asyncio.ensure_future, discord, *args, *kwargs)
asyncio.ensure_future(self.client._run_event(coro, method, *args, **kwargs), loop=self.client.loop)
try:
coro = getattr(self, method)
except AttributeError:
pass
else:
asyncio.ensure_future(self.client._run_event(coro, method, *args, **kwargs), loop=self.client.loop)
def dispatch(self, event, *args, **kwargs):
method = "on_"+event
self.call(method, *args, **kwargs)
async def parse_command(self, message):
"""Parse a command_text from received message and execute function
%git update
com_update(m..)
Parse message like `{prefix}{command_text} subcommand` and call class method `com_{subcommand}`.
:param message: message to parse
:type message: discord.Message"""
if message.content.startswith(self.client.config["prefix"] + (self.command_text if self.command_text else "")):
content = message.content.lstrip(
self.client.config["prefix"] + (self.command_text if self.command_text else ""))
sub_command, args, kwargs = self._parse_command_content(content)
sub_command = "com_" + sub_command
if self.auth(message.author):
self.call(sub_command, args, kwargs)
else:
await self.unauthorized(message)

View File

@ -1,8 +0,0 @@
"""Base class for module, never use directly !!!"""
from .Base import BaseClass
class BaseClassPython(BaseClass):
"""Base class for all modules, Override it to make submodules"""
pass

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,17 +0,0 @@
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "clean"
help = {
"description": "Supprime des messages",
"commands": {
"`{prefix}{command}`": "Supprime tous les messages du bot dans le salon"
}
}
async def command(self, message, args, kwargs):
def is_me(m):
return m.author == self.client.user
deleted = await message.channel.purge(limit=10000000, check=is_me)
await message.channel.send('Deleted {} message(s)'.format(len(deleted)))

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,120 +0,0 @@
import asyncio
import random
import traceback
import discord
from discord import Message
from config import config_types
from config.config_types import factory
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "errors"
authorized_users = []
authorized_roles = []
help = {
"description": "Montre toutes les erreurs du bot dans discord_types.",
"commands": {
"`{prefix}{command}`": "Renvoie une erreur de test.",
}
}
def __init__(self, client):
super().__init__(client)
self.config.register("dev_chan",
factory(config_types.List, factory(config_types.discord_types.Channel, client)))
self.config.register("memes", factory(config_types.List, factory(config_types.Str)))
self.config.register("icon", factory(config_types.Str))
self.config.set({"dev_chan": [], "memes": [""], "icon": ""})
self.errorsList = None
async def on_load(self):
if self.objects.save_exists('errorsList'):
self.errorsList = self.objects.load_object('errorsList')
else:
self.errorsList = []
async def on_ready(self):
for i in range(len(self.errorsList)):
try:
msg_id = self.errorsList.pop(0)
channel = self.client.get_channel(msg_id["channel_id"])
to_delete = await channel.fetch_message(msg_id["msg_id"])
await to_delete.delete()
except:
raise
self.objects.save_object('errorsList', self.errorsList)
async def command(self, message, args, kwargs):
raise Exception("KERNEL PANIC!!!")
async def on_error(self, event, *args, **kwargs):
"""Send error message"""
# Search first channel instance found in arg, then search in kwargs
channel = None
for arg in args:
if type(arg) == Message:
channel = arg.channel
break
if type(arg) == discord.TextChannel:
channel = arg
break
if channel is None:
for _, v in kwargs.items():
if type(v) == discord.Message:
channel = v.channel
break
if type(v) == discord.TextChannel:
channel = v
break # Create embed
embed = discord.Embed(
title="[Erreur] Aïe :/",
description="```python\n{0}```".format(traceback.format_exc()),
color=self.config["color"])
embed.set_image(url=random.choice(self.config["memes"]))
message_list = None
# Send message to dev channels
for chanid in self.config["dev_chan"]:
try:
await self.client.get_channel(chanid).send(
embed=embed.set_footer(text="Ce message ne s'autodétruira pas.", icon_url=self.config["icon"]))
except BaseException as e:
raise e
# Send message to current channel if exists
if channel is not None:
message = await channel.send(embed=embed.set_footer(text="Ce message va s'autodétruire dans une minute",
icon_url=self.config["icon"]))
msg_id = {"channel_id": message.channel.id, "msg_id": message.id}
self.errorsList.append(msg_id)
# Save message in errorsList now to keep them if a reboot happend during next 60 seconds
self.objects.save_object('errorsList', self.errorsList)
# Wait 60 seconds and delete message
# await asyncio.sleep(60)
try:
# channel = self.client.get_channel(msg_id["channel_id"])
# delete_message = await channel.fetch_message(msg_id["msg_id"])
# await delete_message.delete()
await message.add_reaction("🗑️")
try:
reaction, user = await self.client.wait_for('reaction_add',
timeout=60.0,
check=lambda r, u:
r.emoji == "🗑️" and not u.bot and self.auth(u))
except asyncio.TimeoutError:
await message.delete()
else:
await reaction.message.delete()
except:
raise
finally:
try:
self.errorsList.remove(msg_id)
except ValueError:
pass
# Save now to avoid deleting unkown message
self.objects.save_object('errorsList', self.errorsList)

View File

@ -1,14 +0,0 @@
{
"version": "0.1.0",
"type": "python",
"dependencies": {
"base": {
"min": "0.1.0",
"max": "0.1.0"
}
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,35 +0,0 @@
import discord
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "Aide"
help = {
"description": "Module d'aide",
"commands": {
"`{prefix}{command} list`": "Affiche une liste des modules ainsi qu'une desription",
"`{prefix}{command} <module>`": "Affiche l'aide sépcifique d'un module"# ,
# "`{prefix}{command} all`": "Affiche l'aide de tous les modules"
}
}
async def com_list(self, message, args, kwargs):
embed = discord.Embed(title="[Aide] - Liste des modules", color=self.config.color)
for moduleName in list(self.client.modules.keys()):
if self.client.modules[moduleName]["initialized_class"].config.help_active:
embed.add_field(
name=moduleName.capitalize(),
value=self.client.modules[moduleName]["initialized_class"].help["description"])
await message.channel.send(embed=embed)
# async def com_all(self, message, args, kwargs):
# for name, module in self.client.modules.items():
# await module["initialized_class"].send_help(message.channel)
async def command(self, message, args, kwargs):
if len(args) and args[0] in self.client.modules.keys() and self.client.modules[args[0]][
"initialized_class"].config.help_active:
await self.client.modules[args[0]]["initialized_class"].send_help(message.channel)
else :
await self.send_help(message.channel)

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,141 +0,0 @@
import os
import discord
from aiohttp import ClientConnectorError
from modules.base import BaseClassPython
from modules.modules.api import Api
class MainClass(BaseClassPython):
name = "modules"
help = {
"description": "Manage bot modules.",
"commands": {
"`{prefix}{command} list`": "List of available modules.",
"`{prefix}{command} enable <module>`": "Enable module `<module>`.",
"`{prefix}{command} disable <module>`": "Disable module `<module>`.",
"`{prefix}{command} reload <module>`": "Reload module `<module>`",
"`{prefix}{command} web_list`": "List all available modules from repository",
# "`{prefix}{command} web_source`": "List all source repositories",
# "`{prefix}{command} web_source remove <url>`": "Remove url from repository list",
# "`{prefix}{command} web_source add <url>`": "Add url to repository list",
}
}
def __init__(self, client):
super().__init__(client)
os.makedirs("modules", exist_ok=True)
self.api = Api()
@staticmethod
def get_all_modules():
all_items = os.listdir("modules")
modules = []
for item in all_items:
if item not in ["__init__.py", "base", "__pycache__"]:
if os.path.isfile(os.path.join("modules", item)):
modules.append(item[:-3])
else:
modules.append(item)
return set(modules)
async def com_enable(self, message, args, kwargs):
args = args[1:]
if len(args) == 0:
await message.channel.send("You must specify at least one module.")
return
if len(args) == 1 and args[0] == "*":
for module in self.get_all_modules():
e = self.client.load_module(module)
if e:
await message.channel.send("An error occurred during the loading of the module {module}."
.format(module=module))
await self.com_list(message, args, kwargs)
return
for arg in args:
e = self.client.load_module(arg)
if e == 1:
await message.channel.send(f"Module {arg} not exists.")
if e == 2:
await message.channel.send(f"Module {arg} is incompatible.")
elif e:
await message.channel.send(f"An error occurred during the loading of the module {arg}: {e}.")
await self.com_list(message, args, kwargs)
async def com_reload(self, message, args, kwargs):
args = args[1:]
if len(args) == 0:
await message.channel.send("You must specify at least one module.")
return
if len(args) == 1 and args[0] == "*":
for module in self.get_all_modules():
e = self.client.unload_module(module)
if e:
await message.channel.send(f"An error occurred during the unloading of the module {module}.")
e = self.client.load_module(module)
if e:
await message.channel.send(f"An error occurred during the loading of the module {module}.")
await self.com_list(message, args, kwargs)
return
for arg in args:
e = self.client.unload_module(arg)
if e:
await message.channel.send(f"An error occurred during the unloading of the module {arg}.")
e = self.client.load_module(arg)
if e:
await message.channel.send(f"An error occurred during the loading of the module {arg}.")
await self.com_list(message, [], [])
async def com_disable(self, message, args, kwargs):
args = args[1:]
if len(args) == 0:
await message.channel.send("You must specify at least one module.")
return
if len(args) == 1 and args[0] == "*":
for module in self.get_all_modules():
e = self.client.unload_module(module)
if e:
await message.channel.send(f"An error occurred during the loading of the module {module}.")
await self.com_list(message, args, kwargs)
return
for arg in args:
e = self.client.unload_module(arg)
if e:
await message.channel.send(f"An error occurred during the loading of the module {arg}: {e}.")
await self.com_list(message, [], [])
async def com_list(self, message, args, kwargs):
list_files = self.get_all_modules()
activated = set(self.client.config["modules"])
if len(activated):
activated_string = "\n+ " + "\n+ ".join(activated)
else:
activated_string = ""
if len(activated) != len(list_files):
deactivated_string = "\n- " + "\n- ".join(list_files.difference(activated))
else:
deactivated_string = ""
embed = discord.Embed(title="[Modules] - Liste des modules",
description="```diff{activated}{deactivated}```".format(
activated=activated_string,
deactivated=deactivated_string)
)
await message.channel.send(embed=embed)
async def com_web_list(self, message, args, kwargs):
try:
modules = await self.api.list()
except ClientConnectorError:
await message.channel.send("Connection impossible au serveur.")
return
text = ""
for module, versions in modules.items():
text += module + " - " + ", ".join(versions)
await message.channel.send(text)
async def com_web_dl(self, message, args, kwargs):
try:
await self.api.download(args[1], args[2])
except ClientConnectorError:
await message.channel.send("Connection impossible au serveur.")

View File

@ -1,40 +0,0 @@
import os
import shutil
import aiohttp
import aiofiles
import zipfile
class Api:
def __init__(self, host="localhost:5000"):
self.host = host
self.basepath = "http://"+host+"/api/current"
async def _get(self, endpoint):
if endpoint[0] != "/":
endpoint = "/" + endpoint
async with aiohttp.ClientSession() as session:
async with session.get(self.basepath+endpoint) as response:
return await response.json()
async def _download(self, endpoint, filename="temp"):
if endpoint[0] != "/":
endpoint = "/" + endpoint
async with aiohttp.ClientSession() as session:
async with session.get(self.basepath+endpoint) as resp:
f = await aiofiles.open(filename, mode='wb')
await f.write(await resp.read())
await f.close()
async def list(self):
return await self._get("modules/")
async def download(self, module, version):
await self._download("modules/"+module+"/"+version, filename="temp.zip")
# TODO: Supprimer le dossier ici
try:
shutil.rmtree(os.path.join("modules", module))
except:
print('Error while deleting directory')
with zipfile.ZipFile('temp.zip', "r") as z:
z.extractall(os.path.join("modules", module))

View File

@ -1,14 +0,0 @@
{
"version": "0.1.0",
"type": "python",
"dependencies": {
"base": {
"min": "0.1.0",
"max": "0.1.0"
}
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,31 +0,0 @@
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "NewMember"
help = {
"description": "Module d'accueil",
"commands": {
}
}
def __init__(self, client):
super().__init__(client)
self.config.set({"new_role": 0,
"motd": "Bienvenue !"})
async def on_ready(self):
guild = self.client.get_guild(self.client.config.main_guild)
for i, member in enumerate(guild.members):
if len(member.roles) == 1:
await member.add_roles(self.client.id.get_role(id_=self.config.new_role,
guilds=[self.client.get_guild(
self.client.config.main_guild)]))
if i % 50 == 0:
self.client.log(f"Attribution des roles automatique manqués... {i}/{len(guild.members)}")
async def on_member_join(self, member):
await member.add_roles(self.client.id.get_role(id_=self.config.new_role,
guilds=[self.client.get_guild(
self.client.config.main_guild)]))
await member.send(self.config.motd)

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,44 +0,0 @@
import time
import discord
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "Panic"
help = {
"description": "Dans quel état est Nikola Tesla",
"commands": {
"`{prefix}{command}`": "Donne l'état actuel de Nikola Tesla",
}
}
async def command(self, message, args, kwargs):
temperature = 0
with open("/sys/class/thermal/thermal_zone0/temp") as f:
temperature = int(f.read().rstrip("\n")) / 1000
with open("/proc/cpuinfo") as f:
cpu_count = f.read().count('\n\n')
embed = discord.Embed(title="[Panic] - Infos", color=self.config.color)
with open("/proc/loadavg") as f:
load_average = ["**" + str(round((val / cpu_count) * 100, 1)) + '%**' for val in
map(float, f.read().split(' ')[0:3])]
with open("/proc/uptime") as f:
uptime = time.gmtime(float(f.read().split(' ')[0]))
uptime = "**" + str(int(time.strftime('%-m', uptime)) - 1) + "** mois, **" + str(
int(time.strftime('%-d', uptime)) - 1) + "** jours, " + time.strftime(
'**%H** heures, **%M** minutes, **%S** secondes.', uptime)
embed.add_field(
name="Température",
value="Nikola est à **{temperature}°C**".format(temperature=temperature))
embed.add_field(
name="Charge moyenne",
value=f"{self.client.name} est en moyenne, utilisé à :\n sur une minute : %s\n sur cinq minutes : %s\n sur quinze minutes : %s" % tuple(
load_average))
embed.add_field(
name="Temps d'éveil",
value=f"{self.client.name} est éveillé depuis {uptime}".format(uptime=uptime))
await message.channel.send(embed=embed)

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,221 +0,0 @@
import datetime
import time
import discord
import humanize
import matplotlib.pyplot as np
import config
import utils.emojis
from config.config_types import factory
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "Perdu"
help = {
"description": "Module donnant les statistiques sur les perdants",
"commands": {
"`{prefix}{command}`": "Donne le classement des perdants de la semaine",
"`{prefix}{command} all`": "Donne le classement des perdants depuis toujours",
"`{prefix}{command} <nombre de jours>`": "Donne le classement des perdants sur la durée spécifiée",
"`{prefix}{command} stats [@mention]`": "Donne les statistiques d'un perdant.",
"`{prefix}{command} stats history": "Affiche un graphique avec le nombre de pertes."
}
}
def __init__(self, client):
super().__init__(client)
self.config.set({"channel": 0, "lost_role": 0, "min_delta": datetime.timedelta(minutes=26).total_seconds()})
self.config.register("channel", factory(config.config_types.Channel, self.client))
self.history = {}
async def on_ready(self):
await self.fill_history()
async def on_message(self, message: discord.Message):
# Fill history
if message.author.bot:
return
if message.channel.id == self.config.channel:
if message.author.id not in self.history.keys():
# Add new user if not found
self.history.update(
{message.author.id: ([(message.created_at, datetime.timedelta(seconds=0)), ])}
)
else:
# Update user and precompute timedelta
delta = message.created_at - self.history[message.author.id][-1][0]
if delta.total_seconds() >= self.config.min_delta:
self.history[message.author.id].append((message.created_at, delta))
await self.parse_command(message)
async def fill_history(self):
self.history = {}
async for message in self.client.get_channel(self.config.channel).history(limit=None):
if message.author.id not in self.history.keys():
# Add new user if not found
self.history.update({message.author.id: ([(message.created_at, datetime.timedelta(seconds=0)), ])})
else:
# Update user and precompute timedelta
delta = self.history[message.author.id][-1][0] - message.created_at
if delta.total_seconds() >= self.config.min_delta:
self.history[message.author.id].append((message.created_at, delta))
for user in self.history.keys():
self.history[user].sort(key=lambda x: x[0])
def get_top(self, top=10, since=datetime.datetime(year=1, month=1, day=1), with_user=None, only_users=None):
"""Return [(userid, [(date, delta), (date,delta), ...]), ... ]"""
# Extract only messages after until
if only_users is not None:
# Extract data for only_users
messages = []
for user in only_users:
try:
if self.history[user][-1][0] >= since:
messages.append((user, [message for message in self.history[user] if message[0] > since]))
except KeyError:
pass
messages.sort(key=lambda x: len(x[1]), reverse=True)
return messages
if with_user is None:
with_user = []
# Extract TOP top users, and with_users data
messages = []
for user in self.history.keys():
if self.history[user][-1][0] >= since:
messages.append((user, [message for message in self.history[user] if message[0] > since]))
messages.sort(key=lambda x: len(x[1]), reverse=True)
# Extract top-ten
saved_messages = messages[:min(top, len(messages))]
# Add with_user
saved_messages.extend([message for message in messages if message[0] in with_user])
return saved_messages
async def com_fill(self, message: discord.Message, args, kwargs):
if self.auth(message.author):
async with message.channel.typing():
await self.fill_history()
await message.channel.send("Fait.")
async def com_all(self, message: discord.Message, args, kwargs):
# Get all stats
top = self.get_top()
intervales = [sum(list(zip(*top[i][1]))[1], datetime.timedelta(0)) / len(top[i][1]) for i in range(len(top))]
embed_description = "\n".join(
f"{utils.emojis.write_with_number(i)} : <@{top[i][0]}> a **perdu {len(top[i][1])} fois** depuis la"
f" création du salon à en moyenne **"
f"{(str(intervales[i].days) + ' jours et' if intervales[i].days else '')} "
f"{str(int(intervales[i].total_seconds() % (24 * 3600) // 3600)) + ':' if intervales[i].total_seconds() > 3600 else ''}"
f"{int((intervales[i].total_seconds() % 3600) // 60)} "
f"{'heures' if intervales[i].total_seconds() > 3600 else 'minutes'} d'intervalle.**"
for i in range(len(top))
)[:2000]
await message.channel.send(embed=discord.Embed(title="G-Perdu - Tableau des scores",
description=embed_description,
color=self.config.color))
async def com_stats(self, message: discord.Message, args, kwargs):
# TODO: Finir sum
async with message.channel.typing():
if not ((not False or (not False or not ("sum" in args))) or not True):
if message.mentions:
top = self.get_top(only_users=[mention.id for mention in message.mentions] + [message.author.id])
else:
# TOP 5 + auteur
top = self.get_top(top=5, with_user=[message.author.id])
dates = []
new_top = {}
for t in top:
for date, _ in t[1]:
dates.append(date)
dates.sort()
dates.append(datetime.datetime.today() + datetime.timedelta(days=1))
for t in top:
user = t[0]
new_top.update({user: ([dates[0]], [0])})
i = 0
for date, _ in t[1]:
while date < dates[i]:
new_top[user][0].append(dates[i])
new_top[user][1].append(new_top[user][1][-1])
i += 1
new_top[user][0].append(date)
new_top[user][1].append(new_top[user][1][-1] + 1)
to_plot = [t[1][1:] for t in new_top.values()]
np.xlabel("Temps", fontsize=30)
np.ylabel("Score", fontsize=30)
np.title("Évolution du nombre de perdu au cours du temps.", fontsize=40)
np.legend()
file_name = f"/tmp/{time.time()}.png"
np.savefig(file_name, bbox_inches='tight')
await message.channel.send(file=discord.File(file_name))
if "history" in args:
since = datetime.datetime(year=1, month=1, day=1)
debut_message = "la création du salon"
top = 5
if "s" in [k[0] for k in kwargs]:
try:
d = [k[1] for k in kwargs if k[0] == "s"][0]
since = datetime.datetime.now() - datetime.timedelta(days=float(d))
debut_message = humanize.naturalday(since.date(), format='le %d %b')
except ValueError:
pass
if "t" in [k[0] for k in kwargs]:
try:
top = int([k[1] for k in kwargs if k[0] == "t"][0])
except ValueError:
pass
# Si mention, alors uniquement les mentions
if message.mentions:
top = self.get_top(since=since,
only_users=[mention.id for mention in message.mentions])
else:
# TOP 5 + auteur
top = self.get_top(since=since, top=top, with_user=[message.author.id])
new_top = {}
for t in top:
c = 0
counts = []
dates = []
for date, _ in t[1]:
c += 1
counts.append(c)
dates.append(date)
new_top.update({t[0]: (dates, counts)})
np.figure(num=None, figsize=(25, 15), dpi=120, facecolor='w', edgecolor='k')
for user, (dates, counts) in new_top.items():
np.plot_date(dates, counts, linestyle='-', label=str(self.client.get_user(user).name))
np.xlabel("Temps", fontsize=30)
np.ylabel("Score", fontsize=30)
np.legend(fontsize=20)
np.title(f"Évolution du nombre de perdu au cours du temps depuis {debut_message}.", fontsize=35)
file_name = f"/tmp/{time.time()}.png"
np.savefig(file_name, bbox_inches='tight')
await message.channel.send(file=discord.File(file_name))
async def command(self, message, args, kwargs):
if message.mentions:
await self.com_stats(message, args, kwargs)
since = datetime.datetime.now() - datetime.timedelta(days=7)
if args[0]:
try:
since = datetime.datetime.now() - datetime.timedelta(days=float(args[0]))
except ValueError:
pass
top = self.get_top(10, since)
intervales = [sum(list(zip(*top[i][1]))[1], datetime.timedelta(0)) / len(top[i][1]) for i in range(len(top))]
embed_description = "\n".join(
f"{utils.emojis.write_with_number(i)} : <@{top[i][0]}> a **perdu {len(top[i][1])} fois** depuis "
f"{humanize.naturalday(since.date(), format='le %d %b')} à en moyenne **"
f"{(str(intervales[i].days) + ' jours et' if intervales[i].days else '')} "
f"{str(int(intervales[i].total_seconds() % (24 * 3600) // 3600)) + ':' if intervales[i].total_seconds() > 3600 else ''}"
f"{int((intervales[i].total_seconds() % 3600) // 60)} "
f"{'heures' if intervales[i].total_seconds() > 3600 else 'minutes'} d'intervalle.**"
for i in range(len(top))
)[:2000]
await message.channel.send(embed=discord.Embed(title="G-Perdu - Tableau des scores",
description=embed_description,
color=self.config.color))

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,35 +0,0 @@
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "Purge"
help = {
"description": "Suppression de messages en block.",
"commands": {
"`{prefix}{command} <message_id>`": "Supprime tous les messages du salon jusqu'au message spécifié",
}
}
async def command(self, message, args, kwargs):
message_id = None
try:
message_id = int(args[0])
except ValueError:
pass
if len(args) and message_id is not None:
messages_list=[]
done=False
async for current in message.channel.history(limit=None):
if int(current.id) == message_id:
done = True
break
elif message.id != current.id:
messages_list.append(current)
if done:
chunks = [messages_list[x:x+99] for x in range(0, len(messages_list), 99)]
for chunk in chunks:
await message.channel.delete_messages(chunk)
await message.channel.send(f"**{len(messages_list)}** messages supprimés.")
else:
await message.channel.send("Le message spécifié n'a pas été trouvé.")
else:
await message.channel.send("Arguments invalides.")

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,38 +0,0 @@
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "ReadRules"
color = 0xff071f
help_active = False
help = {
"description": "Module d'accueil",
"commands": {
}
}
def __init__(self, client):
super().__init__(client)
self.config.set({"accepted_role": 0,
"new_role": 0,
"listen_chan": 0,
"log_chan": 0,
"passwords": [],
"succes_pm": "Félicitations, vous savez lire les règles!",
"succes": "{user} a désormais accepté."})
async def on_message(self, message):
if message.author.bot:
return
if message.channel.id == self.config.listen_chan:
if message.content.lower() in self.config.passwords:
new_role = self.client.id.get_role(id_=self.config.new_role, guilds=[message.channel.guild])
if new_role in message.author.roles:
await message.author.remove_roles(new_role)
await message.author.add_roles(self.client.id.get_role(id_=self.config.accepted_role,
guild=[message.channel.guild]))
await message.author.send(self.config.succes_pm)
await message.channel.guild.get_channel(self.config.log_chan).send(
self.config.succes.format(user=message.author.mention))
else:
await message.author.send(f"Le mot de passe que vous avez entré est incorrect : `{message.content}`.\nNous vous prions de lire le règlement afin d'accéder au serveur complet.")

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,19 +0,0 @@
import sys
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "Restart"
help = {
"description": "Module gérant les redémarrages de Nikola Tesla",
"commands": {
"`{prefix}{command}`": "Redémarre le bot.",
}
}
async def command(self, message, args, kwargs):
await message.channel.send(f"{message.author.mention}, Le bot va redémarrer.")
await self.client.logout()
# TODO: Faut vraiment faire mieux
sys.exit(0)

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,129 +0,0 @@
import discord
from modules.base import BaseClassPython
class RoleAttributionError(Exception):
pass
class AlreadyHasRoleError(RoleAttributionError):
pass
class AlreadyRemovedRole(RoleAttributionError):
pass
class UnavailableRoleError(RoleAttributionError):
pass
class MainClass(BaseClassPython):
name = "Roles"
help = {
"description": "Module gérant l'attribution des roles",
"commands": {
"`{prefix}{command} list`": "Liste les roles",
"`{prefix}{command} add <role> [role] ...`": "S'attribuer le(s) rôle(s) <role> ([role]...)",
"`{prefix}{command} remove <role> [role] ...`": "Se désattribuer le(s) rôle(s) <role> ([role]...)",
"`{prefix}{command} toggle <role> [role] ...`": "S'attribuer (ou désattribuer) le(s) rôle(s) <role> ([role]...)",
"`{prefix}{command} <role> [role] ...`": "Alias de `{prefix}{command} toggle`",
}
}
def __init__(self, client):
super().__init__(client)
self.config.set({"roles": {}})
async def com_list(self, message, args, kwargs):
response = discord.Embed(title="Roles disponibles", color=self.config.color)
for id_ in self.config.roles.keys():
role = message.guild.get_role(id_=int(id_))
if role is not None:
response.add_field(name=role.name, value=f"-> `{self.config.roles[id_]}`", inline=True)
await message.channel.send(embed=response)
async def com_add(self, message, args, kwargs):
if len(args) <= 1:
await message.channel.send("Il manque des arguments à la commande")
for role in args[1:]:
try:
await self.try_add_role(message.author, role)
except discord.errors.Forbidden:
await message.channel.send(f"Je n'ai pas la permission de modifier le role {role}.")
except AlreadyHasRoleError:
await message.channel.send(f"Vous avez déjà le role {role}.")
except UnavailableRoleError:
await message.channel.send(f"Le role {role} n'est pas une role disponible à l'autoattribution.")
async def com_remove(self, message, args, kwargs):
if len(args) <= 1:
await message.channel.send("Il manque des arguments à la commande")
for role in args[1:]:
try:
await self.try_remove_role(message.author, role)
except discord.errors.Forbidden:
await message.channel.send(f"Je n'ai pas la permission de modifier le role {role}.")
except AlreadyRemovedRole:
await message.channel.send(f"Vous n'avez pas le role {role}.")
except UnavailableRoleError:
await message.channel.send(f"Le role {role} n'est pas une role disponible à l'autoattribution.")
async def com_toggle(self, message, args, kwargs):
if len(args) <= 1:
await message.channel.send("Il manque des arguments à la commande")
for role in args[1:]:
try:
await self.try_toggle_role(message.author, role)
except discord.errors.Forbidden:
await message.channel.send(f"Je n'ai pas la permission de modifier le role {role}.")
except AlreadyHasRoleError:
await message.channel.send(f"Vous avez déjà le role {role}.")
except AlreadyRemovedRole:
await message.channel.send(f"Vous n'avez pas le role {role}.")
except UnavailableRoleError:
await message.channel.send(f"Le role {role} n'est pas une role disponible à l'autoattribution.")
async def command(self, message, args, kwargs):
if len(args) < 1:
await message.channel.send("Il manque des arguments à la commande")
for role in args:
try:
await self.try_toggle_role(message.author, role)
except discord.errors.Forbidden:
await message.channel.send(f"Je n'ai pas la permission de modifier le role {role}.")
except AlreadyHasRoleError:
await message.channel.send(f"Vous avez déjà le role {role}.")
except AlreadyRemovedRole:
await message.channel.send(f"Vous n'avez pas le role {role}.")
except UnavailableRoleError:
await message.channel.send(f"Le role {role} n'est pas une role disponible à l'autoattribution.")
def get_member(self, user):
return self.client.get_guild(self.client.config.main_guild).get_member(user.id)
def get_role(self, role):
role = self.client.id.get_role(name=role, guilds=[self.client.get_guild(self.client.config.main_guild)],
check=lambda x: x.name.lower() == role.lower())
if role is None or str(role.id) not in self.config.roles.keys():
raise UnavailableRoleError()
return role
async def try_toggle_role(self, user, role):
if self.get_role(role) in self.get_member(user).roles:
await self.try_remove_role(user, role)
else:
await self.try_add_role(user, role)
async def try_add_role(self, user, role):
role = self.get_role(role)
if role in user.roles:
raise AlreadyHasRoleError()
await self.get_member(user).add_roles(role, reason="Auto-attribution")
async def try_remove_role(self, user, role):
role = self.get_role(role)
if role not in user.roles:
raise AlreadyRemovedRole()
await self.get_member(user).remove_roles(role, reason="Auto-désattribution")

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1,25 +0,0 @@
import random
import discord
from modules.base import BaseClassPython
class MainClass(BaseClassPython):
name = "rtfgd"
help = {
"description": "Read the fucking google doc",
"commands": {
"{prefix}{command} <mention>": "Demande gentilment de lire le google doc"
}
}
def __init__(self, client):
super().__init__(client)
self.config.set({"memes": []})
async def command(self, message, args, kwargs):
await message.channel.send(
" ".join(member.mention for member in message.mentions),
embed=discord.Embed(title="Read da fu**ing GOOGLE DOCS ! (╯°□°)╯︵ ┻━┻",
color=self.config.color).set_image(url=random.choice(self.config.memes)))

View File

@ -1,11 +0,0 @@
{
"version":"0.1.0",
"type": "python",
"dependencies": {
},
"bot_version": {
"min": "0.1.0",
"max": "0.1.0"
}
}

View File

@ -1 +1,4 @@
from .jsonencoder import Encoder
from .objects import Objects
__all__ = ["Objects", "Encoder"]

View File

@ -1,7 +1,7 @@
import json
import os
from storage import jsonencoder
from . import jsonencoder
class Objects:

View File

@ -0,0 +1,3 @@
from . import emojis
__all__ = ["emojis"]