Python validators 模块,url() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用validators.url()

项目:microurl    作者:francium    | 项目源码 | 文件源码
def route_micro(micro):
    '''
        Micro to real URL redirection handler.
    '''
    try:
        target = lookup_micro(micro)

        if urlcheck(target):
            return redirect(target)
        if domaincheck(target):
            return redirect("http://" + target)
        # A bare IP (optionally followed by a port or path) is also accepted.
        if ipcheck(target.split(':')[0]) and urlcheck('http://' + target):
            return redirect("http://" + target)
        abort(404)
    except Exception as err:
        # Unknown micro (lookup failed) or any other error maps to a 404.
        sys.stderr.write(str(err))
        abort(404)
项目:MalwrAgent    作者:michaelschratt    | 项目源码 | 文件源码
def __do_http_request(self, type_, url, data):
        """Make an HTTP GET or POST request and validate its status.

        :param type_: HTTP verb, either 'GET' or 'POST'
        :param url: full URL to contact
        :param data: value sent under the parameter extracted from the URL
        """
        parsed = self.__parse_url(url)
        key = self.__get_parameter_from_parsed_url(parsed)
        host = self.__get_host_from_parsed_url(parsed)
        # Rebuild the target URL from host and path only (url is overwritten).
        url = host + parsed.path
        payload = {key: data}

        if type_ == 'GET':
            response = requests.get(url, payload)
        elif type_ == 'POST':
            response = requests.post(url, payload)
        else:
            response = None

        return self.__validate_request_status(response)
项目:find-that-charity    作者:TechforgoodCAST    | 项目源码 | 文件源码
def import_extract_main(chars=None, datafile=os.path.join("data", "ccew", "extract_main_charity.csv")):
    """Read extract_main_charity.csv and merge company number, website URL
    and latest income into the `chars` dict, keyed by charity number.

    :param chars: existing charity dict to update; a fresh dict is used when
        omitted (avoids the shared-mutable-default bug of `chars={}`).
    :param datafile: path to the CCEW extract CSV file.
    :return: the updated `chars` dict.
    """
    if chars is None:
        chars = {}

    with open(datafile, encoding="latin1") as a:
        csvreader = csv.reader(a, doublequote=False, escapechar='\\')
        ccount = 0
        for row in csvreader:
            if len(row) > 1:
                row = clean_row(row)
                if row[1]:
                    # Company number plus a Companies House lookup URL.
                    chars[row[0]]["company_number"].append({
                        "number": parse_company_number(row[1]),
                        "url": "http://beta.companieshouse.gov.uk/company/" + parse_company_number(row[1]),
                        "source": "ccew"
                    })
                if row[9]:
                    chars[row[0]]["url"] = row[9]
                if row[6]:
                    chars[row[0]]["latest_income"] = int(row[6])
                ccount += 1
                # Progress indicator every 10k rows (carriage-return overwrite).
                if ccount % 10000 == 0:
                    print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount, end='')
        print('\r', "[CCEW] %s charities read from extract_main_charity.csv" % ccount)

    return chars
项目:find-that-charity    作者:TechforgoodCAST    | 项目源码 | 文件源码
def clean_chars(chars=None, pc_es=None, es_pc_index="postcode", es_pc_type="postcode"):
    """Post-process the charity dict: resolve postcodes to locations,
    normalise URLs and domains, and attach org-ids and alternative names.

    :param chars: charity dict to clean; defaults to a fresh dict (the
        original mutable default `chars={}` leaked state across calls).
    :param pc_es: optional Elasticsearch client used for postcode lookups.
    :param es_pc_index: Elasticsearch index holding postcode documents.
    :param es_pc_type: Elasticsearch doc type for postcode documents.
    :return: the cleaned `chars` dict.
    """
    if chars is None:
        chars = {}

    ccount = 0
    for c in chars:
        if pc_es:
            geo_data = fetch_postcode(chars[c]["geo"]["postcode"], pc_es, es_pc_index, es_pc_type)
            if geo_data:
                chars[c]["geo"]["location"] = geo_data[0]
                chars[c]["geo"]["areas"] = geo_data[1]

        chars[c]["url"] = parse_url(chars[c]["url"])
        chars[c]["domain"] = get_domain(chars[c]["url"])
        chars[c]['org-ids'] = add_org_id_prefix(chars[c])

        # Every registered name except the primary one.
        chars[c]["alt_names"] = [n["name"] for n in chars[c]["names"] if n["name"] != chars[c]["known_as"]]

        # @TODO capitalisation of names

        ccount += 1
        if ccount % 10000 == 0:
            # "charites" typo in the original log message fixed.
            print('\r', "[Geo] %s charities added location details" % ccount, end='')
    print('\r', "[Geo] %s charities added location details" % ccount)

    return chars
项目:BTG    作者:conix-security    | 项目源码 | 文件源码
def checkType(self, argument):
        """
            Identify observable type
        """
        # Ordered (validator, label) pairs -- order matters, e.g. URLs must
        # be tested before domains.
        observable_checks = [
            (validators.url, "URL"),
            (validators.md5, "MD5"),
            (validators.sha1, "SHA1"),
            (validators.sha256, "SHA256"),
            (validators.sha512, "SHA512"),
            (validators.ipv4, "IPv4"),
            (validators.ipv6, "IPv6"),
            (validators.domain, "domain"),
        ]
        for validate, label in observable_checks:
            if validate(argument):
                return label
        mod.display("MAIN", argument, "ERROR", "Unable to retrieve observable type")
        return None
