Python lxml.etree 模块,_Element() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用lxml.etree._Element()

项目:talonspider    作者:howie6879    | 项目源码 | 文件源码
def extract_value(self, html):
        """
        Use css_select or re_select to extract a field value
        :return: 
        """
        value = ''
        if self.css_select:
            value = html.cssselect(self.css_select)
        elif self.xpath_select:
            value = html.xpath(self.xpath_select)
        else:
            raise ValueError('%s field: css_select or xpath_select is expected' % self.__class__.__name__)
        if isinstance(value, list) and len(value) == 1:
            if isinstance(value[0], etree._Element):
                text = ''
                for node in value[0].itertext():
                    text += node.strip()
                value = text
        return value
项目:dpspider    作者:doupengs    | 项目源码 | 文件源码
def xpathOne(self,xpath):
        '''
        :param xpath: <class str|xpath expression>
        :function: xpath match one
        :return: <class Parser>
        '''
        try:
            labels = self._html.xpath(xpath)
        except Exception as e:
            printText("[Error]parser.py Parser xpathOne:%s %s %s"%(e,xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
            return Parser(data='',url=self.url,logFile=self.logFile,color=self.color,debug=self.debug)
        if len(labels) > 0:
            label = labels[0]
            return Parser(data=etree.tostring(label,encoding="unicode",method="html"),url=self.url,logFile=self.logFile,color=self.color,
            debug=self.debug) if isinstance(label,etree._Element) else Parser(data=label,url=self.url,logFile=self.logFile,color=self.color,debug=self.debug)
        else:
            printText("[WARING]parser.py Parser xpathOne parse None:%s %s"%(xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
            return Parser(data='',url=self.url,logFile=self.logFile,color=self.color,debug=self.debug)
项目:dpspider    作者:doupengs    | 项目源码 | 文件源码
def xpathAll(self,xpath):
        '''
        :param xpath: <class str|xpath expression>
        :function: xpath match all
        :return: [<class Parser>,<class Parser>...]
        '''
        try:
            labels = self._html.xpath(xpath)
        except Exception as e:
            printText("[Error]parser.py Parser xpathAll:%s %s %s"%(e,xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
            return []
        if len(labels)>0:
            return [Parser(data=etree.tostring(label,encoding="unicode",method="html"),url=self.url,logFile=self.logFile,color=self.color,
            debug=self.debug) if isinstance(label,etree._Element) else Parser(data=label,url=self.url,logFile=self.logFile,color=self.color,
            debug=self.debug) for label in labels]
        else:
            printText("[WARING]parser.py Parser xpathAll parse None:%s %s"%(xpath,self.url),logFile=self.logFile,color=self.color,debug=self.debug)
            return []
项目:leetcode    作者:thomasyimgit    | 项目源码 | 文件源码
def _get_root(self, value):
        if isinstance(value, basestring):
            root = fromstring(unicode('<root>') + value + unicode('</root>'),
                              self.parser)[0]
        elif isinstance(value, etree._Element):
            root = self._copy(value)
        elif isinstance(value, PyQuery):
            root = value
        else:
            raise TypeError(
                'Value must be string, PyQuery or Element. Got %r' % value)
        if hasattr(root, 'text') and isinstance(root.text, basestring):
            root_text = root.text
        else:
            root_text = ''
        return root, root_text
项目:pykeepass    作者:pschmitt    | 项目源码 | 文件源码
def __init__(self, name=None, element=None, icon=None):
        if element is None:
            element = Element('Group')
            name = xmlfactory.create_name_element(name)
            uuid = xmlfactory.create_uuid_element()
            element.append(uuid)
            element.append(name)
            if icon:
                icon_el = xmlfactory.create_icon_element(icon)
                element.append(icon_el)
        assert type(element) in [_Element, Element, ObjectifiedElement], \
            'The provided element is not an LXML Element, but {}'.format(
                type(element)
            )
        assert element.tag == 'Group', 'The provided element is not a Group '\
            'element, but a {}'.format(element.tag)
        self._element = element
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def merge_nodes(src: etree._Element, dst: etree._Element):
    """ Merges the node ``src`` including their subelements to ``dst``. The
        Nodes are considered as equal - and thus merged - if their fully qualified names are
        identical.
        Different matching and merging strategies will be added as needed.
    """
    def child_with_qname(element: etree._Element, qname: etree.QName):
        for child in element.iterchildren(qname.text):
            if etree.QName(child).text == qname.text:
                return child

    merged_elements = set()

    for child in dst.iterchildren():
        twin = child_with_qname(src, etree.QName(child))
        if twin is not None:
            merge_nodes(twin, child)
            merged_elements.add(twin)

    for child in src.iterchildren():
        if child in merged_elements:
            continue
        dst.append(deepcopy(child))
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def __call__(self, transformation_root: etree._Element,
                 copy: bool = None, **context: AnyType) -> AnyType:
        copy = self.config.copy if copy is None else copy
        self._init_transformation(transformation_root, copy, context)

        for step in self.steps:
            _step_name = step.name if hasattr(step, 'name') else step.__name__
            dbg("Processing rule '{}'.".format(_step_name))

            self.states.current_step = step
            try:
                if isinstance(step, Rule):
                    self._apply_rule(step)
                else:
                    self._apply_handlers(step)
            except AbortTransformation:
                dbg("Aborting due to 'AbortTransformation'.")
                break

        if self.config.result_object:
            result = dot_lookup(self, self.config.result_object)
        else:
            result = None
        self._finalize_transformation()
        return result
项目:Tencent_Cartoon_Download    作者:Fretice    | 项目源码 | 文件源码
def _get_root(self, value):
        if isinstance(value, basestring):
            root = fromstring(unicode('<root>') + value + unicode('</root>'),
                              self.parser)[0]
        elif isinstance(value, etree._Element):
            root = self._copy(value)
        elif isinstance(value, PyQuery):
            root = value
        else:
            raise TypeError(
                'Value must be string, PyQuery or Element. Got %r' % value)
        if hasattr(root, 'text') and isinstance(root.text, basestring):
            root_text = root.text
        else:
            root_text = ''
        return root, root_text
项目:dpparser    作者:doupengs    | 项目源码 | 文件源码
def xpathone(self, xpath):
        """
        :param xpath: xpath expression
        :function: xpath match one
        :return: class Parser
        """
        try:
            labels = self.__html.xpath(xpath)
        except Exception as e:
            Logger.error("%s [-] <xpath> %s <url> %s" % (e, xpath, self.url))
            return Parser(data='', url=self.url)
        if len(labels) > 0:
            label = labels[0]
            return Parser(
                data=etree.tostring(label, encoding="unicode", method="html"),
                url=self.url
            ) if isinstance(label, etree._Element) else Parser(data=label, url=self.url)
        else:
            Logger.warning("The parsing result is None [-] <xpath> %s <url> %s" % (xpath, self.url))
            return Parser(data='', url=self.url)
项目:dpparser    作者:doupengs    | 项目源码 | 文件源码
def xpathall(self, xpath):
        """
        :param xpath: xpath expression
        :function: xpath match all
        :return: [class Parser, class Parser, ...]
        """
        try:
            labels = self.__html.xpath(xpath)
        except Exception as e:
            Logger.error("%s [-] <xpath> %s <url> %s" % (e, xpath, self.url))
            return []
        if len(labels) > 0:
            return [Parser(data=etree.tostring(label, encoding="unicode", method="html"), url=self.url)
                    if isinstance(label, etree._Element) else Parser(data=label, url=self.url) for label in labels]
        else:
            Logger.warning("The parsing result is None [-] <xpath> %s <url> %s" % (xpath, self.url))
            return []
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def Query(self, uri):
        """Performs a query and returns a resulting feed or entry.

        Args:
          uri: A string representing the URI of the feed that is to be queried.

        Returns:
          On success, a tuple in the form:
          (boolean succeeded=True, ElementTree._Element result)
          On failure, a tuple in the form:
          (boolean succeeded=False, {'status': HTTP status code from server,
                                     'reason': HTTP reason from the server,
                                     'body': HTTP body of the server's response})
        """
        result = self.Get(uri)
        return result
项目:webmon    作者:KarolBedkowski    | 项目源码 | 文件源码
def _get_elements_by_xpath(filter_, data, expression):
    try:
        from lxml import etree
    except ImportError:
        raise common.FilterError(filter_, "module lxml not found")
    # pylint: disable=no-member
    html_parser = etree.HTMLParser(encoding='utf-8', recover=True,
                                   strip_cdata=True)
    document = etree.fromstringlist([data], html_parser)
    for elem in document.xpath(expression):
        # pylint: disable=protected-access
        if isinstance(elem, etree._Element):
            text = etree.tostring(elem)
        else:
            text = str(elem)
        if isinstance(text, str):
            yield text
        else:
            yield text.decode('utf-8')
项目:webmon    作者:KarolBedkowski    | 项目源码 | 文件源码
def _filter(self, item: str, result: common.Result) -> ty.Iterable[str]:
        try:
            from lxml import etree
        except ImportError:
            raise common.FilterError(self, "module lxml not found")
        # pylint: disable=no-member
        html_parser = etree.HTMLParser(encoding='utf-8', recover=True,
                                       strip_cdata=True)
        document = etree.fromstringlist([item], html_parser)
        for elem in document.findall(".//*[@id='" + self._conf["sel"] + "']"):
            # pylint: disable=protected-access
            if isinstance(elem, etree._Element):
                text = etree.tostring(elem)  # type: ty.Union[str, bytes]
                if text:
                    if hasattr(text, 'decode'):
                        yield text.decode('utf-8')
                    else:
                        yield str(text)
            else:
                yield str(elem)
项目:python-zeep    作者:mvantellingen    | 项目源码 | 文件源码
def test_any_value_element_tree():
    schema = get_any_schema()

    item = etree.Element('{http://tests.python-zeep.org}lxml')
    etree.SubElement(item, 'node').text = 'foo'

    container_elm = schema.get_element('{http://tests.python-zeep.org/}container')

    # Create via arg
    obj = container_elm(item)
    node = etree.Element('document')
    container_elm.render(node, obj)
    expected = """
        <document>
            <ns0:container xmlns:ns0="http://tests.python-zeep.org/">
                <ns0:lxml xmlns:ns0="http://tests.python-zeep.org">
                    <node>foo</node>
                </ns0:lxml>
            </ns0:container>
        </document>
    """
    assert_nodes_equal(expected, node)
    item = container_elm.parse(node.getchildren()[0], schema)
    assert isinstance(item._value_1, etree._Element)
    assert item._value_1.tag == '{http://tests.python-zeep.org}lxml'
项目:python-zeep    作者:mvantellingen    | 项目源码 | 文件源码
def _render_value_item(self, parent, value, render_path):
        if value is None:  # can be an lxml element
            return

        elif isinstance(value, etree._Element):
            parent.append(value)

        elif self.restrict:
            if isinstance(value, list):
                for val in value:
                    self.restrict.render(parent, val, None, render_path)
            else:
                self.restrict.render(parent, value, None, render_path)
        else:
            if isinstance(value.value, list):
                for val in value.value:
                    value.xsd_elm.render(parent, val, render_path)
            else:
                value.xsd_elm.render(parent, value.value, render_path)
项目:python-zeep    作者:mvantellingen    | 项目源码 | 文件源码
def _validate_item(self, value, render_path):
        if value is None:  # can be an lxml element
            return

        # Check if we received a proper value object. If we receive the wrong
        # type then return a nice error message
        if self.restrict:
            expected_types = (etree._Element, dict,) + self.restrict.accepted_types
        else:
            expected_types = (etree._Element, dict, AnyObject)

        if not isinstance(value, expected_types):
            type_names = [
                '%s.%s' % (t.__module__, t.__name__) for t in expected_types
            ]
            err_message = "Any element received object of type %r, expected %s" % (
                type(value).__name__, ' or '.join(type_names))

            raise TypeError('\n'.join((
                err_message,
                "See http://docs.python-zeep.org/en/master/datastructures.html"
                "#any-objects for more information"
            )))
项目:python-zeep    作者:mvantellingen    | 项目源码 | 文件源码
def load(self, schema, node):
        """Load the XML Schema passed in via the node attribute.

        :type schema: zeep.xsd.schema.Schema
        :type node: etree._Element

        """
        if node is None:
            return

        if not schema.documents.has_schema_document_for_ns(self._target_namespace):
            raise RuntimeError(
                "The document needs to be registered in the schema before " +
                "it can be loaded")

        # Disable XML schema validation for now
        # if len(node) > 0:
        #     self.xml_schema = etree.XMLSchema(node)
        visitor = SchemaVisitor(schema, self)
        visitor.visit_schema(node)
项目:python-xml    作者:emusical    | 项目源码 | 文件源码
def map_node_to_class(self, node):
        if isinstance(node, etree._ProcessingInstruction):
            return nodes.ProcessingInstruction
        elif isinstance(node, etree._Comment):
            return nodes.Comment
        elif isinstance(node, etree._ElementTree):
            return nodes.Document
        elif isinstance(node, etree._Element):
            return nodes.Element
        elif isinstance(node, LXMLAttribute):
            return nodes.Attribute
        elif isinstance(node, LXMLText):
            if node.is_cdata:
                return nodes.CDATA
            else:
                return nodes.Text
        raise exceptions.Xml4hImplementationBug(
            'Unrecognized type for implementation node: %s' % node)
项目:python-xml    作者:emusical    | 项目源码 | 文件源码
def new_impl_element(self, tagname, ns_uri=None, parent=None):
        if ns_uri is not None:
            if ':' in tagname:
                tagname = tagname.split(':')[1]
            my_nsmap = {None: ns_uri}
            # Add any xmlns attribute prefix mappings from parent's document
            # TODO This doesn't seem to help
            curr_node = parent
            while curr_node.__class__ == etree._Element:
                for n, v in curr_node.attrib.items():
                    if '{%s}' % nodes.Node.XMLNS_URI in n:
                        _, prefix = n.split('}')
                        my_nsmap[prefix] = v
                curr_node = self.get_node_parent(curr_node)
            return etree.Element('{%s}%s' % (ns_uri, tagname), nsmap=my_nsmap)
        else:
            return etree.Element(tagname)
项目:python-xml    作者:emusical    | 项目源码 | 文件源码
def find_node_elements(self, node, name='*', ns_uri='*'):
        # TODO Any proper way to find namespaced elements by name?
        name_match_nodes = node.getiterator()
        # Filter nodes by name and ns_uri if necessary
        results = []
        for n in name_match_nodes:
            # Ignore the current node
            if n == node:
                continue
            # Ignore non-Elements
            if not n.__class__ == etree._Element:
                continue
            if ns_uri != '*' and self.get_node_namespace_uri(n) != ns_uri:
                continue
            if name != '*' and self.get_node_local_name(n) != name:
                continue
            results.append(n)
        return results
项目:python-xml    作者:emusical    | 项目源码 | 文件源码
def lookup_ns_prefix_for_uri(self, node, uri):
        if uri == nodes.Node.XMLNS_URI:
            return 'xmlns'
        result = None
        if hasattr(node, 'nsmap') and uri in node.nsmap.values():
            for n, v in node.nsmap.items():
                if v == uri:
                    result = n
                    break
        # TODO This is a slow hack necessary due to lxml's immutable nsmap
        if result is None or re.match('ns\d', result):
            # We either have no namespace prefix in the nsmap, in which case we
            # will try looking for a matching xmlns attribute, or we have
            # a namespace prefix that was probably assigned automatically by
            # lxml and we'd rather use a human-assigned prefix if available.
            curr_node = node  # self.get_node_parent(node)
            while curr_node.__class__ == etree._Element:
                for n, v in curr_node.attrib.items():
                    if v == uri and ('{%s}' % nodes.Node.XMLNS_URI) in n:
                        result = n.split('}')[1]
                        return result
                curr_node = self.get_node_parent(curr_node)
        return result
项目:VulnWhisperer    作者:austin-taylor    | 项目源码 | 文件源码
def format_payload(self, api_version, data):
        """ Return appropriate QualysGuard API call.

        """
        # Check if payload is for API v1 or API v2.
        if (api_version in (1, 2)):
            # Check if string type.
            if type(data) == str:
                # Convert to dictionary.
                logger.debug('Converting string to dict:\n%s' % data)
                # Remove possible starting question mark & ending ampersands.
                data = data.lstrip('?')
                data = data.rstrip('&')
                # Convert to dictionary.
                data = urllib.parse.parse_qs(data)
                logger.debug('Converted:\n%s' % str(data))
        elif api_version in ('am', 'was', 'am2'):
            if type(data) == etree._Element:
                logger.debug('Converting lxml.builder.E to string')
                data = etree.tostring(data)
                logger.debug('Converted:\n%s' % data)
        return data
项目:fnapy    作者:alexandriagroup    | 项目源码 | 文件源码
def __init__(self, content):
        # we need to turn content into valid utf8
        content = to_unicode(content, encoding='iso-8859-1').encode('utf-8')


        # content is a string
        if len(content) != 0:
            self.dict = xml2dict(content)
            # etree._Element
            self.element = etree.fromstring(content)
        else:
            self.dict = OrderedDict()
            self.element = etree.Element('empty_content')

        # Raw XML
        self.xml = content

        self.tag = re.sub(pattern='{[^}]+}', repl='', string=self.element.tag,
                          flags=0)
项目:py-openmath    作者:OpenMath    | 项目源码 | 文件源码
def elements_equal(e1, e2):
    """ Checks if two xml elements are equal.

    :param e1: First element to check.
    :type e1: etree._Element

    :param e2: Seconds element to check.
    :type e2: etree._Element

    Adapted from http://stackoverflow.com/a/24349916.

    :rtype: bool
    """

    if e1.tag != e2.tag:
        return False

    e1te = e1.text.strip() if e1.text is not None else ''
    e2te = e2.text.strip() if e2.text is not None else ''
    if e1te != e2te:
        return False

    e1ta = e1.tail.strip() if e1.tail is not None else ''
    e2ta = e2.tail.strip() if e2.tail is not None else ''
    if e1ta != e2ta:
        return False
    if e1.attrib != e2.attrib:
        return False
    if len(e1) != len(e2):
        return False

    return all(elements_equal(c1, c2) for c1, c2 in zip(e1, e2))
项目:talonspider    作者:howie6879    | 项目源码 | 文件源码
def __init__(self, html):
        if html is None or not isinstance(html, etree._Element):
            raise ValueError("etree._Element is expected")
        for field_name, field_value in self._fields.items():
            get_field = getattr(self, 'tal_%s' % field_name, None)
            value = field_value.extract_value(html) if isinstance(field_value, BaseField) else field_value
            if get_field:
                value = get_field(value)
            setattr(self, field_name, value)
项目:pyhpecw7    作者:HPENetworking    | 项目源码 | 文件源码
def staged_to_string(self):
        """Convert the staging area to a list of strings.

        Returns:
            A list of string representing the configuration in the
            staging area.
        """
        cfgs = []
        for cfg in self.staged:
            if isinstance(cfg['config'], etree._Element):
                cfgs.append(etree.tostring(cfg['config']))
            else:
                cfgs.append(cfg['config'])

        return cfgs
项目:pykeepass    作者:pschmitt    | 项目源码 | 文件源码
def __init__(self, title=None, username=None, password=None, url=None,
                 notes=None, tags=None, expires=False, expiry_time=None,
                 icon=None, element=None):
        if element is None:
            element = Element('Entry')
            title = xmlfactory.create_title_element(title)
            uuid = xmlfactory.create_uuid_element()
            username = xmlfactory.create_username_element(username)
            password = xmlfactory.create_password_element(password)
            times = xmlfactory.create_times_element(expires, expiry_time)
            if url:
                url_el = xmlfactory.create_url_element(url)
                element.append(url_el)
            if notes:
                notes_el = xmlfactory.create_notes_element(notes)
                element.append(notes_el)
            if tags:
                tags_el = xmlfactory.create_tags_element(tags)
                element.append(tags_el)
            if icon:
                icon_el = xmlfactory.create_icon_element(icon)
                element.append(icon_el)
            element.append(title)
            element.append(uuid)
            element.append(username)
            element.append(password)
            element.append(times)
        assert type(element) in [_Element, Element, ObjectifiedElement], \
            'The provided element is not an LXML Element, but {}'.format(
                type(element)
            )
        assert element.tag == 'Entry', 'The provided element is not an Entry '\
            'element, but a {}'.format(element.tag)

        self._element = element
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def append(name, symbol=Ref('previous_result'), copy_element=False):
    """ Appends the object referenced by ``symbol`` (default: the result of the previous
        :term:`handler function`) to the object referenced by ``name`` in the :term:`context`
        namespace. If the object is an element and ``copy_element`` is ``True``, a copy is appended
        to the target.
    """
    def handler(context, previous_result, transformation):
        obj = symbol(transformation)
        if copy_element and isinstance(obj, etree._Element):
            obj = deepcopy(obj)
        dot_lookup(context, name).append(obj)
        return previous_result
    return handler
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def find(element, path):
    """ A helper function around a :attr:`lxml.etree._Element.find` that passes the element's
        namespace mapping.
    """
    return element.find(path, namespaces=element.nsmap)
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def is_root_element(element: etree._Element) -> bool:
    """ Tests whether the given element is the root of the tree object.
        Not to be mixed up with the root element of a possible sub-document a transformation may
        be called with.
    """
    return element is element.getroottree().getroot()
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def traverse_df_ltr_btt(root: etree._Element) -> Iterator[etree._Element]:
    def yield_children(element):
        for child in element:
            yield from yield_children(child)
        yield element
    yield from yield_children(root)
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def traverse_root(root: etree._Element) -> Iterator[etree._Element]:
    yield root
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def _is_root_condition(element: etree._Element, transformation: 'Transformation'):
    return element is transformation.root
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def Any(*conditions: Sequence[ConditionType]) -> Callable:
    """ Returns a callable that evaluates the provided test functions and returns ``True`` if any
        of them returned that.
    """
    conditions = tuple(_condition_factory(x) for x in _flatten_sequence(conditions))

    def evaluator(element: etree._Element, transformation: Transformation) -> bool:
        return any(x(element, transformation) for x in conditions)
    return evaluator
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def OneOf(*conditions: Sequence[ConditionType]) -> Callable:
    """ Returns a callable that evaluates the provided test functions and returns ``True`` if
        exactly one of them returned that.
    """
    conditions = tuple(_condition_factory(x) for x in _flatten_sequence(conditions))

    def evaluator(element: etree._Element, transformation: Transformation) -> bool:
        return [x(element, transformation) for x in conditions].count(True) == 1
    return evaluator
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def Not(*conditions: Sequence[ConditionType]) -> Callable:
    """ Returns a callable that evaluates the provided test functions and returns ``True`` if any
        of them returned ``False``.
    """
    conditions = tuple(_condition_factory(x) for x in _flatten_sequence(conditions))

    def evaluator(element: etree._Element, transformation: Transformation) -> bool:
        return not any(x(element, transformation) for x in conditions)
    return evaluator
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def HasLocalname(tag: AnyStr) -> Callable:
    """ Returns a callable that tests an element for the given local tag name. """
    def evaluator(element: etree._Element, _) -> bool:
        return etree.QName(element).localname == tag
    return evaluator
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def MatchesXPath(xpath: Union[str, Callable]) -> Callable:
    """ Returns a callable that tests an element for the given XPath expression (whether the
        evaluation result on the :term:`transformation root` contains it) . If the ``xpath``
        argument is a callable, it will be called with the current transformation as argument to
        obtain the expression.
    """
    def callable_evaluator(element: etree._Element, transformation: Transformation) -> bool:
        _xpath = xpath(transformation)
        dbg("Resolved XPath from callable: '{}'".format(_xpath))
        return element in transformation.xpath_evaluator(_xpath)

    def string_evaluator(element: etree._Element, transformation: Transformation) -> bool:
        return element in transformation.xpath_evaluator(xpath)

    return callable_evaluator if callable(xpath) else string_evaluator
项目:inxs    作者:funkyfuture    | 项目源码 | 文件源码
def _test_conditions(self, element: etree._Element, conditions: Sequence[Callable]) -> bool:
        # there's no dependency injection here because its overhead
        # shall be avoided during testing conditions
        for condition in conditions:
            dbg("Testing condition '{}'.".format(condition))
            if not condition(element, self):
                dbg('The condition did not apply.')
                return False
            dbg('The condition applied.')
        return True
项目:scrapy_rss    作者:woxcab    | 项目源码 | 文件源码
def assertUnorderedXmlEquivalentOutputs(self, data, expected, excepted_elements = ('lastBuildDate', 'generator')):
        """
        Children and text subnodes of each element in XML are considered as unordered set.
        Therefore if two XML files has different order of same elements then these XMLs are same.
        """
        if not excepted_elements:
            excepted_elements = ()
        if isinstance(excepted_elements, six.string_types):
            excepted_elements = (excepted_elements,)

        data = data if isinstance(data, etree._Element) \
            else etree.fromstring(self._str_to_bytes(data))
        for excepted_element in excepted_elements:
            for element in data.xpath('//{}'.format(excepted_element)):
                element.getparent().remove(element)
        data_tuple = self._xml_to_tuple(data)

        expected = expected if isinstance(expected, etree._Element) \
            else etree.fromstring(self._str_to_bytes(expected))
        for excepted_element in excepted_elements:
            for element in expected.xpath('//{}'.format(excepted_element)):
                element.getparent().remove(element)
        expected_tuple = self._xml_to_tuple(expected)

        if data_tuple != expected_tuple:
            self.fail(
                'Feeds are not equivalent accurate within ordering '
                '(taking into consideration excepted nodes {excepted_elements}):\n'
                'Given: {given}\n'
                'Expected: {expected}'
                .format(excepted_elements=excepted_elements,
                        given=etree.tostring(data), expected=etree.tostring(expected)))
项目:javapackages    作者:fedora-java    | 项目源码 | 文件源码
def annotate(elements):
    begin_comment = etree.Comment(' begin of code added by maintainer ')
    end_comment = etree.Comment(' end of code added by maintainer ')

    if isinstance(elements, etree._Element):
        if elements.text and elements.text.strip():
            begin_comment.tail = elements.text.strip()
            elements.text = ''
        elements[:0] = [begin_comment]
        elements.append(end_comment)
        return elements
    return [begin_comment] + elements + [end_comment]
项目:javapackages    作者:fedora-java    | 项目源码 | 文件源码
def replace_xml(self, replaced, content):
        parent = replaced.getparent()
        if not isinstance(replaced, etree._Element):
            parent.extend(content)
            if content.tag is not etree.Comment:
                parent.text = content.text
            return
        idx = parent.index(replaced)
        items = len(content)
        del parent[idx]
        for i, element in enumerate(content):
            parent.insert(idx + i, element)
        self.reformat(parent, parent[idx: idx + items])
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def Post(self, data, uri, extra_headers=None, url_params=None,
             escape_params=True, redirects_remaining=4, media_source=None,
             converter=None):
        """Insert or update  data into a GData service at the given URI.

        Args:
          data: string, ElementTree._Element, atom.Entry, or gdata.GDataEntry The
                XML to be sent to the uri.
          uri: string The location (feed) to which the data should be inserted.
               Example: '/base/feeds/items'.
          extra_headers: dict (optional) HTTP headers which are to be included.
                         The client automatically sets the Content-Type,
                         Authorization, and Content-Length headers.
          url_params: dict (optional) Additional URL parameters to be included
                      in the URI. These are translated into query arguments
                      in the form '&dict_key=value&...'.
                      Example: {'max-results': '250'} becomes &max-results=250
          escape_params: boolean (optional) If false, the calling code has already
                         ensured that the query will form a valid URL (all
                         reserved characters have been escaped). If true, this
                         method will escape the query and any URL parameters
                         provided.
          media_source: MediaSource (optional) Container for the media to be sent
              along with the entry, if provided.
          converter: func (optional) A function which will be executed on the
              server's response. Often this is a function like
              GDataEntryFromString which will parse the body of the server's
              response and return a GDataEntry.

        Returns:
          If the post succeeded, this method will return a GDataFeed, GDataEntry,
          or the results of running converter on the server's result body (if
          converter was specified).
        """
        return GDataService.PostOrPut(self, 'POST', data, uri,
                                      extra_headers=extra_headers, url_params=url_params,
                                      escape_params=escape_params, redirects_remaining=redirects_remaining,
                                      media_source=media_source, converter=converter)
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def Put(self, data, uri, extra_headers=None, url_params=None,
            escape_params=True, redirects_remaining=3, media_source=None,
            converter=None):
        """Updates an entry at the given URI.

        Args:
          data: string, ElementTree._Element, or xml_wrapper.ElementWrapper The
                XML containing the updated data.
          uri: string A URI indicating entry to which the update will be applied.
               Example: '/base/feeds/items/ITEM-ID'
          extra_headers: dict (optional) HTTP headers which are to be included.
                         The client automatically sets the Content-Type,
                         Authorization, and Content-Length headers.
          url_params: dict (optional) Additional URL parameters to be included
                      in the URI. These are translated into query arguments
                      in the form '&dict_key=value&...'.
                      Example: {'max-results': '250'} becomes &max-results=250
          escape_params: boolean (optional) If false, the calling code has already
                         ensured that the query will form a valid URL (all
                         reserved characters have been escaped). If true, this
                         method will escape the query and any URL parameters
                         provided.
          converter: func (optional) A function which will be executed on the
              server's response. Often this is a function like
              GDataEntryFromString which will parse the body of the server's
              response and return a GDataEntry.

        Returns:
          If the put succeeded, this method will return a GDataFeed, GDataEntry,
          or the results of running converter on the server's result body (if
          converter was specified).
        """
        return GDataService.PostOrPut(self, 'PUT', data, uri,
                                      extra_headers=extra_headers, url_params=url_params,
                                      escape_params=escape_params, redirects_remaining=redirects_remaining,
                                      media_source=media_source, converter=converter)
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def YouTubeQuery(self, query):
        """Performs a YouTube specific query and returns a resulting feed or entry.

        Args:
          query: A Query object or one if its sub-classes (YouTubeVideoQuery,
              YouTubeUserQuery or YouTubePlaylistQuery).

        Returns:
          Depending on the type of Query object submitted returns either a
              YouTubeVideoFeed, a YouTubeUserFeed, a YouTubePlaylistFeed. If the
              Query object provided was not YouTube-related, a tuple is returned.
              On success the tuple will be in this form:
              (boolean succeeded=True, ElementTree._Element result)
              On failure, the tuple will be in this form:
              (boolean succeeded=False, {'status': HTTP status code from server,
                                         'reason': HTTP reason from the server,
                                         'body': HTTP body of the server response})
        """
        result = self.Query(query.ToUri())
        if isinstance(query, YouTubeUserQuery):
            return gdata.youtube.YouTubeUserFeedFromString(result.ToString())
        elif isinstance(query, YouTubePlaylistQuery):
            return gdata.youtube.YouTubePlaylistFeedFromString(result.ToString())
        elif isinstance(query, YouTubeVideoQuery):
            return gdata.youtube.YouTubeVideoFeedFromString(result.ToString())
        else:
            return result
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def Post(self, data, uri, extra_headers=None, url_params=None,
             escape_params=True, content_type='application/atom+xml'):
        """Insert data into an APP server at the given URI.

        Args:
          data: string, ElementTree._Element, or something with a __str__ method
                The XML to be sent to the uri.
          uri: string The location (feed) to which the data should be inserted.
               Example: '/base/feeds/items'.
          extra_headers: dict (optional) HTTP headers which are to be included.
                         The client automatically sets the Content-Type,
                         Authorization, and Content-Length headers.
          url_params: dict (optional) Additional URL parameters to be included
                      in the URI. These are translated into query arguments
                      in the form '&dict_key=value&...'.
                      Example: {'max-results': '250'} becomes &max-results=250
          escape_params: boolean (optional) If false, the calling code has already
                         ensured that the query will form a valid URL (all
                         reserved characters have been escaped). If true, this
                         method will escape the query and any URL parameters
                         provided.

        Returns:
          httplib.HTTPResponse Server's response to the POST request.
        """
        if extra_headers is None:
            extra_headers = {}
        if content_type:
            extra_headers['Content-Type'] = content_type
        return self.request('POST', uri, data=data, headers=extra_headers,
                            url_params=url_params)
项目:gdata-python3    作者:dvska    | 项目源码 | 文件源码
def _BecomeChildElement(self, element_tree):
        """Converts this object into an etree element and adds it as a child node.

        Adds self to the ElementTree. This method is required to avoid verbose XML
        which constantly redefines the namespace.

        Args:
          element_tree: ElementTree._Element The element to which this object's XML
              will be added.
        """
        new_element = ElementTree.Element('tag__')  # uh, uhm... empty tag name - sorry google, this is bogus? (c)https://github.com/lqc
        element_tree.append(new_element)
        self._TransferToElementTree(new_element)
项目:python-zeep    作者:mvantellingen    | 项目源码 | 文件源码
def parse_xmlelements(self, xmlelements, schema, name=None, context=None):
        """Consume matching xmlelements and call parse() on each of them

        :param xmlelements: Dequeue of XML element objects
        :type xmlelements: collections.deque of lxml.etree._Element
        :param schema: The parent XML schema
        :type schema: zeep.xsd.Schema
        :param name: The name of the parent element
        :type name: str
        :param context: Optional parsing context (for inline schemas)
        :type context: zeep.xsd.context.XmlParserContext
        :return: dict or None

        """
        result = []

        for _unused in max_occurs_iter(self.max_occurs):
            if xmlelements:
                xmlelement = xmlelements.popleft()
                item = self.parse(xmlelement, schema, context=context)
                if item is not None:
                    result.append(item)
            else:
                break

        if not self.accepts_multiple:
            result = result[0] if result else None
        return result
项目:python-xml    作者:emusical    | 项目源码 | 文件源码
def get_node_namespace_uri(self, node):
        if '}' in node.tag:
            return node.tag.split('}')[0][1:]
        elif isinstance(node, LXMLAttribute):
            return node.namespace_uri
        elif isinstance(node, etree._ElementTree):
            return None
        elif isinstance(node, etree._Element):
            qname, ns_uri = self._unpack_name(node.tag, node)[:2]
            return ns_uri
        else:
            return None
项目:nexpose-client-python    作者:rapid7    | 项目源码 | 文件源码
def ExecuteWithPostData_FORM(session_id, uri, sub_url, timeout, post_data):
    headers = CreateHeadersWithSessionCookieAndCustomHeader(session_id)
    # TODO: another clue that refactor/redesign is required, xml testing if form code
    if isinstance(post_data, etree._Element):
        headers["Content-Type"] = "text/xml; charset=UTF-8"
        post_data = as_string(post_data)
    else:
        headers["Content-Type"] = "application/x-www-form-urlencoded; charset=UTF-8"
        post_data = urllib.parse.urlencode(utf8_encoded(post_data)).encode(encoding="utf-8")
    return ExecuteWebRequest(uri + sub_url, post_data, headers, timeout, lambda: 'POST')