Python pyspark.sql module: Column() example source code

The following 10 code examples, extracted from open-source Python projects, illustrate how to use pyspark.sql.Column().
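
Before the project excerpts, here is a minimal self-contained sketch of where pyspark.sql.Column objects come from: attribute access, item access, and the functions in pyspark.sql.functions all produce Column expressions, which are only evaluated when an action runs. The DataFrame contents below are made up for illustration.

from pyspark.sql import SparkSession, Column
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").appName("column-demo").getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, "b")], ["key", "value"])

# Attribute access, item access, and SQL functions all yield Column expressions.
expr = (df.key + 1).alias("key_plus_one")
assert isinstance(expr, Column)
assert isinstance(F.upper(df["value"]), Column)

df.select(expr, F.upper(df["value"]).alias("value_upper")).show()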

Project: search-MjoLniR    Author: wikimedia
import json
from pyspark.sql import Column

def add_meta(sc, col, metadata):
    """Add metadata to a column

    Adds metadata to a column describing extra properties. The metadata survives
    serialization from DataFrame to Parquet and back to DataFrame. Any subsequent
    manipulation of the returned column, such as aliasing, will drop the metadata.

    Parameters
    ----------
    sc : pyspark.SparkContext
    col : pyspark.sql.Column
    metadata : dict

    Returns
    -------
    pyspark.sql.Column
    """
    meta = sc._jvm.org.apache.spark.sql.types \
        .Metadata.fromJson(json.dumps(metadata))
    return Column(getattr(col._jc, 'as')('', meta))
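
A hedged usage sketch (the column name and metadata are made up; assumes a SparkSession named spark and the add_meta helper above). The attached metadata can then be read back from the DataFrame schema:

# Sketch only, not part of the project source.
sc = spark.sparkContext
df = spark.createDataFrame([(1.0,), (2.0,)], ["score"])
df = df.withColumn("score", add_meta(sc, df["score"], {"origin": "demo"}))
print(df.schema["score"].metadata)   # expected: {'origin': 'demo'}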
Project: spylon    Author: maxpoint
def wrap_function_cols(self, name, package_name=None, object_name=None, java_class_instance=None, doc=""):
        """Utility method for wrapping a scala/java function that returns a spark sql Column.

        This assumes that the function that you are wrapping takes a list of spark sql Column objects as its arguments.
        """
        def _(*cols):
            jcontainer = self.get_java_container(package_name=package_name, object_name=object_name, java_class_instance=java_class_instance)
            # Ensure every argument is a Column; plain values and names are wrapped via _make_col
            col_args = [col._jc if isinstance(col, Column) else _make_col(col)._jc for col in cols]
            function = getattr(jcontainer, name)
            args = col_args
            jc = function(*args)
            return Column(jc)
        _.__name__ = name
        _.__doc__ = doc
        return _
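
A hedged illustration of the calling convention only (the helper object, package, and Scala function names below are hypothetical and not part of the spylon API):

# Hypothetical: `helpers` is an instance of the class that defines wrap_function_cols,
# and "myScalaFn" is a Scala function on object com.example.udfs that returns a Column.
my_fn = helpers.wrap_function_cols("myScalaFn", package_name="com.example", object_name="udfs")
df = df.withColumn("result", my_fn(df["a"], df["b"]))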
Project: search-MjoLniR    Author: wikimedia
from pyspark import SparkContext
from pyspark.sql import Column, functions as F
from pyspark.sql.column import _to_java_column, _to_seq

def at_least_n_distinct(col, limit):
    """Count distinct values in a way that works within windows

    The standard distinct count in Spark SQL cannot be applied in
    a window; this implementation allows that to work.
    """
    sc = SparkContext._active_spark_context
    j_cols = _to_seq(sc, [_to_java_column(col), _to_java_column(F.lit(limit))])
    jc = sc._jvm.org.wikimedia.search.mjolnir.AtLeastNDistinct().apply(j_cols)
    return Column(jc)
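
A hedged usage sketch over a window (the column names are made up, and the org.wikimedia.search.mjolnir jar must be on the Spark classpath for the Scala aggregate to resolve):

# Sketch only, not part of the project source.
from pyspark.sql import Window, functions as F

w = Window.partitionBy("query")
df = df.withColumn("has_two_pages", at_least_n_distinct(F.col("page_id"), 2).over(w))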
Project: MIT-Thesis    Author: alec-heif
def test_column_operators(self):
        ci = self.df.key
        cs = self.df.value
        c = ci == cs
        self.assertTrue(isinstance((- ci - 1 - 2) % 3 * 2.5 / 3.5, Column))
        rcc = (1 + ci), (1 - ci), (1 * ci), (1 / ci), (1 % ci), (1 ** ci), (ci ** 1)
        self.assertTrue(all(isinstance(c, Column) for c in rcc))
        cb = [ci == 5, ci != 0, ci > 3, ci < 4, ci >= 0, ci <= 7]
        self.assertTrue(all(isinstance(c, Column) for c in cb))
        cbool = (ci & ci), (ci | ci), (~ci)
        self.assertTrue(all(isinstance(c, Column) for c in cbool))
        css = cs.like('a'), cs.rlike('a'), cs.asc(), cs.desc(), cs.startswith('a'), cs.endswith('a')
        self.assertTrue(all(isinstance(c, Column) for c in css))
        self.assertTrue(isinstance(ci.cast(LongType()), Column))
Project: MIT-Thesis    Author: alec-heif
def test_access_column(self):
        df = self.df
        self.assertTrue(isinstance(df.key, Column))
        self.assertTrue(isinstance(df['key'], Column))
        self.assertTrue(isinstance(df[0], Column))
        self.assertRaises(IndexError, lambda: df[2])
        self.assertRaises(AnalysisException, lambda: df["bad_key"])
        self.assertRaises(TypeError, lambda: df[{}])
Project: spylon    Author: maxpoint
def wrap_spark_sql_udf(self, name, package_name=None, object_name=None, java_class_instance=None, doc=""):
        """Wraps a scala/java spark user defined function """
        def _(*cols):
            jcontainer = self.get_java_container(package_name=package_name, object_name=object_name, java_class_instance=java_class_instance)
            # Look up the Java UDF object and apply it to the column arguments (converted to Java columns)
            function = getattr(jcontainer, name)
            judf = function()
            jc = judf.apply(self.to_scala_seq([_to_java_column(c) for c in cols]))
            return Column(jc)
        _.__name__ = name
        _.__doc__ = doc
        return _
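
A hedged calling sketch (names hypothetical, not spylon API); the only difference from wrap_function_cols above is that the Java object must expose a no-argument accessor returning a Spark UserDefinedFunction:

my_udf = helpers.wrap_spark_sql_udf("myUdf", package_name="com.example", object_name="udfs")
df = df.withColumn("flag", my_udf(df["a"]))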
Project: pyspark    Author: v-v-vishnevskiy
def test_column_operators(self):
        ci = self.df.key
        cs = self.df.value
        c = ci == cs
        self.assertTrue(isinstance((- ci - 1 - 2) % 3 * 2.5 / 3.5, Column))
        rcc = (1 + ci), (1 - ci), (1 * ci), (1 / ci), (1 % ci), (1 ** ci), (ci ** 1)
        self.assertTrue(all(isinstance(c, Column) for c in rcc))
        cb = [ci == 5, ci != 0, ci > 3, ci < 4, ci >= 0, ci <= 7]
        self.assertTrue(all(isinstance(c, Column) for c in cb))
        cbool = (ci & ci), (ci | ci), (~ci)
        self.assertTrue(all(isinstance(c, Column) for c in cbool))
        css = cs.like('a'), cs.rlike('a'), cs.asc(), cs.desc(), cs.startswith('a'), cs.endswith('a')
        self.assertTrue(all(isinstance(c, Column) for c in css))
        self.assertTrue(isinstance(ci.cast(LongType()), Column))
Project: pyspark    Author: v-v-vishnevskiy
def test_access_column(self):
        df = self.df
        self.assertTrue(isinstance(df.key, Column))
        self.assertTrue(isinstance(df['key'], Column))
        self.assertTrue(isinstance(df[0], Column))
        self.assertRaises(IndexError, lambda: df[2])
        self.assertRaises(AnalysisException, lambda: df["bad_key"])
        self.assertRaises(TypeError, lambda: df[{}])
Project: MIT-Thesis    Author: alec-heif
def test_list_columns(self):
        # Note: this is pyspark.sql.catalog.Column (a metadata namedtuple),
        # not the pyspark.sql.Column expression class used elsewhere on this page.
        from pyspark.sql.catalog import Column
        spark = self.spark
        spark.catalog._reset()
        spark.sql("CREATE DATABASE some_db")
        spark.sql("CREATE TABLE tab1 (name STRING, age INT)")
        spark.sql("CREATE TABLE some_db.tab2 (nickname STRING, tolerance FLOAT)")
        columns = sorted(spark.catalog.listColumns("tab1"), key=lambda c: c.name)
        columnsDefault = sorted(spark.catalog.listColumns("tab1", "default"), key=lambda c: c.name)
        self.assertEquals(columns, columnsDefault)
        self.assertEquals(len(columns), 2)
        self.assertEquals(columns[0], Column(
            name="age",
            description=None,
            dataType="int",
            nullable=True,
            isPartition=False,
            isBucket=False))
        self.assertEquals(columns[1], Column(
            name="name",
            description=None,
            dataType="string",
            nullable=True,
            isPartition=False,
            isBucket=False))
        columns2 = sorted(spark.catalog.listColumns("tab2", "some_db"), key=lambda c: c.name)
        self.assertEquals(len(columns2), 2)
        self.assertEquals(columns2[0], Column(
            name="nickname",
            description=None,
            dataType="string",
            nullable=True,
            isPartition=False,
            isBucket=False))
        self.assertEquals(columns2[1], Column(
            name="tolerance",
            description=None,
            dataType="float",
            nullable=True,
            isPartition=False,
            isBucket=False))
        self.assertRaisesRegexp(
            AnalysisException,
            "tab2",
            lambda: spark.catalog.listColumns("tab2"))
        self.assertRaisesRegexp(
            AnalysisException,
            "does_not_exist",
            lambda: spark.catalog.listColumns("does_not_exist"))
Project: sparkly    Author: Tubular
from functools import reduce
from pyspark.sql import Column, functions as F

def switch_case(switch, case=None, default=None, **additional_cases):
    """Switch/case style column generation.

    Args:
        switch (str, pyspark.sql.Column): column to "switch" on;
            its values are going to be compared against defined cases.
        case (dict): case statements. When a key matches the value of
            the column in a specific row, the respective value will be
            assigned to the new column for that row. This is useful when
            your case condition constants are not strings.
        default: default value to be used when the value of the switch
            column doesn't match any keys.
        additional_cases: additional "case" statements, kwargs style.
            Same semantics as ``case`` above. If both are provided,
            ``case`` takes precedence.

    Returns:
        pyspark.sql.Column

    Example:
        ``switch_case('state', CA='California', NY='New York', default='Other')``

        is equivalent to

        >>> F.when(
        ... F.col('state') == 'CA', 'California'
        ).when(
        ... F.col('state') == 'NY', 'New York'
        ).otherwise('Other')
    """
    if not isinstance(switch, Column):
        switch = F.col(switch)

    def _column_or_lit(x):
        return F.lit(x) if not isinstance(x, Column) else x

    def _execute_case(accumulator, case):
        # transform the case to a pyspark.sql.functions.when statement,
        # then chain it to existing when statements
        condition_constant, assigned_value = case
        when_args = (switch == F.lit(condition_constant), _column_or_lit(assigned_value))
        return accumulator.when(*when_args)

    cases = case or {}
    for conflict in set(cases.keys()) & set(additional_cases.keys()):
        del additional_cases[conflict]
    cases = list(cases.items()) + list(additional_cases.items())

    default = _column_or_lit(default)

    if not cases:
        return default

    result = reduce(_execute_case, cases, F).otherwise(default)

    return result
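
A hedged usage sketch (the DataFrame and column names are made up; assumes a SparkSession named spark and the switch_case helper above). The returned Column is used like any other expression, for example with withColumn:

# Sketch only, not part of the project source.
df = spark.createDataFrame([("CA",), ("NY",), ("TX",)], ["state"])
df = df.withColumn(
    "state_name",
    switch_case("state", {"CA": "California", "NY": "New York"}, default="Other"),
)
df.show()   # rows with state "TX" get "Other"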