Python tweepy 模块,TweepError() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tweepy.TweepError()

项目:tweet-analysis    作者:D4D3VD4V3    | 项目源码 | 文件源码
def twittercallback():
    verification = request.args["oauth_verifier"]
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    try:
        auth.request_token = session["request_token"]
    except KeyError:
        flash("Please login again", "danger")
        return redirect(url_for("bp.home"))

    try:
        auth.get_access_token(verification)
    except tweepy.TweepError:
        flash("Failed to get access token", "danger")
        return redirect(url_for("bp.home"))

    session["access_token"] = auth.access_token
    session["access_token_secret"] = auth.access_token_secret

    return render_template("twittercallback.html", form=HashtagForm())
项目:picdescbot    作者:elad661    | 项目源码 | 文件源码
def send(self, picture):
        "Send a tweet. `picture` is a `Result` object from `picdescbot.common`"
        retries = 0
        status = None
        filename = picture.url.split('/')[-1]
        data = picture.download_picture()
        try:
            while retries < 3 and not status:
                if retries > 0:
                    self.log.info('retrying...')
                    data.seek(0)
                try:
                    status = self.api.update_with_media(filename=filename,
                                                        status=picture.caption,
                                                        file=data)
                except tweepy.TweepError as e:
                    self.log.error("Error when sending tweet: %s" % e)
                    retries += 1
                    if retries >= 3:
                        raise
                    else:
                        time.sleep(5)
        finally:
            data.close(really=True)
        return status.id
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def log_tweep_error(logger, tweep_error):
    """Log a TweepError exception."""
    if tweep_error.api_code:
        if tweep_error.api_code == 32:
            logger.error("invalid API authentication tokens")
        elif tweep_error.api_code == 34:
            logger.error("requested object (user, Tweet, etc) not found")
        elif tweep_error.api_code == 64:
            logger.error("your account is suspended and is not permitted")
        elif tweep_error.api_code == 130:
            logger.error("Twitter is currently in over capacity")
        elif tweep_error.api_code == 131:
            logger.error("internal Twitter error occurred")
        elif tweep_error.api_code == 135:
            logger.error("could not authenticate your API tokens")
        elif tweep_error.api_code == 136:
            logger.error("you have been blocked to perform this action")
        elif tweep_error.api_code == 179:
            logger.error("you are not authorized to see this Tweet")
        else:
            logger.error("error while using the REST API: %s", tweep_error)
    else:
        logger.error("error with Twitter: %s", tweep_error)
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def get_hydrated(writer, user_ids=None, screen_names=None):
    """Get hydrated Twitter User-objects from a list of user ids and/or screen names."""
    LOGGER.info("get_hydrated() starting")
    ensure_at_least_one(user_ids=user_ids, screen_names=screen_names)
    user_ids = user_ids if user_ids else []
    screen_names = screen_names if screen_names else []

    # initialize config and Twitter API
    config = read_config()
    api = get_oauth_api(config)  # OAuth gives more capacity for the users/lookup API

    # process user ids and/or screen names, storing returned users in JSON format
    num_users = 0
    for chunk in gen_chunks(user_ids, screen_names, size=LOOKUP_USERS_PER_REQUEST):
        try:
            num_users += write_objs(writer, api.lookup_users,
                                    {"user_ids": chunk[0], "screen_names": chunk[1]})
        except TweepError as err:
            log_tweep_error(LOGGER, err)
    LOGGER.info("downloaded %d user(s)", num_users)

    # finished
    LOGGER.info("get_hydrated() finished")
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def get_followers(writer, user_id=None, screen_name=None):
    """Get the ids of the followers for a Twitter user id or screen name."""
    LOGGER.info("get_followers() starting")
    ensure_only_one(user_id=user_id, screen_name=screen_name)

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process user id or screen name, storing returned ids in plain text
    args = {"count": FOLLOWERS_IDS_COUNT}
    if user_id is not None:
        args.update({"user_id": user_id})
    if screen_name is not None:
        args.update({"screen_name": screen_name})
    limit = config.getint("followers", "limit")
    try:
        num_ids = write_ids(writer, api.followers_ids, args, cursored=True, limit=limit)
        LOGGER.info("downloaded %d follower id(s)", num_ids)
    except TweepError as err:
        log_tweep_error(LOGGER, err)

    # finished
    LOGGER.info("get_followers() finished")
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def get_friends(writer, user_id=None, screen_name=None):
    """Get the ids of the friends for a Twitter user id or screen name."""
    LOGGER.info("get_friends() starting")
    ensure_only_one(user_id=user_id, screen_name=screen_name)

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process user id or screen name, storing returned ids in plain text
    args = {"count": FRIENDS_IDS_COUNT}
    if user_id is not None:
        args.update({"user_id": user_id})
    if screen_name is not None:
        args.update({"screen_name": screen_name})
    limit = config.getint("friends", "limit")
    try:
        num_ids = write_ids(writer, api.friends_ids, args, cursored=True, limit=limit)
        LOGGER.info("downloaded %d friend id(s)", num_ids)
    except TweepError as err:
        log_tweep_error(LOGGER, err)

    # finished
    LOGGER.info("get_friends() finished")
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def get_hydrated(writer, tweet_ids):
    """Get hydrated Tweet-objects from a list of Tweet ids."""
    LOGGER.info("get_hydrated() starting")

    # initialize config and Twitter API
    config = read_config()
    api = get_oauth_api(config)  # OAuth gives more capacity for the statuses/lookup API

    # process Tweet ids, storing returned Tweets in JSON format
    num_tweets = 0
    for chunk in gen_chunks(tweet_ids, size=LOOKUP_STATUSES_PER_REQUEST):
        try:
            num_tweets = write_objs(writer, api.statuses_lookup, {"id_": chunk[0]})
        except TweepError as err:
            log_tweep_error(LOGGER, err)
    LOGGER.info("downloaded %d Tweet(s)", num_tweets)

    # finished
    LOGGER.info("get_hydrated() finished")
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def get_retweets(writer, tweet_id):
    """Get hydrated Retweet-objects for a given Tweet id."""
    LOGGER.info("get_retweets() starting")

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process Tweet id, storing returned Retweets in JSON format
    try:
        num_retweets = write_objs(writer, api.retweets, {"id": tweet_id, "count": RETWEETS_COUNT})
        LOGGER.info("downloaded %d Retweet(s)", num_retweets)
    except TweepError as err:
        log_tweep_error(LOGGER, err)

    # finished
    LOGGER.info("get_retweets() finished")