项目:poseidon    作者:sidus-dev    | 项目源码 | 文件源码
def do_import_bookmarks(filename):
    """Parse a Netscape-format HTML bookmark file.

    :param filename: path of the bookmark file to import.
    :return: a list of [title, url] pairs, or True when the file is not a
        valid bookmark file (after showing an error dialog).
    """
    content = []
    first = _("Oops, import failed")
    second = _("could be corrupted or a invalid HTML bookmark file")

    with open(filename) as f: l = f.readlines()

    # The Netscape doctype must appear on the first line.
    if not re.findall("<!DOCTYPE NETSCAPE-Bookmark-file-1>", l[0], re.IGNORECASE):
        dialog().error(first, "<span size='small'>\"<b>{}</b>\" {}.</span>".format(filename, second))
        return True

    titles = re.findall(r'<a[^>]*>(.*?)</a>', str(l), re.IGNORECASE)
    urls = re.findall(r'<a[^>]* href="([^"]*)"', str(l), re.IGNORECASE)

    # zip() stops at the shorter list, so a malformed anchor can no longer
    # raise IndexError (the original indexed urls by the titles' index).
    for title, url in zip(titles, urls):
        if title and url:
            content.append([title] + [url])

    return content
项目:poseidon    作者:sidus-dev    | 项目源码 | 文件源码
def do_export_bookmarks(list):
    """Render bookmarks (rows of [added-date, title, url]) as a
    Netscape-format HTML bookmark string.

    :param list: iterable of rows; row[0] is a "%Y-%m-%d %H:%M" date,
        row[1] the title and row[2] the URL.
    :return: the complete bookmark file contents as one string.
    """
    header = "<!DOCTYPE NETSCAPE-Bookmark-file-1><!--This is an automatically generated file.\
    It will be read and overwritten. Do Not Edit! --><META HTTP-EQUIV=\"Content-Type\" CONTENT=\"text/html;\
    charset=UTF-8\"><Title>{}</Title><H1>{}</H1><DL><p>".format(_("Bookmarks"), _("Bookmarks"))
    footer = "</DL><p>"

    content = [header]

    for i in list:
        # ADD_DATE is a Unix timestamp derived from the stored date string.
        timestamp = int(datetime.datetime.strptime(i[0], "%Y-%m-%d %H:%M").timestamp())
        title = i[1]
        url = i[2]

        content.append("<DT><A HREF=\"{}\" ADD_DATE=\"{}\">{}</A>".format(url, timestamp, title))

    content.append(footer)

    # The original used "".join([s for s in content]); the comprehension was redundant.
    return "".join(content)
项目:poseidon    作者:sidus-dev    | 项目源码 | 文件源码
def on_insert_bookmarks(self, title, url):
        """Insert a bookmark unless its URL is already stored, then
        refresh the bookmark list view. Always returns True."""
        with bookmarks_con:
            cur = bookmarks_con.cursor()
            cur.execute("SELECT * FROM bookmarks;")
            existing = cur.fetchall()

            # Bail out early on a duplicate URL (column 1 holds the URL).
            for row in existing:
                if url == row[1]:
                    return True

            cur.execute("INSERT INTO bookmarks VALUES(?, ?, ?);",
                        (title.replace("\n", "").strip(), url,
                         time.strftime("%Y-%m-%d %H:%M")))

            self.refresh_liststore(1)

            return True
项目:poseidon    作者:sidus-dev    | 项目源码 | 文件源码
def on_js_switch(self, button, active):
        """Toggle JavaScript for the current tab's WebView and reload it.

        NOTE(review): `active` is unused; the state is read from
        button.get_active() instead.
        """
        # JavaScript toggling globally disabled -> ignore the switch.
        if not set_enable_javascript: return True

        page = self.tabs[self.current_page][0]
        settings = page.webview.get_settings()

        if button.get_active():
            settings.set_property("enable-javascript", True)
            self.js_label.set_markup(self.jse_label_text)
        else:
            settings.set_property("enable-javascript", False)
            self.js_label.set_markup(self.jsd_label_text)

        page.webview.set_settings(settings)
        url = page.webview.get_uri()
        # Reload only when a valid page is currently loaded.
        if url and validators.url(url): page.webview.reload()
项目:poseidon    作者:sidus-dev    | 项目源码 | 文件源码
def on_decide_destination(self, download, name):
        """Pick a file name for a download and open the save dialog.

        Falls back to the source domain when no name is supplied, derives
        a file extension from the MIME type when missing, and aborts if a
        download with the same name is already listed.
        """
        url = download.get_request().get_uri()

        # No name given: use the domain with dots replaced by underscores.
        if not name: name = get_domain(url).replace(".", "_")
        if not "." in name:

            # No extension: take the MIME subtype (e.g. "image/png" -> "png").
            mime = download.get_response().get_mime_type()
            suf = mime.split("/")
            name = "{}.{}".format(name, suf[1])

        for i in self.dlview:
            for a in i:
                if type(a) == Gtk.ModelButton:
                    # Same name already in the downloads view -> just show the menu.
                    if a.get_name().split("/")[-1] == name:
                        self.downloads_menu.show()
                        return True

        if url: pathchooser().save(name, download, url)
