diff --git a/config/celery.py b/config/celery.py index 5c641355..54b411de 100644 --- a/config/celery.py +++ b/config/celery.py @@ -37,3 +37,9 @@ def setup_periodic_tasks(sender, **kwargs): crontab(hour=4, minute=5), app.signature("core.tasks.clear_static_content_cache"), ) + + # Fetch Slack activity. Executes daily at 3:07 AM. + sender.add_periodic_task( + crontab(hour=3, minute=7), + app.signature("slack.tasks.fetch_slack_activity"), + ) diff --git a/config/settings.py b/config/settings.py index 1a2c88d0..9178c2c1 100755 --- a/config/settings.py +++ b/config/settings.py @@ -90,6 +90,7 @@ INSTALLED_APPS += [ "mailing_list", "news", "core", + "slack", ] AUTH_USER_MODEL = "users.User" @@ -560,3 +561,5 @@ OAUTH_APP_NAME = ( # Frame loading X_FRAME_OPTIONS = "SAMEORIGIN" + +SLACK_BOT_TOKEN = env("SLACK_BOT_TOKEN", default="") diff --git a/env.template b/env.template index 36821f35..b9e668c2 100644 --- a/env.template +++ b/env.template @@ -53,3 +53,5 @@ GITHUB_OAUTH_CLIENT_SECRET= GOOGLE_OAUTH_CLIENT_ID= GOOGLE_OAUTH_CLIENT_SECRET= + +SLACK_BOT_TOKEN=changeme diff --git a/kube/boost/values-cppal-dev-gke.yaml b/kube/boost/values-cppal-dev-gke.yaml index 2e45992f..224f23ef 100644 --- a/kube/boost/values-cppal-dev-gke.yaml +++ b/kube/boost/values-cppal-dev-gke.yaml @@ -204,6 +204,11 @@ Env: secretKeyRef: name: hyperkitty key: database_name + - name: SLACK_BOT_TOKEN + valueFrom: + secretKeyRef: + name: slack + key: slack_bot_token # Static content cache timeout - name: STATIC_CACHE_TIMEOUT value: "60" diff --git a/kube/boost/values-production-gke.yaml b/kube/boost/values-production-gke.yaml index 768e17be..d980c98c 100644 --- a/kube/boost/values-production-gke.yaml +++ b/kube/boost/values-production-gke.yaml @@ -204,6 +204,11 @@ Env: secretKeyRef: name: hyperkitty key: database_name + - name: SLACK_BOT_TOKEN + valueFrom: + secretKeyRef: + name: slack + key: slack_bot_token # Static content cache timeout - name: STATIC_CACHE_TIMEOUT value: "60" diff --git a/kube/boost/values-stage-gke.yaml b/kube/boost/values-stage-gke.yaml index fce86cd7..b51dcd3f 100644 --- a/kube/boost/values-stage-gke.yaml +++ b/kube/boost/values-stage-gke.yaml @@ -204,6 +204,11 @@ Env: secretKeyRef: name: hyperkitty key: database_name + - name: SLACK_BOT_TOKEN + valueFrom: + secretKeyRef: + name: slack + key: slack_bot_token # Static content cache timeout - name: STATIC_CACHE_TIMEOUT value: "60" diff --git a/kube/boost/values.yaml b/kube/boost/values.yaml index 940a43ae..c28eab9e 100644 --- a/kube/boost/values.yaml +++ b/kube/boost/values.yaml @@ -184,6 +184,11 @@ Env: secretKeyRef: name: hyperkitty key: database_name + - name: SLACK_BOT_TOKEN + valueFrom: + secretKeyRef: + name: slack + key: slack_bot_token # Static content cache timeout - name: STATIC_CACHE_TIMEOUT value: "60" diff --git a/requirements.in b/requirements.in index 52ef8f8b..b51ddd1f 100644 --- a/requirements.in +++ b/requirements.in @@ -69,3 +69,4 @@ elasticsearch==7.17.9 # Github ghapi requests +slack_sdk diff --git a/requirements.txt b/requirements.txt index cc3fe0e2..9f5d0774 100644 --- a/requirements.txt +++ b/requirements.txt @@ -333,6 +333,8 @@ six==1.16.0 # django-rest-auth # fs # python-dateutil +slack-sdk==3.33.2 + # via -r ./requirements.in soupsieve==2.6 # via beautifulsoup4 sqlparse==0.5.1 diff --git a/slack/__init__.py b/slack/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/slack/apps.py b/slack/apps.py new file mode 100644 index 00000000..5cff7f25 --- /dev/null +++ b/slack/apps.py @@ -0,0 +1,6 @@ +from django.apps import 
AppConfig + + +class SlackConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "slack" diff --git a/slack/management/__init__.py b/slack/management/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/slack/management/commands/__init__.py b/slack/management/commands/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/slack/management/commands/fetch_slack_activity.py b/slack/management/commands/fetch_slack_activity.py new file mode 100644 index 00000000..33544325 --- /dev/null +++ b/slack/management/commands/fetch_slack_activity.py @@ -0,0 +1,326 @@ +import logging +import datetime +import functools + +from slack_sdk import WebClient +from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler +import djclick as click +from django.db import transaction, connection +from django.db.models.functions import Now, Cast +from django.db.models import Q, FloatField +from django.conf import settings +from django.core.management import CommandError + +from slack.models import ( + SlackUser, + SlackActivityBucket, + Channel, + ChannelUpdateGap, + Thread, + parse_ts, + ToTimestamp, +) + + +client = WebClient(token=settings.SLACK_BOT_TOKEN) +client.retry_handlers.append(RateLimitErrorRetryHandler(max_retry_count=10)) + +logger = logging.getLogger(__name__) + + +def get_my_channels(): + for page in client.conversations_list(): + for channel in page["channels"]: + if channel["is_member"]: + yield channel + + +def channel_messages_in_range(channel, oldest, latest): + """ + All messages in a channel newer than oldest (not inclusive so we don't + double count). Returns an iterator over pages, which are iterators over + messages. Newest messages come first. + """ + pages = client.conversations_history( + channel=channel, + oldest=oldest, + latest=latest, + inclusive=False, + ) + for page in pages: + yield page["messages"] + + +def thread_messages_newer(channel, thread_ts, oldest): + """ + All messages in a thread newer than oldest (not inclusive so we don't + double count). Returns an iterator over pages. Oldest messages come first. + """ + pages = client.conversations_replies( + channel=channel, + ts=thread_ts, + oldest=oldest, + inclusive=False, + ) + for page in pages: + yield page["messages"] + + +# Track users whose profile information has been updated in our DB and +# doesn't need to be checked again. +USERS_CACHE = {} + + +def get_or_create_user(user_id): + try: + return USERS_CACHE[user_id] + except KeyError: + # Even if the user exists already in our db, they may have changed + # their information in slack so we need to check. + user_data = client.users_info(user=user_id) + obj, _ = SlackUser.objects.update_or_create( + id=user_id, + defaults={ + "name": user_data.data["user"]["name"], + "real_name": user_data.data["user"].get("real_name", ""), + "email": user_data.data["user"]["profile"].get("email", ""), + "image_48": user_data["user"]["profile"].get("image_48", ""), + }, + ) + USERS_CACHE[user_id] = obj + return obj + + +def should_track_message(message): + # These are not regular messages + # https://api.slack.com/events/message#subtypes + return message.get("subtype") in {None, "me_message"} and "bot_id" not in message + + +def fill_channel_gap(gap: ChannelUpdateGap, debug: bool): + """ + Download and process channel messages (not including replies to threads) in + the (possibly unbounded) range specified by `gap`. 
+    """
+    logger.info(
+        "Fetching channel history for %r (%r) in range (%s,%s)",
+        gap.channel.name,
+        gap.channel.id,
+        gap.oldest_message_ts,
+        gap.newest_message_ts,
+    )
+    pages = channel_messages_in_range(
+        channel=gap.channel.id,
+        latest=gap.newest_message_ts,
+        oldest=gap.oldest_message_ts,
+    )
+    first = True
+    for page in pages:
+        # use a separate transaction per page to allow restoring from an
+        # interrupted run.
+        with transaction.atomic():
+            for message in page:
+                if first and gap.newest_message_ts is None:
+                    gap.channel.last_update_ts = message["ts"]
+                    gap.channel.save()
+                first = False
+                # Shrink the gap, but no need to save until we've finished this
+                # page (transactionally).
+                gap.newest_message_ts = message["ts"]
+
+                if not should_track_message(message):
+                    continue
+
+                if "user" in message:
+                    user = get_or_create_user(message["user"])
+                    if debug:
+                        gap.channel.seenmessage_set.create(ts=message["ts"])
+                    SlackActivityBucket.track_activity(
+                        gap.channel, user, message["ts"]
+                    )
+
+                if message.get("thread_ts"):
+                    # Track this thread in the db to be able to check for
+                    # updates later.
+                    Thread.objects.create(
+                        channel=gap.channel,
+                        thread_ts=message["thread_ts"],
+                        # None indicates that this thread still must be updated
+                        # even if it's old.
+                        last_update_ts=None,
+                    )
+            gap.save()
+        logger.debug(
+            "Channel %r retrieved up to %s (%s)",
+            gap.channel.name,
+            # for the 'up to current' gap, newest_message_ts will be None
+            # and instead oldest_message_ts will be where we stopped.
+            gap.newest_message_ts or gap.oldest_message_ts,
+            parse_ts(gap.newest_message_ts or gap.oldest_message_ts),
+        )
+    # If we get here we must have gotten up to gap.oldest_message_ts, the gap
+    # is now empty. If we're interrupted before we get here, the gap will stay
+    # and be picked up from where we left off on the next run.
+    gap.delete()
+
+
+def do_thread(thread: Thread, debug: bool):
+    """
+    Download and process new messages in the specified thread.
+    """
+    pages = thread_messages_newer(
+        channel=thread.channel_id,
+        thread_ts=thread.thread_ts,
+        oldest=thread.last_update_ts,
+    )
+    for page in pages:
+        with transaction.atomic():
+            for message in page:
+                if message["thread_ts"] == message["ts"]:
+                    # This is the parent message, it was already counted as a
+                    # channel message. Slack always returns the first message
+                    # even if it's older than the oldest we requested.
+                    if thread.last_update_ts is None:
+                        # However, still record that this thread was updated.
+                        # I think this will only matter if all messages in the
+                        # thread have been deleted.
+                        thread.last_update_ts = message["ts"]
+                    continue
+
+                # We never need to look at this message again. Oldest messages
+                # come first unlike for channels.
+                thread.last_update_ts = message["ts"]
+
+                if not should_track_message(message):
+                    continue
+
+                if debug:
+                    thread.channel.seenmessage_set.create(
+                        ts=message["ts"],
+                        thread=thread,
+                    )
+                user = get_or_create_user(message["user"])
+                SlackActivityBucket.track_activity(
+                    thread.channel, user, message["ts"]
+                )
+            thread.save()
+
+
+def locked(fn):
+    """
+    Runs the decorated function while holding a lock to prevent multiple
+    concurrent instances.
+    """
+
+    @functools.wraps(fn)
+    def inner(*args, **kwargs):
+        cur = connection.cursor()
+        ID = 1028307  # random number to identify this command
+        cur.execute("SELECT pg_try_advisory_lock(%s);", [ID])
+        (got_lock,) = cur.fetchone()
+        if not got_lock:
+            raise CommandError(
+                "Could not obtain lock: "
+                "another instance of this command must be running."
+            )
+        try:
+            return fn(*args, **kwargs)
+        finally:
+            cur.execute("SELECT pg_advisory_unlock(%s);", [ID])
+
+    return inner
+
+
+@click.command()
+@click.argument("channels", nargs=-1)
+@click.option(
+    "--debug",
+    is_flag=True,
+    help=(
+        "Store all messages seen to be able to "
+        "detect bugs (uses lots of database space)."
+    ),
+)
+@locked
+def command(channels, debug):
+    """
+    Download slack activity from channels the bot is a member of.
+
+    CHANNELS is an optional list of channel names (without the #) to limit to.
+    If not provided, all channels the bot is a member of will be fetched.
+
+    This is resumable -- it can be interrupted and restarted without losing
+    progress.
+
+    Do not run multiple instances of this command in parallel.
+    """
+
+    channels = set(channels)
+    selected_channels = []
+    if channels:
+        for channel_data in get_my_channels():
+            if channel_data["name"] in channels:
+                selected_channels.append(channel_data)
+                channels.remove(channel_data["name"])
+        if channels:
+            raise click.BadParameter(
+                f"Could not find channels {channels} (maybe the bot isn't a member?)"
+            )
+    else:
+        # materialize this generator so we can iterate multiple times
+        selected_channels.extend(get_my_channels())
+
+    for channel_data in selected_channels:
+        with transaction.atomic():
+            channel, created = Channel.objects.update_or_create(
+                id=channel_data["id"],
+                defaults={
+                    "name": channel_data["name"],
+                    "topic": channel_data["topic"]["value"],
+                    "purpose": channel_data["purpose"]["value"],
+                },
+            )
+            if created:
+                # we don't have any messages for this channel we just created
+                channel.channelupdategap_set.create(
+                    oldest_message_ts=None,
+                    newest_message_ts=None,
+                )
+            elif (
+                channel.last_update_ts
+                and not channel.channelupdategap_set.filter(
+                    newest_message_ts=None
+                ).exists()
+            ):
+                # gap from the most recent fetch till now
+                channel.channelupdategap_set.create(
+                    oldest_message_ts=channel.last_update_ts,
+                    newest_message_ts=None,
+                )
+            else:
+                assert (
+                    channel.channelupdategap_set.exists()
+                ), "We must have SOME gaps, time has passed since the last run!"
+
+    gaps = ChannelUpdateGap.objects.filter(
+        channel__id__in={c["id"] for c in selected_channels}
+    )
+    for gap in gaps:
+        fill_channel_gap(gap, debug)
+
+    # We have to track threads we've seen and update independently, replies
+    # don't show up in main channel history[1].
+    #
+    # [1]:
+    logger.info("Fetching threads")
+    threads = Thread.objects.annotate(
+        last_update_as_datetime=ToTimestamp(
+            Cast("last_update_ts", output_field=FloatField())
+        ),
+    ).filter(
+        # Assume threads not updated for more than 1 month won't get posted to
+        # again. Otherwise it's too much work to check all threads ever.
+        # last_update_ts will be null for the threads fill_channel_gap just
+        # created, indicating they need to be updated at least once.
+ Q(last_update_as_datetime=None) + | Q(last_update_as_datetime__gte=Now() - datetime.timedelta(days=30)), + channel_id__in={c["id"] for c in selected_channels}, + ) + for thread in threads: + do_thread(thread, debug) diff --git a/slack/migrations/0001_initial.py b/slack/migrations/0001_initial.py new file mode 100644 index 00000000..87d66484 --- /dev/null +++ b/slack/migrations/0001_initial.py @@ -0,0 +1,186 @@ +# Generated by Django 4.2.16 on 2024-11-12 17:15 + +from django.db import migrations, models +import django.db.models.deletion +import django.db.models.functions.comparison +import django.db.models.lookups + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Channel", + fields=[ + ( + "id", + models.CharField(max_length=16, primary_key=True, serialize=False), + ), + ("name", models.TextField()), + ("topic", models.TextField()), + ("purpose", models.TextField()), + ("last_update_ts", models.CharField(max_length=32, null=True)), + ], + ), + migrations.CreateModel( + name="SlackUser", + fields=[ + ( + "id", + models.CharField(max_length=16, primary_key=True, serialize=False), + ), + ("name", models.TextField()), + ("real_name", models.TextField()), + ("email", models.TextField()), + ("image_48", models.URLField()), + ], + ), + migrations.CreateModel( + name="Thread", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("thread_ts", models.CharField(max_length=32)), + ("last_update_ts", models.CharField(max_length=32, null=True)), + ("db_created_at", models.DateTimeField(auto_now_add=True)), + ( + "channel", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, to="slack.channel" + ), + ), + ], + ), + migrations.CreateModel( + name="SlackActivityBucket", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("day", models.DateField()), + ("count", models.PositiveIntegerField()), + ( + "channel", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, to="slack.channel" + ), + ), + ( + "user", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + to="slack.slackuser", + ), + ), + ], + ), + migrations.CreateModel( + name="SeenMessage", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("ts", models.CharField(max_length=32)), + ("db_created_at", models.DateTimeField(auto_now_add=True)), + ( + "channel", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, to="slack.channel" + ), + ), + ( + "thread", + models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.CASCADE, + to="slack.thread", + ), + ), + ], + ), + migrations.CreateModel( + name="ChannelUpdateGap", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("oldest_message_ts", models.CharField(max_length=32, null=True)), + ("newest_message_ts", models.CharField(max_length=32, null=True)), + ( + "channel", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, to="slack.channel" + ), + ), + ], + ), + migrations.AddConstraint( + model_name="thread", + constraint=models.CheckConstraint( + check=django.db.models.lookups.GreaterThanOrEqual( + django.db.models.functions.comparison.Cast( + "last_update_ts", 
output_field=models.FloatField()
+                    ),
+                    django.db.models.functions.comparison.Cast(
+                        "thread_ts", output_field=models.FloatField()
+                    ),
+                ),
+                name="update_newer_than_created",
+            ),
+        ),
+        migrations.AlterUniqueTogether(
+            name="thread",
+            unique_together={("channel", "thread_ts")},
+        ),
+        migrations.AlterUniqueTogether(
+            name="slackactivitybucket",
+            unique_together={("channel", "day", "user")},
+        ),
+        migrations.AlterUniqueTogether(
+            name="seenmessage",
+            unique_together={("channel", "ts")},
+        ),
+        migrations.AddConstraint(
+            model_name="channelupdategap",
+            constraint=models.CheckConstraint(
+                check=django.db.models.lookups.GreaterThan(
+                    django.db.models.functions.comparison.Cast(
+                        "newest_message_ts", output_field=models.FloatField()
+                    ),
+                    django.db.models.functions.comparison.Cast(
+                        "oldest_message_ts", output_field=models.FloatField()
+                    ),
+                ),
+                name="newest_newer_than_oldest",
+            ),
+        ),
+    ]
diff --git a/slack/migrations/0002_thread_last_update_as_datetime.py b/slack/migrations/0002_thread_last_update_as_datetime.py
new file mode 100644
index 00000000..679a91f6
--- /dev/null
+++ b/slack/migrations/0002_thread_last_update_as_datetime.py
@@ -0,0 +1,26 @@
+# Generated by Django 4.2.16 on 2024-11-13 19:05
+
+from django.db import migrations, models
+import django.db.models.functions.comparison
+import slack.models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("slack", "0001_initial"),
+    ]
+
+    operations = [
+        migrations.AddIndex(
+            model_name="thread",
+            index=models.Index(
+                slack.models.ToTimestamp(
+                    django.db.models.functions.comparison.Cast(
+                        "last_update_ts", output_field=models.FloatField()
+                    )
+                ),
+                name="last_update_as_datetime",
+            ),
+        ),
+    ]
diff --git a/slack/migrations/__init__.py b/slack/migrations/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/slack/models.py b/slack/models.py
new file mode 100644
index 00000000..180762f1
--- /dev/null
+++ b/slack/models.py
@@ -0,0 +1,162 @@
+import datetime
+
+from django.db import models, connection
+from django.db.models.expressions import Func
+from django.db.models.functions import Cast
+
+"""
+`ts` fields from slack are actually IDs that can be interpreted into
+timestamps. We store them in a string as we get them from slack instead
+of a DateTimeField so we can pass them back to slack exactly, without
+float precision loss causing round-tripping errors.
+"""
+
+
+def parse_ts(ts):
+    seconds = float(ts)
+    return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)
+
+
+class ToTimestamp(Func):
+    """
+    Implements the postgres to_timestamp(double precision) function.
+
+    https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-TABLE
+    """
+
+    function = "to_timestamp"
+    output_field = models.DateTimeField()
+
+
+class SlackUser(models.Model):
+    id = models.CharField(max_length=16, primary_key=True)
+    name = models.TextField()
+    real_name = models.TextField()
+    email = models.TextField()
+    image_48 = models.URLField()
+
+
+class Channel(models.Model):
+    id = models.CharField(max_length=16, primary_key=True)
+    name = models.TextField()
+    topic = models.TextField()
+    purpose = models.TextField()
+    # We only need to check for new messages strictly newer than this.
+    last_update_ts = models.CharField(max_length=32, null=True)
+
+
+class ChannelUpdateGap(models.Model):
+    """
+    Stores a (potentially unbounded) period of time where we haven't counted
+    messages for a channel. Used to store progress and allow resumption.
+    """
+
+    channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
+    # There may be more messages to fetch that are strictly newer than
+    # oldest_message_ts (null meaning no limit)
+    oldest_message_ts = models.CharField(max_length=32, null=True)
+    # but strictly older than newest_message_ts (null meaning no limit)
+    newest_message_ts = models.CharField(max_length=32, null=True)
+
+    class Meta:
+        constraints = [
+            models.CheckConstraint(
+                check=models.lookups.GreaterThan(
+                    models.functions.Cast(
+                        "newest_message_ts", output_field=models.FloatField()
+                    ),
+                    models.functions.Cast(
+                        "oldest_message_ts", output_field=models.FloatField()
+                    ),
+                ),
+                name="newest_newer_than_oldest",
+            )
+        ]
+
+    def __str__(self):
+        return f"({self.oldest_message_ts}, {self.newest_message_ts})"
+
+    def __repr__(self):
+        return f"<ChannelUpdateGap {self.channel_id} {self}>"
+
+
+class SeenMessage(models.Model):
+    """
+    DEBUG ONLY: Store all seen messages to double-check we don't see them
+    twice.
+
+    Invariant:
+
+        SeenMessage.objects.count() == \
+            SlackActivityBucket.objects.aggregate(sum=Sum('count'))['sum']
+    """
+
+    channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
+    ts = models.CharField(max_length=32)
+    thread = models.ForeignKey("Thread", on_delete=models.CASCADE, null=True)
+    db_created_at = models.DateTimeField(auto_now_add=True)
+
+    class Meta:
+        unique_together = [("channel", "ts")]
+
+
+class Thread(models.Model):
+    channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
+    thread_ts = models.CharField(max_length=32)
+    last_update_ts = models.CharField(max_length=32, null=True)
+    db_created_at = models.DateTimeField(auto_now_add=True)
+
+    class Meta:
+        unique_together = [("channel", "thread_ts")]
+        constraints = [
+            models.CheckConstraint(
+                check=models.lookups.GreaterThanOrEqual(
+                    models.functions.Cast(
+                        "last_update_ts", output_field=models.FloatField()
+                    ),
+                    models.functions.Cast(
+                        "thread_ts", output_field=models.FloatField()
+                    ),
+                ),
+                name="update_newer_than_created",
+            )
+        ]
+        indexes = [
+            models.Index(
+                ToTimestamp(Cast("last_update_ts", output_field=models.FloatField())),
+                name="last_update_as_datetime",
+            )
+        ]
+
+
+class SlackActivityBucket(models.Model):
+    """
+    Message count per user per channel per UTC day.
+    """
+
+    day = models.DateField()
+    user = models.ForeignKey(SlackUser, on_delete=models.CASCADE)
+    channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
+    count = models.PositiveIntegerField()
+
+    class Meta:
+        unique_together = [("channel", "day", "user")]
+
+    @classmethod
+    def track_activity(cls, channel, user, ts):
+        day = parse_ts(ts).date()
+
+        with connection.cursor() as cursor:
+            cursor.execute(
+                """
+                INSERT INTO slack_slackactivitybucket (day, user_id, channel_id, count)
+                VALUES (%(day)s, %(user_id)s, %(channel_id)s, 1)
+                ON CONFLICT (day, user_id, channel_id)
+                DO UPDATE SET count = slack_slackactivitybucket.count + 1
+                """,
+                {
+                    "user_id": user.id,
+                    "channel_id": channel.id,
+                    "day": day,
+                },
+            )
diff --git a/slack/tasks.py b/slack/tasks.py
new file mode 100644
index 00000000..d06af4f1
--- /dev/null
+++ b/slack/tasks.py
@@ -0,0 +1,8 @@
+from django.core.management import call_command
+
+from config.celery import app
+
+
+@app.task
+def fetch_slack_activity():
+    call_command("fetch_slack_activity")