Create a Slack messages pipeline in GCP

May 04, 2025

Introduction

Have you ever wanted to save your Slack messages in a database? Why, might you ask, would one do that? You can use your Slack data for so many things, one example would be as training data for your LLM to answer questions about the organization, work being done, and about the people working there and actively sending messages.

We can capture these Slack messages using a pipeline that records all public channel events, like people joining a channel, workflow triggers, threads, and yes including messages.

Architecture Overview

We will use a Slack App with the Events API to collect data. The app will listen for any message event and send it via a post request to our App Engine URL, in our GCP project, named slack_events. The app engine listens for Slack events and writes them to a PubSub topic slack_message in the same GCP project. We are using Pubsub's Write to BigQuery (Push) in the slack_message_bq subscription to automatically push any new message in the topic to a BQ table. A BigQuery scheduled query will load data from the staging table loaded by PubSub to a destination table. The query will parse the JSON data into the appropriate columns depending on the message type and subtype.

Architecture diagram

Steps

  1. Make sure you have a GCP billing account and enable the App Engine, PubSub, and BigQuery services.

  2. Next we need to create a Slack App that listens for all message events. Log in to your Slack and then click here to start creating the Slack App. Create an app from scratch Steps

We will name it Event Listener Steps

  1. Once the app is created in the desired Slack workspace, select Event Subscriptions on the side menu, and then subscribe to all the events you want to keep track of. For now, we are only subscribing to message events Steps

  2. You will notice an input field labelled Request URL. This will be the URL of our backend application where the Slack Events API will send POST requests with the data for each event. Our App Engine will be a simple Python backend using Fast API.

  3. Create a POST request endpoint that accepts the message events from the Slack API and sends that message to a PubSub topic

@app.post("/messages")
async def slack_events(request: Request, background_tasks: BackgroundTasks):
    """Handle Slack events and publish them to Pub/Sub in the background."""
    data = await request.json()
    # logger.info(f"Received Slack event") ## Optional - just for debugging

    # Check if this is a URL verification request
    if data.get("type") == "url_verification":
        return JSONResponse(content={"challenge": data.get("challenge")})

    # Add the task to the background queue
    background_tasks.add_task(
        push_pubsub,
        google_cloud_project,
        pubsub_topic,
        dict(data),
    )

    # Return immediately while the background task processes
    return JSONResponse(content={"status": "processing"})

# For local development
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8080)
  1. Create the function that pushes the data to Google PubSub using the environment variables passed from the .yaml file
# Configure Pub/Sub
pubsub_topic = os.getenv('PUBSUB_TOPIC')
google_cloud_project = os.getenv('GOOGLE_CLOUD_PROJECT')

print(f'PubSub Topic: {pubsub_topic}, Google Cloud Project: {google_cloud_project}')

def push_pubsub(project: str, topic: str, data: dict) -> None:
    try:
        publisher = pubsub_v1.PublisherClient()
        path = publisher.topic_path(project, topic)
        json_string = json.dumps(data, default=str)
        publish_future=publisher.publish(path, data=json_string.encode())
        message_id = publish_future.result()
    except Exception as e:
        logger.error(f"Failed to publish message of ID {message_id}: {e}", exc_info=True)
  1. Putting it all together and running our FastAPI app:
import json
import logging
import os

from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import JSONResponse
from google.cloud import pubsub_v1

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI app
app = FastAPI()

# Configure Pub/Sub
pubsub_topic = os.getenv('PUBSUB_TOPIC')
google_cloud_project = os.getenv('GOOGLE_CLOUD_PROJECT')

print(f'PubSub Topic: {pubsub_topic}, Google Cloud Project: {google_cloud_project}')

def push_pubsub(project: str, topic: str, data: dict) -> None:
    try:
        publisher = pubsub_v1.PublisherClient()
        path = publisher.topic_path(project, topic)
        json_string = json.dumps(data, default=str)
        publish_future=publisher.publish(path, data=json_string.encode())
        message_id = publish_future.result()
    except Exception as e:
        logger.error(f"Failed to publish message of ID {message_id}: {e}", exc_info=True)