项目:Trending-Places-in-OpenStreetMap    作者:geometalab    | 项目源码 | 文件源码
def tweet_status_trends(self):
        """
        Tweets the top trending places
        """
        logging.info("Updating status with Trending places....")

        try:
            base_text = "Top trending places in #OSM " + DATE.strftime('%d/%m') + ': '
            end_text = ''
            count_available = TWITTER_STATUS_LIMIT - len(base_text) - len(end_text)
            text = Ft().get_cities_from_file(str(DATE.date()), REGION, count_available)
            img = Ft.get_trending_graph(str(DATE.date()), REGION)
            if text:
                self.api.update_with_media(img, base_text + text + end_text)
                self.state['last_tweet'] = time.time()
            else:
                self.api.update_status(ERROR_MSG)
                logging.info("Could not update status. Rechecking in a while....")

        except tweepy.TweepError as e:
            self._log_tweepy_error('Can\'t update status because', e)
项目:twitter_LDA_topic_modeling    作者:kenneth-orton    | 项目源码 | 文件源码
def manage_auth_handlers(auths):
    index = 0
    while True:
        api = auths[index]
        try:
            limit = api.rate_limit_status()
            status_limit = limit['resources']['statuses']['/statuses/user_timeline']['remaining']
            if status_limit > 180:
                return api
        except tweepy.TweepError as e:
            #print('manage_auth_handlers ' + str(e))
            pass
        finally:
            if index == (len(auths) - 1):
                index = 0
            else:
                index += 1
项目:TwitterStockMonitor    作者:semaaJ    | 项目源码 | 文件源码
def get_latest_tweet(self):
        """Checks the twitter handle for new tweets.
           If there has been a new tweet, it will return the Tweet
           to be checked for companies"""

        try:
            latest_tweet = self.api.user_timeline(screen_name=self.handle, count=1)[0]
            tweet = latest_tweet.text.encode('ascii', 'ignore').decode('utf-8')  # Removes emjois

            with open(f'{LATEST_TWEET}{self.handle}.txt', "r") as f:
                old_tweet = f.read()

            if tweet != old_tweet:
                with open(f'{LATEST_TWEET}{self.handle}.txt', 'w') as f:
                    f.write(tweet)

                self.tweet_id = latest_tweet.id_str
                self.tweet = tweet

                return tweet

        except tweepy.TweepError as error:
            logging.debug(error)
项目:TwitterStockMonitor    作者:semaaJ    | 项目源码 | 文件源码
def initial_tweet(self, matches):
        """Tweets when a company is mentioned, along with it's sentiment."""

        sentiment = self.sentiment_analysis()
        sentiment_dict = {"positive": u"\U00002705",
                          "negative": u"\U0000274E",
                          "neutral": u"\U00002796"
                          }

        for comp in matches:
            try:
                self.api.update_status(f'{self.handle} just mentioned {comp.upper()} {sentiment}ly '
                                       f'in their latest tweet! '
                                       f'https://twitter.com/{self.handle}/status/{self.tweet_id}')

            except tweepy.TweepError as error:
                logging.debug(error)
