我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 telegram.Bot()。
def test_requires_usergroup_no_acc(self):
    """Check `requires_usergroup` when the user has no access.

    `ownbot.auth.User` is patched so that `has_access()` reports False; the
    decorated dummy handler is then expected to produce None instead of its
    own return value.
    """
    with patch("ownbot.auth.User") as user_cls:
        user_cls.return_value.has_access.return_value = False

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        result = my_command_handler(Mock(spec=Bot), self.__get_dummy_update())

        self.assertIsNone(result)
def test_requires_usergroup_acc(self):
    """Check `requires_usergroup` when the user has access.

    `ownbot.auth.User` and `test_auth.Update` are patched, the user mock is
    told to grant access, and the decorated dummy handler is expected to
    run and hand back its own truthy return value.
    """
    with patch("ownbot.auth.User") as user_mock,\
            patch("test_auth.Update") as update_mock:
        user_mock = user_mock.return_value
        # Configure `has_access` (the method the no-access test also sets).
        # A misspelled attribute would silently create an unrelated child
        # mock and leave the real check answered by a default, truthy Mock.
        user_mock.has_access.return_value = True

        @ownbot.auth.requires_usergroup("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)
            return True

        bot_mock = Mock(spec=Bot)
        update_mock = Update(1337)
        called = my_command_handler(bot_mock, update_mock)

        self.assertTrue(called)
def test_assign_first_to(self):
    """Check that `assign_first_to` saves the user when the group is empty.

    `User`, `Update` and `UserManager` are replaced by mocks and
    `group_is_empty()` is made to report True before the decorated dummy
    handler is invoked.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        group_is_empty = manager_cls.return_value.group_is_empty
        group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)

        my_command_handler(Mock(spec=Bot), Update(1337))

        self.assertTrue(group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def test_assign_first_to_not_first(self):
    """Check that `assign_first_to` does not save the user for a non-empty group.

    Same setup as the empty-group case, except that `group_is_empty()`
    reports False, so `save()` on the user mock must not have been called.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        group_is_empty = manager_cls.return_value.group_is_empty
        group_is_empty.return_value = False

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(bot, update):
            """Dummy command handler"""
            print(bot, update)

        my_command_handler(Mock(spec=Bot), Update(1337))

        self.assertTrue(group_is_empty.called)
        self.assertFalse(user_instance.save.called)
def test_assign_first_to_with_self(self):
    """Check `assign_first_to` on a handler whose first argument is `self`.

    The dummy handler takes `(self, bot, update)` and is called with None
    in the `self` position; with an empty group the user mock must still
    be saved.
    """
    with patch("ownbot.auth.User") as user_cls, \
            patch("test_auth.Update"), \
            patch("ownbot.auth.UserManager") as manager_cls:
        user_instance = user_cls.return_value
        group_is_empty = manager_cls.return_value.group_is_empty
        group_is_empty.return_value = True

        @ownbot.auth.assign_first_to("foo")
        def my_command_handler(self, bot, update):
            """Dummy command handler"""
            print(self, bot, update)

        my_command_handler(None, Mock(spec=Bot), Update(1337))

        self.assertTrue(group_is_empty.called)
        self.assertTrue(user_instance.save.called)
def admin_access():
    """Build a decorator that restricts a handler method to `self.admins`.

    The wrapped method runs when the owning object has no `admins`
    attribute (a warning is logged in that case) or when the id of
    `update.effective_user` is contained in `self.admins`. Otherwise the
    attempt is logged and None is returned without calling the method.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            caller = update.effective_user
            allowed_ids = getattr(self, 'admins', None)

            # Reject first: a configured list that does not contain the caller.
            if allowed_ids is not None and caller.id not in allowed_ids:
                self.logger.info(f"Unauthorized access denied for {caller}.")
                return None

            if allowed_ids is None:
                # Nothing configured: the handler stays open, but say so.
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
def admin_access(admins_list=None):
    """Build a decorator that restricts a handler method to a set of admins.

    :param admins_list: optional collection of allowed user ids. When it is
        None the list is taken from the `admins` attribute of the object
        the handler is bound to.
    :return: a decorator for methods with the signature
        `(self, bot, update, *args, **kwargs)`.

    The wrapped method is executed when no admin list is available at all
    (a warning is logged) or when `update.effective_user.id` is in the
    list; otherwise the attempt is logged and None is returned.
    """
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            user = update.effective_user
            # An explicitly passed list wins; otherwise fall back to
            # self.admins. Without this the argument would be ignored and
            # the handler would stay unrestricted.
            if admins_list is not None:
                admins = admins_list
            else:
                admins = getattr(self, 'admins', None)

            if admins is None:
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
                return func(self, bot, update, *args, **kwargs)

            if user.id not in admins:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return None

            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
def command_accept_choice(self, bot: Bot, update: Update):
    """Switch the chat to the channel picked through a callback query.

    The previously chosen channel of the chat (if any) has its command
    handlers removed, the channel keyed by `query.data` becomes the chosen
    one, the originating message is edited to confirm the choice and the
    channel's help text is sent as a Markdown code block.
    """
    query = update.callback_query
    # because callback update doesn't have message at all
    update.message = query.message
    chat_id = update.message.chat_id

    previous = self.chosen_channels.get(chat_id, None)
    if previous:
        previous.remove_commands_handlers(chat_id)

    selected: BaseChannel = self.channels[query.data]
    self.chosen_channels[chat_id] = selected
    selected.add_commands_handlers(chat_id)

    bot.edit_message_text(
        text=f'Chose {query.data} ({selected.channel_id}).',
        chat_id=query.message.chat_id,
        message_id=query.message.message_id,
    )
    channel_help = selected.help_text()
    update.message.reply_text('```\n' + channel_help + '\n```', parse_mode=ParseMode.MARKDOWN)
def distribute(self, bot: Bot, update: Update, args):
    """Route parsed command arguments to the matching sub-command method.

    `args.help` takes precedence and is sent back as code. Otherwise
    `args.command` selects `add`, `remove` (also reachable as `delete`) or
    `show`. Any other value is logged and raises `Exception`.
    """
    if args.help:
        self.send_code(update, args.help)
        return

    command = args.command
    if command == 'add':
        usage = self.subparsers['add'].format_usage()
        self.add(bot, update, args.tags, usage)
    elif command in ('remove', 'delete'):
        usage = self.subparsers['remove'].format_usage()
        self.remove(bot, update, args.tags, usage)
    elif command == 'show':
        self.show(bot, update)
    else:
        self.logger.error('Bad args: ' + str(args))
        raise Exception  # will never get this far (hopefully)
def add(self, bot: Bot, update: Update, args: List[str], usage: str):
    """Store one `name score` pair and show the result.

    `args` must hold exactly two items and the second one must consist of
    decimal digits; anything else answers with `usage` via `send_code`.
    A valid pair is handed to `self.store.add` as `{name: int(score)}`
    and followed by `self.show`.
    """
    if len(args) == 2 and args[1].isdecimal():
        name, score = args
        self.store.add({name: int(score)})
        self.show(bot, update)
    else:
        self.send_code(update, usage)
def main():
    """Refresh the post database and send every pending post to the chat.

    The bot token and the target chat id are read from the `TOKEN` and
    `CHAT_ID` environment variables. Each post is sent as HTML and marked
    as sent afterwards. A post rejected with `BadRequest` is logged and
    skipped, so it stays unmarked and is not counted as sent.
    """
    bot = Bot(environ.get('TOKEN'))
    chat_id = environ.get('CHAT_ID')

    post.updatedb()
    new_posts = post.get_posts(sent_only=False)
    if not new_posts:
        logger.info('No new posts at this time!')
        return

    sent_count = 0
    for p in new_posts:
        try:
            bot.sendMessage(chat_id=chat_id, text=p.text(), parse_mode='HTML')
        except BadRequest:
            # Same logger as the rest of the function (not the root logger).
            logger.info('Bad post formatting: %d', p.uid)
        else:
            post.mark_as_sent(p.uid)
            sent_count += 1
    # Report what was actually delivered, not the number of candidates.
    logger.info('%d new post(s) have been sent!', sent_count)
def add_keyword(bot: telegram.Bot, update: telegram.Update, args: list):
    """Add the given keywords to `global_vars.filter_list['keywords']`.

    Only chats listed in `global_vars.admin_list['TG']` may use it; other
    chats are ignored silently. Without arguments a usage hint is sent.
    Keywords already present are reported individually and skipped.
    Afterwards 'Done.' is replied and `save_data()` is called.
    """
    message = update.message
    if message.chat_id not in global_vars.admin_list['TG']:
        return
    if not args:
        message.reply_text('Usage: /add_keyword keyword1 keyword2 ...')
        return

    for keyword in args:
        logger.debug('keyword: ' + keyword)
        if keyword not in global_vars.filter_list['keywords']:
            global_vars.filter_list['keywords'].append(keyword)
        else:
            message.reply_text('Keyword: "' + keyword + '" already in list')

    message.reply_text('Done.')
    save_data()
def add_channel(bot: telegram.Bot, update: telegram.Update):
    """Conversation step that collects channels from forwarded messages.

    A message forwarded from a channel adds that channel's id to
    `global_vars.filter_list['channels']` (persisted with `save_data()`)
    unless it is already listed; both cases return `CHANNEL`.
    A plain '/cancel' ends the conversation. Any other non-forwarded
    message answers with an error hint and returns `CHANNEL`. A message
    forwarded from something that is not a channel answers with the same
    hint and returns None.
    """
    message = update.message
    source_chat = message.forward_from_chat
    type_error_hint = 'Message type error. Please forward me a message from channel, or use /cancel to stop'

    if not source_chat:
        if message.text == '/cancel':
            message.reply_text('Done.')
            return ConversationHandler.END
        message.reply_text(type_error_hint)
        return CHANNEL

    if source_chat.type != 'channel':
        # it seems forward_from_chat can only be channels
        message.reply_text(type_error_hint)
        return

    if source_chat.id in global_vars.filter_list['channels']:
        message.reply_text('Already in list. Send me another or use /cancel to stop')
    else:
        global_vars.filter_list['channels'].append(source_chat.id)
        save_data()
        message.reply_text('Okay, please send me another, or use /cancel to stop')
    return CHANNEL
def audio_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """Relay a Telegram audio message to QQ as a fixed text placeholder.

    The placeholder is passed to `send_from_tg_to_qq` and the resulting QQ
    message id is recorded next to the Telegram message id through
    `global_vars.mdb.append_message`.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    # NOTE(review): the placeholder text looks mis-encoded in this copy of
    # the source; it is reproduced unchanged.
    placeholder = [{
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    }]

    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=placeholder,
        tg_group_id=tg_group_id,
        tg_user=message.from_user,
        tg_forward_from=message,
        tg_reply_to=message.reply_to_message,
    )
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def location_from_telegram(bot: telegram.Bot, update: telegram.Update):
    """Relay a Telegram location message to QQ as text.

    The coordinates are resolved with `get_location_from_baidu`, prefixed
    with a fixed label and passed to `send_from_tg_to_qq`; the resulting
    QQ message id is recorded through `global_vars.mdb.append_message`.
    """
    message: telegram.Message = update.message
    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    location = message.location
    latitude, longitude = location.latitude, location.longitude

    # NOTE(review): the label prefix looks mis-encoded in this copy of the
    # source; it is reproduced unchanged.
    described = '????????' + get_location_from_baidu(latitude, longitude)
    reply_entity = [{
        'type': 'text',
        'data': {'text': described}
    }]

    qq_message_id = send_from_tg_to_qq(
        forward_index,
        message=reply_entity,
        tg_group_id=tg_group_id,
        tg_user=message.from_user,
        tg_forward_from=message,
        tg_reply_to=message.reply_to_message,
    )
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
def setup(webhook_url=None):
    """Set up the bot in webhook mode or, without `webhook_url`, long-polling.

    Webhook mode: creates a bot and a dispatcher fed by a fresh queue,
    registers the webhook, starts the dispatcher in a thread named
    'dispatcher' and returns `(update_queue, bot)`.
    Polling mode: creates an `Updater`, clears any webhook, starts polling
    and blocks in `updater.idle()`; nothing is returned.
    """
    logging.basicConfig(level=logging.WARNING)

    use_webhook = bool(webhook_url)
    if use_webhook:
        bot = Bot(TOKEN)
        update_queue = Queue()
        dispatcher = Dispatcher(bot, update_queue)
    else:
        updater = Updater(TOKEN)
        bot, dispatcher = updater.bot, updater.dispatcher

    dispatcher.add_handler(MessageHandler([], example_handler))  # Remove this line
    # Add your handlers here

    if not use_webhook:
        bot.set_webhook()  # Delete webhook
        updater.start_polling()
        updater.idle()
        return None

    bot.set_webhook(webhook_url=webhook_url)
    worker = Thread(target=dispatcher.start, name='dispatcher')
    worker.start()
    return update_queue, bot