@app.post("/messages")
async def slack_events(request: Request, background_tasks: BackgroundTasks):
    """Handle Slack events and publish them to Pub/Sub in the background."""
    data = await request.json()
    # logger.info(f"Received Slack event") ## Optional - just for debugging

    # Check if this is a URL verification request
    if data.get("type") == "url_verification":
        return JSONResponse(content={"challenge": data.get("challenge")})

    # Add the task to the background queue
    background_tasks.add_task(
        push_pubsub,
        google_cloud_project,
        pubsub_topic,
        dict(data),
    )

    # Return immediately while the background task processes
    return JSONResponse(content={"status": "processing"})

# For local development
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8080)

The full code can be found here.

  1. Before deploying the App Engine, we need to create a PubSub topic named slack_events_message for the App Engine to publish to. We need to add the name of the topic, along with the GCP project ID, as environment variables in the YAML file here.

  2. And we then create a BigQuery subscription to that topic that loads the data to our staging table staging.pubsub_slack_event.

  3. Make sure you have the gcloud CLI installed, if not download it from here. We will then deploy this App by running gcloud app deploy inside the app_engine directory.

  4. Get the URL of the slack_events App Engine service and paste it in the input field labelled Request URL from Step 4 - in the Slack Events Subscriptions page.

  5. Create 2 datasets in BigQuery, staging and development. Then create the tables that will receive the data

CREATE SCHEMA IF NOT EXISTS staging
OPTIONS (
  Description ="Initial raw data dataset"
);

CREATE SCHEMA IF NOT EXISTS development
OPTIONS (
  Description ="Development and testing dataset"
);

CREATE TABLE IF NOT EXISTS staging.pubsub_slack_event
(
    subscription_name STRING
  , message_id STRING
  , publish_time TIMESTAMP
  , attributes STRING
  , data STRING
)
PARTITION BY TIMESTAMP_TRUNC(publish_time, MONTH)
OPTIONS(
    description="Staging table to store the data from pubsub slack event"
  , require_partition_filter=TRUE
);

CREATE TABLE IF NOT EXISTS development.slack_message
(
    message_id STRING OPTIONS (description="PubSub message ID")
  , insert_timestamp TIMESTAMP OPTIONS (description="PubSub message insert timestamp")
  , team_id STRING
  , channel STRING
  , user_id STRING
  , username STRING
  , bot_id STRING OPTIONS (description="Slack bot ID in case the message is sent by a bot")
  , `type` STRING
  , subtype STRING
  , thread_ts STRING
  , `text` STRING
  , attachments ARRAY<STRING> OPTIONS (description="Slack message attachments")
  , auto_attachments ARRAY<STRING> OPTIONS (description="Attachments automatically added by Slack message preview")
  , event_ts TIMESTAMP OPTIONS (description="Slack event timestamp")
  , event_ts_epoch STRING OPTIONS (description="Slack event timestamp in epoch string format to be used as a unique key in combination with channel, user and bot_id")
)
PARTITION BY TIMESTAMP_TRUNC(insert_timestamp, MONTH)
CLUSTER BY message_id
OPTIONS(
    description="Slack public messages recorded from PubSub using Slack Events API"
  , require_partition_filter=TRUE
);
  1. The last part of the pipeline is the data processing we need to do in BigQuery on the raw message data. We will create a BigQuery scheduled query for that. We need to:
  • Select and parse the json data in staging.pubsub_slack_event
  • Handle deleted and updated messages
  • Load data to the destination table development.slack_message
-- scheduled queries to insert Slack Events data from staging pubsub into destination
DECLARE start_ts TIMESTAMP;
DECLARE end_ts   TIMESTAMP;

BEGIN TRANSACTION;

SET (start_ts, end_ts) = (
  SELECT AS STRUCT
         TIMESTAMP_SUB(TIMESTAMP(@run_time), INTERVAL 2 HOUR) AS start_ts
       , TIMESTAMP(@run_time) AS end_ts
);