项目:TwitterStockMonitor    作者:semaaJ    | 项目源码 | 文件源码
def share_output(self):
        """Calls difference_in_shares from the Companies module,
            Outputs the data to twitter."""

        share_dict = company.get_company_dict()

        for comp in share_dict:
            try:
                self.api.update_status(
                    f'Since {share_dict[comp]["handle"]} mentioned {comp.upper()}, {share_dict[comp]["day"]} days ago, '
                    f'their shares have changed from {share_dict[comp]["initialSharePrice"]:.2f} to '
                    f"{share_dict[comp]['currentSharePrice']:} that's a {share_dict[comp]['shareChange']:.3f}% change!"
                    )

            except tweepy.TweepError as error:
                logging.debug(error)
项目:tweetopo    作者:zthxxx    | 项目源码 | 文件源码
def get_friends(self, callback, pages_limit=0):
        api = self._api
        user = self._user
        if user.friends_count > _FRIENDS_COUNT_MAX_:
            logging.warning('The user [%d]-[%s] has too many [%d] friends!'
                            % (user.id, user.screen_name, user.friends_count))
            return
        cursor = tweepy.Cursor(api.friends_ids, user_id=user.id, screen_name=user.screen_name)
        friends = []
        try:
            for friends_page in cursor.pages(pages_limit):
                friends.extend(friends_page)
            if callable(callback):
                callback(friends)
        except tweepy.TweepError as e:
            logging.warning([user.id, user.screen_name, e])
项目:ewe_ebooks    作者:jaymcgrath    | 项目源码 | 文件源码
def __init__(self, twitter_username):
        # TODO: Login to twitter for corpus generation using end user's credentials
        auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

        # Connect to Twitter - raise TweepError if we brick out on this
        try:
            api = tweepy.API(auth)
        except tweepy.TweepError:
            # TODO: make sure this error bubbles up and gets handled gracefully
            raise PermissionError("Twitter Auth failed")

        usr = api.get_user(twitter_username)
        self.username = twitter_username
        self.image = usr.profile_image_url
        # Exposes entire api - for debugging only
        # self.api = usr
        self.description = usr.description
        self.screen_name = usr.screen_name
        self.name = usr.name
项目:ewe_ebooks    作者:jaymcgrath    | 项目源码 | 文件源码
def __init__(self, twitter_username):
        # TODO: Login to twitter for corpus generation using end user's credentials
        auth = tweepy.OAuthHandler(CONSUMER_KEY,CONSUMER_SECRET)
        auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

        # Connect to Twitter - raise TweepError if we brick out on this
        try:
            api = tweepy.API(auth)
        except tweepy.TweepError:
            # TODO: make sure this error bubbles up and gets handled gracefully
            raise PermissionError("Twitter Auth failed")

        usr = api.get_user(twitter_username)
        self.username = twitter_username
        self.image = usr.profile_image_url
        # Exposes entire api - for debugging only
        # self.api = usr
        self.description = usr.description
        self.screen_name = usr.screen_name
        self.name = usr.name
项目:meteosangue    作者:meteosangue    | 项目源码 | 文件源码
def tweet_status(status, image_path=None):
    try:
        auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET)
        auth.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET)
        api = tweepy.API(auth)
        if image_path:
            new_tweet = api.update_with_media(image_path, status=status)
        else:
            new_tweet = api.update_status(status)
    except tweepy.TweepError as ex:
        raise MeteoSangueException(ex)

    try:
        mention = '{0} {1}'.format(
            ' '.join([ass['twitter_id'] for ass in settings.BLOOD_ASSOCIATIONS if 'twitter_id' in ass]),
            'Nuovo bollettino meteo ?',
        )
        api.update_status(mention, in_reply_to_status_id=new_tweet.id)
    except tweepy.TweepError as ex:
        pass #Mention is allowed to fail silently
项目:TwitterBotClassifier    作者:JVogel27    | 项目源码 | 文件源码
def check_human_accounts():
    api = get_api(key3[0], key3[1], key3[2], key3[3])
    ids = []
    try:
        with open('..//humans.txt', 'r') as f:
            for line in f:
                ids.append(line.rstrip())
                ids.append('\n')

        os.remove('..//humans.txt')

        with open('..//existing_humans_copy.txt', 'r') as f_read:
            with open('..//humans.txt', 'w') as f_write:
                f_write.write(''.join(ids))
                for line in f_read:
                    user = api.get_user(line.rstrip())
                    print(line)
                    if not user.protected:
                        f_write.write(line)
    except tweepy.TweepError:
        f_read.close()
        f_write.close()
