"""This cog handles scheduled messages to a channel."""
import asyncio
import datetime
import logging
import discord
from discord.ext import tasks, commands
import config
from utils import checks, quickembed
from utils.fjclasses import DbHelper, DiscordUser
logger = logging.getLogger(__name__)
class Scheduler(commands.Cog):
    """The Scheduler cog class.

    Posts weekly scheduled messages and event start announcements to the channels defined in :mod:`config`.
    """

    # Seconds to wait before polling the database again when there is no pay-per-view event to announce.
    SHOWTIME_RETRY_SECONDS = 60
    # Seconds the "watching" presence is kept after an event announcement (4 hours).
    SHOWTIME_EVENT_SECONDS = 14400

    def __init__(self, bot):
        self.bot = bot
        # Maps a payload name to a dict with the keys `data`, `task_datetime`, `task_wait_time` and `task`.
        self.scheduled_payloads = {}
        self.check_scheduler.start()
        # Keep a handle on the task so it can be cancelled when the cog is unloaded.
        self.showtime_task = self.bot.loop.create_task(self.showtime_schedule_task())

    def cog_unload(self):
        """Cancels every background task owned by this cog so none of them outlives it."""
        self.check_scheduler.cancel()
        self.showtime_task.cancel()
        # Copy the values: each cancelled task removes its own entry once it gets to run.
        for pending in list(self.scheduled_payloads.values()):
            task = pending.get('task')
            if task is not None:
                task.cancel()

    @tasks.loop(minutes=1.0)
    async def check_scheduler(self):
        """A routine task that checks for the weekly schedule and updates pending items when appropriate.

        .. Note::
            This task is executed every minute to check for an update. New tasks are created by sending it to
            :func:`cogs.scheduler.scheduler_task`.

        .. Important::
            Python 3.7 does not have an easy way for naming a task. A dictionary of tasks is created and handled
            instead.
        """
        await self.bot.wait_until_ready()
        # Read the clock once so the weekday keys and the date always describe the same day.
        now = datetime.datetime.now()
        weekday = now.strftime('%A').lower()
        key_flag = '%s_flag' % weekday
        key_time = '%s_time' % weekday
        for payload in DbHelper().chatroom_scheduler_list():
            name = payload['name']
            pending = self.scheduled_payloads.get(name)
            # check if pending tasks match
            if pending is not None and payload.items() != pending['data'].items():
                logger.info('Found difference in pending task - `{}`'.format(name))
                pending['task'].cancel()
                # scheduler_task swallows the cancellation and removes its own entry, so once this returns the
                # name is free to be scheduled again below.
                await pending['task']
            if not (payload['active'] and payload[key_flag]):
                continue
            if name in self.scheduled_payloads:
                continue
            # Only compute the time for payloads that are going to be scheduled: an unhandled error here
            # would stop the whole loop.
            scheduled_time = payload[key_time]
            if scheduled_time is None:
                # NOTE(review): assumes a day without a configured time comes back as None - confirm against
                # the database schema.
                logger.warning('No `{}` set for scheduled task - `{}`'.format(key_time, name))
                continue
            # The stored time is added to datetime.min, i.e. it is treated as an offset from midnight.
            task_dt = datetime.datetime.combine(
                now.date(), (datetime.datetime.min + scheduled_time).time()
            )
            if task_dt <= now:
                continue
            # add new task
            self.scheduled_payloads[name] = {
                'data': payload,
                'task_datetime': task_dt,
                'task_wait_time': (task_dt - now).total_seconds(),
            }
            # 3.8 has ability to set name to task, but currently on 3.7
            self.scheduled_payloads[name]['task'] = self.bot.loop.create_task(
                self.scheduler_task(payload)
            )

    async def scheduler_task(self, payload):
        """Handles a single weekly scheduled message by creating the message and sleeping until the appropriate time
        defined in the message.

        .. important::
            The channel the message is sent to is defined in :mod:`config`.
            Modify where appropriate for your own Discord server.

        :param payload: The event details to deliver. See :func:`utils.fjclasses.DbHelper.chatroom_scheduler_list`.
        """
        name = payload['name']
        try:
            embed = quickembed.info(desc='Event')
            embed.add_field(name=payload['message'], value='\u200b', inline=False)
            if name in ['RAW', 'SmackDown', 'NXT']:
                channel = self.bot.get_channel(config.base['channel']['wwe'])
            elif 'AEW' in name:
                channel = self.bot.get_channel(config.base['channel']['aew'])
            else:
                channel = self.bot.get_channel(config.base['channel']['general'])
            logger.info(
                'Task scheduled - channel:`{}` name:`{}` sleep_until:`{}`'.format(
                    # The channel is None when the configured id cannot be resolved; do not crash on logging.
                    channel.name if channel else None,
                    name,
                    self.scheduled_payloads[name]['task_datetime'],
                )
            )
            await asyncio.sleep(self.scheduled_payloads[name]['task_wait_time'])
            # final check before sending message
            if (
                channel
                and name in self.scheduled_payloads
                and payload.items() == self.scheduled_payloads[name]['data'].items()
            ):
                await channel.send('@everyone', embed=embed)
            else:
                logger.info(
                    'Task message not sent. Payload does not match. - `{}`'.format(name)
                )
        except asyncio.CancelledError:
            # Deliberately not re-raised: check_scheduler awaits this task right after cancelling it.
            logger.info('Task Cancelled - `{}`'.format(name))
        finally:
            # pop() instead of del so a missing entry cannot mask the original error.
            self.scheduled_payloads.pop(name, None)
            logger.info('Task End - `{}`'.format(name))

    @commands.command(name='scheduler', hidden=True)
    @commands.is_owner()
    @checks.is_registered()
    async def scheduler_pending(self, ctx):
        """Displays a list of pending alert messages.

        .. note::
            Only the bot owner can use this.

        :param ctx: The invocation context.
        """
        user = DiscordUser(ctx.author)
        embed = quickembed.info(desc="Today's Scheduled Alerts (PT)", user=user)
        lines = [
            '{1} - **{0}**'.format(name, pending['task_datetime'])
            for name, pending in self.scheduled_payloads.items()
        ]
        # Discord rejects an embed field with an empty value, so fall back to a placeholder.
        embed.add_field(name='\u200b', value='\n'.join(lines) or 'No pending alerts.')
        await ctx.send(embed=embed)

    async def showtime_schedule_task(self):
        """Takes the first event returned by the database and creates a scheduled message to post to a channel.

        Only pay-per-view events are announced. When there is no event, or the first one is not a
        pay-per-view, the database is polled again after :attr:`SHOWTIME_RETRY_SECONDS`.

        .. important::
            The channel the message is sent to is defined in :mod:`config`.
            Modify where appropriate for your own Discord server.
        """
        await self.bot.wait_until_ready()
        while not self.bot.is_closed():
            events = DbHelper().future_events()
            if not events or not events[0]['ppv']:
                # Yield to the event loop before retrying; retrying immediately would block the bot.
                await asyncio.sleep(self.SHOWTIME_RETRY_SECONDS)
                continue
            event = events[0]
            dt = datetime.datetime.now()
            event_start_timer = (event['date_time'] - dt).total_seconds()
            embed = quickembed.info(desc='Event')
            embed.add_field(
                name='{} has begun!'.format(event['name']), value='\u200b', inline=False
            )
            channel = self.bot.get_channel(config.base['channel']['ppv'])
            logger.info(
                'showtime_schedule_task: channel:`{}` events:`{}` sleep until:`{}`'.format(
                    # The channel is None when the configured id cannot be resolved; do not crash on logging.
                    channel.name if channel else None,
                    event['name'],
                    dt + datetime.timedelta(seconds=event_start_timer),
                )
            )
            await asyncio.sleep(event_start_timer)
            if channel:
                await channel.send('@everyone', embed=embed)
            activity = discord.Activity(
                type=discord.ActivityType.watching, name=event['name']
            )
            await self.bot.change_presence(activity=activity)
            await asyncio.sleep(self.SHOWTIME_EVENT_SECONDS)
            await self.bot.change_presence(activity=None)
        logger.info('END showtime_schedule_task')
def setup(bot):
    """Extension entry point required for cogs: builds the Scheduler cog and registers it with the bot.

    :param bot: The Discord bot.
    """
    scheduler_cog = Scheduler(bot)
    bot.add_cog(scheduler_cog)