我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用slacker.Error()。
def get_users_info(): """SlackAPI?usersAPI???????? Slacker ? users.list API ????? - https://github.com/os/slacker - https://api.slack.com/methods/users.info :return dict: key?slack_id?value????????? """ users = {} webapi = slacker.Slacker(settings.API_TOKEN) try: for d in webapi.users.list().body['members']: users[d['id']] = d['name'] except slacker.Error: logger.error('Cannot connect to Slack') return users
def is_user_in_group(user_id, group_name): try: # Get list of channels user_groups = slack.channels.list().body.get('channels', []) except slacker.Error as e: print(e) return True user_list = [] for group in user_groups: if group['name'] == group_name: user_list = group['members'] # This returns true if the user_id is in user_list, otherwise false return user_id in user_list # Get all the channel names from Slack
def _react(message, emojis): """ ????? emoji ? reaction ??? """ if isinstance(emojis, str): # tuple ????? emojis = (emojis, ) for emoji in emojis: try: message.react(emoji) except Error as error: # ?????????????????????????? if error.args[0] == 'already_reacted': pass else: raise
def _login_oauth(self): try: oauth = Slacker.oauth.access( client_id=SLACK_CLIENT_ID, client_secret=SLACK_CLIENT_SECRET, code=flask.request.args['code'], redirect_uri=WebServer.url_for('login') ).body except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'oauth', 'msg': str(err)}) return WebServer._basic_page('OAuth error', 'OAuth error: ' + str(err)) token = oauth['access_token'] identity_only = oauth['scope'].count(',') == 1 return self._login_with_token(token, identity_only)
def tokens_validation(self): self.log.info('Validating tokens') for token, enc_key in self.tokens.decrypt_keys_map().items(): self.log.info('Check token %s', token) try: user_info = Slacker(token).auth.test().body SlackArchive.api_call_delay() except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'tokens_validation', 'msg': str(err)}) self.log.exception('Error %s for token %s', str(err), token) del self.tokens[enc_key] continue self.log.info('Valid token') self.tokens.upsert(token, user_info)
def people_fetch_all(self): self.log.info('Fetching people list') try: people = self.api_handle.users.list().body SlackArchive.api_call_delay() except Error as err: self.log.exception('Fetching people list exception %s', str(err)) return # TODO add bulk_op wrapper for mongo_store for person in people['members']: item_id = person['id'] person_dict = dict(self.people[item_id]) \ if item_id in self.people.keys() else {} person_dict['name'] = person['profile']['real_name'] person_dict['login'] = person['name'] person_dict['avatar'] = person['profile']['image_72'] person_dict['active'] = (not person.get('deleted', True) and not person.get('is_bot', True)) self.people[item_id] = person_dict
def fetch_public_messages(self): self.log.info('Fetching public channels messages') self.create_messages_indices() try: chans_list = self.api_handle.channels.list().body random.shuffle(chans_list['channels']) SlackArchive.api_call_delay() except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'fetch_public_messages', 'msg': str(err)}) self.log.exception('Fetch public messages error %s', str(err)) return self.update_streams(chans_list) api_loader = functools.partial(self.api_handle.channels.history, inclusive=0, count=1000) self._fetch_stream_messages(api_loader, chans_list['channels'])
def send_message(self, message="", **kwargs): """Send a message to a user.""" import slacker if kwargs.get(ATTR_TARGET) is None: targets = [self._default_channel] else: targets = kwargs.get(ATTR_TARGET) data = kwargs.get('data') attachments = data.get('attachments') if data else None for target in targets: try: self.slack.chat.post_message(target, message, as_user=self._as_user, username=self._username, icon_emoji=self._icon, attachments=attachments, link_names=True) except slacker.Error as err: _LOGGER.error("Could not send slack notification. Error: %s", err)
def post_slack_message(message=None, channel=None, username=None, icon_emoji=None): """Format the message and post to the appropriate slack channel. Args: message (str): Message to post to slack channel (str): Desired channel. Must start with # """ LOG.debug('Slack Channel: %s\nSlack Message: %s', channel, message) slack = slacker.Slacker(SLACK_TOKEN) try: slack.chat.post_message(channel=channel, text=message, username=username, icon_emoji=icon_emoji) LOG.info('Message posted to %s', channel) except slacker.Error: LOG.info("error posted message to %s", channel)
def open_im_channel(user): try: # Try to open an im channel response = slack.im.open(user) except slacker.Error as e: # If that fails print the error and continue print(e) return None # https://api.slack.com/methods/im.open return response.body.get('channel', {}).get('id')
def get_channel_names(): try: user_groups = slack.channels.list().body.get('channels') except slacker.Error as e: print(e) return [] return {group['name']: group['id'] for group in user_groups} # Get a channel id by name
def get_rtm_uri(): rtm = slack.rtm.start() try: body = rtm.body except slacker.Error as e: print(e) return None return body.get('url') # Check if this is the main application running (not imported)
def _login_with_token(self, token, identity_only): try: api_auth = Slacker(token).auth.test().body assert api_auth['team_id'] == SLACK_TEAM_ID except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'auth.test', 'msg': str(err)}) return WebServer._basic_page('Auth error', 'Auth error: ' + str(err)) except AssertionError: return WebServer._basic_page('Wrong team', 'Wrong team: ' + api_auth['team']) return self._login_success(token, api_auth, identity_only)
def streams_fetch(self, token): enc_key = self.tokens.get_key_by_known_token(token) user_info = self.tokens[enc_key] if user_info['full_access']: try: self.log.info('Fetch channels for %s', user_info['login']) all_ch = Slacker(token).channels.list().body SlackArchive.api_call_delay() self.people.set_field(user_info['user'], 'channels', SlackArchive._filter_channel_ids(all_ch)) self.update_streams(all_ch) # not a duplicate op: fight races self.log.info('Fetch %s\'s private groups', user_info['login']) groups = Slacker(token).groups.list().body SlackArchive.api_call_delay() self.people.set_field(user_info['user'], 'groups', SlackArchive._filter_group_ids(groups)) self.update_streams(groups, user_info['user']) self.log.info('Fetch direct msgs for %s', user_info['login']) ims = Slacker(token).im.list().body SlackArchive.api_call_delay() self.people.set_field(user_info['user'], 'ims', SlackArchive._filter_im_ids(groups, ims)) self.update_streams(ims, user_info['user']) except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': 'channels_fetch', 'msg': str(err)}) self.log.exception('Fetch streams error %s', str(err)) # full access was revoked if str(err) == 'missing_scope': self.tokens.set_field(enc_key, 'full_access', False)
def _fetch_person_ims_history(self, user_info, api_handle): try: ims = api_handle.im.list().body SlackArchive.api_call_delay() except Error as err: self.mongo.db.z_errors.insert_one({'_id': time.time(), 'ctx': ('fetch_person_ims ' + user_info['login']), 'msg': str(err)}) self.log.exception('Fetch person groups error %s', str(err)) return api_loader = functools.partial(api_handle.im.history, inclusive=0, count=1000) self._fetch_stream_messages(api_loader, ims['ims'])
def to_str(obj, encoding='utf-8', **encode_args): r""" Returns a ``str`` of ``obj``, encoding using ``encoding`` if necessary. For example:: >>> some_str = b"\xff" >>> some_unicode = u"\u1234" >>> some_exception = Exception(u'Error: ' + some_unicode) >>> r(to_str(some_str)) b'\xff' >>> r(to_str(some_unicode)) b'\xe1\x88\xb4' >>> r(to_str(some_exception)) b'Error: \xe1\x88\xb4' >>> r(to_str([42])) b'[42]' See source code for detailed semantics. """ # Note: On py3, ``b'x'.__str__()`` returns ``"b'x'"``, so we need to do the # explicit check first. if isinstance(obj, str): return obj # We coerce to unicode if '__unicode__' is available because there is no # way to specify encoding when calling ``str(obj)``, so, eg, # ``str(Exception(u'\u1234'))`` will explode. if isinstance(obj, unicode) or hasattr(obj, "__unicode__"): # Note: unicode(u'foo') is O(1) (by experimentation) return unicode(obj).encode(encoding, **encode_args) return str(obj)
def _check_slack_token(self): # type: () -> None try: r = self.slack.api.test() self._check_if_response_is_successful(r) except SlackError as e: print(e) sys.exit(1) except Exception as e: print(e) sys.exit(1)
def get_service(hass, config): """Get the Slack notification service.""" import slacker try: return SlackNotificationService( config[CONF_CHANNEL], config[CONF_API_KEY], config.get(CONF_USERNAME, None), config.get(CONF_ICON, None)) except slacker.Error: _LOGGER.exception("Slack authentication failed") return None
def random_command(message, subcommand=None): """ ??????????????????????????? - https://github.com/os/slacker - https://api.slack.com/methods/channels.info - https://api.slack.com/methods/users.getPresence - https://api.slack.com/methods/users.info """ if subcommand == 'help': message.send(HELP) return # ??????????????? channel = message.body['channel'] webapi = slacker.Slacker(settings.API_TOKEN) try: cinfo = webapi.channels.info(channel) members = cinfo.body['channel']['members'] except slacker.Error: try: cinfo = webapi.groups.info(channel) members = cinfo.body['group']['members'] except slacker.Error: # TODO: ??????????????? # ??????????????return return # bot ? id ??? bot_id = message._client.login_data['self']['id'] members.remove(bot_id) member_id = None if subcommand != 'active': member_id = random.choice(members) else: # active ??????????? presence ????? random.shuffle(members) for member in members: presence = webapi.users.get_presence(member_id) if presence.body['presence'] == 'active': member_id = member break user_info = webapi.users.info(member_id) name = user_info.body['user']['name'] message.send('{} ?????????'.format(name))
def __init__(self, token, admin_name, default_path, bot_channel_name, notice_channel_name): self.token = token self.slacker = Slacker(self.token) self.default_path = default_path self.bot_channel_name = bot_channel_name self.bot_channel_id = [c_id['id'] for c_id in self.slacker.channels.list().body['channels'] if c_id['name'] == bot_channel_name[1:]][0] self.notice_channel_name = notice_channel_name self.notice_channel_id = [c_id['id'] for c_id in self.slacker.channels.list().body['channels'] if c_id['name'] == notice_channel_name[1:]][0] self.ignore_channel_list = [] self.load_ignore_channel_list() self.ignore_user_list = [] self.load_ignore_user_list() self.id = self.slacker.auth.test().body['user_id'] self.admin_id = self.get_user_id(admin_name) self.keywords = defaultdict(lambda: set()) try: with open(self.default_path+'data/keyword_list.txt', 'r', encoding='utf-8') as f: keyword = 'NONE' for line in f.readlines(): line = line.strip() if line[:10] == 'keyword : ': keyword = line[10:] elif line != '': self.keywords[keyword].add(line) except FileNotFoundError: pass self.kingname_alias = dict() try: with open(self.default_path+'data/kingname_alias.txt', 'r', encoding='utf-8') as f: for line in f.readlines(): line = line.strip().split(maxsplit=1) self.kingname_alias[line[0]] = line[1] except FileNotFoundError: pass self.slacking_dict = defaultdict(lambda: defaultdict(lambda: 0)) self.statistics_dict = defaultdict(lambda: defaultdict(lambda: 0)) self.commands = Commands(self.default_path+'commands.data') self.admin_commands = Commands(self.default_path+'admin_commands.data') self.hello_message = 'Factbot Start running!' self.error_message = 'Error Error <@%s>' % self.admin_id self.stop_message = 'Too many Error... <@%s>' % self.admin_id self.kill_message = 'Bye Bye!' self.spank_messages = ['?? ??', '??? ???', '???...', '<@%s> ?? ?? ? ???? ??' % self.admin_id, '?? ??? ??? ????? ????...\n\n...?? ??? ??!'] self.die_messages = [':innocent::gun:', '????', '?', '??? ?????', '????!', ':overwatch_reaper: ???,, ?? ?? ????...'] self.ALIVE = 0 self.RESTART = 1 self.DIE = 2 self.status = self.ALIVE self.version = '1.4.10'
def _downloader(self, item): lockdir = None try: url, target = item if os.path.exists(target): return base, joiner, name = target.rpartition("/") lockdir = self.lockdir / name try: lockdir.mkdir() except OSError: lockdir = None return meta_file = base + joiner + "meta-" + name + ".txt" try: res = requests.get( url, headers={"Authorization": "Bearer %s" %(self.token, )}, stream=True, timeout=60, ) except Exception as e: print "Error:", e with open_atomic(meta_file) as meta: meta.write("999\nException: %r" %(e, )) return with open_atomic(meta_file) as meta, open_atomic(target) as f: meta.write("%s\n%s" %( res.status_code, "\n".join( "%s: %s" %(key, res.headers[key]) for key in res.headers ), )) for chunk in res.iter_content(4096): f.write(chunk) self.counter += 1 print "Downloaded %s (%s left): %s" %( self.counter, self.pool.qsize(), url, ) except: if item is not None: self.pool.put(item) raise finally: if lockdir is not None: lockdir.rmdir()
def delete_old_files(self, date, confirm): date_str = date.strftime("%Y-%m-%d") dry_run = ( "" if confirm else " (PREVIEW ONLY; use '--confirm-delete' to actaully delete these files)" ) print "Deleting files created before %s... %s" %(date_str, dry_run) def delete_file(x): file_file, file_obj = x try: res = self.slack.files.delete(file_obj["id"]) assert_successful(res) except Error as e: print "Error deleting file %r: %s" %(file_obj["id"], e.message) self._error_count += 1 return self._deleted_count += 1 file_obj["_wayslack_deleted"] = True with open_atomic(str(file_file)) as f: json.dump(file_obj, f) pool = Threadpool(delete_file, queue_size=1, thread_count=10) self._deleted_count = 0 self._skipped_count = 0 self._error_count = 0 for dir in self.path.iterdir(): if dir.name >= date_str: continue for file_file, file_obj in self._iter_files_in_dir(dir): if file_obj.get("_wayslack_deleted"): continue err, file_path = self.archive.downloader.is_file_missing(file_obj) if err: self._skipped_count += 1 if VERBOSE: print "WARNING: %s: %s" %( str(file_file), err, ) print " File:", file_path print " URL:", file_obj["url_private"] continue self._deleted_count += 1 if confirm: if (self._deleted_count + self._error_count + self._skipped_count) % 10 == 0: print self._deleted_msg() pool.put((file_file, file_obj)) pool.join() print "Deleted files: %s%s" %(self._deleted_count, dry_run) if self._skipped_count and self._deleted_count: print "Skipped files: %s (this is 'normal'. See: https://stackoverflow.com/q/44742164/71522; use --verbose for more info)" %(self._skipped_count, ) if self._error_count: print "Errors: %s" %(self._error_count, )