# Source code for cogs.scheduler

"""This cog handles scheduled messages to a channel."""
import asyncio
import datetime
import logging

import discord
from discord.ext import tasks, commands

import config
from utils import checks, quickembed
from utils.fjclasses import DbHelper, DiscordUser

# Module-level logger named after this module; every log call in this file
# goes through it.
logger = logging.getLogger(__name__)


class Scheduler(commands.Cog):
    """The Scheduler cog class.

    Starts two background jobs when constructed:

    * :meth:`check_scheduler` - a once-a-minute loop that keeps
      ``scheduled_payloads`` in sync with the rows returned by
      ``DbHelper().chatroom_scheduler_list()``.
    * :meth:`showtime_schedule_task` - a long-running task that announces the
      closest upcoming event when it is flagged as a PPV.
    """

    def __init__(self, bot):
        """Store the bot and start the background jobs.

        :param bot: The Discord bot.
        """
        self.bot = bot
        # name -> {'data', 'task_datetime', 'task_wait_time', 'task'}
        self.scheduled_payloads = {}
        self.check_scheduler.start()
        # Keep a handle on the task so it can be cancelled in ``cog_unload``.
        self.showtime_task = self.bot.loop.create_task(
            self.showtime_schedule_task()
        )

    def cog_unload(self):
        """Stop every background job owned by this cog.

        Without this the minute loop and the spawned tasks would keep running
        (and be duplicated) after the cog is unloaded or reloaded.
        """
        self.check_scheduler.cancel()
        self.showtime_task.cancel()
        # Iterate over a copy: each cancelled task removes its own entry from
        # ``scheduled_payloads`` once it gets to run its ``finally`` clause.
        for pending in list(self.scheduled_payloads.values()):
            task = pending.get('task')
            if task is not None:
                task.cancel()

    @tasks.loop(minutes=1.0)
    async def check_scheduler(self):
        """A routine task that checks for the weekly schedule and updates
        pending items when appropriate.

        .. Note::
            This task is executed every minute to check for an update. New
            tasks are created by sending it to
            :func:`cogs.scheduler.scheduler_task`.

        .. Important::
            Python 3.7 does not have an easy way for naming a task. A
            dictionary of tasks is created and handled instead.
        """
        await self.bot.wait_until_ready()
        now = datetime.datetime.now()
        weekday = now.strftime('%A').lower()
        key_flag = '%s_flag' % weekday
        key_time = '%s_time' % weekday
        scheduler_list = DbHelper().chatroom_scheduler_list()
        for payload in scheduler_list:
            name = payload['name']

            # check if pending tasks match
            pending = self.scheduled_payloads.get(name)
            if (
                pending is not None
                and payload.items() != pending['data'].items()
            ):
                logger.info(
                    'Found difference in pending task - `{}`'.format(name)
                )
                pending['task'].cancel()
                # Wait for the old task to finish; its ``finally`` clause
                # removes the stale entry so a fresh task can be added below.
                await pending['task']

            # ``payload[key_time]` is added to ``datetime.min``, so it is
            # treated as an offset from midnight. ``now.date()`` is used
            # instead of a second clock read so the date always agrees with
            # the weekday keys computed above.
            task_dt = datetime.datetime.combine(
                now.date(),
                (datetime.datetime.min + payload[key_time]).time(),
            )

            # add new task
            if (
                payload['active']
                and payload[key_flag]
                and task_dt > now
                and name not in self.scheduled_payloads
            ):
                self.scheduled_payloads[name] = {
                    'data': payload,
                    'task_datetime': task_dt,
                    'task_wait_time': (task_dt - now).total_seconds(),
                }
                # 3.8 has ability to set name to task, but currently on 3.7
                self.scheduled_payloads[name]['task'] = (
                    self.bot.loop.create_task(self.scheduler_task(payload))
                )

    async def scheduler_task(self, payload):
        """Handles a single weekly scheduled message by creating the message
        and sleeping until the appropriate time defined in the message.

        .. important::
            The channel the message is sent to is defined in :mod:`config`.
            Modify where appropriate for your own Discord server.

        :param payload: The event details to deliver. See
            :func:`utils.fjclasses.DbHelper.chatroom_scheduler_list`.
        """
        name = payload['name']
        try:
            embed = quickembed.info(desc='Event')
            embed.add_field(
                name=payload['message'], value='\u200b', inline=False
            )
            if name in ['RAW', 'SmackDown', 'NXT']:
                channel = self.bot.get_channel(config.base['channel']['wwe'])
            elif 'AEW' in name:
                channel = self.bot.get_channel(config.base['channel']['aew'])
            else:
                channel = self.bot.get_channel(
                    config.base['channel']['general']
                )
            # ``channel`` is guarded with ``if channel`` further down, so it
            # may be missing here; read the name defensively so logging cannot
            # crash the task.
            logger.info(
                'Task scheduled - channel:`{}` name:`{}` sleep_until:`{}`'.format(
                    getattr(channel, 'name', None),
                    name,
                    self.scheduled_payloads[name]['task_datetime'],
                )
            )
            # Sleep even when the channel is missing: the entry then stays
            # registered until its due time, so ``check_scheduler`` does not
            # respawn a failing task every minute.
            await asyncio.sleep(self.scheduled_payloads[name]['task_wait_time'])
            # final check before sending message
            if not channel:
                logger.warning(
                    'Task message not sent. Channel not found. - `{}`'.format(
                        name
                    )
                )
            elif (
                name in self.scheduled_payloads
                and payload.items()
                == self.scheduled_payloads[name]['data'].items()
            ):
                await channel.send('@everyone', embed=embed)
            else:
                logger.info(
                    'Task message not sent. Payload does not match. - `{}`'.format(
                        name
                    )
                )
        except asyncio.CancelledError:
            logger.info('Task Cancelled - `{}`'.format(name))
        finally:
            # ``pop`` instead of ``del``: a missing entry must not raise from
            # the cleanup path.
            self.scheduled_payloads.pop(name, None)
            logger.info('Task End - `{}`'.format(name))

    @commands.command(name='scheduler', hidden=True)
    @commands.is_owner()
    @checks.is_registered()
    async def scheduler_pending(self, ctx):
        """Displays a list of pending alert messages.

        .. note::
            Only the bot owner can use this.

        :param ctx: The invocation context.
        """
        user = DiscordUser(ctx.author)
        embed = quickembed.info(desc="Today's Scheduled Alerts (PT)", user=user)
        pending_lines = [
            '{1} - **{0}**'.format(name, entry['task_datetime'])
            for name, entry in self.scheduled_payloads.items()
        ]
        # Discord rejects embed fields with an empty value, so fall back to a
        # placeholder when nothing is pending.
        embed.add_field(
            name='\u200b',
            value='\n'.join(pending_lines) or 'No pending alerts.',
        )
        await ctx.send(embed=embed)

    async def showtime_schedule_task(self):
        """Retrieves the closest event from the database and creates a
        scheduled message to post to a channel.

        Only events whose ``ppv`` value is truthy are announced. When there is
        no upcoming event, or the closest one is not a PPV, the task waits and
        polls again.

        .. important::
            The channel the message is sent to is defined in :mod:`config`.
            Modify where appropriate for your own Discord server.
        """
        await self.bot.wait_until_ready()
        event_length_timer = 14400  # seconds the "watching" presence is kept
        retry_delay = 60  # seconds between polls when there is nothing to do
        while not self.bot.is_closed():
            events = DbHelper().future_events()
            if not events or not events[0]['ppv']:
                # Always yield to the event loop before retrying; a bare
                # ``continue`` here would spin forever and block the bot.
                await asyncio.sleep(retry_delay)
                continue
            event = events[0]
            dt = datetime.datetime.now()
            event_start_timer = (event['date_time'] - dt).total_seconds()
            embed = quickembed.info(desc='Event')
            embed.add_field(
                name='{} has begun!'.format(event['name']),
                value='\u200b',
                inline=False,
            )
            channel = self.bot.get_channel(config.base['channel']['ppv'])
            # ``channel`` is guarded with ``if channel`` below, so it may be
            # missing here; read the name defensively.
            logger.info(
                'showtime_schedule_task: channel:`{}` events:`{}` sleep until:`{}`'.format(
                    getattr(channel, 'name', None),
                    event['name'],
                    dt + datetime.timedelta(seconds=event_start_timer),
                )
            )
            await asyncio.sleep(max(event_start_timer, 0))
            if channel:
                await channel.send('@everyone', embed=embed)
                activity = discord.Activity(
                    type=discord.ActivityType.watching, name=event['name']
                )
                await self.bot.change_presence(activity=activity)
                await asyncio.sleep(event_length_timer)
                await self.bot.change_presence(activity=None)
            else:
                logger.warning(
                    'showtime_schedule_task: channel not found - `{}`'.format(
                        event['name']
                    )
                )
                # Back off so a missing channel cannot turn into a hot loop.
                await asyncio.sleep(retry_delay)
        logger.info('END showtime_schedule_task')
def setup(bot):
    """Extension entry point: build the :class:`Scheduler` cog and attach it.

    :param bot: The Discord bot.
    """
    scheduler_cog = Scheduler(bot)
    bot.add_cog(scheduler_cog)