项目:poseidon    作者:sidus-dev    | 项目源码 | 文件源码
def dynamic_title(self, view, title):
        """Update the tab label (and tooltip for long titles) for `view`.

        Falls back to the page URL when there is no title, and to the
        default tab name when neither is available.
        """
        url = view.get_uri()

        if not url and not title: title = tab_name
        if not title: title = url

        for tab, widget in self.tabs:

            widget = self.check_tab(widget, 0)

            if tab.webview is view:
                if widget:
                    # Truncate long titles; full title goes in the tooltip.
                    widget.set_text(minify(title, 50))
                    widget.set_tooltip_text("")
                    if len(title) > 50: widget.set_tooltip_text(title)

        # The original kept an unused `counter` variable here; removed.
项目:uclapi    作者:uclapi    | 项目源码 | 文件源码
def is_url_safe(url):
    """Decide whether a callback URL may be used.

    A URL is accepted when it is HTTPS, passes public-URL validation, and
    is either explicitly whitelisted or matches no forbidden substring.
    """
    if not url.startswith("https://"):
        return False

    # public=True additionally rejects private/reserved host addresses.
    if not validators.url(url, public=True):
        return False

    # Exact whitelist match wins before the forbidden-substring scan below.
    whitelist_urls = os.environ["WHITELISTED_CALLBACK_URLS"].split(';')
    if url in whitelist_urls:
        return True

    # Substring match: a forbidden fragment anywhere in the URL rejects it.
    forbidden_urls = os.environ["FORBIDDEN_CALLBACK_URLS"].split(';')
    for furl in forbidden_urls:
        if furl in url:
            return False

    return True
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def __init__(self, url, max_worker=10, timeout=3,
                 scan_dict=None, verbose=False, status=None):
        """Initialise the directory scanner.

        :param url: target base URL
        :param max_worker: number of concurrent workers
        :param timeout: per-request timeout in seconds
        :param scan_dict: dictionary file used for path guessing
        :param verbose: also log misses, not just hits
        :param status: HTTP status codes treated as a hit; defaults to
            [200, 301, 302, 304, 401, 403]
        """
        self.site_lang = ''          # detected later from the URL's extension
        self.raw_base_url = url      # URL exactly as supplied by the caller
        self.base_url = url          # normalised later by prepare_url()
        self.max_worker = max_worker
        self.timeout = timeout
        self.scan_dict = scan_dict
        self.verbose = verbose
        self.first_item = ''
        self.dict_data = {}
        self.first_queue = []
        self.found_items = {}
        # `not status` covers both None and an empty list
        # (the original spelled this `status is None or len(status) == 0`).
        if not status:
            self.status = [200, 301, 302, 304, 401, 403]
        else:
            self.status = [int(t) for t in status]
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def on_response(self, url, item, method, response, queue):
        """Handle one scan response: record hits and enqueue follow-ups.

        :param item: dictionary path item that was probed
        :param queue: deque of (item, method) pairs still to be scanned
        """
        if response.code in self.status:
            # Interesting status code -> report each item only once.
            if item in self.found_items:
                return
            self.found_items[item] = None
            logger.warning('[Y] %s %s %s' % (response.code, method, url))
            # For script files, also probe common backup-file variants
            # (original comment was garbled mojibake; behavior inferred
            # from make_bak_file_list -- confirm).
            if any(map(item.endswith, ['.php', '.asp', '.jsp'])):
                bak_list = self.make_bak_file_list(item)
                bak_list = [(t, 'HEAD') for t in bak_list]
                queue.extendleft(bak_list)
        else:
            # 405 Method Not Allowed may still permit POST -> retry as POST.
            if response.code == 405 and method != 'POST':
                queue.appendleft((item, 'POST'))

            if self.verbose:
                logger.info('[N] %s %s %s' % (response.code, method, url))
项目:hacker-scripts    作者:restran    | 项目源码 | 文件源码
def prepare_url(self):
        """Split the raw base URL into a directory base URL plus a first
        item (the trailing path component), and guess the site language
        from that item's extension.
        """
        url_parsed = urlparse(self.raw_base_url)
        items = url_parsed.path.split('/')
        # NOTE(review): str.split always returns at least one element, so
        # this condition is always true and the else branch is dead code.
        if len(items) > 0:
            item = items[-1]
            items = items[:-1]
            new_path = '/'.join(items)
        else:
            item = ''
            new_path = url_parsed.path
        # Rebuild the URL without params, query or fragment.
        url = urlunparse((url_parsed.scheme, url_parsed.netloc, new_path, '', '', ''))

        if item.endswith('.php'):
            self.site_lang = 'php'
        elif item.endswith('.asp'):
            self.site_lang = 'asp'
        elif item.endswith('.aspx'):
            self.site_lang = 'aspx'

        if self.site_lang != '':
            logger.info('site_lang: %s' % self.site_lang)
        self.base_url = url
        self.first_item = item
        logger.info('base_url: %s' % url)
        logger.info('first_item: %s' % item)