项目:TwitterBotClassifier    作者:JVogel27    | 项目源码 | 文件源码
def check_bot_accounts():
    api = get_api(key2[0], key2[1], key2[2], key2[3])
    ids = []

    try:
        with open('..//bots.txt', 'r') as f:
                for line in f:
                    ids.append(line.rstrip())
                    ids.append('\n')

        os.remove('..//bots.txt')
        with open('..//existing_bots_copy.txt', 'r') as f_read:
            with open('..//bots.txt', 'w') as f_write:
                f_write.write(''.join(ids))
                for line in f_read:
                    user = api.get_user(line.rstrip())
                    print(line)
                    if not user.protected:
                        f_write.write(line)
    except tweepy.TweepError:
        f_read.close()
        f_write.close()
项目:TwitterBotClassifier    作者:JVogel27    | 项目源码 | 文件源码
def get_data_from_humans():
    try:
        x = []
        with open('..//human_x.txt', 'r') as f:
            reader = csv.reader(f)
            for row in reader:
                x.append(row)

        os.remove('..//human_x.txt')

        with open('..//humans_copy.txt', 'r') as f_read:
            with open('..//human_x.txt', 'w') as f_write:
                writer = csv.writer(f_write, lineterminator='\n')
                for row in x:
                    writer.writerow(row)
                for count, line in enumerate(f_read):

                    row = get_data(line.rstrip(), api)

                    writer.writerow(row)

                    print(line)

    except tweepy.TweepError:
        f_write.close()
项目:twitter_search    作者:agalea91    | 项目源码 | 文件源码
def tweet_search(api, query, max_tweets, max_id, since_id, geocode):
    ''' Function that takes in a search string 'query', the maximum
        number of tweets 'max_tweets', and the minimum (i.e., starting)
        tweet id. It returns a list of tweepy.models.Status objects. '''

    searched_tweets = []
    while len(searched_tweets) < max_tweets:
        remaining_tweets = max_tweets - len(searched_tweets)
        try:
            new_tweets = api.search(q=query, count=remaining_tweets,
                                    since_id=str(since_id),
                                    max_id=str(max_id-1))
#                                    geocode=geocode)
            print('found',len(new_tweets),'tweets')
            if not new_tweets:
                print('no tweets found')
                break
            searched_tweets.extend(new_tweets)
            max_id = new_tweets[-1].id
        except tweepy.TweepError:
            print('exception raised, waiting 15 minutes')
            print('(until:', dt.datetime.now()+dt.timedelta(minutes=15), ')')
            time.sleep(15*60)
            break # stop the loop
    return searched_tweets, max_id
项目:cyphon    作者:dunbarcyber    | 项目源码 | 文件源码
def process_request(self, obj):
        """Convert a ReservoirQuery into an API request and get the response.

        Parameters
        ----------
        obj : |ReservoirQuery|

        Returns
        -------
        |Cargo|

        """
        try:
            # TODO(LH): handle rate limit
            cursor = self._get_statuses(obj)
            data = [status._json for status in cursor.items()]
            status_code = 200
        except tweepy.TweepError as error:
            data = []
            status_code = error.api_code

        return Cargo(status_code=status_code, data=data)
项目:aihackathon    作者:nicoheidtke    | 项目源码 | 文件源码
def extract_tweets_from_a_source(self, source):
        if '@' not in source:
            source = '@' + source

        extracted_tweets = []
        try:
            print "Extracting %s..." % source
            max_twitter_id = None
            while True:
                tw = self.API.user_timeline(screen_name=source, count=self.N_TWEETS_PER_REQUEST, max_id=max_twitter_id)
                if not len(tw):
                    break

                extracted_tweets += [self.get_filtered_tweet(t) for t in tw]
                earliest_tweet_date = extracted_tweets[-1]['created_at']
                if earliest_tweet_date < self.from_time:
                    break

                max_twitter_id = extracted_tweets[-1]['id'] - 1
        except TweepError:
            print "Error processing", source

        print "\textracted %d tweets for %s" % (len(extracted_tweets), source)
        self.TWEET_STORAGE += extracted_tweets
        return extracted_tweets
项目:measure    作者:okfn    | 项目源码 | 文件源码
def _handle_twitter_rate_limit(cursor):
    '''Handle twitter rate limits. If rate limit is reached, the next element
    will be accessed again after sleep time'''
    while True:
        try:
            yield cursor.next()
        except tweepy.RateLimitError:
            log.info('Twitter API rate limit error. Sleeping for {} secs.') \
                .format(TWITTER_API_RATE_LIMIT_PERIOD)
            sleep_time = TWITTER_API_RATE_LIMIT_PERIOD
            time.sleep(sleep_time)
        except tweepy.TweepError as e:
            if str(e.api_code) == TWITTER_API_USER_NOT_FOUND_ERROR_CODE:
                raise ValueError(
                    'Requested user was not found. Check your configuration')
            raise e
