Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 144 additions & 197 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,202 +1,149 @@
import psutil

import program_info
import main_sheet
import sheet_opener
import user


def first_greeting():
users = psutil.users()
names = []
for i in range(0, len(users)):
names.append(users[i][0])
if len(names) == 1:
print('Nice to meet you, {}!'.format(names[0]))
user_name = names[0]
else:
print("You are {}, aren't you? Press Y for confirmation, else press N: ".format(names[0]), end='')
s = input()
if s == 'Y':
print('So now we know that you are {}'.format(names[0]))
user_name = names[0]
if s == 'N':
print('Other available users are:')
for i in range(1, len(names)):
print('#{}: {}'.format(i, names[i]))
print('Who are you? Print index: #', end='')
while True:
number = input()
try:
number = int(number)
except ValueError:
print("You've typed some cringe, try again: #", end='')
continue
if number >= len(names):
print("There aren't so many options, enter smaller number: #", end='')
continue
break
print('OK, hi, {}!'.format(names[number]))
user_name = names[number]
if number == 0:
print("We are wondering why you at first refused to agree that you are {}".
format(names[number]))
return user.User.upload_user(user_name)


def add_row(words):
opener = sheet_opener.SheetOpener()
words_set = set(words[1:])
key = None
page = None
row = None
for i in words_set:
if i.startswith("key="):
key = i.removeprefix("key=")
if i.startswith("page="):
page = i.removeprefix("page=")
if i.startswith("row="):
row = i.removeprefix("row=")

current_user.add_enq(opener.sheet_objs[key], row, page)


def add_spreadsheet(words):
opener = sheet_opener.SheetOpener()
words_set = set(words[1:])
new_sheet_id = None
new_sheet_name = None
new_sheet_key = None
for i in words_set:
if i.startswith('https://docs.google.com/spreadsheets/d/'):
new_sheet_id = i.removeprefix('https://docs.google.com/spreadsheets/d/')
new_sheet_id = new_sheet_id[:new_sheet_id.find('/edit')]
if i.startswith("name="):
new_sheet_name = i.removeprefix("name=")
if i.startswith("key="):
new_sheet_key = i.removeprefix("key=")

was_existing = False
for i in opener.sheet_objs.values():
if i.sheet_id == new_sheet_id:
print("[ERROR] You've already registered that table")
was_existing = True
break
if i.key == new_sheet_key:
print("[ERROR] You've already used that key, choose another one")
was_existing = True
break
if i.name == new_sheet_name:
print("[ERROR] You've already used that name, choose another one")
was_existing = True
break
if not was_existing:
opener.add_sheet_object(sheet_opener.SheetObject(new_sheet_id,
new_sheet_name, new_sheet_key))
print("[RESULTS] Spreadsheet {} was added to your list!".format(new_sheet_name))


def display_sheets():
sheets_list = []
for sheet_obj in sheet_opener.SheetOpener().sheet_objs.values():
sheets_list.append(sheet_obj.name)
if len(sheets_list) == 0:
print("No available spreadsheets yet. You can add them via add_spreadsheet url name=... key=...")
import logging

from telegram import Bot, MessageEntity, ReplyKeyboardMarkup, ReplyKeyboardRemove, Update
from telegram.ext import (
Application,
CommandHandler,
ContextTypes,
ConversationHandler,
MessageHandler,
PicklePersistence,
Updater,
filters
)

# Enable logging

logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
CHOOSING, PARAMETERS_CHOICE = range(2)

reply_keyboard = [
["Add row", "Add spreadsheet"],
["Display sheets", "Display sheet keys"],
["Display targets", "Track all targets"],
["Exit"]
]
bookmark_tabs_emoji = '\U0001F4D1'
winking_face_emoji = '\U0001F609'
markup = ReplyKeyboardMarkup(reply_keyboard, one_time_keyboard=True)
with open('bot_token.txt', 'r') as file:
TOKEN = file.readline()


def return_bot():
return Bot(token=TOKEN)


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
if context.user_data:
reply_text = f"{winking_face_emoji} Hi again, {update.message.from_user['username']}!"
await update.message.reply_text(reply_text, reply_markup=markup)
await main_sheet.EnqProcessor.read_input(context, 'track all targets')
else:
result = '' # before was "Names of available spreadsheets are: ", it was found redundant
sheets_list.sort()
for i in sheets_list:
result += i + ', '
result = result[:-2]
print(result)


def display_sheet_keys():
keys_list = {}
for sheet_obj in sheet_opener.SheetOpener().sheet_objs.values():
keys_list[sheet_obj.key] = sheet_obj.name
if len(keys_list) == 0:
print("No available spreadsheets yet. You can add them via add_spreadsheet url name=... key=...")
else:
for key, value in keys_list.items():
print(f'"{key}" for spreadsheet "{value}"')


def display_targets():
targets = []
for sheet_obj in current_user.sheets_of_interest.values():
targets.append(sheet_obj.key)
if len(targets) == 0:
print("No available rows to watch yet. You can add them via add_row key=... page=... row=...")
else:
print("Available targets are:")
targets.sort()
for i in targets:
print('*', i)