项目:amazon-reviews-scraper    作者:philipperemy    | 项目源码 | 文件源码
def get_comments_based_on_keyword(search):
    """Search amazon.co.jp for `search` and scrape the comments of every
    product linked from the first results page."""
    logging.info('SEARCH = {}'.format(search))
    url = ('http://www.amazon.co.jp/s/ref=nb_sb_noss_2?url=search-alias%3Daps&field-keywords='
           + search + '&rh=i%3Aaps%2Ck%3A' + search)
    soup = get_soup(url)
    items = []
    for anchor in soup.find_all('a', class_='s-access-detail-page'):
        heading = anchor.find('h2')
        # Keep only anchors with a heading and a well-formed product link.
        if heading is not None and validators.url(anchor.get('href')):
            items.append((anchor.get('href'), str(heading.string)))
    logging.info('Found {} items.'.format(len(items)))
    for (link, name) in items:
        logging.debug('link = {}, name = {}'.format(link, name))
        product_id = extract_product_id(link)
        get_comments_with_product_id(product_id)
项目:emile-server    作者:gsort    | 项目源码 | 文件源码
def get_paginated_list(results, url, start, size, page_size=settings.PAGINATION_SIZE):
    """Wrap `results` in a pagination envelope.

    :param results: payload for the current page
    :param url: base URL used to build the previous/next links
    :param start: 1-based index of the first item on this page
    :param size: total number of items across all pages
    :param page_size: number of items per page
    :return: dict with start, page_size, count, previous, next and results
    """
    obj = {}
    obj['start'] = start
    obj['page_size'] = page_size
    # The original aliased `count = size`; the alias served no purpose.
    obj['count'] = size
    # Previous link: empty on the first page.
    if start == 1:
        obj['previous'] = ''
    else:
        start_copy = max(1, start - page_size)
        obj['previous'] = url + '?start=%d' % (start_copy)
    # Next link: empty when this page already reaches the end.
    # (An unused `page_size_copy` local from the original was removed.)
    if start + page_size > size:
        obj['next'] = ''
    else:
        start_copy = start + page_size
        obj['next'] = url + '?start=%d' % (start_copy)
    obj['results'] = results
    return obj
项目:emile-server    作者:gsort    | 项目源码 | 文件源码
def format_urls_in_text(text):
    """Replace each URL-like word in `text` with an HTML anchor.

    Words lacking a known protocol get 'http://' prepended before
    validation; words that still fail validation are left untouched.
    The anchor text is always the original word.
    """
    new_text = []

    accepted_protocols = ['http://', 'https://', 'ftp://', 'ftps://']

    for word in str(text).split():
        new_word = word
        accepted = [protocol for protocol in accepted_protocols if protocol in new_word]

        if not accepted:
            new_word = 'http://{0}'.format(new_word)

        # validators.url returns True on success and a falsy
        # ValidationFailure otherwise, so a plain truthiness test is the
        # correct form (the original compared `== True`).
        if validators.url(new_word):
            new_word = '<a href="{0}">{1}</a>'.format(new_word, word)
        else:
            new_word = word
        new_text.append(new_word)

    return ' '.join(new_text)
项目:cumulus-tv-m3u8-loader    作者:curif    | 项目源码 | 文件源码
def loadm3u(url):
  """Download a playlist using browser-like headers and return its raw
  body, raising when it does not look like an M3U/M3U8 file."""
  # Some servers reject the default urllib2 User-Agent, so mimic Chrome.
  hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
         'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
         'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
         'Accept-Encoding': 'none',
         'Accept-Language': 'en-US,en;q=0.8',
         'Connection': 'keep-alive'}

  request = urllib2.Request(url, headers=hdr)
  data = urllib2.urlopen(request).read()

  # Any valid playlist contains the EXTM3U marker somewhere in the body.
  if 'EXTM3U' not in data:
    raise Exception(url + " is not a m3u8 file.")

  return data
项目:cumulus-tv-m3u8-loader    作者:curif    | 项目源码 | 文件源码
def dictToM3U(cumulustv):
  """Render a cumulustv channel dict as an M3U playlist string.

  Each channel contributes one #EXTINF line (with tvg-* attributes for
  every non-empty field) followed by its stream URL.
  """
  # Maps channel dict keys to their EXTINF attribute names.
  channelDataMap = [
    ("number", "tvg-id"),
    ("name", "tvg-name"),
    ("logo", "tvg-logo"),
    ("genres", "group-title"),
    ("country", "tvg-country"),
    ("lang", "tvg-language")
  ]
  lines = ["#EXTM3U\n"]
  for channel in cumulustv["channels"]:
    extinf = "#EXTINF:-1"
    for dataId, extinfId in channelDataMap:
      value = channel[dataId]
      if value is not None and value != "":
        extinf += " " + extinfId + "=\"" + value.strip() + "\""
    lines.append(extinf + "," + channel["name"].strip() + "\n")
    lines.append(channel["url"] + "\n")

  return "".join(lines)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def get_api_url():
    """
    Return the management REST API URL from the config file, falling back
    to DEFAULT_API_URL when the option or section is missing.
    """
    config_key = 'api_url'
    try:
        configured = CONFIG.get(URL_SECTION, config_key)
        if validators.url(str(configured)):
            return configured
        # Present but malformed: report the bad value and exit.
        print_config_error_and_exit(URL_SECTION, 'REST API URL(%s)' % config_key)
    except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
        return DEFAULT_API_URL
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def build_url(nodes):
    """
    Build a url with the given array of nodes for the url and return path and url respectively
    Ordering is important
    """
    # '/'.join(...) is the idiomatic spelling of str.join('/', ...).
    path = '/'.join(nodes)
    url = '/'.join([get_api_url(), path])
    return path, url