项目:MoodBot    作者:Bonanashelby    | 项目源码 | 文件源码
def tweet_grab(self, query, count):
        tweets = []
        try:
            tweets_fetched = self.api.search(q=query, count=count)

            for tweet in tweets_fetched:
                tweets_parsed = {}
                tweets_parsed['text'] = tweet.text
                tweets_parsed['sentiment'] = self.tweet_sentiment(tweet.text)

                if tweet.retweet_count > 0:
                    if tweets_parsed not in tweets:
                        tweets.append(tweets_parsed)
                else:
                    tweets.append(tweets_parsed)
            return tweets
        except tweepy.TweepError: # pragma no cover
            print('Error : ' + str(tweepy.TweepError))
项目:tweet-analysis    作者:D4D3VD4V3    | 项目源码 | 文件源码
def twittersignin():
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    try:
        redirect_url = auth.get_authorization_url()
        session["request_token"] = auth.request_token

    except tweepy.TweepError:
        flash("Failed to get request token", "danger")
        return redirect(url_for("bp.home"))
    return redirect(redirect_url)
项目:plexivity    作者:mutschler    | 项目源码 | 文件源码
def twitter():
    auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg",  "http://"+ request.environ["HTTP_HOST"] + "/auth/twitter")
    auth.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET)

    api = tweepy.API(auth)
    try:
         if api.me().name:
             return redirect(url_for('index'))
    except tweepy.TweepError:
        pass

    redirect_url = auth.get_authorization_url()
    session["request_token"] = auth.request_token
    return redirect(redirect_url)
项目:tensorflow_seq2seq_chatbot    作者:higepon    | 项目源码 | 文件源码
def twitter_bot():
    # Only allocate part of the gpu memory when predicting.
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.2)
    tf_config = tf.ConfigProto(gpu_options=gpu_options)

    consumer_key = os.getenv("consumer_key")
    consumer_secret = os.getenv("consumer_secret")
    access_token = os.getenv("access_token")
    access_token_secret = os.getenv("access_token_secret")

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth)
    with tf.Session(config=tf_config) as sess:
        predictor = predict.EasyPredictor(sess)

        for tweet in tweets():
            status_id, status, bot_flag = tweet
            print("Processing {0}...".format(status.text))
            screen_name = status.author.screen_name
            replies = predictor.predict(status.text)
            if not replies:
                print("no reply")
                continue
            reply_body = replies[0]
            if reply_body is None:
                print("No reply predicted")
            else:
                try:
                    post_reply(api, bot_flag, reply_body, screen_name, status_id)
                except tweepy.TweepError as e:
                    # duplicate status
                    if e.api_code == 187:
                        pass
                    else:
                        raise
            mark_tweet_processed(status_id)
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def bulk_process(logger, output_dir, filename_tmpl, function, func_input, var_arg, resume=False):  # pylint: disable=too-many-arguments
    """Process a function in bulk using an iterable input and a variable argument."""
    if not path.exists(output_dir):
        makedirs(output_dir)
        logger.info("created output directory: %s", output_dir)
    num_processed = 0
    for basename, value in func_input:
        output_filename = path.join(output_dir, filename_tmpl % basename)

        # check if there is a previous processing and skip or resume it
        latest_id = None
        if path.exists(output_filename):
            if not resume:
                logger.warning("skipping existing file: %s", output_filename)
                continue
            latest_id = _get_latest_id(output_filename)

        # process the input element with the provided function
        try:
            logger.info("processing: %s", value)
            args = {var_arg: value}
            if latest_id is not None:
                args.update({"since_id": latest_id})
                logger.info("latest id processed: %d", latest_id)
            with open(output_filename, "a" if resume else "w") as writer:
                function(writer, **args)
            num_processed += 1
        except TweepError:
            logger.exception("exception while using the REST API")
    return num_processed
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def get_timeline(writer, user_id=None, screen_name=None, since_id=0):
    """Get hydrated Tweet-objects from a user timeline."""
    LOGGER.info("get_timeline() starting")
    ensure_only_one(user_id=user_id, screen_name=screen_name)

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process user id or screen name, storing returned Tweets in JSON format
    num_tweets = 0
    args = {"count": TIMELINE_COUNT}
    if user_id is not None:
        args.update({"user_id": user_id})
    if screen_name is not None:
        args.update({"screen_name": screen_name})
    if since_id > 0:
        args.update({"since_id": since_id})
    limit = config.getint("timeline", "limit")
    try:
        num_tweets = write_objs(writer, api.user_timeline, args, cursored=True, limit=limit)
        LOGGER.info("downloaded %d Tweet(s)", num_tweets)
    except TweepError as err:
        log_tweep_error(LOGGER, err)

    # finished
    LOGGER.info("get_timeline() finished")