def bot_hook():
    """Entry point for the Telegram connection.

    Builds a bot and a worker-less dispatcher, registers the departure and
    help command aliases plus the location handler, processes the update
    decoded from `request.json` and returns 'OK'.
    """
    bot = telegram.Bot(botdata['BotToken'])
    dispatcher = Dispatcher(bot, None, workers=0)

    # One handler per alias, registered in the same order as before.
    for alias in ('Abfahrten', 'abfahrten', 'Abfahrt', 'abfahrt', 'A', 'a'):
        dispatcher.add_handler(CommandHandler(alias, abfahrten, pass_args=True))
    for alias in ('Hilfe', 'hilfe', 'help'):
        dispatcher.add_handler(CommandHandler(alias, hilfe))
    dispatcher.add_handler(MessageHandler(Filters.location, nearest_stations))

    incoming = telegram.update.Update.de_json(request.json, bot)
    dispatcher.process_update(incoming)
    return 'OK'
def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[..., Any]:
    """
    Decorator to check if the message comes from the correct chat_id
    :param command_handler: Telegram CommandHandler
    :return: decorated function
    """
    def wrapper(*args, **kwargs):
        """Run `command_handler` only for the configured chat.

        Returns the handler's result, or None when the message is rejected
        or the handler raised (the exception is logged, not propagated).
        """
        bot, update = kwargs.get('bot') or args[0], kwargs.get('update') or args[1]

        # Reject unauthorized messages
        chat_id = int(_CONF['telegram']['chat_id'])
        if int(update.message.chat_id) != chat_id:
            logger.info('Rejected unauthorized message from: %s', update.message.chat_id)
            # Nothing to hand back for a rejected message; returning the
            # wrapper function itself would be meaningless to callers.
            return None

        logger.info('Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id)
        try:
            return command_handler(*args, **kwargs)
        except BaseException:
            # NOTE(review): BaseException also swallows SystemExit and
            # KeyboardInterrupt; kept as-is because it looks deliberate.
            logger.exception('Exception occurred within Telegram module')
            return None
    return wrapper
def _status_table(bot: Bot, update: Update) -> None:
    """
    Handler for /status table.
    Sends the open trades as a preformatted table (ID, Pair, Since, Profit),
    or a short status line when the trader is not running or has no open trade.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    # Fetch open trade
    trades = Trade.query.filter(Trade.is_open.is_(True)).all()

    if get_state() != State.RUNNING:
        send_msg('*Status:* `trader is not running`', bot=bot)
        return
    if not trades:
        send_msg('*Status:* `no active order`', bot=bot)
        return

    rows = []
    for trade in trades:
        # calculate profit and send message to user
        current_rate = exchange.get_ticker(trade.pair)['bid']
        since = shorten_date(arrow.get(trade.open_date).humanize(only_distance=True))
        profit = '{:.2f}'.format(100 * trade.calc_profit(current_rate))
        rows.append([trade.id, trade.pair, since, profit])

    columns = ['ID', 'Pair', 'Since', 'Profit']
    table = DataFrame.from_records(rows, columns=columns).set_index(columns[0])
    rendered = tabulate(table, headers='keys', tablefmt='simple')
    send_msg("<pre>{}</pre>".format(rendered), parse_mode=ParseMode.HTML)
# Handler for /balance: keeps the exchange.get_balances() entries that have a
# truthy 'Balance', 'Available' or 'Pending' value, appends one formatted
# section per entry and passes the text to send_msg(); when no entry
# qualifies the text is '`All balances are zero.`'.
# NOTE(review): this copy is collapsed onto a single line, so the line breaks
# inside the triple-quoted template cannot be recovered here -- confirm
# against the upstream source before reformatting this block.
def _balance(bot: Bot, update: Update) -> None: """ Handler for /balance Returns current account balance per crypto """ output = '' balances = [ c for c in exchange.get_balances() if c['Balance'] or c['Available'] or c['Pending'] ] if not balances: output = '`All balances are zero.`' for currency in balances: output += """*Currency*: {Currency} *Available*: {Available} *Balance*: {Balance} *Pending*: {Pending} """.format(**currency) send_msg(output)
def _count(bot: Bot, update: Update) -> None:
    """
    Handler for /count.
    Sends a two-column table with the number of open trades and the
    configured `max_open_trades`, or a notice when the trader is not running.
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    if get_state() != State.RUNNING:
        send_msg('`trader is not running`', bot=bot)
        return

    open_trades = Trade.query.filter(Trade.is_open.is_(True)).all()
    counts = {
        'current': [len(open_trades)],
        'max': [_CONF['max_open_trades']]
    }
    rendered = tabulate(counts, headers=['current', 'max'], tablefmt='simple')
    message = "<pre>{}</pre>".format(rendered)
    logger.debug(message)
    send_msg(message, parse_mode=ParseMode.HTML)
# Handler for /help: sends a fixed Markdown overview of the bot commands
# through send_msg(message, bot=bot).
# NOTE(review): this copy is collapsed onto a single line, so the line breaks
# of the multi-line help text cannot be recovered here -- confirm against the
# upstream source before reformatting this block.
def _help(bot: Bot, update: Update) -> None: """ Handler for /help. Show commands of the bot :param bot: telegram bot :param update: message update :return: None """ message = """ */start:* `Starts the trader` */stop:* `Stops the trader` */status [table]:* `Lists all open trades` *table :* `will display trades in a table` */profit:* `Lists cumulative profit from all finished trades` */forcesell <trade_id>|all:* `Instantly sells the given trade or all trades, regardless of profit` */performance:* `Show performance of each finished trade grouped by pair` */count:* `Show number of trades running compared to allowed number of trades` */balance:* `Show account balance per currency` */help:* `This help message` */version:* `Show version` """ send_msg(message, bot=bot)
def broadcast(bot, update):
    """Send the text of a /broadcast message to every funded account.

    Accounts come from `mysql_select_accounts_balances()`; entries found in
    the blacklist or with a balance that is not positive are skipped.
    Each recipient is blacklisted right before the push and removed again
    afterwards, so an account whose push fails stays blacklisted.
    """
    info_log(update)
    bot = Bot(api_key)
    # list users from MySQL
    accounts_list = mysql_select_accounts_balances()
    # some users are bugged & stop broadcast - they deleted chat with bot. So we blacklist them
    BLACK_LIST = mysql_select_blacklist()

    for account in accounts_list:
        # skip blacklisted accounts and accounts without balance
        if account[0] in BLACK_LIST or int(account[1]) <= 0:
            continue
        recipient = account[0]
        mysql_set_blacklist(recipient)
        print(recipient)
        push_simple(bot, recipient, update.message.text.replace('/broadcast ', ''))
        sleep(0.2)
        # if someone deleted chat, broadcast will fail and he will remain in blacklist
        mysql_delete_blacklist(recipient)

# bootstrap
# Compares the node's block count (rpc 'block_count') with a community figure
# read from the 'blocks' field of the JSON fetched from summary_url; when
# parsing raises ValueError the value of reference_block_count() is used
# instead. If the absolute difference exceeds block_count_difference_threshold,
# a report (including the count fetched from block_count_url) is pushed to
# every id in admin_list.
# NOTE(review): this copy is collapsed onto a single line; it is not possible
# to tell here whether the trailing `bootstrap_multi()` is an executed call
# that follows a "# trying to fix" comment or part of the comment itself --
# confirm against the upstream source before reformatting this block.
def monitoring_block_count(): # set bot bot = Bot(api_key) count = int(rpc({"action": "block_count"}, 'count')) reference_count = int(reference_block_count()) http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',ca_certs=certifi.where()) response = http.request('GET', summary_url, headers=header, timeout=20.0) try: json_data = json.loads(response.data) community_count = int(json_data['blocks']) except ValueError as e: community_count = reference_count difference = int(math.fabs(community_count - count)) response = http.request('GET', block_count_url, headers=header, timeout=20.0) raiwallet_count = int(response.data) if (difference > block_count_difference_threshold): # Warning to admins for user_id in admin_list: push(bot, user_id, 'Block count: {0}\nCommunity: {1}\nDifference: *{2}*\nReference: {3}\nraiwallet.info: {4}'.format(count, community_count, difference, reference_count, raiwallet_count)) # trying to fix bootstrap_multi()
def get_telegram_bot():
    """ Creates a telegram bot instance.

        Returns:
            telegram.Bot: The bot instance.

        Raises:
            TelegramConfigurationError: If the telegram bot was not properly
                configured. The original error is attached as the cause.
    """
    logger = logging.getLogger()
    try:
        return Bot(Config().telegram_token)
    except (FileNotFoundError, ValueError, TelegramError) as exc:
        logger.error("Telegram token not present or invalid: '%s'", str(exc))
        # Chain explicitly so the traceback shows the underlying failure as
        # the direct cause of the configuration error.
        raise TelegramConfigurationError("Telegram token not "
                                         "present or invalid.") from exc
def connect(self):
    """Read the Telegram access token and create the bot, updater and dispatcher.

    Only the 'telegram' chat network is supported: any other value of
    `self.chatnetwork` is logged and -1 is returned. A failure while
    reading the key file or while creating the Telegram objects is logged
    and terminates the process with exit status 1.
    """
    config = ConfigParser.ConfigParser()
    if self.chatnetwork == 'telegram':
        try:
            config.read(self.keyslocation)
            self.telegram_access_token = config.get('telegram', 'access_token')
            self.logger.debug("Successfully read access keys")
        # `except ... as e` is accepted by Python 2.6+ and Python 3 alike,
        # unlike the comma form, which is a syntax error on Python 3.
        except Exception as e:
            self.logger.error("Could not read access keys: %s" % e)
            sys.exit(1)
        try:
            self.telegram_bot = telegram.Bot(token=self.telegram_access_token)
            self.telegram_updater = Updater(token=self.telegram_access_token)
            self.telegram_dispatcher = self.telegram_updater.dispatcher
            self.telegram_dispatcher.addTelegramMessageHandler(self.respond)
        except Exception as e:
            self.logger.error("Telegram bot connect failed: %s" % e)
            sys.exit(1)
    else:
        self.logger.error('Chatnetwork name not correct, found %s' % self.chatnetwork)
        return -1
def cmd_list_tokens(bot: telegram.Bot, update: telegram.Update):
    """Reply with the numbered list of tokens stored for a poll tag.

    Nothing is sent when `generic_checks` fails. The tag is everything
    after the first space of the message text.
    """
    cid = update.message.chat_id
    if not generic_checks(bot, update, "listtokens", "", 1):
        return

    # Extract value
    _, tag = update.message.text.split(" ", 1)

    tokens = tokens_db.search(Query().poll_tag == tag)
    header = "There have been %i tokens generated for your poll:\n" % len(
        tokens)
    listing = "\n".join(
        "%i: %s" % (position, entry['token'])
        for position, entry in enumerate(tokens, 1))
    bot.sendMessage(cid, header + listing)
def user_is_in_chat(uid: int, cid: int, bot: telegram.Bot):
    """Return True if the specified user_id is in the specified chat_id.

    Returns False if:
      * the bot isn't in the chat (so it can't see the members), or;
      * the chat doesn't exist, or;
      * the user isn't in the chat, or;
      * the user doesn't exist.

    Any other `BadRequest` from the API is logged and re-raised.
    """
    try:
        member = bot.get_chat_member(chat_id=cid, user_id=uid)
    except telegram.error.BadRequest as err:
        reason = err.message.lower()
        if "chat not found" in reason:
            logger.info(err_chat_not_found % (uid, cid))
            return False
        elif "user not found" in reason:
            logger.info(err_user_not_found % (uid, cid))
            return False
        else:
            logger.exception("Unhandled exception")
            raise

    # The lookup also succeeds for people who left or were removed; the API
    # reports them with the status "left" or "kicked", which must not count
    # as membership.
    status = getattr(member, "status", None)
    if status in ("left", "kicked"):
        logger.info("User %s is not in chat %s (status: %s)" % (uid, cid, status))
        return False

    logger.info("User %s is in chat %s" % (uid, cid))
    return True
# Entry handler for voting: outside a private chat it sends
# text_private_chat_only and returns ConversationHandler.END. Otherwise it
# either tells the user that no poll is available to them or sends a one-time
# reply keyboard with one "tag: title" button per poll from get_polls(uid, bot).
# NOTE(review): this copy is collapsed onto a single line; it is not possible
# to tell here whether the final `return state_vote_1` belongs to the `else`
# branch or to function level (which would also apply when there are no
# polls) -- confirm against the upstream source before reformatting.
def cmd_vote(bot: telegram.Bot, update: telegram.Update): cid = update.message.chat_id uid = update.message.from_user.id polls = get_polls(uid, bot) if update.message.chat.type != "private": bot.sendMessage(cid, text_private_chat_only) return ConversationHandler.END if len(polls) == 0: bot.sendMessage(cid, "You aren't eligible to vote in any polls.") else: keyboard_choices = [p["tag"] + ": " + p["title"] for p in polls] # keyboard array is a list of lists # because each list represents a new row # and we want each button on a separate row keyboard_array = [[k, ] for k in keyboard_choices] keyboard = ReplyKeyboardMarkup(keyboard_array, one_time_keyboard=True) bot.sendMessage(cid, "Click the button for the poll you would like to vote in.", reply_markup=keyboard) return state_vote_1
def get_polls(user_id: int, bot: telegram.Bot):
    """Return the polls the given user is allowed to vote in.

    A poll qualifies when it is active and `user_is_in_chat` confirms the
    user for the poll's `target_chat`. Each distinct target chat is asked
    about only once. The result keeps the order of `polls_db.all()`.
    """
    # `== True` is kept on purpose: it is stricter than plain truthiness.
    active_polls = [poll for poll in polls_db.all() if poll['active'] == True]  # noqa: E712

    # Unique chats first, so the Telegram API is queried once per chat.
    target_chats = {poll['target_chat'] for poll in active_polls}
    chats_user_is_in = [chat for chat in target_chats
                        if user_is_in_chat(user_id, chat, bot)]

    return [poll for poll in active_polls
            if poll['target_chat'] in chats_user_is_in]
def cmd_set_generic(bot: telegram.Bot, update: telegram.Update, field_name: str, cmd_name: str, example: str):
    """Apply a `<cmd> <tag> <value>` edit to one field of a poll.

    Does nothing when `generic_checks` fails. Otherwise the field is
    edited, a success message is sent and, if `check_if_complete`
    approves the poll, a Markdown hint suggesting activation follows.
    """
    if not generic_checks(bot, update, cmd_name, example, 2):
        # A generic check failed.
        return

    # Extract value
    _, tag, value = update.message.text.split(" ", 2)

    # Apply edit and send "success" feedback message
    edit_poll(tag, field_name, value)
    updated_poll = get_poll(tag)
    feedback = text_edit_successful.format(p=updated_poll)
    cid = update.message.chat_id
    bot.sendMessage(cid, feedback)

    # Check if complete, and suggest to activate
    if check_if_complete(tag):
        bot.sendMessage(cid, text_activate_suggestion.format(t=tag), parse_mode="Markdown")
    return
def main():
    """Run the echo loop with the token stored in ../token.txt.

    Reads the token from the first line of the file, remembers the id of
    the first pending update (None when there is none) and then calls
    `echo(bot)` forever. A `NetworkError` pauses for one second; an
    `Unauthorized` error skips the current update.
    """
    global update_id
    # Telegram Bot Authorization Token
    # The context manager closes the file even if reading fails, and
    # strip() drops the trailing newline that readline() keeps, which
    # would otherwise end up inside the token.
    with open("../token.txt", "r") as token_file:
        token = token_file.readline().strip()
    bot = telegram.Bot(token=token)

    # get the first pending update_id, this is so we can skip over it in case
    # we get an "Unauthorized" exception.
    try:
        update_id = bot.getUpdates()[0].update_id
    except IndexError:
        update_id = None

    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    while True:
        try:
            echo(bot)
        except NetworkError:
            sleep(1)
        except Unauthorized:
            # The user has removed or blocked the bot.
            update_id += 1
def execute(self, chat_id, bot: Bot, update: Update):
    """Offer the classrooms of the building named in the message text.

    Returns False without sending anything when the text is not a key of
    `self._buildings_dict`. Otherwise a reply keyboard is sent with one
    "/<classroom name>" button per classroom, three per row, followed by a
    "/buildings" row, and True is returned.
    """
    requested = update.message.text
    if requested not in self._buildings_dict:
        return False

    building_identifier = self._buildings_dict[requested]
    classrooms = self.get_classroom_source().get_classrooms_in_building(building_identifier)

    buttons = [KeyboardButton("/" + classroom.get_name().lower()) for classroom in classrooms]
    # Slice the flat button list into rows of at most three.
    rows = [buttons[start:start + 3] for start in range(0, len(buttons), 3)]
    rows.append(["/buildings"])

    bot.send_message(chat_id=chat_id, text="Available classrooms",
                     reply_markup=ReplyKeyboardMarkup(rows))
    return True
def help_about(bot: telegram.Bot, update: telegram.Update):
    """Send the help message, then track and log the command.

    The help text goes out as HTML with link previews disabled and the
    help keyboard attached. The event is reported to Botan under the name
    'help.about' and logged with the update attached as extra data.
    """
    message = update.message
    chat_id = message.chat_id

    bot.send_message(
        chat_id=chat_id,
        text=strings.HELP_MESSAGE,
        parse_mode='HTML',
        disable_web_page_preview=True,
        reply_markup=strings.HELP_MESSAGE_KEYBOARD
    )

    bot_types.Botan.track(uid=chat_id, message=message.to_dict(), name='help.about')

    log_extra = {
        'telegram': {
            'update': update.to_dict(),
            'chat_id': chat_id,
            'message_id': message.message_id,
        },
        'id': extentions.TextHelper.get_md5(str(chat_id) + str(message.message_id))
    }
    logging.info('Command help: about.', extra=log_extra)
# Handler for the main settings command: loads the chat's ChatSettings via
# ChatSettings.from_db and, when the chat is not admin-only or the sender is
# the stored admin, sends the settings overview with send_settings_message.
# The command is also tracked with Botan ('settings.main') and logged.
# NOTE(review): this copy is collapsed onto a single line; it is not possible
# to tell here whether the Botan.track / logging.info calls are inside the
# `if` or run unconditionally -- confirm against the upstream source before
# reformatting.
def settings_message(bot: telegram.Bot, update: telegram.Update): chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id, admin_name=update.message.from_user.first_name) if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id: send_settings_message(bot, chat_settings) bot_types.Botan.track( uid=update.message.chat_id, message=update.message.to_dict(), name='settings.main' ) logging.info('Command settings: main.', extra={ 'telegram': { 'update': update.to_dict(), 'chat_id': update.message.chat_id, 'message_id': update.message.message_id, }, 'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id)) })
def send_settings_message(bot: telegram.Bot, chat_settings: bot_types.ChatSettings):
    """Send the settings overview of a chat as an HTML message.

    `strings.SETTINGS_MESSAGE` is filled with twelve values taken from
    `chat_settings` (admin name, mode, quiet and admin-only labels, Yandex
    key, voice, emotion, speed, audio labels and the toggle labels).
    """
    # NOTE(review): the label literals below look mis-encoded in this copy
    # of the source; they are reproduced unchanged.
    values = (
        chat_settings.admin_name,
        str(chat_settings.mode),
        '???????' if chat_settings.quiet else '????????',
        '?????? ????????????? ????' if chat_settings.admin_only else '??? ??????',
        chat_settings.yandex_key if chat_settings.yandex_key else '?? ??????????',
        str(chat_settings.voice),
        str(chat_settings.emotion),
        str(chat_settings.speed),
        '???????????' if chat_settings.as_audio else '????????? ?????????',
        '????????? ?????????' if chat_settings.as_audio else '???????????',
        '?????????' if chat_settings.quiet else '????????',
        '?????' if chat_settings.admin_only else '?????? ????????????? ????'
    )
    bot.send_message(
        chat_id=chat_settings.id,
        text=strings.SETTINGS_MESSAGE % values,
        parse_mode='HTML'
    )

# endregion
# region settings voice
# Handler for the voice settings command: loads the chat's ChatSettings via
# ChatSettings.from_db and, when the chat is not admin-only or the sender is
# the stored admin, calls send_settings_voice_message for the chat.
# The command is also tracked with Botan ('settings.voice.get') and logged.
# NOTE(review): this copy is collapsed onto a single line; it is not possible
# to tell here whether the Botan.track / logging.info calls are inside the
# `if` or run unconditionally -- confirm against the upstream source before
# reformatting.
def settings_voice_message(bot: telegram.Bot, update: telegram.Update): chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id, admin_name=update.message.from_user.first_name) if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id: send_settings_voice_message(bot, update.message.chat_id) bot_types.Botan.track( uid=update.message.chat_id, message=update.message.to_dict(), name='settings.voice.get' ) logging.info('Command settings: voice.', extra={ 'telegram': { 'update': update.to_dict(), 'chat_id': update.message.chat_id, 'message_id': update.message.message_id, }, 'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id)) })
def send_settings_voice_message_callback(bot: telegram.Bot, voice: bot_types.Voice, callback_query_id: str, quiet: bool):
    """Answer a voice-selection callback query.

    In quiet mode the query is acknowledged without text; otherwise the
    answer carries `strings.NEW_VOICE_MESSAGE` filled with the voice.
    """
    answer = {'callback_query_id': callback_query_id}
    if not quiet:
        answer['text'] = strings.NEW_VOICE_MESSAGE % str(voice)
    bot.answer_callback_query(**answer)

# endregion
# region settings emotion
# Handler for the emotion settings command: loads the chat's ChatSettings via
# ChatSettings.from_db and, when the chat is not admin-only or the sender is
# the stored admin, calls send_settings_emotion_message for the chat.
# The command is also tracked with Botan ('settings.emotions.get') and logged.
# NOTE(review): this copy is collapsed onto a single line; it is not possible
# to tell here whether the Botan.track / logging.info calls are inside the
# `if` or run unconditionally -- confirm against the upstream source before
# reformatting.
def settings_emotion_message(bot: telegram.Bot, update: telegram.Update): chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id, admin_name=update.message.from_user.first_name) if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id: send_settings_emotion_message(bot, update.message.chat_id) bot_types.Botan.track( uid=update.message.chat_id, message=update.message.to_dict(), name='settings.emotions.get' ) logging.info('Command settings: emotion.', extra={ 'telegram': { 'update': update.to_dict(), 'chat_id': update.message.chat_id, 'message_id': update.message.message_id, }, 'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id)) })
def send_settings_emotion_message_callback(bot: telegram.Bot, emotion: bot_types.Emotion, callback_query_id: str, quiet: bool):
    """Answer an emotion-selection callback query.

    In quiet mode the query is acknowledged without text; otherwise the
    answer carries `strings.NEW_EMOTION_MESSAGE` filled with the emotion.
    """
    answer = {'callback_query_id': callback_query_id}
    if not quiet:
        answer['text'] = strings.NEW_EMOTION_MESSAGE % str(emotion)
    bot.answer_callback_query(**answer)

# endregion
# region settings mode
def send_settings_mode_message_callback(bot: telegram.Bot, mode: bot_types.Mode, callback_query_id: str, quiet: bool):
    """Answer a mode-selection callback query.

    In quiet mode the query is acknowledged without text; otherwise the
    answer carries `strings.NEW_MODE_MESSAGE` filled with the mode.
    """
    answer = {'callback_query_id': callback_query_id}
    if not quiet:
        answer['text'] = strings.NEW_MODE_MESSAGE % str(mode)
    bot.answer_callback_query(**answer)

# endregion
# region settings audio
def set_auto(bot: Bot, update: Update):
    """
    Handles /auto command: flips the `auto` flag of the chat's conversation
    and reports the new state, or sends the "account_not_setup" error when
    the chat has no conversation.
    :param bot:
    :param update:
    :return:
    """
    chat_id = update.message.chat_id
    if chat_id not in data.conversations:
        send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
        return

    conversation = data.conversations[chat_id]
    conversation.auto = not conversation.auto
    message = "Automatic mode enabled." if conversation.auto else "Automatic mode disabled."
    send_custom_message(bot=bot, chat_id=chat_id, message=message)
# Sends the first photo of every match of the chat's conversation to the
# sender in private, captioned with the match name and a running index.
# Sending stops (after notify_start_private_chat) as soon as
# send_private_photo reports failure. A BadRequest for one photo is reported
# to the sender, an AttributeError anywhere in the processing produces a
# generic error message, and a chat without a conversation gets the
# "account_not_setup" error.
# NOTE(review): this copy is collapsed onto a single line; the nesting of
# `id += 1` and `time.sleep(0.1)` relative to the inner try/except cannot be
# determined here -- confirm against the upstream source before reformatting.
# NOTE(review): the local name `id` shadows the builtin.
def send_matches(bot: Bot, update: Update): """ Send list of matches (pictures) to private chat :param bot: :param update: :return: """ global data chat_id = update.message.chat_id sender_id = update.message.from_user.id if chat_id in data.conversations: try: conversation = data.conversations[chat_id] matches = conversation.get_matches() id = 0 # TODO cache the photo urls for some time for match in matches: photo = match.user.get_photos(width='172')[0] try: is_msg_sent = send_private_photo(bot=bot, caption=match.user.name + " ID=" + str(id), url=photo, user_id=sender_id) if not is_msg_sent: notify_start_private_chat(bot=bot, chat_id=chat_id, incoming_message=update.message) break id += 1 except error.BadRequest: send_custom_message(bot=bot, chat_id=sender_id, message="One match could not be loaded: %s" % photo) time.sleep(0.1) except AttributeError as e: message = "An error happened." send_custom_message(bot=bot, chat_id=sender_id, message=message) else: send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
def unlink(bot: Bot, update: Update):
    """
    Handles /unlink command: removes the chat's conversation when the sender
    is its owner; otherwise the matching error message is sent.
    :param bot:
    :param update:
    :return:
    """
    sender = update.message.from_user.id
    chat_id = update.message.chat_id

    if chat_id not in data.conversations:
        send_error(bot, chat_id=chat_id, name="account_not_setup")
        return
    if sender != data.conversations[chat_id].owner:
        send_error(bot, chat_id=chat_id, name="command_not_allowed")
        return

    del data.conversations[chat_id]
    send_message(bot, chat_id, name="account_unlinked")
def test_admin_help(self):
    """Check that the admin help command sends a message through the bot."""
    bot_mock = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    admin_help = AdminCommands._AdminCommands__admin_help  # pylint: disable=no-member, protected-access
    admin_help(bot_mock, dummy_update)

    self.assertTrue(bot_mock.sendMessage.called)
def test_get_users_no_config(self):
    """Check the get-users command when no users are registered.

    `UserManager` is patched to expose an empty config; the command is
    still expected to send a message.
    """
    bot_mock = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    with patch("ownbot.admincommands.UserManager") as manager_cls:
        manager_cls.return_value.config = {}
        AdminCommands._AdminCommands__get_users(  # pylint: disable=no-member, protected-access
            bot_mock, dummy_update)

        self.assertTrue(bot_mock.sendMessage.called)
def test_get_users(self):
    """Check the get-users command with one verified and one unverified user."""
    bot_mock = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()
    group_config = {
        "foogroup": {
            "users": [{"id": 1337, "username": "@foouser"}],
            "unverified": ["@baruser"],
        }
    }

    with patch("ownbot.admincommands.UserManager") as manager_cls:
        manager_cls.return_value.config = group_config
        AdminCommands._AdminCommands__get_users(  # pylint: disable=no-member, protected-access
            bot_mock, dummy_update)

        self.assertTrue(bot_mock.sendMessage.called)
def test_adduser_no_args(self):
    """Check that adduser answers with its usage when called without arguments."""
    bot_mock = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()
    no_args = []

    AdminCommands._AdminCommands__add_user(  # pylint: disable=no-member, protected-access
        bot_mock, dummy_update, no_args)

    bot_mock.sendMessage.assert_called_with(
        chat_id=1,
        text="Usage: adduser <user> <group>")
def test_adduser_usr_already_in_grp(self):
    """Check the adduser reply when the user already belongs to the group.

    `UserManager.add_user` is made to return False, which is expected to
    produce the "already in the group" message.
    """
    bot_mock = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()

    with patch("ownbot.admincommands.UserManager") as manager_cls:
        manager_cls.return_value.add_user.return_value = False
        AdminCommands._AdminCommands__add_user(  # pylint: disable=no-member, protected-access
            bot_mock, dummy_update, ["@foouser", "foogroup"])

        bot_mock.sendMessage.assert_called_with(
            chat_id=1,
            text="The user '@foouser' is already in the group 'foogroup'!")
def test_rmuser_no_args(self):
    """Check that rmuser answers with its usage when called without arguments."""
    bot_mock = Mock(spec=Bot)
    dummy_update = self.__get_dummy_update()
    no_args = []

    AdminCommands._AdminCommands__rm_user(  # pylint: disable=no-member, protected-access
        bot_mock, dummy_update, no_args)

    bot_mock.sendMessage.assert_called_with(
        chat_id=1,
        text="Usage: rmuser <user> <group>")