CREATE TEMP TABLE latest AS (
  SELECT message_id
       , publish_time AS insert_timestamp
       , JSON_VALUE(data, '$.team_id') AS team_id
       , JSON_VALUE(data, '$.event.channel') AS channel
       , COALESCE(JSON_VALUE(data, '$.event.user')
                , JSON_VALUE(data, '$.event.message.user')
                , JSON_VALUE(data, '$.event.previous_message.user')
         ) AS user_id
       , COALESCE(JSON_VALUE(data, '$.event.username')
                , JSON_VALUE(data, '$.event.message.username')
                , JSON_VALUE(data, '$.event.previous_message.username')
         ) AS username
       , COALESCE(JSON_VALUE(data, '$.event.bot_id')
                , JSON_VALUE(data, '$.event.message.bot_id')
                , JSON_VALUE(data, '$.event.previous_message.bot_id')
         ) AS bot_id
       , JSON_VALUE(data, '$.event.type') AS type
       , JSON_VALUE(data, '$.event.subtype') AS subtype
       , COALESCE(JSON_VALUE(data, '$.event.thread_ts')
                , JSON_VALUE(data, '$.event.message.thread_ts')
                , JSON_VALUE(data, '$.event.previous_message.thread_ts')
                , JSON_VALUE(data, '$.event.event_ts')
                , JSON_VALUE(data, '$.event.message.event_ts')
         ) AS thread_ts
       , COALESCE(JSON_VALUE(data, '$.event.text')
                , JSON_VALUE(data, '$.event.message.text')
                , JSON_VALUE(data, '$.event.previous_message.text')
         ) AS text
       , COALESCE(JSON_EXTRACT_ARRAY(data, '$.event.files'), JSON_EXTRACT_ARRAY(data, '$.event.message.files')) AS attachments
       , COALESCE(JSON_EXTRACT_ARRAY(data, '$.event.attachments'), JSON_EXTRACT_ARRAY(data, '$.event.message.attachments')) AS auto_attachments
       , TIMESTAMP_MILLIS(CAST(ROUND(CAST(JSON_VALUE(data, '$.event.event_ts') AS FLOAT64) * 1000) AS INT64)) AS event_ts
       , JSON_VALUE(data, '$.event.event_ts') AS event_ts_epoch
    FROM staging.pubsub_slack_event
   WHERE publish_time >= start_ts
     AND publish_time < end_ts
 QUALIFY ROW_NUMBER() OVER (PARTITION BY data ORDER BY publish_time DESC) = 1 -- Filter to keep only the latest row for each unique `data`
)

CREATE TEMP TABLE deletes AS (
  SELECT channel
       , COALESCE(JSON_VALUE(data, '$.event.previous_message.user'), '') AS user_id
       , COALESCE(JSON_VALUE(data, '$.event.previous_message.bot_id'), '') AS bot_id
       , JSON_VALUE(data, '$.event.previous_message.event_ts') AS event_ts_epoch
    FROM latest
   WHERE subtype IN ('message_deleted', 'message_changed')
);

DELETE FROM development.slack_message AS m
 WHERE m.insert_timestamp < end_ts
   AND (
    EXISTS ( -- check if the message is deleted
      SELECT 1
        FROM deletes AS d
       WHERE m.channel = d.channel
         AND COALESCE(m.user_id, '') = d.user_id
         AND COALESCE(m.bot_id, '') = d.bot_id
         AND m.event_ts_epoch = d.event_ts_epoch
    )
    OR (  -- delete all messages in the time range
          m.insert_timestamp >= start_ts
      AND m.insert_timestamp < end_ts
    )
  );

INSERT INTO development.slack_message
SELECT l.message_id
     , l.insert_timestamp
     , l.team_id
     , l.channel
     , l.user_id
     , l.username
     , l.bot_id
     , l.`type`
     , l.subtype
     , l.thread_ts
     , l.`text`
     , l.attachments
     , l.auto_attachments
     , l.event_ts
     , l.event_ts_epoch
  FROM latest AS l
  LEFT JOIN development.slack_message AS t
    ON l.message_id = t.message_id
   AND t.insert_timestamp >= start_ts
   AND t.insert_timestamp < end_ts
  LEFT JOIN deletes AS d
    ON l.channel = d.channel
   AND l.user_id = d.user_id
   AND l.bot_id = d.bot_id
   AND l.event_ts_epoch = d.event_ts_epoch
 WHERE t.message_id IS NULL
   AND d.channel IS NULL
   AND l.subtype NOT IN ('message_deleted');

COMMIT TRANSACTION;
  1. Once we deploy our scheduled query to run at a desired interval (I think once a day is reasonable), we will have a complete pipeline that can be tested by sending messages in public channels and seeing how they look in the BigQuery table!

Sweet!

In an upcoming blog, we will see how to use the message data collected in BigQuery to create a RAG database that will work as a knowledge base for an LLM.

Best regards


Profile picture

Written by Khalid Ibrahim Adem A passionate developer and life-long learner.

© 2026 Khalid Ibrahim Adem. Bragging rights reserved 😎