项目:python-twitter-toolbox    作者:hhromic    | 项目源码 | 文件源码
def search(writer, query, since_id=0):
    """Get hydrated Tweet-objects using the Search API."""
    LOGGER.info("search() starting")

    # initialize config and Twitter API
    config = read_config()
    api = get_app_auth_api(config)

    # process the query, storing returned Tweets in JSON format
    num_tweets = 0
    search_params = {
        "q": query,
        "count": SEARCH_COUNT,
        "result_type": "recent",
    }
    if since_id > 0:
        search_params.update({"since_id": since_id})
    limit = config.getint("search", "limit")
    try:
        num_tweets = write_objs(writer, api.search, search_params,
                                cursored=True, limit=limit)
        LOGGER.info("downloaded %d Tweet(s)", num_tweets)
    except TweepError as err:
        log_tweep_error(LOGGER, err)

    # finished
    LOGGER.info("search() finished")
项目:aurora    作者:carnby    | 项目源码 | 文件源码
def handle(self, *args, **options):
        api_keys = settings.TWITTER_USER_KEYS
        auth = tweepy.OAuthHandler(api_keys['consumer_key'], api_keys['consumer_secret'])
        auth.set_access_token(api_keys['access_token_key'], api_keys['access_token_secret'])

        new_portraits = Portrait.objects.filter(active=True, demo_portrait=False)

        print('scheduled portraits', new_portraits.count())

        for portrait in new_portraits:
            is_new_portrait = portrait.portrait_content is None

            print('user', portrait.auth_screen_name)
            print('new', is_new_portrait)

            try:
                portrait_api(portrait)
                print('OK')
            except TweepError as err:
                print('ERROR', err)
                portrait.active = False
                portrait.save()
                continue
            except Exception as err:
                print('ERROR', err)
                continue

            # to avoid too many connections
            time.sleep(5)
项目:aurora    作者:carnby    | 项目源码 | 文件源码
def handle(self, *args, **options):
        api_keys = settings.TWITTER_USER_KEYS
        auth = tweepy.OAuthHandler(api_keys['consumer_key'], api_keys['consumer_secret'])

        try:
            redirect_url = auth.get_authorization_url()
        except tweepy.TweepError as e:
            print('Error! Failed to get request token.')
            raise e

        print(redirect_url)
        verifier = input('Verifier:')

        print(auth.get_access_token(verifier=verifier))
项目:coding-with-bots    作者:mtdukes    | 项目源码 | 文件源码
def random_tweet():
    # build a twitter list of five possible tweets
    tweet_list = [
    'Charm was a scheme for making strangers like and trust a person immediately, no matter what the charmer had in mind.',
    "There is no good reason good can't triumph over evil, if only angels will get organized along the lines of the mafia.",
    "Shrapnel was invented by an Englishman of the same name. Don't you wish you could have something named after you?",
    "If your brains were dynamite there wouldn't be enough to blow your hat off.",
    "And so on."
    ]
    x = 0
    # create a loop to tweet twice
    while x < 2:
        try:
            # generate a random variable between 0 and 4
            rand = random.randint(0, 4)
            # use that random variable to grab an item from tweet_list
            api.update_status(tweet_list[rand])
            # take a 30-second break so we don't overwhelm our followers
            sleep(30)
            #increment our counter
            x += 1
        # if there's an error, catch it instead of crashing and print out the error message
        except tweepy.TweepError, e:
            print 'Error sending tweet:', e[0][0]['message']

# a function to tweet a gif
项目:coding-with-bots    作者:mtdukes    | 项目源码 | 文件源码
def gif_tweet():
    try:
        # takes a file location and a text string to update status
        api.update_with_media('../img/pulp-fiction-search.gif','BEHOLD, a gif')
    # if there's an error, catch it instead of crashing and print out the error message
    except tweepy.TweepError, e:
            print 'Error sending tweet:', e[0][0]['message']

# utility function for retrieving secret keys
# from our secret key file
项目:Trending-Places-in-OpenStreetMap    作者:geometalab    | 项目源码 | 文件源码
def on_follow(self, f_id):
        """
        Follow back on being followed
        """
        try:
            self.api.create_friendship(f_id, follow=True)
            self.friends.append(f_id)
            logging.info('Followed user id {}'.format(f_id))
        except tweepy.TweepError as e:
            self._log_tweepy_error('Unable to follow user', e)