项目:lama    作者:CSE-POST    | 项目源码 | 文件源码
def parse_result(self):
        """Parse the oletools JSON result once analysis has finished.

        Scores each reported item, spawns a new analysis for any URL IOC
        found, and attaches one indicator per item to the malware's
        module status.
        """
        if not self._result:
            return

        json_ole = self.json_decode(self._result)
        if not json_ole:
            return

        for item in json_ole:
            if "IOC" in item["type"]:
                score = 7
                # A URL IOC is extracted as a new malware input and re-analysed.
                if "URL" in item['description'] and validators.url(item['keyword']):
                    # NOTE(review): `self.malware` is used here but
                    # `self._malware` below -- confirm both name the same object.
                    extract_malware = self.malware.add_extract_malware(
                        self.module_cls_name, item['keyword'], Type.get_label(Type.URL))
                    Input.analyse_malware(extract_malware)
            elif "AutoExec" in item["type"]:
                score = 7
            elif "Suspicious" in item["type"]:
                score = 5
            elif "VBA string" in item["type"]:
                score = 3
            elif "Hex String" in item["type"]:
                score = 1
            else:
                score = -1

            # One indicator per oletools item, carrying the raw JSON payload.
            indicator = Indicator.factory(module_cls_name=self.module_cls_name,
                                          name="item",
                                          content_type=Type.JSON,
                                          content=json.dumps(item),
                                          score=score)
            self._malware.get_module_status(self.module_cls_name
                                            ).add_indicator(indicator)
项目:storperf    作者:opnfv    | 项目源码 | 文件源码
def success():
    """Render job listings or plots for the URL stored in the session.

    A "jobs" URL (without "metadata") lists per-job metadata; otherwise a
    valid URL is fetched (or the value is read as a bundled test-data
    file) and its contents plotted.
    """
    try:
        URL = session["url"]
        # `is`/`is not` against an int literal only worked by CPython's
        # small-int caching accident; use ordinary equality on str.find.
        if URL.find("jobs") != -1 and URL.find("metadata") == -1:
            data = urllib.urlopen(URL).read()
            data = json.loads(data)
            temp = data["job_ids"]
            if temp:
                info = {}
                for ID in temp:
                    url = URL + "?id=" + ID + "&type=metadata"
                    data_temp = urllib.urlopen(url).read()
                    data_temp = json.loads(data_temp)
                    report_data = get_data(data_temp)[-1]
                    info[ID] = report_data
                return render_template('plot_jobs.html', results=info)
        if validators.url(URL):
            data = urllib.urlopen(URL).read()
        else:
            # Not a URL: treat the value as a bundled test-data file name.
            data = open("./static/testdata/" + URL).read()
        data = json.loads(data)
        response = get_data(data)
        if response[0] == "single":
            metrics, report_data = response[1], response[2]
            results = response[3]
            return render_template('plot_tables.html',
                                   metrics=metrics, report_data=report_data,
                                   results=results)
        else:
            return render_template('plot_multi_data.html',
                                   results=response[1])
    except Exception as e:
        session['server_error'] = e.message + ' ' + repr(e.args)
        return redirect(url_for('file_not_found'))
项目:storperf    作者:opnfv    | 项目源码 | 文件源码
def url():
    """Store the posted URL in the session and redirect to the success view.

    NOTE(review): non-POST requests fall through and return None
    implicitly -- confirm the route is registered as POST-only.
    """
    if request.method == 'POST':
        url = request.form['url']
        session["url"] = url
        return redirect(url_for('success'))
项目:open-event-scraper    作者:fossasia    | 项目源码 | 文件源码
def validate_result(current, default, type):
    """
    Validates the data, whether it needs to be url, twitter, linkedin link etc.

    Returns `current` only when it passes the check for `type` while
    `default` does not; otherwise returns `default`.
    """
    current = "" if current is None else current
    default = "" if default is None else default
    if type == "URL":
        if validators.url(current, require_tld=True) and not validators.url(default, require_tld=True):
            return current
    elif type == "EMAIL":
        if validators.email(current) and not validators.email(default):
            return current
    return default
项目:open-event-scraper    作者:fossasia    | 项目源码 | 文件源码
def fetch_tsv_data(gid):
    """Download one sheet (selected by gid) of the Google spreadsheet as TSV."""
    export_url = ('https://docs.google.com/spreadsheets/d/' + SHEET_ID +
                  '/export?format=tsv' + '&gid=' + gid)
    logging.info('GET ' + export_url)
    return urllib2.urlopen(export_url).read()
