Python telegram 模块,Bot() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用telegram.Bot()

项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_requires_usergroup_no_acc(self):
        """
            Test requires usergroup decorator if the user has no access
        """
        with patch("ownbot.auth.User") as user_mock:
            user_mock.return_value.has_access.return_value = False

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)
                return True

            bot_mock = Mock(spec=Bot)
            update = self.__get_dummy_update()
            called = my_command_handler(bot_mock, update)

            self.assertIsNone(called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_requires_usergroup_acc(self):
        """
            Test requires usergroup decorator if the user has access
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock:
            user_mock = user_mock.return_value
            user_mock.has_acces.return_value = True

            @ownbot.auth.requires_usergroup("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)
                return True

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            called = my_command_handler(bot_mock, update_mock)

            self.assertTrue(called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_assign_first_to(self):
        """
            Test assign first to decorator.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock,\
                patch("ownbot.auth.UserManager") as usrmgr_mock:

            user_mock = user_mock.return_value
            usrmgr_mock.return_value.group_is_empty.return_value = True

            @ownbot.auth.assign_first_to("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            my_command_handler(bot_mock, update_mock)

            self.assertTrue(usrmgr_mock.return_value.group_is_empty.called)
            self.assertTrue(user_mock.save.called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_assign_first_to_not_first(self):
        """
            Test assign first to decorator if the users is not first.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock,\
                patch("ownbot.auth.UserManager") as usrmgr_mock:

            user_mock = user_mock.return_value
            usrmgr_mock.return_value.group_is_empty.return_value = False

            @ownbot.auth.assign_first_to("foo")
            def my_command_handler(bot, update):
                """Dummy command handler"""
                print(bot, update)

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            my_command_handler(bot_mock, update_mock)

            self.assertTrue(usrmgr_mock.return_value.group_is_empty.called)
            self.assertFalse(user_mock.save.called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_assign_first_to_with_self(self):
        """
            Test assign first to decorator with self as first argument.
        """
        with patch("ownbot.auth.User") as user_mock,\
                patch("test_auth.Update") as update_mock,\
                patch("ownbot.auth.UserManager") as usrmgr_mock:

            user_mock = user_mock.return_value
            usrmgr_mock.return_value.group_is_empty.return_value = True

            @ownbot.auth.assign_first_to("foo")
            def my_command_handler(self, bot, update):
                """Dummy command handler"""
                print(self, bot, update)

            bot_mock = Mock(spec=Bot)
            update_mock = Update(1337)
            my_command_handler(None, bot_mock, update_mock)

            self.assertTrue(usrmgr_mock.return_value.group_is_empty.called)
            self.assertTrue(user_mock.save.called)
项目:telegram-autoposter    作者:vaniakosmos    | 项目源码 | 文件源码
def admin_access():
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            user = update.effective_user
            admins = getattr(self, 'admins', None)

            if admins is None:
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
                return func(self, bot, update, *args, **kwargs)

            if user.id not in admins:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return

            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
项目:memes-reposter    作者:vaniakosmos    | 项目源码 | 文件源码
def admin_access(admins_list=None):
    def access(func):
        @wraps(func)
        def wrapped(self, bot: Bot, update: Update, *args, **kwargs):
            user = update.effective_user
            admins = None
            if admins_list is None:
                admins = getattr(self, 'admins', None)

            if admins is None:
                self.logger.warning('Specify self.admins (list of users ids) parameter in '
                                    'manager or channel classes in order to restrict access to bot.')
                return func(self, bot, update, *args, **kwargs)

            if user.id not in admins:
                self.logger.info(f"Unauthorized access denied for {user}.")
                return

            return func(self, bot, update, *args, **kwargs)
        return wrapped
    return access
项目:memes-reposter    作者:vaniakosmos    | 项目源码 | 文件源码
def command_accept_choice(self, bot: Bot, update: Update):
        query = update.callback_query
        update.message = query.message  # because callback update doesn't have message at all
        chat_id = update.message.chat_id

        chosen_channel = self.chosen_channels.get(chat_id, None)
        if chosen_channel:
            chosen_channel.remove_commands_handlers(chat_id)

        chosen_channel: BaseChannel = self.channels[query.data]
        self.chosen_channels[chat_id] = chosen_channel
        chosen_channel.add_commands_handlers(chat_id)

        bot.edit_message_text(text=f'Chose {query.data} ({chosen_channel.channel_id}).',
                              chat_id=query.message.chat_id,
                              message_id=query.message.message_id)
        help = chosen_channel.help_text()
        update.message.reply_text('```\n' + help + '\n```', parse_mode=ParseMode.MARKDOWN)
项目:memes-reposter    作者:vaniakosmos    | 项目源码 | 文件源码
def distribute(self, bot: Bot, update: Update, args):
        if args.help:
            self.send_code(update, args.help)

        elif args.command == 'add':
            usage = self.subparsers['add'].format_usage()
            self.add(bot, update, args.tags, usage)

        elif args.command in ['remove', 'delete']:
            usage = self.subparsers['remove'].format_usage()
            self.remove(bot, update, args.tags, usage)

        elif args.command == 'show':
            self.show(bot, update)

        else:
            self.logger.error('Bad args: ' + str(args))
            raise Exception  # will never get this far (hopefully)
项目:memes-reposter    作者:vaniakosmos    | 项目源码 | 文件源码
def add(self, bot: Bot, update: Update, args: List[str], usage: str):
        if len(args) != 2:
            self.send_code(update, usage)
            return

        subreddits = {}
        name, score = args
        if score.isdecimal():
            score = int(score)
        else:
            self.send_code(update, usage)
            return
        subreddits[name] = score

        self.store.add(subreddits)
        self.show(bot, update)
项目:hardmob_promo    作者:leandrotoledo    | 项目源码 | 文件源码
def main():
    bot = Bot(environ.get('TOKEN'))
    chat_id = environ.get('CHAT_ID')

    post.updatedb()

    new_posts = post.get_posts(sent_only=False)

    if new_posts:
        for p in new_posts:
            try:
                bot.sendMessage(chat_id=chat_id, text=p.text(), parse_mode='HTML')
                post.mark_as_sent(p.uid)
            except BadRequest:
                logging.info('Bad post formatting: %d' % p.uid)

        logger.info('%d new post(s) have been sent!' % len(new_posts))
    else:
        logger.info('No new posts at this time!')
项目:coolq-telegram-bot    作者:jqqqqqqqqqq    | 项目源码 | 文件源码
def add_keyword(bot: telegram.Bot,
                update: telegram.Update,
                args: list):
    if update.message.chat_id not in global_vars.admin_list['TG']:
        return

    if len(args) == 0:
        update.message.reply_text('Usage: /add_keyword keyword1 keyword2 ...')
        return
    for keyword in args:
        logger.debug('keyword: ' + keyword)
        if keyword in global_vars.filter_list['keywords']:
            update.message.reply_text('Keyword: "' + keyword + '" already in list')
            continue
        global_vars.filter_list['keywords'].append(keyword)
    update.message.reply_text('Done.')
    save_data()
项目:coolq-telegram-bot    作者:jqqqqqqqqqq    | 项目源码 | 文件源码
def add_channel(bot: telegram.Bot,
                update: telegram.Update):
    if update.message.forward_from_chat:
        if update.message.forward_from_chat.type != 'channel':  # it seems forward_from_chat can only be channels
            update.message.reply_text(
                'Message type error. Please forward me a message from channel, or use /cancel to stop')
            return
        if update.message.forward_from_chat.id not in global_vars.filter_list['channels']:
            global_vars.filter_list['channels'].append(update.message.forward_from_chat.id)
            save_data()
            update.message.reply_text('Okay, please send me another, or use /cancel to stop')
        else:
            update.message.reply_text('Already in list. Send me another or use /cancel to stop')
        return CHANNEL
    else:
        if update.message.text == '/cancel':
            update.message.reply_text('Done.')
            return ConversationHandler.END
        else:
            update.message.reply_text(
                'Message type error. Please forward me a message from channel, or use /cancel to stop')
            return CHANNEL
项目:coolq-telegram-bot    作者:jqqqqqqqqqq    | 项目源码 | 文件源码
def audio_from_telegram(bot: telegram.Bot,
                        update: telegram.Update):
    message: telegram.Message = update.message

    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    reply_entity = list()

    reply_entity.append({
        'type': 'text',
        'data': {'text': '[ ?? ]'}
    })
    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=reply_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
项目:coolq-telegram-bot    作者:jqqqqqqqqqq    | 项目源码 | 文件源码
def location_from_telegram(bot: telegram.Bot,
                           update: telegram.Update):
    message: telegram.Message = update.message

    tg_group_id = message.chat_id  # telegram group id
    forward_index = get_forward_index(tg_group_id=tg_group_id)

    latitude = message.location.latitude
    longitude = message.location.longitude
    reply_entity = list()

    reply_entity.append({
        'type': 'text',
        'data': {'text': '????????' + get_location_from_baidu(latitude, longitude)}
    })
    qq_message_id = send_from_tg_to_qq(forward_index,
                                       message=reply_entity,
                                       tg_group_id=tg_group_id,
                                       tg_user=message.from_user,
                                       tg_forward_from=message,
                                       tg_reply_to=message.reply_to_message)
    global_vars.mdb.append_message(qq_message_id, message.message_id, forward_index, 0)
项目:python-telegram-bot-openshift    作者:lufte    | 项目源码 | 文件源码
def setup(webhook_url=None):
    """If webhook_url is not passed, run with long-polling."""
    logging.basicConfig(level=logging.WARNING)
    if webhook_url:
        bot = Bot(TOKEN)
        update_queue = Queue()
        dp = Dispatcher(bot, update_queue)
    else:
        updater = Updater(TOKEN)
        bot = updater.bot
        dp = updater.dispatcher
    dp.add_handler(MessageHandler([], example_handler))  # Remove this line
    # Add your handlers here
    if webhook_url:
        bot.set_webhook(webhook_url=webhook_url)
        thread = Thread(target=dp.start, name='dispatcher')
        thread.start()
        return update_queue, bot
    else:
        bot.set_webhook()  # Delete webhook
        updater.start_polling()
        updater.idle()
项目:verkehrsbot    作者:dirkonet    | 项目源码 | 文件源码
def bot_hook():
    """Entry point for the Telegram connection."""
    bot = telegram.Bot(botdata['BotToken'])
    dispatcher = Dispatcher(bot, None, workers=0)
    dispatcher.add_handler(CommandHandler('Abfahrten', abfahrten, pass_args=True))
    dispatcher.add_handler(CommandHandler('abfahrten', abfahrten, pass_args=True))
    dispatcher.add_handler(CommandHandler('Abfahrt', abfahrten, pass_args=True))
    dispatcher.add_handler(CommandHandler('abfahrt', abfahrten, pass_args=True))
    dispatcher.add_handler(CommandHandler('A', abfahrten, pass_args=True))
    dispatcher.add_handler(CommandHandler('a', abfahrten, pass_args=True))
    dispatcher.add_handler(CommandHandler('Hilfe', hilfe))
    dispatcher.add_handler(CommandHandler('hilfe', hilfe))
    dispatcher.add_handler(CommandHandler('help', hilfe))
    dispatcher.add_handler(MessageHandler(Filters.location, nearest_stations))
    update = telegram.update.Update.de_json(request.json, bot)
    dispatcher.process_update(update)

    return 'OK'
项目:freqtrade    作者:gcarq    | 项目源码 | 文件源码
def authorized_only(command_handler: Callable[[Bot, Update], None]) -> Callable[..., Any]:
    """
    Decorator to check if the message comes from the correct chat_id
    :param command_handler: Telegram CommandHandler
    :return: decorated function
    """
    def wrapper(*args, **kwargs):
        bot, update = kwargs.get('bot') or args[0], kwargs.get('update') or args[1]

        # Reject unauthorized messages
        chat_id = int(_CONF['telegram']['chat_id'])
        if int(update.message.chat_id) != chat_id:
            logger.info('Rejected unauthorized message from: %s', update.message.chat_id)
            return wrapper

        logger.info('Executing handler: %s for chat_id: %s', command_handler.__name__, chat_id)
        try:
            return command_handler(*args, **kwargs)
        except BaseException:
            logger.exception('Exception occurred within Telegram module')
    return wrapper
项目:freqtrade    作者:gcarq    | 项目源码 | 文件源码
def _status_table(bot: Bot, update: Update) -> None:
    """
    Handler for /status table.
    Returns the current TradeThread status in table format
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    # Fetch open trade
    trades = Trade.query.filter(Trade.is_open.is_(True)).all()
    if get_state() != State.RUNNING:
        send_msg('*Status:* `trader is not running`', bot=bot)
    elif not trades:
        send_msg('*Status:* `no active order`', bot=bot)
    else:
        trades_list = []
        for trade in trades:
            # calculate profit and send message to user
            current_rate = exchange.get_ticker(trade.pair)['bid']
            trades_list.append([
                trade.id,
                trade.pair,
                shorten_date(arrow.get(trade.open_date).humanize(only_distance=True)),
                '{:.2f}'.format(100 * trade.calc_profit(current_rate))
            ])

        columns = ['ID', 'Pair', 'Since', 'Profit']
        df_statuses = DataFrame.from_records(trades_list, columns=columns)
        df_statuses = df_statuses.set_index(columns[0])

        message = tabulate(df_statuses, headers='keys', tablefmt='simple')
        message = "<pre>{}</pre>".format(message)

        send_msg(message, parse_mode=ParseMode.HTML)
项目:freqtrade    作者:gcarq    | 项目源码 | 文件源码
def _balance(bot: Bot, update: Update) -> None:
    """
    Handler for /balance
    Returns current account balance per crypto
    """
    output = ''
    balances = [
        c for c in exchange.get_balances()
        if c['Balance'] or c['Available'] or c['Pending']
    ]
    if not balances:
        output = '`All balances are zero.`'

    for currency in balances:
        output += """*Currency*: {Currency}
*Available*: {Available}
*Balance*: {Balance}
*Pending*: {Pending}

""".format(**currency)
    send_msg(output)
项目:freqtrade    作者:gcarq    | 项目源码 | 文件源码
def _count(bot: Bot, update: Update) -> None:
    """
    Handler for /count.
    Returns the number of trades running
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    if get_state() != State.RUNNING:
        send_msg('`trader is not running`', bot=bot)
        return

    trades = Trade.query.filter(Trade.is_open.is_(True)).all()

    message = tabulate({
        'current': [len(trades)],
        'max': [_CONF['max_open_trades']]
    }, headers=['current', 'max'], tablefmt='simple')
    message = "<pre>{}</pre>".format(message)
    logger.debug(message)
    send_msg(message, parse_mode=ParseMode.HTML)
项目:freqtrade    作者:gcarq    | 项目源码 | 文件源码
def _help(bot: Bot, update: Update) -> None:
    """
    Handler for /help.
    Show commands of the bot
    :param bot: telegram bot
    :param update: message update
    :return: None
    """
    message = """
*/start:* `Starts the trader`
*/stop:* `Stops the trader`
*/status [table]:* `Lists all open trades`
            *table :* `will display trades in a table`
*/profit:* `Lists cumulative profit from all finished trades`
*/forcesell <trade_id>|all:* `Instantly sells the given trade or all trades, regardless of profit`
*/performance:* `Show performance of each finished trade grouped by pair`
*/count:* `Show number of trades running compared to allowed number of trades`
*/balance:* `Show account balance per currency`
*/help:* `This help message`
*/version:* `Show version`
    """
    send_msg(message, bot=bot)
项目:RaiWalletBot    作者:SergiySW    | 项目源码 | 文件源码
def broadcast(bot, update):
    info_log(update)
    bot = Bot(api_key)
    # list users from MySQL
    accounts_list = mysql_select_accounts_balances()
    # some users are bugged & stop broadcast - they deleted chat with bot. So we blacklist them
    BLACK_LIST = mysql_select_blacklist()
    for account in accounts_list:
        # if not in blacklist and has balance
        if ((account[0] not in BLACK_LIST) and (int(account[1]) > 0)):
            mysql_set_blacklist(account[0])
            print(account[0])
            push_simple(bot, account[0], update.message.text.replace('/broadcast ', ''))
            sleep(0.2)
            mysql_delete_blacklist(account[0]) # if someone deleted chat, broadcast will fail and he will remain in blacklist


# bootstrap
项目:RaiWalletBot    作者:SergiySW    | 项目源码 | 文件源码
def monitoring_block_count():
    # set bot
    bot = Bot(api_key)
    count = int(rpc({"action": "block_count"}, 'count'))
    reference_count = int(reference_block_count())

    http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',ca_certs=certifi.where())
    response = http.request('GET', summary_url, headers=header, timeout=20.0)
    try:
        json_data = json.loads(response.data)
        community_count = int(json_data['blocks'])
    except ValueError as e:
        community_count = reference_count
    difference = int(math.fabs(community_count - count))

    response = http.request('GET', block_count_url, headers=header, timeout=20.0)
    raiwallet_count = int(response.data)

    if (difference > block_count_difference_threshold):
        # Warning to admins
        for user_id in admin_list:
            push(bot, user_id, 'Block count: {0}\nCommunity: {1}\nDifference: *{2}*\nReference: {3}\nraiwallet.info: {4}'.format(count, community_count, difference, reference_count, raiwallet_count))
        # trying to fix
        bootstrap_multi()
项目:notipy-server    作者:michaelimfeld    | 项目源码 | 文件源码
def get_telegram_bot():
    """
    Creates a telegram bot instance.

    Returns:
        telegram.Bot: The bot instance.

    Raises:
        TelegramConfigurationError: If the telegram bot was not properly
            configured.
    """
    logger = logging.getLogger()
    try:
        return Bot(Config().telegram_token)
    except (FileNotFoundError, ValueError, TelegramError) as exc:
        logger.error("Telegram token not present or invalid: '%s'", str(exc))
        raise TelegramConfigurationError("Telegram token not "
                                         "present or invalid.")
项目:jatayu    作者:debayan    | 项目源码 | 文件源码
def connect(self):
        config = ConfigParser.ConfigParser()
        if self.chatnetwork == 'telegram':
            try:
                config.read(self.keyslocation)
                self.telegram_access_token = config.get('telegram','access_token')
                self.logger.debug("Successfully read access keys")
            except Exception,e:
                self.logger.error("Could not read access keys: %s"%e)
                sys.exit(1)
            try:
                self.telegram_bot = telegram.Bot(token=self.telegram_access_token)
                self.telegram_updater = Updater(token=self.telegram_access_token)
                self.telegram_dispatcher = self.telegram_updater.dispatcher
                self.telegram_dispatcher.addTelegramMessageHandler(self.respond)
            except Exception,e:
                self.logger.error("Telegram bot connect failed: %s"%e)
                sys.exit(1)
        else:
            self.logger.error('Chatnetwork name not correct, found %s'%self.chatnetwork)
            return -1
项目:democracybot    作者:LiaungYip    | 项目源码 | 文件源码
def cmd_list_tokens(bot: telegram.Bot, update: telegram.Update):
    cid = update.message.chat_id

    if generic_checks(bot, update, "listtokens", "", 1):
        # Extract value
        text = update.message.text
        cmd, tag = text.split(" ", 1)

        q = Query()
        tokens = tokens_db.search(q.poll_tag == tag)

        msg = "There have been %i tokens generated for your poll:\n" % len(
            tokens)
        msg = msg + "\n".join(
            ["%i: %s" % (n + 1, t['token']) for n, t in enumerate(tokens)])

        bot.sendMessage(cid, msg)
项目:democracybot    作者:LiaungYip    | 项目源码 | 文件源码
def user_is_in_chat(uid: int, cid: int, bot: telegram.Bot):
    # Returns True if the specified user_id is in the specified chat_id.
    #
    # Returns False if:
    # * the bot isn't in the chat (so it can't see the members), or;
    # * the chat doesn't exist, or;
    # * the user isn't in the chat, or;
    # * the user doesn't exist.

    try:
        user = bot.get_chat_member(chat_id=cid, user_id=uid)
    except telegram.error.BadRequest as err:
        if "chat not found" in err.message.lower():
            logger.info(err_chat_not_found % (uid, cid))
            return False
        elif "user not found" in err.message.lower():
            logger.info(err_user_not_found % (uid, cid))
            return False
        else:
            logger.exception("Unhandled exception")
            raise
    logger.info("User %s is in chat %s" % (uid, cid))
    return True
项目:democracybot    作者:LiaungYip    | 项目源码 | 文件源码
def cmd_vote(bot: telegram.Bot, update: telegram.Update):
    cid = update.message.chat_id
    uid = update.message.from_user.id
    polls = get_polls(uid, bot)

    if update.message.chat.type != "private":
        bot.sendMessage(cid, text_private_chat_only)
        return ConversationHandler.END

    if len(polls) == 0:
        bot.sendMessage(cid, "You aren't eligible to vote in any polls.")
    else:
        keyboard_choices = [p["tag"] + ": " + p["title"] for p in polls]
        # keyboard array is a list of lists
        # because each list represents a new row
        # and we want each button on a separate row
        keyboard_array = [[k, ] for k in keyboard_choices]
        keyboard = ReplyKeyboardMarkup(keyboard_array,
                                       one_time_keyboard=True)
        bot.sendMessage(cid,
                        "Click the button for the poll you would like to vote in.",
                        reply_markup=keyboard)
    return state_vote_1
项目:democracybot    作者:LiaungYip    | 项目源码 | 文件源码
def get_polls(user_id: int, bot: telegram.Bot):
    # Gets a list of polls that the given user_id is allowed to vote in.
    #    * The poll must be active.
    #    * The user must belong to the poll's target_chat. (This is determined
    #      by asking the Telegram API - "Does user 123 belong to chat -456?")
    all_polls = polls_db.all()

    # Convert to set to get unique values
    target_chats = set([p['target_chat'] for p in all_polls
                        if p['active'] == True])

    # Ask telegram API - is user 123 in chat -456?
    chats_user_is_in = [c for c in target_chats
                        if user_is_in_chat(user_id, c, bot)]

    valid_polls = [p for p in all_polls
                   if p['target_chat'] in chats_user_is_in
                   and p['active'] == True]

    return valid_polls
项目:democracybot    作者:LiaungYip    | 项目源码 | 文件源码
def cmd_set_generic(bot: telegram.Bot, update: telegram.Update, field_name: str,
                    cmd_name: str, example: str):
    if generic_checks(bot, update, cmd_name, example, 2):
        # Extract value
        text = update.message.text
        cmd, tag, value = text.split(" ", 2)

        # Apply edit and
        edit_poll(tag, field_name, value)

        # send "success" feedback message
        new_p = get_poll(tag)
        msg = text_edit_successful.format(p=new_p)
        cid = update.message.chat_id
        bot.sendMessage(cid, msg)

        # Check if complete, and suggest to activate
        if check_if_complete(tag):
            msg2 = text_activate_suggestion.format(t=tag)
            bot.sendMessage(cid, msg2, parse_mode="Markdown")
    else:
        # A generic check failed.
        pass

    return
项目:TBot8-1    作者:Danzilla-cool    | 项目源码 | 文件源码
def main():
    global update_id
    # Telegram Bot Authorization Token
    f = open("../token.txt", "r")
    token = f.readline()
    f.close()
    bot = telegram.Bot(token=token)

    # get the first pending update_id, this is so we can skip over it in case
    # we get an "Unauthorized" exception.
    try:
        update_id = bot.getUpdates()[0].update_id
    except IndexError:
        update_id = None

    logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    while True:
        try:
            echo(bot)
        except NetworkError:
            sleep(1)
        except Unauthorized:
            # The user has removed or blocked the bot.
            update_id += 1
项目:BookingBot    作者:manu0466    | 项目源码 | 文件源码
def execute(self, chat_id, bot: Bot, update: Update):
        handled = False
        if update.message.text in self._buildings_dict:
            handled = True
            building_identifier = self._buildings_dict[update.message.text]
            classrooms = self.get_classroom_source().get_classrooms_in_building(building_identifier)
            keyboard_button = []
            counter = 0
            row = -1
            for classroom in classrooms:
                if counter == 0:
                    keyboard_button.append([])
                    row += 1
                keyboard_button[row].append(KeyboardButton("/"+classroom.get_name().lower()))
                counter += 1
                if counter == 3:
                    counter = 0
            keyboard_button.append(["/buildings"])
            reply_keyboard = ReplyKeyboardMarkup(keyboard_button)
            bot.send_message(chat_id=chat_id, text="Available classrooms", reply_markup=reply_keyboard)
        return handled
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def help_about(bot: telegram.Bot, update: telegram.Update):
    bot.send_message(
        chat_id=update.message.chat_id,
        text=strings.HELP_MESSAGE,
        parse_mode='HTML',
        disable_web_page_preview=True,
        reply_markup=strings.HELP_MESSAGE_KEYBOARD
    )

    bot_types.Botan.track(
        uid=update.message.chat_id,
        message=update.message.to_dict(),
        name='help.about'
    )
    logging.info('Command help: about.', extra={
        'telegram': {
            'update': update.to_dict(),
            'chat_id': update.message.chat_id,
            'message_id': update.message.message_id,
        },
        'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
    })
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def settings_message(bot: telegram.Bot, update: telegram.Update):
    chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
                                               admin_name=update.message.from_user.first_name)

    if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
        send_settings_message(bot, chat_settings)

    bot_types.Botan.track(
        uid=update.message.chat_id,
        message=update.message.to_dict(),
        name='settings.main'
    )
    logging.info('Command settings: main.', extra={
        'telegram': {
            'update': update.to_dict(),
            'chat_id': update.message.chat_id,
            'message_id': update.message.message_id,
        },
        'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
    })
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def send_settings_message(bot: telegram.Bot, chat_settings: bot_types.ChatSettings):
    bot.send_message(
        chat_id=chat_settings.id,
        text=strings.SETTINGS_MESSAGE % (
            chat_settings.admin_name,
            str(chat_settings.mode),
            '???????' if chat_settings.quiet else '????????',
            '?????? ????????????? ????' if chat_settings.admin_only else '??? ??????',
            chat_settings.yandex_key if chat_settings.yandex_key else '?? ??????????',
            str(chat_settings.voice),
            str(chat_settings.emotion),
            str(chat_settings.speed),
            '???????????' if chat_settings.as_audio else '????????? ?????????',
            '????????? ?????????' if chat_settings.as_audio else '???????????',
            '?????????' if chat_settings.quiet else '????????',
            '?????' if chat_settings.admin_only else '?????? ????????????? ????'
        ),
        parse_mode='HTML'
    )


# endregion


# region settings voice
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def settings_voice_message(bot: telegram.Bot, update: telegram.Update):
    chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
                                               admin_name=update.message.from_user.first_name)

    if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
        send_settings_voice_message(bot, update.message.chat_id)

    bot_types.Botan.track(
        uid=update.message.chat_id,
        message=update.message.to_dict(),
        name='settings.voice.get'
    )
    logging.info('Command settings: voice.', extra={
        'telegram': {
            'update': update.to_dict(),
            'chat_id': update.message.chat_id,
            'message_id': update.message.message_id,
        },
        'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
    })
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def send_settings_voice_message_callback(bot: telegram.Bot, voice: bot_types.Voice,
                                         callback_query_id: str, quiet: bool):
    if quiet:
        bot.answer_callback_query(
            callback_query_id=callback_query_id
        )
    else:
        bot.answer_callback_query(
            callback_query_id=callback_query_id,
            text=strings.NEW_VOICE_MESSAGE % str(voice)
        )


# endregion


# region settings emotion
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def settings_emotion_message(bot: telegram.Bot, update: telegram.Update):
    chat_settings = bot_types.ChatSettings.from_db(db, update.message.chat, admin_id=update.message.from_user.id,
                                               admin_name=update.message.from_user.first_name)

    if not chat_settings.admin_only or chat_settings.admin_id == update.message.from_user.id:
        send_settings_emotion_message(bot, update.message.chat_id)

    bot_types.Botan.track(
        uid=update.message.chat_id,
        message=update.message.to_dict(),
        name='settings.emotions.get'
    )
    logging.info('Command settings: emotion.', extra={
        'telegram': {
            'update': update.to_dict(),
            'chat_id': update.message.chat_id,
            'message_id': update.message.message_id,
        },
        'id': extentions.TextHelper.get_md5(str(update.message.chat_id) + str(update.message.message_id))
    })
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def send_settings_emotion_message_callback(bot: telegram.Bot, emotion: bot_types.Emotion,
                                           callback_query_id: str, quiet: bool):
    if quiet:
        bot.answer_callback_query(
            callback_query_id=callback_query_id
        )
    else:
        bot.answer_callback_query(
            callback_query_id=callback_query_id,
            text=strings.NEW_EMOTION_MESSAGE % str(emotion)
        )


# endregion


# region settings mode
项目:voiceru_bot    作者:just806me    | 项目源码 | 文件源码
def send_settings_mode_message_callback(bot: telegram.Bot, mode: bot_types.Mode,
                                        callback_query_id: str, quiet: bool):
    if quiet:
        bot.answer_callback_query(
            callback_query_id=callback_query_id
        )
    else:
        bot.answer_callback_query(
            callback_query_id=callback_query_id,
            text=strings.NEW_MODE_MESSAGE % str(mode)
        )


# endregion


# region settings audio
项目:tinder-telegram-bot    作者:arthurdk    | 项目源码 | 文件源码
def set_auto(bot: Bot, update: Update):
    """
    Handles /auto command
    :param bot:
    :param update:
    :return:
    """
    global data
    chat_id = update.message.chat_id
    if chat_id in data.conversations:
        data.conversations[chat_id].auto = not data.conversations[chat_id].auto
        if data.conversations[chat_id].auto:
            message = "Automatic mode enabled."
        else:
            message = "Automatic mode disabled."
        send_custom_message(bot=bot, chat_id=chat_id, message=message)
    else:
        send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
项目:tinder-telegram-bot    作者:arthurdk    | 项目源码 | 文件源码
def send_matches(bot: Bot, update: Update):
    """
    Send list of matches (pictures) to private chat
    :param bot:
    :param update:
    :return:
    """
    global data
    chat_id = update.message.chat_id
    sender_id = update.message.from_user.id
    if chat_id in data.conversations:
        try:
            conversation = data.conversations[chat_id]
            matches = conversation.get_matches()
            id = 0
            # TODO cache the photo urls for some time
            for match in matches:
                photo = match.user.get_photos(width='172')[0]
                try:
                    is_msg_sent = send_private_photo(bot=bot, caption=match.user.name + " ID=" + str(id), url=photo,
                                                     user_id=sender_id)

                    if not is_msg_sent:
                        notify_start_private_chat(bot=bot,
                                                  chat_id=chat_id,
                                                  incoming_message=update.message)
                        break
                    id += 1
                except error.BadRequest:
                    send_custom_message(bot=bot, chat_id=sender_id, message="One match could not be loaded: %s" % photo)
                time.sleep(0.1)
        except AttributeError as e:
            message = "An error happened."
            send_custom_message(bot=bot, chat_id=sender_id, message=message)
    else:
        send_error(bot=bot, chat_id=chat_id, name="account_not_setup")
项目:tinder-telegram-bot    作者:arthurdk    | 项目源码 | 文件源码
def unlink(bot: Bot, update: Update):
    """
    Handles /unlink command
    :param bot:
    :param update:
    :return:
    """
    global data
    sender = update.message.from_user.id
    chat_id = update.message.chat_id
    if chat_id in data.conversations:
        if sender == data.conversations[chat_id].owner:
            del data.conversations[chat_id]
            send_message(bot, chat_id, name="account_unlinked")
        else:
            send_error(bot, chat_id=chat_id, name="command_not_allowed")
    else:
        send_error(bot, chat_id=chat_id, name="account_not_setup")
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_admin_help(self):
        """
            Test admin help command
        """
        bot = Mock(spec=Bot)
        update = self.__get_dummy_update()

        AdminCommands._AdminCommands__admin_help(  # pylint: disable=no-member, protected-access
            bot, update)
        self.assertTrue(bot.sendMessage.called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_get_users_no_config(self):
        """
            Test get users command if no users are registered
        """
        bot = Mock(spec=Bot)
        update = self.__get_dummy_update()

        with patch("ownbot.admincommands.UserManager") as usrmgr_mock:
            usrmgr_mock.return_value.config = {}
            AdminCommands._AdminCommands__get_users(  # pylint: disable=no-member, protected-access
                bot, update)
        self.assertTrue(bot.sendMessage.called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_get_users(self):
        """
            Test get users command
        """
        bot = Mock(spec=Bot)
        update = self.__get_dummy_update()

        with patch("ownbot.admincommands.UserManager") as usrmgr_mock:
            config = {"foogroup": {"users": [{"id": 1337,
                                              "username": "@foouser"}],
                                   "unverified": ["@baruser"]}}
            usrmgr_mock.return_value.config = config
            AdminCommands._AdminCommands__get_users(  # pylint: disable=no-member, protected-access
                bot, update)
        self.assertTrue(bot.sendMessage.called)
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_adduser_no_args(self):
        """
            Test adduser command if the wrong number of args is passed
        """
        bot = Mock(spec=Bot)
        update = self.__get_dummy_update()

        AdminCommands._AdminCommands__add_user(  # pylint: disable=no-member, protected-access
            bot, update, [])
        bot.sendMessage.assert_called_with(
            chat_id=1, text="Usage: adduser <user> <group>")
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_adduser_usr_already_in_grp(self):
        """
            Test adduser command if user is already in group
        """
        bot = Mock(spec=Bot)
        update = self.__get_dummy_update()

        with patch("ownbot.admincommands.UserManager") as usrmgr_mock:
            usrmgr_mock.return_value.add_user.return_value = False
            AdminCommands._AdminCommands__add_user(  # pylint: disable=no-member, protected-access
                bot, update, ["@foouser", "foogroup"])
            bot.sendMessage.assert_called_with(
                chat_id=1,
                text="The user '@foouser' is already in the group 'foogroup'!")
项目:ownbot    作者:michaelimfeld    | 项目源码 | 文件源码
def test_rmuser_no_args(self):
        """
            Test rmuser command if the wrong number of args is passed
        """
        bot = Mock(spec=Bot)
        update = self.__get_dummy_update()

        AdminCommands._AdminCommands__rm_user(  # pylint: disable=no-member, protected-access
            bot, update, [])

        bot.sendMessage.assert_called_with(chat_id=1,
                                           text="Usage: rmuser <user> <group>")