项目:Trending-Places-in-OpenStreetMap    作者:geometalab    | 项目源码 | 文件源码
def _follow_all(self):
        """
        follows all followers on initialization
        """
        logging.info("Following back all followers....")
        try:
            self.followers['new'] = list(set(self.followers['existing']) - set(self.friends))
            self._handle_followers()
        except tweepy.TweepError as e:
            self._log_tweepy_error('Can\'t follow back existing followers', e)
项目:Trending-Places-in-OpenStreetMap    作者:geometalab    | 项目源码 | 文件源码
def _check_followers(self):
        """
        Checks followers.
        """
        logging.info("Checking for new followers...")

        try:
            self.followers['new'] = [f_id for f_id in self.api.followers_ids(self.id) if f_id not in self.followers['existing']]
            self.state['last_follow_check'] = time.time()

        except tweepy.TweepError as e:
            self._log_tweepy_error('Can\'t update followers', e)
项目:Seq2Seq-chatbot    作者:wataruhashimoto52    | 项目源码 | 文件源码
def twitter_bot():
    tf_config = tf.ConfigProto(device_count = {"GPU":0}, log_device_placement = True)

    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth)

    with tf.Session(config = tf_config) as sess:
        predictor = predict.EasyPredictor(sess)

        for tweet in tweets():
            status_id, status, bot_flag = tweet
            print("Processing {0}...".format(status.text))
            screen_name = status.author.screen_name
            replies = predictor.predict(status.text)
            if not replies:
                print("no reply")
                continue

            reply_body = replies[0]
            if reply_body is None:
                print("No reply predicted")
            else:
                try:
                    if is_contain(status.text, '??????'):
                        special_reply(api, bot_flag, screen_name, status_id, code = 1)
                    elif is_contain(status.text, '????'):
                        special_reply(api, bot_flag, screen_name, status_id, code = 2)

                    elif is_contain(status.text, '?????'):
                        special_reply(api, bot_flag, screen_name, status_id, code = 3)

                    else:
                        post_reply(api, bot_flag, reply_body, screen_name, status_id)
                except tweepy.TweepError as e:
                    if e.api_code == 187:
                        pass
                    else:
                        raise
            mark_tweet_processed(status_id)
项目:twitter_LDA_topic_modeling    作者:kenneth-orton    | 项目源码 | 文件源码
def get_tweets(user_id, api):
    cursor = tweepy.Cursor(api.user_timeline, user_id).pages()
    while True:
        try:
            tweets = [page for page in cursor]

        except tweepy.TweepError as e:
            tweets = []
            api_codes = [401, 404, 500]
            if not str(e): break
            if(int(filter(str.isdigit, str(e))) in api_codes): break
            print('get_tweets: ' + str(e))

    return tweets
项目:twitter_LDA_topic_modeling    作者:kenneth-orton    | 项目源码 | 文件源码
def user_status_count(user_id, api):
    try: 
        user = api.get_user(user_id=user_id)
        if(user.statuses_count):
            count = user.statuses_count

    except tweepy.TweepError as e:
        count = 0
        #print(e.message[0]['message'])

    finally:
        return count
项目:twitter_LDA_topic_modeling    作者:kenneth-orton    | 项目源码 | 文件源码
def verify_working_credentials(api):
    verified = True
    try:
        api.verify_credentials()
    except tweepy.TweepError as e:
        verified = False
    finally:
        return verified
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def send_token():
    global auth
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret, callback_url)

    try:
        redirect_url = auth.get_authorization_url()
        session['request_token'] = auth.request_token
    except tweepy.TweepError:
        print ('Error! Failed to get request token.')

    return redirect(redirect_url)

# callback. once twitter authorizes user, it sends user back to this page
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def get_verification():
    global auth
    #get the verifier key from the request url
    verifier = request.args['oauth_verifier']

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    token = session['request_token']
    del session['request_token']

    auth.request_token = token

    try:
        auth.get_access_token(verifier)
    except tweepy.TweepError:
        print 'Error! Failed to get access token.'

    api = tweepy.API(auth)
    # cache info to avoid rate limit
    access_info['api'] = api
    access_info['following'] = api.friends_ids(api.me().screen_name)
    access_info['followers'] = api.followers_ids(api.me().screen_name)
    access_info["access_token"] = auth.access_token
    access_info["access_token_secret"] = auth.access_token_secret


    return redirect(url_for('twitter_DL'))