项目:omnihash    作者:Miserlou    | 项目源码 | 文件源码
def iterate_bytechunks(hashme, is_string, use_json, hash_many):
    """
    Prep our bytes.

    Returns an iterable of byte chunks for `hashme`, which may be a URL
    (content fetched), an existing file path (streamed), or a literal
    string (UTF-8 encoded). Directories are skipped and yield None.
    """

    # URL
    if not is_string and validators.url(hashme):
        if not use_json:
            click.echo("Hashing content of URL " + click.style(hashme, bold=True) + "..", err=not hash_many)
        try:
            response = requests.get(hashme)
        except requests.exceptions.ConnectionError as e:
            raise ValueError("Not a valid URL. :(")
        except Exception as e:
            raise ValueError("Not a valid URL. {}.".format(e))
        # Non-200 is only warned about; the body is still hashed.
        if response.status_code != 200:
            click.echo("Response returned %s. :(" % response.status_code, err=True)
        bytechunks = response.iter_content()
    # File
    elif os.path.exists(hashme) and not is_string:
        if os.path.isdir(hashme):
            if not use_json:
                click.echo(click.style("Skipping", fg="yellow") + " directory " + "'" + hashme + "'..", err=True)
            return None

        if not use_json:
            click.echo("Hashing file " + click.style(hashme, bold=True) + "..", err=not hash_many)
        bytechunks = FileIter(open(hashme, mode='rb'))
    # String
    else:
        if not use_json:
            click.echo("Hashing string " + click.style(hashme, bold=True) + "..", err=not hash_many)
        bytechunks = (hashme.encode('utf-8'), )

    return bytechunks
项目:quantopian-tools    作者:Gitlitio    | 项目源码 | 文件源码
def test_valid_project_url():
    """The package's advertised project URL must validate as a URL."""
    assert validators.url(quantopian_tools.__project_url__)
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def post(self):
        """Register a new source URL from the request body.

        Responds 400 for an invalid or already-known URL, 201 on success.
        """
        candidate = self.get_body_argument("url")
        if not validators.url(candidate):
            self.set_status(400, "bad URL")
            return
        with self._connect() as connection:
            try:
                createSource(connection, candidate)
            except sqlite3.IntegrityError:
                # UNIQUE constraint hit: the URL is already stored.
                self.set_status(400, "duplicate URL")
                return
        self.set_status(201)
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def get(self, url):
        """Serve the stored source for `url` as JSON, or 404 when unknown."""
        with self._connect() as connection:
            try:
                record = source(connection, url)
                self.write(json_encode(record))
            except IndexError:
                # source() raises IndexError when no row matches.
                self.set_status(404, "Can't find '%s'" % url)
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def delete(self, url):
        """Remove `url` from the sources and report the deletion count."""
        with self._connect() as connection:
            removed = delete(connection, url)
            self.set_status(204, "Deleted %s with count %d" % (url, removed))
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def post(self):
        """Store the posted 'url' as a new source.

        400 for a bad or duplicate URL, 201 on success.
        """
        source_url = self.get_body_argument("url")
        if validators.url(source_url):
            with self._connect() as connection:
                try:
                    createSource(connection, source_url)
                except sqlite3.IntegrityError:
                    # Already present -> report as duplicate.
                    self.set_status(400, "duplicate URL")
                    return
            self.set_status(201)
        else:
            self.set_status(400, "bad URL")
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def get(self, url):
        """Look up `url` and emit it JSON-encoded; 404 when absent."""
        with self._connect() as connection:
            try:
                found = source(connection, url)
                self.write(json_encode(found))
            except IndexError:
                self.set_status(404, "Can't find '%s'" % url)
项目:MalwrAgent    作者:michaelschratt    | 项目源码 | 文件源码
def __parse_url(url):
        """Parse `url` into urlparse components, keeping any '#...' text as
        part of the path/query instead of splitting it off
        (allow_fragments=False)."""
        return urlparse(url, allow_fragments=False)
项目:MalwrAgent    作者:michaelschratt    | 项目源码 | 文件源码
def f_http_get(self, url):
        """Issue an HTTP GET to `url`, sending self.input as the payload."""
        return self.__do_http_request('GET', url, self.input)
项目:MalwrAgent    作者:michaelschratt    | 项目源码 | 文件源码
def f_http_post(self, url):
        """Issue an HTTP POST to `url`, sending self.input as the payload."""
        return self.__do_http_request('POST', url, self.input)
项目:MalwrAgent    作者:michaelschratt    | 项目源码 | 文件源码
def f_retrieve_image(self, url=None):
        """retrieve an image, image is stored in memory only
        :rtype: BytesIO object
        :param url: Image URL (used only when self.input is empty)
        :return: Object of image
        """
        source = self.input
        if source:
            # Prefix a scheme when missing; self.input keeps the fixed value.
            if not source.startswith('http'):
                source = 'http://' + source
                self.input = source
            return BytesIO(requests.get(source).content)
        return BytesIO(requests.get(url).content)
