diff --git a/bun.lockb b/bun.lockb index 0817243f..24d0a84b 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 84cc1b2f..bde8579d 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,6 @@ "dependencies": { "@elizaos/adapter-postgres": "0.1.9", "@elizaos/client-direct": "0.1.9", - "@elizaos/client-twitter": "0.1.9", "@elizaos/core": "0.1.9", "@elizaos/plugin-bootstrap": "0.1.9", "@elizaos/plugin-node": "0.1.9", @@ -24,6 +23,8 @@ "@turnkey/http": "2.18.0", "@turnkey/sdk-server": "1.7.3", "@turnkey/viem": "0.6.10", + "agent-twitter-client": "0.0.18", + "discord.js": "14.16.3", "amqplib": "0.10.5", "axios": "1.7.9", "d3-dsv": "2", diff --git a/src/clients/farcaster/interactions.ts b/src/clients/farcaster/interactions.ts index 2e96dbf4..01df38e1 100644 --- a/src/clients/farcaster/interactions.ts +++ b/src/clients/farcaster/interactions.ts @@ -97,7 +97,7 @@ export class FarcasterInteractionManager { }) if (!shouldContinue) { - elizaLogger.info('AgentcoinClient received message event but it was suppressed') + elizaLogger.info('FarcasterClient received message event but it was suppressed') return } diff --git a/src/clients/index.ts b/src/clients/index.ts index 5bb3ca3c..70b7a18a 100644 --- a/src/clients/index.ts +++ b/src/clients/index.ts @@ -1,7 +1,7 @@ import { AgentcoinClientInterface } from '@/clients/agentcoin' import FarcasterClientInterface from '@/clients/farcaster' import TelegramClientInterface from '@/clients/telegram' -import { TwitterClientInterface } from '@elizaos/client-twitter' +import TwitterClientInterface from '@/clients/twitter' import { AgentRuntime, Character, Client, Clients } from '@elizaos/core' export async function initializeClients( diff --git a/src/clients/twitter/base.ts b/src/clients/twitter/base.ts new file mode 100644 index 00000000..07a75433 --- /dev/null +++ b/src/clients/twitter/base.ts @@ -0,0 +1,683 @@ +import type { TwitterConfig } from '@/clients/twitter/environment' +import { RawTweetType, TwitterCookie } from '@/clients/twitter/types' +import { + ActionTimelineType, + type IAgentRuntime, + type IImageDescriptionService, + type Memory, + type State, + type UUID, + elizaLogger, + getEmbeddingZeroVector, + stringToUuid +} from '@elizaos/core' +import { type QueryTweetsResponse, Scraper, SearchMode, type Tweet } from 'agent-twitter-client' +import { EventEmitter } from 'events' + +export function extractAnswer(text: string): string { + const startIndex = text.indexOf('Answer: ') + 8 + const endIndex = text.indexOf('<|endoftext|>', 11) + return text.slice(startIndex, endIndex) +} + +type TwitterProfile = { + id: string + username: string + screenName: string + bio: string + nicknames: string[] +} + +class RequestQueue { + private queue: (() => Promise)[] = [] + private processing = false + + async add(request: () => Promise): Promise { + return new Promise((resolve, reject) => { + this.queue.push(async () => { + try { + const result = await request() + resolve(result) + } catch (error) { + reject(error) + } + }) + void this.processQueue() + }) + } + + private async processQueue(): Promise { + if (this.processing || this.queue.length === 0) { + return + } + this.processing = true + + while (this.queue.length > 0) { + const request = this.queue.shift() + try { + await request() + } catch (error) { + console.error('Error processing request:', error) + this.queue.unshift(request) + await this.exponentialBackoff(this.queue.length) + } + await this.randomDelay() + } + + this.processing = false + } + + private async exponentialBackoff(retryCount: number): Promise { + const delay = Math.pow(2, retryCount) * 1000 + await new Promise((resolve) => setTimeout(resolve, delay)) + } + + private async randomDelay(): Promise { + const delay = Math.floor(Math.random() * 2000) + 1500 + await new Promise((resolve) => setTimeout(resolve, delay)) + } +} + +export class ClientBase extends EventEmitter { + static _twitterClients: { [accountIdentifier: string]: Scraper } = {} + twitterClient: Scraper + runtime: IAgentRuntime + twitterConfig: TwitterConfig + directions: string + lastCheckedTweetId: bigint | null = null + imageDescriptionService: IImageDescriptionService + temperature = 0.5 + + requestQueue: RequestQueue = new RequestQueue() + + profile: TwitterProfile | null + + async cacheTweet(tweet: Tweet): Promise { + if (!tweet) { + console.warn('Tweet is undefined, skipping cache') + return + } + + await this.runtime.cacheManager.set(`twitter/tweets/${tweet.id}`, tweet) + } + + async getCachedTweet(tweetId: string): Promise { + const cached = await this.runtime.cacheManager.get(`twitter/tweets/${tweetId}`) + + return cached + } + + async getTweet(tweetId: string): Promise { + const cachedTweet = await this.getCachedTweet(tweetId) + + if (cachedTweet) { + return cachedTweet + } + + const tweet = await this.requestQueue.add(() => this.twitterClient.getTweet(tweetId)) + + await this.cacheTweet(tweet) + return tweet + } + + // eslint-disable-next-line no-use-before-define + callback: (self: ClientBase) => Promise = null + + onReady(): void { + throw new Error('Not implemented in base class, please call from subclass') + } + + /** + * Parse the raw tweet data into a standardized Tweet object. + */ + private parseTweet(raw: RawTweetType, depth = 0, maxDepth = 3): Tweet { + // If we've reached maxDepth, don't parse nested quotes/retweets further + const canRecurse = depth < maxDepth + + const quotedStatus = + raw.quoted_status_result?.result && canRecurse + ? this.parseTweet(raw.quoted_status_result.result, depth + 1, maxDepth) + : undefined + + const retweetedStatus = + raw.retweeted_status_result?.result && canRecurse + ? this.parseTweet(raw.retweeted_status_result.result, depth + 1, maxDepth) + : undefined + + const t: Tweet = { + bookmarkCount: raw.bookmarkCount ?? raw.legacy?.bookmark_count ?? undefined, + conversationId: raw.conversationId ?? raw.legacy?.conversation_id_str, + hashtags: raw.hashtags ?? raw.legacy?.entities?.hashtags ?? [], + html: raw.html, + id: raw.id ?? raw.rest_id ?? raw.id_str ?? undefined, + inReplyToStatus: raw.inReplyToStatus, + inReplyToStatusId: + raw.inReplyToStatusId ?? raw.legacy?.in_reply_to_status_id_str ?? undefined, + isQuoted: raw.legacy?.is_quote_status === true, + isPin: raw.isPin, + isReply: raw.isReply, + isRetweet: raw.legacy?.retweeted === true, + isSelfThread: raw.isSelfThread, + // language: raw.legacy?.lang, + likes: raw.legacy?.favorite_count ?? 0, + name: + raw.name ?? + raw?.user_results?.result?.legacy?.name ?? + raw.core?.user_results?.result?.legacy?.name, + mentions: raw.mentions ?? raw.legacy?.entities?.user_mentions ?? [], + permanentUrl: + raw.permanentUrl ?? + (raw.core?.user_results?.result?.legacy?.screen_name && raw.rest_id + ? // eslint-disable-next-line max-len + `https://x.com/${raw.core?.user_results?.result?.legacy?.screen_name}/status/${raw.rest_id}` + : undefined), + photos: + raw.photos ?? + (raw.legacy?.entities?.media + ?.filter((media) => media.type === 'photo') + .map((media) => ({ + id: media.id_str, + url: media.media_url_https, + alt_text: media.alt_text + })) || + []), + place: raw.place, + poll: raw.poll ?? null, + quotedStatus, + quotedStatusId: raw.quotedStatusId ?? raw.legacy?.quoted_status_id_str ?? undefined, + // quotes: raw.legacy?.quote_count ?? 0, + replies: raw.legacy?.reply_count ?? 0, + retweets: raw.legacy?.retweet_count ?? 0, + retweetedStatus, + retweetedStatusId: raw.legacy?.retweeted_status_id_str ?? undefined, + text: raw.text ?? raw.legacy?.full_text ?? undefined, + thread: raw.thread || [], + timeParsed: raw.timeParsed + ? new Date(raw.timeParsed) + : raw.legacy?.created_at + ? new Date(raw.legacy?.created_at) + : undefined, + timestamp: + raw.timestamp ?? + (raw.legacy?.created_at ? new Date(raw.legacy.created_at).getTime() / 1000 : undefined), + urls: raw.urls ?? raw.legacy?.entities?.urls ?? [], + userId: raw.userId ?? raw.legacy?.user_id_str ?? undefined, + username: raw.username ?? raw.core?.user_results?.result?.legacy?.screen_name ?? undefined, + videos: raw.videos ?? [], + views: raw.views?.count ? Number(raw.views.count) : 0, + sensitiveContent: raw.sensitiveContent + } + + return t + } + + constructor(runtime: IAgentRuntime, twitterConfig: TwitterConfig) { + super() + this.runtime = runtime + this.twitterConfig = twitterConfig + const username = twitterConfig.TWITTER_USERNAME + if (ClientBase._twitterClients[username]) { + this.twitterClient = ClientBase._twitterClients[username] + } else { + this.twitterClient = new Scraper() + ClientBase._twitterClients[username] = this.twitterClient + } + + this.directions = + '- ' + + this.runtime.character.style.all.join('\n- ') + + '- ' + + this.runtime.character.style.post.join() + } + + async init(): Promise { + const username = this.twitterConfig.TWITTER_USERNAME + const password = this.twitterConfig.TWITTER_PASSWORD + const email = this.twitterConfig.TWITTER_EMAIL + let retries = this.twitterConfig.TWITTER_RETRY_LIMIT + const twitter2faSecret = this.twitterConfig.TWITTER_2FA_SECRET + + if (!username) { + throw new Error('Twitter username not configured') + } + + const cachedCookies = await this.getCachedCookies(username) + + if (cachedCookies) { + elizaLogger.info('Using cached cookies') + await this.setCookiesFromArray(cachedCookies) + } + + elizaLogger.log('Waiting for Twitter login') + while (retries > 0) { + try { + if (await this.twitterClient.isLoggedIn()) { + // cookies are valid, no login required + elizaLogger.info('Successfully logged in.') + break + } else { + await this.twitterClient.login(username, password, email, twitter2faSecret) + if (await this.twitterClient.isLoggedIn()) { + // fresh login, store new cookies + elizaLogger.info('Successfully logged in.') + elizaLogger.info('Caching cookies') + await this.cacheCookies(username, await this.twitterClient.getCookies()) + break + } + } + } catch (error) { + if (error instanceof Error) { + elizaLogger.error(`Login attempt failed: ${error.message}`) + } else { + elizaLogger.error(`Login attempt failed: ${error}`) + } + } + + retries-- + elizaLogger.error(`Failed to login to Twitter. Retrying... (${retries} attempts left)`) + + if (retries === 0) { + elizaLogger.error('Max retries reached. Exiting login process.') + throw new Error('Twitter login failed after maximum retries.') + } + + await new Promise((resolve) => setTimeout(resolve, 2000)) + } + // Initialize Twitter profile + this.profile = await this.fetchProfile(username) + + if (this.profile) { + elizaLogger.log('Twitter user ID:', this.profile.id) + elizaLogger.log('Twitter loaded:', JSON.stringify(this.profile, null, 10)) + // Store profile info for use in responses + this.runtime.character.twitterProfile = { + id: this.profile.id, + username: this.profile.username, + screenName: this.profile.screenName, + bio: this.profile.bio, + nicknames: this.profile.nicknames + } + } else { + throw new Error('Failed to load profile') + } + + await this.loadLatestCheckedTweetId() + await this.populateTimeline() + } + + async fetchOwnPosts(count: number): Promise { + elizaLogger.debug('fetching own posts') + const homeTimeline = await this.twitterClient.getUserTweets(this.profile.id, count) + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return homeTimeline.tweets.map((t) => this.parseTweet(t as RawTweetType)) + } + + /** + * Fetch timeline for twitter account, optionally only from followed accounts + */ + async fetchHomeTimeline(count: number, following?: boolean): Promise { + elizaLogger.debug('fetching home timeline') + const homeTimeline = following + ? await this.twitterClient.fetchFollowingTimeline(count, []) + : await this.twitterClient.fetchHomeTimeline(count, []) + + elizaLogger.debug('Raw home timeline:', homeTimeline) + const processedTimeline = homeTimeline + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + .filter((t) => t.__typename !== 'TweetWithVisibilityResults') // Filter out visibility-restricted tweets + .map((tweet) => this.parseTweet(tweet)) + + // elizaLogger.debug("process homeTimeline", processedTimeline); + return processedTimeline + } + + async fetchTimelineForActions(count: number): Promise { + elizaLogger.debug('fetching timeline for actions') + + const agentUsername = this.twitterConfig.TWITTER_USERNAME + + const homeTimeline = + this.twitterConfig.ACTION_TIMELINE_TYPE === ActionTimelineType.Following + ? await this.twitterClient.fetchFollowingTimeline(count, []) + : await this.twitterClient.fetchHomeTimeline(count, []) + + // Parse, filter out self-tweets, limit to count + return homeTimeline + .map((tweet) => this.parseTweet(tweet)) + .filter((tweet) => tweet.username !== agentUsername) // do not perform action on self-tweets + .slice(0, count) + // TODO: Once the 'count' parameter is fixed in the 'fetchTimeline' + // method of the 'agent-twitter-client', + // this workaround can be removed. + // Related issue: https://github.com/elizaos/agent-twitter-client/issues/43 + } + + async fetchSearchTweets( + query: string, + maxTweets: number, + searchMode: SearchMode, + cursor?: string + ): Promise { + try { + // if we dont get a response in 5 seconds, something is wrong + const timeoutPromise = new Promise((resolve) => + setTimeout(() => resolve({ tweets: [] }), 15000) + ) + + try { + const result = await this.requestQueue.add( + async () => + await Promise.race([ + this.twitterClient.fetchSearchTweets(query, maxTweets, searchMode, cursor), + timeoutPromise + ]) + ) + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return (result ?? { tweets: [] }) as QueryTweetsResponse + } catch (error) { + elizaLogger.error('Error fetching search tweets:', error) + return { tweets: [] } + } + } catch (error) { + elizaLogger.error('Error fetching search tweets:', error) + return { tweets: [] } + } + } + + private async populateTimeline(): Promise { + elizaLogger.debug('populating timeline...') + + const cachedTimeline = await this.getCachedTimeline() + + // Check if the cache file exists + if (cachedTimeline) { + // Read the cached search results from the file + + // Get the existing memories from the database + const existingMemories = await this.runtime.messageManager.getMemoriesByRoomIds({ + roomIds: cachedTimeline.map((tweet) => + stringToUuid(tweet.conversationId + '-' + this.runtime.agentId) + ) + }) + + // TODO: load tweets not in cache? + + // Create a Set to store the IDs of existing memories + const existingMemoryIds = new Set(existingMemories.map((memory) => memory.id.toString())) + + // Check if any of the cached tweets exist in the existing memories + const someCachedTweetsExist = cachedTimeline.some((tweet) => + existingMemoryIds.has(stringToUuid(tweet.id + '-' + this.runtime.agentId)) + ) + + if (someCachedTweetsExist) { + // Filter out the cached tweets that already exist in the database + const tweetsToSave = cachedTimeline.filter( + (tweet) => !existingMemoryIds.has(stringToUuid(tweet.id + '-' + this.runtime.agentId)) + ) + + console.log({ + processingTweets: tweetsToSave.map((tweet) => tweet.id).join(',') + }) + + // Save the missing tweets as memories + for (const tweet of tweetsToSave) { + elizaLogger.log('Saving Tweet', tweet.id) + + const roomId = stringToUuid(tweet.conversationId + '-' + this.runtime.agentId) + + const userId = + tweet.userId === this.profile.id ? this.runtime.agentId : stringToUuid(tweet.userId) + + if (tweet.userId === this.profile.id) { + await this.runtime.ensureConnection( + this.runtime.agentId, + roomId, + this.profile.username, + this.profile.screenName, + 'twitter' + ) + } else { + await this.runtime.ensureConnection( + userId, + roomId, + tweet.username, + tweet.name, + 'twitter' + ) + } + + const content = { + text: tweet.text, + url: tweet.permanentUrl, + source: 'twitter', + inReplyTo: tweet.inReplyToStatusId + ? stringToUuid(tweet.inReplyToStatusId + '-' + this.runtime.agentId) + : undefined + } + + elizaLogger.log('Creating memory for tweet', tweet.id) + + // check if it already exists + const memory = await this.runtime.messageManager.getMemoryById( + stringToUuid(tweet.id + '-' + this.runtime.agentId) + ) + + if (memory) { + elizaLogger.log('Memory already exists, skipping timeline population') + break + } + + await this.runtime.messageManager.createMemory({ + id: stringToUuid(tweet.id + '-' + this.runtime.agentId), + userId, + content, + agentId: this.runtime.agentId, + roomId, + embedding: getEmbeddingZeroVector(), + createdAt: tweet.timestamp * 1000 + }) + + await this.cacheTweet(tweet) + } + + elizaLogger.log(`Populated ${tweetsToSave.length} missing tweets from the cache.`) + return + } + } + + const timeline = await this.fetchHomeTimeline(cachedTimeline ? 10 : 50) + const username = this.twitterConfig.TWITTER_USERNAME + + // Get the most recent 20 mentions and interactions + const mentionsAndInteractions = await this.fetchSearchTweets( + `@${username}`, + 20, + SearchMode.Latest + ) + + // Combine the timeline tweets and mentions/interactions + const allTweets = [...timeline, ...mentionsAndInteractions.tweets] + + // Create a Set to store unique tweet IDs + const tweetIdsToCheck = new Set() + const roomIds = new Set() + + // Add tweet IDs to the Set + for (const tweet of allTweets) { + tweetIdsToCheck.add(tweet.id) + roomIds.add(stringToUuid(tweet.conversationId + '-' + this.runtime.agentId)) + } + + // Check the existing memories in the database + const existingMemories = await this.runtime.messageManager.getMemoriesByRoomIds({ + roomIds: Array.from(roomIds) + }) + + // Create a Set to store the existing memory IDs + const existingMemoryIds = new Set(existingMemories.map((memory) => memory.id)) + + // Filter out the tweets that already exist in the database + const tweetsToSave = allTweets.filter( + (tweet) => !existingMemoryIds.has(stringToUuid(tweet.id + '-' + this.runtime.agentId)) + ) + + elizaLogger.debug({ + processingTweets: tweetsToSave.map((tweet) => tweet.id).join(',') + }) + + await this.runtime.ensureUserExists( + this.runtime.agentId, + this.profile.username, + this.runtime.character.name, + 'twitter' + ) + + // Save the new tweets as memories + for (const tweet of tweetsToSave) { + elizaLogger.log('Saving Tweet', tweet.id) + + const roomId = stringToUuid(tweet.conversationId + '-' + this.runtime.agentId) + const userId = + tweet.userId === this.profile.id ? this.runtime.agentId : stringToUuid(tweet.userId) + + if (tweet.userId === this.profile.id) { + await this.runtime.ensureConnection( + this.runtime.agentId, + roomId, + this.profile.username, + this.profile.screenName, + 'twitter' + ) + } else { + await this.runtime.ensureConnection(userId, roomId, tweet.username, tweet.name, 'twitter') + } + + const content = { + text: tweet.text, + url: tweet.permanentUrl, + source: 'twitter', + inReplyTo: tweet.inReplyToStatusId ? stringToUuid(tweet.inReplyToStatusId) : undefined + } + + await this.runtime.messageManager.createMemory({ + id: stringToUuid(tweet.id + '-' + this.runtime.agentId), + userId, + content, + agentId: this.runtime.agentId, + roomId, + embedding: getEmbeddingZeroVector(), + createdAt: tweet.timestamp * 1000 + }) + + await this.cacheTweet(tweet) + } + + // Cache + await this.cacheTimeline(timeline) + await this.cacheMentions(mentionsAndInteractions.tweets) + } + + async setCookiesFromArray(cookiesArray: TwitterCookie[]): Promise { + const cookieStrings = cookiesArray.map( + (cookie) => + `${cookie.key}=${cookie.value}; Domain=${cookie.domain}; Path=${cookie.path}; ${ + cookie.secure ? 'Secure' : '' + }; ${cookie.httpOnly ? 'HttpOnly' : ''}; SameSite=${cookie.sameSite || 'Lax'}` + ) + await this.twitterClient.setCookies(cookieStrings) + } + + async saveRequestMessage(message: Memory, state: State): Promise { + if (message.content.text) { + const recentMessage = await this.runtime.messageManager.getMemories({ + roomId: message.roomId, + count: 1, + unique: false + }) + + if (recentMessage.length > 0 && recentMessage[0].content === message.content) { + elizaLogger.debug('Message already saved', recentMessage[0].id) + } else { + await this.runtime.messageManager.createMemory({ + ...message, + embedding: getEmbeddingZeroVector() + }) + } + + await this.runtime.evaluate(message, { + ...state, + twitterClient: this.twitterClient + }) + } + } + + async loadLatestCheckedTweetId(): Promise { + const latestCheckedTweetId = await this.runtime.cacheManager.get( + `twitter/${this.profile.username}/latest_checked_tweet_id` + ) + + if (latestCheckedTweetId) { + this.lastCheckedTweetId = BigInt(latestCheckedTweetId) + } + } + + async cacheLatestCheckedTweetId(): Promise { + if (this.lastCheckedTweetId) { + await this.runtime.cacheManager.set( + `twitter/${this.profile.username}/latest_checked_tweet_id`, + this.lastCheckedTweetId.toString() + ) + } + } + + async getCachedTimeline(): Promise { + return await this.runtime.cacheManager.get(`twitter/${this.profile.username}/timeline`) + } + + async cacheTimeline(timeline: Tweet[]): Promise { + await this.runtime.cacheManager.set(`twitter/${this.profile.username}/timeline`, timeline, { + expires: Date.now() + 10 * 1000 + }) + } + + async cacheMentions(mentions: Tweet[]): Promise { + await this.runtime.cacheManager.set(`twitter/${this.profile.username}/mentions`, mentions, { + expires: Date.now() + 10 * 1000 + }) + } + + async getCachedCookies(username: string): Promise { + return await this.runtime.cacheManager.get(`twitter/${username}/cookies`) + } + + async cacheCookies(username: string, cookies: TwitterCookie[]): Promise { + await this.runtime.cacheManager.set(`twitter/${username}/cookies`, cookies) + } + + async fetchProfile(username: string): Promise { + try { + const profile = await this.requestQueue.add(async () => { + const profile = await this.twitterClient.getProfile(username) + return { + id: profile.userId, + username, + screenName: profile.name || this.runtime.character.name, + bio: + profile.biography || + (typeof this.runtime.character.bio === 'string' + ? this.runtime.character.bio + : this.runtime.character.bio.length > 0 + ? this.runtime.character.bio[0] + : ''), + nicknames: this.runtime.character.twitterProfile?.nicknames || [] + } satisfies TwitterProfile + }) + + return profile + } catch (error) { + console.error('Error fetching Twitter profile:', error) + throw error + } + } +} diff --git a/src/clients/twitter/environment.ts b/src/clients/twitter/environment.ts new file mode 100644 index 00000000..c3cc6ace --- /dev/null +++ b/src/clients/twitter/environment.ts @@ -0,0 +1,205 @@ +import { ActionTimelineType, parseBooleanFromText, type IAgentRuntime } from '@elizaos/core' +import { z, ZodError } from 'zod' + +export const DEFAULT_MAX_TWEET_LENGTH = 280 + +const twitterUsernameSchema = z + .string() + .min(1, 'An X/Twitter Username must be at least 1 character long') + .max(15, 'An X/Twitter Username cannot exceed 15 characters') + .refine((username) => { + // Allow wildcard '*' as a special case + if (username === '*') return true + + // Twitter usernames can: + // - Start with digits now + // - Contain letters, numbers, underscores + // - Must not be empty + return /^[A-Za-z0-9_]+$/.test(username) + }, 'An X Username can only contain letters, numbers, and underscores') + +/** + * This schema defines all required/optional environment settings, + * including new fields like TWITTER_SPACES_ENABLE. + */ +export const twitterEnvSchema = z.object({ + TWITTER_DRY_RUN: z.boolean(), + TWITTER_USERNAME: z.string().min(1, 'X/Twitter username is required'), + TWITTER_PASSWORD: z.string().min(1, 'X/Twitter password is required'), + TWITTER_EMAIL: z.string().email('Valid X/Twitter email is required'), + MAX_TWEET_LENGTH: z.number().int().default(DEFAULT_MAX_TWEET_LENGTH), + TWITTER_SEARCH_ENABLE: z.boolean().default(false), + TWITTER_2FA_SECRET: z.string(), + TWITTER_RETRY_LIMIT: z.number().int(), + TWITTER_POLL_INTERVAL: z.number().int(), + TWITTER_TARGET_USERS: z.array(twitterUsernameSchema).default([]), + // I guess it's possible to do the transformation with zod + // not sure it's preferable, maybe a readability issue + // since more people will know js/ts than zod + /* + z + .string() + .transform((val) => val.trim()) + .pipe( + z.string() + .transform((val) => + val ? val.split(',').map((u) => u.trim()).filter(Boolean) : [] + ) + .pipe( + z.array( + z.string() + .min(1) + .max(15) + .regex( + /^[A-Za-z][A-Za-z0-9_]*[A-Za-z0-9]$|^[A-Za-z]$/, + 'Invalid Twitter username format' + ) + ) + ) + .transform((users) => users.join(',')) + ) + .optional() + .default(''), + */ + POST_INTERVAL_MIN: z.number().int(), + POST_INTERVAL_MAX: z.number().int(), + ENABLE_ACTION_PROCESSING: z.boolean(), + ACTION_INTERVAL: z.number().int(), + POST_IMMEDIATELY: z.boolean(), + TWITTER_SPACES_ENABLE: z.boolean().default(false), + MAX_ACTIONS_PROCESSING: z.number().int(), + ACTION_TIMELINE_TYPE: z.nativeEnum(ActionTimelineType).default(ActionTimelineType.ForYou) +}) + +export type TwitterConfig = z.infer + +/** + * Helper to parse a comma-separated list of Twitter usernames + * (already present in your code). + */ +function parseTargetUsers(targetUsersStr?: string | null): string[] { + if (!targetUsersStr?.trim()) { + return [] + } + return targetUsersStr + .split(',') + .map((user) => user.trim()) + .filter(Boolean) +} + +function safeParseInt(value: string | undefined | null, defaultValue: number): number { + if (!value) return defaultValue + const parsed = Number.parseInt(value) + return isNaN(parsed) ? defaultValue : Math.max(1, parsed) +} + +/** + * Validates or constructs a TwitterConfig object using zod, + * taking values from the IAgentRuntime or process.env as needed. + */ +// This also is organized to serve as a point of documentation for the client +// most of the inputs from the framework (env/character) + +// we also do a lot of typing/parsing here +// so we can do it once and only once per character +export async function validateTwitterConfig(runtime: IAgentRuntime): Promise { + try { + const twitterConfig = { + TWITTER_DRY_RUN: + parseBooleanFromText( + runtime.getSetting('TWITTER_DRY_RUN') || process.env.TWITTER_DRY_RUN + ) ?? false, // parseBooleanFromText return null if "", map "" to false + + TWITTER_USERNAME: runtime.getSetting('TWITTER_USERNAME') || process.env.TWITTER_USERNAME, + + TWITTER_PASSWORD: runtime.getSetting('TWITTER_PASSWORD') || process.env.TWITTER_PASSWORD, + + TWITTER_EMAIL: runtime.getSetting('TWITTER_EMAIL') || process.env.TWITTER_EMAIL, + + // number as string? + MAX_TWEET_LENGTH: safeParseInt( + runtime.getSetting('MAX_TWEET_LENGTH') || process.env.MAX_TWEET_LENGTH, + DEFAULT_MAX_TWEET_LENGTH + ), + + TWITTER_SEARCH_ENABLE: + parseBooleanFromText( + runtime.getSetting('TWITTER_SEARCH_ENABLE') || process.env.TWITTER_SEARCH_ENABLE + ) ?? false, + + // string passthru + TWITTER_2FA_SECRET: + runtime.getSetting('TWITTER_2FA_SECRET') || process.env.TWITTER_2FA_SECRET || '', + + // int + TWITTER_RETRY_LIMIT: safeParseInt( + runtime.getSetting('TWITTER_RETRY_LIMIT') || process.env.TWITTER_RETRY_LIMIT, + 5 + ), + + // int in seconds + TWITTER_POLL_INTERVAL: safeParseInt( + runtime.getSetting('TWITTER_POLL_INTERVAL') || process.env.TWITTER_POLL_INTERVAL, + 120 // 2m + ), + + // comma separated string + TWITTER_TARGET_USERS: parseTargetUsers( + runtime.getSetting('TWITTER_TARGET_USERS') || process.env.TWITTER_TARGET_USERS + ), + + // int in minutes + POST_INTERVAL_MIN: safeParseInt( + runtime.getSetting('POST_INTERVAL_MIN') || process.env.POST_INTERVAL_MIN, + 90 // 1.5 hours + ), + + // int in minutes + POST_INTERVAL_MAX: safeParseInt( + runtime.getSetting('POST_INTERVAL_MAX') || process.env.POST_INTERVAL_MAX, + 180 // 3 hours + ), + + // bool + ENABLE_ACTION_PROCESSING: + parseBooleanFromText( + runtime.getSetting('ENABLE_ACTION_PROCESSING') || process.env.ENABLE_ACTION_PROCESSING + ) ?? false, + + // init in minutes (min 1m) + ACTION_INTERVAL: safeParseInt( + runtime.getSetting('ACTION_INTERVAL') || process.env.ACTION_INTERVAL, + 5 // 5 minutes + ), + + // bool + POST_IMMEDIATELY: + parseBooleanFromText( + runtime.getSetting('POST_IMMEDIATELY') || process.env.POST_IMMEDIATELY + ) ?? false, + + TWITTER_SPACES_ENABLE: + parseBooleanFromText( + runtime.getSetting('TWITTER_SPACES_ENABLE') || process.env.TWITTER_SPACES_ENABLE + ) ?? false, + + MAX_ACTIONS_PROCESSING: safeParseInt( + runtime.getSetting('MAX_ACTIONS_PROCESSING') || process.env.MAX_ACTIONS_PROCESSING, + 1 + ), + + ACTION_TIMELINE_TYPE: + runtime.getSetting('ACTION_TIMELINE_TYPE') || process.env.ACTION_TIMELINE_TYPE + } + + return twitterEnvSchema.parse(twitterConfig) + } catch (error) { + if (error instanceof ZodError) { + const errorMessages = error.errors + .map((err) => `${err.path.join('.')}: ${err.message}`) + .join('\n') + throw new Error(`X/Twitter configuration validation failed:\n${errorMessages}`) + } + throw error + } +} diff --git a/src/clients/twitter/index.ts b/src/clients/twitter/index.ts new file mode 100644 index 00000000..e04c3329 --- /dev/null +++ b/src/clients/twitter/index.ts @@ -0,0 +1,77 @@ +import { ClientBase } from '@/clients/twitter/base' +import { validateTwitterConfig, type TwitterConfig } from '@/clients/twitter/environment' +import { TwitterInteractionClient } from '@/clients/twitter/interactions' +import { TwitterPostClient } from '@/clients/twitter/post' +import { TwitterSearchClient } from '@/clients/twitter/search' +import { AgentcoinRuntime } from '@/common/runtime' +import { elizaLogger, type Client } from '@elizaos/core' + +/** + * A manager that orchestrates all specialized Twitter logic: + * - client: base operations (login, timeline caching, etc.) + * - post: autonomous posting logic + * - search: searching tweets / replying logic + * - interaction: handling mentions, replies + * - space: launching and managing Twitter Spaces (optional) + */ +class TwitterManager { + client: ClientBase + post: TwitterPostClient + search: TwitterSearchClient + interaction: TwitterInteractionClient + + constructor(runtime: AgentcoinRuntime, twitterConfig: TwitterConfig) { + // Pass twitterConfig to the base client + this.client = new ClientBase(runtime, twitterConfig) + + // Posting logic + this.post = new TwitterPostClient(this.client, runtime) + + // Optional search logic (enabled if TWITTER_SEARCH_ENABLE is true) + if (twitterConfig.TWITTER_SEARCH_ENABLE) { + elizaLogger.warn('Twitter/X client running in a mode that:') + elizaLogger.warn('1. violates consent of random users') + elizaLogger.warn('2. burns your rate limit') + elizaLogger.warn('3. can get your account banned') + elizaLogger.warn('use at your own risk') + this.search = new TwitterSearchClient(this.client, runtime) + } + + // Mentions and interactions + this.interaction = new TwitterInteractionClient(this.client, runtime) + + elizaLogger.info('🐦 Twitter client initialized') + } +} + +export const TwitterClientInterface: Client = { + async start(runtime: AgentcoinRuntime) { + const twitterConfig: TwitterConfig = await validateTwitterConfig(runtime) + + elizaLogger.log('Twitter client started') + + const manager = new TwitterManager(runtime, twitterConfig) + + // Initialize login/session + await manager.client.init() + + // Start the posting loop + await manager.post.start() + + // Start the search logic if it exists + if (manager.search) { + await manager.search.start() + } + + // Start interactions (mentions, replies) + await manager.interaction.start() + + return manager + }, + + async stop(_runtime: AgentcoinRuntime) { + elizaLogger.warn('Twitter client does not support stopping yet') + } +} + +export default TwitterClientInterface diff --git a/src/clients/twitter/interactions.ts b/src/clients/twitter/interactions.ts new file mode 100644 index 00000000..e0da231e --- /dev/null +++ b/src/clients/twitter/interactions.ts @@ -0,0 +1,662 @@ +import type { ClientBase } from '@/clients/twitter/base' +import { buildConversationThread, sendTweet, wait } from '@/clients/twitter/utils' +import { hasActions } from '@/common/functions' +import { AgentcoinRuntime } from '@/common/runtime' +import { + composeContext, + type Content, + elizaLogger, + generateMessageResponse, + generateShouldRespond, + getEmbeddingZeroVector, + type HandlerCallback, + type IImageDescriptionService, + type Memory, + messageCompletionFooter, + ModelClass, + ServiceType, + shouldRespondFooter, + stringToUuid +} from '@elizaos/core' +import { SearchMode, type Tweet } from 'agent-twitter-client' + +export const twitterMessageHandlerTemplate = + ` +# Areas of Expertise +{{knowledge}} + +# About {{agentName}} (@{{twitterUserName}}): +{{bio}} +{{lore}} +{{topics}} + +{{providers}} + +{{characterPostExamples}} + +{{postDirections}} + +Recent interactions between {{agentName}} and other users: +{{recentPostInteractions}} + +{{recentPosts}} + +# TASK: Generate a post/reply in the voice, style and perspective of {{agentName}} +(@{{twitterUserName}}) while using the thread of tweets as additional context: + +Current Post: +{{currentPost}} +Here is the descriptions of images in the Current post. +{{imageDescriptions}} + +Thread of Tweets You Are Replying To: +{{formattedConversation}} + +# INSTRUCTIONS: Generate a post in the voice, style and perspective of {{agentName}} +(@{{twitterUserName}}). You MUST include an action if the current post text includes a prompt that +is similar to one of the available actions mentioned here: +{{actionNames}} +{{actions}} + +Here is the current post text again. Remember to include an action if the current post text +includes a prompt that asks for one of the available actions mentioned above +(does not need to be exact) +{{currentPost}} +Here is the descriptions of images in the Current post. +{{imageDescriptions}} +` + messageCompletionFooter + +export const twitterShouldRespondTemplate = (targetUsersStr: string): string => + `# INSTRUCTIONS: Determine if {{agentName}} (@{{twitterUserName}}) should respond to the message + and participate in the conversation. Do not comment. Just respond with "true" or "false". + +Response options are RESPOND, IGNORE and STOP. + +PRIORITY RULE: ALWAYS RESPOND to these users regardless of topic or message content: +${targetUsersStr}. Topic relevance should be ignored for these users. + +For other users: +- {{agentName}} should RESPOND to messages directed at them +- {{agentName}} should RESPOND to conversations relevant to their background +- {{agentName}} should IGNORE irrelevant messages +- {{agentName}} should IGNORE very short messages unless directly addressed +- {{agentName}} should STOP if asked to stop +- {{agentName}} should STOP if conversation is concluded +- {{agentName}} is in a room with other users and wants to be conversational, but not annoying. + +IMPORTANT: +- {{agentName}} (aka @{{twitterUserName}}) is particularly sensitive about being annoying, +so if there is any doubt, it is better to IGNORE than to RESPOND. +- For users not in the priority list, {{agentName}} (@{{twitterUserName}}) should err on +the side of IGNORE rather than RESPOND if in doubt. + +Recent Posts: +{{recentPosts}} + +Current Post: +{{currentPost}} + +Thread of Tweets You Are Replying To: +{{formattedConversation}} + +# INSTRUCTIONS: Respond with [RESPOND] if {{agentName}} should respond, or [IGNORE] +if {{agentName}} should not respond to the last message and [STOP] if {{agentName}} +should stop participating in the conversation. +` + shouldRespondFooter + +export class TwitterInteractionClient { + client: ClientBase + runtime: AgentcoinRuntime + + private isDryRun: boolean + + constructor(client: ClientBase, runtime: AgentcoinRuntime) { + this.client = client + this.runtime = runtime + this.isDryRun = this.client.twitterConfig.TWITTER_DRY_RUN + } + + async start(): Promise { + const handleTwitterInteractionsLoop = (): void => { + void this.handleTwitterInteractions() + setTimeout( + handleTwitterInteractionsLoop, + // Defaults to 2 minutes + this.client.twitterConfig.TWITTER_POLL_INTERVAL * 1000 + ) + } + handleTwitterInteractionsLoop() + } + + async handleTwitterInteractions(): Promise { + elizaLogger.log('Checking Twitter interactions') + + const twitterUsername = this.client.profile.username + try { + // Check for mentions + const mentionCandidates = ( + await this.client.fetchSearchTweets(`@${twitterUsername}`, 20, SearchMode.Latest) + ).tweets + + elizaLogger.log('Completed checking mentioned tweets:', mentionCandidates.length) + let uniqueTweetCandidates = [...mentionCandidates] + // Only process target users if configured + if (this.client.twitterConfig.TWITTER_TARGET_USERS.length) { + const TARGET_USERS = this.client.twitterConfig.TWITTER_TARGET_USERS + + elizaLogger.log('Processing target users:', TARGET_USERS) + + if (TARGET_USERS.length > 0) { + // Create a map to store tweets by user + const tweetsByUser = new Map() + + // Fetch tweets from all target users + for (const username of TARGET_USERS) { + try { + const userTweets = ( + await this.client.twitterClient.fetchSearchTweets( + `from:${username}`, + 3, + SearchMode.Latest + ) + ).tweets + + // Filter for unprocessed, non-reply, recent tweets + const validTweets = userTweets.filter((tweet) => { + const isUnprocessed = + !this.client.lastCheckedTweetId || + Number.parseInt(tweet.id) > this.client.lastCheckedTweetId + const isRecent = Date.now() - tweet.timestamp * 1000 < 2 * 60 * 60 * 1000 + + elizaLogger.log(`Tweet ${tweet.id} checks:`, { + isUnprocessed, + isRecent, + isReply: tweet.isReply, + isRetweet: tweet.isRetweet + }) + + return isUnprocessed && !tweet.isReply && !tweet.isRetweet && isRecent + }) + + if (validTweets.length > 0) { + tweetsByUser.set(username, validTweets) + elizaLogger.log(`Found ${validTweets.length} valid tweets from ${username}`) + } + } catch (error) { + elizaLogger.error(`Error fetching tweets for ${username}:`, error) + continue + } + } + + // Select one tweet from each user that has tweets + const selectedTweets: Tweet[] = [] + for (const [username, tweets] of tweetsByUser) { + if (tweets.length > 0) { + // Randomly select one tweet from this user + const randomTweet = tweets[Math.floor(Math.random() * tweets.length)] + selectedTweets.push(randomTweet) + elizaLogger.log( + `Selected tweet from ${username}: ${randomTweet.text?.substring(0, 100)}` + ) + } + } + + // Add selected tweets to candidates + uniqueTweetCandidates = [...mentionCandidates, ...selectedTweets] + } + } else { + elizaLogger.log('No target users configured, processing only mentions') + } + + // Sort tweet candidates by ID in ascending order + uniqueTweetCandidates + .sort((a, b) => a.id.localeCompare(b.id)) + .filter((tweet) => tweet.userId !== this.client.profile.id) + + // for each tweet candidate, handle the tweet + for (const tweet of uniqueTweetCandidates) { + if (!this.client.lastCheckedTweetId || BigInt(tweet.id) > this.client.lastCheckedTweetId) { + // Generate the tweetId UUID the same way it's done in handleTweet + const tweetId = stringToUuid(tweet.id + '-' + this.runtime.agentId) + + // Check if we've already processed this tweet + const existingResponse = await this.runtime.messageManager.getMemoryById(tweetId) + + if (existingResponse) { + elizaLogger.log(`Already responded to tweet ${tweet.id}, skipping`) + continue + } + elizaLogger.log('New Tweet found', tweet.permanentUrl) + + const roomId = stringToUuid(tweet.conversationId + '-' + this.runtime.agentId) + + const userIdUUID = + tweet.userId === this.client.profile.id + ? this.runtime.agentId + : stringToUuid(tweet.userId) + + const messageText = tweet.text + const username = tweet.username + + const shouldContinue = await this.runtime.handle('message', { + text: messageText, + sender: username, + source: 'twitter', + timestamp: new Date(tweet.timestamp * 1000) + }) + + if (!shouldContinue) { + elizaLogger.info('TwitterClient received message event but it was suppressed') + return + } + + await this.runtime.ensureUserRoomConnection({ + roomId, + userId: userIdUUID, + username, + name: tweet.name, + email: username, + source: 'twitter' + }) + + const thread = await buildConversationThread(tweet, this.client) + + const message = { + content: { + text: tweet.text, + imageUrls: tweet.photos?.map((photo) => photo.url) || [] + }, + agentId: this.runtime.agentId, + userId: userIdUUID, + roomId + } + + await this.handleTweet({ + tweet, + message, + thread + }) + + // Update the last checked tweet ID after processing each tweet + this.client.lastCheckedTweetId = BigInt(tweet.id) + } + } + + // Save the latest checked tweet ID to the file + await this.client.cacheLatestCheckedTweetId() + + elizaLogger.log('Finished checking Twitter interactions') + } catch (error) { + elizaLogger.error('Error handling Twitter interactions:', error) + } + } + + private async handleTweet({ + tweet, + message, + thread + }: { + tweet: Tweet + message: Memory + thread: Tweet[] + }): Promise<{ text: string; action: string }> { + // Only skip if tweet is from self AND not from a target user + if ( + tweet.userId === this.client.profile.id && + !this.client.twitterConfig.TWITTER_TARGET_USERS.includes(tweet.username) + ) { + return + } + + if (!message.content.text) { + elizaLogger.log('Skipping Tweet with no text', tweet.id) + return { text: '', action: 'IGNORE' } + } + + elizaLogger.log('Processing Tweet: ', tweet.id) + const formatTweet = (tweet: Tweet): string => { + return `ID: ${tweet.id} From: ${tweet.name} (@${tweet.username}) + Text: ${tweet.text}` + } + const currentPost = formatTweet(tweet) + + const formattedConversation = thread + .map( + (tweet) => `@${tweet.username} (${new Date(tweet.timestamp * 1000).toLocaleString('en-US', { + hour: '2-digit', + minute: '2-digit', + month: 'short', + day: 'numeric' + })}): + ${tweet.text}` + ) + .join('\n\n') + + const imageDescriptionsArray = [] + try { + for (const photo of tweet.photos) { + const description = await this.runtime + .getService(ServiceType.IMAGE_DESCRIPTION) + .describeImage(photo.url) + imageDescriptionsArray.push(description) + } + } catch (error) { + // Handle the error + elizaLogger.error('Error Occured during describing image: ', error) + } + + let state = await this.runtime.composeState(message, { + twitterClient: this.client.twitterClient, + twitterUserName: this.client.twitterConfig.TWITTER_USERNAME, + currentPost, + formattedConversation, + imageDescriptions: + imageDescriptionsArray.length > 0 + ? `\nImages in Tweet:\n${imageDescriptionsArray + .map( + (desc: { title: string; description: string }, i: number) => + `Image ${i + 1}: Title: ${desc.title}\nDescription: ${desc.description}` + ) + .join('\n\n')}` + : '' + }) + + // check if the tweet exists, save if it doesn't + const tweetId = stringToUuid(tweet.id + '-' + this.runtime.agentId) + const tweetExists = await this.runtime.messageManager.getMemoryById(tweetId) + + if (!tweetExists) { + elizaLogger.log('tweet does not exist, saving') + const userIdUUID = stringToUuid(tweet.userId) + const roomId = stringToUuid(tweet.conversationId) + + const message = { + id: tweetId, + agentId: this.runtime.agentId, + content: { + text: tweet.text, + url: tweet.permanentUrl, + imageUrls: tweet.photos?.map((photo) => photo.url) || [], + inReplyTo: tweet.inReplyToStatusId + ? stringToUuid(tweet.inReplyToStatusId + '-' + this.runtime.agentId) + : undefined + }, + userId: userIdUUID, + roomId, + createdAt: tweet.timestamp * 1000 + } + void this.client.saveRequestMessage(message, state) + } + + // get usernames into str + const validTargetUsersStr = this.client.twitterConfig.TWITTER_TARGET_USERS.join(',') + + const shouldRespondContext = composeContext({ + state, + template: + this.runtime.character.templates?.twitterShouldRespondTemplate || + this.runtime.character?.templates?.shouldRespondTemplate || + twitterShouldRespondTemplate(validTargetUsersStr) + }) + + const shouldRespond = await generateShouldRespond({ + runtime: this.runtime, + context: shouldRespondContext, + modelClass: ModelClass.MEDIUM + }) + + // Promise<"RESPOND" | "IGNORE" | "STOP" | null> { + if (shouldRespond !== 'RESPOND') { + elizaLogger.log('Not responding to message') + return { text: 'Response Decision:', action: shouldRespond } + } + + const context = composeContext({ + state: { + ...state, + // Convert actionNames array to string + actionNames: Array.isArray(state.actionNames) + ? state.actionNames.join(', ') + : state.actionNames || '', + actions: Array.isArray(state.actions) ? state.actions.join('\n') : state.actions || '', + // Ensure character examples are included + characterPostExamples: this.runtime.character.messageExamples + ? this.runtime.character.messageExamples + .map((example) => + example + .map( + (msg) => + `${msg.user}: ${msg.content.text}${msg.content.action ? ` [Action: ${msg.content.action}]` : ''}` + ) + .join('\n') + ) + .join('\n\n') + : '' + }, + template: + this.runtime.character.templates?.twitterMessageHandlerTemplate || + this.runtime.character?.templates?.messageHandlerTemplate || + twitterMessageHandlerTemplate + }) + + let shouldContinue = await this.runtime.handle('prellm', { + state, + responses: [], + memory: message + }) + + if (!shouldContinue) { + elizaLogger.info('TwitterClient received prellm event but it was suppressed') + return + } + + const response = await generateMessageResponse({ + runtime: this.runtime, + context, + modelClass: ModelClass.LARGE + }) + + const removeQuotes = (str: string): string => str.replace(/^['"](.*)['"]$/, '$1') + + const stringId = stringToUuid(tweet.id + '-' + this.runtime.agentId) + + response.inReplyTo = stringId + + response.text = removeQuotes(response.text) + + shouldContinue = await this.runtime.handle('postllm', { + state, + responses: [], + memory: message, + content: response + }) + + if (!shouldContinue) { + elizaLogger.info('TwitterClient received postllm event but it was suppressed') + return + } + + if (response.text) { + if (this.isDryRun) { + elizaLogger.info( + `Dry run: Selected Post: ${tweet.id} - ${tweet.username}: + ${tweet.text}\nAgent's Output:\n${response.text}` + ) + } else { + try { + const callback: HandlerCallback = async (response: Content, tweetId?: string) => { + const memories = await sendTweet( + this.client, + response, + message.roomId, + this.client.twitterConfig.TWITTER_USERNAME, + tweetId || tweet.id + ) + return memories + } + + const messageResponses = await callback(response) + + state = await this.runtime.updateRecentMessageState(state) + + for (const responseMessage of messageResponses) { + if (responseMessage === messageResponses[messageResponses.length - 1]) { + responseMessage.content.action = response.action + } else { + responseMessage.content.action = 'CONTINUE' + } + await this.runtime.messageManager.createMemory(responseMessage) + } + const responseTweetId = messageResponses[messageResponses.length - 1]?.content?.tweetId + + if (!hasActions(messageResponses)) { + return + } + + // `preaction` event + shouldContinue = await this.runtime.handle('preaction', { + state, + responses: messageResponses, + memory: message + }) + + if (!shouldContinue) { + elizaLogger.info('TwitterClient received preaction event but it was suppressed') + return + } + + await this.runtime.processActions( + message, + messageResponses, + state, + async (response: Content) => { + shouldContinue = await this.runtime.handle('postaction', { + state, + responses: messageResponses, + memory: message, + content: response + }) + + if (!shouldContinue) { + elizaLogger.info('TwitterClient received postaction event but it was suppressed') + return + } + + return callback(response, responseTweetId) + } + ) + + const responseInfo = `Context:\n\n${context}\n\n + Selected Post: ${tweet.id} - ${tweet.username}: ${tweet.text}\n + Agent's Output:\n${response.text}` + + await this.runtime.cacheManager.set( + `twitter/tweet_generation_${tweet.id}.txt`, + responseInfo + ) + await wait() + } catch (error) { + elizaLogger.error(`Error sending response tweet: ${error}`) + } + } + } + } + + async buildConversationThread(tweet: Tweet, maxReplies = 10): Promise { + const thread: Tweet[] = [] + const visited: Set = new Set() + + const processThread = async (currentTweet: Tweet, depth = 0): Promise => { + elizaLogger.log('Processing tweet:', { + id: currentTweet.id, + inReplyToStatusId: currentTweet.inReplyToStatusId, + depth + }) + + if (!currentTweet) { + elizaLogger.log('No current tweet found for thread building') + return + } + + if (depth >= maxReplies) { + elizaLogger.log('Reached maximum reply depth', depth) + return + } + + // Handle memory storage + const memory = await this.runtime.messageManager.getMemoryById( + stringToUuid(currentTweet.id + '-' + this.runtime.agentId) + ) + if (!memory) { + const roomId = stringToUuid(currentTweet.conversationId + '-' + this.runtime.agentId) + const userId = stringToUuid(currentTweet.userId) + + await this.runtime.ensureConnection( + userId, + roomId, + currentTweet.username, + currentTweet.name, + 'twitter' + ) + + await this.runtime.messageManager.createMemory({ + id: stringToUuid(currentTweet.id + '-' + this.runtime.agentId), + agentId: this.runtime.agentId, + content: { + text: currentTweet.text, + source: 'twitter', + url: currentTweet.permanentUrl, + imageUrls: currentTweet.photos?.map((photo) => photo.url) || [], + inReplyTo: currentTweet.inReplyToStatusId + ? stringToUuid(currentTweet.inReplyToStatusId + '-' + this.runtime.agentId) + : undefined + }, + createdAt: currentTweet.timestamp * 1000, + roomId, + userId: + currentTweet.userId === this.client.profile.id + ? this.runtime.agentId + : stringToUuid(currentTweet.userId), + embedding: getEmbeddingZeroVector() + }) + } + + if (visited.has(currentTweet.id)) { + elizaLogger.log('Already visited tweet:', currentTweet.id) + return + } + + visited.add(currentTweet.id) + thread.unshift(currentTweet) + + if (currentTweet.inReplyToStatusId) { + elizaLogger.log('Fetching parent tweet:', currentTweet.inReplyToStatusId) + try { + const parentTweet = await this.client.getTweet(currentTweet.inReplyToStatusId) + + if (parentTweet) { + elizaLogger.log('Found parent tweet:', { + id: parentTweet.id, + text: parentTweet.text?.slice(0, 50) + }) + await processThread(parentTweet, depth + 1) + } else { + elizaLogger.log('No parent tweet found for:', currentTweet.inReplyToStatusId) + } + } catch (error) { + elizaLogger.log('Error fetching parent tweet:', { + tweetId: currentTweet.inReplyToStatusId, + error + }) + } + } else { + elizaLogger.log('Reached end of reply chain at:', currentTweet.id) + } + } + + // Need to bind this context for the inner function + await processThread.bind(this)(tweet, 0) + + return thread + } +} diff --git a/src/clients/twitter/post.ts b/src/clients/twitter/post.ts new file mode 100644 index 00000000..8946fa01 --- /dev/null +++ b/src/clients/twitter/post.ts @@ -0,0 +1,1318 @@ +import type { ClientBase } from '@/clients/twitter/base' +import { DEFAULT_MAX_TWEET_LENGTH } from '@/clients/twitter/environment' +import { twitterMessageHandlerTemplate } from '@/clients/twitter/interactions' +import { MediaData, RawTweetType } from '@/clients/twitter/types' +import { buildConversationThread, fetchMediaData } from '@/clients/twitter/utils' +import { AgentcoinRuntime } from '@/common/runtime' +import { + ActionResponse, + cleanJsonResponse, + composeContext, + elizaLogger, + extractAttributes, + generateText, + generateTweetActions, + getEmbeddingZeroVector, + type IImageDescriptionService, + ModelClass, + parseJSONObjectFromText, + postActionResponseFooter, + ServiceType, + State, + stringToUuid, + type TemplateType, + truncateToCompleteSentence, + type UUID +} from '@elizaos/core' +import type { Tweet } from 'agent-twitter-client' +import { Client, Events, GatewayIntentBits, Partials, TextChannel } from 'discord.js' + +const MAX_TIMELINES_TO_FETCH = 15 + +const twitterPostTemplate = ` +# Areas of Expertise +{{knowledge}} + +# About {{agentName}} (@{{twitterUserName}}): +{{bio}} +{{lore}} +{{topics}} + +{{providers}} + +{{characterPostExamples}} + +{{postDirections}} + +# Task: Generate a post in the voice, style and perspective of {{agentName}} @{{twitterUserName}}. +Write a post that is {{adjective}} about {{topic}} (without mentioning {{topic}} directly), +from the perspective of {{agentName}}. Do not add commentary or acknowledge this request, +just write the post. +Your response should be 1, 2, or 3 sentences (choose the length at random). +Your response should not contain any questions. Brief, concise statements only. The total character + count MUST be less than {{maxTweetLength}}. No emojis. Use \\n\\n (double spaces) + between statements if there are multiple statements in your response.` + +export const twitterActionTemplate = + ` +# INSTRUCTIONS: Determine actions for {{agentName}} (@{{twitterUserName}}) based on: +{{bio}} +{{postDirections}} + +Guidelines: +- ONLY engage with content that DIRECTLY relates to character's core interests +- Direct mentions are priority IF they are on-topic +- Skip ALL content that is: + - Off-topic or tangentially related + - From high-profile accounts unless explicitly relevant + - Generic/viral content without specific relevance + - Political/controversial unless central to character + - Promotional/marketing unless directly relevant + +Actions (respond only with tags): +[LIKE] - Perfect topic match AND aligns with character (9.8/10) +[RETWEET] - Exceptional content that embodies character's expertise (9.5/10) +[QUOTE] - Can add substantial domain expertise (9.5/10) +[REPLY] - Can contribute meaningful, expert-level insight (9.5/10) + +Tweet: +{{currentTweet}} + +# Respond with qualifying action tags only. Default to NO action unless extremely +confident of relevance.` + postActionResponseFooter + +interface PendingTweet { + tweetTextForPosting: string + roomId: UUID + rawTweetContent: string + discordMessageId: string + channelId: string + timestamp: number +} + +type PendingTweetApprovalStatus = 'PENDING' | 'APPROVED' | 'REJECTED' + +export class TwitterPostClient { + client: ClientBase + runtime: AgentcoinRuntime + twitterUsername: string + private isProcessing = false + private lastProcessTime = 0 + private stopProcessingActions = false + private isDryRun: boolean + private discordClientForApproval: Client + private approvalRequired = false + private discordApprovalChannelId: string + private approvalCheckInterval: number + + constructor(client: ClientBase, runtime: AgentcoinRuntime) { + this.client = client + this.runtime = runtime + this.twitterUsername = this.client.twitterConfig.TWITTER_USERNAME + this.isDryRun = this.client.twitterConfig.TWITTER_DRY_RUN + + // Log configuration on initialization + elizaLogger.log('Twitter Client Configuration:') + elizaLogger.log(`- Username: ${this.twitterUsername}`) + elizaLogger.log(`- Dry Run Mode: ${this.isDryRun ? 'enabled' : 'disabled'}`) + elizaLogger.log( + // eslint-disable-next-line max-len + `- Post Interval: ${this.client.twitterConfig.POST_INTERVAL_MIN}-${this.client.twitterConfig.POST_INTERVAL_MAX} minutes` + ) + elizaLogger.log( + `- Action Processing: ${ + this.client.twitterConfig.ENABLE_ACTION_PROCESSING ? 'enabled' : 'disabled' + }` + ) + elizaLogger.log(`- Action Interval: ${this.client.twitterConfig.ACTION_INTERVAL} minutes`) + elizaLogger.log( + `- Post Immediately: ${this.client.twitterConfig.POST_IMMEDIATELY ? 'enabled' : 'disabled'}` + ) + elizaLogger.log( + `- Search Enabled: ${ + this.client.twitterConfig.TWITTER_SEARCH_ENABLE ? 'enabled' : 'disabled' + }` + ) + + const targetUsers = this.client.twitterConfig.TWITTER_TARGET_USERS + if (targetUsers) { + elizaLogger.log(`- Target Users: ${targetUsers}`) + } + + if (this.isDryRun) { + elizaLogger.log( + 'Twitter client initialized in dry run mode - no actual tweets should be posted' + ) + } + + // Initialize Discord webhook + const approvalRequired: boolean = + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + this.runtime.getSetting('TWITTER_APPROVAL_ENABLED')?.toLocaleLowerCase() === 'true' + if (approvalRequired) { + const discordToken = this.runtime.getSetting('TWITTER_APPROVAL_DISCORD_BOT_TOKEN') + const approvalChannelId = this.runtime.getSetting('TWITTER_APPROVAL_DISCORD_CHANNEL_ID') + + const APPROVAL_CHECK_INTERVAL = + Number.parseInt(this.runtime.getSetting('TWITTER_APPROVAL_CHECK_INTERVAL')) || 5 * 60 * 1000 // 5 minutes + + this.approvalCheckInterval = APPROVAL_CHECK_INTERVAL + + if (!discordToken || !approvalChannelId) { + throw new Error( + 'TWITTER_APPROVAL_DISCORD_BOT_TOKEN and TWITTER_APPROVAL_DISCORD_CHANNEL_ID are required for approval workflow' + ) + } + + this.approvalRequired = true + this.discordApprovalChannelId = approvalChannelId + + // Set up Discord client event handlers + this.setupDiscordClient() + } + } + + private setupDiscordClient(): void { + this.discordClientForApproval = new Client({ + intents: [ + GatewayIntentBits.Guilds, + GatewayIntentBits.GuildMessages, + GatewayIntentBits.MessageContent, + GatewayIntentBits.GuildMessageReactions + ], + partials: [Partials.Channel, Partials.Message, Partials.Reaction] + }) + this.discordClientForApproval.once(Events.ClientReady, (readyClient) => { + elizaLogger.log(`Discord bot is ready as ${readyClient.user.tag}!`) + + // Generate invite link with required permissions + // eslint-disable-next-line max-len + const invite = `https://discord.com/api/oauth2/authorize?client_id=${readyClient.user.id}&permissions=274877991936&scope=bot` + // 274877991936 includes permissions for: + // - Send Messages + // - Read Messages/View Channels + // - Read Message History + + elizaLogger.log( + `Use this link to properly invite the Twitter Post Approval Discord bot: ${invite}` + ) + }) + // Login to Discord + void this.discordClientForApproval.login( + this.runtime.getSetting('TWITTER_APPROVAL_DISCORD_BOT_TOKEN') + ) + } + + async start(): Promise { + if (!this.client.profile) { + await this.client.init() + } + + const generateNewTweetLoop = async (): Promise => { + const lastPost = await this.runtime.cacheManager.get<{ + timestamp: number + }>('twitter/' + this.twitterUsername + '/lastPost') + + const lastPostTimestamp = lastPost?.timestamp ?? 0 + const minMinutes = this.client.twitterConfig.POST_INTERVAL_MIN + const maxMinutes = this.client.twitterConfig.POST_INTERVAL_MAX + const randomMinutes = Math.floor(Math.random() * (maxMinutes - minMinutes + 1)) + minMinutes + const delay = randomMinutes * 60 * 1000 + + if (Date.now() > lastPostTimestamp + delay) { + await this.generateNewTweet() + } + + setTimeout(() => { + void generateNewTweetLoop() // Set up next iteration + }, delay) + + elizaLogger.log(`Next tweet scheduled in ${randomMinutes} minutes`) + } + + const processActionsLoop = async (): Promise => { + const actionInterval = this.client.twitterConfig.ACTION_INTERVAL // Defaults to 5 minutes + + while (!this.stopProcessingActions) { + try { + const results = await this.processTweetActions() + if (results) { + elizaLogger.log(`Processed ${results.length} tweets`) + elizaLogger.log(`Next action processing scheduled in ${actionInterval} minutes`) + // Wait for the full interval before next processing + await new Promise( + (resolve) => setTimeout(resolve, actionInterval * 60 * 1000) // now in minutes + ) + } + } catch (error) { + elizaLogger.error('Error in action processing loop:', error) + // Add exponential backoff on error + await new Promise((resolve) => setTimeout(resolve, 30000)) // Wait 30s on error + } + } + } + + if (this.client.twitterConfig.POST_IMMEDIATELY) { + await this.generateNewTweet() + } + + void generateNewTweetLoop() + elizaLogger.log('Tweet generation loop started') + + if (this.client.twitterConfig.ENABLE_ACTION_PROCESSING) { + processActionsLoop().catch((error) => { + elizaLogger.error('Fatal error in process actions loop:', error) + }) + } + + // Start the pending tweet check loop if enabled + if (this.approvalRequired) this.runPendingTweetCheckLoop() + } + + private runPendingTweetCheckLoop(): void { + setInterval(async () => { + await this.handlePendingTweet() + }, this.approvalCheckInterval) + } + + createTweetObject(tweetResult: RawTweetType, client: ClientBase, twitterUsername: string): Tweet { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return { + id: tweetResult.rest_id, + name: client.profile.screenName, + username: client.profile.username, + text: tweetResult.legacy.full_text, + conversationId: tweetResult.legacy.conversation_id_str, + createdAt: tweetResult.legacy.created_at, + timestamp: new Date(tweetResult.legacy.created_at).getTime(), + userId: client.profile.id, + inReplyToStatusId: tweetResult.legacy.in_reply_to_status_id_str, + permanentUrl: `https://twitter.com/${twitterUsername}/status/${tweetResult.rest_id}`, + hashtags: [], + mentions: [], + photos: [], + thread: [], + urls: [], + videos: [] + } as Tweet + } + + async processAndCacheTweet( + runtime: AgentcoinRuntime, + client: ClientBase, + tweet: Tweet, + roomId: UUID, + rawTweetContent: string + ): Promise { + // Cache the last post details + await runtime.cacheManager.set(`twitter/${client.profile.username}/lastPost`, { + id: tweet.id, + timestamp: Date.now() + }) + + // Cache the tweet + await client.cacheTweet(tweet) + + // Log the posted tweet + elizaLogger.log(`Tweet posted:\n ${tweet.permanentUrl}`) + + // Ensure the room and participant exist + await runtime.ensureRoomExists(roomId) + await runtime.ensureParticipantInRoom(runtime.agentId, roomId) + + // Create a memory for the tweet + await runtime.messageManager.createMemory({ + id: stringToUuid(tweet.id + '-' + runtime.agentId), + userId: runtime.agentId, + agentId: runtime.agentId, + content: { + text: rawTweetContent.trim(), + url: tweet.permanentUrl, + source: 'twitter' + }, + roomId, + embedding: getEmbeddingZeroVector(), + createdAt: tweet.timestamp + }) + } + + async handleNoteTweet( + client: ClientBase, + content: string, + tweetId?: string, + mediaData?: MediaData[] + ): Promise { + try { + const noteTweetResult: { + errors: { + message: string + }[] + data: { + notetweet_create: { + tweet_results: { + result: Tweet + } + } + } + } = await client.requestQueue.add( + async () => await client.twitterClient.sendNoteTweet(content, tweetId, mediaData) + ) + + if (noteTweetResult.errors && noteTweetResult.errors.length > 0) { + // Note Tweet failed due to authorization. Falling back to standard Tweet. + const truncateContent = truncateToCompleteSentence( + content, + this.client.twitterConfig.MAX_TWEET_LENGTH + ) + return await this.sendStandardTweet(client, truncateContent, tweetId) + } else { + return noteTweetResult.data.notetweet_create.tweet_results.result + } + } catch (error) { + throw new Error(`Note Tweet failed: ${error}`) + } + } + + async sendStandardTweet( + client: ClientBase, + content: string, + tweetId?: string, + mediaData?: MediaData[] + ): Promise { + try { + const standardTweetResult = await client.requestQueue.add( + async () => await client.twitterClient.sendTweet(content, tweetId, mediaData) + ) + const body: { + data: { + create_tweet: { + tweet_results: { + result: Tweet + } + } + } + } = await standardTweetResult.json() + if (!body?.data?.create_tweet?.tweet_results?.result) { + elizaLogger.error('Error sending tweet; Bad response:', body) + return null + } + return body.data.create_tweet.tweet_results.result + } catch (error) { + elizaLogger.error('Error sending standard Tweet:', error) + throw error + } + } + + async postTweet( + runtime: AgentcoinRuntime, + client: ClientBase, + tweetTextForPosting: string, + roomId: UUID, + rawTweetContent: string, + twitterUsername: string, + mediaData?: MediaData[] + ): Promise { + try { + elizaLogger.log(`Posting new tweet:\n`) + + let result + + if (tweetTextForPosting.length > DEFAULT_MAX_TWEET_LENGTH) { + result = await this.handleNoteTweet(client, tweetTextForPosting, undefined, mediaData) + } else { + result = await this.sendStandardTweet(client, tweetTextForPosting, undefined, mediaData) + } + + const tweet = this.createTweetObject(result, client, twitterUsername) + + await this.processAndCacheTweet(runtime, client, tweet, roomId, rawTweetContent) + } catch (error) { + elizaLogger.error('Error sending tweet:', error) + return null + } + } + + /** + * Generates and posts a new tweet. If isDryRun is true, only logs what would have been posted. + */ + async generateNewTweet(): Promise { + elizaLogger.log('Generating new tweet') + + try { + const roomId = stringToUuid('user_twitter_feed:' + this.client.profile.username) + + await this.runtime.ensureUserRoomConnection({ + roomId, + userId: this.runtime.agentId, + username: this.client.profile.username, + name: this.client.profile.username, + source: 'twitter' + }) + + const topics = this.runtime.character.topics.join(', ') + const maxTweetLength = this.client.twitterConfig.MAX_TWEET_LENGTH + const state = await this.runtime.composeState( + { + userId: this.runtime.agentId, + roomId, + agentId: this.runtime.agentId, + content: { + text: topics || '', + action: 'TWEET' + } + }, + { + twitterUserName: this.client.profile.username, + maxTweetLength + } + ) + + let shouldContinue = await this.runtime.handle('prellm', { + state, + responses: [], + memory: null + }) + + if (!shouldContinue) { + elizaLogger.info('AgentcoinClient received prellm event but it was suppressed') + return + } + + const context = composeContext({ + state, + template: this.runtime.character.templates?.twitterPostTemplate || twitterPostTemplate + }) + + elizaLogger.debug('generate post prompt:\n' + context) + + const response = await generateText({ + runtime: this.runtime, + context, + modelClass: ModelClass.SMALL + }) + + shouldContinue = await this.runtime.handle('postllm', { + state, + responses: [], + memory: null, + content: { text: response } + }) + + if (!shouldContinue) { + elizaLogger.info('AgentcoinClient received postllm event but it was suppressed') + return + } + + const rawTweetContent = cleanJsonResponse(response) + + // First attempt to clean content + let tweetTextForPosting = null + let mediaData = null + + // Try parsing as JSON first + const parsedResponse = parseJSONObjectFromText(rawTweetContent) + if (parsedResponse?.text) { + tweetTextForPosting = parsedResponse.text + } + + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + if (parsedResponse?.attachments && parsedResponse?.attachments.length > 0) { + mediaData = await fetchMediaData(parsedResponse.attachments) + } + + // Try extracting text attribute + if (!tweetTextForPosting) { + const parsingText = extractAttributes(rawTweetContent, ['text']).text + if (parsingText) { + tweetTextForPosting = truncateToCompleteSentence( + extractAttributes(rawTweetContent, ['text']).text, + this.client.twitterConfig.MAX_TWEET_LENGTH + ) + } + } + + // Use the raw text + if (!tweetTextForPosting) { + tweetTextForPosting = rawTweetContent + } + + if (maxTweetLength) { + tweetTextForPosting = truncateToCompleteSentence(tweetTextForPosting, maxTweetLength) + } + + const removeQuotes = (str: string): string => str.replace(/^['"](.*)['"]$/, '$1') + + const fixNewLines = (str: string): string => str.replaceAll(/\\n/g, '\n\n') // ensures double spaces + + // Final cleaning + tweetTextForPosting = removeQuotes(fixNewLines(tweetTextForPosting)) + + if (this.isDryRun) { + elizaLogger.info(`Dry run: would have posted tweet: ${tweetTextForPosting}`) + return + } + + try { + if (this.approvalRequired) { + // Send for approval instead of posting directly + elizaLogger.log(`Sending Tweet For Approval:\n ${tweetTextForPosting}`) + await this.sendForApproval(tweetTextForPosting, roomId, rawTweetContent) + elizaLogger.log('Tweet sent for approval') + } else { + elizaLogger.log(`Posting new tweet:\n ${tweetTextForPosting}`) + void this.postTweet( + this.runtime, + this.client, + tweetTextForPosting, + roomId, + rawTweetContent, + this.twitterUsername, + mediaData + ) + } + } catch (error) { + elizaLogger.error('Error sending tweet:', error) + } + } catch (error) { + elizaLogger.error('Error generating new tweet:', error) + } + } + + private async generateTweetContent( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + tweetState: any, + options?: { + template?: TemplateType + context?: string + } + ): Promise { + const context = composeContext({ + state: tweetState, + template: + options?.template || + this.runtime.character.templates?.twitterPostTemplate || + twitterPostTemplate + }) + + const response = await generateText({ + runtime: this.runtime, + context: options?.context || context, + modelClass: ModelClass.SMALL + }) + + elizaLogger.log('generate tweet content response:\n' + response) + + // First clean up any markdown and newlines + const cleanedResponse = cleanJsonResponse(response) + + // Try to parse as JSON first + const jsonResponse = parseJSONObjectFromText(cleanedResponse) + if (jsonResponse.text) { + const truncateContent = truncateToCompleteSentence( + jsonResponse.text, + this.client.twitterConfig.MAX_TWEET_LENGTH + ) + return truncateContent + } + if (typeof jsonResponse === 'object') { + const possibleContent = jsonResponse.content || jsonResponse.message || jsonResponse.response + if (possibleContent) { + const truncateContent = truncateToCompleteSentence( + possibleContent, + this.client.twitterConfig.MAX_TWEET_LENGTH + ) + return truncateContent + } + } + + let truncateContent = null + // Try extracting text attribute + const parsingText = extractAttributes(cleanedResponse, ['text']).text + if (parsingText) { + truncateContent = truncateToCompleteSentence( + parsingText, + this.client.twitterConfig.MAX_TWEET_LENGTH + ) + } + + if (!truncateContent) { + // If not JSON or no valid content found, clean the raw text + truncateContent = truncateToCompleteSentence( + cleanedResponse, + this.client.twitterConfig.MAX_TWEET_LENGTH + ) + } + + return truncateContent + } + + /** + * Processes tweet actions (likes, retweets, quotes, replies). If isDryRun is true, + * only simulates and logs actions without making API calls. + */ + private async processTweetActions(): Promise< + { + tweetId: string + actionResponse: ActionResponse + executedActions: string[] + }[] + > { + if (this.isProcessing) { + elizaLogger.log('Already processing tweet actions, skipping') + return [] + } + + try { + this.isProcessing = true + this.lastProcessTime = Date.now() + + elizaLogger.log('Processing tweet actions') + + await this.runtime.ensureUserExists( + this.runtime.agentId, + this.twitterUsername, + this.runtime.character.name, + 'twitter' + ) + + const timelines = await this.client.fetchTimelineForActions(MAX_TIMELINES_TO_FETCH) + const maxActionsProcessing = this.client.twitterConfig.MAX_ACTIONS_PROCESSING + const processedTimelines: { + tweet: Tweet + actionResponse: ActionResponse + tweetState: State + roomId: UUID + }[] = [] + + for (const tweet of timelines) { + try { + // Skip if we've already processed this tweet + const memory = await this.runtime.messageManager.getMemoryById( + stringToUuid(tweet.id + '-' + this.runtime.agentId) + ) + if (memory) { + elizaLogger.log(`Already processed tweet ID: ${tweet.id}`) + continue + } + + const roomId = stringToUuid(tweet.conversationId + '-' + this.runtime.agentId) + + const tweetState = await this.runtime.composeState( + { + userId: this.runtime.agentId, + roomId, + agentId: this.runtime.agentId, + content: { text: '', action: '' } + }, + { + twitterUserName: this.twitterUsername, + currentTweet: `ID: ${tweet.id}\nFrom: ${tweet.name} + (@${tweet.username})\nText: ${tweet.text}` + } + ) + + const actionContext = composeContext({ + state: tweetState, + template: + this.runtime.character.templates?.twitterActionTemplate || twitterActionTemplate + }) + + const actionResponse = await generateTweetActions({ + runtime: this.runtime, + context: actionContext, + modelClass: ModelClass.SMALL + }) + + if (!actionResponse) { + elizaLogger.log(`No valid actions generated for tweet ${tweet.id}`) + continue + } + processedTimelines.push({ + tweet, + actionResponse, + tweetState, + roomId + }) + } catch (error) { + elizaLogger.error(`Error processing tweet ${tweet.id}:`, error) + continue + } + } + + const sortProcessedTimeline = (arr: typeof processedTimelines): typeof processedTimelines => { + return arr.sort((a, b) => { + // Count the number of true values in the actionResponse object + const countTrue = (obj: typeof a.actionResponse): number => + Object.values(obj).filter(Boolean).length + + const countA = countTrue(a.actionResponse) + const countB = countTrue(b.actionResponse) + + // Primary sort by number of true values + if (countA !== countB) { + return countB - countA + } + + // Secondary sort by the "like" property + if (a.actionResponse.like !== b.actionResponse.like) { + return a.actionResponse.like ? -1 : 1 + } + + // Tertiary sort keeps the remaining objects with equal weight + return 0 + }) + } + // Sort the timeline based on the action decision score, + const sortedTimelines = sortProcessedTimeline(processedTimelines).slice( + 0, + maxActionsProcessing + ) + + return this.processTimelineActions(sortedTimelines) + } catch (error) { + elizaLogger.error('Error in processTweetActions:', error) + throw error + } finally { + this.isProcessing = false + } + } + + /** + * Processes a list of timelines by executing the corresponding tweet actions. + * Each timeline includes the tweet, action response, tweet state, and room context. + * Results are returned for tracking completed actions. + * + * @param timelines - Array of objects containing tweet details, action responses, and + * state information. + * @returns A promise that resolves to an array of results with details of executed actions. + */ + private async processTimelineActions( + timelines: { + tweet: Tweet + actionResponse: ActionResponse + tweetState: State + roomId: UUID + }[] + ): Promise< + { + tweetId: string + actionResponse: ActionResponse + executedActions: string[] + }[] + > { + const results = [] + for (const timeline of timelines) { + const { actionResponse, tweetState, roomId, tweet } = timeline + try { + const executedActions: string[] = [] + // Execute actions + if (actionResponse.like) { + if (this.isDryRun) { + elizaLogger.info(`Dry run: would have liked tweet ${tweet.id}`) + executedActions.push('like (dry run)') + } else { + try { + await this.client.twitterClient.likeTweet(tweet.id) + executedActions.push('like') + elizaLogger.log(`Liked tweet ${tweet.id}`) + } catch (error) { + elizaLogger.error(`Error liking tweet ${tweet.id}:`, error) + } + } + } + + if (actionResponse.retweet) { + if (this.isDryRun) { + elizaLogger.info(`Dry run: would have retweeted tweet ${tweet.id}`) + executedActions.push('retweet (dry run)') + } else { + try { + await this.client.twitterClient.retweet(tweet.id) + executedActions.push('retweet') + elizaLogger.log(`Retweeted tweet ${tweet.id}`) + } catch (error) { + elizaLogger.error(`Error retweeting tweet ${tweet.id}:`, error) + } + } + } + + if (actionResponse.quote) { + try { + // Build conversation thread for context + const thread = await buildConversationThread(tweet, this.client) + const formattedConversation = thread + .map( + (t) => + `@${t.username} (${new Date(t.timestamp * 1000).toLocaleString()}): ${t.text}` + ) + .join('\n\n') + + // Generate image descriptions if present + const imageDescriptions = [] + if (tweet.photos?.length > 0) { + elizaLogger.log('Processing images in tweet for context') + for (const photo of tweet.photos) { + const description = await this.runtime + .getService(ServiceType.IMAGE_DESCRIPTION) + .describeImage(photo.url) + imageDescriptions.push(description) + } + } + + // Handle quoted tweet if present + let quotedContent = '' + if (tweet.quotedStatusId) { + try { + const quotedTweet = await this.client.twitterClient.getTweet(tweet.quotedStatusId) + if (quotedTweet) { + quotedContent = `\nQuoted Tweet from + @${quotedTweet.username}:\n${quotedTweet.text}` + } + } catch (error) { + elizaLogger.error('Error fetching quoted tweet:', error) + } + } + + // Compose rich state with all context + const enrichedState = await this.runtime.composeState( + { + userId: this.runtime.agentId, + roomId: stringToUuid(tweet.conversationId + '-' + this.runtime.agentId), + agentId: this.runtime.agentId, + content: { + text: tweet.text, + action: 'QUOTE' + } + }, + { + twitterUserName: this.twitterUsername, + currentPost: `From @${tweet.username}: ${tweet.text}`, + formattedConversation, + imageContext: + imageDescriptions.length > 0 + ? `\nImages in Tweet:\n${imageDescriptions + .map((desc, i) => `Image ${i + 1}: ${desc}`) + .join('\n')}` + : '', + quotedContent + } + ) + + const quoteContent = await this.generateTweetContent(enrichedState, { + template: + this.runtime.character.templates?.twitterMessageHandlerTemplate || + twitterMessageHandlerTemplate + }) + + if (!quoteContent) { + elizaLogger.error('Failed to generate valid quote tweet content') + return + } + + elizaLogger.log('Generated quote tweet content:', quoteContent) + // Check for dry run mode + if (this.isDryRun) { + elizaLogger.info( + `Dry run: A quote tweet for tweet ID ${tweet.id} + would have been posted with the following content: "${quoteContent}".` + ) + executedActions.push('quote (dry run)') + } else { + // Send the tweet through request queue + const result = await this.client.requestQueue.add( + async () => await this.client.twitterClient.sendQuoteTweet(quoteContent, tweet.id) + ) + + const body: { + data: { + create_tweet: { + tweet_results: { + result: Tweet + } + } + } + } = await result.json() + if (body?.data?.create_tweet?.tweet_results?.result) { + elizaLogger.log('Successfully posted quote tweet') + executedActions.push('quote') + + // Cache generation context for debugging + await this.runtime.cacheManager.set( + `twitter/quote_generation_${tweet.id}.txt`, + `Context:\n${enrichedState}\n\nGenerated Quote:\n${quoteContent}` + ) + } else { + elizaLogger.error('Quote tweet creation failed:', body) + } + } + } catch (error) { + elizaLogger.error('Error in quote tweet generation:', error) + } + } + + if (actionResponse.reply) { + try { + await this.handleTextOnlyReply(tweet, tweetState, executedActions) + } catch (error) { + elizaLogger.error(`Error replying to tweet ${tweet.id}:`, error) + } + } + + // Add these checks before creating memory + await this.runtime.ensureRoomExists(roomId) + await this.runtime.ensureUserExists( + stringToUuid(tweet.userId), + tweet.username, + tweet.name, + 'twitter' + ) + await this.runtime.ensureParticipantInRoom(this.runtime.agentId, roomId) + + if (!this.isDryRun) { + // Then create the memory + await this.runtime.messageManager.createMemory({ + id: stringToUuid(tweet.id + '-' + this.runtime.agentId), + userId: stringToUuid(tweet.userId), + content: { + text: tweet.text, + url: tweet.permanentUrl, + source: 'twitter', + action: executedActions.join(',') + }, + agentId: this.runtime.agentId, + roomId, + embedding: getEmbeddingZeroVector(), + createdAt: tweet.timestamp * 1000 + }) + } + + results.push({ + tweetId: tweet.id, + actionResponse, + executedActions + }) + } catch (error) { + elizaLogger.error(`Error processing tweet ${tweet.id}:`, error) + continue + } + } + + return results + } + + /** + * Handles text-only replies to tweets. If isDryRun is true, only logs what would + * have been replied without making API calls. + */ + private async handleTextOnlyReply( + tweet: Tweet, + tweetState: unknown, + executedActions: string[] + ): Promise { + try { + // Build conversation thread for context + const thread = await buildConversationThread(tweet, this.client) + const formattedConversation = thread + .map((t) => `@${t.username} (${new Date(t.timestamp * 1000).toLocaleString()}): ${t.text}`) + .join('\n\n') + + // Generate image descriptions if present + const imageDescriptions = [] + if (tweet.photos?.length > 0) { + elizaLogger.log('Processing images in tweet for context') + for (const photo of tweet.photos) { + const description = await this.runtime + .getService(ServiceType.IMAGE_DESCRIPTION) + .describeImage(photo.url) + imageDescriptions.push(description) + } + } + + // Handle quoted tweet if present + let quotedContent = '' + if (tweet.quotedStatusId) { + try { + const quotedTweet = await this.client.twitterClient.getTweet(tweet.quotedStatusId) + if (quotedTweet) { + quotedContent = `\nQuoted Tweet from @${quotedTweet.username}:\n${quotedTweet.text}` + } + } catch (error) { + elizaLogger.error('Error fetching quoted tweet:', error) + } + } + + // Compose rich state with all context + const enrichedState = await this.runtime.composeState( + { + userId: this.runtime.agentId, + roomId: stringToUuid(tweet.conversationId + '-' + this.runtime.agentId), + agentId: this.runtime.agentId, + content: { text: tweet.text, action: '' } + }, + { + twitterUserName: this.twitterUsername, + currentPost: `From @${tweet.username}: ${tweet.text}`, + formattedConversation, + imageContext: + imageDescriptions.length > 0 + ? `\nImages in Tweet:\n${imageDescriptions + .map((desc, i) => `Image ${i + 1}: ${desc}`) + .join('\n')}` + : '', + quotedContent + } + ) + + // Generate and clean the reply content + const replyText = await this.generateTweetContent(enrichedState, { + template: + this.runtime.character.templates?.twitterMessageHandlerTemplate || + twitterMessageHandlerTemplate + }) + + if (!replyText) { + elizaLogger.error('Failed to generate valid reply content') + return + } + + if (this.isDryRun) { + elizaLogger.info(`Dry run: reply to tweet ${tweet.id} would have been: ${replyText}`) + executedActions.push('reply (dry run)') + return + } + + elizaLogger.debug('Final reply text to be sent:', replyText) + + let result + + if (replyText.length > DEFAULT_MAX_TWEET_LENGTH) { + result = await this.handleNoteTweet(this.client, replyText, tweet.id) + } else { + result = await this.sendStandardTweet(this.client, replyText, tweet.id) + } + + if (result) { + elizaLogger.log('Successfully posted reply tweet') + executedActions.push('reply') + + // Cache generation context for debugging + await this.runtime.cacheManager.set( + `twitter/reply_generation_${tweet.id}.txt`, + `Context:\n${enrichedState}\n\nGenerated Reply:\n${replyText}` + ) + } else { + elizaLogger.error('Tweet reply creation failed') + } + } catch (error) { + elizaLogger.error('Error in handleTextOnlyReply:', error) + } + } + + async stop(): Promise { + this.stopProcessingActions = true + } + + private async sendForApproval( + tweetTextForPosting: string, + roomId: UUID, + rawTweetContent: string + ): Promise { + try { + const embed = { + title: 'New Tweet Pending Approval', + description: tweetTextForPosting, + fields: [ + { + name: 'Character', + value: this.client.profile.username, + inline: true + }, + { + name: 'Length', + value: tweetTextForPosting.length.toString(), + inline: true + } + ], + footer: { + text: "Reply with '👍' to post or '❌' to discard, This will automatically expire and remove after 24 hours if no response received" + }, + timestamp: new Date().toISOString() + } + + const channel = await this.discordClientForApproval.channels.fetch( + this.discordApprovalChannelId + ) + + if (!channel || !(channel instanceof TextChannel)) { + throw new Error('Invalid approval channel') + } + + const message = await channel.send({ embeds: [embed] }) + + // Store the pending tweet + const pendingTweetsKey = `twitter/${this.client.profile.username}/pendingTweet` + const currentPendingTweets = + (await this.runtime.cacheManager.get(pendingTweetsKey)) || [] + // Add new pending tweet + currentPendingTweets.push({ + tweetTextForPosting, + roomId, + rawTweetContent, + discordMessageId: message.id, + channelId: this.discordApprovalChannelId, + timestamp: Date.now() + }) + + // Store updated array + await this.runtime.cacheManager.set(pendingTweetsKey, currentPendingTweets) + + return message.id + } catch (error) { + elizaLogger.error('Error Sending Twitter Post Approval Request:', error) + return null + } + } + + private async checkApprovalStatus(discordMessageId: string): Promise { + try { + // Fetch message and its replies from Discord + const channel = await this.discordClientForApproval.channels.fetch( + this.discordApprovalChannelId + ) + + elizaLogger.log(`channel ${JSON.stringify(channel)}`) + + if (!(channel instanceof TextChannel)) { + elizaLogger.error('Invalid approval channel') + return 'PENDING' + } + + // Fetch the original message and its replies + const message = await channel.messages.fetch(discordMessageId) + + // Look for thumbs up reaction ('👍') + const thumbsUpReaction = message.reactions.cache.find( + (reaction) => reaction.emoji.name === '👍' + ) + + // Look for reject reaction ('❌') + const rejectReaction = message.reactions.cache.find( + (reaction) => reaction.emoji.name === '❌' + ) + + // Check if the reaction exists and has reactions + if (rejectReaction) { + const count = rejectReaction.count + if (count > 0) { + return 'REJECTED' + } + } + + // Check if the reaction exists and has reactions + if (thumbsUpReaction) { + // You might want to check for specific users who can approve + // For now, we'll return true if anyone used thumbs up + const count = thumbsUpReaction.count + if (count > 0) { + return 'APPROVED' + } + } + + return 'PENDING' + } catch (error) { + elizaLogger.error('Error checking approval status:', error) + return 'PENDING' + } + } + + private async cleanupPendingTweet(discordMessageId: string): Promise { + const pendingTweetsKey = `twitter/${this.client.profile.username}/pendingTweet` + const currentPendingTweets = + (await this.runtime.cacheManager.get(pendingTweetsKey)) || [] + + // Remove the specific tweet + const updatedPendingTweets = currentPendingTweets.filter( + (tweet) => tweet.discordMessageId !== discordMessageId + ) + + if (updatedPendingTweets.length === 0) { + await this.runtime.cacheManager.delete(pendingTweetsKey) + } else { + await this.runtime.cacheManager.set(pendingTweetsKey, updatedPendingTweets) + } + } + + private async handlePendingTweet(): Promise { + elizaLogger.log('Checking Pending Tweets...') + const pendingTweetsKey = `twitter/${this.client.profile.username}/pendingTweet` + const pendingTweets = + (await this.runtime.cacheManager.get(pendingTweetsKey)) || [] + + for (const pendingTweet of pendingTweets) { + // Check if tweet is older than 24 hours + const isExpired = Date.now() - pendingTweet.timestamp > 24 * 60 * 60 * 1000 + + if (isExpired) { + elizaLogger.log('Pending tweet expired, cleaning up') + + // Notify on Discord about expiration + try { + const channel = await this.discordClientForApproval.channels.fetch(pendingTweet.channelId) + if (channel instanceof TextChannel) { + const originalMessage = await channel.messages.fetch(pendingTweet.discordMessageId) + await originalMessage.reply('This tweet approval request has expired (24h timeout).') + } + } catch (error) { + elizaLogger.error('Error sending expiration notification:', error) + } + + await this.cleanupPendingTweet(pendingTweet.discordMessageId) + return + } + + // Check approval status + elizaLogger.log('Checking approval status...') + const approvalStatus: PendingTweetApprovalStatus = await this.checkApprovalStatus( + pendingTweet.discordMessageId + ) + + if (approvalStatus === 'APPROVED') { + elizaLogger.log('Tweet Approved, Posting') + await this.postTweet( + this.runtime, + this.client, + pendingTweet.tweetTextForPosting, + pendingTweet.roomId, + pendingTweet.rawTweetContent, + this.twitterUsername + ) + + // Notify on Discord about posting + try { + const channel = await this.discordClientForApproval.channels.fetch(pendingTweet.channelId) + if (channel instanceof TextChannel) { + const originalMessage = await channel.messages.fetch(pendingTweet.discordMessageId) + await originalMessage.reply('Tweet has been posted successfully! ✅') + } + } catch (error) { + elizaLogger.error('Error sending post notification:', error) + } + + await this.cleanupPendingTweet(pendingTweet.discordMessageId) + } else if (approvalStatus === 'REJECTED') { + elizaLogger.log('Tweet Rejected, Cleaning Up') + await this.cleanupPendingTweet(pendingTweet.discordMessageId) + // Notify about Rejection of Tweet + try { + const channel = await this.discordClientForApproval.channels.fetch(pendingTweet.channelId) + if (channel instanceof TextChannel) { + const originalMessage = await channel.messages.fetch(pendingTweet.discordMessageId) + await originalMessage.reply('Tweet has been rejected! ❌') + } + } catch (error) { + elizaLogger.error('Error sending rejection notification:', error) + } + } + } + } +} diff --git a/src/clients/twitter/search.ts b/src/clients/twitter/search.ts new file mode 100644 index 00000000..66a28c44 --- /dev/null +++ b/src/clients/twitter/search.ts @@ -0,0 +1,299 @@ +import type { ClientBase } from '@/clients/twitter/base' +import { buildConversationThread, sendTweet, wait } from '@/clients/twitter/utils' +import { + composeContext, + type Content, + elizaLogger, + generateMessageResponse, + generateText, + type HandlerCallback, + type IAgentRuntime, + type IImageDescriptionService, + messageCompletionFooter, + ModelClass, + ServiceType, + stringToUuid +} from '@elizaos/core' +import { SearchMode } from 'agent-twitter-client' + +const twitterSearchTemplate = + `{{timeline}} + +{{providers}} + +Recent interactions between {{agentName}} and other users: +{{recentPostInteractions}} + +About {{agentName}} (@{{twitterUserName}}): +{{bio}} +{{lore}} +{{topics}} + +{{postDirections}} + +{{recentPosts}} + +# Task: Respond to the following post in the style and perspective of {{agentName}} +(aka @{{twitterUserName}}). Write a {{adjective}} response for {{agentName}} to say +directly in response to the post. don't generalize. +{{currentPost}} + +IMPORTANT: Your response CANNOT be longer than 20 words. +Aim for 1-2 short sentences maximum. Be concise and direct. + +Your response should not contain any questions. Brief, concise statements only. +No emojis. Use \\n\\n (double spaces) between statements. + +` + messageCompletionFooter + +export class TwitterSearchClient { + client: ClientBase + runtime: IAgentRuntime + twitterUsername: string + private respondedTweets: Set = new Set() + + constructor(client: ClientBase, runtime: IAgentRuntime) { + this.client = client + this.runtime = runtime + this.twitterUsername = this.client.twitterConfig.TWITTER_USERNAME + } + + async start(): Promise { + this.engageWithSearchTermsLoop() + } + + private engageWithSearchTermsLoop(): void { + void this.engageWithSearchTerms() + const randomMinutes = Math.floor(Math.random() * (120 - 60 + 1)) + 60 + elizaLogger.log(`Next twitter search scheduled in ${randomMinutes} minutes`) + setTimeout(() => this.engageWithSearchTermsLoop(), randomMinutes * 60 * 1000) + } + + private async engageWithSearchTerms(): Promise { + elizaLogger.log('Engaging with search terms') + try { + const searchTerm = [...this.runtime.character.topics][ + Math.floor(Math.random() * this.runtime.character.topics.length) + ] + + elizaLogger.log('Fetching search tweets') + // TODO: we wait 5 seconds here to avoid getting rate limited on startup, but we should queue + await new Promise((resolve) => setTimeout(resolve, 5000)) + const recentTweets = await this.client.fetchSearchTweets(searchTerm, 20, SearchMode.Top) + elizaLogger.log('Search tweets fetched') + + const homeTimeline = await this.client.fetchHomeTimeline(50) + + await this.client.cacheTimeline(homeTimeline) + + const formattedHomeTimeline = + `# ${this.runtime.character.name}'s Home Timeline\n\n` + + homeTimeline + .map((tweet) => { + return `ID: ${tweet.id}\nFrom: ${tweet.name} (@${tweet.username})${tweet.inReplyToStatusId ? ` In reply to: ${tweet.inReplyToStatusId}` : ''}\nText: ${tweet.text}\n---\n` + }) + .join('\n') + + // randomly slice .tweets down to 20 + const slicedTweets = recentTweets.tweets.sort(() => Math.random() - 0.5).slice(0, 20) + + if (slicedTweets.length === 0) { + elizaLogger.log('No valid tweets found for the search term', searchTerm) + return + } + + const prompt = ` + Here are some tweets related to the search term "${searchTerm}": + + ${[...slicedTweets, ...homeTimeline] + .filter((tweet) => { + // ignore tweets where any of the thread tweets contain a tweet by the bot + const thread = tweet.thread + const botTweet = thread.find((t) => t.username === this.twitterUsername) + return !botTweet + }) + .map( + (tweet) => ` + ID: ${tweet.id}${tweet.inReplyToStatusId ? ` In reply to: ${tweet.inReplyToStatusId}` : ''} + From: ${tweet.name} (@${tweet.username}) + Text: ${tweet.text} + ` + ) + .join('\n')} + + Which tweet is the most interesting and relevant for Ruby to reply to? + Please provide only the ID of the tweet in your response. + Notes: + - Respond to English tweets only + - Respond to tweets that don't have a lot of hashtags, links, URLs or images + - Respond to tweets that are not retweets + - Respond to tweets where there is an easy exchange of ideas to have with the user + - ONLY respond with the ID of the tweet` + + const mostInterestingTweetResponse = await generateText({ + runtime: this.runtime, + context: prompt, + modelClass: ModelClass.SMALL + }) + + const tweetId = mostInterestingTweetResponse.trim() + const selectedTweet = slicedTweets.find( + (tweet) => tweet.id.toString().includes(tweetId) || tweetId.includes(tweet.id.toString()) + ) + + if (!selectedTweet) { + elizaLogger.warn('No matching tweet found for the selected ID') + elizaLogger.log('Selected tweet ID:', tweetId) + return + } + + elizaLogger.log('Selected tweet to reply to:', selectedTweet?.text) + + if (selectedTweet.username === this.twitterUsername) { + elizaLogger.log('Skipping tweet from bot itself') + return + } + + const conversationId = selectedTweet.conversationId + const roomId = stringToUuid(conversationId + '-' + this.runtime.agentId) + + const userIdUUID = stringToUuid(selectedTweet.userId) + + await this.runtime.ensureConnection( + userIdUUID, + roomId, + selectedTweet.username, + selectedTweet.name, + 'twitter' + ) + + // crawl additional conversation tweets, if there are any + await buildConversationThread(selectedTweet, this.client) + + const message = { + id: stringToUuid(selectedTweet.id + '-' + this.runtime.agentId), + agentId: this.runtime.agentId, + content: { + text: selectedTweet.text, + url: selectedTweet.permanentUrl, + inReplyTo: selectedTweet.inReplyToStatusId + ? stringToUuid(selectedTweet.inReplyToStatusId + '-' + this.runtime.agentId) + : undefined + }, + userId: userIdUUID, + roomId, + // Timestamps are in seconds, but we need them in milliseconds + createdAt: selectedTweet.timestamp * 1000 + } + + if (!message.content.text) { + elizaLogger.warn('Returning: No response text found') + return + } + + // Fetch replies and retweets + const replies = selectedTweet.thread + const replyContext = replies + .filter((reply) => reply.username !== this.twitterUsername) + .map((reply) => `@${reply.username}: ${reply.text}`) + .join('\n') + + let tweetBackground = '' + if (selectedTweet.isRetweet) { + const originalTweet = await this.client.requestQueue.add(() => + this.client.twitterClient.getTweet(selectedTweet.id) + ) + tweetBackground = `Retweeting @${originalTweet.username}: ${originalTweet.text}` + } + + // Generate image descriptions using GPT-4 vision API + const imageDescriptions = [] + for (const photo of selectedTweet.photos) { + const description = await this.runtime + .getService(ServiceType.IMAGE_DESCRIPTION) + .describeImage(photo.url) + imageDescriptions.push(description) + } + + let state = await this.runtime.composeState(message, { + twitterClient: this.client.twitterClient, + twitterUserName: this.twitterUsername, + timeline: formattedHomeTimeline, + tweetContext: `${tweetBackground} + + Original Post: + By @${selectedTweet.username} + ${selectedTweet.text}${replyContext.length > 0 && `\nReplies to original post:\n${replyContext}`} + ${`Original post text: ${selectedTweet.text}`} + ${selectedTweet.urls.length > 0 ? `URLs: ${selectedTweet.urls.join(', ')}\n` : ''}${imageDescriptions.length > 0 ? `\nImages in Post (Described): ${imageDescriptions.join(', ')}\n` : ''} + ` + }) + + await this.client.saveRequestMessage(message, state) + + const context = composeContext({ + state, + template: this.runtime.character.templates?.twitterSearchTemplate || twitterSearchTemplate + }) + + const responseContent = await generateMessageResponse({ + runtime: this.runtime, + context, + modelClass: ModelClass.LARGE + }) + + responseContent.inReplyTo = message.id + + const response = responseContent + + if (!response.text) { + elizaLogger.warn('Returning: No response text found') + return + } + + elizaLogger.log(`Bot would respond to tweet ${selectedTweet.id} with: ${response.text}`) + try { + const callback: HandlerCallback = async (response: Content) => { + const memories = await sendTweet( + this.client, + response, + message.roomId, + this.twitterUsername, + selectedTweet.id + ) + return memories + } + + const responseMessages = await callback(responseContent) + + state = await this.runtime.updateRecentMessageState(state) + + for (const responseMessage of responseMessages) { + await this.runtime.messageManager.createMemory(responseMessage, false) + } + + state = await this.runtime.updateRecentMessageState(state) + + await this.runtime.evaluate(message, state) + + await this.runtime.processActions(message, responseMessages, state, callback) + + this.respondedTweets.add(selectedTweet.id) + const responseInfo = `Context:\n\n${context}\n\nSelected Post: + ${selectedTweet.id} - ${selectedTweet.username}: ${selectedTweet.text}\n + Agent's Output:\n${response.text}` + + await this.runtime.cacheManager.set( + `twitter/tweet_generation_${selectedTweet.id}.txt`, + responseInfo + ) + + await wait() + } catch (error) { + console.error(`Error sending response post: ${error}`) + } + } catch (error) { + console.error('Error engaging with search terms:', error) + } + } +} diff --git a/src/clients/twitter/types.ts b/src/clients/twitter/types.ts new file mode 100644 index 00000000..13275b1e --- /dev/null +++ b/src/clients/twitter/types.ts @@ -0,0 +1,158 @@ +import { Tweet } from 'agent-twitter-client' +import { z } from 'zod' + +export type MediaData = { + data: Buffer + mediaType: string +} + +export interface SpaceConfig { + mode: 'BROADCAST' | 'LISTEN' | 'INTERACTIVE' + title?: string + description?: string + languages?: string[] +} + +// Define Zod schema for Twitter API response +const TweetLegacySchema = z.object({ + full_text: z.string(), + conversation_id_str: z.string(), + created_at: z.string(), + user_id_str: z.string(), + in_reply_to_status_id_str: z.string().nullable().optional(), + bookmark_count: z.number().optional(), + is_quote_status: z.boolean().optional(), + retweeted: z.boolean().optional(), + favorite_count: z.number().optional(), + quote_count: z.number().optional(), + reply_count: z.number().optional(), + retweet_count: z.number().optional(), + retweeted_status_id_str: z.string().optional(), + quoted_status_id_str: z.string().optional(), + lang: z.string().optional(), + entities: z + .object({ + hashtags: z.array(z.any()).optional(), + user_mentions: z.array(z.any()).optional(), + urls: z.array(z.any()).optional(), + media: z + .array( + z.object({ + id_str: z.string(), + media_url_https: z.string(), + type: z.string(), + alt_text: z.string().optional() + }) + ) + .optional() + }) + .optional() +}) + +const UserResultSchema = z.object({ + legacy: z + .object({ + name: z.string().optional(), + screen_name: z.string().optional() + }) + .optional() +}) + +// eslint-disable-next-line @typescript-eslint/no-unused-vars +const CoreSchema = z.object({ + user_results: z + .object({ + result: UserResultSchema.optional() + }) + .optional() +}) + +// Define the schema for raw tweet data +export interface RawTweetType { + id?: string + id_str?: string + rest_id?: string + text?: string + html?: string + bookmarkCount?: number + conversationId?: string + hashtags?: unknown[] + inReplyToStatus?: Tweet + inReplyToStatusId?: string + isPin?: boolean + isReply?: boolean + isSelfThread?: boolean + mentions?: unknown[] + name?: string + permanentUrl?: string + photos?: { id: string; url: string; alt_text: string }[] + place?: unknown + poll?: { + options: { + label: string + position: number + votes: number + }[] + id: string + } + quotedStatusId?: string + thread?: Tweet[] + timestamp?: number + timeParsed?: Date + urls?: unknown[] + userId?: string + username?: string + videos?: { id: string; preview: string }[] + views?: { count?: string | number } + sensitiveContent?: boolean + legacy?: z.infer + core?: z.infer + quoted_status_result?: { + result?: RawTweetType + } + retweeted_status_result?: { + result?: RawTweetType + } + user_results?: { + result?: z.infer + } +} + +const TweetResultSchema = z.object({ + rest_id: z.string(), + legacy: TweetLegacySchema +}) + +const TweetResultsSchema = z.object({ + result: TweetResultSchema +}) + +export const CreateTweetResponseSchema = z.object({ + data: z + .object({ + create_tweet: z + .object({ + tweet_results: TweetResultsSchema + }) + .nullable() + .optional(), + notetweet_create: z + .object({ + tweet_results: TweetResultsSchema + }) + .nullable() + .optional() + }) + .nullable() + .optional() +}) + +export interface TwitterCookie { + key: string + value: string + domain: string + path: string + secure: boolean + httpOnly: boolean + sameSite?: 'Lax' | 'Strict' | 'None' | string +} diff --git a/src/clients/twitter/utils.ts b/src/clients/twitter/utils.ts new file mode 100644 index 00000000..af849ec1 --- /dev/null +++ b/src/clients/twitter/utils.ts @@ -0,0 +1,419 @@ +import type { ClientBase } from '@/clients/twitter/base' +import { CreateTweetResponseSchema, MediaData } from '@/clients/twitter/types' +import type { Content, Media, Memory, UUID } from '@elizaos/core' +import { elizaLogger, getEmbeddingZeroVector, stringToUuid } from '@elizaos/core' +import type { Tweet } from 'agent-twitter-client' +import fs from 'fs' +import path from 'path' + +export const wait = (minTime = 1000, maxTime = 3000): Promise => { + const waitTime = Math.floor(Math.random() * (maxTime - minTime + 1)) + minTime + return new Promise((resolve) => setTimeout(resolve, waitTime)) +} + +export const isValidTweet = (tweet: Tweet): boolean => { + // Filter out tweets with too many hashtags, @s, or $ signs, probably spam or garbage + const hashtagCount = (tweet.text?.match(/#/g) || []).length + const atCount = (tweet.text?.match(/@/g) || []).length + const dollarSignCount = (tweet.text?.match(/\$/g) || []).length + const totalCount = hashtagCount + atCount + dollarSignCount + + return hashtagCount <= 1 && atCount <= 2 && dollarSignCount <= 1 && totalCount <= 3 +} + +export async function buildConversationThread( + tweet: Tweet, + client: ClientBase, + maxReplies = 10 +): Promise { + const thread: Tweet[] = [] + const visited: Set = new Set() + + async function processThread(currentTweet: Tweet, depth = 0): Promise { + elizaLogger.debug('Processing tweet:', { + id: currentTweet.id, + inReplyToStatusId: currentTweet.inReplyToStatusId, + depth + }) + + if (!currentTweet) { + elizaLogger.debug('No current tweet found for thread building') + return + } + + // Stop if we've reached our reply limit + if (depth >= maxReplies) { + elizaLogger.debug('Reached maximum reply depth', depth) + return + } + + // Handle memory storage + const memory = await client.runtime.messageManager.getMemoryById( + stringToUuid(currentTweet.id + '-' + client.runtime.agentId) + ) + if (!memory) { + const roomId = stringToUuid(currentTweet.conversationId + '-' + client.runtime.agentId) + const userId = stringToUuid(currentTweet.userId) + + await client.runtime.ensureConnection( + userId, + roomId, + currentTweet.username, + currentTweet.name, + 'twitter' + ) + + await client.runtime.messageManager.createMemory({ + id: stringToUuid(currentTweet.id + '-' + client.runtime.agentId), + agentId: client.runtime.agentId, + content: { + text: currentTweet.text, + source: 'twitter', + url: currentTweet.permanentUrl, + inReplyTo: currentTweet.inReplyToStatusId + ? stringToUuid(currentTweet.inReplyToStatusId + '-' + client.runtime.agentId) + : undefined + }, + createdAt: currentTweet.timestamp * 1000, + roomId, + userId: + currentTweet.userId === client.profile.id + ? client.runtime.agentId + : stringToUuid(currentTweet.userId), + embedding: getEmbeddingZeroVector() + }) + } + + if (visited.has(currentTweet.id)) { + elizaLogger.debug('Already visited tweet:', currentTweet.id) + return + } + + visited.add(currentTweet.id) + thread.unshift(currentTweet) + + elizaLogger.debug('Current thread state:', { + length: thread.length, + currentDepth: depth, + tweetId: currentTweet.id + }) + + // If there's a parent tweet, fetch and process it + if (currentTweet.inReplyToStatusId) { + elizaLogger.debug('Fetching parent tweet:', currentTweet.inReplyToStatusId) + try { + const parentTweet = await client.twitterClient.getTweet(currentTweet.inReplyToStatusId) + + if (parentTweet) { + elizaLogger.debug('Found parent tweet:', { + id: parentTweet.id, + text: parentTweet.text?.slice(0, 50) + }) + await processThread(parentTweet, depth + 1) + } else { + elizaLogger.debug('No parent tweet found for:', currentTweet.inReplyToStatusId) + } + } catch (error) { + elizaLogger.error('Error fetching parent tweet:', { + tweetId: currentTweet.inReplyToStatusId, + error + }) + } + } else { + elizaLogger.debug('Reached end of reply chain at:', currentTweet.id) + } + } + + await processThread(tweet, 0) + + elizaLogger.debug('Final thread built:', { + totalTweets: thread.length, + tweetIds: thread.map((t) => ({ + id: t.id, + text: t.text?.slice(0, 50) + })) + }) + + return thread +} + +export async function fetchMediaData(attachments: Media[]): Promise { + return Promise.all( + attachments.map(async (attachment: Media) => { + if (/^(http|https):\/\//.test(attachment.url)) { + // Handle HTTP URLs + const response = await fetch(attachment.url) + if (!response.ok) { + throw new Error(`Failed to fetch file: ${attachment.url}`) + } + const mediaBuffer = Buffer.from(await response.arrayBuffer()) + const mediaType = attachment.contentType + return { data: mediaBuffer, mediaType } + } else if (fs.existsSync(attachment.url)) { + // Handle local file paths + const mediaBuffer = await fs.promises.readFile(path.resolve(attachment.url)) + const mediaType = attachment.contentType + return { data: mediaBuffer, mediaType } + } else { + throw new Error(`File not found: ${attachment.url}. Make sure the path is correct.`) + } + }) + ) +} + +export async function sendTweet( + client: ClientBase, + content: Content, + roomId: UUID, + twitterUsername: string, + inReplyTo: string +): Promise { + const maxTweetLength = client.twitterConfig.MAX_TWEET_LENGTH + const isLongTweet = maxTweetLength > 280 + + const tweetChunks = splitTweetContent(content.text, maxTweetLength) + const sentTweets: Tweet[] = [] + let previousTweetId = inReplyTo + + for (const chunk of tweetChunks) { + let mediaData = null + + if (content.attachments && content.attachments.length > 0) { + mediaData = await fetchMediaData(content.attachments) + } + + const cleanChunk = deduplicateMentions(chunk.trim()) + + const result = await client.requestQueue.add(async () => + isLongTweet + ? client.twitterClient.sendLongTweet(cleanChunk, previousTweetId, mediaData) + : client.twitterClient.sendTweet(cleanChunk, previousTweetId, mediaData) + ) + + const rawBody = await result.json() + + // Parse and validate the response with Zod + const parseResult = CreateTweetResponseSchema.safeParse(rawBody) + + if (parseResult.success) { + const body = parseResult.data + const tweetResult = isLongTweet + ? body?.data?.notetweet_create?.tweet_results?.result + : body?.data?.create_tweet?.tweet_results?.result + + if (tweetResult) { + const finalTweet: Tweet = { + id: tweetResult.rest_id, + text: tweetResult.legacy.full_text, + conversationId: tweetResult.legacy.conversation_id_str, + timestamp: new Date(tweetResult.legacy.created_at).getTime() / 1000, + userId: tweetResult.legacy.user_id_str, + inReplyToStatusId: tweetResult.legacy.in_reply_to_status_id_str || null, + permanentUrl: `https://twitter.com/${twitterUsername}/status/${tweetResult.rest_id}`, + hashtags: [], + mentions: [], + photos: [], + thread: [], + urls: [], + videos: [] + } + sentTweets.push(finalTweet) + previousTweetId = finalTweet.id + } else { + elizaLogger.error('Error sending tweet chunk: No tweet result found', { + chunk, + response: body + }) + } + } else { + elizaLogger.error('Error parsing tweet response:', { + chunk, + errors: parseResult.error.format(), + response: rawBody + }) + } + + // Wait a bit between tweets to avoid rate limiting issues + await wait(1000, 2000) + } + + const memories: Memory[] = sentTweets.map((tweet) => ({ + id: stringToUuid(tweet.id + '-' + client.runtime.agentId), + agentId: client.runtime.agentId, + userId: client.runtime.agentId, + content: { + tweetId: tweet.id, + text: tweet.text, + source: 'twitter', + url: tweet.permanentUrl, + inReplyTo: tweet.inReplyToStatusId + ? stringToUuid(tweet.inReplyToStatusId + '-' + client.runtime.agentId) + : undefined + }, + roomId, + embedding: getEmbeddingZeroVector(), + createdAt: tweet.timestamp * 1000 + })) + + return memories +} + +function splitTweetContent(content: string, maxLength: number): string[] { + const paragraphs = content.split('\n\n').map((p) => p.trim()) + const tweets: string[] = [] + let currentTweet = '' + + for (const paragraph of paragraphs) { + if (!paragraph) continue + + if ((currentTweet + '\n\n' + paragraph).trim().length <= maxLength) { + if (currentTweet) { + currentTweet += '\n\n' + paragraph + } else { + currentTweet = paragraph + } + } else { + if (currentTweet) { + tweets.push(currentTweet.trim()) + } + if (paragraph.length <= maxLength) { + currentTweet = paragraph + } else { + // Split long paragraph into smaller chunks + const chunks = splitParagraph(paragraph, maxLength) + tweets.push(...chunks.slice(0, -1)) + currentTweet = chunks[chunks.length - 1] + } + } + } + + if (currentTweet) { + tweets.push(currentTweet.trim()) + } + + return tweets +} + +function extractUrls(paragraph: string): { + textWithPlaceholders: string + placeholderMap: Map +} { + // replace https urls with placeholder + const urlRegex = /https?:\/\/[^\s]+/g + const placeholderMap = new Map() + + let urlIndex = 0 + const textWithPlaceholders = paragraph.replace(urlRegex, (match) => { + // twitter url would be considered as 23 characters + // <> is also 23 characters + const placeholder = `<>` // Placeholder without . ? ! etc + placeholderMap.set(placeholder, match) + urlIndex++ + return placeholder + }) + + return { textWithPlaceholders, placeholderMap } +} + +function splitSentencesAndWords(text: string, maxLength: number): string[] { + // Split by periods, question marks and exclamation marks + // Note that URLs in text have been replaced with `<>` and won't be split by dots + const sentences = text.match(/[^.!?]+[.!?]+|[^.!?]+$/g) || [text] + const chunks: string[] = [] + let currentChunk = '' + + for (const sentence of sentences) { + if ((currentChunk + ' ' + sentence).trim().length <= maxLength) { + if (currentChunk) { + currentChunk += ' ' + sentence + } else { + currentChunk = sentence + } + } else { + // Can't fit more, push currentChunk to results + if (currentChunk) { + chunks.push(currentChunk.trim()) + } + + // If current sentence itself is less than or equal to maxLength + if (sentence.length <= maxLength) { + currentChunk = sentence + } else { + // Need to split sentence by spaces + const words = sentence.split(' ') + currentChunk = '' + for (const word of words) { + if ((currentChunk + ' ' + word).trim().length <= maxLength) { + if (currentChunk) { + currentChunk += ' ' + word + } else { + currentChunk = word + } + } else { + if (currentChunk) { + chunks.push(currentChunk.trim()) + } + currentChunk = word + } + } + } + } + } + + // Handle remaining content + if (currentChunk) { + chunks.push(currentChunk.trim()) + } + + return chunks +} + +function deduplicateMentions(paragraph: string): string { + // Regex to match mentions at the beginning of the string + const mentionRegex = /^@(\w+)(?:\s+@(\w+))*(\s+|$)/ + + // Find all matches + const matches = paragraph.match(mentionRegex) + + if (!matches) { + return paragraph // If no matches, return the original string + } + + // Extract mentions from the match groups + let mentions = matches.slice(0, 1)[0].trim().split(' ') + + // Deduplicate mentions + mentions = [...new Set(mentions)] + + // Reconstruct the string with deduplicated mentions + const uniqueMentionsString = mentions.join(' ') + + // Find where the mentions end in the original string + const endOfMentions = paragraph.indexOf(matches[0]) + matches[0].length + + // Construct the result by combining unique mentions with the rest of the string + return uniqueMentionsString + ' ' + paragraph.slice(endOfMentions) +} + +function restoreUrls(chunks: string[], placeholderMap: Map): string[] { + return chunks.map((chunk) => { + // Replace all <> in chunk back to original URLs using regex + return chunk.replace(/<>/g, (match) => { + const original = placeholderMap.get(match) + return original || match // Return placeholder if not found (theoretically won't happen) + }) + }) +} + +function splitParagraph(paragraph: string, maxLength: number): string[] { + // 1) Extract URLs and replace with placeholders + const { textWithPlaceholders, placeholderMap } = extractUrls(paragraph) + + // 2) Use first section's logic to split by sentences first, then do secondary split + const splittedChunks = splitSentencesAndWords(textWithPlaceholders, maxLength) + + // 3) Replace placeholders back to original URLs + const restoredChunks = restoreUrls(splittedChunks, placeholderMap) + + return restoredChunks +}