项目:twitter_trolls    作者:merqurio    | 项目源码 | 文件源码
def troll_bot_analyzer(user, api):
    try:
        user_data = data_user(user, api)
    except tweepy.TweepError:
        logging.error("This user is protected or does not exist. His  information cannot be accessed")
    else:
        if len(user_data["tweets"]) == 0:
            logging.error("There is not enough information to classify this user")
            return False
        hashtags_per_tweet = float(user_data["number_hashtags"]) / len(user_data["tweets"])
        mentions_per_tweet = float(user_data["number_mentions"]) / len(user_data["tweets"])
        percentage_tweet_with_mention = float(user_data["tweet_with_mentions"]) / len(user_data["tweets"])
        percentage_tweet_with_hashtag = float(user_data["tweets_with_hashtags"]) / len(user_data["tweets"])
        signs_per_char, capitals_per_char, activity, percentage_tweet_with_omg = drama_queen(user_data)
        periodicity, answer = periodicity_answer(user_data)
        diversity_hashtags = tweet_iteration_hashtags(user_data)
        diversity_tweets = tweet_iteration_stemming(user_data)
        urls_percentage = tweet_iteration_urls(user_data)
        num_stalker, who_stalker = stalker(user_data)
        per_drama_queen = percentage_drama_queen(activity, percentage_tweet_with_omg, capitals_per_char, signs_per_char,
                                                 percentage_tweet_with_hashtag, percentage_tweet_with_mention,
                                                 mentions_per_tweet, hashtags_per_tweet)
        per_bot = percentage_bot(periodicity, answer, diversity_tweets)
        per_stalker, famous, non_famous = percentage_stalker(num_stalker, who_stalker, mentions_per_tweet, percentage_tweet_with_mention, api)
        if per_stalker == 0:
            per_stalker = num_stalker
        per_spammer = percentage_spammer(diversity_tweets, diversity_hashtags, urls_percentage)
        per_hater = (1 - sentiment(user_data)) * 100

        max_value = [per_bot, per_drama_queen, per_stalker, per_hater, per_spammer]
        index = max_value.index(max(max_value))
        labels = ["bot", "drama_queen", "stalker", "hater", "spammer"]
        final = labels[index]

        return {"user_id": user, "bot": per_bot, "drama_queen": per_drama_queen, "stalker": per_stalker, "hater": per_hater, "spammer": per_spammer, "famous": famous, "non_famous": non_famous, "stalked": who_stalker, "final": final}
项目:tweetopo    作者:zthxxx    | 项目源码 | 文件源码
def get_user(self, uid=None, name=None):
        if not self._api:
            raise tweepy.TweepError('Api NOT inited!')
        try:
            user = self._api.get_user(user_id=uid, screen_name=name)
            self._user = user
        except tweepy.TweepError as e:
            logging.error('Uid ({0}) and name ({1}) has error: {2}'.format(uid, name, e))
            if e.api_code in self._IGNORE_ERROR_CODES:
                return None
            raise e
        return self
项目:tweetopo    作者:zthxxx    | 项目源码 | 文件源码
def authentication(method):
        def judge(self, *args, **kwargs):
            if not self._api:
                raise tweepy.TweepError('Api NOT inited!')
            if not self._user:
                raise tweepy.TweepError('User NOT inited!')
            method(self, *args, **kwargs)
            return self
        return judge
项目:TwitTools    作者:hazimhanif    | 项目源码 | 文件源码
def get_user_init():
    global status
    global path
    global rev
    try:
        path = 'D:/Twitter/Depth-%d/%s' % (depth_num,str(user_id))
        try:
            os.makedirs(path)
        except FileExistsError:
            print("Folder already exist")
            status="skip"
            return

        data = api.get_user(user_id)
        if data['protected']==False:
            get_user(data)
            rev=1
        else:
            status="skip"
            print("Protected")

    except tweepy.RateLimitError:
        countdown(960)
        get_user_init()

    except tweepy.TweepError as e:
            if tweepy.TweepError is "[{'message': 'Over capacity', 'code': 130}]" or e[0]['code'] is 130:
                countdown_te(600,e)
                get_user_init()
            elif tweepy.TweepError is "[{'message': 'User not found.', 'code':50}]" or e[0]['code'] is 50:
                status="skip"
                return
            else:
                print(e)
项目:TwitTools    作者:hazimhanif    | 项目源码 | 文件源码
def get_id(sn):
    try:
        return(api.get_user(screen_name=sn)['id'])
    except tweepy.RateLimitError:
        countdown(960)
        get_id(sn)

    except tweepy.TweepError as e:
            if tweepy.TweepError is "[{u'message': u'Over capacity', u'code': 130}]" or e is "[{u'message': u'Over capacity', u'code': 130}]":
                countdown_te(600,e)
                get_id(sn)
            else:
                print(e)
项目:TwitTools    作者:hazimhanif    | 项目源码 | 文件源码
def limit_handler(cursor):
    while True:
        try:
            yield cursor.next()
        except tweepy.RateLimitError:
            countdown(960)
        except tweepy.TweepError as e:
            if tweepy.TweepError is "[{u'message': u'Over capacity', u'code': 130}]" or e is "[{u'message': u'Over capacity', u'code': 130}]":
                countdown_te(600,e)
            else:
                print(e)