项目:pngify    作者:Cryptoc1    | 项目源码 | 文件源码
def strip_link(url):
    """Return the path segment right after the domain (e.g. the imgur image
    code in http://imgur.com/<code>), exiting the program on invalid URLs.
    """
    # validators.url returns True on success and a falsy ValidationFailure
    # otherwise, so the `== True` comparison was unnecessary.
    if validators.url(url):
        # NOTE(review): assumes at least scheme://host/segment -- a URL with
        # no path raises IndexError here; confirm callers guarantee one.
        return url.split("/")[3]
    else:
        exit("[!] Not a valid URL")
项目:pngify    作者:Cryptoc1    | 项目源码 | 文件源码
def main():
    """CLI entry point: download an image from a link and feed it to
    pngify.py, which transcodes it into an mp3."""
    parser = ArgumentParser(description="Transcode text from an imgur image to an mp3")
    parser.add_argument("-l", "--link", dest="link_input", help="The text or image file to transcode", metavar="STRING")
    parser.add_argument("-o", "--output", dest="file_output", help="The file name to output the result into (whether it be an image, or other file type)", metavar="FILE")

    # No CLI arguments at all: show usage and quit.
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit()

    args = vars(parser.parse_args())

    if args['link_input'] != None:
        output_filename = args['file_output']
        url = args['link_input']
        if output_filename == None: #if a filename isn't specified, make it the imgur image code
            output_fileWithExtension = strip_link(url)
            output_file = output_fileWithExtension.split(".", 1)[0]
            urllib.urlretrieve(url, "temp_"+output_fileWithExtension)
            # BUGFIX: the original removed temp_<output_filename>, but
            # output_filename is None in this branch (str + None raised a
            # TypeError); the downloaded file is temp_<output_fileWithExtension>.
            # SECURITY(review): the shell string embeds user-controlled names;
            # a crafted link could inject shell syntax -- consider subprocess.run
            # with a list.
            os.system("python pngify.py -i temp_"+output_fileWithExtension+" -o "+output_file+".mp3; rm temp_"+output_fileWithExtension) #run pngify.py then delete temp file
        else: #image name is specified
            o = output_filename.split(".", 1)
            output_file = output_filename.split(".", 1)[0]

            if o[len(o)-1] != "png": #if the user didn't input the file extension, add it in automatically
                output_filename = output_filename+".png"

            urllib.urlretrieve(url, "temp_"+output_filename) #download file
            os.system("python pngify.py -i temp_"+output_filename+" -o "+output_file+".mp3; rm temp_"+output_filename) #run pngify.py then delete temp file
项目:find-that-charity    作者:TechforgoodCAST    | 项目源码 | 文件源码
def parse_url(url):
    """Normalise a charity website URL.

    Tries the URL as given, then with 'http://' prepended, then again
    after repairing common typos (broken schemes, 'www' variants,
    commas). Returns None for missing URLs, known junk values, or URLs
    that remain invalid after all repairs.
    """
    if url is None:
        return None

    url = url.strip()

    if validators.url(url):
        return url

    if validators.url("http://%s" % url):
        return "http://%s" % url

    # Placeholder values seen in the data meaning "no website".
    # NOTE(review): the b'' entry can never equal a str url in Python 3.
    if url in ["n.a", 'non.e', '.0', '-.-', '.none', '.nil', 'N/A', 'TBC',
               'under construction', '.n/a', '0.0', '.P', b'', 'no.website']:
        return None

    # Repair commonly mistyped schemes.
    for i in ['http;//', 'http//', 'http.//', 'http:\\\\',
              'http://http://', 'www://', 'www.http://']:
        url = url.replace(i, 'http://')
    url = url.replace('http:/www', 'http://www')

    # Repair commonly mistyped 'www.' prefixes.
    for i in ['www,', ':www', 'www:', 'www/', 'www\\\\', '.www']:
        url = url.replace(i, 'www.')

    url = url.replace(',', '.')
    url = url.replace('..', '.')

    if validators.url(url):
        return url

    if validators.url("http://%s" % url):
        return "http://%s" % url

    # Still invalid: signal failure explicitly rather than falling off the end.
    return None
项目:find-that-charity    作者:TechforgoodCAST    | 项目源码 | 文件源码
def get_domain(url=None, email=None):
    """Return the host of `url` without a leading 'www.' prefix.

    `email` is accepted for interface compatibility but is unused.
    Returns None when no URL is given.
    """
    if url is None:
        return None
    host = urlparse(url).netloc
    return host[4:] if host.startswith('www.') else host
