Python pyspark module: RDD example source code

The following 50 code examples, extracted from open-source Python projects, illustrate how to use pyspark.RDD.

Project: Eskapade    Author: KaveIO
def test_esk609(self):
        """Test Esk-609: Map data-frame groups"""

        # run Eskapade
        self.run_eskapade('esk609_map_df_groups.py')
        proc_mgr = ProcessManager()
        ds = proc_mgr.service(DataStore)

        # check input data
        for key in ('map_rdd', 'flat_map_rdd'):
            self.assertIn(key, ds, 'no data found with key "{}"'.format(key))
            self.assertIsInstance(ds[key], pyspark.RDD,
                                  'object "{0:s}" is not an RDD (type "{1:s}")'.format(key, str(type(ds[key]))))

        # sums of "bar" variable
        bar_sums = [(0, 27.5), (1, 77.5), (2, 127.5), (3, 177.5), (4, 227.5), (5, 277.5), (6, 327.5), (7, 377.5),
                    (8, 427.5), (9, 477.5)]
        flmap_rows = [(it, 'foo{:d}'.format(it), (it + 1) / 2., bar_sums[it // 10][1]) for it in range(100)]

        # check mapped data frames
        self.assertListEqual(sorted(ds['map_rdd'].collect()), bar_sums, 'unexpected values in "map_rdd"')
        self.assertListEqual(sorted(ds['flat_map_rdd'].collect()), flmap_rows, 'unexpected values in "flat_map_rdd"')
Project: geopyspark    Author: locationtech-labs
def create_python_rdd(jrdd, serializer):
    """Creates a Python RDD from a RDD from Scala.

    Args:
        jrdd (org.apache.spark.api.java.JavaRDD): The RDD that came from Scala.
        serializer (:class:`~geopyspark.AvroSerializer` or pyspark.serializers.AutoBatchedSerializer(AvroSerializer)):
            An instance of ``AvroSerializer``, either on its own or wrapped in an ``AutoBatchedSerializer``.

    Returns:
        RDD
    """

    pysc = get_spark_context()

    if isinstance(serializer, AutoBatchedSerializer):
        return RDD(jrdd, pysc, serializer)
    else:
        return RDD(jrdd, pysc, AutoBatchedSerializer(serializer))
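For reference, pyspark's public RDD constructor takes exactly the three pieces used here: a Java RDD handle, the SparkContext, and a serializer. A minimal, self-contained sketch (not part of geopyspark; it simply rewraps the handle of a locally created RDD) shows the same pattern:

from pyspark import SparkContext
from pyspark.rdd import RDD

sc = SparkContext.getOrCreate()
rdd = sc.parallelize(range(10))

# Rewrap the underlying JavaRDD handle with its deserializer; this mirrors
# what create_python_rdd does for an RDD handed over from Scala.
rewrapped = RDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
print(rewrapped.take(3))  # [0, 1, 2]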
Project: MIT-Thesis    Author: alec-heif
def pprint(self, num=10):
        """
        Print the first num elements of each RDD generated in this DStream.

        @param num: the first num elements of each RDD will be printed.
        """
        def takeAndPrint(time, rdd):
            taken = rdd.take(num + 1)
            print("-------------------------------------------")
            print("Time: %s" % time)
            print("-------------------------------------------")
            for record in taken[:num]:
                print(record)
            if len(taken) > num:
                print("...")
            print("")

        self.foreachRDD(takeAndPrint)
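A minimal, self-contained usage sketch of pprint (assuming a local SparkContext; the queue of RDDs stands in for a real input source):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, batchDuration=1)  # 1-second batches

stream = ssc.queueStream([sc.parallelize(range(i, i + 5)) for i in range(3)])
stream.pprint(num=3)  # print the first 3 elements of each batch's RDD

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop(stopSparkContext=False, stopGraceFully=True)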
Project: MIT-Thesis    Author: alec-heif
def window(self, windowDuration, slideDuration=None):
        """
        Return a new DStream in which each RDD contains all the elements seen in a
        sliding window of time over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        self._validate_window_param(windowDuration, slideDuration)
        d = self._ssc._jduration(windowDuration)
        if slideDuration is None:
            return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
        s = self._ssc._jduration(slideDuration)
        return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
Project: MIT-Thesis    Author: alec-heif
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream in which each RDD contains the count of distinct elements in
        RDDs in a sliding window over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions:  number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                             windowDuration, slideDuration, numPartitions)
        return counted.filter(lambda kv: kv[1] > 0)
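A hedged usage sketch of countByValueAndWindow over a queue stream. The durations are in seconds and must be multiples of the batch interval, and the inverse-reduce path requires a checkpoint directory (the path below is only a placeholder):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/dstream-checkpoint")  # needed for the inverse reduce (placeholder path)

words = ssc.queueStream([sc.parallelize(["a", "b", "a"]) for _ in range(6)])
words.countByValueAndWindow(windowDuration=4, slideDuration=2).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(8)
ssc.stop(stopSparkContext=False, stopGraceFully=True)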
Project: MIT-Thesis    Author: alec-heif
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data)
    return obj
Project: MIT-Thesis    Author: alec-heif
def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features)
        vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
        can handle all kinds of discrete data.  For example, by
        converting documents into TF-IDF vectors, it can be used for
        document classification. By making every vector a 0-1 vector,
        it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
        The input feature values must be nonnegative.

        :param data:
          RDD of LabeledPoint.
        :param lambda_:
          The smoothing parameter.
          (default: 1.0)
        """
        first = data.first()
        if not isinstance(first, LabeledPoint):
            raise ValueError("`data` should be an RDD of LabeledPoint")
        labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
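A minimal usage sketch of NaiveBayes.train and the resulting model (assumes a local SparkContext and toy data):

from pyspark import SparkContext
from pyspark.mllib.classification import NaiveBayes
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext.getOrCreate()
data = sc.parallelize([
    LabeledPoint(0.0, Vectors.dense([1.0, 0.0])),
    LabeledPoint(0.0, Vectors.dense([2.0, 0.0])),
    LabeledPoint(1.0, Vectors.dense([0.0, 1.0])),
])
model = NaiveBayes.train(data, lambda_=1.0)

print(model.predict(Vectors.dense([0.0, 1.0])))                 # single point
print(model.predict(data.map(lambda p: p.features)).collect())  # whole RDD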
Project: MIT-Thesis    Author: alec-heif
def predict(self, x):
        """
        Predict the label of one or more examples.

        .. note:: In Python, predict cannot currently be used within an RDD
            transformation or action.
            Call predict directly on the RDD instead.

        :param x:
          Data point (feature vector), or an RDD of data points (feature
          vectors).
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))
Project: MIT-Thesis    Author: alec-heif
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.mllib.api.python.SerDe.loads(data)
    return obj
Project: MIT-Thesis    Author: alec-heif
def predict(self, x):
        """
        Predict labels for provided features using a piecewise linear function.
        1) If x exactly matches a boundary then associated prediction
        is returned. In case there are multiple predictions with the
        same boundary then one of them is returned. Which one is
        undefined (same as java.util.Arrays.binarySearch).
        2) If x is lower or higher than all boundaries then first or
        last prediction is returned respectively. In case there are
        multiple predictions with the same boundary then the lowest
        or highest is returned respectively.
        3) If x falls between two values in boundary array then
        prediction is treated as piecewise linear function and
        interpolated value is returned. In case there are multiple
        values with the same boundary then the same rules as in 2)
        are used.

        :param x:
          Feature or RDD of Features to be labeled.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))
        return np.interp(x, self.boundaries, self.predictions)
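A minimal usage sketch of the piecewise-linear prediction described above. IsotonicRegression.train expects an RDD of (label, feature, weight) tuples; a local SparkContext is assumed:

from pyspark import SparkContext
from pyspark.mllib.regression import IsotonicRegression

sc = SparkContext.getOrCreate()
data = sc.parallelize([(1.0, 1.0, 1.0), (2.0, 2.0, 1.0), (4.0, 3.0, 1.0)])
model = IsotonicRegression.train(data)

print(model.predict(2.5))                                        # interpolated value
print(model.predict(sc.parallelize([0.5, 2.5, 9.0])).collect())  # RDD of features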
Project: MIT-Thesis    Author: alec-heif
def rows(self):
        """
        Rows of the IndexedRowMatrix stored as an RDD of IndexedRows.

        >>> mat = IndexedRowMatrix(sc.parallelize([IndexedRow(0, [1, 2, 3]),
        ...                                        IndexedRow(1, [4, 5, 6])]))
        >>> rows = mat.rows
        >>> rows.first()
        IndexedRow(0, [1.0,2.0,3.0])
        """
        # We use DataFrames for serialization of IndexedRows from
        # Java, so we first convert the RDD of rows to a DataFrame
        # on the Scala/Java side. Then we map each Row in the
        # DataFrame back to an IndexedRow on this side.
        rows_df = callMLlibFunc("getIndexedRows", self._java_matrix_wrapper._java_model)
        rows = rows_df.rdd.map(lambda row: IndexedRow(row[0], row[1]))
        return rows
Project: MIT-Thesis    Author: alec-heif
def entries(self):
        """
        Entries of the CoordinateMatrix stored as an RDD of
        MatrixEntries.

        >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                                        MatrixEntry(6, 4, 2.1)]))
        >>> entries = mat.entries
        >>> entries.first()
        MatrixEntry(0, 0, 1.2)
        """
        # We use DataFrames for serialization of MatrixEntry entries
        # from Java, so we first convert the RDD of entries to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a MatrixEntry on this side.
        entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
        entries = entries_df.rdd.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
        return entries
Project: MIT-Thesis    Author: alec-heif
def blocks(self):
        """
        The RDD of sub-matrix blocks
        ((blockRowIndex, blockColIndex), sub-matrix) that form this
        distributed matrix.

        >>> mat = BlockMatrix(
        ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
        ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
        >>> blocks = mat.blocks
        >>> blocks.first()
        ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))

        """
        # We use DataFrames for serialization of sub-matrix blocks
        # from Java, so we first convert the RDD of blocks to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a sub-matrix block on this side.
        blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
        blocks = blocks_df.rdd.map(lambda row: ((row[0][0], row[0][1]), row[1]))
        return blocks
Project: pyspark    Author: v-v-vishnevskiy
def pprint(self, num=10):
        """
        Print the first num elements of each RDD generated in this DStream.

        @param num: the first num elements of each RDD will be printed.
        """
        def takeAndPrint(time, rdd):
            taken = rdd.take(num + 1)
            print("-------------------------------------------")
            print("Time: %s" % time)
            print("-------------------------------------------")
            for record in taken[:num]:
                print(record)
            if len(taken) > num:
                print("...")
            print("")

        self.foreachRDD(takeAndPrint)
Project: pyspark    Author: v-v-vishnevskiy
def window(self, windowDuration, slideDuration=None):
        """
        Return a new DStream in which each RDD contains all the elements seen in a
        sliding window of time over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        """
        self._validate_window_param(windowDuration, slideDuration)
        d = self._ssc._jduration(windowDuration)
        if slideDuration is None:
            return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer)
        s = self._ssc._jduration(slideDuration)
        return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer)
Project: pyspark    Author: v-v-vishnevskiy
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated by reducing all
        elements in a sliding window over this DStream.

        if `invReduceFunc` is not None, the reduction is done incrementally
        using the old window's reduced value:

        1. reduce the new values that entered the window (e.g., adding new counts)

        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
        This is more efficient than when `invReduceFunc` is None.

        @param reduceFunc:     associative reduce function
        @param invReduceFunc:  inverse reduce function of `reduceFunc`
        @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                               the new DStream will generate RDDs); must be a multiple of this
                               DStream's batching interval
        """
        keyed = self.map(lambda x: (1, x))
        reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                             windowDuration, slideDuration, 1)
        return reduced.map(lambda kv: kv[1])
Project: pyspark    Author: v-v-vishnevskiy
def countByValueAndWindow(self, windowDuration, slideDuration, numPartitions=None):
        """
        Return a new DStream in which each RDD contains the count of distinct elements in
        RDDs in a sliding window over this DStream.

        @param windowDuration: width of the window; must be a multiple of this DStream's
                              batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                              the new DStream will generate RDDs); must be a multiple of this
                              DStream's batching interval
        @param numPartitions:  number of partitions of each RDD in the new DStream.
        """
        keyed = self.map(lambda x: (x, 1))
        counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                             windowDuration, slideDuration, numPartitions)
        return counted.filter(lambda kv: kv[1] > 0).count()
Project: pyspark    Author: v-v-vishnevskiy
def train(cls, data, lambda_=1.0):
        """
        Train a Naive Bayes model given an RDD of (label, features)
        vectors.

        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
        can handle all kinds of discrete data.  For example, by
        converting documents into TF-IDF vectors, it can be used for
        document classification. By making every vector a 0-1 vector,
        it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
        The input feature values must be nonnegative.

        :param data: RDD of LabeledPoint.
        :param lambda_: The smoothing parameter (default: 1.0).
        """
        first = data.first()
        if not isinstance(first, LabeledPoint):
            raise ValueError("`data` should be an RDD of LabeledPoint")
        labels, pi, theta = callMLlibFunc("trainNaiveBayesModel", data, lambda_)
        return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
Project: pyspark    Author: v-v-vishnevskiy
def predict(self, x):
        """
        Predict the label of one or more examples.

        Note: In Python, predict cannot currently be used within an RDD
              transformation or action.
              Call predict directly on the RDD instead.

        :param x:  Data point (feature vector),
                   or an RDD of data points (feature vectors).
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))
Project: pyspark    Author: v-v-vishnevskiy
def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = ListConverter().convert([_py2java(sc, x) for x in obj], sc._gateway._gateway_client)
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.SerDe.loads(data)
    return obj
Project: pyspark    Author: v-v-vishnevskiy
def entries(self):
        """
        Entries of the CoordinateMatrix stored as an RDD of
        MatrixEntries.

        >>> mat = CoordinateMatrix(sc.parallelize([MatrixEntry(0, 0, 1.2),
        ...                                        MatrixEntry(6, 4, 2.1)]))
        >>> entries = mat.entries
        >>> entries.first()
        MatrixEntry(0, 0, 1.2)
        """
        # We use DataFrames for serialization of MatrixEntry entries
        # from Java, so we first convert the RDD of entries to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a MatrixEntry on this side.
        entries_df = callMLlibFunc("getMatrixEntries", self._java_matrix_wrapper._java_model)
        entries = entries_df.map(lambda row: MatrixEntry(row[0], row[1], row[2]))
        return entries
Project: pyspark    Author: v-v-vishnevskiy
def blocks(self):
        """
        The RDD of sub-matrix blocks
        ((blockRowIndex, blockColIndex), sub-matrix) that form this
        distributed matrix.

        >>> mat = BlockMatrix(
        ...     sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
        ...                     ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))]), 3, 2)
        >>> blocks = mat.blocks
        >>> blocks.first()
        ((0, 0), DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0))

        """
        # We use DataFrames for serialization of sub-matrix blocks
        # from Java, so we first convert the RDD of blocks to a
        # DataFrame on the Scala/Java side. Then we map each Row in
        # the DataFrame back to a sub-matrix block on this side.
        blocks_df = callMLlibFunc("getMatrixBlocks", self._java_matrix_wrapper._java_model)
        blocks = blocks_df.map(lambda row: ((row[0][0], row[0][1]), row[1]))
        return blocks
Project: Eskapade    Author: KaveIO
def execute(self):
        """Execute RddGroupMapper"""

        # get process manager and data store
        proc_mgr = ProcessManager()
        ds = proc_mgr.service(DataStore)

        # fetch data frame from data store
        if self.read_key not in ds:
            raise KeyError('no input data found in data store with key "{}"'.format(self.read_key))
        data = ds[self.read_key]
        if not isinstance(data, pyspark.RDD):
            raise TypeError('expected a Spark RDD for "{0:s}" (got "{1:s}")'.format(self.read_key, str(type(data))))

        # apply input map
        if self.input_map:
            data = data.map(self.input_map)

        # group data by keys in the data
        data = data.groupByKey(numPartitions=self.num_group_partitions)

        # apply map on group values
        if self.flatten_output_groups:
            data = data.flatMapValues(self.group_map)
        else:
            data = data.mapValues(self.group_map)

        # apply map on result
        if self.result_map:
            data = data.map(self.result_map)

        # store data in data store
        ds[self.store_key] = data

        return StatusCode.Success
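Stripped of the Eskapade bookkeeping, the core of execute() is an ordinary pyspark chain. A hedged sketch with illustrative stand-ins for the configured maps:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([("a", 1.0), ("a", 2.0), ("b", 10.0)])

grouped = rdd.groupByKey(numPartitions=2)                                 # group records by key
summed = grouped.mapValues(lambda values: sum(values))                    # stand-in for group_map
doubled = grouped.flatMapValues(lambda values: [v * 2 for v in values])   # flattened variant

print(sorted(summed.collect()))   # [('a', 3.0), ('b', 10.0)]
print(sorted(doubled.collect()))  # [('a', 2.0), ('a', 4.0), ('b', 20.0)]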
Project: Eskapade    Author: KaveIO
def test_esk606(self):
        """Test Esk-606: Convert Spark data frame"""

        # run Eskapade
        self.run_eskapade('esk606_convert_spark_df.py')
        proc_mgr = ProcessManager()
        ds = proc_mgr.service(DataStore)

        # define types of stored data sets
        data_types = {'df': pyspark.sql.DataFrame, 'rdd': pyspark.RDD, 'list': list, 'pd': pd.DataFrame}

        # define functions to obtain data-frame content
        content_funcs = {'df': lambda d: sorted(d.rdd.map(tuple).collect()),
                         'rdd': lambda d: sorted(d.collect()),
                         'list': lambda d: sorted(d),
                         'pd': lambda d: sorted(map(tuple, d.values))}

        # check input data
        self.assertIn('df', ds, 'no data found with key "df"')
        self.assertIsInstance(ds['df'], pyspark.sql.DataFrame, 'unexpected type for input data frame')

        # check created data sets
        rows = [(it, 'foo{:d}'.format(it), (it + 1) / 2.) for it in range(20, 100)]
        for key, dtype in data_types.items():
            # check content
            dkey = '{}_output'.format(key)
            self.assertIn(dkey, ds, 'no data found with key "{}"'.format(dkey))
            self.assertIsInstance(ds[dkey], dtype, 'unexpected type for "{}" data'.format(key))
            self.assertListEqual(content_funcs[key](ds[dkey]), rows, 'unexpected content for "{}" data'.format(key))

            # check schema
            skey = '{}_schema'.format(key)
            self.assertIn(skey, ds, 'no schema found with key {}'.format(skey))
            self.assertListEqual(list(ds[skey]), list(ds['df'].schema), 'unexpected schema for "{}" data'.format(key))
Project: MIT-Thesis    Author: alec-heif
def queueStream(self, rdds, oneAtATime=True, default=None):
        """
        Create an input stream from a queue of RDDs or a list. In each batch,
        it will process either one or all of the RDDs returned by the queue.

        .. note:: Changes to the queue after the stream is created will not be recognized.

        @param rdds:       Queue of RDDs
        @param oneAtATime: pick one RDD each time, or pick all of them at once.
        @param default:    The default rdd if no more in rdds
        """
        if default and not isinstance(default, RDD):
            default = self._sc.parallelize(default)

        if not rdds and default:
            rdds = [rdds]

        if rdds and not isinstance(rdds[0], RDD):
            rdds = [self._sc.parallelize(input) for input in rdds]
        self._check_serializers(rdds)

        queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
        if default:
            default = default._reserialize(rdds[0]._jrdd_deserializer)
            jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
        else:
            jdstream = self._jssc.queueStream(queue, oneAtATime)
        return DStream(jdstream, self, rdds[0]._jrdd_deserializer)
Project: MIT-Thesis    Author: alec-heif
def transform(self, dstreams, transformFunc):
        """
        Create a new DStream in which each RDD is generated by applying
        a function on RDDs of the DStreams. The order of the JavaRDDs in
        the transform function parameter will be the same as the order
        of corresponding DStreams in the list.
        """
        jdstreams = [d._jdstream for d in dstreams]
        # change the final serializer to sc.serializer
        func = TransformFunction(self._sc,
                                 lambda t, *rdds: transformFunc(rdds).map(lambda x: x),
                                 *[d._jrdd_deserializer for d in dstreams])
        jfunc = self._jvm.TransformFunction(func)
        jdstream = self._jssc.transform(jdstreams, jfunc)
        return DStream(jdstream, self, self._sc.serializer)
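A hedged sketch of StreamingContext.transform combining two DStreams; the user function receives the per-batch RDDs in the same order as the input streams:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)

left = ssc.queueStream([sc.parallelize([("k", 1)]) for _ in range(10)])
right = ssc.queueStream([sc.parallelize([("k", "x")]) for _ in range(10)])

joined = ssc.transform([left, right], lambda rdds: rdds[0].join(rdds[1]))
joined.pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)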
Project: MIT-Thesis    Author: alec-heif
def count(self):
        """
        Return a new DStream in which each RDD has a single element
        generated by counting each RDD of this DStream.
        """
        return self.mapPartitions(lambda i: [sum(1 for _ in i)]).reduce(operator.add)
Project: MIT-Thesis    Author: alec-heif
def mapPartitions(self, f, preservesPartitioning=False):
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitions() to each RDD of this DStream.
        """
        def func(s, iterator):
            return f(iterator)
        return self.mapPartitionsWithIndex(func, preservesPartitioning)
Project: MIT-Thesis    Author: alec-heif
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        Return a new DStream in which each RDD is generated by applying
        mapPartitionsWithIndex() to each RDD of this DStream.
        """
        return self.transform(lambda rdd: rdd.mapPartitionsWithIndex(f, preservesPartitioning))
Project: MIT-Thesis    Author: alec-heif
def reduceByKey(self, func, numPartitions=None):
        """
        Return a new DStream by applying reduceByKey to each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.combineByKey(lambda x: x, func, func, numPartitions)
Project: MIT-Thesis    Author: alec-heif
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None):
        """
        Return a new DStream by applying combineByKey to each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        def func(rdd):
            return rdd.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions)
        return self.transform(func)
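A hedged sketch of combineByKey on a DStream, computing per-key averages within each batch:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)
pairs = ssc.queueStream([sc.parallelize([("a", 1.0), ("a", 3.0), ("b", 5.0)])
                         for _ in range(5)])

sums_counts = pairs.combineByKey(
    lambda v: (v, 1),                          # createCombiner
    lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue
    lambda a, b: (a[0] + b[0], a[1] + b[1]))   # mergeCombiners
sums_counts.mapValues(lambda sum_count: sum_count[0] / sum_count[1]).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(3)
ssc.stop(stopSparkContext=False, stopGraceFully=True)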
Project: MIT-Thesis    Author: alec-heif
def partitionBy(self, numPartitions, partitionFunc=portable_hash):
        """
        Return a copy of the DStream in which each RDD is partitioned
        using the specified partitioner.
        """
        return self.transform(lambda rdd: rdd.partitionBy(numPartitions, partitionFunc))
Project: MIT-Thesis    Author: alec-heif
def foreachRDD(self, func):
        """
        Apply a function to each RDD in this DStream.
        """
        if func.__code__.co_argcount == 1:
            old_func = func
            func = lambda t, rdd: old_func(rdd)
        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
        api = self._ssc._jvm.PythonDStream
        api.callForeachRDD(self._jdstream, jfunc)
Project: MIT-Thesis    Author: alec-heif
def checkpoint(self, interval):
        """
        Enable periodic checkpointing of RDDs of this DStream

        @param interval: time in seconds; after each such period, the generated
                         RDDs will be checkpointed
        """
        self.is_checkpointed = True
        self._jdstream.checkpoint(self._ssc._jduration(interval))
        return self
Project: MIT-Thesis    Author: alec-heif
def groupByKey(self, numPartitions=None):
        """
        Return a new DStream by applying groupByKey on each RDD.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism
        return self.transform(lambda rdd: rdd.groupByKey(numPartitions))
Project: MIT-Thesis    Author: alec-heif
def countByValue(self):
        """
        Return a new DStream in which each RDD contains the counts of each
        distinct value in each RDD of this DStream.
        """
        return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
Project: MIT-Thesis    Author: alec-heif
def saveAsTextFiles(self, prefix, suffix=None):
        """
        Save each RDD in this DStream as a text file, using string
        representation of elements.
        """
        def saveAsTextFile(t, rdd):
            path = rddToFileName(prefix, suffix, t)
            try:
                rdd.saveAsTextFile(path)
            except Py4JJavaError as e:
                # after recovered from checkpointing, the foreachRDD may
                # be called twice
                if 'FileAlreadyExistsException' not in str(e):
                    raise
        return self.foreachRDD(saveAsTextFile)

    # TODO: uncomment this once we have ssc.pickleFileStream()
    # def saveAsPickleFiles(self, prefix, suffix=None):
    #     """
    #     Save each RDD in this DStream as a binary file; the elements are
    #     serialized by pickle.
    #     """
    #     def saveAsPickleFile(t, rdd):
    #         path = rddToFileName(prefix, suffix, t)
    #         try:
    #             rdd.saveAsPickleFile(path)
    #         except Py4JJavaError as e:
    #             # after recovered from checkpointing, the foreachRDD may
    #             # be called twice
    #             if 'FileAlreadyExistsException' not in str(e):
    #                 raise
    #     return self.foreachRDD(saveAsPickleFile)
Project: MIT-Thesis    Author: alec-heif
def transform(self, func):
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of this DStream.

        `func` can have one argument of `rdd`, or have two arguments of
        (`time`, `rdd`)
        """
        if func.__code__.co_argcount == 1:
            oldfunc = func
            func = lambda t, rdd: oldfunc(rdd)
        assert func.__code__.co_argcount == 2, "func should take one or two arguments"
        return TransformedDStream(self, func)
Project: MIT-Thesis    Author: alec-heif
def slice(self, begin, end):
        """
        Return all the RDDs between 'begin' and 'end' (both included)

        `begin`, `end` can be a datetime.datetime() or a unix timestamp
        """
        jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
        return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
Project: MIT-Thesis    Author: alec-heif
def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated by reducing all
        elements in a sliding window over this DStream.

        if `invReduceFunc` is not None, the reduction is done incrementally
        using the old window's reduced value:

        1. reduce the new values that entered the window (e.g., adding new counts)

        2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
        This is more efficient than when `invReduceFunc` is None.

        @param reduceFunc:     associative and commutative reduce function
        @param invReduceFunc:  inverse reduce function of `reduceFunc`; such that for all y,
                               and invertible x:
                               `invReduceFunc(reduceFunc(x, y), x) = y`
        @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
        @param slideDuration:  sliding interval of the window (i.e., the interval after which
                               the new DStream will generate RDDs); must be a multiple of this
                               DStream's batching interval
        """
        keyed = self.map(lambda x: (1, x))
        reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
                                             windowDuration, slideDuration, 1)
        return reduced.map(lambda kv: kv[1])
Project: MIT-Thesis    Author: alec-heif
def countByWindow(self, windowDuration, slideDuration):
        """
        Return a new DStream in which each RDD has a single element generated
        by counting the number of elements in a window over this DStream.
        windowDuration and slideDuration are as defined in the window() operation.

        This is equivalent to window(windowDuration, slideDuration).count(),
        but will be more efficient if window is large.
        """
        return self.map(lambda x: 1).reduceByWindow(operator.add, operator.sub,
                                                    windowDuration, slideDuration)
Project: MIT-Thesis    Author: alec-heif
def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
        """
        Return a new "state" DStream where the state for each key is updated by applying
        the given function on the previous state of the key and the new values of the key.

        @param updateFunc: State update function. If this function returns None, then
                           the corresponding state key-value pair will be eliminated.
        """
        if numPartitions is None:
            numPartitions = self._sc.defaultParallelism

        if initialRDD and not isinstance(initialRDD, RDD):
            initialRDD = self._sc.parallelize(initialRDD)

        def reduceFunc(t, a, b):
            if a is None:
                g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
            else:
                g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
                g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
            state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
            return state.filter(lambda k_v: k_v[1] is not None)

        jreduceFunc = TransformFunction(self._sc, reduceFunc,
                                        self._sc.serializer, self._jrdd_deserializer)
        if initialRDD:
            initialRDD = initialRDD._reserialize(self._jrdd_deserializer)
            dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc,
                                                       initialRDD._jrdd)
        else:
            dstream = self._sc._jvm.PythonStateDStream(self._jdstream.dstream(), jreduceFunc)

        return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
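A hedged sketch of a stateful running count with updateStateByKey; stateful operations need a checkpoint directory (the path below is only a placeholder):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)
ssc.checkpoint("/tmp/state-checkpoint")  # placeholder path

pairs = ssc.queueStream([sc.parallelize([("a", 1), ("b", 1)]) for _ in range(5)])

def update(new_values, last_count):
    # new_values: values seen for the key in this batch; last_count: previous state (or None)
    return sum(new_values) + (last_count or 0)

pairs.updateStateByKey(update).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(4)
ssc.stop(stopSparkContext=False, stopGraceFully=True)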
Project: MIT-Thesis    Author: alec-heif
def __init__(self, ctx, func, *deserializers):
        self.ctx = ctx
        self.func = func
        self.deserializers = deserializers
        self.rdd_wrap_func = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
        self.failure = None
Project: MIT-Thesis    Author: alec-heif
def _to_java_object_rdd(rdd):
    """ Return an JavaRDD of Object by unpickling

    It will convert each Python object into Java object by Pyrolite, whenever the
    RDD is serialized in batch or not.
    """
    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    return rdd.ctx._jvm.org.apache.spark.ml.python.MLSerDe.pythonToJava(rdd._jrdd, True)
Project: MIT-Thesis    Author: alec-heif
def predict(self, test):
        """
        Predict values for a single data point or an RDD of points
        using the model trained.
        """
        raise NotImplementedError
Project: MIT-Thesis    Author: alec-heif
def predict(self, x):
        """
        Predict values for a single data point or an RDD of points
        using the model trained.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))

        x = _convert_to_vector(x)
        if self.numClasses == 2:
            margin = self.weights.dot(x) + self._intercept
            if margin > 0:
                prob = 1 / (1 + exp(-margin))
            else:
                exp_margin = exp(margin)
                prob = exp_margin / (1 + exp_margin)
            if self._threshold is None:
                return prob
            else:
                return 1 if prob > self._threshold else 0
        else:
            best_class = 0
            max_margin = 0.0
            if x.size + 1 == self._dataWithBiasSize:
                for i in range(0, self._numClasses - 1):
                    margin = x.dot(self._weightsMatrix[i][0:x.size]) + \
                        self._weightsMatrix[i][x.size]
                    if margin > max_margin:
                        max_margin = margin
                        best_class = i + 1
            else:
                for i in range(0, self._numClasses - 1):
                    margin = x.dot(self._weightsMatrix[i])
                    if margin > max_margin:
                        max_margin = margin
                        best_class = i + 1
            return best_class
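A minimal usage sketch of the two prediction paths above, using LogisticRegressionWithLBFGS on toy data (a local SparkContext is assumed):

from pyspark import SparkContext
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext.getOrCreate()
points = sc.parallelize([
    LabeledPoint(0.0, [0.0, 1.0]),
    LabeledPoint(1.0, [1.0, 0.0]),
])
model = LogisticRegressionWithLBFGS.train(points, iterations=10)

print(model.predict([1.0, 0.0]))                                  # single point (class label)
print(model.predict(points.map(lambda p: p.features)).collect())  # RDD of points
model.clearThreshold()
print(model.predict([1.0, 0.0]))                                  # raw probability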
Project: MIT-Thesis    Author: alec-heif
def predict(self, x):
        """
        Predict values for a single data point or an RDD of points
        using the model trained.
        """
        if isinstance(x, RDD):
            return x.map(lambda v: self.predict(v))

        x = _convert_to_vector(x)
        margin = self.weights.dot(x) + self.intercept
        if self._threshold is None:
            return margin
        else:
            return 1 if margin > self._threshold else 0
Project: MIT-Thesis    Author: alec-heif
def trainOn(self, dstream):
        """Train the model on the incoming dstream."""
        self._validate(dstream)

        def update(rdd):
            # LogisticRegressionWithSGD.train raises an error for an empty RDD.
            if not rdd.isEmpty():
                self._model = LogisticRegressionWithSGD.train(
                    rdd, self.numIterations, self.stepSize,
                    self.miniBatchFraction, self._model.weights,
                    regParam=self.regParam, convergenceTol=self.convergenceTol)

        dstream.foreachRDD(update)
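A hedged sketch of the streaming training loop above, fed by a queue stream of toy LabeledPoints:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint

sc = SparkContext.getOrCreate()
ssc = StreamingContext(sc, 1)

training = ssc.queueStream([sc.parallelize([LabeledPoint(0.0, [0.0, 1.0]),
                                            LabeledPoint(1.0, [1.0, 0.0])])
                            for _ in range(5)])

model = StreamingLogisticRegressionWithSGD(numIterations=5)
model.setInitialWeights([0.0, 0.0])
model.trainOn(training)  # registers the foreachRDD(update) callback shown above
model.predictOnValues(training.map(lambda lp: (lp.label, lp.features))).pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(4)
ssc.stop(stopSparkContext=False, stopGraceFully=True)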
Project: MIT-Thesis    Author: alec-heif
def predict(self, x):
        """
        Predict values for a single data point or an RDD of points using
        the model trained.

        .. note:: In Python, predict cannot currently be used within an RDD
            transformation or action.
            Call predict directly on the RDD instead.
        """
        if isinstance(x, RDD):
            return self.call("predict", x.map(_convert_to_vector))

        else:
            return self.call("predict", _convert_to_vector(x))
Project: MIT-Thesis    Author: alec-heif
def _train(cls, data, type, numClasses, features, impurity="gini", maxDepth=5, maxBins=32,
               minInstancesPerNode=1, minInfoGain=0.0):
        first = data.first()
        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
        model = callMLlibFunc("trainDecisionTreeModel", data, type, numClasses, features,
                              impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain)
        return DecisionTreeModel(model)
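A minimal usage sketch of the public wrapper around _train (DecisionTree.trainClassifier), on toy data with a local SparkContext:

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree

sc = SparkContext.getOrCreate()
data = sc.parallelize([
    LabeledPoint(0.0, [0.0, 1.0]),
    LabeledPoint(1.0, [1.0, 0.0]),
])
model = DecisionTree.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={},
                                     impurity="gini", maxDepth=5, maxBins=32)

print(model.predict([1.0, 0.0]))                                 # single point
print(model.predict(data.map(lambda p: p.features)).collect())   # RDD (see the note above)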