def words_starts_with(words_list, string):
string_list = string.split()
if len(string_list) > len(words_list):
return False
for i in range(len(string_list)):
if string_list[i] != words_list[i]:
return False
return True

def main():
global current_user
prog_info = program_info.ProgramInfo.upload()
if not prog_info.last_user_name:
current_user = first_greeting()
prog_info.last_user_name = current_user.name
prog_info.save()
else:
print('Hi, {}!'.format(prog_info.last_user_name))
current_user = user.User.upload_user(prog_info.last_user_name)
opener = sheet_opener.SheetOpener()
opener.load_saved()

for key in current_user.sheets_of_interest:
current_user.enquire(key)

while True:
s = input()
words = s.split()
for i in range(10):
if len(words) == 0:
break
if words_starts_with(words, 'add spreadsheet'):
add_spreadsheet(words)
words = words[5:]
elif words_starts_with(words, 'add row'):
add_row(words)
words = words[5:]
elif words_starts_with(words, 'display sheets'):
display_sheets()
words = words[2:]
elif words_starts_with(words, 'display sheet keys'):
display_sheet_keys()
words = words[3:]
elif words_starts_with(words, 'display targets'):
display_targets()
words = words[2:]
elif words_starts_with(words, 'track all targets'):
words = words[3:]
for key in current_user.sheets_of_interest:
current_user.enquire(key)
elif words_starts_with(words, 'exit'):
print('Goodbye, {}!'.format(current_user.name))
break
else:
print(f'[Misprint] "{words[0]}" is unknown command, skipping it')
words = words[1:]

current_user.save()
opener.save_sheets()
prog_info.save()


current_user = None
if __name__ == '__main__':
reply_text = f"{bookmark_tabs_emoji}Hi! I am Check Sheets Bot. I can track your"\
f" spreadsheets in Google docs and inform you about changes!\n"
reply_text += (
"You are newcomer so it will be good of you to tell me what to track."
)
context.user_data['sheet_opener'] = sheet_opener.SheetOpener()
context.user_data['sheet_opener'].create_dummy_self()
context.user_data['user_obj'] = user.User(update, return_bot)
await update.message.reply_text(reply_text, reply_markup=markup)
return CHOOSING


'''
async def callback_30(context: ContextTypes.DEFAULT_TYPE, second_par):
print(context.user_data)
# await context.bot.send_message(chat_id='@examplechannel', text='One message every minute')
await main_sheet.EnqProcessor.read_input(context, 'track_all_targets')
'''


async def command_choice(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
"""Perform predefined choice or ask for parameters."""
context.user_data.pop('current_routine', None)
text = update.message.text.lower()
if text == '/start':
await update.message.reply_text("It seems that you've already done int")
return CHOOSING
res = await main_sheet.EnqProcessor.read_input(context, text)
if res and res != 'UnknownCommand':
context.user_data['current_routine'] = text
await update.message.reply_text(res)
return PARAMETERS_CHOICE
if res == 'UnknownCommand':
await update.message.reply_text("Sorry I didn't understand it.")
'''
global application
job_queue = application.job_queue
job_queue.run_once(callback_30, 2)
'''
return CHOOSING


async def parameters_choice(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
if 'current_routine' not in context.user_data.keys():
await update.message.reply_text('Seems that you want to insert parameters,'
' but not said earlier for what')
return CHOOSING
command = context.user_data['current_routine']
context.user_data.pop('current_routine', None)
words = update.message.text.split()
res = await main_sheet.EnqProcessor.read_input(context,
command, words)
if res == 'BadText':
await update.message.reply_text('Seems that you tried to insert '
'parameters for something invalid')
elif res:
await update.message.reply_text(res)
return CHOOSING


async def exit_bot(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text(
f"Ok, {update.message.from_user.full_name}, until next time!",
reply_markup=ReplyKeyboardRemove(),
)
return ConversationHandler.END


def main() -> None:
"""Run the bot."""
global application
persistence = PicklePersistence(filepath="checksheets_data")
application = Application.builder().token(TOKEN).\
persistence(persistence).build()

# Add conversation handler with the states CHOOSING, PARAMETERS_CHOICE
conv_handler = ConversationHandler(
entry_points=[CommandHandler("start", start)],
states={
CHOOSING: [
MessageHandler(filters.TEXT & ~filters.Regex("^Exit$"), command_choice)
],
PARAMETERS_CHOICE: [
MessageHandler(
filters=filters.TEXT | filters.Entity(MessageEntity.URL),
callback=parameters_choice)
],
},
fallbacks=[MessageHandler(filters.Regex("^Exit$"), exit_bot)],
name="my_conversation",
persistent=True,
)
application.add_handler(conv_handler)
# Run the bot until the user presses Ctrl-C
application.run_polling()


application = None
if __name__ == "__main__":
main()

Loading