Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bots/bugbuster/src/services/command-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ export class CommandProcessor {

private _listTeams: types.CommandImplementation = async () => {
const teams = await this._teamsManager.listWatchedTeams()
return { success: true, message: teams.join(', ') }
const message = teams.length > 0 ? teams.join(', ') : 'You have no watched teams.'
return { success: true, message }
}

private _addTeam: types.CommandImplementation = async ([team]: string[]) => {
Expand Down
3 changes: 3 additions & 0 deletions bots/bugbuster/src/services/issue-processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ export class IssueProcessor {

public async listRelevantIssues(endCursor?: string): Promise<{ issues: lin.Issue[]; pagination?: lin.Pagination }> {
const watchedTeams = await this._teamsManager.listWatchedTeams()
if (watchedTeams.length === 0) {
throw new Error('You have no watched teams.')
}

return await this._linear.listIssues(
{
Expand Down
3 changes: 0 additions & 3 deletions bots/bugbuster/src/services/teams-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ export class TeamsManager {

public async listWatchedTeams(): Promise<string[]> {
const teamKeys = await this._getWatchedTeams()
if (teamKeys.length === 0) {
throw new Error('You have no watched teams.')
}
return teamKeys
}

Expand Down
2 changes: 1 addition & 1 deletion bots/slackbox/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@bp-bots/slackbox",
"scripts": {
"postinstall": "genenv -o ./.genenv/index.ts -e SLACKBOX_SLACK_REFRESH_TOKEN -e SLACKBOX_SLACK_CLIENT_ID -e SLACKBOX_SLACK_CLIENT_SECRET -e SLACKBOX_SLACK_SIGNING_SECRET -e SLACKBOX_GMAIL_OAUTH_CLIENT_ID -e SLACKBOX_GMAIL_OAUTH_CLIENT_SECRET -e SLACKBOX_GMAIL_OAUTH_AUTHORIZATION_CODE -e SLACKBOX_GMAIL_PUBSUB_TOPIC_NAME -e SLACKBOX_GMAIL_PUBSUB_WEBHOOK_SHARED_SECRET -e SLACKBOX_GMAIL_PUBSUB_WEBHOOK_SERVICE_ACCOUNT -e SLACKBOX_SLACK_CHANNEL",
"postinstall": "genenv -o ./.genenv/index.ts -e SLACKBOX_SLACK_REFRESH_TOKEN -e SLACKBOX_SLACK_CLIENT_ID -e SLACKBOX_SLACK_CLIENT_SECRET -e SLACKBOX_SLACK_SIGNING_SECRET -e SLACKBOX_GMAIL_OAUTH_CLIENT_ID -e SLACKBOX_GMAIL_OAUTH_CLIENT_SECRET -e SLACKBOX_GMAIL_OAUTH_AUTHORIZATION_CODE -e SLACKBOX_GMAIL_PUBSUB_TOPIC_NAME -e SLACKBOX_GMAIL_PUBSUB_WEBHOOK_SHARED_SECRET -e SLACKBOX_GMAIL_PUBSUB_WEBHOOK_SERVICE_ACCOUNT -e SLACKBOX_SLACK_CHANNEL -e SLACKBOX_FALLBACK_SLACK_CHANNEL",
"check:type": "tsc --noEmit",
"check:bplint": "bp lint",
"build": "bp add -y && bp build"
Expand Down
152 changes: 118 additions & 34 deletions bots/slackbox/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,9 @@ import * as genenv from '../.genenv'
import * as bp from '.botpress'

const DEFAULT_SLACK_CHANNEL = genenv.SLACKBOX_SLACK_CHANNEL
const FALLBACK_SLACK_CHANNEL = genenv.SLACKBOX_FALLBACK_SLACK_CHANNEL || DEFAULT_SLACK_CHANNEL

let cachedSlackConversationId: string | undefined

const getSlackConversationId = async (client: bp.Client, logger: bp.MessageHandlerProps['logger']): Promise<string> => {
if (cachedSlackConversationId) {
return cachedSlackConversationId
}

logger.info('Fetching Slack conversation ID (first time)')
const maxRetries = 3

for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await client.callAction({
type: 'slack:startChannelConversation',
input: {
channelName: DEFAULT_SLACK_CHANNEL,
},
})
cachedSlackConversationId = response.output.conversationId
return cachedSlackConversationId
} catch (err) {
logger.warn(`Attempt ${attempt}/${maxRetries} failed: ${err}`)
if (attempt === maxRetries) {
throw err
}
await new Promise((resolve) => setTimeout(resolve, 2000))
}
}

throw new Error('Failed to get Slack conversation ID after retries')
}
const cachedSlackConversationIds: Record<string, string> = {}

const bot = new bp.Bot({
actions: {},
Expand All @@ -45,11 +16,21 @@ bot.on.message('*', async (props) => {
const { conversation, message, client, ctx, logger } = props

if (!conversation.integration.includes('gmail')) {
logger.info('[Slackbox] Not a Gmail message, skipping')
return
}

try {
const slackConversationId = await getSlackConversationId(client, logger)
const shouldForward = await _shouldForwardEmail(client, conversation, logger)
if (!shouldForward) {
logger.info('Email filtered out - no matching Integrations label')
return
}

const subject = (conversation.tags['gmail:subject'] || conversation.tags.subject) as string | undefined
const targetChannel = _getTargetChannel(subject)
const slackConversationId = await _getSlackConversationId(client, logger, targetChannel)

const notificationMessage = _mapGmailToSlack(conversation, message)

await client.createMessage({
Expand All @@ -61,13 +42,116 @@ bot.on.message('*', async (props) => {
text: notificationMessage,
},
})

logger.info('Email notification sent to Slack')
} catch (error) {
logger.error(`Failed to send email notification: ${error}`)
}
})

const _shouldForwardEmail = async (
client: bp.Client,
conversation: Conversation,
logger: bp.MessageHandlerProps['logger']
): Promise<boolean> => {
const threadId = conversation.tags['gmail:id']

if (!threadId) {
logger.info('[LabelCheck] No threadId, forwarding email')
return true
}

try {
const labelsResponse = await client.callAction({
type: 'gmail:listLabels',
input: {},
})
const labels = labelsResponse.output.labels || []

const threadResponse = await client.callAction({
type: 'gmail:getThread',
input: { id: threadId },
})
const messages = threadResponse.output.messages || []

const allLabelIds = new Set<string>()
messages.forEach((msg) => {
msg.labelIds?.forEach((labelId) => allLabelIds.add(labelId))
})

if (allLabelIds.size === 0) {
logger.info('[LabelCheck] No labels on thread, forwarding email')
return true
}

const labelsById = new Map<string, { type?: string; name?: string }>()
labels.forEach((label) => {
if (label.id) {
labelsById.set(label.id, { type: label.type || undefined, name: label.name || undefined })
}
})

const userLabels = [...allLabelIds].filter((labelId) => {
const label = labelsById.get(labelId)
return label?.type === 'user'
})

if (userLabels.length === 0) {
logger.info('[LabelCheck] No user labels, forwarding email')
return true
}

const hasIntegrationLabel = userLabels.some((labelId) => {
const label = labelsById.get(labelId)
return label?.name === 'Integrations' || label?.name?.startsWith('Integrations/')
})

return hasIntegrationLabel
} catch (error) {
logger.error(`[LabelCheck] Error checking email labels: ${error}`)
return true
}
}

const _getSlackConversationId = async (
client: bp.Client,
logger: bp.MessageHandlerProps['logger'],
channelName: string
): Promise<string> => {
if (cachedSlackConversationIds[channelName]) {
return cachedSlackConversationIds[channelName]
}

logger.info(`Fetching Slack conversation ID for channel: ${channelName}`)
const maxRetries = 3

for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
const response = await client.callAction({
type: 'slack:startChannelConversation',
input: {
channelName,
},
})
cachedSlackConversationIds[channelName] = response.output.conversationId
return cachedSlackConversationIds[channelName]
} catch (err) {
logger.warn(`Attempt ${attempt}/${maxRetries} failed: ${err}`)
if (attempt === maxRetries) {
throw err
}
await new Promise((resolve) => setTimeout(resolve, 2000))
}
}

throw new Error('Failed to get Slack conversation ID after retries')
}

const _getTargetChannel = (subject: string | undefined): string => {
if (subject?.toLowerCase().includes('test')) {
return FALLBACK_SLACK_CHANNEL
}
return DEFAULT_SLACK_CHANNEL
}

const _mapGmailToSlack = (conversation: Conversation, message: AnyIncomingMessage<bp.TBot>) => {
const subject = (conversation.tags['gmail:subject'] || conversation.tags.subject) as string | undefined
const fromEmail = (conversation.tags['gmail:email'] || conversation.tags.email) as string | undefined
Expand Down
2 changes: 1 addition & 1 deletion integrations/gmail/integration.definition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from './definitions'

export const INTEGRATION_NAME = 'gmail'
export const INTEGRATION_VERSION = '1.0.3'
export const INTEGRATION_VERSION = '1.0.4'

export default new sdk.IntegrationDefinition({
name: INTEGRATION_NAME,
Expand Down
90 changes: 51 additions & 39 deletions integrations/gmail/src/webhook-events/new-mail.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
// @ts-ignore
import { AxiosError } from 'axios'
// @ts-ignore
import parseMessage from 'gmail-api-parse-message'
import { parse as parseHtml } from 'node-html-parser'
import { GoogleClient } from '../google-api'
import { decodeBase64URL } from '../utils/string-utils'
import * as bp from '.botpress'

export const handleIncomingEmail = async (props: bp.HandlerProps) => {
const { req, client, ctx } = props
const { req, client, ctx, logger } = props
const bodyContent = JSON.parse(req.body || '{}')

const data = bodyContent.message?.data

if (!data) {
console.warn('Handler received an invalid body (no data)')
logger.warn('Handler received an invalid body (no data)')
return
}

Expand All @@ -22,13 +24,10 @@ export const handleIncomingEmail = async (props: bp.HandlerProps) => {
const historyId = `${historyIdNumber}`

if (!historyId) {
console.warn('Handler received an invalid body (no historyId)')
logger.warn('Handler received an invalid body (no historyId)')
return
}

// Only proceed if the incoming historyId is greater that the latest processed historyId
const googleClient = await GoogleClient.create({ client, ctx })

const {
state: { payload },
} = await client.getState({
Expand All @@ -39,18 +38,21 @@ export const handleIncomingEmail = async (props: bp.HandlerProps) => {

const lastHistoryId = payload.lastHistoryId ?? _fakeHistoryId(historyId)

if (!payload.lastHistoryId) {
await client.getOrSetState({
type: 'integration',
name: 'configuration',
id: ctx.integrationId,
payload: {
...payload,
lastHistoryId,
},
})
if (Number(historyId) <= Number(lastHistoryId)) {
logger.info(`HistoryId ${historyId} already processed (last: ${lastHistoryId}), skipping`)
return
}
await client.setState({
type: 'integration',
name: 'configuration',
id: ctx.integrationId,
payload: {
...payload,
lastHistoryId: historyId,
},
})

const googleClient = await GoogleClient.create({ client, ctx })
const history = await googleClient.getMyHistory(lastHistoryId)

const messageIds = history.history?.reduce((acc, h) => {
Expand All @@ -64,37 +66,38 @@ export const handleIncomingEmail = async (props: bp.HandlerProps) => {
}, [] as string[])

if (!messageIds?.length) {
console.info('Handler received an empty message id')
logger.info('Handler received an empty message id')
return
}

for (const messageId of messageIds) {
await _processMessage(props, messageId, googleClient, emailAddress)
await _processMessage(props, messageId, googleClient, emailAddress, logger)
}

await client.getOrSetState({
type: 'integration',
name: 'configuration',
id: ctx.integrationId,
payload: {
...payload,
lastHistoryId: historyId,
},
})
}

const _processMessage = async (
{ client }: bp.HandlerProps,
messageId: string,
googleClient: GoogleClient,
emailAddress: string
emailAddress: string,
logger: bp.HandlerProps['logger']
) => {
const gmailMessage = await googleClient.messages.get(messageId)
let gmailMessage
try {
gmailMessage = await googleClient.messages.get(messageId)
} catch (error: unknown) {
if (error instanceof AxiosError && (error?.code === '404' || error?.response?.status === 404)) {
logger.info(`Message ${messageId} not found, skipping (likely deleted)`)
return
}
throw error
}

const message = parseMessage(gmailMessage)
const threadId = message.threadId

if (!threadId) {
console.info('Handler received an empty chat id')
logger.info('Handler received an empty chat id')
throw new Error('Handler received an empty chat id')
}

Expand Down Expand Up @@ -150,17 +153,26 @@ const _processMessage = async (
// Extract the text content:
content = messageRoot.structuredText
} catch (thrown) {
console.error('Error while parsing html content', thrown)
logger.error('Error while parsing html content', thrown)
}
}

await client.getOrCreateMessage({
tags: { id: messageId },
type: 'text',
userId: user.id,
conversationId: conversation.id,
payload: { text: content },
})
try {
await client.getOrCreateMessage({
tags: { id: messageId },
type: 'text',
userId: user.id,
conversationId: conversation.id,
payload: { text: content },
})
} catch (error: unknown) {
const err = error instanceof Error ? error : new Error(String(error))
if (err?.message?.includes('already exists for a different conversation')) {
logger.info(`Message ${messageId} already exists, skipping`)
return
}
throw error
}

await client.getOrSetState({
type: 'conversation',
Expand Down
5 changes: 0 additions & 5 deletions integrations/linear/.sentryclirc

This file was deleted.

Loading
Loading