项目:linkerUniverse    作者:LinkerNetworks    | 项目源码 | 文件源码
def enumerate_http_resources(package, package_path):
    """Yield (url, target_dir) pairs for every downloadable resource a
    package references: images, asset URIs, CLI binaries, pip commands
    and downloadable config defaults.
    """
    with (package_path / 'resource.json').open() as json_file:
        resource = json.load(json_file)

    for name, url in resource.get('images', {}).items():
        # Screenshot entries are deliberately not mirrored.
        if name != 'screenshots':
            yield url, pathlib.Path(package, 'images')

    for name, url in resource.get('assets', {}).get('uris', {}).items():
        yield url, pathlib.Path(package, 'uris')

    for k in resource.get('cli', {}).get('binaries', {}):
        # Only the x86-64 binary URL is mirrored for each CLI entry.
        url = resource.get('cli', {}).get('binaries', {}).get(k, {}).get('x86-64', {}).get('url', '')
        yield url, pathlib.Path(package, 'cli', k)

    command_path = (package_path / 'command.json')
    if command_path.exists():
        with command_path.open() as json_file:
            commands = json.load(json_file)

        for url in commands.get("pip", []):
            if url.startswith('http'):
                yield url, pathlib.Path(package, 'commands')

    def traverse_yield(d, key='root'):
        # Recursively walk config.json looking for http(s) default values
        # that are actually downloadable.
        if isinstance(d, dict):
            if 'default' in d and str(d['default']).startswith('http'):
                url = d['default']
                if valid_download(url):
                    yield url, pathlib.Path(package, 'config', key)
            else:
                for k, v in d.items():
                    yield from traverse_yield(v, k)

    config_path = (package_path / 'config.json')
    if config_path.exists():
        with config_path.open() as json_file:
            config = json.load(json_file)
            yield from traverse_yield(config)
项目:linkerUniverse    作者:LinkerNetworks    | 项目源码 | 文件源码
def add_http_resource(dir_path, url, base_path):
    """Download `url` into dir_path/base_path, naming the file after the
    last component of the URL path (directories created as needed)."""
    filename = pathlib.Path(urllib.parse.urlparse(url).path).name
    archive_path = dir_path / base_path / filename
    print('Adding {} at {}.'.format(url, archive_path))
    os.makedirs(str(archive_path.parent), exist_ok=True)
    urllib.request.urlretrieve(url, str(archive_path))
项目:linkerUniverse    作者:LinkerNetworks    | 项目源码 | 文件源码
def valid_download(url):
    """True when `url` is well-formed and a HEAD request returns a
    non-error (< 400) status."""
    if not validators.url(url):
        return False
    head_status = httplib2.Http().request(url, 'HEAD')[0]['status']
    return int(head_status) < 400
项目:Electrify    作者:jyapayne    | 项目源码 | 文件源码
def url_exists(path):
    """Return True when `path` is a syntactically valid URL."""
    # validators.url returns a falsy ValidationFailure on failure, so bool()
    # collapses the result to the original True/False contract without the
    # if/return True/return False boilerplate.
    return bool(validators.url(path))
项目:aws_pwn    作者:dagrz    | 项目源码 | 文件源码
def validate_account_signin(account):
    """Probe whether an AWS account (numeric ID or alias) has a sign-in page.

    Builds https://<account>.signin.aws.amazon.com/ and treats a 302
    redirect as evidence that the account exists.
    """
    result = {
        'accountAlias': None,
        'accountId': None,
        'signinUri': 'https://' + account + '.signin.aws.amazon.com/',
        'exists': False,
        'error': None
    }

    # NOTE(review): re.match anchors only at the start, so a 13+-digit
    # string also classifies as an account ID -- confirm fullmatch intent.
    if re.match(r'\d{12}', account):
        result['accountId'] = account
    else:
        result['accountAlias'] = account

    if not validators.url(result['signinUri']):
        result['error'] = 'Invalid URI'
        return result

    try:
        # Redirects disabled: the 302 itself is the existence signal.
        r = requests.get(result['signinUri'], allow_redirects=False)
        if r.status_code == 302:
            result['exists'] = True
    except requests.exceptions.RequestException as e:
        result['error'] = e

    return result
项目:bruteforce-http-auth    作者:erforschr    | 项目源码 | 文件源码
def run(self):
        """Consume the logging queue until every worker has signalled
        completion (each worker enqueues one None when finished), logging
        each authentication attempt and periodic progress updates.
        """
        while True:
            # One None received per worker marks that worker as done.
            if self.count >= self.workers_number:
                break

            try:
                test = self.queue_logging.get()

                if test is None:
                    self.count += 1

                else:
                    if test['success']:
                        log.success('Authentication successful: Username: \"{}\" Password: \"{}\" URL: {}'.format(test['username'], test['password'], test['target']['url']))

                    else:
                        if self.verbose:
                            log.debug('Authentication failed: Username: \"{}\" Password: \"{}\" URL: {}'.format(test['username'], test['password'], test['target']['url']))

                    self.progress += 1

                    # Report every 10 attempts; non-verbose mode updates in place.
                    if self.progress % 10 == 0:
                        if self.verbose:
                            log.info('Progress : {}'.format(self.progress))
                        else:
                            log.info('Progress : {}'.format(self.progress), update=True)

            except Exception as e:
                traceback.print_exc()
                log.error('WorkerLogging => {} : {}'.format(type(e), e))

        log.info('Progress : {} (end)'.format(self.progress))
项目:LC-QuAD    作者:AskNowQA    | 项目源码 | 文件源码
def has_url(_string):
    """Return True when `_string` is itself a valid URL."""
    # bool() normalises validators.url's True / falsy-ValidationFailure
    # result, replacing the if/return True/return False boilerplate.
    return bool(validators.url(_string))